parallel: basic --filter-hosts working.

This commit is contained in:
Ole Tange 2012-05-28 02:50:03 +02:00
parent 832ce24df5
commit 541bde1838

View file

@ -66,11 +66,6 @@ my @fhlist;
if(not @fhlist) { if(not @fhlist) {
@fhlist = (*STDIN); @fhlist = (*STDIN);
} }
if($::opt_skip_first_line) {
# Skip the first line for the first file handle
my $fh = $fhlist[0];
<$fh>;
}
if($::opt_header and not $::opt_pipe) { if($::opt_header and not $::opt_pipe) {
my $fh = $fhlist[0]; my $fh = $fhlist[0];
# split with colsep or \t # split with colsep or \t
@ -89,6 +84,78 @@ if($::opt_header and not $::opt_pipe) {
} }
} }
# --filter-hosts: probe all sshlogins in parallel before running any jobs.
#
# A child GNU Parallel is started with --nonall so that every host runs
#   parallel --number-of-cores ; parallel --number-of-cpus ;
#   parallel --max-line-length-allowed
# Its stdout mixes two kinds of tab separated lines:
#   * joblog lines (--joblog -), 7 or more columns, one per host
#   * tagged output lines (--tag): "<host>\t<value>", three per host, in
#     the order ncores, ncpus, maxlen (order is kept by -k)
# Hosts that cannot be reached are removed from %Global::host; for the
# remaining hosts ncpus, login time and max command length are stored on
# the sshlogin object.
if($::opt_filter_hosts) {
    my @S = map { "-S " . ::shell_quote_scalar($_) } @::opt_sshlogin;
    my @slf = map { "--slf " . ::shell_quote_scalar($_) } @::opt_sshloginfile;
    my $cmd = "$0 --tag --joblog - -k --nonall @S @slf " .
        "parallel --number-of-cores \\;".
        "parallel --number-of-cpus \\;".
        "parallel --max-line-length-allowed";
    ::debug($cmd."\n");
    # Lexical handle instead of a global bareword; the single command
    # string is still run through the shell, exactly as "$cmd |" was.
    open(my $host_fh, "-|", $cmd) || ::die_bug("parallel host check: $cmd");
    my (%ncores, %ncpus, %time_to_login, %maxlen);
    while(<$host_fh>) {
        my @col = split /\t/, $_;
        if(defined $col[6]) {
            # Long line: a joblog line (or the joblog header).
            # $col[1] is the host, $col[3] the runtime; $col[6] is
            # presumably the Exitval column of the joblog - confirm
            # against the --joblog format.
            my $host = $col[1];
            if($col[6] eq "255") {
                # 255: ssh failed
                # Remove sshlogin
                delete $Global::host{$host};
            } elsif($col[6] eq "127") {
                # 127: parallel not installed remote
                # Set ncpus and ncores = 1
                print $Global::original_stderr
                    ("parallel: Warning: Could not figure out ",
                     "number of cpus on $col[1]. Using 1\n");
                $ncores{$host} = 1;
                $ncpus{$host} = 1;
                $maxlen{$host} = Limits::Command::max_length();
                # The host is kept, so its login time must be recorded
                # too; otherwise the sanity check below dies for it.
                $time_to_login{$host} = $col[3];
            } elsif($col[0] eq "1" and $Global::host{$host}) {
                # 1 server 1338156112.05 0.303 0 0 0 0
                # parallel --number-of-cores ; parallel --number-of-cpus
                # Remember how long it took to log in
                $time_to_login{$host} = $col[3];
            } elsif($col[0] eq "Seq" and $col[1] eq "Host" and
                    $col[2] eq "Starttime" and $col[3] eq "Runtime") {
                # skip the joblog header
            } else {
                ::die_bug("host check unmatched long jobline : $_");
            }
        } elsif($Global::host{$col[0]}) {
            # Short line: tagged output from a known host.
            # ncores: server 8
            # ncpus:  server 2
            # maxlen: server 131071
            # The first value seen for a host is ncores, the second
            # ncpus and the third maxlen.
            my $host = $col[0];
            if(not $ncores{$host}) {
                $ncores{$host} = $col[1];
            } elsif(not $ncpus{$host}) {
                $ncpus{$host} = $col[1];
            } elsif(not $maxlen{$host}) {
                $maxlen{$host} = $col[1];
            } else {
                ::die_bug("host check too many col0: $_");
            }
        } else {
            ::die_bug("host check unmatched short jobline: $_");
        }
    }
    close $host_fh;
    # Every host that survived the check must have all four values.
    while (my ($sshlogin, $obj) = each %Global::host) {
        $ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
        $ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
        $time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
        $maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
        if($::opt_use_cpus_instead_of_cores) {
            $obj->set_ncpus($ncpus{$sshlogin});
        } else {
            $obj->set_ncpus($ncores{$sshlogin});
        }
        $obj->set_time_to_login($time_to_login{$sshlogin});
        $obj->set_maxlength($maxlen{$sshlogin});
    }
}
if($::opt_nonall or $::opt_onall) { if($::opt_nonall or $::opt_onall) {
# Copy all @fhlist into tempfiles # Copy all @fhlist into tempfiles
my @argfiles = (); my @argfiles = ();
@ -121,15 +188,15 @@ if($::opt_nonall or $::opt_onall) {
((defined $::opt_D) ? "-D" : ""), ((defined $::opt_D) ? "-D" : ""),
((defined $::opt_timeout) ? "--timeout ".$::opt_timeout : ""), ((defined $::opt_timeout) ? "--timeout ".$::opt_timeout : ""),
); );
::debug("| parallel"); ::debug("| $0 $options\n");
open(PARALLEL,"| $0 $options") || open(PARALLEL,"| $0 -j0 $options") ||
::die_bug("This does not run GNU Parallel: $0 $options"); ::die_bug("This does not run GNU Parallel: $0 $options");
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
print PARALLEL "$0 $suboptions -j1 ". print PARALLEL "$0 $suboptions -j1 ".
((defined $::opt_tag) ? ((defined $::opt_tag) ?
"--tagstring ".shell_quote_scalar($sshlogin->string()) : ""). "--tagstring ".shell_quote_scalar($sshlogin->string()) : "").
" -S ". shell_quote_scalar($sshlogin->string())." ". " -S ". shell_quote_scalar($sshlogin->string())." ".
shell_quote_scalar($command)." :::: @argfiles\n"; shell_quote_scalar($command)." :::: @argfiles\n";
} }
close PARALLEL; close PARALLEL;
$Global::exitstatus = $? >> 8; $Global::exitstatus = $? >> 8;
@ -401,6 +468,7 @@ sub options_hash {
"tagstring=s" => \$::opt_tagstring, "tagstring=s" => \$::opt_tagstring,
"onall" => \$::opt_onall, "onall" => \$::opt_onall,
"nonall" => \$::opt_nonall, "nonall" => \$::opt_nonall,
"filter-hosts" => \$::opt_filter_hosts,
"sshlogin|S=s" => \@::opt_sshlogin, "sshlogin|S=s" => \@::opt_sshlogin,
"sshloginfile|slf=s" => \@::opt_sshloginfile, "sshloginfile|slf=s" => \@::opt_sshloginfile,
"controlmaster|M" => \$::opt_controlmaster, "controlmaster|M" => \$::opt_controlmaster,
@ -468,7 +536,6 @@ sub options_hash {
# Shebang #!/usr/bin/parallel --shebang # Shebang #!/usr/bin/parallel --shebang
"shebang|hashbang" => \$::opt_shebang, "shebang|hashbang" => \$::opt_shebang,
"Y" => \$::opt_retired, "Y" => \$::opt_retired,
"skip-first-line" => \$::opt_skip_first_line,
"header=s" => \$::opt_header, "header=s" => \$::opt_header,
); );
} }
@ -499,7 +566,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20120527; $Global::version = 20120528;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -2032,6 +2099,7 @@ sub new {
'serverlogin' => undef, 'serverlogin' => undef,
'control_path_dir' => undef, 'control_path_dir' => undef,
'control_path' => undef, 'control_path' => undef,
'time_to_login' => undef,
'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp/loadavg-" . 'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp/loadavg-" .
$$."-".$string, $$."-".$string,
'loadavg' => undef, 'loadavg' => undef,
@ -2285,6 +2353,16 @@ sub compute_max_loadavg {
return $load; return $load;
} }
sub time_to_login {
    # Returns: the value stored under 'time_to_login' on this object
    #   (whatever set_time_to_login last stored; the constructor
    #   initialises the slot to undef)
    my ($self) = @_;
    my $login_time = $self->{'time_to_login'};
    return $login_time;
}
sub set_time_to_login {
    # Input: the value to remember as this object's login time
    # Stores it under 'time_to_login'; the stored value is also the
    # value of the sub's last statement, as before.
    my ($self, $login_time) = @_;
    $self->{'time_to_login'} = $login_time;
}
sub max_jobs_running { sub max_jobs_running {
my $self = shift; my $self = shift;
if(not defined $self->{'max_jobs_running'}) { if(not defined $self->{'max_jobs_running'}) {