parallel: --timeout 200% implemented. Test missing.

This commit is contained in:
Ole Tange 2013-04-29 00:43:43 +02:00
parent 780f50c654
commit eac8d05e3a
3 changed files with 80 additions and 71 deletions

View file

@ -538,7 +538,7 @@ sub options_hash {
"use-cpus-instead-of-cores" => \$opt::use_cpus_instead_of_cores, "use-cpus-instead-of-cores" => \$opt::use_cpus_instead_of_cores,
"shellquote|shell_quote|shell-quote" => \$opt::shellquote, "shellquote|shell_quote|shell-quote" => \$opt::shellquote,
"nice=i" => \$opt::nice, "nice=i" => \$opt::nice,
"timeout=i" => \$opt::timeout, "timeout=s" => \$opt::timeout,
"tag" => \$opt::tag, "tag" => \$opt::tag,
"tagstring=s" => \$opt::tagstring, "tagstring=s" => \$opt::tagstring,
"onall" => \$opt::onall, "onall" => \$opt::onall,
@ -646,7 +646,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20130422; $Global::version = 20130429;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -744,6 +744,9 @@ sub parse_options {
if(defined $opt::fg) { $Global::semaphore = 1; } if(defined $opt::fg) { $Global::semaphore = 1; }
if(defined $opt::bg) { $Global::semaphore = 1; } if(defined $opt::bg) { $Global::semaphore = 1; }
if(defined $opt::wait) { $Global::semaphore = 1; } if(defined $opt::wait) { $Global::semaphore = 1; }
if(defined $opt::timeout and $opt::timeout !~ /^\d+%?$/) {
::error("--timeout must be seconds or percentage\n");
}
if(defined $opt::minversion) { if(defined $opt::minversion) {
print $Global::version,"\n"; print $Global::version,"\n";
if($Global::version < $opt::minversion) { if($Global::version < $opt::minversion) {
@ -1328,7 +1331,7 @@ sub start_more_jobs {
} }
debug("Job started on ".$sshlogin->string()."\n"); debug("Job started on ".$sshlogin->string()."\n");
$sshlogin->inc_jobs_running(); $sshlogin->inc_jobs_running();
$sshlogin->set_last_login_at(::hires_time()); $sshlogin->set_last_login_at(::now());
$jobs_started++; $jobs_started++;
} }
debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running() debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running()
@ -1863,6 +1866,10 @@ sub reaper {
} }
if(not $job->should_be_retried()) { if(not $job->should_be_retried()) {
if($opt::timeout) {
# Update average runtime for timeout
$Global::timeoutq->update_delta_time($job->runtime());
}
# Force printing now if the job failed and we are going to exit # Force printing now if the job failed and we are going to exit
my $print_now = ($job->exitstatus() and my $print_now = ($job->exitstatus() and
$opt::halt_on_error and $opt::halt_on_error == 2); $opt::halt_on_error and $opt::halt_on_error == 2);
@ -1917,21 +1924,6 @@ sub reaper {
return $children_reaped; return $children_reaped;
} }
sub timeout {
    # Intended to run after SIGALRM was received: kill every entry at
    # the front of @Global::timeouts that has timed out.
    # @Global::timeouts is expected to be sorted by timeout, so the
    # scan stops at the first entry that has not yet timed out.
    # NOTE(review): the Job method visible in this file is spelled
    # timedout(); confirm that timed_out() exists on the queued objects.
    # Returns: N/A
    while (@Global::timeouts) {
        my $head = $Global::timeouts[0];
        # Sorted by timeout: everything after a live entry is live, too
        last unless $head->timed_out();
        # Kill first, then drop the entry from the list
        $head->kill();
        shift @Global::timeouts;
    }
}
sub __USAGE__ {} sub __USAGE__ {}
@ -2149,13 +2141,12 @@ sub usleep {
::debug(int($secs),"ms "); ::debug(int($secs),"ms ");
select(undef, undef, undef, $secs/1000); select(undef, undef, undef, $secs/1000);
if($opt::timeout) { if($opt::timeout) {
::debug(my_dump($Global::timeoutq));
$Global::timeoutq->process_timeouts(); $Global::timeoutq->process_timeouts();
} }
} }
sub hires_time { sub now {
# Returns time since epoch as float # Returns time since epoch as in seconds with 3 decimals
if(not $Global::use{"Time::HiRes"}) { if(not $Global::use{"Time::HiRes"}) {
if(eval "use Time::HiRes qw ( time );") { if(eval "use Time::HiRes qw ( time );") {
@ -2166,7 +2157,7 @@ sub hires_time {
$Global::use{"Time::HiRes"} = 1; $Global::use{"Time::HiRes"} = 1;
} }
return TimeHiRestime(); return (int(TimeHiRestime()*1000))/1000;
} }
sub multiply_binary_prefix { sub multiply_binary_prefix {
@ -2441,7 +2432,7 @@ sub too_fast_remote_login {
my $self = shift; my $self = shift;
if($self->{'last_login_at'} and $self->{'time_to_login'}) { if($self->{'last_login_at'} and $self->{'time_to_login'}) {
# If now <= last_login + wait time: Then it is too soon. # If now <= last_login + wait time: Then it is too soon.
my $too_fast = (::hires_time() <= $self->{'last_login_at'} my $too_fast = (::now() <= $self->{'last_login_at'}
+ $self->{'time_to_login'}); + $self->{'time_to_login'});
::debug("Too fast? $too_fast\n"); ::debug("Too fast? $too_fast\n");
return $too_fast; return $too_fast;
@ -3601,47 +3592,43 @@ sub set_pid {
} }
sub starttime { sub starttime {
# Returns:
# UNIX-timestamp this job started
my $self = shift; my $self = shift;
return ((int(($self->{'starttime'})*1000))/1000); return $self->{'starttime'};
} }
sub set_starttime { sub set_starttime {
my $self = shift; my $self = shift;
my $starttime = shift || ::hires_time(); my $starttime = shift || ::now();
$self->{'starttime'} = $starttime; $self->{'starttime'} = $starttime;
} }
sub runtime { sub runtime {
# Returns:
# Run time in seconds
my $self = shift; my $self = shift;
return ((int(($self->{'endtime'}-$self->{'starttime'})*1000))/1000); return $self->endtime() - $self->starttime();
} }
sub endtime { sub endtime {
# Returns:
# UNIX-timestamp this job ended
# now if not ended yet
my $self = shift; my $self = shift;
return $self->{'endtime'}; return ($self->{'endtime'} || ::now());
} }
sub set_endtime { sub set_endtime {
my $self = shift; my $self = shift;
my $endtime = shift || ::hires_time(); my $endtime = shift || ::now();
$self->{'endtime'} = $endtime; $self->{'endtime'} = $endtime;
} }
sub set_timeout {
    # Store the absolute point in time at which this job times out.
    # Input:
    #   $delta_time = number of seconds from now until the timeout
    # The deadline is kept under the 'timeout' key as time + $delta_time.
    my ($self, $delta_time) = @_;
    $self->{'timeout'} = time + $delta_time;
}
sub timeout {
    # Returns:
    #   the value stored under the 'timeout' key of this object
    #   (set_timeout stores time + delta_time there);
    #   undef if nothing has been stored
    my ($self) = @_;
    return $self->{'timeout'};
}
sub timedout { sub timedout {
my $self = shift; my $self = shift;
return time > $self->{'timeout'}; my $delta_time = shift;
return time > $self->{'starttime'} + $delta_time;
} }
sub kill { sub kill {
@ -4189,8 +4176,6 @@ sub start {
$job->set_pid($pid); $job->set_pid($pid);
$job->set_starttime(); $job->set_starttime();
if($opt::timeout) { if($opt::timeout) {
# Timeout must be set before inserting into queue
$job->set_timeout($opt::timeout);
$Global::timeoutq->insert($job); $Global::timeoutq->insert($job);
} }
if($opt::delay) { if($opt::delay) {
@ -5574,26 +5559,59 @@ package TimeoutQueue;
sub new { sub new {
my $class = shift; my $class = shift;
my $delta_time = shift; my $delta_time = shift;
my ($pct,$avg_damper);
if($delta_time =~ /(\d+)%/) {
# Timeout in percent
$pct = $1/100;
$delta_time = 1_000_000;
$avg_damper = (1-0.001)/0.9;
}
return bless { return bless {
'queue' => [], 'queue' => [],
'delta_time' => $delta_time, 'delta_time' => $delta_time,
'pct' => $pct,
'avg_damper' => $avg_damper,
}, ref($class) || $class; }, ref($class) || $class;
} }
sub delta_time {
    # Returns:
    #   the value currently stored under the 'delta_time' key
    #   (process_timeouts passes it to the job's timedout() check)
    my ($self) = @_;
    return $self->{'delta_time'};
}
sub set_delta_time {
    # Overwrite the value stored under the 'delta_time' key.
    # Input:
    #   $new_delta = value to store (undef if omitted)
    my ($self, $new_delta) = @_;
    $self->{'delta_time'} = $new_delta;
}
sub update_delta_time {
    # Fold the runtime of a finished job into delta_time.
    # Only has an effect when the 'pct' key holds a true value
    # (i.e. the timeout was given as a percentage); otherwise nothing
    # is changed.
    # Input:
    #   $runtime = runtime of the finished job
    my ($self, $runtime) = @_;
    if($self->{'pct'}) {
	# The damper shrinks on every call (d = d * 0.9 + 0.001), so
	# early runtimes move delta_time a lot and later ones less:
	# converge fast in the start, slower later
	my $damper = $self->{'avg_damper'} * 0.9 + 0.001;
	$self->{'avg_damper'} = $damper;
	# Weighted mix of pct * runtime and the previous delta_time
	my $previous = $self->{'delta_time'};
	$self->{'delta_time'} =
	    $self->{'pct'} * $runtime * $damper + (1 - $damper) * $previous;
	::debug("Timeout: $self->{'delta_time'}s ");
    }
}
sub process_timeouts { sub process_timeouts {
# Check if there was a timeout # Check if there was a timeout
my $self = shift; my $self = shift;
# @Global::timeout is sorted by timeout # $self->{'queue'} is sorted by start time
while (@{$self->{'queue'}}) { while (@{$self->{'queue'}}) {
my $job = $self->{'queue'}[0]; my $job = $self->{'queue'}[0];
if($job->timedout()) { if($job->timedout($self->{'delta_time'})) {
# Need to shift off queue before kill # Need to shift off queue before kill
# because kill calls usleep -> process_timeouts # because kill calls usleep that calls process_timeouts
shift @{$self->{'queue'}}; shift @{$self->{'queue'}};
$job->kill(); $job->kill();
} else { } else {
# Because they are sorted by timeout # Because they are sorted by start time the rest are later
last; last;
} }
} }
@ -5602,24 +5620,7 @@ sub process_timeouts {
sub insert { sub insert {
my $self = shift; my $self = shift;
my $in = shift; my $in = shift;
my $lower = 0; push @{$self->{'queue'}}, $in;
my $upper = $#{$self->{'queue'}};
my $looking = int(($lower + $upper)/2);
my $in_time = $in->timeout();
# Find the position between $lower and $upper
while($lower < $upper) {
if($self->{'queue'}[$looking]->timeout() < $in_time) {
# Upper half
$lower = $looking+1;
} else {
# Lower half
$upper = $looking;
}
$looking = int(($lower + $upper)/2);
}
# splice at position $looking
splice @{$self->{'queue'}}, $looking, 0, $in;
} }

View file

@ -1370,12 +1370,16 @@ different dir for the files. Setting B<--tmpdir> is equivalent to
setting $TMPDIR. setting $TMPDIR.
=item B<--timeout> I<sec> =item B<--timeout> I<val> (new alpha testing)
Time out for command. If the command runs for longer than I<sec> Time out for command. If the command runs for longer than I<val>
seconds it will get killed with SIGTERM, followed by SIGTERM 200 ms seconds it will get killed with SIGTERM, followed by SIGTERM 200 ms
later, followed by SIGKILL 200 ms later. later, followed by SIGKILL 200 ms later.
If I<val> is followed by a % then the timeout will dynamically be
computed as a percentage of the smoothed average runtime of the jobs
that have finished. E.g. 200% kills a job that runs for more than
twice that average. Only values > 100% make sense.
=item B<--tollef> (obsolete - will be retired 20140222) =item B<--tollef> (obsolete - will be retired 20140222)

View file

@ -1473,13 +1473,17 @@ into temporary files in /tmp. By setting @strong{--tmpdir} you can use a
different dir for the files. Setting @strong{--tmpdir} is equivalent to different dir for the files. Setting @strong{--tmpdir} is equivalent to
setting $TMPDIR. setting $TMPDIR.
@item @strong{--timeout} @emph{sec} @item @strong{--timeout} @emph{val} (new alpha testing)
@anchor{@strong{--timeout} @emph{sec}} @anchor{@strong{--timeout} @emph{val} (new alpha testing)}
Time out for command. If the command runs for longer than @emph{sec} Time out for command. If the command runs for longer than @emph{val}
seconds it will get killed with SIGTERM, followed by SIGTERM 200 ms seconds it will get killed with SIGTERM, followed by SIGTERM 200 ms
later, followed by SIGKILL 200 ms later. later, followed by SIGKILL 200 ms later.
If @emph{val} is followed by a % then the timeout will dynamically be
computed as a percentage of the smoothed average runtime of the jobs
that have finished. E.g. 200% kills a job that runs for more than
twice that average. Only values > 100% make sense.
@item @strong{--tollef} (obsolete - will be retired 20140222) @item @strong{--tollef} (obsolete - will be retired 20140222)
@anchor{@strong{--tollef} (obsolete - will be retired 20140222)} @anchor{@strong{--tollef} (obsolete - will be retired 20140222)}