From 044e46fa74c70d64dd6d9a0c86b36741ce84cef8 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Thu, 21 Nov 2024 23:10:49 +0100 Subject: [PATCH] parallel: --pipe --block with negative size works like --pipepart. --- doc/haikus | 9 +++-- src/parallel | 73 +++++++++++++++++++++++++++++++---- src/parallel.pod | 24 +++++++----- src/parallel_alternatives.pod | 8 ++-- 4 files changed, 90 insertions(+), 24 deletions(-) diff --git a/doc/haikus b/doc/haikus index 94aa5e21..0b60cfd6 100644 --- a/doc/haikus +++ b/doc/haikus @@ -4,11 +4,12 @@ Quote of the month: -ainda não inventaram palavras capazes de expressar minha gratidão aos desenvolvedores do GNU Parallel --- nueidris ‪@nueidris.kawaii.social‬ -Und die Tage jetzt hab ich GNU parallel für mich entdeckt, auch ne nette Geschichte, gerade wenn's irgendwelche remote APIs sind. --- Vince ‪@dd1des.bsky.social‬ + ainda não inventaram palavras capazes de expressar minha gratidão aos desenvolvedores do GNU Parallel + -- nueidris ‪@nueidris.kawaii.social‬ + + Und die Tage jetzt hab ich GNU parallel für mich entdeckt, auch ne nette Geschichte, gerade wenn's irgendwelche remote APIs sind. 
+ -- Vince ‪@dd1des.bsky.social‬ GNU parallel is so satisfying -- ‪James Coman‬ ‪@jcoman.bsky.social‬ diff --git a/src/parallel b/src/parallel index 7618ec8f..4566a4fc 100755 --- a/src/parallel +++ b/src/parallel @@ -1165,6 +1165,8 @@ sub spreadstdin() { my $blocksize = int($Global::blocksize); my $in = *STDIN; my $timeout = $Global::blocktimeout; + my @parts; + my $everything_read; if($opt::skip_first_line) { my $newline; @@ -1178,12 +1180,14 @@ sub spreadstdin() { my $eof; my $garbage_read; - sub read_block() { - # Read a --blocksize from STDIN + sub read_with_alarm($) { + my ($readsize) = @_; + my ($nread,$alarm,$read_everything); + if($readsize < 0) { + $readsize = -$readsize; + $read_everything = 1; + } # possibly interrupted by --blocktimeout - # Add up to the next full block - my $readsize = $blocksize - (length $buf) % $blocksize; - my ($nread,$alarm); eval { local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required # --blocktimeout (or 0 if not set) @@ -1191,13 +1195,13 @@ sub spreadstdin() { if($] >= 5.026) { do { $nread = sysread $in, $buf, $readsize, length $buf; - $readsize -= $nread; + if(not $read_everything) { $readsize -= $nread; } } while($readsize and $nread); } else { # Less efficient reading, but 32-bit sysread compatible do { $nread = sysread($in,substr($buf,length $buf,0),$readsize,0); - $readsize -= $nread; + if(not $read_everything) { $readsize -= $nread; } } while($readsize and $nread); } alarm 0; @@ -1208,7 +1212,60 @@ sub spreadstdin() { } else { $alarm = 0; } - $eof = not ($nread or $alarm); + # Is this EOF? + return not ($nread or $alarm); + } + + sub read_block_per_jobslot() { + if(not $everything_read) { + # Read everything in readsize of 2^17 + # 21 = 20s + # 20 = 16s + # 19 = 16s,15s + # 18 = 15s + # 17 = 14s + # 16 = 14s + # 15 = 14s + $eof = read_with_alarm(-2**17); + if($eof) { + $everything_read = 1; + # Trick the rest of the code to think we are not done reading blocks yet. 
+ $eof = 0; + # Chop into a whole number of parts (at least 1): a fractional count would drop the tail of the input + my $total_size = length($buf); + my $jobslots = $Global::max_jobs_running; + my $parts = int(-$Global::blocksize * $jobslots) || 1; + my $part_size = int($total_size / $parts); + for my $i (0 .. $parts - 1) { + my $start = $i * $part_size; + my $end = ($i == $parts - 1) ? $total_size : $start + $part_size; + # Extract the chunk from buffer (the last chunk also takes the remainder) + push @parts, substr($buf, $start, $end - $start); + } + $buf=""; + } else { + # What do we do here? Caused by alarm. + ::die_bug("Read block from pipe failed"); + } + } + if(@parts) { + # Take the next part in input order (shift = FIFO) and append it to $buf + $buf .= shift @parts; + } else { + # All parts are done: Let the rest of the code know we are EOF + $eof = 1; + } + } + + sub read_block() { + # Read a --blocksize from STDIN, or the next per-jobslot part if it is negative + if($Global::blocksize < 0) { + read_block_per_jobslot(); + } else { + # Add up to the next full block + my $readsize = $blocksize - (length $buf) % $blocksize; + $eof = read_with_alarm($readsize); + } } sub pass_n_line_records() { diff --git a/src/parallel.pod b/src/parallel.pod index e4d0d16c..c0022a1d 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -740,19 +740,25 @@ length of one record. For performance reasons I<size> should be bigger than a two records. GNU B<parallel> will warn you and automatically increase the size if you choose a I<size> that is too small. -If you use B<-N>, B<--block> should be bigger than N+1 records. - I<size> defaults to 1M. -When using B<--pipe-part> a negative block size is not interpreted as a -blocksize but as the number of blocks each jobslot should have. So -this will run 10*5 = 50 jobs in total: +A negative block size is not interpreted as a blocksize but as the +number of blocks each jobslot should have. So B<--block -3> will make +3 jobs for each jobslot. 
In other words: this will run 3*5 = 15 jobs +in total: - parallel --pipe-part -a myfile --block -10 -j5 wc + parallel --pipe-part -a myfile --block -3 -j5 wc + cat myfile | parallel --pipe --block -3 -j5 wc -This is an efficient alternative to B<--round-robin> because data is -never read by GNU B<parallel>, but you can still have very few -jobslots process large amounts of data. +B<--pipe-part --block> is an efficient alternative to B<--round-robin> +because data is never read by GNU B<parallel>, but you can still have +very few jobslots process huge amounts of data. + +On the other hand, B<--pipe --block> is quite I<inefficient>: It reads +the whole file into memory before splitting it. Thus input must be +able to fit in memory. + +If you use B<--block> -I<size>, input should be bigger than I<size>+1 records. See also: UNIT PREFIX B<-N> B<--pipe> B<--pipe-part> B<--round-robin> B<--block-timeout> diff --git a/src/parallel_alternatives.pod b/src/parallel_alternatives.pod index 81cbbc9b..e75c42fc 100644 --- a/src/parallel_alternatives.pod +++ b/src/parallel_alternatives.pod @@ -173,11 +173,13 @@ The following features are in some of the comparable tools: =back -As every new version of the programs are not tested the table may be -outdated. Please file a bug report if you find errors (See REPORTING +Since each new version of the programs is not tested, the table may be +outdated. Please file a bug report if you find errors (See REPORTING BUGS). -parallel: +=head2 GNU Parallel + +Summary (see legend above): =over