parallel: Fixed bug #56275: --shard with named column and perl expr.

This commit is contained in:
Ole Tange 2019-06-17 01:21:45 +02:00
parent 17a5fd702d
commit 5d81c7c053
10 changed files with 206 additions and 49 deletions


@@ -1,5 +1,17 @@
Quote of the month:
I love so much @GnuParallel to parallelize my SQL requests on @PostgreSQL
-- @rmaziere_85 Romain
I want to make a shout-out for @GnuParallel, it's a work of beauty and power
-- Cristian Consonni @CristianCantoro
Parallel is the BEST command.
-- Nick @NickInfoSec
It is SUPER easy to speed up jobs from the command line w/ GNU parallel.
-- B3n @B3njaminHimes@twitter
GNU parallel really changed how I do a lot of data processing stuff
-- Brendan Dolan-Gavitt @moyix@twitter


@@ -38,11 +38,20 @@ newest version.

Before GNU Parallel was a GNU tool, it started as a wrapper around
`make -j`. But GNU Parallel grew, and was no longer just a small
hack.

The design goals included not requiring a compiler, compatibility with
old operating systems, and being a single-file program. This limited
the choice of languages tremendously: Perl and Python were in practice
the only possibilities. Python was at the time quite slow, resource
hungry, and not as widely installed as Perl. So Perl was the choice.

To make the code easier to maintain it was rewritten to object
orientation. This would not have been possible if the test suite had
not been so thorough: It made it much easier to see if a code change
caused a change in behaviour.

=head2 --tollef


@@ -206,9 +206,9 @@ from:tange@gnu.org
to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm>

Subject: GNU Parallel 20190622 ('Frederiksen/Saybie/HongKong') released <<[stable]>>

GNU Parallel 20190622 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/

<<No new functionality was introduced so this is a good candidate for a stable release.>>

@@ -218,19 +218,16 @@ See https://www.gnu.org/software/parallel/10-years-anniversary.html

Quote of the month:

<<>>

New in this release:

* --group-by groups lines depending on the value of a column. The value can be computed.

* Bug fixes and man page updates.

News about GNU Parallel:

* https://livefreeordichotomize.com/2019/06/04/using_awk_and_r_to_parse_25tb/

* https://zh.wikipedia.org/wiki/GNU_parallel

* http://parallelandvisualtestingwithbehat.blogspot.com/p/blog-page.html

* GNU Parallel Akihito released https://linuxreviews.org/GNU_Parallel_Akihito_released


@@ -378,6 +378,8 @@ sub sharder_script() {
    my $col = shift;
    # Which columns to shard on (count from 0)
    my $col0 = $col - 1;
    # Perl expression
    my $perlexpr = shift;
    my $bins = @ARGV;
    # Open fifos for writing, fh{0..$bins}
    my $t = 0;
@@ -388,12 +390,26 @@ sub sharder_script() {
        # so unlink only happens when it is ready
        unlink $_;
    }
    if($perlexpr) {
        my $subref = eval("sub { no strict; no warnings; $perlexpr }");
        while(<STDIN>) {
            # Split into $col columns (no need to split into more)
            @F = split $sep, $_, $col+1;
            {
                local $_ = $F[$col0];
                &$subref();
                $fh = $fh{ hex(B::hash($_))%$bins };
            }
            print $fh $_;
        }
    } else {
        while(<STDIN>) {
            # Split into $col columns (no need to split into more)
            @F = split $sep, $_, $col+1;
            $fh = $fh{ hex(B::hash($F[$col0]))%$bins };
            print $fh $_;
        }
    }
    # Close all open fifos
    close values %fh;
};
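In rough Python (a hypothetical sketch, not GNU parallel's code; `zlib.crc32` stands in for Perl's `B::hash`, and in-memory lists stand in for the per-bin fifos), the sharder's routing logic looks like this:

```python
import zlib

def shard(lines, sep, col, bins):
    """Route each line to a bin by hashing the value of the
    shard column (1-based) modulo the number of bins."""
    out = {b: [] for b in range(bins)}  # stand-ins for the per-bin fifos
    for line in lines:
        # Split into at most col+1 fields -- no need to split further
        fields = line.rstrip("\n").split(sep, col)
        key = fields[col - 1]
        out[zlib.crc32(key.encode()) % bins].append(line)
    return out

out = shard(["a\t1\n", "b\t2\n", "a\t3\n"], "\t", 1, 2)
# Lines sharing a key always land in the same bin:
assert len({b for b, ls in out.items() for l in ls if l.startswith("a")}) == 1
```

Because equal keys always hash to the same bin, each job slot sees a complete shard, mirroring the Perl loop above.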
@@ -423,6 +439,20 @@ sub pipe_shard_setup() {
    my $script = sharder_script();
    # cat foo | sharder sep col fifo1 fifo2 fifo3 ... fifoN
    if($opt::shard =~ /^[a-z_][a-z_0-9]*(\s|$)/i) {
        # Group by column name
        # (Yes, this will also wrongly match a perlexpr like: chop)
        my($read,$char,@line);
        # A full line, but nothing more (the rest must be read by the child)
        # $Global::header used to prepend block to each job
        do {
            $read = sysread(STDIN,$char,1);
            push @line, $char;
        } while($read and $char ne "\n");
        $Global::header = join "", @line;
    }
    my ($col, $perlexpr, $subref) =
        column_perlexpr($opt::shard, $Global::header, $opt::colsep);
    if(not fork()) {
        # Let the sharder inherit our stdin
        # and redirect stdout to null
@@ -432,7 +462,7 @@ sub pipe_shard_setup() {
        $ENV{'PERL_HASH_SEED'} = $$;
        exec qw(parallel --block 100k -q --pipe -j), $njobs,
            qw(--roundrobin -u perl -e), $script, ($opt::colsep || ","),
            $col, $perlexpr, '{}', (map { (':::+', @{$_}) } @parcatfifos);
    }
    # For each fifo
    # (rm fifo1; grep 1) < fifo1
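The byte-at-a-time `sysread` is what lets the parent consume exactly the header line while the child sharder inherits the rest of stdin. A hypothetical Python sketch of the same trick, using unbuffered `os.read` on a pipe:

```python
import os

def read_header(fd):
    """Read exactly one line, one byte at a time, so any bytes
    after the newline stay unread for whoever reads fd next."""
    chars = []
    while True:
        b = os.read(fd, 1)  # unbuffered single-byte read, like sysread
        if not b:
            break
        chars.append(b)
        if b == b"\n":
            break
    return b"".join(chars)

r, w = os.pipe()
os.write(w, b"colA\tcolB\nrest of the data\n")
os.close(w)
assert read_header(r) == b"colA\tcolB\n"
assert os.read(r, 100) == b"rest of the data\n"  # remainder left for a child
```

A buffered line-read (e.g. `readline` on a buffered stream) would slurp more than one line into the parent's buffer, which the child could then never see.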


@@ -2155,13 +2155,17 @@ Only supported in B<Ash, Bash, Dash, Ksh, Sh, and Zsh>.

See also B<--env>, B<--record-env>.

=item B<--shard> I<shardexpr> (alpha testing)

Use I<shardexpr> as shard key and shard input to the jobs.

I<shardexpr> is [column number|column name] [perlexpression], e.g. 3,
Address, 3 $_%=100, Address s/\d//g.

Each input line is split using B<--colsep>. The value of the column is
put into $_, the perl expression is executed, and the resulting value
is hashed so that all lines with a given value are given to the same
job slot.

This is similar to sharding in databases.
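For intuition, a hypothetical Python sketch (not GNU parallel's code; `zlib.crc32` stands in for B::hash) of how the optional perl expression rewrites the column value before hashing: with an expression like $_%=100, rows with values 3 and 103 are guaranteed to reach the same job slot.

```python
import zlib

def shard_slot(line, colsep, col, bins, expr=None):
    """Pick a job slot: take the shard column (1-based), optionally
    rewrite it with expr (playing the role of the perl expression run
    with the value in $_), then hash modulo the slot count."""
    value = line.split(colsep)[col - 1]
    if expr is not None:
        value = str(expr(value))
    return zlib.crc32(value.encode()) % bins  # crc32 stands in for B::hash

mod100 = lambda v: int(v) % 100  # plays the role of: $_ %= 100
assert shard_slot("3,x", ",", 1, 4, mod100) == shard_slot("103,y", ",", 1, 4, mod100)
```

Without an expression the raw column value is hashed, so "3" and "103" could land on different slots; the expression normalizes the key first.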


@@ -68,8 +68,12 @@ The commands to run can be an array:

  echo "${data[1]}"
  echo "${data[2]}"

B<parset> can read from stdin (standard input) if it is a file:

  parset res echo < parallel_input_file

but B<parset> can not be part of a pipe. In particular this means it
cannot read from a pipe or write to a pipe:

  seq 10 | parset res echo Does not work

@@ -124,8 +128,12 @@ Put output into vars B<$seq, $pwd, $ls>:

  parset "seq pwd ls" ::: "seq 10" pwd ls
  echo "$ls"

B<parset> can read from stdin (standard input) if it is a file:

  parset res1,res2,res3 echo < parallel_input_file

but B<parset> can not be part of a pipe. In particular this means it
cannot read from a pipe or write to a pipe:

  seq 3 | parset res1,res2,res3 echo Does not work


@@ -4,6 +4,18 @@
# Each should be taking 10-30s and be possible to run in parallel
# I.e.: No race conditions, no logins

par_kill_hup() {
    echo '### Are children killed if GNU Parallel receives HUP? There should be no sleep at the end'
    parallel -j 2 -q bash -c 'sleep {} & pid=$!; wait $pid' ::: 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 &
    T=$!
    sleep 9.9
    pstree $$
    kill -HUP $T
    sleep 2
    pstree $$
}

par_parset() {
    echo '### test parset'
    . `which env_parallel.bash`


@@ -61,6 +61,37 @@ par_shard() {
}
shard_on_col 1
shard_on_col 2

shard_on_col_name() {
    colname=$1
    col=$2
    (echo AB; seq 10 99 | shuf) | perl -pe 's/(.)/$1\t/g' |
        parallel --header : --pipe --shard $colname -j2 --colsep "\t" sort -k$col |
        field $col | uniq -c | sort
}
shard_on_col_name A 1
shard_on_col_name B 2

shard_on_col_expr() {
    colexpr="$1"
    col=$2
    (seq 10 99 | shuf) | perl -pe 's/(.)/$1\t/g' |
        parallel --pipe --shard "$colexpr" -j2 --colsep "\t" "sort -k$col; echo c1 c2" |
        field $col | uniq -c | sort
}
shard_on_col_expr '1 $_%=3' 1
shard_on_col_expr '2 $_%=3' 2

shard_on_col_name_expr() {
    colexpr="$1"
    col=$2
    (echo AB; seq 10 99 | shuf) | perl -pe 's/(.)/$1\t/g' |
        parallel --header : --pipe --shard "$colexpr" -j2 --colsep "\t" "sort -k$col; echo c1 c2" |
        field $col | uniq -c | sort
}
shard_on_col_name_expr 'A $_%=3' 1
shard_on_col_name_expr 'B $_%=3' 2

echo '*** broken'
# Shorthand for --pipe -j+0
seq 100000 | parallel --shard 1 wc |
@@ -148,18 +179,6 @@ par_kill_term() {
    pstree $$
}
par_kill_hup() {
echo '### Are children killed if GNU Parallel receives HUP? There should be no sleep at the end'
parallel -j 2 -q bash -c 'sleep {} & pid=$!; wait $pid' ::: 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 &
T=$!
sleep 2.9
pstree $$
kill -HUP $T
sleep 2
pstree $$
}
par_kill_int_twice() {
    echo '### Are children killed if GNU Parallel receives INT twice? There should be no sleep at the end'


@@ -258,6 +258,15 @@ par_kill_children_timeout parallel: Warning: This job was killed because it time
par_kill_children_timeout parallel: Warning: doit 1000000001
par_kill_children_timeout 2
par_kill_children_timeout 0 0 0
par_kill_hup ### Are children killed if GNU Parallel receives HUP? There should be no sleep at the end
par_kill_hup bash-+-perl
par_kill_hup `-pstree
par_kill_hup parallel: SIGHUP received. No new jobs will be started.
par_kill_hup parallel: Waiting for these 2 jobs to finish. Send SIGTERM to stop now.
par_kill_hup parallel: bash -c 'sleep 3 & pid=$!; wait $pid'
par_kill_hup parallel: bash -c 'sleep 3 & pid=$!; wait $pid'
par_kill_hup bash-+-perl---2*[bash---sleep]
par_kill_hup `-pstree
par_line_buffer ### --line-buffer
par_line_buffer 55 55 120
par_line_buffer These must diff: 1


@@ -1122,14 +1122,6 @@ par_jobslot_repl 2
par_jobslot_repl 1
par_jobslot_repl 2
par_jobslot_repl 1
par_kill_hup ### Are children killed if GNU Parallel receives HUP? There should be no sleep at the end
par_kill_hup bash-+-perl---2*[bash---sleep]
par_kill_hup `-pstree
par_kill_hup parallel: SIGHUP received. No new jobs will be started.
par_kill_hup parallel: Waiting for these 2 jobs to finish. Send SIGTERM to stop now.
par_kill_hup parallel: bash -c 'sleep 1 & pid=$!; wait $pid'
par_kill_hup parallel: bash -c 'sleep 1 & pid=$!; wait $pid'
par_kill_hup bash---pstree
par_kill_int_twice ### Are children killed if GNU Parallel receives INT twice? There should be no sleep at the end
par_kill_int_twice bash-+-perl---bash---sleep
par_kill_int_twice `-pstree
@@ -1288,6 +1280,71 @@ par_shard 9 6
par_shard 9 7
par_shard 9 8
par_shard 9 9
par_shard 10 1
par_shard 10 2
par_shard 10 3
par_shard 10 4
par_shard 10 5
par_shard 10 6
par_shard 10 7
par_shard 10 8
par_shard 10 9
par_shard 9 0
par_shard 9 1
par_shard 9 2
par_shard 9 3
par_shard 9 4
par_shard 9 5
par_shard 9 6
par_shard 9 7
par_shard 9 8
par_shard 9 9
par_shard 10 1
par_shard 10 2
par_shard 10 3
par_shard 10 4
par_shard 10 5
par_shard 10 6
par_shard 10 7
par_shard 10 8
par_shard 10 9
par_shard 1 c1
par_shard 1 c1
par_shard 1 c2
par_shard 1 c2
par_shard 9 0
par_shard 9 1
par_shard 9 2
par_shard 9 3
par_shard 9 4
par_shard 9 5
par_shard 9 6
par_shard 9 7
par_shard 9 8
par_shard 9 9
par_shard 10 1
par_shard 10 2
par_shard 10 3
par_shard 10 4
par_shard 10 5
par_shard 10 6
par_shard 10 7
par_shard 10 8
par_shard 10 9
par_shard 1 c1
par_shard 1 c1
par_shard 1 c2
par_shard 1 c2
par_shard 9 0
par_shard 9 1
par_shard 9 2
par_shard 9 3
par_shard 9 4
par_shard 9 5
par_shard 9 6
par_shard 9 7
par_shard 9 8
par_shard 9 9
par_shard *** broken
par_shard parallel: Error: --tee requres --jobs to be higher. Try --jobs 0.
par_shard parallel: Error: --tee requres --jobs to be higher. Try --jobs 0.