parallel: --retries implemented. Passes unittest.

This commit is contained in:
Ole Tange 2010-09-21 09:34:53 +02:00
parent 8703e6b5c5
commit 2e572c00c9
2 changed files with 170 additions and 43 deletions

View file

@ -1,3 +1,27 @@
== Retry ==
--retries n
If a job fails, retry it on another computer. Do this n times. If there are fewer than n computers in --sshlogin GNU parallel will re-use the computers. This is useful if some jobs fail for no apparent reason (such as network failure).
When the command dies and fails:
Update $Global::failed_command{$test_failed}
If max n has been reached for all: fail
What if all jobs in the queue failed for this sshlogin so only undef is left for this sshlogin?
BUG: Rækkeflg er forkert.
(seq 0 1) | parallel -kj100% -S 1/:,2/parallel@server2 -vq perl -e 'sleep 1;print "job{}\n";exit({})'
OK: (seq 0 1) | parallel -kj2 -S 1/:,2/parallel@server2 -vq perl -e 'sleep 1;print "job{}\n";exit({})'
Fejlen eksisterer også i 20100906 og 20100822
perl -e sleep\ 1\;print\ \"job0\\n\"\;exit\(0\)ARRAY(0x965e928)perl -e sleep\ 1\;print\ \"job1\\n\"\;exit\(1\)ARRAY(0x9727bc0)perl -e sleep\ 1\;print\ \"job2\\n\"\;exit\(2\)ARRAY(0x9727a70)Limited to procs: 3
Wanted procs: 1
Time to fork ten procs: (processes so far: 1)
perl -e sleep\ 1\;print\ \"job1\\n\"\;exit\(1\)ARRAY(0x9727bc0)perl -e sleep\ 1\;print\ \"job2\\n\"\;exit\(2\)ARRAY(0x9727a70)perl -e sleep\ 1\;print\ \"job0\\n\"\;exit\(0\)ARRAY(0x965e928)Limited to procs: 1
Running jobs on parallel@server2: 0
== FSCONS ==
Annoncer foredrag til FSCONS Annoncer foredrag til FSCONS
I will be at FSCONS 2010-11-07 talking about GNU Parallel. I will be at FSCONS 2010-11-07 talking about GNU Parallel.

View file

