parallel: Include parcat code in sharding to avoid depending on parcat.

This commit is contained in:
Ole Tange 2019-02-16 17:23:08 +01:00
parent 904cda6e19
commit 79666b4906
6 changed files with 338 additions and 80 deletions

View file

@ -1,5 +1,14 @@
Quote of the month:
With GNU Parallel you sure can!
I like getting things done
--Kyle Lady @kylelady@twitter
Y'all need some GNU parallel in your lives
-- ChaKu @ChaiLovesChai@twitter

View file

@ -196,9 +196,215 @@ sub pipe_tee_setup() {
@Global::cat_appends = map { ") < $_" } @fifos;
}
sub pipe_demultiplex_setup() {
sub parcat_script() {
# Build the source of an embedded copy of the parcat helper, so that
# sharding does not depend on an external parcat program.
# The embedded script reads several files/fifos in parallel and copies
# them to an output file descriptor, writing only up to the last
# newline read from each input and buffering the trailing partial line.
# Returns:
#   the script text after it has been passed through ::spacefree(1, ...)
#   (presumably strips comments and surplus whitespace - TODO confirm)
# NOTE: the script is held in a q'...' string, so a single quote must
# not appear anywhere inside it - not even in a comment.
my $script = q'{
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use IO::Select;
use strict;
use threads;
use threads::shared;
use Thread::Queue;
use Fcntl qw(:DEFAULT :flock);
# Number of files opened so far (incremented by the producer threads)
my $opened :shared;
# Producer -> main: pairs of (input fileno, output fd)
my $q = Thread::Queue->new();
# Main -> producer: fileno of an input that main has duplicated
my $okq = Thread::Queue->new();
my @producers;
if(not @ARGV) {
if(-t *STDIN) {
# NOTE(review): there is no exit after the usage text, so execution
# continues with an empty @ARGV - confirm this is intended
print "Usage:\n";
print " parcat file(s)\n";
print " cat argfile | parcat\n";
} else {
# Read arguments from stdin
chomp(@ARGV = <STDIN>);
}
}
my $files_to_open = 0;
# Default: fd = stdout
my $fd = 1;
for (@ARGV) {
# --rm = remove file when opened
/^--rm$/ and do { $opt::rm = 1; next; };
# -1 = output to fd 1, -2 = output to fd 2
/^-(\d+)$/ and do { $fd = $1; next; };
# Any other argument is a file: open it in its own thread,
# bound to the output fd that is current at this point
push @producers, threads->create("producer", $_, $fd);
$files_to_open++;
}
sub producer {
# Open a file/fifo, set non blocking, enqueue fileno of the file handle
# Inputs:
#   $file = name of file/fifo to read
#   $output_fd = file descriptor number the data should be written to
my $file = shift;
my $output_fd = shift;
open(my $fh, "<", $file) || do {
print STDERR "parcat: Cannot open $file\n";
exit(1);
};
# Remove file when it has been opened
if($opt::rm) {
unlink $file;
}
set_fh_non_blocking($fh);
$opened++;
# Pass the fileno to parent
$q->enqueue(fileno($fh),$output_fd);
# Get an OK that the $fh is opened and we can release the $fh
while(1) {
my $ok = $okq->dequeue();
if($ok == fileno($fh)) { last; }
# Not ours - very unlikely to happen
$okq->enqueue($ok);
}
return;
}
# Set of input handles that the main loop polls for readable data
my $s = IO::Select->new();
# $buffer{$infh}{$outfd} = list of chunks forming a not yet complete line
my %buffer;
sub add_file {
# Start reading from an input that a producer has opened
# Inputs:
#   $infd = fileno of the opened input
#   $outfd = file descriptor number to write the data to
my $infd = shift;
my $outfd = shift;
open(my $infh, "<&=", $infd) || die;
open(my $outfh, ">&=", $outfd) || die;
$s->add($infh);
# Tell the producer now opened here and can be released
$okq->enqueue($infd);
# Initialize the buffer
@{$buffer{$infh}{$outfd}} = ();
$Global::fh{$outfd} = $outfh;
}
sub add_files {
# Non-blocking dequeue
# Adds every (input, output) pair that is already waiting in $q
my ($infd,$outfd);
do {
($infd,$outfd) = $q->dequeue_nb(2);
if(defined($outfd)) {
add_file($infd,$outfd);
}
} while(defined($outfd));
}
sub add_files_block {
# Blocking dequeue
# Waits until one (input, output) pair is available and adds it
my ($infd,$outfd) = $q->dequeue(2);
add_file($infd,$outfd);
}
# NOTE(review): $fd is already declared above and this second
# declaration is not used below - confirm it can be removed
my $fd;
my (@ready,$infh,$rv,$buf);
do {
# Wait until at least one file is opened
add_files_block();
while($q->pending or keys %buffer) {
add_files();
while(keys %buffer) {
@ready = $s->can_read(0.01);
if(not @ready) {
# Nothing readable within the timeout: pick up newly opened files
add_files();
}
for $infh (@ready) {
# There is only one key, namely the output file descriptor
for my $outfd (keys %{$buffer{$infh}}) {
$rv = sysread($infh, $buf, 65536);
if (!$rv) {
if($! == EAGAIN) {
# Would block: Nothing read
next;
} else {
# Nothing read, but would not block:
# This file is done
$s->remove($infh);
# Flush the buffered partial line
for(@{$buffer{$infh}{$outfd}}) {
syswrite($Global::fh{$outfd},$_);
}
delete $buffer{$infh};
# Closing the $infh causes it to block
# close $infh;
add_files();
next;
}
}
# Something read.
# Find the last \n: everything up to it is complete lines
my $i = (rindex($buf,"\n")+1);
if($i) {
# Print full line
for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) {
syswrite($Global::fh{$outfd},$_);
}
# @buffer = remaining half line
$buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
} else {
# Something read, but not a full line
push @{$buffer{$infh}{$outfd}}, $buf;
}
# Read again from the same handle until it would block or is done
redo;
}
}
}
}
} while($opened < $files_to_open);
# All inputs are done: wait for the producer threads to finish
for (@producers) {
$_->join();
}
sub set_fh_non_blocking {
# Set filehandle as non-blocking
# Inputs:
# $fh = filehandle to be made non-blocking
# Returns:
# N/A
my $fh = shift;
my $flags;
fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
}';
return ::spacefree(1, $script);
}
sub sharder_script() {
    # Build the source of the sharder helper that is run as
    #   perl -e script sep col fifo1 fifo2 ... fifoN
    # The helper reads lines from STDIN, hashes the value in column
    # 'col' (columns split on 'sep') and writes each line to the fifo
    # selected by that hash, so equal keys always go to the same fifo.
    # Returns:
    #   the script text after it has been passed through ::spacefree(1, ...)
    # NOTE: the script is held in a q{...} string, so braces inside it
    # (comments included) must stay balanced.
    my $script = q{
        use B;
        # Column separator
        my $sep = shift;
        # Which column to shard on (count from 1)
        my $col = shift;
        # Which column to shard on (count from 0)
        my $col0 = $col - 1;
        my $bins = @ARGV;
        # Open fifos for writing, fh{0..$bins-1}
        my $t = 0;
        my %fh;
        for(@ARGV) {
            # open blocks until it is opened by reader
            # so unlink only happens when it is ready
            open($fh{$t++}, ">", $_) || die "sharder: Cannot open $_: $!\n";
            unlink $_;
        }
        while(<STDIN>) {
            # Split into $col columns (no need to split into more)
            my @F = split $sep, $_, $col+1;
            # Same key => same hash => same fifo
            my $out = $fh{ hex(B::hash($F[$col0]))%$bins };
            print $out $_;
        }
        # Close all open fifos one by one: close takes a single
        # filehandle, so it cannot be given the list of values directly
        for(values %fh) { close $_; }
    };
    return ::spacefree(1, $script);
}
sub pipe_shard_setup() {
# Create temporary fifos
# Run 'demux.pl sep col fifo1 fifo2 fifo3 ... fifoN' in the background
# Run 'shard.pl sep col fifo1 fifo2 fifo3 ... fifoN' in the background
# This will spread the input to fifos
# Generate commands that reads from fifo1..N:
# cat fifo | user_command
@ -206,51 +412,37 @@ sub pipe_demultiplex_setup() {
# @Global::cat_prepends
my @fifos;
# TODO $opt::jobs should be evaluated (100%)
for(1..$opt::jobs) {
push @fifos, tmpfifo();
for(0..$opt::jobs-1) {
# push @fifos, tmpfifo();
push @{$fifos[$_]}, tmpfifo();
}
my $script = ::spacefree(1,q{
BEGIN {
use B;
# Which columns to demultiplex on (count from 1)
$col = (shift) - 1;
$bins = @ARGV;
# Open fifos for writing, fh{0..$bins}
my $t = 0;
for(@ARGV) {
open $fh{$t++}, ">", $_;
# open blocks until it is opened by reader
# so unlink only happens when it is ready
unlink $_;
}
# the -n wrapper should read from STDIN - not files
@ARGV = ();
};
# Wrapped in while(<>) due to -n
$fh = $fh{ hex(B::hash($F[$col]))%$bins };
print $fh $_;
END {
# Close all open fifos
close values %fh;
}
});
# cat foo | demuxer sep col fifo1 fifo2 fifo3 ... fifoN
::debug("demux",'perl', '-F', $opt::colsep,
'-ane', $script, $opt::demultiplex, @fifos);
my $script = sharder_script();
# cat foo | sharder sep col fifo1 fifo2 fifo3 ... fifoN
my @fif = map { $_->[0] } @fifos;
# ::debug("shard",'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif);
if(not fork()){
# Let the demuxer inherit our stdin
# Let the sharder inherit our stdin
# and redirect stdout to null
open STDOUT, ">","/dev/null";
exec 'perl', "-F".($opt::colsep || ","),
'-ane', $script, $opt::demultiplex, @fifos;
# exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fifos;
@fif = map { $_->[0] } @fifos;
exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif;
}
# parallel --round-robin $script $opt ::: @fifos[0] ::: @fifos[1]
# For each fifo
# (rm fifo1; grep 1) < fifo1
# (rm fifo2; grep 2) < fifo2
# (rm fifo3; grep 3) < fifo3
# Remove the tmpfifo as soon as it is open
@Global::cat_prepends = map { "(true $_;" } @fifos;
@Global::cat_appends = map { ") < $_" } @fifos;
my $parcat = Q(parcat_script());
if(not $parcat) {
::error("'parcat' must be in path.");
::wait_and_exit(255);
}
# @Global::cat_prepends = map { "perl -e $parcat $_ | " } @fifos;
@Global::cat_prepends = map { "perl -e $parcat @$_ | " } @fifos;
# warn map { "$_\n" } @Global::cat_prepends;
}
sub pipe_part_files(@) {
@ -343,7 +535,7 @@ sub find_split_positions($$$) {
while(read($fh,substr($buf,length $buf,0),$dd_block_size)) {
if($opt::regexp) {
# If match /$recend$recstart/ => Record position
if($buf =~ /^(.*$recend)$recstart/os) {
if($buf =~ m:^(.*$recend)$recstart:os) {
# Start looking for next record _after_ this match
$pos += length($1);
push(@pos,$pos);
@ -1065,7 +1257,7 @@ sub options_hash() {
"fifo" => \$opt::fifo,
"pipepart|pipe-part" => \$opt::pipepart,
"tee" => \$opt::tee,
"demultiplex|demux=s" => \$opt::demultiplex,
"shard=s" => \$opt::shard,
"hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups,
"embed" => \$opt::embed,
);
@ -4892,7 +5084,7 @@ sub which(@) {
push(@which, grep { not -d $_ and -x $_ } $prg);
}
}
return @which;
return wantarray ? @which : $which[0];
}
{
@ -8023,7 +8215,7 @@ sub wrapped($) {
# --pipe --tee: wrap:
# (rm fifo; ... ) < fifo
#
# --pipe --demux X:
# --pipe --shard X:
# (rm fifo; ... ) < fifo
$command = (shift @Global::cat_prepends). "($command)".
(shift @Global::cat_appends);
@ -8796,7 +8988,7 @@ sub start($) {
::set_fh_non_blocking($stdin_fh);
}
$job->set_fh(0,"w",$stdin_fh);
if($opt::tee or $opt::demultiplex) { $job->set_virgin(0); }
if($opt::tee or $opt::shard) { $job->set_virgin(0); }
} elsif ($opt::tty and -c "/dev/tty" and
open(my $devtty_fh, "<", "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it
@ -12205,8 +12397,8 @@ sub main() {
pipepart_setup();
} elsif($opt::pipe and $opt::tee) {
pipe_tee_setup();
} elsif($opt::pipe and $opt::demultiplex) {
pipe_demultiplex_setup();
} elsif($opt::pipe and $opt::shard) {
pipe_shard_setup();
}
if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) {
@ -12226,8 +12418,8 @@ sub main() {
}
$SIG{TERM} = \&start_no_new_jobs;
if($opt::tee or $opt::demultiplex) {
# All jobs must be running in parallel for --tee
if($opt::tee or $opt::shard) {
# All jobs must be running in parallel for --tee/--shard
while(start_more_jobs()) {}
$Global::start_no_new_jobs = 1;
if(not $Global::JobQueue->empty()) {

View file

@ -61,15 +61,16 @@ or download it at: https://doi.org/10.5281/zenodo.1146014
Otherwise start by watching the intro videos for a quick introduction:
http://www.youtube.com/playlist?list=PL284C9FF2488BC6D1
Then look at the B<EXAMPLE>s after the list of B<OPTIONS> (Use
B<LESS=+/EXAMPLE: man parallel>). That will give you an idea of what
GNU B<parallel> is capable of.
Then browse through the B<EXAMPLE>s after the list of B<OPTIONS> in
B<man parallel> (Use B<LESS=+/EXAMPLE: man parallel>). That will give
you an idea of what GNU B<parallel> is capable of.
Then spend an hour walking through the tutorial (B<man
parallel_tutorial>). Your command line will love you for it.
If you want to dive even deeper: spend a couple of hours walking
through the tutorial (B<man parallel_tutorial>). Your command line
will love you for it.
Finally you may want to look at the rest of this manual if you have
special needs not already covered.
Finally you may want to look at the rest of the manual (B<man
parallel>) if you have special needs not already covered.
If you want to know the design decisions behind GNU B<parallel>, try:
B<man parallel_design>. This is also a good intro if you intend to
@ -634,21 +635,6 @@ escape codes are understood as for the printf command. Multibyte
characters are not supported.
=item B<--demux> I<col> (alpha testing)
=item B<--demultiplex> I<col> (alpha testing)
Demultiplex input on column number I<col>. Input is split using
B<--colsep>. The value in the column is hashed so that all lines of a
given value is given to the same job slot.
This is a bit similar to B<--roundrobin> in that all data is given to
a limited number of jobs, but opposite B<--roundrobin> data is given
to a specific job slot based on the value in a column.
The performance is in the order of 1M rows per second.
=item B<--dirnamereplace> I<replace-str>
=item B<--dnr> I<replace-str>
@ -2086,6 +2072,20 @@ Only supported in B<Ash, Bash, Dash, Ksh, Sh, and Zsh>.
See also B<--env>, B<--record-env>.
=item B<--shard> I<shardkey> (alpha testing)
Use column I<shardkey> as shard key and shard input to the jobs.
Each input line is split using B<--colsep>. The value in the
I<shardkey> column is hashed so that all lines with a given value are
given to the same job slot.
This is similar to sharding in databases.
The performance is in the order of 100K rows per second. Faster if the
I<shardkey> is small (<10), slower if it is big (>100).
=item B<--shebang>
=item B<--hashbang>

View file

@ -9,15 +9,28 @@ real world.
=head2 Reader's guide
Start by watching the intro videos for a quick introduction:
If you prefer reading a book buy B<GNU Parallel 2018> at
http://www.lulu.com/shop/ole-tange/gnu-parallel-2018/paperback/product-23558902.html
or download it at: https://doi.org/10.5281/zenodo.1146014
Otherwise start by watching the intro videos for a quick introduction:
http://www.youtube.com/playlist?list=PL284C9FF2488BC6D1
Then look at the B<EXAMPLE>s after the list of B<OPTIONS> in B<man
parallel> (Use B<LESS=+/EXAMPLE: man parallel>). That will give you
an idea of what GNU B<parallel> is capable of.
Then browse through the B<EXAMPLE>s after the list of B<OPTIONS> in
B<man parallel> (Use B<LESS=+/EXAMPLE: man parallel>). That will give
you an idea of what GNU B<parallel> is capable of.
If you want to dive even deeper: spend a couple of hours walking
through the tutorial (B<man parallel_tutorial>). Your command line
will love you for it.
Finally you may want to look at the rest of the manual (B<man
parallel>) if you have special needs not already covered.
If you want to know the design decisions behind GNU B<parallel>, try:
B<man parallel_design>. This is also a good intro if you intend to
change GNU B<parallel>.
Then spend a couple of hours walking through this tutorial (B<man
parallel_tutorial>). Your command line will love you for it.
=head1 Prerequisites

View file

@ -901,9 +901,28 @@ par_wd_dotdotdot() {
parallel --wd ... 'echo $OLDPWD' ::: foo
}
par_demux() {
echo '### --demux'
seq 100000 | parallel --pipe --demux 1 -j5 'echo {#}; cat' | wc
par_shard() {
# Test of --shard: lines on stdin are distributed to the jobs based on
# the value in a key column.
echo '### --shard'
# Each of the 5 lines should match:
# ##### ##### ######
# (the perl filter replaces a wc line holding three numbers of 5+
# digits with OK)
seq 100000 | parallel --pipe --shard 1 -j5 wc |
perl -pe 's/(.*\d{5,}){3}/OK/'
# Data should be sharded to all processes
shard_on_col() {
# $1 = column number to shard on
# Input is 10..99 in random order with a tab inserted after every
# character, so each digit becomes its own column.
# 'field' is not defined here; presumably a helper that prints the
# given column - TODO confirm.
col=$1
seq 10 99 | shuf | perl -pe 's/(.)/$1\t/g' |
parallel --pipe --shard $col -j2 --colsep "\t" sort -k$col |
field $col | uniq -c | sort
}
shard_on_col 1
shard_on_col 2
# The remaining cases are marked as broken by the echo below
echo '*** broken'
# Shorthand for --pipe -j+0
seq 100000 | parallel --shard 1 wc |
perl -pe 's/(.*\d{5,}){3}/OK/'
# Combine with arguments
seq 100000 | parallel --shard 1 echo {}\;wc ::: {1..5} ::: a b |
perl -pe 's/(.*\d{5,}){3}/OK/'
}
export -f $(compgen -A function | grep par_)

View file

@ -1299,8 +1299,6 @@ par_csv_pipe 11000"
par_csv_pipe More records in single block
par_csv_pipe 9000"
par_csv_pipe 11000"
par_demux ### --demux
par_demux 100005 100005 588905
par_dryrun_append_joblog --dry-run should not append to joblog
par_dryrun_append_joblog 1
par_dryrun_append_joblog 2
@ -1519,6 +1517,33 @@ par_retries_replacement_string 33
par_sem_quote ### sem --quote should not add empty argument
par_sem_quote echo
par_sem_quote
par_shard ### --shard
par_shard OK
par_shard OK
par_shard OK
par_shard OK
par_shard OK
par_shard 10 1
par_shard 10 2
par_shard 10 3
par_shard 10 4
par_shard 10 5
par_shard 10 6
par_shard 10 7
par_shard 10 8
par_shard 10 9
par_shard 9 0
par_shard 9 1
par_shard 9 2
par_shard 9 3
par_shard 9 4
par_shard 9 5
par_shard 9 6
par_shard 9 7
par_shard 9 8
par_shard 9 9
par_shard parallel: Error: --tee requres --jobs to be higher. Try --jobs 0.
par_shard parallel: Error: --tee requres --jobs to be higher. Try --jobs 0.
par_slow_pipe_regexp ### bug #53718: --pipe --regexp -N blocks
par_slow_pipe_regexp This should take a few ms, but took more than 2 hours
par_slow_pipe_regexp 980 981 5881