Fixed: bug #36139: --load causes parallel not to finish

This commit is contained in:
Ole Tange 2012-04-09 16:31:05 +02:00
parent 27c776acb5
commit f4f5fdd6c6
6 changed files with 101 additions and 78 deletions

View file

@ -181,9 +181,22 @@ for stable long-term use.
New in this release: New in this release:
* Article: Computational and experimental analyses of
retrotransposon-associated minisatellite DNAs in the soybean genome.
http://www.biomedcentral.com/content/supplementary/1471-2105-13-s2-s13-s1.pdf
* Blog post: GNU parallel - the best thing since sliced bread. * Blog post: GNU parallel - the best thing since sliced bread.
https://arrayal.wordpress.com/2012/03/27/gnu-parallel-the-best-thing-since-sliced-bread/ https://arrayal.wordpress.com/2012/03/27/gnu-parallel-the-best-thing-since-sliced-bread/
* Blog post: GNU Parallel makes everything faster.
http://compbiously.blogspot.com/2012/03/gnu-parallel-makes-everything-faster.html
* Blog post (German): Howto: GNU parallel.
http://d24m.de/2012/04/05/howto-gnu-parallel/
* Blog post: Running in Parallel
http://interactivity.ifactory.com/2012/04/running-in-parallel/
* Bug fixes and man page updates. * Bug fixes and man page updates.

View file

