parallel: --pipe engine changed making --pipe alpha quality.

This commit is contained in:
Ole Tange 2020-12-20 22:07:47 +01:00
parent a86a786189
commit 48e29256e2
9 changed files with 149 additions and 101 deletions

25
NEWS
View file

@ -1,3 +1,28 @@
20201222
* --pipe engine changed making --pipe alpha quality.
* --results -.json outputs results as JSON objects on stdout (standard
output).
* --delay 123auto will auto-adjust --delay. If jobs fail due to being
spawned too quickly, --delay will exponentially increase.
* Bug fixes and man page updates.
News about GNU Parallel:
* Best practices for accelerating data migrations using AWS Snowball
Edge
https://aws.amazon.com/blogs/storage/best-practices-for-accelerating-data-migrations-using-aws-snowball-edge/
* Pass the Output of a Command as an Argument for Another
https://www.baeldung.com/linux/pass-cmd-output-as-an-argument
* Warwick RSE Drop-in - Workflow Management Part 2 - GNU Parallel
https://www.youtube.com/watch?v=t_v2Otgt87g
20201122 20201122
* Bug fixes and man page updates. * Bug fixes and man page updates.

View file

@ -1,8 +1,12 @@
Quote of the month: Quote of the month:
GNU Parallel is my single favourite tool for batch processing data
from the command line.
-- Jeff Wintersinger @jwintersinger
Today I'm grateful for GNU parallel, especially with the --colsep and Today I'm grateful for GNU parallel, especially with the --colsep and
--jobs parameters #GiveThanks --jobs parameters #GiveThanks
Erin Young @ErinYoun -- Erin Young @ErinYoun
I also prefer gnu parallel. Mainly because it makes embarrassingly I also prefer gnu parallel. Mainly because it makes embarrassingly
parallel tasks embarrassingly easy to run on the command line. parallel tasks embarrassingly easy to run on the command line.

View file

@ -190,7 +190,7 @@ from:tange@gnu.org
to:parallel@gnu.org, bug-parallel@gnu.org to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm> stable-bcc: Jesse Alama <jessealama@fastmail.fm>
Subject: GNU Parallel 20201222 ('Maradona/Yeager') released <<[stable]>> Subject: GNU Parallel 20201222 ('Vaccine/Maradona/Yeager/Le Carre') released <<[stable]>>
GNU Parallel 20201222 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/ GNU Parallel 20201222 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/
@ -206,16 +206,17 @@ Quote of the month:
New in this release: New in this release:
<<>> <<>>
https://www.youtube.com/watch?v=t_v2Otgt87g
* Bug fixes and man page updates. * Bug fixes and man page updates.
News about GNU Parallel: News about GNU Parallel:
https://aws.amazon.com/blogs/storage/best-practices-for-accelerating-data-migrations-using-aws-snowball-edge/ * Best practices for accelerating data migrations using AWS Snowball Edge https://aws.amazon.com/blogs/storage/best-practices-for-accelerating-data-migrations-using-aws-snowball-edge/
* Pass the Output of a Command as an Argument for Another https://www.baeldung.com/linux/pass-cmd-output-as-an-argument
* Warwick RSE Drop-in - Workflow Management Part 2 - GNU Parallel https://www.youtube.com/watch?v=t_v2Otgt87g
Get the book: GNU Parallel 2018 http://www.lulu.com/shop/ole-tange/gnu-parallel-2018/paperback/product-23558902.html Get the book: GNU Parallel 2018 http://www.lulu.com/shop/ole-tange/gnu-parallel-2018/paperback/product-23558902.html

View file

