From 2e572c00c9c990aadc73fee0fd80d0835b8ea6ac Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Tue, 21 Sep 2010 09:34:53 +0200 Subject: [PATCH] parallel: --retries implemented. Passes unittest. --- doc/FUTURE_IDEAS | 24 ++++++ src/parallel | 189 ++++++++++++++++++++++++++++++++++++----------- 2 files changed, 170 insertions(+), 43 deletions(-) diff --git a/doc/FUTURE_IDEAS b/doc/FUTURE_IDEAS index 5acaa32f..b62d26b3 100644 --- a/doc/FUTURE_IDEAS +++ b/doc/FUTURE_IDEAS @@ -1,3 +1,27 @@ +== Retry == + +--retries n +If a job fails, retry it on another computer. Do this n times. If there are fewer than n computers in --sshlogin GNU parallel will re-use the computers. This is useful if some jobs fail for no apparent reason (such as network failure). + +When the command dies and fails: +Update $Global::failed_command{$test_failed} +If max n has been reached for all: fail + +What if all jobs in the queue failed for this sshlogin so only undef is left for this sshlogin? + +BUG: Rækkefølge er forkert. +(seq 0 1) | parallel -kj100% -S 1/:,2/parallel@server2 -vq perl -e 'sleep 1;print "job{}\n";exit({})' +OK: (seq 0 1) | parallel -kj2 -S 1/:,2/parallel@server2 -vq perl -e 'sleep 1;print "job{}\n";exit({})' +Fejlen eksisterer også i 20100906 og 20100822 + +perl -e sleep\ 1\;print\ \"job0\\n\"\;exit\(0\)ARRAY(0x965e928)perl -e sleep\ 1\;print\ \"job1\\n\"\;exit\(1\)ARRAY(0x9727bc0)perl -e sleep\ 1\;print\ \"job2\\n\"\;exit\(2\)ARRAY(0x9727a70)Limited to procs: 3 +Wanted procs: 1 +Time to fork ten procs: (processes so far: 1) +perl -e sleep\ 1\;print\ \"job1\\n\"\;exit\(1\)ARRAY(0x9727bc0)perl -e sleep\ 1\;print\ \"job2\\n\"\;exit\(2\)ARRAY(0x9727a70)perl -e sleep\ 1\;print\ \"job0\\n\"\;exit\(0\)ARRAY(0x965e928)Limited to procs: 1 +Running jobs on parallel@server2: 0 + +== FSCONS == + Annoncer foredrag til FSCONS I will be at FSCONS 2010-11-07 talking about GNU Parallel. 
diff --git a/src/parallel b/src/parallel index bf42a022..8679b61f 100755 --- a/src/parallel +++ b/src/parallel @@ -552,6 +552,15 @@ default. If the stdin (standard input) only contains whitespace, do not run the command. + +=item B<--retries> I<n> + +If a job fails, retry it on another computer. Do this I<n> times. If +there are fewer than I<n> computers in B<--sshlogin> GNU parallel will +re-use the computers. This is useful if some jobs fail for no apparent +reason (such as network failure). + + =item B<--return> I Transfer files from remote servers. B<--return> is used with @@ -2280,6 +2289,7 @@ sub parse_options { "cleanup" => \$::opt_cleanup, "basefile|B=s" => \@::opt_basefile, "halt-on-error|H=s" => \$::opt_halt_on_error, + "retries=i" => \$::opt_retries, "progress" => \$::opt_progress, "eta" => \$::opt_eta, "arg-sep|argsep=s" => \$::opt_arg_sep, @@ -2970,8 +2980,10 @@ sub compute_number_of_processes { my $sshlogin = shift; my $wanted_processes = user_requested_processes($opt_P,$sshlogin); debug("Wanted procs: $wanted_processes\n"); + debug("unget before:",@Global::unget_next_command_line); my $system_limit = processes_available_by_system_limit($wanted_processes,$sshlogin); + debug("unget after:",@Global::unget_next_command_line); debug("Limited to procs: $system_limit\n"); return $system_limit; } @@ -3155,7 +3167,7 @@ sub user_requested_processes { # E.g. -P +2 my $j = $1; $processes = - $j + no_of_processing_units_sshlogin($sshlogin); + no_of_processing_units_sshlogin($sshlogin) + $j; } elsif ($opt_P =~ /^-(\d+)$/) { # E.g. -P -2 my $j = $1; @@ -3362,6 +3374,8 @@ sub min { my $min = shift; my @args = @_; for my $a (@args) { + # Skip undefs + defined $a or next; $min = ($min < $a) ? $min : $a; } return $min; @@ -3373,18 +3387,43 @@ sub max { my $max = shift; my @args = @_; for my $a (@args) { + # Skip undefs + defined $a or next; $max = ($max > $a) ? 
$max : $a; } return $max; } +sub sum { + # Returns: + # Sum of values of array + my @args = @_; + my $sum = 0; + for (@args) { + # Skip undefs + $_ and do { $sum += $_; } + } + return $sum; +} + +sub undef_as_zero { + my $a = shift; + return $a ? $a : 0; +} sub __RUNNING_AND_PRINTING_THE_JOBS__ {} # Variable structure: +# +# $Global::failed{$clean_command}{'count'}{$sshlogin} = number of times failed on this sshlogin +# $Global::failed{$clean_command}{'seq'} = original sequence number # $Global::running{$pid}{'seq'} = printsequence # $Global::running{$pid}{sshlogin} = server to run on # $Global::running{$pid}{'exitstatus'} = exit status +# $Global::running{$pid}{'out'} = stdout filehandle +# $Global::running{$pid}{'err'} = stderr filehandle +# $Global::running{$pid}{'command'} = command being run (including rsync/ssh and args) +# $Global::running{$pid}{'cleancommand'} = command being run (excluding rsync/ssh but including args) # $Global::host{$sshlogin}{'no_of_running'} = number of currently running jobs # $Global::host{$sshlogin}{'completed'} = number of completed jobs # $Global::host{$sshlogin}{'ncpus'} = number of CPUs (or CPU cores) @@ -3668,12 +3707,13 @@ sub start_another_job { my $sshlogin = shift; # Do we have enough file handles to start another job? 
if(enough_file_handles()) { - my $command = get_command_line_with_sshlogin($sshlogin); + my ($command,$clean_command) = get_command_line_with_sshlogin($sshlogin); if(defined $command) { debug("Command to run on '$sshlogin': $command\n"); - my %jobinfo = start_job($command,$sshlogin); + my %jobinfo = start_job($command,$sshlogin,$clean_command); if(%jobinfo) { $Global::running{$jobinfo{"pid"}} = \%jobinfo; + debug("Started as seq ".$jobinfo{'seq'},"\n"); return 1; } else { # If interactive says: Dont run the job, then skip it and run the next @@ -3698,8 +3738,10 @@ sub start_job { # "err" => STDERR filehandle (if grouped) # "sshlogin" => sshlogin # "command" => command being run + # "clean_command" => command being run without wrapping my $command = shift; my $sshlogin = shift; + my $clean_command = shift; my ($pid,$out,$err,%out,%err,$outname,$errname,$name); if($Global::grouped) { # To group we create temporary files for STDOUT and STDERR @@ -3738,10 +3780,18 @@ sub start_job { } $Global::total_running++; $Global::total_started++; - debug("$Global::total_running processes. Starting: $command\n"); #print STDERR "LEN".length($command)."\n"; - $Private::job_start_sequence++; - if(@::opt_a and $Private::job_start_sequence == 1) { + my $job_start_sequence; + if($Global::failed{$clean_command}{'seq'}) { + # This is a retried job: Keep the old seq + $job_start_sequence = $Global::failed{$clean_command}{'seq'}; + } else { + # This is a new (non-retried) job: Give it a new seq + $Private::job_start_sequence++; + $job_start_sequence = $Private::job_start_sequence; + } + debug("$Global::total_running processes. Starting ($job_start_sequence): $command\n"); + if(@::opt_a and $job_start_sequence == 1) { # Give STDIN to the first job if using -a $pid = open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) || die("open3 (with -a) failed. 
Report a bug to \n"); @@ -3766,17 +3816,19 @@ sub start_job { or die "Can't dup \$Global::original_stderr: $!"; if($Global::grouped) { - return ("seq" => $Private::job_start_sequence, + return ("seq" => $job_start_sequence, "pid" => $pid, "out" => $out{$outname}, "err" => $err{$errname}, "sshlogin" => $sshlogin, - "command" => $command); + "command" => $command, + "clean_command" => $clean_command); } else { - return ("seq" => $Private::job_start_sequence, + return ("seq" => $job_start_sequence, "pid" => $pid, "sshlogin" => $sshlogin, - "command" => $command); + "command" => $command, + "clean_command" => $clean_command); } } @@ -3823,8 +3875,29 @@ sub __READING_AND_QUOTING_ARGUMENTS__ {} sub get_command_line_with_sshlogin { # Returns: # next command to run with ssh command wrapping if remote + # next command to run with no wrapping (clean_command) my $sshlogin = shift; my ($next_command_line, $args_ref) = get_command_line(); + my ($clean_command) = $next_command_line; + if($::opt_retries and $clean_command and + $Global::failed{$clean_command}{'count'}{$sshlogin}) { + # This command with these args failed for this sshlogin + my $min_failures = + min(map { $Global::failed{$clean_command}{'count'}{$_} } + keys %Global::host); + if($Global::failed{$clean_command}{'count'}{$sshlogin} == $min_failures) { + # It failed the same or more times on another host: + # run it on this host + } else { + # If it failed fewer times on another host: + # Find another job to run + my @next_job_to_run = get_command_line_with_sshlogin($sshlogin); + # Push the command back on the queue + unget_command_line($next_command_line,$args_ref); + return @next_job_to_run; + } + } + my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); my ($pre,$post)=("",""); if($next_command_line and $serverlogin ne ":") { @@ -3847,9 +3920,9 @@ sub get_command_line_with_sshlogin { $post = '_EXIT_status=$?; '.$post.' exit $_EXIT_status;'; } return ($pre . 
"$sshcmd $serverlogin " - .shell_quote($next_command_line).";".$post); + .shell_quote($next_command_line).";".$post,$clean_command); } else { - return $next_command_line; + return ($next_command_line,$clean_command); } } @@ -4302,39 +4375,69 @@ sub reaper { # The process that died had the tty => release it $Global::tty_taken = 0; } - # Force printing now if the job failed and we are going to exit - my $print_now = ($Global::running{$stiff}{'exitstatus'} and - $::opt_halt_on_error and $::opt_halt_on_error == 2); - if($Global::keeporder and not $print_now) { - $Global::print_later{$Global::running{$stiff}{"seq"}} = - $Global::running{$stiff}; - $Private::job_end_sequence ||= 1; - while($Global::print_later{$Private::job_end_sequence}) { - debug("Found job end $Private::job_end_sequence"); - print_job($Global::print_later{$Private::job_end_sequence}); - delete $Global::print_later{$Private::job_end_sequence}; - $Private::job_end_sequence++; + my $retry_job = 0; + if ($::opt_retries) { + my $clean_command = $Global::running{$stiff}{'clean_command'}; + my $sshlogin = $Global::running{$stiff}{'sshlogin'}; + if(not $Global::running{$stiff}{'exitstatus'}) { + # Completed with success. If there is a recorded failure: forget it + delete $Global::failed{$clean_command}; + } else { + # The job failed. Should it be retried? 
+ $Global::failed{$clean_command}{'count'}{$sshlogin}++; + $Global::failed{$clean_command}{'seq'} = $Global::running{$stiff}{'seq'}; + my $total_failures = + sum(map { $Global::failed{$clean_command}{'count'}{$_} } + keys %Global::host); + if($total_failures == $::opt_retries) { + # This has been retried enough + $retry_job = 0; + delete $Global::failed{$clean_command}; + } else { + # This command should be retried + unget_command_line($clean_command,[]); + $retry_job = 1; + } } - } else { - print_job ($Global::running{$stiff}); } - if($Global::running{$stiff}{'exitstatus'}) { - # The jobs had a exit status <> 0, so error - $Global::exitstatus++; - if($::opt_halt_on_error) { - if($::opt_halt_on_error == 1) { - # If halt on error == 1 we should gracefully exit - print STDERR ("$Global::progname: Starting no more jobs. ", - "Waiting for ", scalar(keys %Global::running), - " jobs to finish. This job failed:\n", - $Global::running{$stiff}{"command"},"\n"); - $Global::start_no_new_jobs++; - $Global::halt_on_error_exitstatus = $Global::running{$stiff}{'exitstatus'}; - } elsif($::opt_halt_on_error == 2) { - # If halt on error == 2 we should exit immediately - print STDERR ("$Global::progname: This job failed:\n", - $Global::running{$stiff}{"command"},"\n"); - exit ($Global::running{$stiff}{'exitstatus'}); + + if(not $retry_job) { + # Force printing now if the job failed and we are going to exit + my $print_now = ($Global::running{$stiff}{'exitstatus'} and + $::opt_halt_on_error and $::opt_halt_on_error == 2); + if($Global::keeporder and not $print_now) { + $Private::print_later{$Global::running{$stiff}{"seq"}} = + $Global::running{$stiff}; + $Private::job_end_sequence ||= 1; + debug("Looking for: $Private::job_end_sequence ". 
+ "Current: ".$Global::running{$stiff}{"seq"}."\n"); + while($Private::print_later{$Private::job_end_sequence}) { + debug("Found job end $Private::job_end_sequence"); + print_job($Private::print_later{$Private::job_end_sequence}); + delete $Private::print_later{$Private::job_end_sequence}; + $Private::job_end_sequence++; + } + } else { + print_job ($Global::running{$stiff}); + } + if($Global::running{$stiff}{'exitstatus'}) { + # The jobs had a exit status <> 0, so error + $Global::exitstatus++; + if($::opt_halt_on_error) { + if($::opt_halt_on_error == 1) { + # If halt on error == 1 we should gracefully exit + print STDERR ("$Global::progname: Starting no more jobs. ", + "Waiting for ", scalar(keys %Global::running), + " jobs to finish. This job failed:\n", + $Global::running{$stiff}{"command"},"\n"); + $Global::start_no_new_jobs++; + $Global::halt_on_error_exitstatus = $Global::running{$stiff}{'exitstatus'}; + } elsif($::opt_halt_on_error == 2) { + # If halt on error == 2 we should exit immediately + print STDERR ("$Global::progname: This job failed:\n", + $Global::running{$stiff}{"command"},"\n"); + exit ($Global::running{$stiff}{'exitstatus'}); + } } } }