@ -552,6 +552,15 @@ default.
If the stdin (standard input) only contains whitespace, do not run the command. If the stdin (standard input) only contains whitespace, do not run the command.
=item B<--retries> I<n>
If a job fails, retry it on another computer. Do this I<n> times. If
there are fewer than I<n> computers in B<--sshlogin> GNU parallel will
re-use the computers. This is useful if some jobs fail for no apparent
reason (such as network failure).
=item B<--return> I<filename> =item B<--return> I<filename>
Transfer files from remote servers. B<--return> is used with Transfer files from remote servers. B<--return> is used with
@ -2280,6 +2289,7 @@ sub parse_options {
"cleanup" => \$::opt_cleanup, "cleanup" => \$::opt_cleanup,
"basefile|B=s" => \@::opt_basefile, "basefile|B=s" => \@::opt_basefile,
"halt-on-error|H=s" => \$::opt_halt_on_error, "halt-on-error|H=s" => \$::opt_halt_on_error,
"retries=i" => \$::opt_retries,
"progress" => \$::opt_progress, "progress" => \$::opt_progress,
"eta" => \$::opt_eta, "eta" => \$::opt_eta,
"arg-sep|argsep=s" => \$::opt_arg_sep, "arg-sep|argsep=s" => \$::opt_arg_sep,
@ -2970,8 +2980,10 @@ sub compute_number_of_processes {
my $sshlogin = shift; my $sshlogin = shift;
my $wanted_processes = user_requested_processes($opt_P,$sshlogin); my $wanted_processes = user_requested_processes($opt_P,$sshlogin);
debug("Wanted procs: $wanted_processes\n"); debug("Wanted procs: $wanted_processes\n");
debug("unget before:",@Global::unget_next_command_line);
my $system_limit = my $system_limit =
processes_available_by_system_limit($wanted_processes,$sshlogin); processes_available_by_system_limit($wanted_processes,$sshlogin);
debug("unget after:",@Global::unget_next_command_line);
debug("Limited to procs: $system_limit\n"); debug("Limited to procs: $system_limit\n");
return $system_limit; return $system_limit;
} }
@ -3155,7 +3167,7 @@ sub user_requested_processes {
# E.g. -P +2 # E.g. -P +2
my $j = $1; my $j = $1;
$processes = $processes =
$j + no_of_processing_units_sshlogin($sshlogin); no_of_processing_units_sshlogin($sshlogin) + $j;
} elsif ($opt_P =~ /^-(\d+)$/) { } elsif ($opt_P =~ /^-(\d+)$/) {
# E.g. -P -2 # E.g. -P -2
my $j = $1; my $j = $1;
@ -3362,6 +3374,8 @@ sub min {
my $min = shift; my $min = shift;
my @args = @_; my @args = @_;
for my $a (@args) { for my $a (@args) {
# Skip undefs
defined $a or next;
$min = ($min < $a) ? $min : $a; $min = ($min < $a) ? $min : $a;
} }
return $min; return $min;
@ -3373,18 +3387,43 @@ sub max {
my $max = shift; my $max = shift;
my @args = @_; my @args = @_;
for my $a (@args) { for my $a (@args) {
# Skip undefs
defined $a or next;
$max = ($max > $a) ? $max : $a; $max = ($max > $a) ? $max : $a;
} }
return $max; return $max;
} }
sub sum {
# Returns:
# Sum of values of array
my @args = @_;
my $sum = 0;
for (@args) {
# Skip undefs
$_ and do { $sum += $_; }
}
return $sum;
}
sub undef_as_zero {
my $a = shift;
return $a ? $a : 0;
}
sub __RUNNING_AND_PRINTING_THE_JOBS__ {} sub __RUNNING_AND_PRINTING_THE_JOBS__ {}
# Variable structure: # Variable structure:
#
# $Global::failed{$clean_command}{'count'}{$sshlogin} = number of times failed on this sshlogin
# $Global::failed{$clean_command}{'seq'} = original sequence number
# $Global::running{$pid}{'seq'} = printsequence # $Global::running{$pid}{'seq'} = printsequence
# $Global::running{$pid}{sshlogin} = server to run on # $Global::running{$pid}{sshlogin} = server to run on
# $Global::running{$pid}{'exitstatus'} = exit status # $Global::running{$pid}{'exitstatus'} = exit status
# $Global::running{$pid}{'out'} = stdout filehandle
# $Global::running{$pid}{'err'} = stderr filehandle
# $Global::running{$pid}{'command'} = command being run (including rsync/ssh and args)
# $Global::running{$pid}{'cleancommand'} = command being run (excluding rsync/ssh but including args)
# $Global::host{$sshlogin}{'no_of_running'} = number of currently running jobs # $Global::host{$sshlogin}{'no_of_running'} = number of currently running jobs
# $Global::host{$sshlogin}{'completed'} = number of completed jobs # $Global::host{$sshlogin}{'completed'} = number of completed jobs
# $Global::host{$sshlogin}{'ncpus'} = number of CPUs (or CPU cores) # $Global::host{$sshlogin}{'ncpus'} = number of CPUs (or CPU cores)
@ -3668,12 +3707,13 @@ sub start_another_job {
my $sshlogin = shift; my $sshlogin = shift;
# Do we have enough file handles to start another job? # Do we have enough file handles to start another job?
if(enough_file_handles()) { if(enough_file_handles()) {
my $command = get_command_line_with_sshlogin($sshlogin); my ($command,$clean_command) = get_command_line_with_sshlogin($sshlogin);
if(defined $command) { if(defined $command) {
debug("Command to run on '$sshlogin': $command\n"); debug("Command to run on '$sshlogin': $command\n");
my %jobinfo = start_job($command,$sshlogin); my %jobinfo = start_job($command,$sshlogin,$clean_command);
if(%jobinfo) { if(%jobinfo) {
$Global::running{$jobinfo{"pid"}} = \%jobinfo; $Global::running{$jobinfo{"pid"}} = \%jobinfo;
debug("Started as seq ".$jobinfo{'seq'},"\n");
return 1; return 1;
} else { } else {
# If interactive says: Dont run the job, then skip it and run the next # If interactive says: Dont run the job, then skip it and run the next
@ -3698,8 +3738,10 @@ sub start_job {
# "err" => STDERR filehandle (if grouped) # "err" => STDERR filehandle (if grouped)
# "sshlogin" => sshlogin # "sshlogin" => sshlogin
# "command" => command being run # "command" => command being run
# "clean_command" => command being run without wrapping
my $command = shift; my $command = shift;
my $sshlogin = shift; my $sshlogin = shift;
my $clean_command = shift;
my ($pid,$out,$err,%out,%err,$outname,$errname,$name); my ($pid,$out,$err,%out,%err,$outname,$errname,$name);
if($Global::grouped) { if($Global::grouped) {
# To group we create temporary files for STDOUT and STDERR # To group we create temporary files for STDOUT and STDERR
@ -3738,10 +3780,18 @@ sub start_job {
} }
$Global::total_running++; $Global::total_running++;
$Global::total_started++; $Global::total_started++;
debug("$Global::total_running processes. Starting: $command\n");
#print STDERR "LEN".length($command)."\n"; #print STDERR "LEN".length($command)."\n";
my $job_start_sequence;
if($Global::failed{$clean_command}{'seq'}) {
# This is a retried job: Keep the old seq
$job_start_sequence = $Global::failed{$clean_command}{'seq'};
} else {
# This is a new (non-retried) job: Give it a new seq
$Private::job_start_sequence++; $Private::job_start_sequence++;
if(@::opt_a and $Private::job_start_sequence == 1) { $job_start_sequence = $Private::job_start_sequence;
}
debug("$Global::total_running processes. Starting ($job_start_sequence): $command\n");
if(@::opt_a and $job_start_sequence == 1) {
# Give STDIN to the first job if using -a # Give STDIN to the first job if using -a
$pid = open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) || $pid = open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) ||
die("open3 (with -a) failed. Report a bug to <bug-parallel\@gnu.org>\n"); die("open3 (with -a) failed. Report a bug to <bug-parallel\@gnu.org>\n");
@ -3766,17 +3816,19 @@ sub start_job {
or die "Can't dup \$Global::original_stderr: $!"; or die "Can't dup \$Global::original_stderr: $!";
if($Global::grouped) { if($Global::grouped) {
return ("seq" => $Private::job_start_sequence, return ("seq" => $job_start_sequence,
"pid" => $pid, "pid" => $pid,
"out" => $out{$outname}, "out" => $out{$outname},
"err" => $err{$errname}, "err" => $err{$errname},
"sshlogin" => $sshlogin, "sshlogin" => $sshlogin,
"command" => $command); "command" => $command,
"clean_command" => $clean_command);
} else { } else {
return ("seq" => $Private::job_start_sequence, return ("seq" => $job_start_sequence,
"pid" => $pid, "pid" => $pid,
"sshlogin" => $sshlogin, "sshlogin" => $sshlogin,
"command" => $command); "command" => $command,
"clean_command" => $clean_command);
} }
} }
@ -3823,8 +3875,29 @@ sub __READING_AND_QUOTING_ARGUMENTS__ {}
sub get_command_line_with_sshlogin { sub get_command_line_with_sshlogin {
# Returns: # Returns:
# next command to run with ssh command wrapping if remote # next command to run with ssh command wrapping if remote
# next command to run with no wrapping (clean_command)
my $sshlogin = shift; my $sshlogin = shift;
my ($next_command_line, $args_ref) = get_command_line(); my ($next_command_line, $args_ref) = get_command_line();
my ($clean_command) = $next_command_line;
if($::opt_retries and $clean_command and
$Global::failed{$clean_command}{'count'}{$sshlogin}) {
# This command with these args failed for this sshlogin
my $min_failures =
min(map { $Global::failed{$clean_command}{'count'}{$_} }
keys %Global::host);
if($Global::failed{$clean_command}{'count'}{$sshlogin} == $min_failures) {
# It failed the same or more times on another host:
# run it on this host
} else {
# If it failed fewer times on another host:
# Find another job to run
my @next_job_to_run = get_command_line_with_sshlogin($sshlogin);
# Push the command back on the queue
unget_command_line($next_command_line,$args_ref);
return @next_job_to_run;
}
}
my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin);
my ($pre,$post)=("",""); my ($pre,$post)=("","");
if($next_command_line and $serverlogin ne ":") { if($next_command_line and $serverlogin ne ":") {
@ -3847,9 +3920,9 @@ sub get_command_line_with_sshlogin {
$post = '_EXIT_status=$?; '.$post.' exit $_EXIT_status;'; $post = '_EXIT_status=$?; '.$post.' exit $_EXIT_status;';
} }
return ($pre . "$sshcmd $serverlogin " return ($pre . "$sshcmd $serverlogin "
.shell_quote($next_command_line).";".$post); .shell_quote($next_command_line).";".$post,$clean_command);
} else { } else {
return $next_command_line; return ($next_command_line,$clean_command);
} }
} }
@ -4302,17 +4375,46 @@ sub reaper {
# The process that died had the tty => release it # The process that died had the tty => release it
$Global::tty_taken = 0; $Global::tty_taken = 0;
} }
my $retry_job = 0;
if ($::opt_retries) {
my $clean_command = $Global::running{$stiff}{'clean_command'};
my $sshlogin = $Global::running{$stiff}{'sshlogin'};
if(not $Global::running{$stiff}{'exitstatus'}) {
# Completed with success. If there is a recorded failure: forget it
delete $Global::failed{$clean_command};
} else {
# The job failed. Should it be retried?
$Global::failed{$clean_command}{'count'}{$sshlogin}++;
$Global::failed{$clean_command}{'seq'} = $Global::running{$stiff}{'seq'};
my $total_failures =
sum(map { $Global::failed{$clean_command}{'count'}{$_} }
keys %Global::host);
if($total_failures == $::opt_retries) {
# This has been retried enough
$retry_job = 0;
delete $Global::failed{$clean_command};
} else {
# This command should be retried
unget_command_line($clean_command,[]);
$retry_job = 1;
}
}
}
if(not $retry_job) {
# Force printing now if the job failed and we are going to exit # Force printing now if the job failed and we are going to exit
my $print_now = ($Global::running{$stiff}{'exitstatus'} and my $print_now = ($Global::running{$stiff}{'exitstatus'} and
$::opt_halt_on_error and $::opt_halt_on_error == 2); $::opt_halt_on_error and $::opt_halt_on_error == 2);
if($Global::keeporder and not $print_now) { if($Global::keeporder and not $print_now) {
$Global::print_later{$Global::running{$stiff}{"seq"}} = $Private::print_later{$Global::running{$stiff}{"seq"}} =
$Global::running{$stiff}; $Global::running{$stiff};
$Private::job_end_sequence ||= 1; $Private::job_end_sequence ||= 1;
while($Global::print_later{$Private::job_end_sequence}) { debug("Looking for: $Private::job_end_sequence ".
"Current: ".$Global::running{$stiff}{"seq"}."\n");
while($Private::print_later{$Private::job_end_sequence}) {
debug("Found job end $Private::job_end_sequence"); debug("Found job end $Private::job_end_sequence");
print_job($Global::print_later{$Private::job_end_sequence}); print_job($Private::print_later{$Private::job_end_sequence});
delete $Global::print_later{$Private::job_end_sequence}; delete $Private::print_later{$Private::job_end_sequence};
$Private::job_end_sequence++; $Private::job_end_sequence++;
} }
} else { } else {
@ -4338,6 +4440,7 @@ sub reaper {
} }
} }
} }
}
my $sshlogin = $Global::running{$stiff}{'sshlogin'}; my $sshlogin = $Global::running{$stiff}{'sshlogin'};
$Global::host{$sshlogin}{'no_of_running'}--; $Global::host{$sshlogin}{'no_of_running'}--;
$Global::host{$sshlogin}{'completed'}++; $Global::host{$sshlogin}{'completed'}++;