From c028fa0ad749d2b9f1871844d39976802b6d8dde Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Sun, 5 Feb 2017 02:46:29 +0100 Subject: [PATCH] Fixed bug #45479: --pipe/--pipepart --tee. --- src/parallel | 81 ++++++++++++++++--- src/parallel.pod | 21 +++-- testsuite/tests-to-run/parallel-local-0.3s.sh | 5 ++ testsuite/wanted-results/parallel-local-0.3s | 20 +++++ 4 files changed, 108 insertions(+), 19 deletions(-) diff --git a/src/parallel b/src/parallel index b7639e28..0ecb240a 100755 --- a/src/parallel +++ b/src/parallel @@ -198,14 +198,6 @@ drain_job_queue(); ::debug("init", "Done draining\n"); reaper(); ::debug("init", "Done reaping\n"); -if($opt::pipe and @opt::a) { - for my $job (@Global::tee_jobs) { - ::rm($job->fh(2,"name")); - $job->set_fh(2,"name",""); - $job->print(); - ::rm($job->fh(1,"name")); - } -} ::debug("init", "Cleaning\n"); if($Global::semaphore) { $sem->release(); @@ -376,6 +368,15 @@ sub spreadstdin { # %Global::running # Returns: N/A + if($opt::tee) { + # Spawn all jobs + # read a record + # Write record to all jobs + if(not $Global::JobQueue->empty()) { + ::error("--tee requres --jobs to be higher. 
Try --jobs 0."); + } + } + my $buf = ""; my ($recstart,$recend) = recstartrecend(); my $recendrecstart = $recend.$recstart; @@ -635,6 +636,50 @@ sub nindex { } } +{ + my $sleep = 1; + + sub tee_write { + # Write the block to all jobs + # + # Input: + # $header_ref = ref to $header string + # $block_ref = ref to $block to be written + # $recstart = record start string + # $recend = record end string + # $endpos = end position of $block + # Uses: + # %Global::running + # Returns: + # $written = amount of bytes written + my ($header_ref,$buffer_ref,$recstart,$recend,$endpos) = @_; + my $written = 0; + my $done = 0; + my %done; + while(not $done) { + $done = 1; + for my $job (values %Global::running) { + if(not $done{$job}) { + $done = 0; + if($job->block_length() > 0) { + # Flush old block + $written += $job->non_blocking_write(); + } else { + # Give a copy of the new block + $job->set_block($header_ref,$buffer_ref,$endpos,$recstart,$recend); + $job->set_virgin(0); + $written += $job->non_blocking_write(); + # Mark this job as done + $done{$job} = 1; + } + } + } + $sleep = ::reap_usleep($sleep); + } + return $written; + } +} + sub index64 { # Do index on strings > 2GB. 
# index in Perl < v5.22 does not work for > 2GB @@ -744,8 +789,13 @@ sub write_record_to_pipe { if($endpos == 0) { return 0; } if(vec($Global::job_already_run,$chunk_number,1)) { return 1; } if($opt::roundrobin) { + # Write the block to one of the already running jobs return round_robin_write($header_ref,$buffer_ref,$recstart,$recend,$endpos); } + if($opt::tee) { + # Write the block to all jobs + return tee_write($header_ref,$buffer_ref,$recstart,$recend,$endpos); + } # If no virgin found, backoff my $sleep = 0.0001; # 0.01 ms - better performance on highend while(not @Global::virgin_jobs) { @@ -1152,6 +1202,11 @@ sub parse_options { map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..50); push @ARGV, $Global::arg_sep, "\0"; } + if(defined $opt::tee) { + if(not defined $opt::jobs) { + $opt::jobs = 0; + } + } if(defined $opt::tty) { # Defaults for --tty: -j1 -u # Can be overridden with -jXXX -g @@ -5377,7 +5432,7 @@ sub compute_number_of_processes { sub get_args_or_jobs { # Get an arg or a job (depending on mode) - if($Global::semaphore or $opt::pipe) { + if($Global::semaphore or ($opt::pipe and not $opt::tee)) { # Skip: No need to get args return 1; } elsif(defined $opt::retries and $count_jobs_already_read) { @@ -6617,9 +6672,7 @@ sub openoutputfiles { } elsif(not $opt::ungroup) { # To group we create temporary files for STDOUT and STDERR # To avoid the cleanup unlink the files immediately (but keep them open) - if(@Global::tee_jobs) { - # files must be removed when the tee is done - } elsif($opt::files) { + if($opt::files) { ($outfhw, $outname) = ::tmpfile(SUFFIX => ".par"); ($errfhw, $errname) = ::tmpfile(SUFFIX => ".par"); # --files => only remove stderr @@ -10223,6 +10276,10 @@ sub total_jobs { # shorthand for $job->slot(); $job->slot(); } + sub seq { + # shorthand for $job->seq(); + $job->seq(); + } sub replace { # Calculates the corresponding value for a given perl expression diff --git a/src/parallel.pod b/src/parallel.pod index f7035d32..baf3cef7 100644 --- 
a/src/parallel.pod +++ b/src/parallel.pod @@ -273,11 +273,11 @@ perl quote a string number of jobs in total -=item Z<> B<$job->>B<slot()> +=item Z<> B<slot()> slot number of job -=item Z<> B<$job->>B<seq()> +=item Z<> B<seq()> sequence number of job @@ -1770,13 +1770,13 @@ of GNU B<parallel>'s internal functions and data structures. Here are a few examples: Is the job sequence even or odd? - --rpl '{odd} $_ = $job->seq() % 2 ? "odd" : "even"' + --rpl '{odd} $_ = seq() % 2 ? "odd" : "even"' Pad job sequence with leading zeros to get equal width - --rpl '{0#} $f = "%0".int(1+log(total_jobs())/log(10))."d"; $_=sprintf($f,$job->seq())' + --rpl '{0#} $f = "%0".int(1+log(total_jobs())/log(10))."d"; $_=sprintf($f,seq())' Job sequence counting from 0 - --rpl '{#0} $_ = $job->seq() - 1' + --rpl '{#0} $_ = seq() - 1' Job slot counting from 2 - --rpl '{%1} $_ = $job->slot() + 1' + --rpl '{%1} $_ = slot() + 1' See also: B<{= perl expression =}> B<--parens> @@ -2168,6 +2168,13 @@ B<{}>. B<--tagstring> is ignored when using B<-u>, B<--onall>, and B<--nonall>. +=item B<--tee> (alpha testing) + +Pipe all data to all jobs. Used with B<--pipe> and B<:::>. + + seq 1000 | parallel --pipe --tee -v wc {} ::: -w -l -c + + =item B<--termseq> I<sequence> Termination sequence. When a job is killed due to B<--timeout>, @@ -4141,7 +4148,7 @@ Killed by Ctrl-C, timeout, not enough memory or similar. =item Z<>-2 (In joblog and SQL table) -$job->skip() was called in B<{= =}>. +skip() was called in B<{= =}>. 
=item Z<>-1000 (In SQL table) diff --git a/testsuite/tests-to-run/parallel-local-0.3s.sh b/testsuite/tests-to-run/parallel-local-0.3s.sh index 6b29dc09..1f9b02f7 100644 --- a/testsuite/tests-to-run/parallel-local-0.3s.sh +++ b/testsuite/tests-to-run/parallel-local-0.3s.sh @@ -672,6 +672,11 @@ par_retries_replacement_string() { rm $tmp } +par_tee() { + export PARALLEL='-k --tee --pipe --tag' + seq 1000000 | parallel 'echo {%};LANG=C wc' ::: {1..5} ::: {a..b} +} + export -f $(compgen -A function | grep par_) compgen -A function | grep par_ | sort | parallel -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1' diff --git a/testsuite/wanted-results/parallel-local-0.3s b/testsuite/wanted-results/parallel-local-0.3s index b6115bbe..0385f4f9 100644 --- a/testsuite/wanted-results/parallel-local-0.3s +++ b/testsuite/wanted-results/parallel-local-0.3s @@ -1630,3 +1630,23 @@ par_retries_replacement_string 22 par_retries_replacement_string 33 par_retries_replacement_string 33 par_retries_replacement_string 33 +par_tee 1 a 1 +par_tee 1 a 1000000 1000000 6888896 +par_tee 1 b 2 +par_tee 1 b 1000000 1000000 6888896 +par_tee 2 a 3 +par_tee 2 a 1000000 1000000 6888896 +par_tee 2 b 4 +par_tee 2 b 1000000 1000000 6888896 +par_tee 3 a 5 +par_tee 3 a 1000000 1000000 6888896 +par_tee 3 b 6 +par_tee 3 b 1000000 1000000 6888896 +par_tee 4 a 7 +par_tee 4 a 1000000 1000000 6888896 +par_tee 4 b 8 +par_tee 4 b 1000000 1000000 6888896 +par_tee 5 a 9 +par_tee 5 a 1000000 1000000 6888896 +par_tee 5 b 10 +par_tee 5 b 1000000 1000000 6888896