diff --git a/doc/release_new_version b/doc/release_new_version index 94dbd36c..50adbc81 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -129,7 +129,9 @@ http://nd.gd/2j Watch the intro video http://nd.gd/0s https://savannah.gnu.org/news/?group=parallel -cat twitters | parallel -j1 echo @{} You have earlier shown interest in GNU Parallel. \ +# Only 350 requests per hour: 1 tweet = 3 requests +# 119 tweets/hour = sleep 30 per tweet (40 to be safe) +cat twitters | parallel -j1 sleep 40\; echo @{} You have earlier shown interest in GNU Parallel. \ A new version is out: http://nd.gd/2j '|' ttytter == Send announce == diff --git a/src/parallel b/src/parallel index 714d2919..f6fd35b8 100755 --- a/src/parallel +++ b/src/parallel @@ -81,7 +81,6 @@ if($::opt_halt_on_error) { sub spreadstdin { # read a record # print it to the first jobs that is ready - my $sleep = 0.1; my $record; my $buf = ""; my ($recstart,$recend,$recerror); @@ -173,7 +172,7 @@ sub nindex { sub flush_and_close_pipes { my $flush_done; - my $sleep = 0.1; + my $sleep = 0.05; do { $flush_done = 1; # Make sure everything is written to the jobs @@ -181,8 +180,8 @@ sub flush_and_close_pipes { for my $job (values %Global::running) { if($job->remaining()) { if($job->complete_write()) { - # Some data was written - $sleep = 0.1; + # Some data was written - reset sleep timer + $sleep = 0.05; } $flush_done = 0; } @@ -210,7 +209,8 @@ sub write_record_to_pipe { $$record_ref =~ s/^$recstart//os; $$record_ref =~ s/$recend$//os; } - my $sleep = 0.1; # 0.1 ms + # Keep the pipes hot, but if nothing happens sleep should back off + my $sleep = 0.00001; # 0.00001 ms - better performance on highend write_record: while(1) { # Sorting according to sequence is necessary for -k to work do_not_reap(); # If Global::running is changed the for loop has a race condition @@ -219,8 +219,8 @@ sub write_record_to_pipe { if($job->remaining()) { # Part of the job's last record has not finished being written 
if($job->complete_write()) { - # Something got written - $sleep = 0.1; + # Something got written - reset sleep timer + $sleep = 0.00001; } } else { if($job->datawritten() > 0) { @@ -232,14 +232,17 @@ sub write_record_to_pipe { close $fh; } else { $job->write($record_ref); - $sleep = 0.1; + # Something got written - reset sleep timer + $sleep = 0.00001; last write_record; } } } - reap_if_needed(); - usleep($sleep); - $sleep *= 1.1; # exponential back off + # Force reaping as sigchild is sometimes forgotten; + if(not reaper()) { + usleep($sleep); + $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); # exponential back off + } } reap_if_needed(); return; @@ -381,7 +384,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A # Defaults: - $Global::version = 20110322; + $Global::version = 20110323; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -852,8 +855,10 @@ sub hostname { } sub usleep { - my $ms = shift; - select(undef, undef, undef, $ms/1000); + # Sleep this many milliseconds. + my $secs = shift; + ::debug("Sleeping ",$secs," millisecs\n"); + select(undef, undef, undef, $secs/1000); } sub multiply_binary_prefix { @@ -906,34 +911,39 @@ sub drain_job_queue { reap_if_needed(); } my $last_header=""; + my $sleep = 0.2; do { while($Global::total_running > 0) { debug("jobs running: ",$Global::total_running," ",scalar keys %Global::running, " Memory usage:".my_memory_usage()."\n"); - sleep 1; + $sleep = ($sleep < 1000) ? 
($sleep * 1.1) : ($sleep); + usleep($sleep); + do_not_reap(); if($::opt_pipe) { # When using --pipe sometimes file handles are not closed properly - do_not_reap(); for my $job (values %Global::running) { my $fh = $job->stdin(); close $fh; } - reap_if_needed(); } - reaper(); # Some systems fail to catch the SIGCHLD if($::opt_progress) { my %progress = progress(); - do_not_reap(); if($last_header ne $progress{'header'}) { print $Global::original_stderr "\n",$progress{'header'},"\n"; $last_header = $progress{'header'}; } print $Global::original_stderr "\r",$progress{'status'}; - reap_if_needed(); } + # Sometimes SIGCHLD is not registered, so force reaper + if(reaper()) { + # Child finished this time around: Reset sleep time + $sleep = 0.2; + } + reap_if_needed(); } if(not $Global::JobQueue->empty()) { start_more_jobs(); # These jobs may not be started because of loadavg - sleep 1; + $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); + usleep($sleep); } } while ($Global::total_running > 0 or @@ -1420,8 +1430,10 @@ sub reaper { do_not_reap(); $Private::reaperlevel++; my $stiff; + my $children_reaped = 0; debug("Reaper called $Private::reaperlevel\n"); while (($stiff = waitpid(-1, &WNOHANG)) > 0) { + $children_reaped++; if($Global::sshmaster{$stiff}) { # This is one of the ssh -M: ignore next; @@ -1488,6 +1500,7 @@ sub reaper { reap_if_needed(); debug("Reaper exit $Private::reaperlevel\n"); $Private::reaperlevel--; + return $children_reaped; } sub __USAGE__ {} @@ -3989,7 +4002,7 @@ sub new { sub acquire { my $self = shift; - my $exponential_backoff = 1; + my $sleep = 1; # 1 ms while(1) { $self->atomic_link_if_count_less_than() and last; ::debug("Remove dead locks"); @@ -4008,9 +4021,10 @@ sub acquire { } # try again $self->atomic_link_if_count_less_than() and last; - ::usleep(rand()*$exponential_backoff); # Retry slower and slower up to 1 second - $exponential_backoff = ($exponential_backoff < 1000) ? 
($exponential_backoff * 1.1) : ($exponential_backoff); + $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); + # Random to avoid every sleeping job waking up at the same time + ::usleep(rand()*$sleep); # TODO if timeout: last } ::debug("acquired $self->{'pid'}\n"); @@ -4064,13 +4078,16 @@ sub nlinks { sub lock { my $self = shift; + my $sleep = 100; # 100 ms open $self->{'lockfh'}, ">", $self->{'lockfile'} or ::die_bug("Can't open semaphore file $self->{'lockfile'}: $!"); chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) { ::debug("Cannot lock $self->{'lockfile'}"); # TODO if timeout: last - sleep 1; + $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); + # Random to avoid every sleeping job waking up at the same time + ::usleep(rand()*$sleep); } ::debug("locked $self->{'lockfile'}"); } diff --git a/src/parallel.pod b/src/parallel.pod index 31e6a5e9..8035bc3a 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -538,7 +538,7 @@ Implies B<-X> unless B<-m> is set. Do not start new jobs on a given computer unless the load is less than I<max-load>. I<max-load> uses the same syntax as B<--jobs>, so I<100%> -is a valid setting. +for one per CPU is a valid setting. The load average is only sampled every 10 seconds to avoid stressing small computers. 
diff --git a/testsuite/Portable.sh b/testsuite/Portable.sh index c7e01841..bbe18c70 100644 --- a/testsuite/Portable.sh +++ b/testsuite/Portable.sh @@ -4,7 +4,7 @@ export LANG=C SHFILE=/tmp/unittest-parallel.sh # These tests seem to work on another machine -ls -t tests-to-run/test{01,03,04,05,06,07,08,09,11,15,22,24,25,26,28,29,31,33,34,39,40,43,49,52,53,54}.sh \ +ls -t tests-to-run/test{01,03,04,05,06,07,08,09,11,15,22,24,25,26,28,29,31,33,34,39,40,43,49,52,53,54,55}.sh \ tests-to-run/niceload01.sh tests-to-run/sem01.sh \ | perl -pe 's:(.*/(.*)).sh:bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2:' \ >$SHFILE diff --git a/testsuite/tests-to-run/test55.sh b/testsuite/tests-to-run/test55.sh new file mode 100644 index 00000000..affa9b60 --- /dev/null +++ b/testsuite/tests-to-run/test55.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +echo '### Test race condition on 8 CPU (my laptop)' +seq 1 5000000 > /tmp/parallel_test +seq 1 10 | parallel -k "cat /tmp/parallel_test | parallel --pipe --recend '' -k gzip >/dev/null; echo {}" +