From 76d9e4f41b9e4291afc32a5c2452eb9fa3e60126 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Mon, 24 Jan 2011 20:06:30 +0100 Subject: [PATCH] -N for --pipe. Fixed race condition in --pipe. Tests for --pipe. Passes testsuite. --- doc/FUTURE_IDEAS | 7 +- doc/release_new_version | 56 +++++++------ packager/obs/README | 2 +- src/parallel | 140 +++++++++++++++++++------------ src/parallel.pod | 19 +++-- testsuite/tests-to-run/test49.sh | 2 +- testsuite/tests-to-run/test51.sh | 41 +++++++++ testsuite/wanted-results/test51 | 92 ++++++++++++++++++++ 8 files changed, 269 insertions(+), 90 deletions(-) create mode 100644 testsuite/tests-to-run/test51.sh create mode 100644 testsuite/wanted-results/test51 diff --git a/doc/FUTURE_IDEAS b/doc/FUTURE_IDEAS index 04320b5b..3fa7824a 100644 --- a/doc/FUTURE_IDEAS +++ b/doc/FUTURE_IDEAS @@ -1,6 +1,7 @@ ---block -parallel: --joblog implemented. Testsuite missing. -parallel: --spreadstdin prototype. Testsuite missing. +-N for --pipe. Fixed race condition in --pipe. +Tests for --pipe. +Passes testsuite. + codecoverage diff --git a/doc/release_new_version b/doc/release_new_version index 722a861c..f3662e27 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -47,6 +47,7 @@ pushd /tmp tar xjvf parallel-$YYYYMMDD.tar.bz2 cd parallel-$YYYYMMDD ./configure && make -j && sudo make -j install +pushd == Upload == @@ -135,43 +136,50 @@ cc:Peter Simons , Sandro Cazzaniga , Rogério Brito , Jonathan Palardy , Koen Vervloesem , R. Tyler Croy , ryoichiro.suzuki@gmail.com,kerick@shiftedbit.net, - Christian Faulhammer + Christian Faulhammer , Ryoichiro Suzuki -Subject: GNU Parallel 20110122 released +Subject: GNU Parallel 2011XXXX released -GNU Parallel 20110122 has been released. It is available for +GNU Parallel 2011XXXX has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/ +This is a major release as the --pipe option introduces a new way to +work. To learn about --pipe see the example section for uses of +--pipe. + +But rest assured: No old functionality is changed. + New in this release: -* --joblog makes a simple log of completed jobs. +* --pipe splits piped data into blocks. Each block is piped to a + program for processing. The piping and the programs will be run in + parallel. Useful if the data you want to process is data for a + program and not arguments. -* -X now spreads arguments between job slots when reaching last - argument. Use -j1 to avoid this. +* --blocksize sets the blocksize in bytes for --pipe. The blocksize is + approximate. It can deviate as much as the size of one record. + Default is 1M. -* People on the email list have voted -j+0 to be the new default - instead of -j9. +* --recstart sets the regular expression matching the start of a + record. Default is "". -* First review in Polish. Thanks to Patryk Krawaczyński. - http://nfsec.pl/root/2458 +* --recend sets the regular expression matching the end of a + record. Default is "\n". To specify none use --recend "". -* Review in Spanish (in print). - https://www.linux-magazine.es/issue/67/moreinfo.html + If both --recstart and --recend are set, the end of a record must be + followed immediately by a start of a record. This is useful if + either recend or recstart can occur in the middle of a record. -* Review in English. Thanks to Brian Gough. - http://blogs.fsfe.org/bjg/2011/01/gnu-parallel-a-map-operator-for-the-command-line/ +* --output-as-files will put the output of the programs into files and + instead of giving the output GNU Parallel will output the name of + these files. -* Review in French. Thanks to Denis Dordoigne. - http://linuxfr.org/2010/12/29/27715.html +* -N set the number of records to read. If used with --blocksize + the block read will at most be --blocksize. -N is much slower + than --blocksize so avoid -N if performance is important. -* Review in Spanish. - http://gufete.net/index.php?entry=entry110116-200022 - -* Article with advanced recursive example. Thanks to Ruarí Ødegaard - http://my.opera.com/ruario/blog/2011/01/18/fun-with-gnu-parallel - -* Use case for memcache. - http://www.dctrwatson.com/2010/12/how-to-dump-memcache-keyvalue-pairs-fast/ +* Advanced recursive example. Thanks to Ruarí Ødegaard. + http://my.opera.com/ruario/blog/2011/01/24/editing-debian-packages-more-fun-with-gnu * Bug fixes and man page updates. diff --git a/packager/obs/README b/packager/obs/README index de11edcf..6b10bdf3 100644 --- a/packager/obs/README +++ b/packager/obs/README @@ -1,4 +1,4 @@ make -Edit parallel.spec +Edit home\:tange/parallel/parallel.spec Then go to https://build.opensuse.org/package/files?package=parallel&project=home%3Atange and add all the files diff --git a/src/parallel b/src/parallel index aa8800da..3d3673bc 100755 --- a/src/parallel +++ b/src/parallel @@ -60,11 +60,13 @@ if($Global::semaphore) { } $SIG{TERM} = \&start_no_new_jobs; start_more_jobs(); -if($::opt_spreadstdin) { +if($::opt_pipe) { spreadstdin(); } reap_if_needed(); +::debug("Start draining\n"); drain_job_queue(); +::debug("Done draining\n"); cleanup(); if($Global::semaphore) { $sem->release(); @@ -92,38 +94,85 @@ sub spreadstdin { # If both --recstart and --recend is given then both must match $recstart = $::opt_recstart; $recend = $::opt_recend; - $recerror = "Warning: --recend and --recstart unmatched"; + $recerror = "Warning: --recend and --recstart unmatched. Is --blocksize too small?"; } elsif(defined($::opt_recstart)) { # If --recstart is given it must match start of record $recstart = $::opt_recstart; $recend = ""; - $recerror = "Warning: --recstart unmatched"; + $recerror = "Warning: --recstart unmatched. Is --blocksize too small?"; } elsif(defined($::opt_recend)) { # If --recend is given then it must match end of record $recstart = ""; $recend = $::opt_recend; - $recerror = "Warning: --recend unmatched"; + $recerror = "Warning: --recend unmatched. Is --blocksize too small?"; } while(read(STDIN,$buf,$::opt_blocksize)) { $record = $partial.$buf; - if(eof STDIN) { - # There is no partial record at the end of file - $partial = ""; + if($Global::max_number_of_args) { + # -N => (start..*?end){n} + while($record =~ s/(($recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$1/os) { + $partial = $3; + ::debug("Read record: ".length($record)."\n"); + write_record_to_pipe($record); + $record = $partial; + } + if(eof STDIN) { + # There is no partial record at the end of file + write_record_to_pipe($record); + } } else { if($record =~ s/(.*$recend)($recstart.*?)$/$1/os) { $partial = $2; } else { print $Global::original_stderr $recerror,"\n"; } + ::debug("Read record: ".length($record)."\n"); + write_record_to_pipe($record); + if(eof STDIN) { + # There is no partial record at the end of file + write_record_to_pipe($partial); + } } - ::debug("Read record: ".length($record)."\n"); - my $reap_needed = 0; - write_record: while(defined $record) { + } + ::debug("Done reading STDIN\n"); + flush_and_close_pipes(); + ::debug("Done flushing to children\n"); + $Global::start_no_new_jobs = 1; +} + +sub flush_and_close_pipes { + my $flush_done; + my $sleep = 0.1; + do { + $flush_done = 1; + # Make sure everything is written to the jobs + do_not_reap(); + for my $job (values %Global::running) { + if($job->remaining()) { + $job->complete_write(); + $flush_done = 0; + } + } + reap_if_needed(); + usleep($sleep); + $sleep *= 1.1; # exponential back off + } while (not $flush_done); + do_not_reap(); + for my $job (values %Global::running) { + my $fh = $job->stdin(); + close $fh; + } + reap_if_needed(); +} + +sub write_record_to_pipe { + my $record = shift; + my $sleep = 0.1; # 0.1 ms + write_record: while(1) { # Sorting according to sequence is necessary for -k to work do_not_reap(); # If Global::running is changed the for loop has a race condition for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) { - ::debug("Looking at ",$job," "); ::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n"); if($job->remaining()) { # Part of the job's last record has not finished being written @@ -132,53 +181,23 @@ sub spreadstdin { if($job->datawritten() > 0) { # There is no data remaining and we have written data before: # So this means we have completed writing a block. - # close stdin + # close stdin # This will cause the job to finish and when it dies we will spawn another job my $fh = $job->stdin(); close $fh; - $reap_needed = 1; } else { $job->write($record); - $record = undef; - $first++; - $sleep=1; + $sleep = 0.1; last write_record; } } } reap_if_needed(); - # Rotate jobs to spread the input -# @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); usleep($sleep); $sleep *= 1.1; # exponential back off - $max_sleep = max($max_sleep, $sleep); - $second++; } - if($reap_needed) { - # One of more jobs finished in last round - reap_if_needed(); - } - } - my $flush_done; - do { - $flush_done = 1; - # Make sure everything is written to the jobs - for my $job (values %Global::running) { - if($job->remaining()) { - $job->complete_write(); - $flush_done = 0; - } - } - usleep($sleep); - $sleep *= 1.1; # exponential back off - } while (not $flush_done); - - for my $job (values %Global::running) { - my $fh = $job->stdin(); - close $fh; - } - $Global::start_no_new_jobs = 1; - ::debug("Blocks send directly: $first Delays: $second Max sleep ms: $max_sleep\n"); + reap_if_needed(); + return; } sub acquire_semaphore { @@ -237,7 +256,7 @@ sub get_options_from_array { "quote|q" => \$::opt_q, "I=s" => \$::opt_I, "extensionreplace|U=s" => \$::opt_U, - "basenamereplace=s" => \$::opt_basenamereplace, + "basenamereplace|bnr=s" => \$::opt_basenamereplace, "basenameextensionreplace=s" => \$::opt_basenameextensionreplace, "jobs|j=s" => \$::opt_P, "load=s" => \$::opt_load, @@ -267,7 +286,7 @@ sub get_options_from_array { "arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep, "trim=s" => \$::opt_trim, "profile|J=s" => \$::opt_profile, - "pipe|spreadstdin" => \$::opt_spreadstdin, + "pipe|spreadstdin" => \$::opt_pipe, "recstart=s" => \$::opt_recstart, "recend=s" => \$::opt_recend, "files|output-as-files|outputasfiles" => \$::opt_files, @@ -381,7 +400,7 @@ sub parse_options { if(defined @::opt_sshlogin) { @Global::sshlogin = @::opt_sshlogin; } if(defined $::opt_sshloginfile) { read_sshloginfile($::opt_sshloginfile); } if(defined @::opt_return) { push @Global::ret_files, @::opt_return; } - if(not defined $::opt_recstart and + if(not defined $::opt_recstart and not defined $::opt_recend) { $::opt_recend = "\n"; } if(not defined $::opt_blocksize) { $::opt_blocksize = "1M"; } $::opt_blocksize = multiply_binary_prefix($::opt_blocksize); @@ -809,8 +828,17 @@ sub drain_job_queue { my $last_header=""; do { while($Global::total_running > 0) { - debug("jobs running: ",$Global::total_running," Memory usage:".my_memory_usage()."\n"); + debug("jobs running: ",$Global::total_running," ",scalar keys %Global::running, " Memory usage:".my_memory_usage()."\n"); sleep 1; + if($::opt_pipe) { + # When using --pipe sometimes file handles are not closed properly + do_not_reap(); + for my $job (values %Global::running) { + my $fh = $job->stdin(); + close $fh; + } + reap_if_needed(); + } reaper(); # Some systems fail to catch the SIGCHLD if($::opt_progress) { my %progress = progress(); @@ -1038,7 +1066,7 @@ sub start_more_jobs { next; } while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { - if($Global::JobQueue->empty() and not $::opt_spreadstdin) { + if($Global::JobQueue->empty() and not $::opt_pipe) { last; } debug("Try starting a job on ".$sshlogin->string()."\n"); @@ -1067,7 +1095,7 @@ sub start_another_job { my $sshlogin = shift; # Do we have enough file handles to start another job? if(enough_file_handles()) { - if($Global::JobQueue->empty() and not $::opt_spreadstdin) { + if($Global::JobQueue->empty() and not $::opt_pipe) { # No more commands to run return 0; } else { @@ -1390,7 +1418,6 @@ sub wait_and_exit { waitpid($_,0); delete $Global::unkilled_children{$_}; } - wait(); exit(shift); } @@ -1484,6 +1511,9 @@ sub my_size { } } +# Guvf vf n grfg gb frr vs bguref ner ernqvat zl pbqr. Cyrnfr rznvy +# pbbxvr@gnatr.qx jura lbh ernq guvf + sub my_dump { # Returns: # ascii expression of object if Data::Dump(er) is installed @@ -2808,7 +2838,7 @@ sub start { $ENV{'PARALLEL_SEQ'} = $job->seq(); $ENV{'PARALLEL_PID'} = $$; ::debug("$Global::total_running processes. Starting (".$job->seq()."): $command\n"); - if($::opt_spreadstdin) { + if($::opt_pipe) { my ($in); $pid = ::open3($in, ">&OUT", ">&ERR", $command) || die("open3 (with spreadstdin) failed. Report a bug to \n"); @@ -3001,7 +3031,7 @@ sub populate { # Add arguments from arg_queue until the number of arguments or # max line length is reached my $self = shift; - if($::opt_spreadstdin) { + if($::opt_pipe) { # Do no read any args $self->push([Arg->new("")]); return; @@ -3413,7 +3443,7 @@ sub get { ); $cmd_line->populate(); ::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n"); - if(not $::opt_spreadstdin and $cmd_line->number_of_args() == 0) { + if(not $::opt_pipe and $cmd_line->number_of_args() == 0) { # We did not get more args - maybe at EOF string? return undef; } else { diff --git a/src/parallel.pod b/src/parallel.pod index 8a992e92..00cdfe4b 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -222,7 +222,9 @@ Multiple B<-B> can be specified to transfer more basefiles. The I will be transferred the same way as B<--transfer>. -=item B<--basenamereplace> I (beta testing) +=item B<--basenamereplace> I + +=item B<--bnr> I Use the replacement string I instead of B<{/}> for basename of input line. @@ -334,7 +336,7 @@ If I is omitted, there is no end of file string. If neither B<-E> nor B<-e> is used, no end of file string is used. -=item B<--eta> +=item B<--eta> (alpha testing) Show the estimated number of seconds before finishing. This forces GNU B to read all jobs before starting to find the number of @@ -630,7 +632,11 @@ This will set the owner of the homedir to the user: B -Implies B<-X> unless B<-m> is set. +Implies B<-X> unless B<-m> or <--pipe> is set. + +When used with B<--pipe> B<-N> is the number of records to read. This +is much slower than B<--blocksize> so avoid it if performance is +important. =item B<--max-line-length-allowed> @@ -709,12 +715,13 @@ If B<--recend> is given I will be used to split at record end. If both B<--recstart> and B<--recend> are given the regular expression II will have to match to find a split -position. +position. This is useful if either I or I +match in the middle of a record. -If neither B<--recstart> nor B<--recend> are given B<--recend> +If neither B<--recstart> nor B<--recend> are given then B<--recend> defaults to '\n'. To have no record separator use B<--recend "">. -B<--recstart> and B<--recend> used with B<--pipe>. +B<--recstart> and B<--recend> are used with B<--pipe>. =item B<--retries> I (beta testing) diff --git a/testsuite/tests-to-run/test49.sh b/testsuite/tests-to-run/test49.sh index 98ab2de7..32ab38e5 100644 --- a/testsuite/tests-to-run/test49.sh +++ b/testsuite/tests-to-run/test49.sh @@ -1,7 +1,7 @@ #!/bin/bash echo '### Test --spreadstdin - more procs than args' -rm /tmp/parallel.ss.* +rm -f /tmp/parallel.ss.* seq 1 5 | stdout parallel -j 10 --spreadstdin 'cat >/tmp/parallel.ss.$PARALLEL_SEQ' >/dev/null cat /tmp/parallel.ss.* diff --git a/testsuite/tests-to-run/test51.sh b/testsuite/tests-to-run/test51.sh new file mode 100644 index 00000000..07822b23 --- /dev/null +++ b/testsuite/tests-to-run/test51.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +echo '### Test --pipe' +# Make some pseudo random input that stays the same +seq 1 1000000 >/tmp/parallel-seq +shuf --random-source=/tmp/parallel-seq /tmp/parallel-seq >/tmp/blocktest + +echo '### Test -N even' +seq 1 10 | parallel -j2 -k -N 2 --pipe cat";echo ole;sleep 0.1" + +echo '### Test -N odd' +seq 1 11 | parallel -j2 -k -N 2 --pipe cat";echo ole;sleep 0.1" + +echo '### Test -N even+2' +seq 1 12 | parallel -j2 -k -N 2 --pipe cat";echo ole;sleep 0.1" + +echo '### Test --recstart + --recend' +cat /tmp/blocktest | parallel -k --recstart 44 --recend "44" -j10 --pipe sort -n |md5sum + +echo '### Race condition bug - 1 - would block' +seq 1 80 | nice parallel -j0 'seq 1 10| parallel --block 1 --recend "" --pipe cat;true' >/dev/null + +echo '### Race condition bug - 2 - would block' +seq 1 100 | nice parallel -j100 --block 1 --recend "" --pipe cat >/dev/null + +echo '### Test --block size=1' +seq 1 10| parallel --block 1 --files --recend "" --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {} + +echo '### Test --block size=1M -j10 --files - more jobs than data' +sort -n < /tmp/blocktest | md5sum +cat /tmp/blocktest | parallel --files --recend "\n" -j10 --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {} | md5sum + +echo '### Test --block size=1M -j1 - more data than cpu' +cat /tmp/blocktest | parallel --files --recend "\n" -j1 --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {} | md5sum + +echo '### Test --block size=1M -j1 - more data than cpu' +cat /tmp/blocktest | parallel --files --recend "\n" -j2 --pipe sort -n | parallel -Xj1 sort -nm {} ";"rm {} | md5sum + +echo '### Test --pipe default settings' +cat /tmp/blocktest | parallel --pipe sort | sort -n | md5sum + diff --git a/testsuite/wanted-results/test51 b/testsuite/wanted-results/test51 new file mode 100644 index 00000000..bc424cef --- /dev/null +++ b/testsuite/wanted-results/test51 @@ -0,0 +1,92 @@ +### Test --pipe +### Test -N even +1 +2 +ole +3 +4 +ole +5 +6 +ole +7 +8 +ole +9 +10 +ole +ole +### Test -N odd +1 +2 +ole +3 +4 +ole +5 +6 +ole +7 +8 +ole +9 +10 +ole +11 +ole +### Test -N even+2 +1 +2 +ole +3 +4 +ole +5 +6 +ole +7 +8 +ole +9 +10 +ole +11 +12 +ole +ole +ole +### Test --recstart + --recend +0d53a19cdc880668a7c745794dcafbf1 - +### Race condition bug - 1 - would block +### Race condition bug - 2 - would block +### Test --block size=1 + + + + + + + + + + +0 +1 +1 +2 +3 +4 +5 +6 +7 +8 +9 +### Test --block size=1M -j10 --files - more jobs than data +8a7095c1c23bfadc311fe6b16d950582 - +8a7095c1c23bfadc311fe6b16d950582 - +### Test --block size=1M -j1 - more data than cpu +8a7095c1c23bfadc311fe6b16d950582 - +### Test --block size=1M -j1 - more data than cpu +8a7095c1c23bfadc311fe6b16d950582 - +### Test --pipe default settings +8a7095c1c23bfadc311fe6b16d950582 -