parallel: slow down ssh to do more logins than MaxStartup. Passes unittest.

This commit is contained in:
Ole Tange 2012-09-09 19:16:48 +02:00
parent 0b5a90c11f
commit cdb8944aad
2 changed files with 73 additions and 20 deletions

View file

@ -1,3 +1,9 @@
Aliases for bash -c:
bash -c 'shopt -s expand_aliases; alias llll="ls -l"; alias bbb=ls;
bbb; llll'
Luk filen ved EOF - lad være med bare at læse videre. Luk filen ved EOF - lad være med bare at læse videre.
> /tmp/ged; tail -f /tmp/ged| xargs -n1 -E eof & sleep 1; echo echo a >>/tmp/ged; echo eof >>/tmp/ged; seq 4 >>/tmp/ged; wait > /tmp/ged; tail -f /tmp/ged| xargs -n1 -E eof & sleep 1; echo echo a >>/tmp/ged; echo eof >>/tmp/ged; seq 4 >>/tmp/ged; wait

View file

@ -90,19 +90,23 @@ if($::opt_header and not $::opt_pipe) {
} }
# Parallel check for all hosts are up # Parallel check for all hosts are up
if($::opt_filter_hosts) { #if(not $::opt_plain and (@::opt_sshlogin or @::opt_sshloginfile)) {
if($::opt_filter_hosts and (@::opt_sshlogin or @::opt_sshloginfile)) {
my @S = map { "-S " . ::shell_quote_scalar($_) } @::opt_sshlogin; my @S = map { "-S " . ::shell_quote_scalar($_) } @::opt_sshlogin;
my @slf = map { "--slf " . ::shell_quote_scalar($_) } @::opt_sshloginfile; my @slf = map { "--slf " . ::shell_quote_scalar($_) } @::opt_sshloginfile;
my $cmd = "$0 --tag --joblog - -k --nonall @S @slf " . my $cmd = "$0 --plain --tag --joblog - -k --onall @S @slf " .
"parallel --number-of-cores \\;". "::: ".
"parallel --number-of-cpus \\;". "'parallel --number-of-cores ' ".
"parallel --max-line-length-allowed"; "'parallel --number-of-cpus' ".
"'parallel --max-line-length-allowed' ".
"'true' ";
::debug($cmd."\n"); ::debug($cmd."\n");
open(HOST, "$cmd |") || ::die_bug("parallel host check: $cmd"); open(HOST, "$cmd |") || ::die_bug("parallel host check: $cmd");
my (%ncores, %ncpus, %time_to_login, %maxlen); my (%ncores, %ncpus, %time_to_login, %maxlen);
while(<HOST>) { while(<HOST>) {
my @col = split /\t/, $_; my @col = split /\t/, $_;
if(defined $col[6]) { if(defined $col[6]) {
# This is a line from --joblog
if($col[6] eq "255") { if($col[6] eq "255") {
# signal == 255: ssh failed # signal == 255: ssh failed
# Remove sshlogin # Remove sshlogin
@ -115,18 +119,20 @@ if($::opt_filter_hosts) {
$ncores{$col[1]} = 1; $ncores{$col[1]} = 1;
$ncpus{$col[1]} = 1; $ncpus{$col[1]} = 1;
$maxlen{$col[1]} = Limits::Command::max_length(); $maxlen{$col[1]} = Limits::Command::max_length();
} elsif($col[0] eq "1" and $Global::host{$col[1]}) { } elsif($col[0] =~ /^\d+$/ and $Global::host{$col[1]}) {
# 1 server 1338156112.05 0.303 0 0 0 0 # 1 server 1338156112.05 0.303 0 0 0 0
# parallel --number-of-cores ; parallel --number-of-cpus # parallel --number-of-cores ; parallel --number-of-cpus
# Remember how log it took to log in # Remember how log it took to log in
$time_to_login{$col[1]} = $col[3]; $time_to_login{$col[1]} = ::min($time_to_login{$col[1]},$col[3]);
} elsif($col[0] eq "Seq" and $col[1] eq "Host" and } elsif($col[0] eq "Seq" and $col[1] eq "Host" and
$col[2] eq "Starttime" and $col[3] eq "Runtime") { $col[2] eq "Starttime" and $col[3] eq "Runtime") {
# skip # skip
} else { } else {
::die_bug("host check unmatched long jobline : $_"); ::die_bug("host check unmatched long jobline: $_");
} }
} elsif($Global::host{$col[0]}) { } elsif($Global::host{$col[0]}) {
# This output from --number-of-cores, --number-of-cpus,
# --max-line-length-allowed
# ncores: server 8 # ncores: server 8
# ncpus: server 2 # ncpus: server 2
# maxlen: server 131071 # maxlen: server 131071
@ -147,7 +153,7 @@ if($::opt_filter_hosts) {
while (my ($sshlogin, $obj) = each %Global::host) { while (my ($sshlogin, $obj) = each %Global::host) {
$ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin()); $ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
$ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin()); $ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
$time_to_login{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin()); $time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
$maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin()); $maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
if($::opt_use_cpus_instead_of_cores) { if($::opt_use_cpus_instead_of_cores) {
$obj->set_ncpus($ncpus{$sshlogin}); $obj->set_ncpus($ncpus{$sshlogin});
@ -155,8 +161,9 @@ if($::opt_filter_hosts) {
$obj->set_ncpus($ncores{$sshlogin}); $obj->set_ncpus($ncores{$sshlogin});
} }
$obj->set_time_to_login($time_to_login{$sshlogin}); $obj->set_time_to_login($time_to_login{$sshlogin});
$obj->set_time_to_login($time_to_login{$sshlogin});
$obj->set_maxlength($maxlen{$sshlogin}); $obj->set_maxlength($maxlen{$sshlogin});
::debug("Timing from -S:$sshlogin ncpus:$ncpus{$sshlogin} ncores:$ncores{$sshlogin} ",
"time_to_login:$time_to_login{$sshlogin} maxlen:$maxlen{$sshlogin}");
} }
} }
@ -181,6 +188,7 @@ if($::opt_nonall or $::opt_onall) {
((defined $::opt_u) ? "-u" : ""), ((defined $::opt_u) ? "-u" : ""),
((defined $::opt_group) ? "-g" : ""), ((defined $::opt_group) ? "-g" : ""),
((defined $::opt_D) ? "-D" : ""), ((defined $::opt_D) ? "-D" : ""),
((defined $::opt_plain) ? "--plain" : ""),
); );
my $suboptions = my $suboptions =
join(" ", join(" ",
@ -191,6 +199,7 @@ if($::opt_nonall or $::opt_onall) {
((@::opt_v) ? "-vv" : ""), ((@::opt_v) ? "-vv" : ""),
((defined $::opt_D) ? "-D" : ""), ((defined $::opt_D) ? "-D" : ""),
((defined $::opt_timeout) ? "--timeout ".$::opt_timeout : ""), ((defined $::opt_timeout) ? "--timeout ".$::opt_timeout : ""),
((defined $::opt_plain) ? "--plain" : ""),
); );
::debug("| $0 $options\n"); ::debug("| $0 $options\n");
open(PARALLEL,"| $0 -j0 $options") || open(PARALLEL,"| $0 -j0 $options") ||
@ -387,7 +396,9 @@ sub write_record_to_pipe {
while(not @Global::virgin_jobs) { while(not @Global::virgin_jobs) {
::debug("No virgin jobs"); ::debug("No virgin jobs");
$sleep = ::reap_usleep($sleep); $sleep = ::reap_usleep($sleep);
start_more_jobs(); # These jobs may not be started because of loadavg # Jobs may not be started because of loadavg
# or too little time between each ssh login.
start_more_jobs();
} }
my $job = shift @Global::virgin_jobs; my $job = shift @Global::virgin_jobs;
if(fork()) { if(fork()) {
@ -1165,6 +1176,9 @@ sub start_more_jobs {
# The server is swapping # The server is swapping
next; next;
} }
if($sshlogin->too_fast_remote_login()) {
next;
}
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($Global::JobQueue->empty() and not $::opt_pipe) { if($Global::JobQueue->empty() and not $::opt_pipe) {
last; last;
@ -1179,6 +1193,7 @@ sub start_more_jobs {
} }
debug("Job started on ".$sshlogin->string()."\n"); debug("Job started on ".$sshlogin->string()."\n");
$sshlogin->inc_jobs_running(); $sshlogin->inc_jobs_running();
$sshlogin->set_last_login_at(::hires_time());
$jobs_started++; $jobs_started++;
} }
debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running() debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running()
@ -1252,9 +1267,8 @@ sub drain_job_queue {
my $sleep = 0.2; my $sleep = 0.2;
do { do {
while($Global::total_running > 0) { while($Global::total_running > 0) {
debug("jobs running: ", $Global::total_running, "==", scalar debug($Global::total_running, "==", scalar
keys %Global::running," slots: ", $Global::max_jobs_running, keys %Global::running," slots: ", $Global::max_jobs_running);
" Memory usage:".my_memory_usage()." ");
if($::opt_pipe) { if($::opt_pipe) {
# When using --pipe sometimes file handles are not closed properly # When using --pipe sometimes file handles are not closed properly
for my $job (values %Global::running) { for my $job (values %Global::running) {
@ -1270,11 +1284,19 @@ sub drain_job_queue {
} }
print $Global::original_stderr "\r",$progress{'status'}; print $Global::original_stderr "\r",$progress{'status'};
} }
if($Global::total_running < $Global::max_jobs_running
and not $Global::JobQueue->empty()) {
# These jobs may not be started because of loadavg
# or too little time between each ssh login.
start_more_jobs();
}
# Sometimes SIGCHLD is not registered, so force reaper # Sometimes SIGCHLD is not registered, so force reaper
$sleep = ::reap_usleep($sleep); $sleep = ::reap_usleep($sleep);
} }
if(not $Global::JobQueue->empty()) { if(not $Global::JobQueue->empty()) {
start_more_jobs(); # These jobs may not be started because of loadavg # These jobs may not be started because of loadavg
# or too little time between each ssh login.
start_more_jobs();
$sleep = ::reap_usleep($sleep); $sleep = ::reap_usleep($sleep);
} }
} while ($Global::total_running > 0 } while ($Global::total_running > 0
@ -1538,8 +1560,8 @@ sub __REMOTE_SSH__ {}
sub read_sshloginfiles { sub read_sshloginfiles {
# Returns: N/A # Returns: N/A
for (@_) { for my $s (@_) {
read_sshloginfile($_); read_sshloginfile($s);
} }
} }
@ -1687,7 +1709,7 @@ sub reaper {
# Returns: N/A # Returns: N/A
my $stiff; my $stiff;
my $children_reaped = 0; my $children_reaped = 0;
debug("Reaper called "); debug("Reaper ");
while (($stiff = waitpid(-1, &WNOHANG)) > 0) { while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
$children_reaped++; $children_reaped++;
if($Global::sshmaster{$stiff}) { if($Global::sshmaster{$stiff}) {
@ -1754,7 +1776,7 @@ sub reaper {
delete $Global::running{$stiff}; delete $Global::running{$stiff};
start_more_jobs(); start_more_jobs();
} }
debug("Reaper exit\n"); debug("done ");
return $children_reaped; return $children_reaped;
} }
@ -1987,7 +2009,7 @@ sub reap_usleep {
sub usleep { sub usleep {
# Sleep this many milliseconds. # Sleep this many milliseconds.
my $secs = shift; my $secs = shift;
::debug("Sleeping ",$secs," millisecs\n"); ::debug(int($secs),"ms ");
select(undef, undef, undef, $secs/1000); select(undef, undef, undef, $secs/1000);
if($::opt_timeout) { if($::opt_timeout) {
::debug(my_dump($Global::timeoutq)); ::debug(my_dump($Global::timeoutq));
@ -2144,6 +2166,7 @@ sub new {
'control_path_dir' => undef, 'control_path_dir' => undef,
'control_path' => undef, 'control_path' => undef,
'time_to_login' => undef, 'time_to_login' => undef,
'last_login_at' => undef,
'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp/loadavg-" . 'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp/loadavg-" .
$$."-".$no_slash_string, $$."-".$no_slash_string,
'loadavg' => undef, 'loadavg' => undef,
@ -2277,6 +2300,30 @@ sub swap_activity {
return $self->{'swap_activity'}; return $self->{'swap_activity'};
} }
sub too_fast_remote_login {
my $self = shift;
if($self->{'last_login_at'} and $self->{'time_to_login'}) {
# If now <= last_login + wait time: Then it is too soon.
my $too_fast = (::hires_time() <= $self->{'last_login_at'}
+ $self->{'time_to_login'});
::debug("Too fast? $too_fast\n");
return $too_fast;
} else {
# No logins so far (or time_to_login not computed): it is not too fast
return 0;
}
}
sub last_login_at {
my $self = shift;
return $self->{'last_login_at'};
}
sub set_last_login_at {
my $self = shift;
$self->{'last_login_at'} = shift;
}
sub loadavg_too_high { sub loadavg_too_high {
my $self = shift; my $self = shift;
my $loadavg = $self->loadavg(); my $loadavg = $self->loadavg();