parallel: add --filter-hosts, remove --skip-first-line, version 20120528

Summary of what this patch does to src/parallel:
  * New option --filter-hosts. Before any job is started, all hosts given
    with -S/--slf are probed through one "parallel --nonall" run that asks
    each host for its number of cores, number of cpus and max line length.
    The joblog of that run (--joblog -) is read from the same pipe:
      - a joblog line with 255 in column 6 removes the host from
        %Global::host;
      - a joblog line with 127 in column 6 keeps the host with
        ncores = ncpus = 1 and the local max command length;
      - the Runtime column of the joblog line is stored per host as
        time_to_login.
  * --nonall/--onall now start the controlling parallel with -j0 and the
    debug output shows the real command line.
  * The login object gets a 'time_to_login' field with a getter/setter.
  * The --skip-first-line option is removed.

NOTE(review): leading whitespace of context and removed lines was
reconstructed (tabs/spaces as is usual for this file); verify against
src/parallel before applying.

diff --git a/src/parallel b/src/parallel
index ac264e6d..554c80a1 100755
--- a/src/parallel
+++ b/src/parallel
@@ -66,11 +66,6 @@ my @fhlist;
 if(not @fhlist) {
     @fhlist = (*STDIN);
 }
-if($::opt_skip_first_line) {
-    # Skip the first line for the first file handle
-    my $fh = $fhlist[0];
-    <$fh>;
-}
 if($::opt_header and not $::opt_pipe) {
     my $fh = $fhlist[0];
     # split with colsep or \t
@@ -89,6 +84,78 @@ if($::opt_header and not $::opt_pipe) {
     }
 }
 
+# Parallel check for all hosts are up
+if($::opt_filter_hosts) {
+    my @S = map { "-S " . ::shell_quote_scalar($_) } @::opt_sshlogin;
+    my @slf = map { "--slf " . ::shell_quote_scalar($_) } @::opt_sshloginfile;
+    my $cmd = "$0 --tag --joblog - -k --nonall @S @slf " .
+	"parallel --number-of-cores \\;".
+	"parallel --number-of-cpus \\;".
+	"parallel --max-line-length-allowed";
+    ::debug($cmd."\n");
+    open(HOST, "$cmd |") || ::die_bug("parallel host check: $cmd");
+    my (%ncores, %ncpus, %time_to_login, %maxlen);
+    while(<HOST>) {
+	my @col = split /\t/, $_;
+	if(defined $col[6]) {
+	    if($col[6] eq "255") {
+		# signal == 255: ssh failed
+		# Remove sshlogin
+		delete $Global::host{$col[1]};
+	    } elsif($col[6] eq "127") {
+		# signal == 127: parallel not installed remote
+		# Set ncpus and ncores = 1; still record the login time
+		print $Global::original_stderr
+		    ("parallel: Warning: Could not figure out ",
+		     "number of cpus on $col[1]. Using 1\n");
+		$ncores{$col[1]} = 1;
+		$ncpus{$col[1]} = 1;
+		$maxlen{$col[1]} = Limits::Command::max_length();
+		$time_to_login{$col[1]} = $col[3];
+	    } elsif($col[0] eq "1" and $Global::host{$col[1]}) {
+		# 1 server 1338156112.05 0.303 0 0 0 0
+		# parallel --number-of-cores ; parallel --number-of-cpus
+		# Remember how long it took to log in
+		$time_to_login{$col[1]} = $col[3];
+	    } elsif($col[0] eq "Seq" and $col[1] eq "Host" and
+		    $col[2] eq "Starttime" and $col[3] eq "Runtime") {
+		# skip
+	    } else {
+		::die_bug("host check unmatched long jobline : $_");
+	    }
+	} elsif($Global::host{$col[0]}) {
+	    # ncores: server 8
+	    # ncpus: server 2
+	    # maxlen: server 131071
+	    if(not $ncores{$col[0]}) {
+		$ncores{$col[0]} = $col[1];
+	    } elsif(not $ncpus{$col[0]}) {
+		$ncpus{$col[0]} = $col[1];
+	    } elsif(not $maxlen{$col[0]}) {
+		$maxlen{$col[0]} = $col[1];
+	    } else {
+		::die_bug("host check too many col0: $_");
+	    }
+	} else {
+	    ::die_bug("host check unmatched short jobline: $_");
+	}
+    }
+    close HOST;
+    while (my ($sshlogin, $obj) = each %Global::host) {
+	$ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
+	$ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
+	$time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
+	$maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
+	if($::opt_use_cpus_instead_of_cores) {
+	    $obj->set_ncpus($ncpus{$sshlogin});
+	} else {
+	    $obj->set_ncpus($ncores{$sshlogin});
+	}
+	$obj->set_time_to_login($time_to_login{$sshlogin});
+	$obj->set_maxlength($maxlen{$sshlogin});
+    }
+}
+
 if($::opt_nonall or $::opt_onall) {
     # Copy all @fhlist into tempfiles
     my @argfiles = ();
@@ -121,15 +188,15 @@ if($::opt_nonall or $::opt_onall) {
 	     ((defined $::opt_D) ? "-D" : ""),
 	     ((defined $::opt_timeout) ? "--timeout ".$::opt_timeout : ""),
 	);
-    ::debug("| parallel");
-    open(PARALLEL,"| $0 $options") ||
+    ::debug("| $0 $options\n");
+    open(PARALLEL,"| $0 -j0 $options") ||
 	::die_bug("This does not run GNU Parallel: $0 $options");
     for my $sshlogin (values %Global::host) {
 	print PARALLEL "$0 $suboptions -j1 ".
 	    ((defined $::opt_tag) ?
-	    "--tagstring ".shell_quote_scalar($sshlogin->string()) : "").
-	    " -S ". shell_quote_scalar($sshlogin->string())." ".
-	    shell_quote_scalar($command)." :::: @argfiles\n";
+	     "--tagstring ".shell_quote_scalar($sshlogin->string()) : "").
+	     " -S ". shell_quote_scalar($sshlogin->string())." ".
+	     shell_quote_scalar($command)." :::: @argfiles\n";
     }
     close PARALLEL;
     $Global::exitstatus = $? >> 8;