@ -1292,7 +1292,6 @@ sub nindex($$) {
$job->set_block($header_ref, $buffer_ref, $job->set_block($header_ref, $buffer_ref,
$endpos, $recstart, $recend); $endpos, $recstart, $recend);
$block_passed = 1; $block_passed = 1;
$job->set_virgin(0);
$written += $job->non_blocking_write(); $written += $job->non_blocking_write();
last; last;
} }
@ -1355,7 +1354,7 @@ sub rindex64($@) {
# Default: search from end # Default: search from end
$pos = defined $pos ? $pos : $strlen; $pos = defined $pos ? $pos : $strlen;
# No point in doing extra work if we don't need to. # No point in doing extra work if we don't need to.
if($strlen < $block_size) { if($strlen < $block_size or $] > 5.022) {
return rindex($$ref, $match, $pos); return rindex($$ref, $match, $pos);
} }
@ -1433,42 +1432,8 @@ sub write_record_to_pipe($$$$$$) {
start_more_jobs(); start_more_jobs();
} }
my $job = shift @Global::virgin_jobs; my $job = shift @Global::virgin_jobs;
# Job is no longer virgin $job->set_block($header_ref, $buffer_ref, $endpos, $recstart, $recend);
$job->set_virgin(0); $job->write_block();
if($opt::retries) {
# Copy $buffer[0..$endpos] to $job->{'block'}
# Remove rec_sep
# Run $job->add_transfersize
$job->set_block($header_ref, $buffer_ref, $endpos,
$recstart, $recend);
if(fork()) {
# Skip
} else {
$job->write($job->block_ref());
close $job->fh(0,"w");
exit(0);
}
} else {
# We ignore the removed rec_sep which is technically wrong.
$job->add_transfersize($endpos + length $$header_ref);
if(fork()) {
# Skip
} else {
# Chop of at $endpos as we do not know how many rec_sep will
# be removed.
substr($$buffer_ref,$endpos,length $$buffer_ref) = "";
# Remove rec_sep
if($opt::remove_rec_sep) {
Job::remove_rec_sep($buffer_ref, $recstart, $recend);
}
$job->write($header_ref);
$job->write($buffer_ref);
close $job->fh(0,"w");
exit(0);
}
}
close $job->fh(0,"w");
return 1; return 1;
} }
@ -2179,7 +2144,7 @@ sub check_invalid_option_combinations() {
sub init_globals() { sub init_globals() {
# Defaults: # Defaults:
$Global::version = 20201207; $Global::version = 20201212;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$::name = "GNU Parallel"; $::name = "GNU Parallel";
$Global::infinity = 2**31; $Global::infinity = 2**31;
@ -3455,15 +3420,7 @@ sub init_run_jobs() {
push(@Global::virgin_jobs,$job); push(@Global::virgin_jobs,$job);
} else { } else {
# Block already set: This is a retry # Block already set: This is a retry
if(fork()) { $job->write_block();
::debug("pipe","\n\nWriting ",length ${$job->block_ref()},
" to ", $job->seq(),"\n");
close $job->fh(0,"w");
} else {
$job->write($job->block_ref());
close $job->fh(0,"w");
exit(0);
}
} }
} }
debug("start", "Started as seq ", $job->seq(), debug("start", "Started as seq ", $job->seq(),
@ -8468,8 +8425,6 @@ sub filter_through_compress($) {
} }
} }
sub set_fh($$$$) { sub set_fh($$$$) {
# Set file handle # Set file handle
my ($self, $fd_no, $key, $fh) = @_; my ($self, $fd_no, $key, $fh) = @_;
@ -8482,6 +8437,32 @@ sub fh($) {
return $self->{'fd'}{$fd_no,$key}; return $self->{'fd'}{$fd_no,$key};
} }
sub write_block($) {
my $self = shift;
my $stdin_fh = $self->fh(0,"w");
if(fork()) {
# Close in parent
close $stdin_fh;
} else {
# If writing is to a closed pipe:
# Do not call signal handler, but let nothing be written
local $SIG{PIPE} = undef;
for my $part (
grep { defined $_ }
$self->{'header'},$self->{'block'}) {
# syswrite may not write all in one go,
# so make sure everything is written.
my $written;
while($written = syswrite($stdin_fh,$$part)) {
substr($$part,0,$written) = "";
}
}
close $stdin_fh;
exit(0);
}
}
sub write($) { sub write($) {
my $self = shift; my $self = shift;
my $remaining_ref = shift; my $remaining_ref = shift;
@ -8514,22 +8495,37 @@ sub set_block($$$$$$) {
# N/A # N/A
my $self = shift; my $self = shift;
my ($header_ref,$buffer_ref,$endpos,$recstart,$recend) = @_; my ($header_ref,$buffer_ref,$endpos,$recstart,$recend) = @_;
$self->{'block'} = ($self->virgin() ? $$header_ref : ""). $self->{'header'} = $header_ref;
substr($$buffer_ref,0,$endpos); if($opt::roundrobin or $opt::remove_rec_sep or $opt::retries) {
if($opt::remove_rec_sep) { my $a = "";
remove_rec_sep(\$self->{'block'},$recstart,$recend); if(($opt::roundrobin or $opt::retries) and $self->virgin()) {
$a .= $$header_ref;
}
# Job is no longer virgin
$self->set_virgin(0);
# Make a full copy because $buffer will change
$a .= substr($$buffer_ref,0,$endpos);
$self->{'block'} = \$a;
if($opt::remove_rec_sep) {
remove_rec_sep($self->{'block'},$recstart,$recend);
}
$self->{'block_length'} = length ${$self->{'block'}};
} else {
$self->set_virgin(0);
for(substr($$buffer_ref,0,$endpos)) {
$self->{'block'} = \$_;
}
$self->{'block_length'} = $endpos + length ${$self->{'header'}};
} }
$self->{'block_length'} = length $self->{'block'};
$self->{'block_pos'} = 0; $self->{'block_pos'} = 0;
$self->add_transfersize($self->{'block_length'}); $self->add_transfersize($self->{'block_length'});
} }
sub block_ref($) { sub block_ref($) {
my $self = shift; my $self = shift;
return \$self->{'block'}; return $self->{'block'};
} }
sub block_length($) { sub block_length($) {
my $self = shift; my $self = shift;
return $self->{'block_length'}; return $self->{'block_length'};
@ -8564,7 +8560,7 @@ sub non_blocking_write($) {
my $in = $self->fh(0,"w"); my $in = $self->fh(0,"w");
my $rv = syswrite($in, my $rv = syswrite($in,
substr($self->{'block'},$self->{'block_pos'})); substr(${$self->{'block'}},$self->{'block_pos'}));
if (!defined($rv) && $! == ::EAGAIN()) { if (!defined($rv) && $! == ::EAGAIN()) {
# would block - but would have written # would block - but would have written
$something_written = 0; $something_written = 0;

View file

@ -12,15 +12,14 @@ install_packages() {
test_pkgs="$test_pkgs xterm libc6-i386 libcrypt1:i386" test_pkgs="$test_pkgs xterm libc6-i386 libcrypt1:i386"
test_pkgs="$test_pkgs libtest-nowarnings-perl" test_pkgs="$test_pkgs libtest-nowarnings-perl"
# DEBIAN package # Debian package
packaging_pkgs="dpkg-dev build-essential debhelper osc cvs automake-1.15" packaging_pkgs="dpkg-dev build-essential debhelper osc cvs automake-1.15"
packaging_pkgs="$packaging_pkgs python3-m2crypto alien" packaging_pkgs="$packaging_pkgs python3-m2crypto alien"
# SHEBANG TOOLS # Shebang Tools
shebang_pkgs="gnuplot octave ruby r-base-core" shebang_pkgs="gnuplot octave ruby r-base-core"
# BUILD TOOLS
build_pkgs="bison" # SQL Tools
# SQL TOOLS
sql_pkgs="libdbd-pg-perl libdbd-sqlite3-perl libdbd-csv-perl" sql_pkgs="libdbd-pg-perl libdbd-sqlite3-perl libdbd-csv-perl"
sql_pkgs="$sql_pkgs libdbd-mysql-perl rlwrap" sql_pkgs="$sql_pkgs libdbd-mysql-perl rlwrap"
@ -37,8 +36,8 @@ install_packages() {
# Databases # Databases
database_pkgs="postgresql mysql-server sqlite" database_pkgs="postgresql mysql-server sqlite"
# Build packages # Build Tools
build_pkgs="libxxhash-dev libzstd-dev liblz4-dev libssl-dev" build_pkgs="bison libxxhash-dev libzstd-dev liblz4-dev libssl-dev"
build_pkgs="$build_pkgs python3-cmarkgfm" build_pkgs="$build_pkgs python3-cmarkgfm"
sudo dpkg --add-architecture i386; sudo apt update sudo dpkg --add-architecture i386; sudo apt update
@ -351,11 +350,13 @@ rsync_versions() {
cd rsync-$1 cd rsync-$1
git reset --hard git reset --hard
git checkout $1 git checkout $1
perl -i -pe 's/AC_DEFINE_UNQUOTED.HAVE_REMSH, .HAVE_REMSH./AC_DEFINE_UNQUOTED(HAVE_REMSH, \$HAVE_REMSH,[dummy])/;
s/AC_DEFINE.HAVE_ERRNO_DECL.,/AC_DEFINE(HAVE_ERRNO_DECL,[1],[dummy]),/;
s/AC_DEFINE.HAVE_FNMATCH.,/AC_DEFINE(HAVE_FNMATCH,[1],[dummy]),/;' configure.in
autoreconf --install -W gnu autoreconf --install -W gnu
make proto
# Make "lib/addrinfo.h" ? # Make "lib/addrinfo.h" ?
LDFLAGS=-static ./configure && LDFLAGS=-static ./configure &&
make -j2 && (make proto; make -j2) &&
sudo cp rsync /usr/local/bin/rsync-$1 sudo cp rsync /usr/local/bin/rsync-$1
} }
export -f make_one export -f make_one

View file

@ -232,13 +232,17 @@ par_delimiter() {
par_argfile() { par_argfile() {
echo '### Test -a and --arg-file: Read input from file instead of stdin' echo '### Test -a and --arg-file: Read input from file instead of stdin'
seq 1 10 >/tmp/parallel_$$-1; parallel -k -a /tmp/parallel_$$-1 echo; rm /tmp/parallel_$$-1 tmp=$(mktemp)
seq 1 10 >/tmp/parallel_$$-2; parallel -k --arg-file /tmp/parallel_$$-2 echo; rm /tmp/parallel_$$-2 seq 1 10 >$tmp
parallel -k -a $tmp echo
parallel -k --arg-file $tmp echo
rm $tmp
} }
par_pipe_unneeded_procs() { par_pipe_unneeded_procs() {
echo '### Test bug #34241: --pipe should not spawn unneeded processes' echo '### Test bug #34241: --pipe should not spawn unneeded processes'
seq 3 | parallel -j30 --pipe --block-size 10 cat\;echo o 2> >(grep -Ev 'Warning: Starting|Warning: Consider') seq 3 |
parallel -j30 --pipe --block-size 10 cat\;echo o 2> >(grep -Ev 'Warning: Starting|Warning: Consider')
} }
par_results_arg_256() { par_results_arg_256() {
@ -257,21 +261,28 @@ par_pipe_to_func() {
echo pipefunc and more OK | parallel --pipe 'myfunc {#};echo and more OK' echo pipefunc and more OK | parallel --pipe 'myfunc {#};echo and more OK'
} }
par_roundrobin_k() {
echo '### Test -k --round-robin'
seq 1000000 | parallel -j4 -k --round-robin --pipe wc
}
par_pipepart_roundrobin() { par_pipepart_roundrobin() {
echo '### bug #45769: --round-robin --pipepart gives wrong results' echo '### bug #45769: --round-robin --pipepart gives wrong results'
seq 10000 >/tmp/seq10000 tmp=$(mktemp)
parallel -j2 --pipepart -a /tmp/seq10000 --block 14 --round-robin wc | wc -l seq 10000 >$tmp
rm /tmp/seq10000 parallel -j2 --pipepart -a $tmp --block 14 --round-robin wc | wc -l
rm $tmp
} }
par_pipepart_header() { par_pipepart_header() {
echo '### bug #44614: --pipepart --header off by one' echo '### bug #44614: --pipepart --header off by one'
seq 10 >/tmp/parallel_44616 tmp=$(mktemp)
parallel --pipepart -a /tmp/parallel_44616 -k --block 5 'echo foo; cat' seq 10 >$tmp
parallel --pipepart -a /tmp/parallel_44616 -k --block 2 --regexp --recend 3'\n' 'echo foo; cat' parallel --pipepart -a $tmp -k --block 5 'echo foo; cat'
rm /tmp/parallel_44616 parallel --pipepart -a $tmp -k --block 2 --regexp --recend 3'\n' 'echo foo; cat'
rm $tmp
} }
par_quote() { par_quote() {
@ -297,9 +308,10 @@ par_read_from_stdin() {
par_total_from_joblog() { par_total_from_joblog() {
echo 'bug #47086: [PATCH] Initialize total_completed from joblog' echo 'bug #47086: [PATCH] Initialize total_completed from joblog'
rm -f /tmp/parallel-47086 tmp=$(mktemp)
parallel -j1 --joblog /tmp/parallel-47086 --halt now,fail=1 echo '{= $_=$Global::total_completed =};exit {}' ::: 0 0 0 1 0 0 parallel -j1 --joblog $tmp --halt now,fail=1 echo '{= $_=$Global::total_completed =};exit {}' ::: 0 0 0 1 0 0
parallel -j1 --joblog /tmp/parallel-47086 --halt now,fail=1 --resume echo '{= $_=$Global::total_completed =};exit {}' ::: 0 0 0 1 0 0 parallel -j1 --joblog $tmp --halt now,fail=1 --resume echo '{= $_=$Global::total_completed =};exit {}' ::: 0 0 0 1 0 0
rm $tmp
} }
par_xapply() { par_xapply() {
@ -430,9 +442,11 @@ par_empty_line() {
par_append_joblog() { par_append_joblog() {
echo '### can you append to a joblog using +' echo '### can you append to a joblog using +'
parallel --joblog /tmp/parallel_append_joblog echo ::: 1 tmp=$(mktemp)
parallel --joblog +/tmp/parallel_append_joblog echo ::: 1 parallel --joblog $tmp echo ::: 1
wc -l /tmp/parallel_append_joblog parallel --joblog +$tmp echo ::: 1
wc -l < $tmp
rm $tmp
} }
par_file_ending_in_newline() { par_file_ending_in_newline() {
@ -606,11 +620,12 @@ par_pipe_tag_v() {
par_dryrun_append_joblog() { par_dryrun_append_joblog() {
echo '--dry-run should not append to joblog' echo '--dry-run should not append to joblog'
rm -f /tmp/jl.$$ tmp=$(mktemp)
parallel -k --jl /tmp/jl.$$ echo ::: 1 2 3 parallel -k --jl $tmp echo ::: 1 2 3
parallel --dryrun -k --jl +/tmp/jl.$$ echo ::: 1 2 3 4 parallel --dryrun -k --jl +$tmp echo ::: 1 2 3 4
# Job 4 should not show up: 3 lines + header = 4 # Job 4 should not show up: 3 lines + header = 4
wc -l < /tmp/jl.$$ wc -l < $tmp
rm $tmp
} }
par_0_no_newline() { par_0_no_newline() {
@ -660,8 +675,9 @@ par_slow_pipe_regexp() {
par_results() { par_results() {
echo "### --results test.csv" echo "### --results test.csv"
parallel -k --results /tmp/$$.csv echo ::: a b c tmp=$(mktemp)
rm /tmp/$$.csv parallel -k --results "$tmp"-dir echo ::: a b c
rm -r $tmp "$tmp"-dir
} }
par_testquote() { par_testquote() {
@ -874,7 +890,7 @@ par_group-by_colsep_space() {
} }
par_json() { par_json() {
printf '"\t\\"' | parallel --results -.json echo :::: - ::: '"' '\\' | printf '"\t\\"' | parallel -k --results -.json echo :::: - ::: '"' '\\' |
perl -pe 's/\d/0/g' perl -pe 's/\d/0/g'
} }

View file

@ -111,17 +111,17 @@ par_delay_Xauto() {
exit $m;' $1; exit $m;' $1;
} }
export -f doit export -f doit
#seq 1000 | ppar --jl - -v --delay 0.1auto -q doit "$tmp"
before=`date +%s` before=`date +%s`
out=$(seq 30 | parallel --delay 0.03 -q doit "$tmp") out=$(seq 30 | parallel --delay 0.03 -q doit "$tmp")
after=`date +%s` after=`date +%s`
normaldiff=$((after-before)) # Round to 5 seconds
normaldiff=$(( (after-before)/5 ))
echo $normaldiff echo $normaldiff
before=`date +%s` before=`date +%s`
out=$(seq 30 | parallel --delay 0.03auto -q doit "$tmp") out=$(seq 30 | parallel --delay 0.03auto -q doit "$tmp")
after=`date +%s` after=`date +%s`
autodiff=$((after-before)) autodiff=$(( (after-before)/5 ))
echo $autodiff echo $autodiff
rm "$tmp" rm "$tmp"

View file

@ -14,7 +14,7 @@ par_X_eta_div_zero 0:local / 0 / 0
par_append_joblog ### can you append to a joblog using + par_append_joblog ### can you append to a joblog using +
par_append_joblog 1 par_append_joblog 1
par_append_joblog 1 par_append_joblog 1
par_append_joblog 3 /tmp/parallel_append_joblog par_append_joblog 3
par_argfile ### Test -a and --arg-file: Read input from file instead of stdin par_argfile ### Test -a and --arg-file: Read input from file instead of stdin
par_argfile 1 par_argfile 1
par_argfile 2 par_argfile 2
@ -828,6 +828,11 @@ par_retries_replacement_string 22
par_retries_replacement_string 33 par_retries_replacement_string 33
par_retries_replacement_string 33 par_retries_replacement_string 33
par_retries_replacement_string 33 par_retries_replacement_string 33
par_roundrobin_k ### Test -k --round-robin
par_roundrobin_k 315464 315464 2097143
par_roundrobin_k 299592 299592 2097144
par_roundrobin_k 235148 235148 1646037
par_roundrobin_k 149796 149796 1048572
par_rpl_that_is_substring_of_longer_rpl ### --rpl % that is a substring of longer --rpl %D par_rpl_that_is_substring_of_longer_rpl ### --rpl % that is a substring of longer --rpl %D
par_rpl_that_is_substring_of_longer_rpl a.b/c.d/e.f=a.b/c.d/e.f par_rpl_that_is_substring_of_longer_rpl a.b/c.d/e.f=a.b/c.d/e.f
par_rpl_that_is_substring_of_longer_rpl a.b/c.d=a.b/c.d par_rpl_that_is_substring_of_longer_rpl a.b/c.d=a.b/c.d

View file

@ -7,8 +7,8 @@ par_bug56403 1 job1b
par_bug56403 2 job2cjob2d par_bug56403 2 job2cjob2d
par_delay_Xauto TODO: --retries for those that fail and --sshdelay par_delay_Xauto TODO: --retries for those that fail and --sshdelay
par_delay_Xauto ### bug #58911: --delay Xauto par_delay_Xauto ### bug #58911: --delay Xauto
par_delay_Xauto 1 par_delay_Xauto 0
par_delay_Xauto 25 par_delay_Xauto 5
par_distribute_input_by_ability ### bug #48290: round-robin does not distribute data based on business par_distribute_input_by_ability ### bug #48290: round-robin does not distribute data based on business
par_distribute_input_by_ability ### Distribute input to jobs that are ready par_distribute_input_by_ability ### Distribute input to jobs that are ready
par_distribute_input_by_ability Job-slot n is 50% slower than n+1, so the order should be 1..7 par_distribute_input_by_ability Job-slot n is 50% slower than n+1, so the order should be 1..7