From 904cda6e19759b3ca2464670ab0043600847d1cf Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Thu, 14 Feb 2019 01:35:32 +0100 Subject: [PATCH] parallel: --demux initial version. --- doc/release_new_version | 2 +- src/parallel | 67 ++++++++++++++++++- src/parallel.pod | 17 +++++ testsuite/tests-to-run/parallel-local-0.3s.sh | 5 ++ testsuite/wanted-results/parallel-local-0.3s | 2 + 5 files changed, 90 insertions(+), 3 deletions(-) diff --git a/doc/release_new_version b/doc/release_new_version index 3efc2e33..8c3009e4 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -207,7 +207,7 @@ from:tange@gnu.org to:parallel@gnu.org, bug-parallel@gnu.org stable-bcc: Jesse Alama -Subject: GNU Parallel 20190222 ('Al-Baghuz Fawqani') released <<[stable]>> +Subject: GNU Parallel 20190222 ('INF/Al-Baghuz Fawqani') released <<[stable]>> GNU Parallel 20190222 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/ diff --git a/src/parallel b/src/parallel index 6b2520cc..3ce1f620 100755 --- a/src/parallel +++ b/src/parallel @@ -196,6 +196,63 @@ sub pipe_tee_setup() { @Global::cat_appends = map { ") < $_" } @fifos; } +sub pipe_demultiplex_setup() { + # Create temporary fifos + # Run 'demux.pl sep col fifo1 fifo2 fifo3 ... 
fifoN' in the background + # This will spread the input to fifos + # Generate commands that reads from fifo1..N: + # cat fifo | user_command + # Changes: + # @Global::cat_prepends + my @fifos; + # TODO $opt::jobs should be evaluated (100%) + for(1..$opt::jobs) { + push @fifos, tmpfifo(); + } + my $script = ::spacefree(1,q{ + BEGIN { + use B; + # Which columns to demultiplex on (count from 1) + $col = (shift) - 1; + $bins = @ARGV; + # Open fifos for writing, fh{0..$bins} + my $t = 0; + for(@ARGV) { + open $fh{$t++}, ">", $_; + # open blocks until it is opened by reader + # so unlink only happens when it is ready + unlink $_; + } + # the -n wrapper should read from STDIN - not files + @ARGV = (); + }; + # Wrapped in while(<>) due to -n + $fh = $fh{ hex(B::hash($F[$col]))%$bins }; + print $fh $_; + END { + # Close all open fifos + close values %fh; + } + }); + # cat foo | demuxer sep col fifo1 fifo2 fifo3 ... fifoN + ::debug("demux",'perl', '-F', $opt::colsep, + '-ane', $script, $opt::demultiplex, @fifos); + if(not fork()){ + # Let the demuxer inherit our stdin + # and redirect stdout to null + open STDOUT, ">","/dev/null"; + exec 'perl', "-F".($opt::colsep || ","), + '-ane', $script, $opt::demultiplex, @fifos; + } + # For each fifo + # (rm fifo1; grep 1) < fifo1 + # (rm fifo2; grep 2) < fifo2 + # (rm fifo3; grep 3) < fifo3 + # Remove the tmpfifo as soon as it is open + @Global::cat_prepends = map { "(true $_;" } @fifos; + @Global::cat_appends = map { ") < $_" } @fifos; +} + sub pipe_part_files(@) { # Given the bigfile # find header and split positions @@ -1008,6 +1065,7 @@ sub options_hash() { "fifo" => \$opt::fifo, "pipepart|pipe-part" => \$opt::pipepart, "tee" => \$opt::tee, + "demultiplex|demux=s" => \$opt::demultiplex, "hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups, "embed" => \$opt::embed, ); @@ -7964,6 +8022,9 @@ sub wrapped($) { # # --pipe --tee: wrap: # (rm fifo; ... ) < fifo + # + # --pipe --demux X: + # (rm fifo; ... 
) < fifo $command = (shift @Global::cat_prepends). "($command)". (shift @Global::cat_appends); } elsif($opt::pipe) { @@ -8735,7 +8796,7 @@ sub start($) { ::set_fh_non_blocking($stdin_fh); } $job->set_fh(0,"w",$stdin_fh); - if($opt::tee) { $job->set_virgin(0); } + if($opt::tee or $opt::demultiplex) { $job->set_virgin(0); } } elsif ($opt::tty and -c "/dev/tty" and open(my $devtty_fh, "<", "/dev/tty")) { # Give /dev/tty to the command if no one else is using it @@ -12144,6 +12205,8 @@ sub main() { pipepart_setup(); } elsif($opt::pipe and $opt::tee) { pipe_tee_setup(); + } elsif($opt::pipe and $opt::demultiplex) { + pipe_demultiplex_setup(); } if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) { @@ -12163,7 +12226,7 @@ sub main() { } $SIG{TERM} = \&start_no_new_jobs; - if($opt::tee) { + if($opt::tee or $opt::demultiplex) { # All jobs must be running in parallel for --tee while(start_more_jobs()) {} $Global::start_no_new_jobs = 1; diff --git a/src/parallel.pod b/src/parallel.pod index b3c8330c..39a03fec 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -620,6 +620,7 @@ Even quoted newlines are parsed correctly: When used with B<--pipe> only pass full CSV-records. + =item B<--delimiter> I =item B<-d> I @@ -632,6 +633,22 @@ as \n, or octal or hexadecimal escape codes. Octal and hexadecimal escape codes are understood as for the printf command. Multibyte characters are not supported. + +=item B<--demux> I (alpha testing) + +=item B<--demultiplex> I (alpha testing) + +Demultiplex input on column number I. Input is split using +B<--colsep>. The value in the column is hashed so that all lines of a +given value are given to the same job slot. + +This is a bit similar to B<--roundrobin> in that all data is given to +a limited number of jobs, but unlike B<--roundrobin>, data is given +to a specific job slot based on the value in a column. + +The performance is on the order of 1M rows per second. 
+ + =item B<--dirnamereplace> I =item B<--dnr> I diff --git a/testsuite/tests-to-run/parallel-local-0.3s.sh b/testsuite/tests-to-run/parallel-local-0.3s.sh index 1145cf06..a7b2344a 100644 --- a/testsuite/tests-to-run/parallel-local-0.3s.sh +++ b/testsuite/tests-to-run/parallel-local-0.3s.sh @@ -901,6 +901,11 @@ par_wd_dotdotdot() { parallel --wd ... 'echo $OLDPWD' ::: foo } +par_demux() { + echo '### --demux' + seq 100000 | parallel --pipe --demux 1 -j5 'echo {#}; cat' | wc +} + export -f $(compgen -A function | grep par_) compgen -A function | grep par_ | LC_ALL=C sort | parallel --timeout 20 -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1' diff --git a/testsuite/wanted-results/parallel-local-0.3s b/testsuite/wanted-results/parallel-local-0.3s index a80589c6..e4cd1eb5 100644 --- a/testsuite/wanted-results/parallel-local-0.3s +++ b/testsuite/wanted-results/parallel-local-0.3s @@ -1299,6 +1299,8 @@ par_csv_pipe 11000" par_csv_pipe More records in single block par_csv_pipe 9000" par_csv_pipe 11000" +par_demux ### --demux +par_demux 100005 100005 588905 par_dryrun_append_joblog --dry-run should not append to joblog par_dryrun_append_joblog 1 par_dryrun_append_joblog 2