parallel: --resume-failed works with --pipe.

This commit is contained in:
Ole Tange 2013-02-11 23:47:11 +01:00
parent 42751ae6a9
commit 1de3edfb57
2 changed files with 55 additions and 18 deletions

View file

@ -307,17 +307,16 @@ sub spreadstdin {
$recend =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee;
}
my $recendrecstart = $recend.$recstart;
# Force the while-loop once if everything was read by header reading
my $force_one_time_through = 0;
my $chunk_number = 1;
my $blocksize = $opt::blocksize;
for my $in (@fhlist) {
piperead: while(1) {
my $anything_written = 0;
if(not read($in,substr($buf,length $buf,0),$blocksize)
and $force_one_time_through) {
and $chunk_number != 1) {
# Force the while-loop once if everything was read by header reading
last;
}
$force_one_time_through ||= 1;
if($opt::r) {
# Remove empty lines
@ -335,7 +334,9 @@ sub spreadstdin {
$last_newline_pos = rindex($buf,"\n",$last_newline_pos-1);
}
# Chop at $last_newline_pos as that is where n-line record ends
$anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$last_newline_pos+1);
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$buf,
$recstart,$recend,$last_newline_pos+1);
substr($buf,0,$last_newline_pos+1) = "";
} elsif($opt::regexp) {
if($Global::max_number_of_args) {
@ -343,12 +344,16 @@ sub spreadstdin {
# -L -N => (start..*?end){n*l}
my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
while($buf =~ s/((?:$recstart.*?$recend){$read_n_lines})($recstart.*)$/$2/os) {
$anything_written += write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1);
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$1,
$recstart,$recend,length $1);
}
} else {
# Find the last recend-recstart in $buf
if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
$anything_written += write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1);
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$1,
$recstart,$recend,length $1);
}
}
} else {
@ -358,7 +363,9 @@ sub spreadstdin {
my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
while(($i = nindex(\$buf,$recendrecstart,$read_n_lines)) != -1) {
$i += length $recend; # find the actual splitting location
$anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i);
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$buf,
$recstart,$recend,$i);
substr($buf,0,$i) = "";
}
} else {
@ -366,7 +373,9 @@ sub spreadstdin {
my $i = rindex($buf,$recendrecstart);
if($i != -1) {
$i += length $recend; # find the actual splitting location
$anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i);
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,
\$buf,$recstart,$recend,$i);
substr($buf,0,$i) = "";
}
}
@ -382,7 +391,7 @@ sub spreadstdin {
# If there is anything left in the buffer write it
substr($buf,0,0) = "";
write_record_to_pipe(\$header,\$buf,$recstart,$recend,length $buf);
write_record_to_pipe($chunk_number++,\$header,\$buf,$recstart,$recend,length $buf);
::debug("Done reading input\n");
$Global::start_no_new_jobs ||= 1;
@ -406,12 +415,9 @@ sub nindex {
sub write_record_to_pipe {
# Fork then
# Write record from pos 0 .. $endpos to pipe
my $header_ref = shift;
my $record_ref = shift;
my $recstart = shift;
my $recend = shift;
my $endpos = shift;
my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_;
if($endpos == 0) { return 0; }
if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
# Find the minimal seq $job that has no data written == virgin
# If no virgin found, backoff
my $sleep = 0.0001; # 0.01 ms - better performance on highend
@ -423,6 +429,8 @@ sub write_record_to_pipe {
start_more_jobs();
}
my $job = shift @Global::virgin_jobs;
# Job is no longer virgin
$job->set_virgin(0);
if(fork()) {
# Skip
} else {
@ -921,9 +929,9 @@ sub open_joblog {
$joblog_regexp='^(\d+)';
}
while(<$joblog_fh>) {
if(/$joblog_regexp/o) {
if(/$joblog_regexp/) {
# This is 30% faster than set_job_already_run($1);
vec($Global::job_already_run,$1||0,1) = 1;
vec($Global::job_already_run,($1||0),1) = 1;
} elsif(not /\d+\t[^\t]+\t([0-9.]+\t){6}/) {
::error("Format of '$opt::joblog' is wrong: $_");
::wait_and_exit(255);
@ -931,6 +939,7 @@ sub open_joblog {
}
close $joblog_fh;
}
# print unpack("b*", $Global::job_already_run)."\n";
}
if($append) {
# Append to joblog
@ -3314,6 +3323,7 @@ sub new {
'exitsignal' => undef,
# Timestamp for timeout if any
'timeout' => undef,
'virgin' => 1,
}, ref($class) || $class;
}
@ -3402,6 +3412,16 @@ sub write {
syswrite($in,$$remaining_ref);
}
sub virgin {
    # Accessor for the job's 'virgin' flag.
    # Elsewhere in this file a "virgin" job is described as one that
    # has had no data written to it yet.
    # Returns:
    #   the value stored under the 'virgin' key (undef if never set)
    my ($self) = @_;
    return $self->{'virgin'};
}
sub set_virgin {
    # Mutator for the job's 'virgin' flag.
    # Input:
    #   $state = new value for the flag (undef if omitted)
    # Returns:
    #   the value just stored (the assignment is the last expression)
    my ($self, $state) = @_;
    $self->{'virgin'} = $state;
}
sub pid {
my $self = shift;
return $self->{'pid'};
@ -4050,7 +4070,7 @@ sub print {
my $err = $self->stderr();
my $command = $self->sshlogin_wrap();
if($Global::joblog) {
if($Global::joblog and not $self->virgin()) {
my $cmd;
if($Global::verbose <= 1) {
$cmd = $self->replaced();

View file

@ -114,4 +114,21 @@ echo '### -k -0 -i repl'
echo '### test --sshdelay'
stdout /usr/bin/time -f %e parallel -j0 --sshdelay 0.5 -S localhost true ::: 1 2 3 | perl -ne 'print($_ > 1.80 ? "OK\n" : "Not OK\n")'
echo 'bug #38299: --resume-failed -k'
rm /tmp/joblog-38299
parallel -k --resume-failed --joblog /tmp/joblog-38299 echo job{#}id{}\;exit {} ::: 0 1 2 3 0 1
echo try 2
parallel -k --resume-failed --joblog /tmp/joblog-38299 echo job{#}id{}\;exit {} ::: 0 1 2 3 0 1
echo with exit 0
parallel -k --resume-failed --joblog /tmp/joblog-38299 echo job{#}id{}\;exit 0 ::: 0 1 2 3 0 1
echo '--resume -k'
rm /tmp/joblog-resume
parallel -k --resume --joblog /tmp/joblog-resume echo job{}id\;exit {} ::: 0 1 2 3 0 5
echo try 2 = nothing
parallel -k --resume --joblog /tmp/joblog-resume echo job{}id\;exit {} ::: 0 1 2 3 0 5
echo two extra
parallel -k --resume --joblog /tmp/joblog-resume echo job{}id\;exit 0 ::: 0 1 2 3 0 5 6 7
EOF