diff --git a/doc/haikus b/doc/haikus index 6fef5a2d..067c3ddf 100644 --- a/doc/haikus +++ b/doc/haikus @@ -1,5 +1,14 @@ Quote of the month: + + +With GNU Parallel you sure can! +I like getting things done + +--Kyle Lady @kylelady@twitter + + + Y'all need some GNU parallel in your lives -- ChaKu @ChaiLovesChai@twitter diff --git a/src/parallel b/src/parallel index 3ce1f620..206d1e3b 100755 --- a/src/parallel +++ b/src/parallel @@ -196,9 +196,215 @@ sub pipe_tee_setup() { @Global::cat_appends = map { ") < $_" } @fifos; } -sub pipe_demultiplex_setup() { + +sub parcat_script() { + my $script = q'{ + use Symbol qw(gensym); + use IPC::Open3; + use POSIX qw(:errno_h); + use IO::Select; + use strict; + use threads; + use threads::shared; + use Thread::Queue; + use Fcntl qw(:DEFAULT :flock); + + my $opened :shared; + my $q = Thread::Queue->new(); + my $okq = Thread::Queue->new(); + my @producers; + + if(not @ARGV) { + if(-t *STDIN) { + print "Usage:\n"; + print " parcat file(s)\n"; + print " cat argfile | parcat\n"; + } else { + # Read arguments from stdin + chomp(@ARGV = ); + } + } + my $files_to_open = 0; + # Default: fd = stdout + my $fd = 1; + for (@ARGV) { + # --rm = remove file when opened + /^--rm$/ and do { $opt::rm = 1; next; }; + # -1 = output to fd 1, -2 = output to fd 2 + /^-(\d+)$/ and do { $fd = $1; next; }; + push @producers, threads->create("producer", $_, $fd); + $files_to_open++; + } + + sub producer { + # Open a file/fifo, set non blocking, enqueue fileno of the file handle + my $file = shift; + my $output_fd = shift; + open(my $fh, "<", $file) || do { + print STDERR "parcat: Cannot open $file\n"; + exit(1); + }; + # Remove file when it has been opened + if($opt::rm) { + unlink $file; + } + set_fh_non_blocking($fh); + $opened++; + # Pass the fileno to parent + $q->enqueue(fileno($fh),$output_fd); + # Get an OK that the $fh is opened and we can release the $fh + while(1) { + my $ok = $okq->dequeue(); + if($ok == fileno($fh)) { last; } + # Not ours - very unlikely to happen + $okq->enqueue($ok); + } + return; + } + + my $s = IO::Select->new(); + my %buffer; + + sub add_file { + my $infd = shift; + my $outfd = shift; + open(my $infh, "<&=", $infd) || die; + open(my $outfh, ">&=", $outfd) || die; + $s->add($infh); + # Tell the producer now opened here and can be released + $okq->enqueue($infd); + # Initialize the buffer + @{$buffer{$infh}{$outfd}} = (); + $Global::fh{$outfd} = $outfh; + } + + sub add_files { + # Non-blocking dequeue + my ($infd,$outfd); + do { + ($infd,$outfd) = $q->dequeue_nb(2); + if(defined($outfd)) { + add_file($infd,$outfd); + } + } while(defined($outfd)); + } + + sub add_files_block { + # Blocking dequeue + my ($infd,$outfd) = $q->dequeue(2); + add_file($infd,$outfd); + } + + + my $fd; + my (@ready,$infh,$rv,$buf); + do { + # Wait until at least one file is opened + add_files_block(); + while($q->pending or keys %buffer) { + add_files(); + while(keys %buffer) { + @ready = $s->can_read(0.01); + if(not @ready) { + add_files(); + } + for $infh (@ready) { + # There is only one key, namely the output file descriptor + for my $outfd (keys %{$buffer{$infh}}) { + $rv = sysread($infh, $buf, 65536); + if (!$rv) { + if($! == EAGAIN) { + # Would block: Nothing read + next; + } else { + # Nothing read, but would not block: + # This file is done + $s->remove($infh); + for(@{$buffer{$infh}{$outfd}}) { + syswrite($Global::fh{$outfd},$_); + } + delete $buffer{$infh}; + # Closing the $infh causes it to block + # close $infh; + add_files(); + next; + } + } + # Something read. 
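+                        # (Data is buffered per input and only written
+                        # once a full line is available, so half-lines
+                        # from different inputs are not mixed in the output.)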
+                        # Find \n or \r for full line
+                        my $i = (rindex($buf,"\n")+1);
+                        if($i) {
+                            # Print full line
+                            for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) {
+                                syswrite($Global::fh{$outfd},$_);
+                            }
+                            # @buffer = remaining half line
+                            $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
+                        } else {
+                            # Something read, but not a full line
+                            push @{$buffer{$infh}{$outfd}}, $buf;
+                        }
+                        redo;
+                    }
+                }
+            }
+        }
+    } while($opened < $files_to_open);
+
+    for (@producers) {
+        $_->join();
+    }
+
+    sub set_fh_non_blocking {
+        # Set filehandle as non-blocking
+        # Inputs:
+        #   $fh = filehandle to be blocking
+        # Returns:
+        #   N/A
+        my $fh = shift;
+        my $flags;
+        fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
+        $flags |= &O_NONBLOCK; # Add non-blocking to the flags
+        fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
+    }
+    }';
+    return ::spacefree(1, $script);
+}
+
+sub sharder_script() {
+    my $script = q{
+        use B;
+        # Column separator
+        my $sep = shift;
+        # Which column to shard on (count from 1)
+        my $col = shift;
+        # Which column to shard on (count from 0)
+        my $col0 = $col - 1;
+        my $bins = @ARGV;
+        # Open fifos for writing, fh{0..$bins}
+        my $t = 0;
+        my %fh;
+        for(@ARGV) {
+            open $fh{$t++}, ">", $_;
+            # open blocks until it is opened by reader
+            # so unlink only happens when it is ready
+            unlink $_;
+        }
+        while(<STDIN>) {
+            # Split into $col columns (no need to split into more)
+            @F = split $sep, $_, $col+1;
+            $fh = $fh{ hex(B::hash($F[$col0]))%$bins };
+            print $fh $_;
+        }
+        # Close all open fifos
+        close values %fh;
+    };
+    return ::spacefree(1, $script);
+}
+
+sub pipe_shard_setup() {
     # Create temporary fifos
-    # Run 'demux.pl sep col fifo1 fifo2 fifo3 ... fifoN' in the background
+    # Run 'shard.pl sep col fifo1 fifo2 fifo3 ... fifoN' in the background
     # This will spread the input to fifos
     # Generate commands that reads from fifo1..N:
     #   cat fifo | user_command
@@ -206,51 +412,37 @@ sub pipe_demultiplex_setup() {
     #   @Global::cat_prepends
     my @fifos;
     # TODO $opt::jobs should be evaluated (100%)
-    for(1..$opt::jobs) {
-        push @fifos, tmpfifo();
+    for(0..$opt::jobs-1) {
+#       push @fifos, tmpfifo();
+        push @{$fifos[$_]}, tmpfifo();
     }
-    my $script = ::spacefree(1,q{
-        BEGIN {
-            use B;
-            # Which column to demultiplex on (count from 1)
-            $col = (shift) - 1;
-            $bins = @ARGV;
-            # Open fifos for writing, fh{0..$bins}
-            my $t = 0;
-            for(@ARGV) {
-                open $fh{$t++}, ">", $_;
-                # open blocks until it is opened by reader
-                # so unlink only happens when it is ready
-                unlink $_;
-            }
-            # the -n wrapper should read from STDIN - not files
-            @ARGV = ();
-        };
-        # Wrapped in while(<>) due to -n
-        $fh = $fh{ hex(B::hash($F[$col]))%$bins };
-        print $fh $_;
-        END {
-            # Close all open fifos
-            close values %fh;
-        }
-    });
-    # cat foo | demuxer sep col fifo1 fifo2 fifo3 ... fifoN
-    ::debug("demux",'perl', '-F', $opt::colsep,
-            '-ane', $script, $opt::demultiplex, @fifos);
+    my $script = sharder_script();
+    # cat foo | sharder sep col fifo1 fifo2 fifo3 ...
fifoN + my @fif = map { $_->[0] } @fifos; +# ::debug("shard",'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif); if(not fork()){ - # Let the demuxer inherit our stdin + # Let the sharder inherit our stdin # and redirect stdout to null open STDOUT, ">","/dev/null"; - exec 'perl', "-F".($opt::colsep || ","), - '-ane', $script, $opt::demultiplex, @fifos; +# exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fifos; + @fif = map { $_->[0] } @fifos; + exec 'perl', '-e', $script, ($opt::colsep || ","), $opt::shard, @fif; } + +# parallel --round-robin $script $opt ::: @fifos[0] ::: @fifos[1] + # For each fifo # (rm fifo1; grep 1) < fifo1 # (rm fifo2; grep 2) < fifo2 # (rm fifo3; grep 3) < fifo3 - # Remove the tmpfifo as soon as it is open - @Global::cat_prepends = map { "(true $_;" } @fifos; - @Global::cat_appends = map { ") < $_" } @fifos; + my $parcat = Q(parcat_script()); + if(not $parcat) { + ::error("'parcat' must be in path."); + ::wait_and_exit(255); + } +# @Global::cat_prepends = map { "perl -e $parcat $_ | " } @fifos; + @Global::cat_prepends = map { "perl -e $parcat @$_ | " } @fifos; +# warn map { "$_\n" } @Global::cat_prepends; } sub pipe_part_files(@) { @@ -343,7 +535,7 @@ sub find_split_positions($$$) { while(read($fh,substr($buf,length $buf,0),$dd_block_size)) { if($opt::regexp) { # If match /$recend$recstart/ => Record position - if($buf =~ /^(.*$recend)$recstart/os) { + if($buf =~ m:^(.*$recend)$recstart:os) { # Start looking for next record _after_ this match $pos += length($1); push(@pos,$pos); @@ -1065,7 +1257,7 @@ sub options_hash() { "fifo" => \$opt::fifo, "pipepart|pipe-part" => \$opt::pipepart, "tee" => \$opt::tee, - "demultiplex|demux=s" => \$opt::demultiplex, + "shard=s" => \$opt::shard, "hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups, "embed" => \$opt::embed, ); @@ -4892,7 +5084,7 @@ sub which(@) { push(@which, grep { not -d $_ and -x $_ } $prg); } } - return @which; + return wantarray ? @which : $which[0]; } { @@ -8023,7 +8215,7 @@ sub wrapped($) { # --pipe --tee: wrap: # (rm fifo; ... ) < fifo # - # --pipe --demux X: + # --pipe --shard X: # (rm fifo; ... ) < fifo $command = (shift @Global::cat_prepends). "($command)". 
 	(shift @Global::cat_appends);
@@ -8796,7 +8988,7 @@ sub start($) {
 	    ::set_fh_non_blocking($stdin_fh);
 	}
 	$job->set_fh(0,"w",$stdin_fh);
-	if($opt::tee or $opt::demultiplex) { $job->set_virgin(0); }
+	if($opt::tee or $opt::shard) { $job->set_virgin(0); }
     } elsif ($opt::tty and -c "/dev/tty" and
 	     open(my $devtty_fh, "<", "/dev/tty")) {
 	# Give /dev/tty to the command if no one else is using it
@@ -12205,8 +12397,8 @@ sub main() {
 	pipepart_setup();
     } elsif($opt::pipe and $opt::tee) {
 	pipe_tee_setup();
-    } elsif($opt::pipe and $opt::demultiplex) {
-	pipe_demultiplex_setup();
+    } elsif($opt::pipe and $opt::shard) {
+	pipe_shard_setup();
     }
 
     if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) {
@@ -12226,8 +12418,8 @@ sub main() {
     }
     $SIG{TERM} = \&start_no_new_jobs;
 
-    if($opt::tee or $opt::demultiplex) {
-	# All jobs must be running in parallel for --tee
+    if($opt::tee or $opt::shard) {
+	# All jobs must be running in parallel for --tee/--shard
 	while(start_more_jobs()) {}
 	$Global::start_no_new_jobs = 1;
 	if(not $Global::JobQueue->empty()) {
diff --git a/src/parallel.pod b/src/parallel.pod
index 39a03fec..199ca313 100644
--- a/src/parallel.pod
+++ b/src/parallel.pod
@@ -61,15 +61,16 @@ or download it at: https://doi.org/10.5281/zenodo.1146014
 Otherwise start by watching the intro videos for a quick introduction:
 http://www.youtube.com/playlist?list=PL284C9FF2488BC6D1
 
-Then look at the B<EXAMPLE>s after the list of B<OPTIONS> (Use
-B<LESS=+/EXAMPLE: man parallel>). That will give you an idea of what
-GNU B<parallel> is capable of.
+Then browse through the B<EXAMPLE>s after the list of B<OPTIONS> in
+B<man parallel> (Use B<LESS=+/EXAMPLE: man parallel>). That will give
+you an idea of what GNU B<parallel> is capable of.
 
-Then spend an hour walking through the tutorial (B<man
-parallel_tutorial>). Your command line will love you for it.
+If you want to dive even deeper: spend a couple of hours walking
+through the tutorial (B<man parallel_tutorial>). Your command line
+will love you for it.
 
-Finally you may want to look at the rest of this manual if you have
-special needs not already covered.
+Finally you may want to look at the rest of the manual (B<man
+parallel>) if you have special needs not already covered.
 
 If you want to know the design decisions behind GNU B<parallel>, try:
 B<man parallel_design>. This is also a good intro if you intend to
 change GNU B<parallel>.
@@ -634,21 +635,6 @@ escape codes are understood as for the printf command.
 
 Multibyte characters are not supported.
 
-=item B<--demux> I<col> (alpha testing)
-
-=item B<--demultiplex> I<col> (alpha testing)
-
-Demultiplex input on column number I<col>. Input is split using
-B<--colsep>. The value in the column is hashed so that all lines of a
-given value is given to the same job slot.
-
-This is a bit similar to B<--roundrobin> in that all data is given to
-a limited number of jobs, but opposite B<--roundrobin> data is given
-to a specific job slot based on the value in a column.
-
-The performance is in the order of 1M rows per second.
-
-
 =item B<--dirnamereplace> I<replace-str>
 
 =item B<--dnr> I<replace-str>
@@ -2086,6 +2072,20 @@ Only supported in B.
 
 See also B<--env>, B<--record-env>.
 
+=item B<--shard> I<shardcol> (alpha testing)
+
+Use column I<shardcol> as shard key and shard input to the jobs.
+
+Each input line is split using B<--colsep>. The value in the
+I<shardcol> column is hashed so that all lines of a given value is
+given to the same job slot.
+
+This is similar to sharding in databases.
+
+The performance is in the order of 100K rows per second. Faster if the
+I<shardcol> is small (<10), slower if it is big (>100).
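+
+For example, a minimal sketch that shards on column 1 to 5 job slots
+(B<wc> merely stands in for any command reading standard input):
+
+  seq 100000 | parallel --pipe --shard 1 -j5 wc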
+
+
 =item B<--shebang>
 
 =item B<--hashbang>
diff --git a/src/parallel_tutorial.pod b/src/parallel_tutorial.pod
index 773b77bb..eaa46e4a 100644
--- a/src/parallel_tutorial.pod
+++ b/src/parallel_tutorial.pod
@@ -9,15 +9,28 @@ real world.
 
 =head2 Reader's guide
 
-Start by watching the intro videos for a quick introduction:
+If you prefer reading a book buy B<GNU Parallel 2018> at
+http://www.lulu.com/shop/ole-tange/gnu-parallel-2018/paperback/product-23558902.html
+or download it at: https://doi.org/10.5281/zenodo.1146014
+
+Otherwise start by watching the intro videos for a quick introduction:
 http://www.youtube.com/playlist?list=PL284C9FF2488BC6D1
 
-Then look at the B<EXAMPLE>s after the list of B<OPTIONS> in B<man
-parallel> (Use B<LESS=+/EXAMPLE: man parallel>). That will give you
-an idea of what GNU B<parallel> is capable of.
+Then browse through the B<EXAMPLE>s after the list of B<OPTIONS> in
+B<man parallel> (Use B<LESS=+/EXAMPLE: man parallel>). That will give
+you an idea of what GNU B<parallel> is capable of.
+
+If you want to dive even deeper: spend a couple of hours walking
+through the tutorial (B<man parallel_tutorial>). Your command line
+will love you for it.
+
+Finally you may want to look at the rest of the manual (B<man
+parallel>) if you have special needs not already covered.
+
+If you want to know the design decisions behind GNU B<parallel>, try:
+B<man parallel_design>. This is also a good intro if you intend to
+change GNU B<parallel>.
 
-Then spend a couple of hours walking through this tutorial (B<man
-parallel_tutorial>). Your command line will love you for it.
 
 =head1 Prerequisites
 
diff --git a/testsuite/tests-to-run/parallel-local-0.3s.sh b/testsuite/tests-to-run/parallel-local-0.3s.sh
index a7b2344a..fc43a76c 100644
--- a/testsuite/tests-to-run/parallel-local-0.3s.sh
+++ b/testsuite/tests-to-run/parallel-local-0.3s.sh
@@ -901,9 +901,28 @@ par_wd_dotdotdot() {
     parallel --wd ... 'echo $OLDPWD' ::: foo
 }
 
-par_demux() {
-    echo '### --demux'
-    seq 100000 | parallel --pipe --demux 1 -j5 'echo {#}; cat' | wc
+par_shard() {
+    echo '### --shard'
+    # Each of the 5 lines should match:
+    # ##### ##### ######
+    seq 100000 | parallel --pipe --shard 1 -j5 wc |
+        perl -pe 's/(.*\d{5,}){3}/OK/'
+    # Data should be sharded to all processes
+    shard_on_col() {
+        col=$1
+        seq 10 99 | shuf | perl -pe 's/(.)/$1\t/g' |
+            parallel --pipe --shard $col -j2 --colsep "\t" sort -k$col |
+            field $col | uniq -c | sort
+    }
+    shard_on_col 1
+    shard_on_col 2
+    echo '*** broken'
+    # Shorthand for --pipe -j+0
+    seq 100000 | parallel --shard 1 wc |
+        perl -pe 's/(.*\d{5,}){3}/OK/'
+    # Combine with arguments
+    seq 100000 | parallel --shard 1 echo {}\;wc ::: {1..5} ::: a b |
+        perl -pe 's/(.*\d{5,}){3}/OK/'
 }
 
 export -f $(compgen -A function | grep par_)
diff --git a/testsuite/wanted-results/parallel-local-0.3s b/testsuite/wanted-results/parallel-local-0.3s
index e4cd1eb5..47ef2936 100644
--- a/testsuite/wanted-results/parallel-local-0.3s
+++ b/testsuite/wanted-results/parallel-local-0.3s
@@ -1299,8 +1299,6 @@ par_csv_pipe 11000"
 par_csv_pipe More records in single block
 par_csv_pipe 9000"
 par_csv_pipe 11000"
-par_demux ### --demux
-par_demux 100005 100005 588905
 par_dryrun_append_joblog --dry-run should not append to joblog
 par_dryrun_append_joblog 1
 par_dryrun_append_joblog 2
@@ -1519,6 +1517,33 @@ par_retries_replacement_string 33
 par_sem_quote ### sem --quote should not add empty argument
 par_sem_quote echo
 par_sem_quote
+par_shard ### --shard
+par_shard OK
+par_shard OK
+par_shard OK
+par_shard OK
+par_shard OK
+par_shard 10 1
+par_shard 10 2
+par_shard 10 3
+par_shard 10 4
+par_shard 10 5
+par_shard 10 6
+par_shard 10 7
+par_shard 10 8
+par_shard 10 9
+par_shard 9 0
+par_shard 9 1
+par_shard 9 2
+par_shard 9 3
+par_shard 9 4
+par_shard 9 5
+par_shard 9 6
+par_shard 9 7
+par_shard 9 8
+par_shard 9 9
+par_shard parallel: Error: --tee requres --jobs to be higher. Try --jobs 0.
+par_shard parallel: Error: --tee requres --jobs to be higher. Try --jobs 0.
 par_slow_pipe_regexp ### bug #53718: --pipe --regexp -N blocks
 par_slow_pipe_regexp This should take a few ms, but took more than 2 hours
 par_slow_pipe_regexp 980 981 5881
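
A quick way to preview how sharder_script() maps key values to job slots is
to run its hex(B::hash(...)) % bins computation stand-alone; a sketch with
2 slots and the keys 10..20:

  perl -MB -e 'for my $k (10..20) { printf "%s -> slot %d\n", $k, hex(B::hash($k)) % 2 }'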