From cc528fbbfbf317ed8936b1a56f04288ebb2cf732 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Wed, 20 Feb 2019 00:48:09 +0100 Subject: [PATCH] parallel: --shard parallelized. Can do 1M records on 4 core with hyperthread. --- src/parallel | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/src/parallel b/src/parallel index 206d1e3b..8c3005ff 100755 --- a/src/parallel +++ b/src/parallel @@ -198,6 +198,7 @@ sub pipe_tee_setup() { sub parcat_script() { + # TODO if script fails: Use parallel -j0 --plain --lb cat ::: fifos my $script = q'{ use Symbol qw(gensym); use IPC::Open3; @@ -410,27 +411,32 @@ sub pipe_shard_setup() { # cat fifo | user_command # Changes: # @Global::cat_prepends - my @fifos; + my @shardfifos; + my @parcatfifos; # TODO $opt::jobs should be evaluated (100%) - for(0..$opt::jobs-1) { -# push @fifos, tmpfifo(); - push @{$fifos[$_]}, tmpfifo(); + # TODO $opt::jobs should be number of total_jobs if there are argugemts + my $njobs = $opt::jobs; + for my $m (0..$njobs-1) { + for my $n (0..$njobs-1) { + # sharding to A B C D + # parcatting all As together + $parcatfifos[$n][$m] = $shardfifos[$m][$n] = tmpfifo(); + } } my $script = sharder_script(); # cat foo | sharder sep col fifo1 fifo2 fifo3 ... fifoN - my @fif = map { $_->[0] } @fifos; -# ::debug("shard",'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif); - if(not fork()){ + + if(not fork()) { # Let the sharder inherit our stdin # and redirect stdout to null open STDOUT, ">","/dev/null"; -# exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fifos; - @fif = map { $_->[0] } @fifos; - exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif; + # The PERL_HASH_SEED must be the same for all sharders + # so B::hash will return the same value for any given input + $ENV{PERL_HASH_SEED} = $$; + exec qw(parallel --block 100k -q --pipe -j), $njobs, + qw(--roundrobin -u perl -e), $script, ($opt::colsep || ","), + $opt::shard, '{}', (map { (':::+', @{$_}) } @parcatfifos); } - -# parallel --round-robin $script $opt ::: @fifos[0] ::: @fifos[1] - # For each fifo # (rm fifo1; grep 1) < fifo1 # (rm fifo2; grep 2) < fifo2 @@ -440,9 +446,7 @@ sub pipe_shard_setup() { ::error("'parcat' must be in path."); ::wait_and_exit(255); } -# @Global::cat_prepends = map { "perl -e $parcat $_ | " } @fifos; - @Global::cat_prepends = map { "perl -e $parcat @$_ | " } @fifos; -# warn map { "$_\n" } @Global::cat_prepends; + @Global::cat_prepends = map { "perl -e $parcat @$_ | " } @parcatfifos; } sub pipe_part_files(@) { @@ -8219,7 +8223,7 @@ sub wrapped($) { # (rm fifo; ... ) < fifo $command = (shift @Global::cat_prepends). "($command)". (shift @Global::cat_appends); - } elsif($opt::pipe) { + } elsif($opt::pipe and not $opt::roundrobin) { # Wrap with EOF-detector to avoid starting $command if EOF. $command = empty_input_wrapper($command); }