parallel: implemented --resume incl. test.

This commit is contained in:
Ole Tange 2012-01-07 03:29:48 +01:00
parent dbfb878246
commit fc65ca9626
5 changed files with 107 additions and 26 deletions

View file

@ -386,6 +386,7 @@ sub options_hash {
"X" => \$::opt_X, "X" => \$::opt_X,
"v" => \@::opt_v, "v" => \@::opt_v,
"joblog=s" => \$::opt_joblog, "joblog=s" => \$::opt_joblog,
"resume" => \$::opt_resume,
"silent" => \$::opt_silent, "silent" => \$::opt_silent,
"keep-order|keeporder|k" => \$::opt_k, "keep-order|keeporder|k" => \$::opt_k,
"group" => \$::opt_group, "group" => \$::opt_group,
@ -536,6 +537,7 @@ sub parse_options {
$Global::arg_file_sep = "::::"; $Global::arg_file_sep = "::::";
$Global::trim = 'n'; $Global::trim = 'n';
$Global::max_jobs_running = 0; $Global::max_jobs_running = 0;
$Global::job_already_run = '';
@ARGV=read_options(); @ARGV=read_options();
@ -727,20 +729,56 @@ sub parse_options {
if(not defined $::opt_P) { if(not defined $::opt_P) {
$::opt_P = "100%"; $::opt_P = "100%";
} }
open_joblog();
}
sub open_joblog {
my $append = 0;
if($::opt_resume and not $::opt_joblog) {
print STDERR ("$Global::progname: --resume requires --joblog\n");
::wait_and_exit(255);
}
if($::opt_joblog) { if($::opt_joblog) {
if(not open($Global::joblog,">$::opt_joblog")) { if($::opt_resume) {
print STDERR ("$Global::progname: Cannot write to ", if(open(JOBLOG, $::opt_joblog)) {
"--joblog $::opt_joblog\n"); # Read the joblog
::wait_and_exit(255); $append = <JOBLOG>; # If there is a header: Open as append later
} else { while(<JOBLOG>) {
print $Global::joblog if(/^(\d+)/) {
join("\t", "Seq", "Host", "Starttime", "Runtime", # This is 30% faster than set_job_already_run($1);
"Send", "Receive", "Exitval", "Signal", "Command" vec($Global::job_already_run,$1,1) = 1;
). "\n"; } else {
} print STDERR ("$Global::progname: Format of '$::opt_joblog' is wrong\n");
::wait_and_exit(255);
}
}
close JOBLOG;
}
}
if($append) {
# Append to joblog
if(not open($Global::joblog,">>$::opt_joblog")) {
print STDERR ("$Global::progname: Cannot append to ",
"--joblog $::opt_joblog\n");
::wait_and_exit(255);
}
} else {
# Overwrite the joblog
if(not open($Global::joblog,">$::opt_joblog")) {
print STDERR ("$Global::progname: Cannot write to ",
"--joblog $::opt_joblog\n");
::wait_and_exit(255);
} else {
print $Global::joblog
join("\t", "Seq", "Host", "Starttime", "Runtime",
"Send", "Receive", "Exitval", "Signal", "Command"
). "\n";
}
}
} }
} }
sub read_options { sub read_options {
# Read options from command line, profile and $PARALLEL # Read options from command line, profile and $PARALLEL
# Returns: # Returns:
@ -1079,13 +1117,16 @@ sub start_another_job {
debug("Not starting: JobQueue empty\n"); debug("Not starting: JobQueue empty\n");
return 0; return 0;
} else { } else {
my $job = get_job_with_sshlogin($sshlogin); my $job;
if(not defined $job) { do {
# No command available for that sshlogin $job = get_job_with_sshlogin($sshlogin);
debug("Not starting: no jobs available for ".$sshlogin->string()."\n"); if(not defined $job) {
return 0; # No command available for that sshlogin
} debug("Not starting: no jobs available for ".$sshlogin->string()."\n");
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); return 0;
}
} while ($job->is_already_in_joblog());
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n");
if($job->start()) { if($job->start()) {
$Global::running{$job->pid()} = $job; $Global::running{$job->pid()} = $job;
debug("Started as seq ".$job->seq(),"\n"); debug("Started as seq ".$job->seq(),"\n");
@ -3496,6 +3537,16 @@ sub start {
return $job; return $job;
} }
sub is_already_in_joblog {
my $job = shift;
return vec($Global::job_already_run,$job->seq(),1);
}
sub set_job_in_joblog {
my $job = shift;
vec($Global::job_already_run,$job->seq(),1) = 1;
}
sub should_be_retried { sub should_be_retried {
# Should this job be retried? # Should this job be retried?
# Returns # Returns
@ -3551,6 +3602,7 @@ sub print {
$self->exitstatus(), $self->exitsignal(), $cmd $self->exitstatus(), $self->exitsignal(), $cmd
). "\n"; ). "\n";
flush $Global::joblog; flush $Global::joblog;
$self->set_job_in_joblog();
} }
if(($::opt_dryrun or $Global::verbose) and $Global::grouped) { if(($::opt_dryrun or $Global::verbose) and $Global::grouped) {

View file

@ -525,6 +525,8 @@ To convert the times into ISO-8601 strict do:
B<perl -a -F"\t" -ne 'chomp($F[2]=`date -d \@$F[2] +%FT%T`); print join("\t",@F)'> B<perl -a -F"\t" -ne 'chomp($F[2]=`date -d \@$F[2] +%FT%T`); print join("\t",@F)'>
See also B<--resume>.
=item B<--jobs> I<N> =item B<--jobs> I<N>
@ -923,6 +925,17 @@ it to the command.
Only used with B<--pipe>. Only used with B<--pipe>.
=item B<--resume>
Resumes from the last unfinished job. By reading B<--joblog> GNU
B<parallel> will figure out the last unfinished job and continue from
there. As GNU B<parallel> only looks at the sequence numbers in
B<--joblog> then the input, the command, and B<--joblog> all have to
remain unchanged; otherwise GNU B<parallel> may run wrong commands.
See also: B<--joblog>.
=item B<--retries> I<n> =item B<--retries> I<n>
If a job fails, retry it on another computer. Do this I<n> times. If If a job fails, retry it on another computer. Do this I<n> times. If

View file

@ -12,6 +12,18 @@ echo '### Test retired';
stdout parallel -U; stdout parallel -U;
stdout parallel -W; stdout parallel -W;
stdout parallel -Y; stdout parallel -Y;
echo '### Test --joblog followed by --resume --joblog';
rm -f /tmp/joblog;
timeout -k 1 1 parallel -j2 --joblog /tmp/joblog sleep {} ::: 1.1 2.2 3.3 4.4 2>/dev/null;
parallel -j2 --resume --joblog /tmp/joblog sleep {} ::: 1.1 2.2 3.3 4.4;
cat /tmp/joblog | wc;
rm -f /tmp/joblog;
echo '### Test --resume --joblog followed by --resume --joblog';
rm -f /tmp/joblog2;
timeout -k 1 1 parallel -j2 --resume --joblog /tmp/joblog2 sleep {} ::: 1.1 2.2 3.3 4.4 2>/dev/null;
parallel -j2 --resume --joblog /tmp/joblog2 sleep {} ::: 1.1 2.2 3.3 4.4;
cat /tmp/joblog2 | wc;
rm -f /tmp/joblog2;
EOF EOF
echo '### Test --shellquote' echo '### Test --shellquote'

View file

@ -19,8 +19,8 @@ Computers / CPU cores / Max jobs to run
1:local / 2 / 1 1:local / 2 / 1
### --timeout on remote machines ### --timeout on remote machines
1 slept 1
1 slept 1
2 jobs failed: 2
### --pipe without command ### --pipe without command
parallel: --pipe must have a command to pipe into (e.g. 'cat') parallel: --pipe must have a command to pipe into (e.g. 'cat')

View file

@ -26,6 +26,10 @@ parallel: -U has been retired. Use --er.
parallel: -W has been retired. Use --wd. parallel: -W has been retired. Use --wd.
parallel: -Y has been retired. Use --shebang. parallel: -Y has been retired. Use --shebang.
parallel: -H has been retired. Use --halt. parallel: -H has been retired. Use --halt.
### Test --joblog followed by --resume --joblog
5 49 205
### Test --resume --joblog followed by --resume --joblog
5 49 205
### Test --shellquote ### Test --shellquote
awk\ -v\ FS=\"\\\",\\\"\"\ \'\{print\ \$1,\ \$3,\ \$4,\ \$5,\ \$9,\ \$14\}\'\ \|\ grep\ -v\ \"\#\"\ \|\ sed\ -e\ \'1d\'\ -e\ \'s/\\\"//g\'\ -e\ \'s/\\/\\/\\//\\t/g\'\ \|\ cut\ -f1-6,11\ \|\ sed\ -e\ \'s/\\/\\//\\t/g\'\ -e\ \'s/\ /\\t/g awk\ -v\ FS=\"\\\",\\\"\"\ \'\{print\ \$1,\ \$3,\ \$4,\ \$5,\ \$9,\ \$14\}\'\ \|\ grep\ -v\ \"\#\"\ \|\ sed\ -e\ \'1d\'\ -e\ \'s/\\\"//g\'\ -e\ \'s/\\/\\/\\//\\t/g\'\ \|\ cut\ -f1-6,11\ \|\ sed\ -e\ \'s/\\/\\//\\t/g\'\ -e\ \'s/\ /\\t/g
### Test make .deb package ### Test make .deb package