From fc65ca9626a7f81d8a11df5af3d6571ba4150332 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Sat, 7 Jan 2012 03:29:48 +0100 Subject: [PATCH] parallel: implemented --resume incl. test. --- src/parallel | 86 +++++++++++++++++++++++++------- src/parallel.pod | 13 +++++ testsuite/tests-to-run/test65.sh | 24 ++++++--- testsuite/wanted-results/test30 | 6 +-- testsuite/wanted-results/test65 | 4 ++ 5 files changed, 107 insertions(+), 26 deletions(-) diff --git a/src/parallel b/src/parallel index 0755124f..a8268c6f 100755 --- a/src/parallel +++ b/src/parallel @@ -386,6 +386,7 @@ sub options_hash { "X" => \$::opt_X, "v" => \@::opt_v, "joblog=s" => \$::opt_joblog, + "resume" => \$::opt_resume, "silent" => \$::opt_silent, "keep-order|keeporder|k" => \$::opt_k, "group" => \$::opt_group, @@ -536,6 +537,7 @@ sub parse_options { $Global::arg_file_sep = "::::"; $Global::trim = 'n'; $Global::max_jobs_running = 0; + $Global::job_already_run = ''; @ARGV=read_options(); @@ -727,20 +729,56 @@ sub parse_options { if(not defined $::opt_P) { $::opt_P = "100%"; } + open_joblog(); +} + +sub open_joblog { + my $append = 0; + if($::opt_resume and not $::opt_joblog) { + print STDERR ("$Global::progname: --resume requires --joblog\n"); + ::wait_and_exit(255); + } if($::opt_joblog) { - if(not open($Global::joblog,">$::opt_joblog")) { - print STDERR ("$Global::progname: Cannot write to ", - "--joblog $::opt_joblog\n"); - ::wait_and_exit(255); - } else { - print $Global::joblog - join("\t", "Seq", "Host", "Starttime", "Runtime", - "Send", "Receive", "Exitval", "Signal", "Command" - ). "\n"; - } + if($::opt_resume) { + if(open(JOBLOG, $::opt_joblog)) { + # Read the joblog + $append = ; # If there is a header: Open as append later + while() { + if(/^(\d+)/) { + # This is 30% faster than set_job_already_run($1); + vec($Global::job_already_run,$1,1) = 1; + } else { + print STDERR ("$Global::progname: Format of '$::opt_joblog' is wrong\n"); + ::wait_and_exit(255); + } + } + close JOBLOG; + } + } + if($append) { + # Append to joblog + if(not open($Global::joblog,">>$::opt_joblog")) { + print STDERR ("$Global::progname: Cannot append to ", + "--joblog $::opt_joblog\n"); + ::wait_and_exit(255); + } + } else { + # Overwrite the joblog + if(not open($Global::joblog,">$::opt_joblog")) { + print STDERR ("$Global::progname: Cannot write to ", + "--joblog $::opt_joblog\n"); + ::wait_and_exit(255); + } else { + print $Global::joblog + join("\t", "Seq", "Host", "Starttime", "Runtime", + "Send", "Receive", "Exitval", "Signal", "Command" + ). "\n"; + } + } } } + sub read_options { # Read options from command line, profile and $PARALLEL # Returns: @@ -1079,13 +1117,16 @@ sub start_another_job { debug("Not starting: JobQueue empty\n"); return 0; } else { - my $job = get_job_with_sshlogin($sshlogin); - if(not defined $job) { - # No command available for that sshlogin - debug("Not starting: no jobs available for ".$sshlogin->string()."\n"); - return 0; - } - debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); + my $job; + do { + $job = get_job_with_sshlogin($sshlogin); + if(not defined $job) { + # No command available for that sshlogin + debug("Not starting: no jobs available for ".$sshlogin->string()."\n"); + return 0; + } + } while ($job->is_already_in_joblog()); + debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); if($job->start()) { $Global::running{$job->pid()} = $job; debug("Started as seq ".$job->seq(),"\n"); @@ -3496,6 +3537,16 @@ sub start { return $job; } +sub is_already_in_joblog { + my $job = shift; + return vec($Global::job_already_run,$job->seq(),1); +} + +sub set_job_in_joblog { + my $job = shift; + vec($Global::job_already_run,$job->seq(),1) = 1; +} + sub should_be_retried { # Should this job be retried? # Returns @@ -3551,6 +3602,7 @@ sub print { $self->exitstatus(), $self->exitsignal(), $cmd ). "\n"; flush $Global::joblog; + $self->set_job_in_joblog(); } if(($::opt_dryrun or $Global::verbose) and $Global::grouped) { diff --git a/src/parallel.pod b/src/parallel.pod index 6604bd5c..641148cd 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -525,6 +525,8 @@ To convert the times into ISO-8601 strict do: B +See also B<--resume>. + =item B<--jobs> I @@ -923,6 +925,17 @@ it to the command. Only used with B<--pipe>. +=item B<--resume> + +Resumes from the last unfinished job. By reading B<--joblog> GNU +B will figure out the last unfinished job and continue from +there. As GNU B only looks at the sequence numbers in +B<--joblog> then the input, the command, and B<--joblog> all have to +remain unchanged; otherwise GNU B may run wrong commands. + +See also: B<--joblog>. + + =item B<--retries> I If a job fails, retry it on another computer. Do this I times. If diff --git a/testsuite/tests-to-run/test65.sh b/testsuite/tests-to-run/test65.sh index 3cba9659..22561ee6 100644 --- a/testsuite/tests-to-run/test65.sh +++ b/testsuite/tests-to-run/test65.sh @@ -5,13 +5,25 @@ cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | parallel -j1 echo '### Test --timeout'; parallel -j0 -k --timeout 1 echo {}\; sleep {}\; echo {} ::: 1.1 2.2 3.3 4.4 echo '### Test retired'; - stdout parallel -B; - stdout parallel -g; - stdout parallel -H; - stdout parallel -T; - stdout parallel -U; - stdout parallel -W; + stdout parallel -B; + stdout parallel -g; + stdout parallel -H; + stdout parallel -T; + stdout parallel -U; + stdout parallel -W; stdout parallel -Y; +echo '### Test --joblog followed by --resume --joblog'; + rm -f /tmp/joblog; + timeout -k 1 1 parallel -j2 --joblog /tmp/joblog sleep {} ::: 1.1 2.2 3.3 4.4 2>/dev/null; + parallel -j2 --resume --joblog /tmp/joblog sleep {} ::: 1.1 2.2 3.3 4.4; + cat /tmp/joblog | wc; + rm -f /tmp/joblog; +echo '### Test --resume --joblog followed by --resume --joblog'; + rm -f /tmp/joblog2; + timeout -k 1 1 parallel -j2 --resume --joblog /tmp/joblog2 sleep {} ::: 1.1 2.2 3.3 4.4 2>/dev/null; + parallel -j2 --resume --joblog /tmp/joblog2 sleep {} ::: 1.1 2.2 3.3 4.4; + cat /tmp/joblog2 | wc; + rm -f /tmp/joblog2; EOF echo '### Test --shellquote' diff --git a/testsuite/wanted-results/test30 b/testsuite/wanted-results/test30 index 639d5db7..ff8fc995 100644 --- a/testsuite/wanted-results/test30 +++ b/testsuite/wanted-results/test30 @@ -19,8 +19,8 @@ Computers / CPU cores / Max jobs to run 1:local / 2 / 1 ### --timeout on remote machines -1 -1 -2 +slept 1 +slept 1 +jobs failed: 2 ### --pipe without command parallel: --pipe must have a command to pipe into (e.g. 'cat') diff --git a/testsuite/wanted-results/test65 b/testsuite/wanted-results/test65 index ed84ee26..4d7d470a 100644 --- a/testsuite/wanted-results/test65 +++ b/testsuite/wanted-results/test65 @@ -26,6 +26,10 @@ parallel: -U has been retired. Use --er. parallel: -W has been retired. Use --wd. parallel: -Y has been retired. Use --shebang. parallel: -H has been retired. Use --halt. +### Test --joblog followed by --resume --joblog + 5 49 205 +### Test --resume --joblog followed by --resume --joblog + 5 49 205 ### Test --shellquote awk\ -v\ FS=\"\\\",\\\"\"\ \'\{print\ \$1,\ \$3,\ \$4,\ \$5,\ \$9,\ \$14\}\'\ \|\ grep\ -v\ \"\#\"\ \|\ sed\ -e\ \'1d\'\ -e\ \'s/\\\"//g\'\ -e\ \'s/\\/\\/\\//\\t/g\'\ \|\ cut\ -f1-6,11\ \|\ sed\ -e\ \'s/\\/\\//\\t/g\'\ -e\ \'s/\ /\\t/g ### Test make .deb package