From 67c4377715122cc40b4104d061bef21e167b6679 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Sat, 12 Aug 2017 18:37:52 +0200 Subject: [PATCH] parcat: --rm, reading args from stdin. --- CREDITS | 3 + Makefile.am | 2 +- Makefile.in | 2 +- doc/release_new_version | 18 ++- src/parcat | 121 +++++++++++------- src/parcat.pod | 15 ++- testsuite/tests-to-run/parallel-local-0.3s.sh | 19 +++ testsuite/tests-to-run/parallel-local-3s.sh | 23 ++++ testsuite/wanted-results/parallel-local-3s | 5 + 9 files changed, 154 insertions(+), 54 deletions(-) diff --git a/CREDITS b/CREDITS index ea5e430d..6391c0ff 100644 --- a/CREDITS +++ b/CREDITS @@ -1,3 +1,6 @@ +People who have helped GNU Parallel different ways. + +John Rusnak: Feedback on all documentation. FrithMartin: Bug patch for orphan blocks. Rasmus Villemoes: Code snips for signal processing. Martin d'Anjou: Code snips for signal processing. diff --git a/Makefile.am b/Makefile.am index 9568b3bf..4a0c313a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -119,4 +119,4 @@ monitorman: # If man page changed: open new pdfman inotifywait -qmre MOVED_TO -e CLOSE_WRITE --format %w%f . | parallel -uj1 'echo {=/\.pod$$/ or skip()=};make -j && sudo make install; pdfman {/.} &' -EXTRA_DIST = CITATION CREDITS +EXTRA_DIST = CITATION CREDITS cc-by-sa.txt fdl.txt diff --git a/Makefile.in b/Makefile.in index 27808371..53f28360 100644 --- a/Makefile.in +++ b/Makefile.in @@ -275,7 +275,7 @@ top_build_prefix = @top_build_prefix@ top_builddir = @top_builddir@ top_srcdir = @top_srcdir@ SUBDIRS = src -EXTRA_DIST = CITATION CREDITS +EXTRA_DIST = CITATION CREDITS cc-by-sa.txt fdl.txt all: config.h $(MAKE) $(AM_MAKEFLAGS) all-recursive diff --git a/doc/release_new_version b/doc/release_new_version index e8e97c07..62c8914c 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -195,6 +195,8 @@ file:///home/tange/privat/parallel/doc/release_new_version from:tange@gnu.org to:parallel@gnu.org, bug-parallel@gnu.org +stable-bcc: Jesse Alama + Subject: GNU Parallel 20170822 ('<<>>') released <<[stable]>> @@ -214,10 +216,24 @@ New in this release: http://meta.askubuntu.com/a/16750/22307 http://meta.serverfault.com/a/9040/45704 -* GNU Parallel was cited in: +* GNU Parallel was cited in: https://springerplus.springeropen.com/articles/10.1186/s40064-016-2022-y + +https://dzone.com/articles/running-bash-commands-in-parallel-1 * https://medium.com/@nornagon/today-i-learned-gnu-parallel-plate-tectonics-9fcf24045e63 +* https://www.upf.edu/web/sct-sit/gnu-parallel-tutorial + + +http://blogs.fluidinfo.com/terry/2017/08/05/do-stuff-on-things-in-parallel/ + +http://ino.pm/outreach/presentations/2014/03/genomics-wranglers/index.html#/5 +http://www.ettemalab.org/using-for-loop-vs-gnu-parallel-for-blast/ + +https://medium.com/@nornagon/today-i-learned-gnu-parallel-plate-tectonics-9fcf24045e63 + +https://gxnotes.com/article/175866.html + <> <> diff --git a/src/parcat b/src/parcat index 820c17ac..3ea4add1 100755 --- a/src/parcat +++ b/src/parcat @@ -16,24 +16,44 @@ my $okq = Thread::Queue->new(); my @producers; if(not @ARGV) { - print "Usage:\n"; - print " parcat file(s)\n"; + if(-t *STDIN) { + print "Usage:\n"; + print " parcat file(s)\n"; + print " cat argfile | parcat\n"; + } else { + # Read arguments from stdin + chomp(@ARGV = ); + } } +my $files_to_open = 0; +# Default: fd = stdout +my $fd = 1; for (@ARGV) { - push @producers, threads->create('producer', $_); + # --rm = remove file when opened + /^--rm$/ and do { $opt::rm = 1; next; }; + # -1 = output to fd 1, -2 = output to fd 2 + /^-(\d+)$/ and do { $fd = $1; next; }; + push @producers, threads->create('producer', $_, $fd); + $files_to_open++; } sub producer { # Open a file/fifo, set non blocking, enqueue fileno of the file handle my $file = shift; + my $output_fd = shift; open(my $fh, "<", $file) || do { print STDERR "parcat: Cannot open $file\n"; exit(1); }; + # Remove file when it has been opened + if($opt::rm) { + unlink $file; + } set_fh_non_blocking($fh); - $q->enqueue(fileno($fh)); $opened++; + # Pass the fileno to parent + $q->enqueue(fileno($fh),$output_fd); # Get an OK that the $fh is opened and we can release the $fh while(1) { my $ok = $okq->dequeue(); @@ -48,31 +68,38 @@ my $s = IO::Select->new(); my %buffer; sub add_file { - my $fd = shift; - open(my $fh, "<&=", $fd) || die; - $s->add($fh); + my $infd = shift; + my $outfd = shift; + open(my $infh, "<&=", $infd) || die; + open(my $outfh, ">&=", $outfd) || die; + $s->add($infh); # Tell the producer now opened here and can be released - $okq->enqueue($fd); + $okq->enqueue($infd); # Initialize the buffer - @{$buffer{$fh}} = (); + @{$buffer{$infh}{$outfd}} = (); + $Global::fh{$outfd} = $outfh; } sub add_files { # Non-blocking dequeue - while(defined(my $fd = $q->dequeue_nb())) { - add_file($fd); - } + my ($infd,$outfd); + do { + ($infd,$outfd) = $q->dequeue_nb(2); + if(defined($outfd)) { + add_file($infd,$outfd); + } + } while(defined($outfd)); } sub add_files_block { # Blocking dequeue - my $fd = $q->dequeue(); - add_file($fd); + my ($infd,$outfd) = $q->dequeue(2); + add_file($infd,$outfd); } my $fd; -my (@ready,$file,$rv,$buf); +my (@ready,$infh,$rv,$buf); do { # Wait until at least one file is opened add_files_block(); @@ -83,43 +110,46 @@ do { if(not @ready) { add_files(); } - for $file (@ready) { - $rv = sysread($file, $buf, 65536); - if (!$rv) { - if($! == EAGAIN) { - # Would block: Nothing read - next; + for $infh (@ready) { + # There is only one key, namely the output file descriptor + for my $outfd (keys %{$buffer{$infh}}) { + $rv = sysread($infh, $buf, 65536); + if (!$rv) { + if($! == EAGAIN) { + # Would block: Nothing read + next; + } else { + # Nothing read, but would not block: + # This file is done + $s->remove($infh); + syswrite($Global::fh{$outfd},"@{$buffer{$infh}{$outfd}}"); + delete $buffer{$infh}; + # Closing the $infh causes it to block + # close $infh; + add_files(); + next; + } + } + # Something read. + # Find \n or \r for full line + my $i = (rindex($buf,"\n")+1); + if($i) { + # Print full line + for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) { + syswrite($Global::fh{$outfd},$_); + } + # @buffer = remaining half line + $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)]; } else { - # This file is done - $s->remove($file); - print @{$buffer{$file}}; - delete $buffer{$file}; - # Closing the $file causes it to block - # close $file; - add_files(); - next; + # Something read, but not a full line + push @{$buffer{$infh}{$outfd}}, $buf; } - } - - # Find \n for full line - my $i = (rindex($buf,"\n")+1); - if($i) { - # Print full line - for(@{$buffer{$file}}, substr($buf,0,$i)) { - syswrite(STDOUT,$_); - } - # @buffer = remaining half line - @{$buffer{$file}} = (substr($buf,$i,$rv-$i)); - redo; - } else { - # Something read, but not a full line - push @{$buffer{$file}}, $buf; redo; } } } } -} while($opened <= $#ARGV); +} while($opened < $files_to_open); for (@producers) { @@ -139,4 +169,3 @@ sub set_fh_non_blocking { $flags |= &O_NONBLOCK; # Add non-blocking to the flags fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle } - diff --git a/src/parcat.pod b/src/parcat.pod index 60065268..0ba57e48 100644 --- a/src/parcat.pod +++ b/src/parcat.pod @@ -19,15 +19,20 @@ you use: =head1 EXAMPLES -=head2 Do be done +=head2 Simple line buffered output - mkfifo slot-{1..5}-digit-{0..9} - parallel -j5 'seq 100000 | grep {} > slot-{%}-digit-{}' ::: {0..9} & - parallel parcat slot-{1..5}-digit-{} '>' digit-{} ::: {0..9} +GNU Parallel saves output to tempfiles. If the amount of data is +bigger than the free disk space, then you can use this technique to do +line buffering without saving to disk: + + mkfifo slot-{1..5} + seq 10000000 | parallel -j5 --round --pipe 'cat > slot-{%}' & + parcat slot-{1..5} | wc =head1 REPORTING BUGS -GNU B is part of GNU B. Report bugs to . +GNU B is part of GNU B. Report bugs to +. =head1 AUTHOR diff --git a/testsuite/tests-to-run/parallel-local-0.3s.sh b/testsuite/tests-to-run/parallel-local-0.3s.sh index b7c2cf74..16d2a2bc 100644 --- a/testsuite/tests-to-run/parallel-local-0.3s.sh +++ b/testsuite/tests-to-run/parallel-local-0.3s.sh @@ -728,6 +728,25 @@ par_X_eta_div_zero() { perl -pe 's/\d+/0/g' } +par_parcat_args_stdin() { + echo 'bug #51690: parcat: read args from stdin' + tmp1=$(tempfile) + tmp2=$(tempfile) + echo OK1 > $tmp1 + echo OK2 > $tmp2 + (echo $tmp1 + echo $tmp2) | parcat + rm $tmp1 $tmp2 +} + +par_parcat_rm() { + echo 'bug #51691: parcat --rm remove fifo when opened' + tmp1=$(tempfile) + echo OK1 > $tmp1 + parcat --rm $tmp1 + rm $tmp1 2>/dev/null || echo OK file removed +} + export -f $(compgen -A function | grep par_) compgen -A function | grep par_ | sort | parallel -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1' diff --git a/testsuite/tests-to-run/parallel-local-3s.sh b/testsuite/tests-to-run/parallel-local-3s.sh index 9509c403..337d37f2 100644 --- a/testsuite/tests-to-run/parallel-local-3s.sh +++ b/testsuite/tests-to-run/parallel-local-3s.sh @@ -152,5 +152,28 @@ newlines"' ::: a b c d e | sort ) | perl -pe 's/\0//g;s/\d+/./g' } +par_parcat_mixing() { + echo 'parcat output should mix: a b a b' + mktmpfifo() { + tmp=$(tempfile) + rm $tmp + mkfifo $tmp + echo $tmp + } + slow_output() { + string=$1 + perl -e 'print "'$string'"x9000,"start\n"' + sleep 1 + perl -e 'print "'$string'"x9000,"end\n"' + } + tmp1=$(mktmpfifo) + tmp2=$(mktmpfifo) + slow_output a > $tmp1 & + sleep 0.5 + slow_output b > $tmp2 & + parcat $tmp1 $tmp2 | tr -s ab +} + + export -f $(compgen -A function | grep par_) compgen -A function | grep par_ | sort | parallel -j6 --tag -k '{} 2>&1' diff --git a/testsuite/wanted-results/parallel-local-3s b/testsuite/wanted-results/parallel-local-3s index 7919141f..d9799d33 100644 --- a/testsuite/wanted-results/parallel-local-3s +++ b/testsuite/wanted-results/parallel-local-3s @@ -52,6 +52,11 @@ par_multiline_commands finish 2 par_multiline_commands parallel: Warning: Command lines contain newline. Forcing --null. par_multiline_commands 3 par_multiline_commands finish 3 +par_parcat_mixing parcat output should mix: a b a b +par_parcat_mixing astart +par_parcat_mixing bstart +par_parcat_mixing bend +par_parcat_mixing aend par_pipepart_block ### --pipepart --block -# (# < 0) par_pipepart_block 1 par_pipepart_block 2