parallel: --pipe --block with negative size works like --pipepart.

This commit is contained in:
Ole Tange 2024-11-21 23:10:49 +01:00
parent 6e69bc6b38
commit 044e46fa74
4 changed files with 90 additions and 24 deletions

View file

@ -4,11 +4,12 @@
Quote of the month: Quote of the month:
ainda não inventaram palavras capazes de expressar minha gratidão aos desenvolvedores do GNU Parallel
-- nueidris @nueidris.kawaii.social
Und die Tage jetzt hab ich GNU parallel für mich entdeckt, auch ne nette Geschichte, gerade wenn's irgendwelche remote APIs sind. ainda não inventaram palavras capazes de expressar minha gratidão aos desenvolvedores do GNU Parallel
-- Vince @dd1des.bsky.social -- nueidris @nueidris.kawaii.social
Und die Tage jetzt hab ich GNU parallel für mich entdeckt, auch ne nette Geschichte, gerade wenn's irgendwelche remote APIs sind.
-- Vince @dd1des.bsky.social
GNU parallel is so satisfying GNU parallel is so satisfying
-- James Coman @jcoman.bsky.social -- James Coman @jcoman.bsky.social

View file

@ -1165,6 +1165,8 @@ sub spreadstdin() {
my $blocksize = int($Global::blocksize); my $blocksize = int($Global::blocksize);
my $in = *STDIN; my $in = *STDIN;
my $timeout = $Global::blocktimeout; my $timeout = $Global::blocktimeout;
my @parts;
my $everything_read;
if($opt::skip_first_line) { if($opt::skip_first_line) {
my $newline; my $newline;
@ -1178,12 +1180,14 @@ sub spreadstdin() {
my $eof; my $eof;
my $garbage_read; my $garbage_read;
sub read_block() { sub read_with_alarm($) {
# Read a --blocksize from STDIN my ($readsize) = @_;
my ($nread,$alarm,$read_everything);
if($readsize < 0) {
$readsize = -$readsize;
$read_everything = 1;
}
# possibly interrupted by --blocktimeout # possibly interrupted by --blocktimeout
# Add up to the next full block
my $readsize = $blocksize - (length $buf) % $blocksize;
my ($nread,$alarm);
eval { eval {
local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
# --blocktimeout (or 0 if not set) # --blocktimeout (or 0 if not set)
@ -1191,13 +1195,13 @@ sub spreadstdin() {
if($] >= 5.026) { if($] >= 5.026) {
do { do {
$nread = sysread $in, $buf, $readsize, length $buf; $nread = sysread $in, $buf, $readsize, length $buf;
$readsize -= $nread; if(not $read_everything) { $readsize -= $nread; }
} while($readsize and $nread); } while($readsize and $nread);
} else { } else {
# Less efficient reading, but 32-bit sysread compatible # Less efficient reading, but 32-bit sysread compatible
do { do {
$nread = sysread($in,substr($buf,length $buf,0),$readsize,0); $nread = sysread($in,substr($buf,length $buf,0),$readsize,0);
$readsize -= $nread; if(not $read_everything) { $readsize -= $nread; }
} while($readsize and $nread); } while($readsize and $nread);
} }
alarm 0; alarm 0;
@ -1208,7 +1212,60 @@ sub spreadstdin() {
} else { } else {
$alarm = 0; $alarm = 0;
} }
$eof = not ($nread or $alarm); # Is this EOF?
return not ($nread or $alarm);
}
sub read_block_per_jobslot() {
if(not $everything_read) {
# Read everything in readsize of 2^17
# 21 = 20s
# 20 = 16s
# 19 = 16s,15s
# 18 = 15s
# 17 = 14s
# 16 = 14s
# 15 = 14s
$eof = read_with_alarm(-2**17);
if($eof) {
$everything_read = 1;
# Trick the rest of the code to think we are not done reading blocks yet.
$eof = 0;
# Chop into parts
my $total_size = length($buf);
my $jobslots = $Global::max_jobs_running;
my $parts = -$Global::blocksize * $jobslots;
my $part_size = int($total_size / $parts);
for my $i (0 .. $parts - 1) {
my $start = $i * $part_size;
my $end = ($i == $parts - 1) ? $total_size : $start + $part_size;
# Extract the chunk from buffer
push @parts, substr($buf, $start, $end - $start);
}
$buf="";
} else {
# What do we do here? Caused by alarm.
::die_bug("Read block from pipe failed");
}
}
if(@parts) {
# pop part and return that
$buf .= shift @parts;
} else {
# All parts are done: Let the rest of the code know we are EOF
$eof = 1;
}
}
sub read_block() {
# Read a --blocksize from STDIN
if($Global::blocksize < 0) {
read_block_per_jobslot();
} else {
# Add up to the next full block
my $readsize = $blocksize - (length $buf) % $blocksize;
$eof = read_with_alarm($readsize);
}
} }
sub pass_n_line_records() { sub pass_n_line_records() {

View file

@ -740,19 +740,25 @@ length of one record. For performance reasons I<size> should be bigger
than a two records. GNU B<parallel> will warn you and automatically than a two records. GNU B<parallel> will warn you and automatically
increase the size if you choose a I<size> that is too small. increase the size if you choose a I<size> that is too small.
If you use B<-N>, B<--block> should be bigger than N+1 records.
I<size> defaults to 1M. I<size> defaults to 1M.
When using B<--pipe-part> a negative block size is not interpreted as a A negative block size is not interpreted as a blocksize but as the
blocksize but as the number of blocks each jobslot should have. So number of blocks each jobslot should have. So B<--block -3> will make
this will run 10*5 = 50 jobs in total: 3 jobs for each jobslot. In other words: this will run 3*5 = 15 jobs
in total:
parallel --pipe-part -a myfile --block -10 -j5 wc parallel --pipe-part -a myfile --block -3 -j5 wc
cat myfile | parallel --pipe --block -3 -j5 wc
This is an efficient alternative to B<--round-robin> because data is B<--pipe-part --block> is an efficient alternative to B<--round-robin>
never read by GNU B<parallel>, but you can still have very few because data is never read by GNU B<parallel>, but you can still have
jobslots process large amounts of data. very few jobslots process huge amounts of data.
On the other hand, B<--pipe --block> is quite I<inefficient>: It reads
the whole file into memory before splitting it. Thus input must be
able to fit in memory.
If you use B<--block> -I<size>, input should be bigger than I<size>+1 records.
See also: UNIT PREFIX B<-N> B<--pipe> B<--pipe-part> B<--round-robin> See also: UNIT PREFIX B<-N> B<--pipe> B<--pipe-part> B<--round-robin>
B<--block-timeout> B<--block-timeout>

View file

@ -173,11 +173,13 @@ The following features are in some of the comparable tools:
=back =back
As every new version of the programs are not tested the table may be Since each new version of the programs is not tested, the table may be
outdated. Please file a bug report if you find errors (See REPORTING outdated. Please file a bug report if you find errors (See REPORTING
BUGS). BUGS).
parallel: =head2 GNU Parallel
Summary (see legend above):
=over =over