parallel: --load implemented and documented. How do to testsuite?

This commit is contained in:
Ole Tange 2010-12-04 14:14:28 +01:00
parent e907f723e7
commit 01f3a08b55

View file

@ -507,9 +507,11 @@ B<-L> instead.
Implies B<-X> unless B<-m> is set.
=item B<--load>=I<max-load> (unimplemented)
=item B<--load> I<max-load> (experimental)
Do not start new jobs unless the load is less than I<max-load>.
Do not start new jobs unless the load is less than I<max-load>. The
load average is only sampled every 10 seconds to avoid stressing small
machines.
=item B<--controlmaster> (experimental)
@ -3155,6 +3157,7 @@ sub drain_job_queue {
reap_if_needed();
}
my $last_header="";
do {
while($Global::total_running > 0) {
debug("jobs running: ",$Global::total_running," Memory usage:".my_memory_usage()."\n");
sleep 1;
@ -3170,6 +3173,12 @@ sub drain_job_queue {
reap_if_needed();
}
}
if(not $Global::JobQueue->empty()) {
start_more_jobs(); # These jobs may not be started because of loadavg
sleep 1;
}
} while (not $Global::JobQueue->empty());
if($::opt_progress) {
print $Global::original_stderr "\n";
}
@ -3370,11 +3379,11 @@ sub start_more_jobs {
}
for my $sshlogin (values %Global::host) {
debug("Running jobs on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n");
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()
and
(($::opt_load and $sshlogin->loadavg() < $sshlogin->max_loadavg())
or
1)) {
if($::opt_load and $sshlogin->loadavg_too_high()) {
# The load is too high or unknown
next;
}
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($Global::JobQueue->empty()) {
last;
}
@ -3856,6 +3865,9 @@ sub new {
'serverlogin' => undef,
'control_path_dir' => undef,
'control_path' => undef,
'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp/" .
$$."-".$string,
'loadavg' => undef,
}, ref($class) || $class;
}
@ -3910,19 +3922,60 @@ sub set_max_jobs_running {
$self->{'max_jobs_running'} = shift;
}
sub loadavg {
sub loadavg_too_high {
my $self = shift;
# TODO add some caching so we do not compute this more than
# once per second
my $uptime = $self->sshcommand() . " " . $self->serverlogin() . " uptime";
my $loadavg;
my $loadavg = $self->loadavg();
return (not defined $loadavg or
$loadavg > $self->max_loadavg());
}
sub loadavg {
# If the currently know loadavg is too old:
# Recompute a new one in the background
# Returns:
# last load average computed
my $self = shift;
# Should we update the loadavg file?
my $update_loadavg_file = 0;
if(-r $self->{'loadavg_file'}) {
open(UPTIME,"<".$self->{'loadavg_file'}) || die;
my $uptime_out = <UPTIME>;
close UPTIME;
# load average: 0.76, 1.53, 1.45
if($uptime =~ /load average: (\d+.\d+)/) {
$loadavg = $1;
} else {
die "Cannot find loadaverage from ".$self->string();
if($uptime_out =~ /load average: (\d+.\d+)/) {
$self->{'loadavg'} = $1;
::debug("New loadavg: ".$self->{'loadavg'});
}
return $loadavg;
::debug("Last update: ".$self->{'last_loadavg_update'});
if(time - $self->{'last_loadavg_update'} > 10) {
# last loadavg was started 10 seconds ago
::debug("Older than 10 sec: ".$self->{'loadavg_file'});
$update_loadavg_file = 1;
}
} else {
::debug("NXfile: ".$self->{'loadavg_file'});
$self->{'loadavg'} = undef;
$update_loadavg_file = 1;
}
if($update_loadavg_file) {
::debug("Updating".$self->{'loadavg_file'});
$self->{'last_loadavg_update'} = time;
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
my $uptime;
if($self->{'string'} eq ":") {
$uptime = "uptime";
} else {
$uptime = $self->sshcommand() . " " . $self->serverlogin() . " uptime";
}
# Run uptime.
# As the command can take long to run if run remote
# save it to a tmp file before moving it to the correct file
my $file = $self->{'loadavg_file'};
my $tmpfile = $self->{'loadavg_file'}.$$;
qx{ ($uptime > $tmpfile; mv $tmpfile $file) & };
}
return $self->{'loadavg'};
}
sub max_loadavg {
@ -3931,6 +3984,7 @@ sub max_loadavg {
$self->{'max_loadavg'} =
$self->compute_max_loadavg($::opt_load);
}
::debug("max_loadavg: ".$self->string()." ".$self->{'max_loadavg'});
return $self->{'max_loadavg'};
}
@ -3956,7 +4010,7 @@ sub compute_max_loadavg {
my $j = $1;
$load =
$self->ncpus() * $j / 100;
} elsif ($loadspec =~ /^(\d+)$/) {
} elsif ($loadspec =~ /^(\d+(\.\d+)?)$/) {
$load = $1;
if($load == 0) {
# --load 0 = infinity (or at least close)
@ -4399,13 +4453,12 @@ sub sshcommand_of_sshlogin {
# sshcommand - defaults to 'ssh'
# login@host
my $self = shift;
my $sshlogin = $self->{'string'};
my ($sshcmd, $serverlogin);
if($::oodebug and not defined $sshlogin) {
if($::oodebug and not defined $self->{'string'}) {
Carp::confess("No sshlogin");
die;
}
if($sshlogin =~ /(.+) (\S+)$/) {
if($self->{'string'} =~ /(.+) (\S+)$/) {
# Own ssh command
$sshcmd = $1; $serverlogin = $2;
} else {
@ -4414,7 +4467,7 @@ sub sshcommand_of_sshlogin {
# Use control_path to make ssh faster
my $control_path = $self->control_path_dir()."/ssh-%r@%h:%p";
$sshcmd = "ssh -S ".$control_path;
$serverlogin = $sshlogin;
$serverlogin = $self->{'string'};
#my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin;
# my $master = "ssh -MTS ".$self->control_path_dir()."/ssh-%r@%h:%p ".$serverlogin." sleep 1";
my $master = "ssh -MTS $control_path $serverlogin sleep 1";
@ -4431,7 +4484,7 @@ sub sshcommand_of_sshlogin {
}
}
} else {
$sshcmd = "ssh"; $serverlogin = $sshlogin;
$sshcmd = "ssh"; $serverlogin = $self->{'string'};
}
}
$self->{'sshcommand'} = $sshcmd;
@ -4668,7 +4721,9 @@ sub sshlogin_wrap {
my $serverlogin = $sshlogin->serverlogin();
my ($next_command_line) = $self->replaced();
my ($pre,$post,$cleanup)=("","","");
if($serverlogin ne ":") {
if($serverlogin eq ":") {
$self->{'sshlogin_wrap'} = $next_command_line;
} else {
$Global::transfer_seq++;
# --transfer
$pre .= $self->sshtransfer();
@ -4690,8 +4745,6 @@ sub sshlogin_wrap {
$self->{'sshlogin_wrap'} = ($pre . "$sshcmd $serverlogin $parallel_env "
.::shell_quote_scalar($next_command_line).";".$post);
}
} else {
$self->{'sshlogin_wrap'} = $next_command_line;
}
}
return $self->{'sshlogin_wrap'};