From f4f5fdd6c67a46320924b5b81199613d87ecdc6b Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Mon, 9 Apr 2012 16:31:05 +0200 Subject: [PATCH] Fixed: bug #36139: --load causes parallel not to finish --- doc/release_new_version | 13 +++++ src/parallel | 93 ++++++++++---------------------- testsuite/tests-to-run/test62.sh | 18 +++---- testsuite/tests-to-run/test66.sh | 20 +++++++ testsuite/wanted-results/test62 | 7 ++- testsuite/wanted-results/test66 | 28 ++++++++++ 6 files changed, 101 insertions(+), 78 deletions(-) create mode 100644 testsuite/tests-to-run/test66.sh create mode 100644 testsuite/wanted-results/test66 diff --git a/doc/release_new_version b/doc/release_new_version index 939abe9a..7d6587cd 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -181,9 +181,22 @@ for stable long-term use. New in this release: +* Article: Computational and experimental analyses of + retrotransposon-associated minisatellite DNAs in the soybean genome. + http://www.biomedcentral.com/content/supplementary/1471-2105-13-s2-s13-s1.pdf + * Blog post: GNU parallel - the best thing since sliced bread. https://arrayal.wordpress.com/2012/03/27/gnu-parallel-the-best-thing-since-sliced-bread/ +* Blog post: GNU Parallel makes everything faster. + http://compbiously.blogspot.com/2012/03/gnu-parallel-makes-everything-faster.html + +* Blog post (German): Howto: GNU parallel. + http://d24m.de/2012/04/05/howto-gnu-parallel/ + +* Blog post: Running in Parallel + http://interactivity.ifactory.com/2012/04/running-in-parallel/ + * Bug fixes and man page updates. diff --git a/src/parallel b/src/parallel index 16ac0552..6b596b3a 100755 --- a/src/parallel +++ b/src/parallel @@ -42,7 +42,6 @@ if(not $ENV{SHELL}) { $SIG{TERM} = sub {}; # Dummy until jobs really start open $Global::original_stderr, ">&STDERR" or ::die_bug("Can't dup STDERR: $!"); -do_not_reap(); parse_options(); my $number_of_args; if($Global::max_number_of_args) { @@ -159,10 +158,10 @@ start_more_jobs(); if($::opt_pipe) { spreadstdin(@fhlist); } -reap_if_needed(); ::debug("Start draining\n"); drain_job_queue(); ::debug("Done draining\n"); +reaper(); cleanup(); if($Global::semaphore) { $sem->release(); @@ -225,7 +224,6 @@ sub spreadstdin { for my $in (@fhlist) { while(!$force_one_time_through++ or read($in,substr($buf,length $buf,0),$::opt_blocksize)) { # substr above = append to $buf - reap_if_needed(); # Re-enable reaping after read() (Bug#33352) if($::opt_r) { # Remove empty lines $buf=~s/^\s*\n//gm; @@ -267,12 +265,11 @@ sub spreadstdin { $i += length $recend; # find the actual splitting location my $record = $header.substr($buf,0,$i); substr($buf,0,$i) = ""; - ::debug("Read record: ".length($record)."\n"); + # ::debug("Read record: ".length($record)."\n"); write_record_to_pipe(\$record,$recstart,$recend); } } } - do_not_reap(); # Disable reaping before read($in) (Bug#33352) } } @@ -309,7 +306,6 @@ sub flush_and_close_pipes { do { $flush_done = 1; # Make sure everything is written to the jobs - do_not_reap(); for my $job (values %Global::running) { if($job->remaining()) { if($job->complete_write()) { @@ -319,16 +315,12 @@ sub flush_and_close_pipes { $flush_done = 0; } } - reap_if_needed(); - usleep($sleep); - $sleep *= 1.1; # exponential back off + $sleep = ::reap_usleep($sleep); } while (not $flush_done); - do_not_reap(); for my $job (values %Global::running) { my $fh = $job->stdin(); close $fh; } - reap_if_needed(); } sub write_record_to_pipe { @@ -346,7 +338,6 @@ sub write_record_to_pipe { my $sleep = 0.00001; # 0.00001 ms - better performance on highend write_record: while(1) { # Sorting according to sequence is necessary for -k to work - do_not_reap(); # If Global::running is changed the for loop has a race condition for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) { ::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n"); if($job->remaining()) { @@ -371,13 +362,10 @@ sub write_record_to_pipe { } } } - # Force reaping as sigchild is sometimes forgotten; - if(not reaper()) { - usleep($sleep); - $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); # exponential back off - } - } - reap_if_needed(); + # Maybe this should be in an if statement: if sleep > 0.001: start more + start_more_jobs(); # These jobs may not be started because of loadavg + $sleep = ::reap_usleep($sleep); + } return; } @@ -1166,7 +1154,7 @@ sub start_another_job { debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); if($job->start()) { $Global::running{$job->pid()} = $job; - debug("Started as seq ".$job->seq(),"\n"); + debug("Started as seq ",$job->seq()," pid:",$job->pid(),"\n"); return 1; } else { # If interactive says: Dont run the job, then skip it and run the next @@ -1184,9 +1172,7 @@ sub drain_job_queue { # Returns: N/A $Private::first_completed ||= time; if($::opt_progress) { - do_not_reap(); print $Global::original_stderr init_progress(); - reap_if_needed(); } my $last_header=""; my $sleep = 0.2; @@ -1195,9 +1181,6 @@ sub drain_job_queue { debug("jobs running: ", $Global::total_running, "==", scalar keys %Global::running," slots: ", $Global::max_jobs_running, " Memory usage:".my_memory_usage()." "); - $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); - usleep($sleep); - do_not_reap(); if($::opt_pipe) { # When using --pipe sometimes file handles are not closed properly for my $job (values %Global::running) { @@ -1214,16 +1197,11 @@ sub drain_job_queue { print $Global::original_stderr "\r",$progress{'status'}; } # Sometimes SIGCHLD is not registered, so force reaper - if(reaper()) { - # Child finished this time around: Reset sleep time - $sleep = 0.2; - } - reap_if_needed(); + $sleep = ::reap_usleep($sleep); } if(not $Global::JobQueue->empty()) { start_more_jobs(); # These jobs may not be started because of loadavg - $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); - usleep($sleep); + $sleep = ::reap_usleep($sleep); } } while ($Global::total_running > 0 or @@ -1633,47 +1611,22 @@ sub start_no_new_jobs { $Global::start_no_new_jobs++; } -sub count_sig_child { - # Returns: N/A - $Global::sig_child_caught++; -} - -sub do_not_reap { - # This will postpone SIGCHILD for sections that cannot be distracted by a dying child - # (Racecondition) - # Returns: N/A - $SIG{CHLD} = \&count_sig_child; -} - -sub reap_if_needed { - # Do the postponed SIGCHILDs if any and re-install normal reaper for SIGCHILD - # (Racecondition) - # Returns: N/A - if($Global::sig_child_caught) { - $Global::sig_child_caught = 0; - reaper(); - } - $SIG{CHLD} = \&reaper; -} - sub reaper { # A job finished. # Print the output. # Start another job # Returns: N/A - do_not_reap(); - $Private::reaperlevel++; my $stiff; my $children_reaped = 0; - debug("Reaper called $Private::reaperlevel "); + debug("Reaper called "); while (($stiff = waitpid(-1, &WNOHANG)) > 0) { $children_reaped++; if($Global::sshmaster{$stiff}) { # This is one of the ssh -M: ignore next; } - # Ignore processes that we did not start my $job = $Global::running{$stiff}; + # '-a <(seq 10)' will give us a pid not in %Global::running $job or next; $job->set_exitstatus($? >> 8); $job->set_exitsignal($? & 127); @@ -1732,9 +1685,7 @@ sub reaper { delete $Global::running{$stiff}; start_more_jobs(); } - reap_if_needed(); - debug("Reaper exit $Private::reaperlevel\n"); - $Private::reaperlevel--; + debug("Reaper exit\n"); return $children_reaped; } @@ -1931,6 +1882,21 @@ sub hostname { return $Private::hostname; } +sub reap_usleep { + # Reap dead children. + # If no children: Sleep specified amount with exponential backoff + # Returns: + # 0.00001 if children reaped (0.00001 ms works best on highend) + # $ms*1.1 if no children reaped + my $ms = shift; + if(reaper()) { + return 0.00001; + } else { + usleep($ms); + return (($ms < 1000) ? ($ms * 1.1) : ($ms)); # exponential back off + } +} + sub usleep { # Sleep this many milliseconds. my $secs = shift; @@ -2244,7 +2210,7 @@ sub loadavg { $update_loadavg_file = 1; } if($update_loadavg_file) { - ::debug("Updating loadavg file".$self->{'loadavg_file'}); + ::debug("Updating loadavg file".$self->{'loadavg_file'}."\n"); $self->{'last_loadavg_update'} = time; -e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel"; -e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp"; @@ -2372,7 +2338,6 @@ sub processes_available_by_system_limit { my $time = time; my %fh; my @children; - ::do_not_reap(); # Reserve filehandles # perl uses 7 filehandles for something? diff --git a/testsuite/tests-to-run/test62.sh b/testsuite/tests-to-run/test62.sh index f8395621..938cf4bc 100755 --- a/testsuite/tests-to-run/test62.sh +++ b/testsuite/tests-to-run/test62.sh @@ -1,15 +1,13 @@ #!/bin/bash -echo '### Test --joblog with exitval' -parallel --joblog /tmp/parallel_test_joblog 'sleep {} && echo foo' ::: 100 2>/dev/null & -sleep 1 +echo '### Test --joblog with exitval and Test --joblog with signal' +parallel --joblog /tmp/parallel_joblog_exitval 'sleep {} && echo foo' ::: 100 2>/dev/null & +parallel --joblog /tmp/parallel_joblog_signal 'sleep {}' ::: 100 2>/dev/null & +sleep 0.5 killall -6 sleep -grep -q 134 /tmp/parallel_test_joblog && echo OK +sleep 0.1 +grep -q 134 /tmp/parallel_joblog_exitval && echo exitval OK +grep -q '[^0-9]6[^0-9]' /tmp/parallel_joblog_signal && echo signal OK -echo '### Test --joblog with signal' -parallel --joblog /tmp/parallel_test_joblog 'sleep {}' ::: 100 2>/dev/null & -sleep 1 -killall -6 sleep -grep -q '[^0-9]6[^0-9]' /tmp/parallel_test_joblog && echo OK +rm /tmp/parallel_joblog_exitval /tmp/parallel_joblog_signal -rm /tmp/parallel_test_joblog diff --git a/testsuite/tests-to-run/test66.sh b/testsuite/tests-to-run/test66.sh new file mode 100644 index 00000000..9e3357e2 --- /dev/null +++ b/testsuite/tests-to-run/test66.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +echo "### Test : as delimiter. This can be confusing for uptime ie. --load"; +parallel -k --load 100% -d : echo ::: a:b:c + +export PARALLEL="--load 100%" +echo PARALLEL=$PARALLEL + +for i in $(seq 2 10); do + i2=$[i*i] + seq $i2 | parallel -j0 --load 100% -kX echo {} |wc + seq 1 ${i2}0000 | nice parallel -kj20 --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum +done + +echo "### Test if --load blocks. Bug."; +seq 1 1000 | parallel -kj2 --load 100% --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum +seq 1 1000 | parallel -kj0 --load 100% --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum + +seq 1 1000000 | parallel -kj0 --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum +seq 1 1000000 | nice parallel -kj20 --recend "\n" --spreadstdin gzip -9 | zcat | sort -n | md5sum diff --git a/testsuite/wanted-results/test62 b/testsuite/wanted-results/test62 index e892a94d..bba71ca5 100644 --- a/testsuite/wanted-results/test62 +++ b/testsuite/wanted-results/test62 @@ -1,4 +1,3 @@ -### Test --joblog with exitval -OK -### Test --joblog with signal -OK +### Test --joblog with exitval and Test --joblog with signal +exitval OK +signal OK diff --git a/testsuite/wanted-results/test66 b/testsuite/wanted-results/test66 new file mode 100644 index 00000000..20ef0c51 --- /dev/null +++ b/testsuite/wanted-results/test66 @@ -0,0 +1,28 @@ +### Test : as delimiter. This can be confusing for uptime ie. --load +a +b +c +PARALLEL=--load 100% + 4 4 8 +1c0f34fee7176dc367bead8f96cba6bc - + 9 9 18 +fa364205fcf6665c6f3e6cb868f65fd6 - + 16 16 39 +6f5db0373227d2281dc26b1bf63b4027 - + 25 25 66 +17e914b4a407dccd370c13173865deb1 - + 36 36 99 +5ee21398ecde0f3ea9b6093fbaf5a3c2 - + 49 49 138 +2af8be7306df18164a68e30e427217e0 - + 64 64 183 +f78c5b3d13146c60c9b586f51d05a4ae - + 81 81 234 +c88e1757ddc619efd9ee507a7702b53c - + 100 100 292 +8a7095c1c23bfadc311fe6b16d950582 - +### Test if --load blocks. Bug. +53d025127ae99ab79e8502aae2d9bea6 - +53d025127ae99ab79e8502aae2d9bea6 - +8a7095c1c23bfadc311fe6b16d950582 - +8a7095c1c23bfadc311fe6b16d950582 -