-N for --pipe. Fixed race condition in --pipe.

Tests for --pipe. Passes testsuite.
Ole Tange 2011-01-24 20:06:30 +01:00
parent d5c7399d5b
commit 76d9e4f41b
8 changed files with 269 additions and 90 deletions

@ -1,6 +1,7 @@
--block -N for --pipe. Fixed race condition in --pipe.
parallel: --joblog implemented. Testsuite missing. Tests for --pipe.
parallel: --spreadstdin prototype. Testsuite missing. Passes testsuite.
codecoverage codecoverage

@ -47,6 +47,7 @@ pushd /tmp
tar xjvf parallel-$YYYYMMDD.tar.bz2 tar xjvf parallel-$YYYYMMDD.tar.bz2
cd parallel-$YYYYMMDD cd parallel-$YYYYMMDD
./configure && make -j && sudo make -j install ./configure && make -j && sudo make -j install
pushd
== Upload == == Upload ==
@ -135,43 +136,50 @@ cc:Peter Simons <simons@cryp.to>, Sandro Cazzaniga <kharec@mandriva.org>,
Rogério Brito <rbrito@ime.usp.br>, Jonathan Palardy <jonathan.palardy@gmail.com>, Rogério Brito <rbrito@ime.usp.br>, Jonathan Palardy <jonathan.palardy@gmail.com>,
Koen Vervloesem <koen@vervloesem.eu>, R. Tyler Croy <tyler@monkeypox.org>, Koen Vervloesem <koen@vervloesem.eu>, R. Tyler Croy <tyler@monkeypox.org>,
ryoichiro.suzuki@gmail.com,kerick@shiftedbit.net, ryoichiro.suzuki@gmail.com,kerick@shiftedbit.net,
Christian Faulhammer <fauli@gentoo.org> Christian Faulhammer <fauli@gentoo.org>, Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>
Subject: GNU Parallel 20110122 released Subject: GNU Parallel 2011XXXX released
GNU Parallel 20110122 has been released. It is available for GNU Parallel 2011XXXX has been released. It is available for
download at: http://ftp.gnu.org/gnu/parallel/ download at: http://ftp.gnu.org/gnu/parallel/
This is a major release as the --pipe option introduces a new way to
work. To learn about --pipe see the example section for uses of
--pipe.
But rest assured: No old functionality is changed.
New in this release: New in this release:
* --joblog makes a simple log of completed jobs. * --pipe splits piped data into blocks. Each block is piped to a
program for processing. The piping and the programs will be run in
parallel. Useful if the data you want to process is data for a
program and not arguments.
* -X now spreads arguments between job slots when reaching last * --blocksize sets the blocksize in bytes for --pipe. The blocksize is
argument. Use -j1 to avoid this. approximate. It can deviate as much as the size of one record.
Default is 1M.
* People on the email list have voted -j+0 to be the new default * --recstart sets the regular expression matching the start of a
instead of -j9. record. Default is "".
* First review in Polish. Thanks to Patryk Krawaczyński. * --recend sets the regular expression matching the end of a
http://nfsec.pl/root/2458 record. Default is "\n". To specify none use --recend "".
* Review in Spanish (in print). If both --recstart and --recend are set, the end of a record must be
https://www.linux-magazine.es/issue/67/moreinfo.html followed immediately by a start of a record. This is useful if
either recend or recstart can occur in the middle of a record.
* Review in English. Thanks to Brian Gough. * --output-as-files puts the output of the programs into files.
http://blogs.fsfe.org/bjg/2011/01/gnu-parallel-a-map-operator-for-the-command-line/ GNU Parallel then prints the names of these files instead of
the output itself.
* Review in French. Thanks to Denis Dordoigne. * -N sets the number of records to read. If used with --blocksize
http://linuxfr.org/2010/12/29/27715.html the block read will at most be --blocksize. -N is much slower
than --blocksize so avoid -N if performance is important.
* Review in Spanish. * Advanced recursive example. Thanks to Ruarí Ødegaard.
http://gufete.net/index.php?entry=entry110116-200022 http://my.opera.com/ruario/blog/2011/01/24/editing-debian-packages-more-fun-with-gnu
* Article with advanced recursive example. Thanks to Ruarí Ødegaard
http://my.opera.com/ruario/blog/2011/01/18/fun-with-gnu-parallel
* Use case for memcache.
http://www.dctrwatson.com/2010/12/how-to-dump-memcache-keyvalue-pairs-fast/
* Bug fixes and man page updates. * Bug fixes and man page updates.
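
Review note: the new announcement text calls --blocksize approximate. The following is a rough sequential stand-in in shell and awk that shows where the overshoot comes from. It is only a sketch, not how GNU Parallel does it (the real code reads fixed-size chunks and runs the jobs in parallel). `pipe_blocks` is a hypothetical helper, and it assumes newline-terminated records and single-byte characters. A block is closed only after a whole record has been written, so it can exceed the requested size by up to one record.

```shell
#!/bin/bash
# pipe_blocks SIZE COMMAND  (hypothetical helper, not part of GNU Parallel)
# Feed stdin to COMMAND in blocks of roughly SIZE bytes, one block at a time.
# Records end in "\n" and are never split across two blocks.
pipe_blocks() {
  CMD="$2" awk -v size="$1" '
    BEGIN { cmd = ENVIRON["CMD"] }
    {
      print | cmd                  # append this record to the current block
      bytes += length($0) + 1      # +1 for the newline; assumes 1 byte per char
      if (bytes >= size) {         # block is full: let the command finish
        close(cmd)
        bytes = 0
      }
    }
    END { close(cmd) }             # flush the last, possibly short, block
  '
}

# Records "1\n".."9\n" are 2 bytes each and "10\n" is 3 bytes.
# Asking for 5-byte blocks yields blocks of 6, 6, 6 and 3 bytes.
seq 1 10 | pipe_blocks 5 "wc -c | tr -d ' '"
```

The first three blocks come out at 6 bytes although 5 were requested, which is the "deviate as much as the size of one record" behaviour in miniature.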

@ -1,4 +1,4 @@
make make
Edit parallel.spec Edit home\:tange/parallel/parallel.spec
Then go to https://build.opensuse.org/package/files?package=parallel&project=home%3Atange Then go to https://build.opensuse.org/package/files?package=parallel&project=home%3Atange
and add all the files and add all the files

@ -60,11 +60,13 @@ if($Global::semaphore) {
} }
$SIG{TERM} = \&start_no_new_jobs; $SIG{TERM} = \&start_no_new_jobs;
start_more_jobs(); start_more_jobs();
if($::opt_spreadstdin) { if($::opt_pipe) {
spreadstdin(); spreadstdin();
} }
reap_if_needed(); reap_if_needed();
::debug("Start draining\n");
drain_job_queue(); drain_job_queue();
::debug("Done draining\n");
cleanup(); cleanup();
if($Global::semaphore) { if($Global::semaphore) {
$sem->release(); $sem->release();
@ -92,38 +94,85 @@ sub spreadstdin {
# If both --recstart and --recend are given then both must match # If both --recstart and --recend are given then both must match
$recstart = $::opt_recstart; $recstart = $::opt_recstart;
$recend = $::opt_recend; $recend = $::opt_recend;
$recerror = "Warning: --recend and --recstart unmatched"; $recerror = "Warning: --recend and --recstart unmatched. Is --blocksize too small?";
} elsif(defined($::opt_recstart)) { } elsif(defined($::opt_recstart)) {
# If --recstart is given it must match start of record # If --recstart is given it must match start of record
$recstart = $::opt_recstart; $recstart = $::opt_recstart;
$recend = ""; $recend = "";
$recerror = "Warning: --recstart unmatched"; $recerror = "Warning: --recstart unmatched. Is --blocksize too small?";
} elsif(defined($::opt_recend)) { } elsif(defined($::opt_recend)) {
# If --recend is given then it must match end of record # If --recend is given then it must match end of record
$recstart = ""; $recstart = "";
$recend = $::opt_recend; $recend = $::opt_recend;
$recerror = "Warning: --recend unmatched"; $recerror = "Warning: --recend unmatched. Is --blocksize too small?";
} }
while(read(STDIN,$buf,$::opt_blocksize)) { while(read(STDIN,$buf,$::opt_blocksize)) {
$record = $partial.$buf; $record = $partial.$buf;
if($Global::max_number_of_args) {
# -N => (start..*?end){n}
while($record =~ s/(($recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$1/os) {
$partial = $3;
::debug("Read record: ".length($record)."\n");
write_record_to_pipe($record);
$record = $partial;
}
if(eof STDIN) { if(eof STDIN) {
# There is no partial record at the end of file # There is no partial record at the end of file
$partial = ""; write_record_to_pipe($record);
}
} else { } else {
if($record =~ s/(.*$recend)($recstart.*?)$/$1/os) { if($record =~ s/(.*$recend)($recstart.*?)$/$1/os) {
$partial = $2; $partial = $2;
} else { } else {
print $Global::original_stderr $recerror,"\n"; print $Global::original_stderr $recerror,"\n";
} }
}
::debug("Read record: ".length($record)."\n"); ::debug("Read record: ".length($record)."\n");
my $reap_needed = 0; write_record_to_pipe($record);
write_record: while(defined $record) { if(eof STDIN) {
# There is no partial record at the end of file
write_record_to_pipe($partial);
}
}
}
::debug("Done reading STDIN\n");
flush_and_close_pipes();
::debug("Done flushing to children\n");
$Global::start_no_new_jobs = 1;
}
sub flush_and_close_pipes {
my $flush_done;
my $sleep = 0.1;
do {
$flush_done = 1;
# Make sure everything is written to the jobs
do_not_reap();
for my $job (values %Global::running) {
if($job->remaining()) {
$job->complete_write();
$flush_done = 0;
}
}
reap_if_needed();
usleep($sleep);
$sleep *= 1.1; # exponential back off
} while (not $flush_done);
do_not_reap();
for my $job (values %Global::running) {
my $fh = $job->stdin();
close $fh;
}
reap_if_needed();
}
sub write_record_to_pipe {
my $record = shift;
my $sleep = 0.1; # 0.1 ms
write_record: while(1) {
# Sorting according to sequence is necessary for -k to work # Sorting according to sequence is necessary for -k to work
do_not_reap(); # If Global::running is changed the for loop has a race condition do_not_reap(); # If Global::running is changed the for loop has a race condition
for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) { for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) {
::debug("Looking at ",$job," ");
::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n"); ::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n");
if($job->remaining()) { if($job->remaining()) {
# Part of the job's last record has not finished being written # Part of the job's last record has not finished being written
@ -136,49 +185,19 @@ sub spreadstdin {
# This will cause the job to finish and when it dies we will spawn another job # This will cause the job to finish and when it dies we will spawn another job
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
$reap_needed = 1;
} else { } else {
$job->write($record); $job->write($record);
$record = undef; $sleep = 0.1;
$first++;
$sleep=1;
last write_record; last write_record;
} }
} }
} }
reap_if_needed(); reap_if_needed();
# Rotate jobs to spread the input
# @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]);
usleep($sleep); usleep($sleep);
$sleep *= 1.1; # exponential back off $sleep *= 1.1; # exponential back off
$max_sleep = max($max_sleep, $sleep);
$second++;
} }
if($reap_needed) {
# One or more jobs finished in last round
reap_if_needed(); reap_if_needed();
} return;
}
my $flush_done;
do {
$flush_done = 1;
# Make sure everything is written to the jobs
for my $job (values %Global::running) {
if($job->remaining()) {
$job->complete_write();
$flush_done = 0;
}
}
usleep($sleep);
$sleep *= 1.1; # exponential back off
} while (not $flush_done);
for my $job (values %Global::running) {
my $fh = $job->stdin();
close $fh;
}
$Global::start_no_new_jobs = 1;
::debug("Blocks sent directly: $first Delays: $second Max sleep ms: $max_sleep\n");
} }
sub acquire_semaphore { sub acquire_semaphore {
@ -237,7 +256,7 @@ sub get_options_from_array {
"quote|q" => \$::opt_q, "quote|q" => \$::opt_q,
"I=s" => \$::opt_I, "I=s" => \$::opt_I,
"extensionreplace|U=s" => \$::opt_U, "extensionreplace|U=s" => \$::opt_U,
"basenamereplace=s" => \$::opt_basenamereplace, "basenamereplace|bnr=s" => \$::opt_basenamereplace,
"basenameextensionreplace=s" => \$::opt_basenameextensionreplace, "basenameextensionreplace=s" => \$::opt_basenameextensionreplace,
"jobs|j=s" => \$::opt_P, "jobs|j=s" => \$::opt_P,
"load=s" => \$::opt_load, "load=s" => \$::opt_load,
@ -267,7 +286,7 @@ sub get_options_from_array {
"arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep, "arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep,
"trim=s" => \$::opt_trim, "trim=s" => \$::opt_trim,
"profile|J=s" => \$::opt_profile, "profile|J=s" => \$::opt_profile,
"pipe|spreadstdin" => \$::opt_spreadstdin, "pipe|spreadstdin" => \$::opt_pipe,
"recstart=s" => \$::opt_recstart, "recstart=s" => \$::opt_recstart,
"recend=s" => \$::opt_recend, "recend=s" => \$::opt_recend,
"files|output-as-files|outputasfiles" => \$::opt_files, "files|output-as-files|outputasfiles" => \$::opt_files,
@ -809,8 +828,17 @@ sub drain_job_queue {
my $last_header=""; my $last_header="";
do { do {
while($Global::total_running > 0) { while($Global::total_running > 0) {
debug("jobs running: ",$Global::total_running," Memory usage:".my_memory_usage()."\n"); debug("jobs running: ",$Global::total_running," ",scalar keys %Global::running, " Memory usage:".my_memory_usage()."\n");
sleep 1; sleep 1;
if($::opt_pipe) {
# When using --pipe sometimes file handles are not closed properly
do_not_reap();
for my $job (values %Global::running) {
my $fh = $job->stdin();
close $fh;
}
reap_if_needed();
}
reaper(); # Some systems fail to catch the SIGCHLD reaper(); # Some systems fail to catch the SIGCHLD
if($::opt_progress) { if($::opt_progress) {
my %progress = progress(); my %progress = progress();
@ -1038,7 +1066,7 @@ sub start_more_jobs {
next; next;
} }
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($Global::JobQueue->empty() and not $::opt_spreadstdin) { if($Global::JobQueue->empty() and not $::opt_pipe) {
last; last;
} }
debug("Try starting a job on ".$sshlogin->string()."\n"); debug("Try starting a job on ".$sshlogin->string()."\n");
@ -1067,7 +1095,7 @@ sub start_another_job {
my $sshlogin = shift; my $sshlogin = shift;
# Do we have enough file handles to start another job? # Do we have enough file handles to start another job?
if(enough_file_handles()) { if(enough_file_handles()) {
if($Global::JobQueue->empty() and not $::opt_spreadstdin) { if($Global::JobQueue->empty() and not $::opt_pipe) {
# No more commands to run # No more commands to run
return 0; return 0;
} else { } else {
@ -1390,7 +1418,6 @@ sub wait_and_exit {
waitpid($_,0); waitpid($_,0);
delete $Global::unkilled_children{$_}; delete $Global::unkilled_children{$_};
} }
wait(); wait();
exit(shift); exit(shift);
} }
@ -1484,6 +1511,9 @@ sub my_size {
} }
} }
# Guvf vf n grfg gb frr vs bguref ner ernqvat zl pbqr. Cyrnfr rznvy
# pbbxvr@gnatr.qx jura lbh ernq guvf
sub my_dump { sub my_dump {
# Returns: # Returns:
# ascii expression of object if Data::Dump(er) is installed # ascii expression of object if Data::Dump(er) is installed
@ -2808,7 +2838,7 @@ sub start {
$ENV{'PARALLEL_SEQ'} = $job->seq(); $ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$; $ENV{'PARALLEL_PID'} = $$;
::debug("$Global::total_running processes. Starting (".$job->seq()."): $command\n"); ::debug("$Global::total_running processes. Starting (".$job->seq()."): $command\n");
if($::opt_spreadstdin) { if($::opt_pipe) {
my ($in); my ($in);
$pid = ::open3($in, ">&OUT", ">&ERR", $command) || $pid = ::open3($in, ">&OUT", ">&ERR", $command) ||
die("open3 (with spreadstdin) failed. Report a bug to <bug-parallel\@gnu.org>\n"); die("open3 (with spreadstdin) failed. Report a bug to <bug-parallel\@gnu.org>\n");
@ -3001,7 +3031,7 @@ sub populate {
# Add arguments from arg_queue until the number of arguments or # Add arguments from arg_queue until the number of arguments or
# max line length is reached # max line length is reached
my $self = shift; my $self = shift;
if($::opt_spreadstdin) { if($::opt_pipe) {
# Do not read any args # Do not read any args
$self->push([Arg->new("")]); $self->push([Arg->new("")]);
return; return;
@ -3413,7 +3443,7 @@ sub get {
); );
$cmd_line->populate(); $cmd_line->populate();
::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n"); ::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n");
if(not $::opt_spreadstdin and $cmd_line->number_of_args() == 0) { if(not $::opt_pipe and $cmd_line->number_of_args() == 0) {
# We did not get more args - maybe at EOF string? # We did not get more args - maybe at EOF string?
return undef; return undef;
} else { } else {
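
Review note: both write_record_to_pipe and flush_and_close_pipes sleep between rounds and grow the delay by 10% each time (`$sleep *= 1.1`). write_record_to_pipe sets it back to 0.1 after a successful write. The same retry shape can be sketched in shell. `retry_with_backoff` is a hypothetical helper, the attempt limit is an addition that the Perl code does not have, and the sketch assumes a `sleep` that accepts fractional seconds.

```shell
#!/bin/bash
# retry_with_backoff MAX COMMAND [ARGS...]  (hypothetical helper)
# Run COMMAND until it succeeds, sleeping a little longer after each failure.
# Gives up with status 1 after MAX failed attempts.
retry_with_backoff() {
  local max="$1" delay=0.001 try=1
  shift
  until "$@"; do
    [ "$try" -ge "$max" ] && return 1
    sleep "$delay"                 # assumes sleep accepts fractions of a second
    # Grow the delay by 10%, mirroring `$sleep *= 1.1` in the Perl code
    delay="$(awk -v d="$delay" 'BEGIN { printf "%.6f", d * 1.1 }')"
    try=$((try + 1))
  done
}

# Usage: a command that fails twice before succeeding.
attempts=0
flaky() { attempts=$((attempts + 1)); [ "$attempts" -ge 3 ]; }
retry_with_backoff 5 flaky && echo "succeeded after $attempts attempts"
```

Each call starts again from the short delay, so a busy period only slows down the calls that actually had to wait.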

@ -222,7 +222,9 @@ Multiple B<-B> can be specified to transfer more basefiles. The
I<file> will be transferred the same way as B<--transfer>. I<file> will be transferred the same way as B<--transfer>.
=item B<--basenamereplace> I<replace-str> (beta testing) =item B<--basenamereplace> I<replace-str>
=item B<--bnr> I<replace-str>
Use the replacement string I<replace-str> instead of B<{/}> for basename of input line. Use the replacement string I<replace-str> instead of B<{/}> for basename of input line.
@ -334,7 +336,7 @@ If I<eof-str> is omitted, there is no end of file string. If neither
B<-E> nor B<-e> is used, no end of file string is used. B<-E> nor B<-e> is used, no end of file string is used.
=item B<--eta> =item B<--eta> (alpha testing)
Show the estimated number of seconds before finishing. This forces GNU Show the estimated number of seconds before finishing. This forces GNU
B<parallel> to read all jobs before starting to find the number of B<parallel> to read all jobs before starting to find the number of
@ -630,7 +632,11 @@ This will set the owner of the homedir to the user:
B<tr ':' '\012' < /etc/passwd | parallel -N7 chown {1} {6}> B<tr ':' '\012' < /etc/passwd | parallel -N7 chown {1} {6}>
Implies B<-X> unless B<-m> is set. Implies B<-X> unless B<-m> or B<--pipe> is set.
When used with B<--pipe> B<-N> is the number of records to read. This
is much slower than B<--blocksize> so avoid it if performance is
important.
=item B<--max-line-length-allowed> =item B<--max-line-length-allowed>
@ -709,12 +715,13 @@ If B<--recend> is given I<endregexp> will be used to split at record end.
If both B<--recstart> and B<--recend> are given the regular expression If both B<--recstart> and B<--recend> are given the regular expression
I<startregexp>I<endregexp> will have to match to find a split I<startregexp>I<endregexp> will have to match to find a split
position. position. This is useful if either I<startregexp> or I<endregexp>
match in the middle of a record.
If neither B<--recstart> nor B<--recend> are given B<--recend> If neither B<--recstart> nor B<--recend> are given then B<--recend>
defaults to '\n'. To have no record separator use B<--recend "">. defaults to '\n'. To have no record separator use B<--recend "">.
B<--recstart> and B<--recend> used with B<--pipe>. B<--recstart> and B<--recend> are used with B<--pipe>.
=item B<--retries> I<n> (beta testing) =item B<--retries> I<n> (beta testing)
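
Review note: the man page now says that with B<--pipe>, B<-N> is the number of records to read. A small sequential sketch of that grouping in shell and awk may help readers. `pipe_records` is a hypothetical helper, not part of GNU Parallel; it assumes records end in a newline and it runs one command at a time instead of in parallel.

```shell
#!/bin/bash
# pipe_records N COMMAND  (hypothetical helper, not part of GNU Parallel)
# Feed stdin to COMMAND in groups of N newline-terminated records.
pipe_records() {
  CMD="$2" awk -v n="$1" '
    BEGIN { cmd = ENVIRON["CMD"] }
    { print | cmd }                # send the record to the current command
    NR % n == 0 { close(cmd) }     # every N records: finish this command
    END { close(cmd) }             # flush a final group shorter than N
  '
}

# Groups of two records, each followed by a marker line.
# Prints: 1 2 ole 3 4 ole 5 ole (one item per line)
seq 1 5 | pipe_records 2 'cat; echo ole'
```

The recorded test output for -N in this commit also contains trailing `ole` lines that come from jobs receiving no records; this sketch does not reproduce them.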

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
echo '### Test --spreadstdin - more procs than args' echo '### Test --spreadstdin - more procs than args'
rm /tmp/parallel.ss.* rm -f /tmp/parallel.ss.*
seq 1 5 | stdout parallel -j 10 --spreadstdin 'cat >/tmp/parallel.ss.$PARALLEL_SEQ' >/dev/null seq 1 5 | stdout parallel -j 10 --spreadstdin 'cat >/tmp/parallel.ss.$PARALLEL_SEQ' >/dev/null
cat /tmp/parallel.ss.* cat /tmp/parallel.ss.*

@ -0,0 +1,41 @@
#!/bin/bash
echo '### Test --pipe'
# Make some pseudo random input that stays the same
seq 1 1000000 >/tmp/parallel-seq
shuf --random-source=/tmp/parallel-seq /tmp/parallel-seq >/tmp/blocktest
echo '### Test -N even'
seq 1 10 | parallel -j2 -k -N 2 --pipe cat";echo ole;sleep 0.1"
echo '### Test -N odd'
seq 1 11 | parallel -j2 -k -N 2 --pipe cat";echo ole;sleep 0.1"
echo '### Test -N even+2'
seq 1 12 | parallel -j2 -k -N 2 --pipe cat";echo ole;sleep 0.1"
echo '### Test --recstart + --recend'
cat /tmp/blocktest | parallel -k --recstart 44 --recend "44" -j10 --pipe sort -n |md5sum
echo '### Race condition bug - 1 - would block'
seq 1 80 | nice parallel -j0 'seq 1 10| parallel --block 1 --recend "" --pipe cat;true' >/dev/null
echo '### Race condition bug - 2 - would block'
seq 1 100 | nice parallel -j100 --block 1 --recend "" --pipe cat >/dev/null
echo '### Test --block size=1'
seq 1 10| parallel --block 1 --files --recend "" --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {}
echo '### Test --block size=1M -j10 --files - more jobs than data'
sort -n < /tmp/blocktest | md5sum
cat /tmp/blocktest | parallel --files --recend "\n" -j10 --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {} | md5sum
echo '### Test --block size=1M -j1 - more data than cpu'
cat /tmp/blocktest | parallel --files --recend "\n" -j1 --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {} | md5sum
echo '### Test --block size=1M -j1 - more data than cpu'
cat /tmp/blocktest | parallel --files --recend "\n" -j2 --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {} | md5sum
echo '### Test --pipe default settings'
cat /tmp/blocktest | parallel --pipe sort | sort -n | md5sum
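
Review note: several of these tests sort each block with `sort -n`, merge the results with `sort -nm`, and compare the md5sum against a plain `sort -n` of the whole file. The property they rely on can be checked without GNU Parallel. This is a minimal sketch under stated assumptions: `chunked_sort` is a hypothetical helper that cuts the input by line count with `split` rather than by block size, and it sorts the pieces one after another.

```shell
#!/bin/bash
# chunked_sort LINES  (hypothetical helper)
# Sort stdin numerically by sorting LINES-line pieces and merging them.
chunked_sort() {
  local lines="$1" dir f
  dir="$(mktemp -d)" || return 1
  split -l "$lines" - "$dir/chunk."       # cut stdin into LINES-line pieces
  for f in "$dir"/chunk.*; do
    [ -e "$f" ] && sort -n -o "$f" "$f"   # sort each piece in place
  done
  if ls "$dir"/chunk.* >/dev/null 2>&1; then
    sort -nm "$dir"/chunk.*               # merge the already sorted pieces
  fi
  rm -rf "$dir"
}

# Usage: the result should match `sort -n` on the same input.
printf '%s\n' 5 3 9 1 7 2 8 | chunked_sort 3
```

Because `sort -m` only merges inputs that are already sorted, the comparison in the tests fails if any block was lost, duplicated, or split in the middle of a record.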

@ -0,0 +1,92 @@
### Test --pipe
### Test -N even
1
2
ole
3
4
ole
5
6
ole
7
8
ole
9
10
ole
ole
### Test -N odd
1
2
ole
3
4
ole
5
6
ole
7
8
ole
9
10
ole
11
ole
### Test -N even+2
1
2
ole
3
4
ole
5
6
ole
7
8
ole
9
10
ole
11
12
ole
ole
ole
### Test --recstart + --recend
0d53a19cdc880668a7c745794dcafbf1 -
### Race condition bug - 1 - would block
### Race condition bug - 2 - would block
### Test --block size=1
0
1
1
2
3
4
5
6
7
8
9
### Test --block size=1M -j10 --files - more jobs than data
8a7095c1c23bfadc311fe6b16d950582 -
8a7095c1c23bfadc311fe6b16d950582 -
### Test --block size=1M -j1 - more data than cpu
8a7095c1c23bfadc311fe6b16d950582 -
### Test --block size=1M -j1 - more data than cpu
8a7095c1c23bfadc311fe6b16d950582 -
### Test --pipe default settings
8a7095c1c23bfadc311fe6b16d950582 -