@ -42,7 +42,6 @@ if(not $ENV{SHELL}) {
$SIG{TERM} = sub {}; # Dummy until jobs really start $SIG{TERM} = sub {}; # Dummy until jobs really start
open $Global::original_stderr, ">&STDERR" or ::die_bug("Can't dup STDERR: $!"); open $Global::original_stderr, ">&STDERR" or ::die_bug("Can't dup STDERR: $!");
do_not_reap();
parse_options(); parse_options();
my $number_of_args; my $number_of_args;
if($Global::max_number_of_args) { if($Global::max_number_of_args) {
@ -159,10 +158,10 @@ start_more_jobs();
if($::opt_pipe) { if($::opt_pipe) {
spreadstdin(@fhlist); spreadstdin(@fhlist);
} }
reap_if_needed();
::debug("Start draining\n"); ::debug("Start draining\n");
drain_job_queue(); drain_job_queue();
::debug("Done draining\n"); ::debug("Done draining\n");
reaper();
cleanup(); cleanup();
if($Global::semaphore) { if($Global::semaphore) {
$sem->release(); $sem->release();
@ -225,7 +224,6 @@ sub spreadstdin {
for my $in (@fhlist) { for my $in (@fhlist) {
while(!$force_one_time_through++ or read($in,substr($buf,length $buf,0),$::opt_blocksize)) { while(!$force_one_time_through++ or read($in,substr($buf,length $buf,0),$::opt_blocksize)) {
# substr above = append to $buf # substr above = append to $buf
reap_if_needed(); # Re-enable reaping after read() (Bug#33352)
if($::opt_r) { if($::opt_r) {
# Remove empty lines # Remove empty lines
$buf=~s/^\s*\n//gm; $buf=~s/^\s*\n//gm;
@ -267,12 +265,11 @@ sub spreadstdin {
$i += length $recend; # find the actual splitting location $i += length $recend; # find the actual splitting location
my $record = $header.substr($buf,0,$i); my $record = $header.substr($buf,0,$i);
substr($buf,0,$i) = ""; substr($buf,0,$i) = "";
::debug("Read record: ".length($record)."\n"); # ::debug("Read record: ".length($record)."\n");
write_record_to_pipe(\$record,$recstart,$recend); write_record_to_pipe(\$record,$recstart,$recend);
} }
} }
} }
do_not_reap(); # Disable reaping before read($in) (Bug#33352)
} }
} }
@ -309,7 +306,6 @@ sub flush_and_close_pipes {
do { do {
$flush_done = 1; $flush_done = 1;
# Make sure everything is written to the jobs # Make sure everything is written to the jobs
do_not_reap();
for my $job (values %Global::running) { for my $job (values %Global::running) {
if($job->remaining()) { if($job->remaining()) {
if($job->complete_write()) { if($job->complete_write()) {
@ -319,16 +315,12 @@ sub flush_and_close_pipes {
$flush_done = 0; $flush_done = 0;
} }
} }
reap_if_needed(); $sleep = ::reap_usleep($sleep);
usleep($sleep);
$sleep *= 1.1; # exponential back off
} while (not $flush_done); } while (not $flush_done);
do_not_reap();
for my $job (values %Global::running) { for my $job (values %Global::running) {
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
} }
reap_if_needed();
} }
sub write_record_to_pipe { sub write_record_to_pipe {
@ -346,7 +338,6 @@ sub write_record_to_pipe {
my $sleep = 0.00001; # 0.00001 ms - better performance on highend my $sleep = 0.00001; # 0.00001 ms - better performance on highend
write_record: while(1) { write_record: while(1) {
# Sorting according to sequence is necessary for -k to work # Sorting according to sequence is necessary for -k to work
do_not_reap(); # If Global::running is changed the for loop has a race condition
for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) { for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) {
::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n"); ::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n");
if($job->remaining()) { if($job->remaining()) {
@ -371,13 +362,10 @@ sub write_record_to_pipe {
} }
} }
} }
# Force reaping as sigchild is sometimes forgotten; # Maybe this should be in an if statement: if sleep > 0.001: start more
if(not reaper()) { start_more_jobs(); # These jobs may not be started because of loadavg
usleep($sleep); $sleep = ::reap_usleep($sleep);
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); # exponential back off
} }
}
reap_if_needed();
return; return;
} }
@ -1166,7 +1154,7 @@ sub start_another_job {
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n");
if($job->start()) { if($job->start()) {
$Global::running{$job->pid()} = $job; $Global::running{$job->pid()} = $job;
debug("Started as seq ".$job->seq(),"\n"); debug("Started as seq ",$job->seq()," pid:",$job->pid(),"\n");
return 1; return 1;
} else { } else {
# If interactive says: Dont run the job, then skip it and run the next # If interactive says: Dont run the job, then skip it and run the next
@ -1184,9 +1172,7 @@ sub drain_job_queue {
# Returns: N/A # Returns: N/A
$Private::first_completed ||= time; $Private::first_completed ||= time;
if($::opt_progress) { if($::opt_progress) {
do_not_reap();
print $Global::original_stderr init_progress(); print $Global::original_stderr init_progress();
reap_if_needed();
} }
my $last_header=""; my $last_header="";
my $sleep = 0.2; my $sleep = 0.2;
@ -1195,9 +1181,6 @@ sub drain_job_queue {
debug("jobs running: ", $Global::total_running, "==", scalar debug("jobs running: ", $Global::total_running, "==", scalar
keys %Global::running," slots: ", $Global::max_jobs_running, keys %Global::running," slots: ", $Global::max_jobs_running,
" Memory usage:".my_memory_usage()." "); " Memory usage:".my_memory_usage()." ");
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
usleep($sleep);
do_not_reap();
if($::opt_pipe) { if($::opt_pipe) {
# When using --pipe sometimes file handles are not closed properly # When using --pipe sometimes file handles are not closed properly
for my $job (values %Global::running) { for my $job (values %Global::running) {
@ -1214,16 +1197,11 @@ sub drain_job_queue {
print $Global::original_stderr "\r",$progress{'status'}; print $Global::original_stderr "\r",$progress{'status'};
} }
# Sometimes SIGCHLD is not registered, so force reaper # Sometimes SIGCHLD is not registered, so force reaper
if(reaper()) { $sleep = ::reap_usleep($sleep);
# Child finished this time around: Reset sleep time
$sleep = 0.2;
}
reap_if_needed();
} }
if(not $Global::JobQueue->empty()) { if(not $Global::JobQueue->empty()) {
start_more_jobs(); # These jobs may not be started because of loadavg start_more_jobs(); # These jobs may not be started because of loadavg
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); $sleep = ::reap_usleep($sleep);
usleep($sleep);
} }
} while ($Global::total_running > 0 } while ($Global::total_running > 0
or or
@ -1633,47 +1611,22 @@ sub start_no_new_jobs {
$Global::start_no_new_jobs++; $Global::start_no_new_jobs++;
} }
sub count_sig_child {
# Returns: N/A
$Global::sig_child_caught++;
}
sub do_not_reap {
# This will postpone SIGCHILD for sections that cannot be distracted by a dying child
# (Racecondition)
# Returns: N/A
$SIG{CHLD} = \&count_sig_child;
}
sub reap_if_needed {
# Do the postponed SIGCHILDs if any and re-install normal reaper for SIGCHILD
# (Racecondition)
# Returns: N/A
if($Global::sig_child_caught) {
$Global::sig_child_caught = 0;
reaper();
}
$SIG{CHLD} = \&reaper;
}
sub reaper { sub reaper {
# A job finished. # A job finished.
# Print the output. # Print the output.
# Start another job # Start another job
# Returns: N/A # Returns: N/A
do_not_reap();
$Private::reaperlevel++;
my $stiff; my $stiff;
my $children_reaped = 0; my $children_reaped = 0;
debug("Reaper called $Private::reaperlevel "); debug("Reaper called ");
while (($stiff = waitpid(-1, &WNOHANG)) > 0) { while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
$children_reaped++; $children_reaped++;
if($Global::sshmaster{$stiff}) { if($Global::sshmaster{$stiff}) {
# This is one of the ssh -M: ignore # This is one of the ssh -M: ignore
next; next;
} }
# Ignore processes that we did not start
my $job = $Global::running{$stiff}; my $job = $Global::running{$stiff};
# '-a <(seq 10)' will give us a pid not in %Global::running
$job or next; $job or next;
$job->set_exitstatus($? >> 8); $job->set_exitstatus($? >> 8);
$job->set_exitsignal($? & 127); $job->set_exitsignal($? & 127);
@ -1732,9 +1685,7 @@ sub reaper {
delete $Global::running{$stiff}; delete $Global::running{$stiff};
start_more_jobs(); start_more_jobs();
} }
reap_if_needed(); debug("Reaper exit\n");
debug("Reaper exit $Private::reaperlevel\n");
$Private::reaperlevel--;
return $children_reaped; return $children_reaped;
} }
@ -1931,6 +1882,21 @@ sub hostname {
return $Private::hostname; return $Private::hostname;
} }
sub reap_usleep {
# Reap dead children.
# If no children: Sleep specified amount with exponential backoff
# Returns:
# 0.00001 if children reaped (0.00001 ms works best on highend)
# $ms*1.1 if no children reaped
my $ms = shift;
if(reaper()) {
return 0.00001;
} else {
usleep($ms);
return (($ms < 1000) ? ($ms * 1.1) : ($ms)); # exponential back off
}
}
sub usleep { sub usleep {
# Sleep this many milliseconds. # Sleep this many milliseconds.
my $secs = shift; my $secs = shift;
@ -2244,7 +2210,7 @@ sub loadavg {
$update_loadavg_file = 1; $update_loadavg_file = 1;
} }
if($update_loadavg_file) { if($update_loadavg_file) {
::debug("Updating loadavg file".$self->{'loadavg_file'}); ::debug("Updating loadavg file".$self->{'loadavg_file'}."\n");
$self->{'last_loadavg_update'} = time; $self->{'last_loadavg_update'} = time;
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel"; -e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp"; -e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
@ -2372,7 +2338,6 @@ sub processes_available_by_system_limit {
my $time = time; my $time = time;
my %fh; my %fh;
my @children; my @children;
::do_not_reap();
# Reserve filehandles # Reserve filehandles
# perl uses 7 filehandles for something? # perl uses 7 filehandles for something?

View file

@ -1,15 +1,13 @@
#!/bin/bash #!/bin/bash
echo '### Test --joblog with exitval' echo '### Test --joblog with exitval and Test --joblog with signal'
parallel --joblog /tmp/parallel_test_joblog 'sleep {} && echo foo' ::: 100 2>/dev/null & parallel --joblog /tmp/parallel_joblog_exitval 'sleep {} && echo foo' ::: 100 2>/dev/null &
sleep 1 parallel --joblog /tmp/parallel_joblog_signal 'sleep {}' ::: 100 2>/dev/null &
sleep 0.5
killall -6 sleep killall -6 sleep
grep -q 134 /tmp/parallel_test_joblog && echo OK sleep 0.1
grep -q 134 /tmp/parallel_joblog_exitval && echo exitval OK
grep -q '[^0-9]6[^0-9]' /tmp/parallel_joblog_signal && echo signal OK
echo '### Test --joblog with signal' rm /tmp/parallel_joblog_exitval /tmp/parallel_joblog_signal
parallel --joblog /tmp/parallel_test_joblog 'sleep {}' ::: 100 2>/dev/null &
sleep 1
killall -6 sleep
grep -q '[^0-9]6[^0-9]' /tmp/parallel_test_joblog && echo OK
rm /tmp/parallel_test_joblog

View file

@ -0,0 +1,20 @@
#!/bin/bash
echo "### Test : as delimiter. This can be confusing for uptime ie. --load";
parallel -k --load 100% -d : echo ::: a:b:c
export PARALLEL="--load 100%"
echo PARALLEL=$PARALLEL
for i in $(seq 2 10); do
i2=$[i*i]
seq $i2 | parallel -j0 --load 100% -kX echo {} |wc
seq 1 ${i2}0000 | nice parallel -kj20 --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum
done
echo "### Test if --load blocks. Bug.";
seq 1 1000 | parallel -kj2 --load 100% --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum
seq 1 1000 | parallel -kj0 --load 100% --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum
seq 1 1000000 | parallel -kj0 --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum
seq 1 1000000 | nice parallel -kj20 --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum

View file

@ -1,4 +1,3 @@
### Test --joblog with exitval ### Test --joblog with exitval and Test --joblog with signal
OK exitval OK
### Test --joblog with signal signal OK
OK

View file

@ -0,0 +1,28 @@
### Test : as delimiter. This can be confusing for uptime ie. --load
a
b
c
PARALLEL=--load 100%
4 4 8
1c0f34fee7176dc367bead8f96cba6bc -
9 9 18
fa364205fcf6665c6f3e6cb868f65fd6 -
16 16 39
6f5db0373227d2281dc26b1bf63b4027 -
25 25 66
17e914b4a407dccd370c13173865deb1 -
36 36 99
5ee21398ecde0f3ea9b6093fbaf5a3c2 -
49 49 138
2af8be7306df18164a68e30e427217e0 -
64 64 183
f78c5b3d13146c60c9b586f51d05a4ae -
81 81 234
c88e1757ddc619efd9ee507a7702b53c -
100 100 292
8a7095c1c23bfadc311fe6b16d950582 -
### Test if --load blocks. Bug.
53d025127ae99ab79e8502aae2d9bea6 -
53d025127ae99ab79e8502aae2d9bea6 -
8a7095c1c23bfadc311fe6b16d950582 -
8a7095c1c23bfadc311fe6b16d950582 -