parallel: Fixed timing issue for --pipe on 8 core cpu.

This commit is contained in:
Ole Tange 2011-03-22 21:02:22 +01:00
parent 43bc68ae3a
commit 105c92676f
5 changed files with 53 additions and 28 deletions

View file

@ -129,7 +129,9 @@ http://nd.gd/2j Watch the intro video http://nd.gd/0s
https://savannah.gnu.org/news/?group=parallel https://savannah.gnu.org/news/?group=parallel
cat twitters | parallel -j1 echo @{} You have earlier shown interest in GNU Parallel. \ # Only 350 requests per hour: 1 tweet = 3 requests
# 119 tweets/hour = sleep 30 per tweet (40 to be safe)
cat twitters | parallel -j1 sleep 40\; echo @{} You have earlier shown interest in GNU Parallel. \
A new version is out: http://nd.gd/2j '|' ttytter A new version is out: http://nd.gd/2j '|' ttytter
== Send announce == == Send announce ==

View file

@ -81,7 +81,6 @@ if($::opt_halt_on_error) {
sub spreadstdin { sub spreadstdin {
# read a record # read a record
# print it to the first jobs that is ready # print it to the first jobs that is ready
my $sleep = 0.1;
my $record; my $record;
my $buf = ""; my $buf = "";
my ($recstart,$recend,$recerror); my ($recstart,$recend,$recerror);
@ -173,7 +172,7 @@ sub nindex {
sub flush_and_close_pipes { sub flush_and_close_pipes {
my $flush_done; my $flush_done;
my $sleep = 0.1; my $sleep = 0.05;
do { do {
$flush_done = 1; $flush_done = 1;
# Make sure everything is written to the jobs # Make sure everything is written to the jobs
@ -181,8 +180,8 @@ sub flush_and_close_pipes {
for my $job (values %Global::running) { for my $job (values %Global::running) {
if($job->remaining()) { if($job->remaining()) {
if($job->complete_write()) { if($job->complete_write()) {
# Some data was written # Some data was written - reset sleep timer
$sleep = 0.1; $sleep = 0.05;
} }
$flush_done = 0; $flush_done = 0;
} }
@ -210,7 +209,8 @@ sub write_record_to_pipe {
$$record_ref =~ s/^$recstart//os; $$record_ref =~ s/^$recstart//os;
$$record_ref =~ s/$recend$//os; $$record_ref =~ s/$recend$//os;
} }
my $sleep = 0.1; # 0.1 ms # Keep the pipes hot, but if nothing happens sleep should back off
my $sleep = 0.00001; # 0.00001 ms - better performance on highend
write_record: while(1) { write_record: while(1) {
# Sorting according to sequence is necessary for -k to work # Sorting according to sequence is necessary for -k to work
do_not_reap(); # If Global::running is changed the for loop has a race condition do_not_reap(); # If Global::running is changed the for loop has a race condition
@ -219,8 +219,8 @@ sub write_record_to_pipe {
if($job->remaining()) { if($job->remaining()) {
# Part of the job's last record has not finished being written # Part of the job's last record has not finished being written
if($job->complete_write()) { if($job->complete_write()) {
# Something got written # Something got written - reset sleep timer
$sleep = 0.1; $sleep = 0.00001;
} }
} else { } else {
if($job->datawritten() > 0) { if($job->datawritten() > 0) {
@ -232,14 +232,17 @@ sub write_record_to_pipe {
close $fh; close $fh;
} else { } else {
$job->write($record_ref); $job->write($record_ref);
$sleep = 0.1; # Something got written - reset sleep timer
$sleep = 0.00001;
last write_record; last write_record;
} }
} }
} }
reap_if_needed(); # Force reaping as sigchild is sometimes forgotten;
usleep($sleep); if(not reaper()) {
$sleep *= 1.1; # exponential back off usleep($sleep);
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); # exponential back off
}
} }
reap_if_needed(); reap_if_needed();
return; return;
@ -381,7 +384,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20110322; $Global::version = 20110323;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -852,8 +855,10 @@ sub hostname {
} }
sub usleep { sub usleep {
my $ms = shift; # Sleep this many milliseconds.
select(undef, undef, undef, $ms/1000); my $secs = shift;
::debug("Sleeping ",$secs," millisecs\n");
select(undef, undef, undef, $secs/1000);
} }
sub multiply_binary_prefix { sub multiply_binary_prefix {
@ -906,34 +911,39 @@ sub drain_job_queue {
reap_if_needed(); reap_if_needed();
} }
my $last_header=""; my $last_header="";
my $sleep = 0.2;
do { do {
while($Global::total_running > 0) { while($Global::total_running > 0) {
debug("jobs running: ",$Global::total_running," ",scalar keys %Global::running, " Memory usage:".my_memory_usage()."\n"); debug("jobs running: ",$Global::total_running," ",scalar keys %Global::running, " Memory usage:".my_memory_usage()."\n");
sleep 1; $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
usleep($sleep);
do_not_reap();
if($::opt_pipe) { if($::opt_pipe) {
# When using --pipe sometimes file handles are not closed properly # When using --pipe sometimes file handles are not closed properly
do_not_reap();
for my $job (values %Global::running) { for my $job (values %Global::running) {
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
} }
reap_if_needed();
} }
reaper(); # Some systems fail to catch the SIGCHLD
if($::opt_progress) { if($::opt_progress) {
my %progress = progress(); my %progress = progress();
do_not_reap();
if($last_header ne $progress{'header'}) { if($last_header ne $progress{'header'}) {
print $Global::original_stderr "\n",$progress{'header'},"\n"; print $Global::original_stderr "\n",$progress{'header'},"\n";
$last_header = $progress{'header'}; $last_header = $progress{'header'};
} }
print $Global::original_stderr "\r",$progress{'status'}; print $Global::original_stderr "\r",$progress{'status'};
reap_if_needed();
} }
# Sometimes SIGCHLD is not registered, so force reaper
if(reaper()) {
# Child finished this time around: Reset sleep time
$sleep = 0.2;
}
reap_if_needed();
} }
if(not $Global::JobQueue->empty()) { if(not $Global::JobQueue->empty()) {
start_more_jobs(); # These jobs may not be started because of loadavg start_more_jobs(); # These jobs may not be started because of loadavg
sleep 1; $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
usleep($sleep);
} }
} while ($Global::total_running > 0 } while ($Global::total_running > 0
or or
@ -1420,8 +1430,10 @@ sub reaper {
do_not_reap(); do_not_reap();
$Private::reaperlevel++; $Private::reaperlevel++;
my $stiff; my $stiff;
my $children_reaped = 0;
debug("Reaper called $Private::reaperlevel\n"); debug("Reaper called $Private::reaperlevel\n");
while (($stiff = waitpid(-1, &WNOHANG)) > 0) { while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
$children_reaped++;
if($Global::sshmaster{$stiff}) { if($Global::sshmaster{$stiff}) {
# This is one of the ssh -M: ignore # This is one of the ssh -M: ignore
next; next;
@ -1488,6 +1500,7 @@ sub reaper {
reap_if_needed(); reap_if_needed();
debug("Reaper exit $Private::reaperlevel\n"); debug("Reaper exit $Private::reaperlevel\n");
$Private::reaperlevel--; $Private::reaperlevel--;
return $children_reaped;
} }
sub __USAGE__ {} sub __USAGE__ {}
@ -3989,7 +4002,7 @@ sub new {
sub acquire { sub acquire {
my $self = shift; my $self = shift;
my $exponential_backoff = 1; my $sleep = 1; # 1 ms
while(1) { while(1) {
$self->atomic_link_if_count_less_than() and last; $self->atomic_link_if_count_less_than() and last;
::debug("Remove dead locks"); ::debug("Remove dead locks");
@ -4008,9 +4021,10 @@ sub acquire {
} }
# try again # try again
$self->atomic_link_if_count_less_than() and last; $self->atomic_link_if_count_less_than() and last;
::usleep(rand()*$exponential_backoff);
# Retry slower and slower up to 1 second # Retry slower and slower up to 1 second
$exponential_backoff = ($exponential_backoff < 1000) ? ($exponential_backoff * 1.1) : ($exponential_backoff); $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
# Random to avoid every sleeping job waking up at the same time
::usleep(rand()*$sleep);
# TODO if timeout: last # TODO if timeout: last
} }
::debug("acquired $self->{'pid'}\n"); ::debug("acquired $self->{'pid'}\n");
@ -4064,13 +4078,16 @@ sub nlinks {
sub lock { sub lock {
my $self = shift; my $self = shift;
my $sleep = 100; # 100 ms
open $self->{'lockfh'}, ">", $self->{'lockfile'} open $self->{'lockfh'}, ">", $self->{'lockfile'}
or ::die_bug("Can't open semaphore file $self->{'lockfile'}: $!"); or ::die_bug("Can't open semaphore file $self->{'lockfile'}: $!");
chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) { while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) {
::debug("Cannot lock $self->{'lockfile'}"); ::debug("Cannot lock $self->{'lockfile'}");
# TODO if timeout: last # TODO if timeout: last
sleep 1; $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
# Random to avoid every sleeping job waking up at the same time
::usleep(rand()*$sleep);
} }
::debug("locked $self->{'lockfile'}"); ::debug("locked $self->{'lockfile'}");
} }

View file

@ -538,7 +538,7 @@ Implies B<-X> unless B<-m> is set.
Do not start new jobs on a given computer unless the load is less than Do not start new jobs on a given computer unless the load is less than
I<max-load>. I<max-load> uses the same syntax as B<--jobs>, so I<100%> I<max-load>. I<max-load> uses the same syntax as B<--jobs>, so I<100%>
is a valid setting. for one per CPU is a valid setting.
The load average is only sampled every 10 seconds to avoid stressing The load average is only sampled every 10 seconds to avoid stressing
small computers. small computers.

View file

@ -4,7 +4,7 @@ export LANG=C
SHFILE=/tmp/unittest-parallel.sh SHFILE=/tmp/unittest-parallel.sh
# These tests seem to work on another machine # These tests seem to work on another machine
ls -t tests-to-run/test{01,03,04,05,06,07,08,09,11,15,22,24,25,26,28,29,31,33,34,39,40,43,49,52,53,54}.sh \ ls -t tests-to-run/test{01,03,04,05,06,07,08,09,11,15,22,24,25,26,28,29,31,33,34,39,40,43,49,52,53,54,55}.sh \
tests-to-run/niceload01.sh tests-to-run/sem01.sh \ tests-to-run/niceload01.sh tests-to-run/sem01.sh \
| perl -pe 's:(.*/(.*)).sh:bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2:' \ | perl -pe 's:(.*/(.*)).sh:bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2:' \
>$SHFILE >$SHFILE

View file

@ -0,0 +1,6 @@
#!/bin/bash
echo '### Test race condition on 8 CPU (my laptop)'
seq 1 5000000 > /tmp/parallel_test
seq 1 10 | parallel -k "cat /tmp/parallel_test | parallel --pipe --recend '' -k gzip >/dev/null; echo {}"