@@ -401,6 +468,7 @@ sub options_hash {
 	 "tagstring=s" => \$::opt_tagstring,
 	 "onall" => \$::opt_onall,
 	 "nonall" => \$::opt_nonall,
+	 "filter-hosts" => \$::opt_filter_hosts,
 	 "sshlogin|S=s" => \@::opt_sshlogin,
 	 "sshloginfile|slf=s" => \@::opt_sshloginfile,
 	 "controlmaster|M" => \$::opt_controlmaster,
@@ -468,7 +536,6 @@ sub options_hash {
 	 # Shebang #!/usr/bin/parallel --shebang
 	 "shebang|hashbang" => \$::opt_shebang,
 	 "Y" => \$::opt_retired,
-	 "skip-first-line" => \$::opt_skip_first_line,
 	 "header=s" => \$::opt_header,
 	);
 }
@@ -499,7 +566,7 @@ sub get_options_from_array {
 sub parse_options {
     # Returns: N/A
     # Defaults:
-    $Global::version = 20120527;
+    $Global::version = 20120528;
     $Global::progname = 'parallel';
     $Global::infinity = 2**31;
     $Global::debug = 0;
@@ -2032,6 +2099,7 @@ sub new {
 	'serverlogin' => undef,
 	'control_path_dir' => undef,
 	'control_path' => undef,
+	'time_to_login' => undef,
 	'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp/loadavg-" .
 	    $$."-".$string,
 	'loadavg' => undef,
@@ -2285,6 +2353,16 @@ sub compute_max_loadavg {
     return $load;
 }
 
+sub time_to_login {
+    my $self = shift;
+    return $self->{'time_to_login'};
+}
+
+sub set_time_to_login {
+    my $self = shift;
+    $self->{'time_to_login'} = shift;
+}
+
 sub max_jobs_running {
     my $self = shift;
     if(not defined $self->{'max_jobs_running'}) {