parallel: --demux initial version.

This commit is contained in:
Ole Tange 2019-02-14 01:35:32 +01:00
parent 14581e8483
commit 904cda6e19
5 changed files with 90 additions and 3 deletions

View file

@ -207,7 +207,7 @@ from:tange@gnu.org
to:parallel@gnu.org, bug-parallel@gnu.org to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm> stable-bcc: Jesse Alama <jessealama@fastmail.fm>
Subject: GNU Parallel 20190222 ('Al-Baghuz Fawqani') released <<[stable]>> Subject: GNU Parallel 20190222 ('INF/Al-Baghuz Fawqani') released <<[stable]>>
GNU Parallel 20190222 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/ GNU Parallel 20190222 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/

View file

@ -196,6 +196,63 @@ sub pipe_tee_setup() {
@Global::cat_appends = map { ") < $_" } @fifos; @Global::cat_appends = map { ") < $_" } @fifos;
} }
sub pipe_demultiplex_setup() {
    # Set up --pipe --demux: spread stdin over the jobs by hashing a column
    #
    # Create one temporary fifo per job slot
    # Run the demuxer in the background:
    #   perl -Fsep -ane script col fifo1 fifo2 fifo3 ... fifoN
    # It reads our stdin, hashes column 'col' of every line and writes
    # the line to the fifo selected by the hash, so all lines with the
    # same value in that column end up in the same fifo
    # Generate wrappers that make each job read from its own fifo:
    #   (true fifo; (user_command)) < fifo
    # Uses:
    #   $opt::jobs = number of fifos to create
    #   $opt::colsep = column separator given to perl -F (default: ,)
    #   $opt::demultiplex = column to demultiplex on (count from 1)
    # Changes:
    #   @Global::cat_prepends
    #   @Global::cat_appends
    my @fifos;
    # TODO $opt::jobs should be evaluated (100%)
    for(1..$opt::jobs) {
        push @fifos, tmpfifo();
    }
    # NOTE(review): the script is passed through ::spacefree, which
    # presumably strips '#' comments - so avoid '#' in the code below
    my $script = ::spacefree(1,q{
        BEGIN {
            use B;
            # Which columns to demultiplex on (count from 1)
            $col = (shift) - 1;
            $bins = @ARGV;
            # Open fifos for writing, fh{0..$bins}
            my $t = 0;
            for(@ARGV) {
                open $fh{$t++}, ">", $_
                    or die "demux: cannot open $_: $!\n";
                # open blocks until it is opened by reader
                # so unlink only happens when it is ready
                unlink $_;
            }
            # the -n wrapper should read from STDIN - not files
            @ARGV = ();
        };
        # Wrapped in while(<>) due to -n
        $fh = $fh{ hex(B::hash($F[$col]))%$bins };
        print $fh $_;
        END {
            # Close all open fifos
            # (close takes a single filehandle, so close them one by one)
            close $_ for values %fh;
        }
    });
    # cat foo | demuxer sep col fifo1 fifo2 fifo3 ... fifoN
    # Build the command once, so the debug output is exactly what is run
    my @demuxer = ('perl', "-F".($opt::colsep || ","),
                   '-ane', $script, $opt::demultiplex, @fifos);
    ::debug("demux", @demuxer);
    my $pid = fork();
    if(not defined $pid) {
        # fork failed. Without this check the parent would take the
        # child branch below and replace itself with the demuxer
        die("parallel: Error: --demux cannot fork demultiplexer: $!\n");
    }
    if(not $pid) {
        # Child:
        # Let the demuxer inherit our stdin
        # and redirect stdout to null
        open STDOUT, ">","/dev/null";
        # If exec fails the child must not continue as a second parallel
        exec(@demuxer)
            or die("parallel: Error: --demux cannot exec perl: $!\n");
    }
    # For each fifo
    #   (true fifo1; (user_command)) < fifo1
    #   (true fifo2; (user_command)) < fifo2
    #   (true fifo3; (user_command)) < fifo3
    # The demuxer unlinks each fifo as soon as the reader has opened it,
    # so the wrapper only needs a no-op ('true') here, not 'rm'
    @Global::cat_prepends = map { "(true $_;" } @fifos;
    @Global::cat_appends = map { ") < $_" } @fifos;
}
sub pipe_part_files(@) { sub pipe_part_files(@) {
# Given the bigfile # Given the bigfile
# find header and split positions # find header and split positions
@ -1008,6 +1065,7 @@ sub options_hash() {
"fifo" => \$opt::fifo, "fifo" => \$opt::fifo,
"pipepart|pipe-part" => \$opt::pipepart, "pipepart|pipe-part" => \$opt::pipepart,
"tee" => \$opt::tee, "tee" => \$opt::tee,
"demultiplex|demux=s" => \$opt::demultiplex,
"hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups, "hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups,
"embed" => \$opt::embed, "embed" => \$opt::embed,
); );
@ -7964,6 +8022,9 @@ sub wrapped($) {
# #
# --pipe --tee: wrap: # --pipe --tee: wrap:
# (rm fifo; ... ) < fifo # (rm fifo; ... ) < fifo
#
# --pipe --demux X:
# (rm fifo; ... ) < fifo
$command = (shift @Global::cat_prepends). "($command)". $command = (shift @Global::cat_prepends). "($command)".
(shift @Global::cat_appends); (shift @Global::cat_appends);
} elsif($opt::pipe) { } elsif($opt::pipe) {
@ -8735,7 +8796,7 @@ sub start($) {
::set_fh_non_blocking($stdin_fh); ::set_fh_non_blocking($stdin_fh);
} }
$job->set_fh(0,"w",$stdin_fh); $job->set_fh(0,"w",$stdin_fh);
if($opt::tee) { $job->set_virgin(0); } if($opt::tee or $opt::demultiplex) { $job->set_virgin(0); }
} elsif ($opt::tty and -c "/dev/tty" and } elsif ($opt::tty and -c "/dev/tty" and
open(my $devtty_fh, "<", "/dev/tty")) { open(my $devtty_fh, "<", "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it # Give /dev/tty to the command if no one else is using it
@ -12144,6 +12205,8 @@ sub main() {
pipepart_setup(); pipepart_setup();
} elsif($opt::pipe and $opt::tee) { } elsif($opt::pipe and $opt::tee) {
pipe_tee_setup(); pipe_tee_setup();
} elsif($opt::pipe and $opt::demultiplex) {
pipe_demultiplex_setup();
} }
if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) { if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) {
@ -12163,7 +12226,7 @@ sub main() {
} }
$SIG{TERM} = \&start_no_new_jobs; $SIG{TERM} = \&start_no_new_jobs;
if($opt::tee) { if($opt::tee or $opt::demultiplex) {
# All jobs must be running in parallel for --tee # All jobs must be running in parallel for --tee
while(start_more_jobs()) {} while(start_more_jobs()) {}
$Global::start_no_new_jobs = 1; $Global::start_no_new_jobs = 1;

View file

@ -620,6 +620,7 @@ Even quoted newlines are parsed correctly:
When used with B<--pipe> only pass full CSV-records. When used with B<--pipe> only pass full CSV-records.
=item B<--delimiter> I<delim> =item B<--delimiter> I<delim>
=item B<-d> I<delim> =item B<-d> I<delim>
@ -632,6 +633,22 @@ as \n, or octal or hexadecimal escape codes. Octal and hexadecimal
escape codes are understood as for the printf command. Multibyte escape codes are understood as for the printf command. Multibyte
characters are not supported. characters are not supported.
=item B<--demux> I<col> (alpha testing)
=item B<--demultiplex> I<col> (alpha testing)
Demultiplex input on column number I<col>. Input is split using
B<--colsep>. The value in the column is hashed so that all lines with
a given value are given to the same job slot.
This is a bit similar to B<--roundrobin> in that all data is given to
a limited number of jobs, but unlike B<--roundrobin>, data is given
to a specific job slot based on the value in a column.
The performance is in the order of 1M rows per second.
=item B<--dirnamereplace> I<replace-str> =item B<--dirnamereplace> I<replace-str>
=item B<--dnr> I<replace-str> =item B<--dnr> I<replace-str>

View file

@ -901,6 +901,11 @@ par_wd_dotdotdot() {
parallel --wd ... 'echo $OLDPWD' ::: foo parallel --wd ... 'echo $OLDPWD' ::: foo
} }
par_demux() {
    # Feed 100000 numbered lines through --pipe --demux on column 1 with
    # 5 job slots. Each job prints its sequence number and then copies its
    # share of the input; wc summarizes the combined output.
    printf '%s\n' '### --demux'
    seq 100000 |
        parallel --pipe --demux 1 -j5 'echo {#}; cat' |
        wc
}
export -f $(compgen -A function | grep par_) export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | LC_ALL=C sort | compgen -A function | grep par_ | LC_ALL=C sort |
parallel --timeout 20 -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1' parallel --timeout 20 -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1'

View file

@ -1299,6 +1299,8 @@ par_csv_pipe 11000"
par_csv_pipe More records in single block par_csv_pipe More records in single block
par_csv_pipe 9000" par_csv_pipe 9000"
par_csv_pipe 11000" par_csv_pipe 11000"
par_demux ### --demux
par_demux 100005 100005 588905
par_dryrun_append_joblog --dry-run should not append to joblog par_dryrun_append_joblog --dry-run should not append to joblog
par_dryrun_append_joblog 1 par_dryrun_append_joblog 1
par_dryrun_append_joblog 2 par_dryrun_append_joblog 2