Fixed bug #45479: --pipe/--pipepart --tee.

This commit is contained in:
Ole Tange 2017-02-05 02:46:29 +01:00
parent 1229288418
commit c028fa0ad7
4 changed files with 108 additions and 19 deletions

View file

@ -198,14 +198,6 @@ drain_job_queue();
::debug("init", "Done draining\n"); ::debug("init", "Done draining\n");
reaper(); reaper();
::debug("init", "Done reaping\n"); ::debug("init", "Done reaping\n");
if($opt::pipe and @opt::a) {
for my $job (@Global::tee_jobs) {
::rm($job->fh(2,"name"));
$job->set_fh(2,"name","");
$job->print();
::rm($job->fh(1,"name"));
}
}
::debug("init", "Cleaning\n"); ::debug("init", "Cleaning\n");
if($Global::semaphore) { if($Global::semaphore) {
$sem->release(); $sem->release();
@ -376,6 +368,15 @@ sub spreadstdin {
# %Global::running # %Global::running
# Returns: N/A # Returns: N/A
if($opt::tee) {
    # --tee works in three steps:
    #   spawn all jobs
    #   read a record
    #   write the record to all jobs
    # If the job queue is not empty at this point, not every job
    # could be spawned: tell the user to raise --jobs.
    if(not $Global::JobQueue->empty()) {
        ::error("--tee requires --jobs to be higher. Try --jobs 0.");
    }
}
my $buf = ""; my $buf = "";
my ($recstart,$recend) = recstartrecend(); my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart; my $recendrecstart = $recend.$recstart;
@ -635,6 +636,50 @@ sub nindex {
} }
} }
{
    # Delay value handed to (and updated by) ::reap_usleep().
    # Lives outside the sub, so it is kept between calls to tee_write().
    my $backoff = 1;

    sub tee_write {
        # Give every running job a copy of the block
        #
        # Input:
        #   $header_ref = ref to $header string
        #   $buffer_ref = ref to buffer holding the block to be written
        #   $recstart = record start string
        #   $recend = record end string
        #   $endpos = end position of the block
        # Uses:
        #   %Global::running
        # Returns:
        #   $bytes = sum of the values returned by non_blocking_write()
        my ($header_ref,$buffer_ref,$recstart,$recend,$endpos) = @_;
        my $bytes = 0;
        # Jobs (keyed by stringified object) that have received the new block
        my %served;
        my $pending = 1;
        while($pending) {
            $pending = 0;
            for my $job (values %Global::running) {
                next if $served{$job};
                # At least one job had not been served when this pass started
                $pending = 1;
                if($job->block_length() > 0) {
                    # The job still holds an earlier block: flush that first
                    $bytes += $job->non_blocking_write();
                    next;
                }
                # The job is empty: hand it the new block and start writing
                $job->set_block($header_ref,$buffer_ref,$endpos,$recstart,$recend);
                $job->set_virgin(0);
                $bytes += $job->non_blocking_write();
                $served{$job} = 1;
            }
            $backoff = ::reap_usleep($backoff);
        }
        return $bytes;
    }
}
sub index64 { sub index64 {
# Do index on strings > 2GB. # Do index on strings > 2GB.
# index in Perl < v5.22 does not work for > 2GB # index in Perl < v5.22 does not work for > 2GB
@ -744,8 +789,13 @@ sub write_record_to_pipe {
if($endpos == 0) { return 0; } if($endpos == 0) { return 0; }
if(vec($Global::job_already_run,$chunk_number,1)) { return 1; } if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
if($opt::roundrobin) { if($opt::roundrobin) {
# Write the block to one of the already running jobs
return round_robin_write($header_ref,$buffer_ref,$recstart,$recend,$endpos); return round_robin_write($header_ref,$buffer_ref,$recstart,$recend,$endpos);
} }
# --tee: the block goes to all jobs instead of to a single one
return tee_write($header_ref,$buffer_ref,$recstart,$recend,$endpos)
    if $opt::tee;
# If no virgin found, backoff # If no virgin found, backoff
my $sleep = 0.0001; # 0.01 ms - better performance on highend my $sleep = 0.0001; # 0.01 ms - better performance on highend
while(not @Global::virgin_jobs) { while(not @Global::virgin_jobs) {
@ -1152,6 +1202,11 @@ sub parse_options {
map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..50); map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..50);
push @ARGV, $Global::arg_sep, "\0"; push @ARGV, $Global::arg_sep, "\0";
} }
if(defined $opt::tee and not defined $opt::jobs) {
    # Default for --tee: -j0
    # Only applied when --jobs was not given
    $opt::jobs = 0;
}
if(defined $opt::tty) { if(defined $opt::tty) {
# Defaults for --tty: -j1 -u # Defaults for --tty: -j1 -u
# Can be overridden with -jXXX -g # Can be overridden with -jXXX -g
@ -5377,7 +5432,7 @@ sub compute_number_of_processes {
sub get_args_or_jobs { sub get_args_or_jobs {
# Get an arg or a job (depending on mode) # Get an arg or a job (depending on mode)
if($Global::semaphore or $opt::pipe) { if($Global::semaphore or ($opt::pipe and not $opt::tee)) {
# Skip: No need to get args # Skip: No need to get args
return 1; return 1;
} elsif(defined $opt::retries and $count_jobs_already_read) { } elsif(defined $opt::retries and $count_jobs_already_read) {
@ -6617,9 +6672,7 @@ sub openoutputfiles {
} elsif(not $opt::ungroup) { } elsif(not $opt::ungroup) {
# To group we create temporary files for STDOUT and STDERR # To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open) # To avoid the cleanup unlink the files immediately (but keep them open)
if(@Global::tee_jobs) { if($opt::files) {
# files must be removed when the tee is done
} elsif($opt::files) {
($outfhw, $outname) = ::tmpfile(SUFFIX => ".par"); ($outfhw, $outname) = ::tmpfile(SUFFIX => ".par");
($errfhw, $errname) = ::tmpfile(SUFFIX => ".par"); ($errfhw, $errname) = ::tmpfile(SUFFIX => ".par");
# --files => only remove stderr # --files => only remove stderr
@ -10223,6 +10276,10 @@ sub total_jobs {
# shorthand for $job->slot(); # shorthand for $job->slot();
$job->slot(); $job->slot();
} }
sub seq {
    # Lets perl expressions write seq() instead of $job->seq()
    return $job->seq();
}
sub replace { sub replace {
# Calculates the corresponding value for a given perl expression # Calculates the corresponding value for a given perl expression

View file

@ -273,11 +273,11 @@ perl quote a string
number of jobs in total number of jobs in total
=item Z<> B<$job->>B<slot()> =item Z<> B<slot()>
slot number of job slot number of job
=item Z<> B<$job->>B<seq()> =item Z<> B<seq()>
sequence number of job sequence number of job
@ -1770,13 +1770,13 @@ of GNU B<parallel>'s internal functions and data structures.
Here are a few examples: Here are a few examples:
Is the job sequence even or odd? Is the job sequence even or odd?
--rpl '{odd} $_ = $job->seq() % 2 ? "odd" : "even"' --rpl '{odd} $_ = seq() % 2 ? "odd" : "even"'
Pad job sequence with leading zeros to get equal width Pad job sequence with leading zeros to get equal width
--rpl '{0#} $f = "%0".int(1+log(total_jobs())/log(10))."d"; $_=sprintf($f,$job->seq())' --rpl '{0#} $f = "%0".int(1+log(total_jobs())/log(10))."d"; $_=sprintf($f,seq())'
Job sequence counting from 0 Job sequence counting from 0
--rpl '{#0} $_ = $job->seq() - 1' --rpl '{#0} $_ = seq() - 1'
Job slot counting from 2 Job slot counting from 2
--rpl '{%1} $_ = $job->slot() + 1' --rpl '{%1} $_ = slot() + 1'
See also: B<{= perl expression =}> B<--parens> See also: B<{= perl expression =}> B<--parens>
@ -2168,6 +2168,13 @@ B<{}>.
B<--tagstring> is ignored when using B<-u>, B<--onall>, and B<--nonall>. B<--tagstring> is ignored when using B<-u>, B<--onall>, and B<--nonall>.
=item B<--tee> (alpha testing)
Pipe all data to all jobs, so every job receives a full copy of the input. Used with B<--pipe>/B<--pipepart> and B<:::>.
seq 1000 | parallel --pipe --tee -v wc {} ::: -w -l -c
=item B<--termseq> I<sequence> =item B<--termseq> I<sequence>
Termination sequence. When a job is killed due to B<--timeout>, Termination sequence. When a job is killed due to B<--timeout>,
@ -4141,7 +4148,7 @@ Killed by Ctrl-C, timeout, not enough memory or similar.
=item Z<>-2 (In joblog and SQL table) =item Z<>-2 (In joblog and SQL table)
$job->skip() was called in B<{= =}>. skip() was called in B<{= =}>.
=item Z<>-1000 (In SQL table) =item Z<>-1000 (In SQL table)

View file

@ -672,6 +672,11 @@ par_retries_replacement_string() {
rm $tmp rm $tmp
} }
par_tee() {
# Test --pipe --tee: the output of seq is piped to all jobs.
# The two ::: sources give 5 x 2 = 10 argument combinations; each job
# prints {%} and then the wc counts of the input it received.
# Options are handed to parallel through $PARALLEL; -k and --tag make the
# output ordered and prefixed with the arguments.
export PARALLEL='-k --tee --pipe --tag'
seq 1000000 | parallel 'echo {%};LANG=C wc' ::: {1..5} ::: {a..b}
}
export -f $(compgen -A function | grep par_) export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | sort | compgen -A function | grep par_ | sort |
parallel -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1' parallel -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1'

View file

@ -1630,3 +1630,23 @@ par_retries_replacement_string 22
par_retries_replacement_string 33 par_retries_replacement_string 33
par_retries_replacement_string 33 par_retries_replacement_string 33
par_retries_replacement_string 33 par_retries_replacement_string 33
par_tee 1 a 1
par_tee 1 a 1000000 1000000 6888896
par_tee 1 b 2
par_tee 1 b 1000000 1000000 6888896
par_tee 2 a 3
par_tee 2 a 1000000 1000000 6888896
par_tee 2 b 4
par_tee 2 b 1000000 1000000 6888896
par_tee 3 a 5
par_tee 3 a 1000000 1000000 6888896
par_tee 3 b 6
par_tee 3 b 1000000 1000000 6888896
par_tee 4 a 7
par_tee 4 a 1000000 1000000 6888896
par_tee 4 b 8
par_tee 4 b 1000000 1000000 6888896
par_tee 5 a 9
par_tee 5 a 1000000 1000000 6888896
par_tee 5 b 10
par_tee 5 b 1000000 1000000 6888896