parallel: --shard parallelized. Can do 1M records on 4 cores with hyperthreading.

This commit is contained in:
Ole Tange 2019-02-20 00:48:09 +01:00
parent 79666b4906
commit cc528fbbfb

View file

@ -198,6 +198,7 @@ sub pipe_tee_setup() {
sub parcat_script() { sub parcat_script() {
# TODO if script fails: Use parallel -j0 --plain --lb cat ::: fifos
my $script = q'{ my $script = q'{
use Symbol qw(gensym); use Symbol qw(gensym);
use IPC::Open3; use IPC::Open3;
@ -410,27 +411,32 @@ sub pipe_shard_setup() {
# cat fifo | user_command # cat fifo | user_command
# Changes: # Changes:
# @Global::cat_prepends # @Global::cat_prepends
my @fifos; my @shardfifos;
my @parcatfifos;
# TODO $opt::jobs should be evaluated (100%) # TODO $opt::jobs should be evaluated (100%)
for(0..$opt::jobs-1) { # TODO $opt::jobs should be number of total_jobs if there are arguments
# push @fifos, tmpfifo(); my $njobs = $opt::jobs;
push @{$fifos[$_]}, tmpfifo(); for my $m (0..$njobs-1) {
for my $n (0..$njobs-1) {
# sharding to A B C D
# parcatting all As together
$parcatfifos[$n][$m] = $shardfifos[$m][$n] = tmpfifo();
}
} }
my $script = sharder_script(); my $script = sharder_script();
# cat foo | sharder sep col fifo1 fifo2 fifo3 ... fifoN # cat foo | sharder sep col fifo1 fifo2 fifo3 ... fifoN
my @fif = map { $_->[0] } @fifos;
# ::debug("shard",'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif);
if(not fork()) { if(not fork()) {
# Let the sharder inherit our stdin # Let the sharder inherit our stdin
# and redirect stdout to null # and redirect stdout to null
open STDOUT, ">","/dev/null"; open STDOUT, ">","/dev/null";
# exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fifos; # The PERL_HASH_SEED must be the same for all sharders
@fif = map { $_->[0] } @fifos; # so B::hash will return the same value for any given input
exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif; $ENV{PERL_HASH_SEED} = $$;
exec qw(parallel --block 100k -q --pipe -j), $njobs,
qw(--roundrobin -u perl -e), $script, ($opt::colsep || ","),
$opt::shard, '{}', (map { (':::+', @{$_}) } @parcatfifos);
} }
# parallel --round-robin $script $opt ::: @fifos[0] ::: @fifos[1]
# For each fifo # For each fifo
# (rm fifo1; grep 1) < fifo1 # (rm fifo1; grep 1) < fifo1
# (rm fifo2; grep 2) < fifo2 # (rm fifo2; grep 2) < fifo2
@ -440,9 +446,7 @@ sub pipe_shard_setup() {
::error("'parcat' must be in path."); ::error("'parcat' must be in path.");
::wait_and_exit(255); ::wait_and_exit(255);
} }
# @Global::cat_prepends = map { "perl -e $parcat $_ | " } @fifos; @Global::cat_prepends = map { "perl -e $parcat @$_ | " } @parcatfifos;
@Global::cat_prepends = map { "perl -e $parcat @$_ | " } @fifos;
# warn map { "$_\n" } @Global::cat_prepends;
} }
sub pipe_part_files(@) { sub pipe_part_files(@) {
@ -8219,7 +8223,7 @@ sub wrapped($) {
# (rm fifo; ... ) < fifo # (rm fifo; ... ) < fifo
$command = (shift @Global::cat_prepends). "($command)". $command = (shift @Global::cat_prepends). "($command)".
(shift @Global::cat_appends); (shift @Global::cat_appends);
} elsif($opt::pipe) { } elsif($opt::pipe and not $opt::roundrobin) {
# Wrap with EOF-detector to avoid starting $command if EOF. # Wrap with EOF-detector to avoid starting $command if EOF.
$command = empty_input_wrapper($command); $command = empty_input_wrapper($command);
} }