mirror of
https://git.savannah.gnu.org/git/parallel.git
synced 2024-11-25 23:47:53 +00:00
Faster version bug #45479: --pipe/--pipepart --tee.
This commit is contained in:
parent
70006774e8
commit
a2358aebe4
371
src/parallel
371
src/parallel
|
@ -51,8 +51,17 @@ my @command = @ARGV;
|
||||||
|
|
||||||
my @input_source_fh;
|
my @input_source_fh;
|
||||||
if($opt::pipepart) {
|
if($opt::pipepart) {
|
||||||
# -a is used for data - not for command line args
|
if($opt::tee) {
|
||||||
@input_source_fh = map { open_or_exit($_) } "/dev/null";
|
@input_source_fh = map { open_or_exit($_) } @opt::a;
|
||||||
|
# Remove the first: It will be the file piped.
|
||||||
|
shift @input_source_fh;
|
||||||
|
if(not @input_source_fh and not $opt::pipe) {
|
||||||
|
@input_source_fh = (*STDIN);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
# -a is used for data - not for command line args
|
||||||
|
@input_source_fh = map { open_or_exit($_) } "/dev/null";
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
@input_source_fh = map { open_or_exit($_) } @opt::a;
|
@input_source_fh = map { open_or_exit($_) } @opt::a;
|
||||||
if(not @input_source_fh and not $opt::pipe) {
|
if(not @input_source_fh and not $opt::pipe) {
|
||||||
|
@ -77,6 +86,65 @@ if($opt::skip_first_line) {
|
||||||
|
|
||||||
set_input_source_header();
|
set_input_source_header();
|
||||||
|
|
||||||
|
if($opt::filter_hosts and (@opt::sshlogin or @opt::sshloginfile)) {
|
||||||
|
# Parallel check all hosts are up. Remove hosts that are down
|
||||||
|
filter_hosts();
|
||||||
|
}
|
||||||
|
|
||||||
|
if($opt::nonall or $opt::onall) {
|
||||||
|
onall(\@input_source_fh,@command);
|
||||||
|
wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
|
||||||
|
}
|
||||||
|
|
||||||
|
# TODO --transfer foo/./bar --cleanup
|
||||||
|
# multiple --transfer and --basefile with different /./
|
||||||
|
|
||||||
|
$Global::JobQueue = JobQueue->new(
|
||||||
|
\@command,\@input_source_fh,$Global::ContextReplace,
|
||||||
|
$number_of_args,\@Global::transfer_files,\@Global::ret_files);
|
||||||
|
|
||||||
|
if($opt::pipepart) {
|
||||||
|
pipepart_setup();
|
||||||
|
} elsif($opt::pipe and $opt::tee) {
|
||||||
|
pipe_tee_setup();
|
||||||
|
}
|
||||||
|
|
||||||
|
if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) {
|
||||||
|
# Count the number of jobs or shuffle all jobs
|
||||||
|
# before starting any.
|
||||||
|
# Must be done after ungetting any --pipepart jobs.
|
||||||
|
$Global::JobQueue->total_jobs();
|
||||||
|
}
|
||||||
|
# Compute $Global::max_jobs_running
|
||||||
|
# Must be done after ungetting any --pipepart jobs.
|
||||||
|
max_jobs_running();
|
||||||
|
|
||||||
|
init_run_jobs();
|
||||||
|
my $sem;
|
||||||
|
if($Global::semaphore) {
|
||||||
|
$sem = acquire_semaphore();
|
||||||
|
}
|
||||||
|
$SIG{TERM} = \&start_no_new_jobs;
|
||||||
|
start_more_jobs();
|
||||||
|
if($opt::tee) {
|
||||||
|
# All jobs must be running in parallel for --tee
|
||||||
|
$Global::start_no_new_jobs = 1;
|
||||||
|
} elsif($opt::pipe and not $opt::pipepart) {
|
||||||
|
spreadstdin();
|
||||||
|
}
|
||||||
|
::debug("init", "Start draining\n");
|
||||||
|
drain_job_queue();
|
||||||
|
::debug("init", "Done draining\n");
|
||||||
|
reaper();
|
||||||
|
::debug("init", "Done reaping\n");
|
||||||
|
::debug("init", "Cleaning\n");
|
||||||
|
if($Global::semaphore) {
|
||||||
|
$sem->release();
|
||||||
|
}
|
||||||
|
cleanup();
|
||||||
|
::debug("init", "Halt\n");
|
||||||
|
halt();
|
||||||
|
|
||||||
sub set_input_source_header {
|
sub set_input_source_header {
|
||||||
if($opt::header and not $opt::pipe) {
|
if($opt::header and not $opt::pipe) {
|
||||||
# split with colsep or \t
|
# split with colsep or \t
|
||||||
|
@ -117,114 +185,102 @@ sub set_input_source_header {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if($opt::filter_hosts and (@opt::sshlogin or @opt::sshloginfile)) {
|
sub max_jobs_running {
|
||||||
# Parallel check all hosts are up. Remove hosts that are down
|
if(not $Global::max_jobs_running) {
|
||||||
filter_hosts();
|
|
||||||
}
|
|
||||||
|
|
||||||
if($opt::nonall or $opt::onall) {
|
|
||||||
onall(\@input_source_fh,@command);
|
|
||||||
wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
|
|
||||||
}
|
|
||||||
|
|
||||||
# TODO --transfer foo/./bar --cleanup
|
|
||||||
# multiple --transfer and --basefile with different /./
|
|
||||||
|
|
||||||
$Global::JobQueue = JobQueue->new(
|
|
||||||
\@command,\@input_source_fh,$Global::ContextReplace,
|
|
||||||
$number_of_args,\@Global::transfer_files,\@Global::ret_files);
|
|
||||||
|
|
||||||
if($opt::pipepart) {
|
|
||||||
if(not $opt::blocksize) {
|
|
||||||
# --blocksize with 10 jobs per jobslot
|
|
||||||
$opt::blocksize = -10;
|
|
||||||
}
|
|
||||||
if($opt::roundrobin) {
|
|
||||||
# --blocksize with 1 job per jobslot
|
|
||||||
$opt::blocksize = -1;
|
|
||||||
}
|
|
||||||
if($opt::blocksize < 0) {
|
|
||||||
my $size = 0;
|
|
||||||
# Compute size of -a
|
|
||||||
for(@opt::a) {
|
|
||||||
if(-f $_) {
|
|
||||||
$size += -s $_;
|
|
||||||
} elsif(-b $_) {
|
|
||||||
$size += size_of_block_dev($_);
|
|
||||||
} else {
|
|
||||||
::error("$_ is neither a file nor a block device");
|
|
||||||
wait_and_exit(255);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
# Compute $Global::max_jobs_running
|
# Compute $Global::max_jobs_running
|
||||||
$Global::dummy_jobs = 1;
|
|
||||||
for my $sshlogin (values %Global::host) {
|
for my $sshlogin (values %Global::host) {
|
||||||
$sshlogin->max_jobs_running();
|
$sshlogin->max_jobs_running();
|
||||||
}
|
}
|
||||||
$Global::max_jobs_running or
|
|
||||||
::die_bug("Global::max_jobs_running not set");
|
|
||||||
# Run in total $job_slots*(- $blocksize) jobs
|
|
||||||
# Set --blocksize = size / no of proc / (- $blocksize)
|
|
||||||
$Global::blocksize = 1 +
|
|
||||||
int($size / $Global::max_jobs_running / -$opt::blocksize);
|
|
||||||
}
|
}
|
||||||
@Global::cat_partials = map { pipe_part_files($_) } @opt::a;
|
return $Global::max_jobs_running;
|
||||||
# Unget the empty arg as many times as there are parts
|
|
||||||
$Global::JobQueue->{'commandlinequeue'}{'arg_queue'}->unget(
|
|
||||||
map { [Arg->new("\0")] } @Global::cat_partials
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) {
|
|
||||||
# Count the number of jobs or shuffle all jobs
|
|
||||||
# before starting any.
|
|
||||||
# Must be done after ungetting any --pipepart jobs.
|
|
||||||
$Global::JobQueue->total_jobs();
|
|
||||||
}
|
|
||||||
# Compute $Global::max_jobs_running
|
|
||||||
# Must be done after ungetting any --pipepart jobs.
|
|
||||||
for my $sshlogin (values %Global::host) {
|
|
||||||
$sshlogin->max_jobs_running();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
init_run_jobs();
|
sub halt {
|
||||||
my $sem;
|
if($opt::halt and $Global::halt_when ne "never") {
|
||||||
if($Global::semaphore) {
|
if(not defined $Global::halt_exitstatus) {
|
||||||
$sem = acquire_semaphore();
|
if($Global::halt_pct) {
|
||||||
}
|
$Global::halt_exitstatus =
|
||||||
$SIG{TERM} = \&start_no_new_jobs;
|
::ceil($Global::total_failed / $Global::total_started * 100);
|
||||||
start_more_jobs();
|
} elsif($Global::halt_count) {
|
||||||
if($opt::pipe and not $opt::pipepart) {
|
$Global::halt_exitstatus =
|
||||||
spreadstdin();
|
::min(undef_as_zero($Global::total_failed),101);
|
||||||
}
|
}
|
||||||
::debug("init", "Start draining\n");
|
|
||||||
drain_job_queue();
|
|
||||||
::debug("init", "Done draining\n");
|
|
||||||
reaper();
|
|
||||||
::debug("init", "Done reaping\n");
|
|
||||||
::debug("init", "Cleaning\n");
|
|
||||||
if($Global::semaphore) {
|
|
||||||
$sem->release();
|
|
||||||
}
|
|
||||||
cleanup();
|
|
||||||
::debug("init", "Halt\n");
|
|
||||||
if($opt::halt and $Global::halt_when ne "never") {
|
|
||||||
if(not defined $Global::halt_exitstatus) {
|
|
||||||
if($Global::halt_pct) {
|
|
||||||
$Global::halt_exitstatus =
|
|
||||||
::ceil($Global::total_failed / $Global::total_started * 100);
|
|
||||||
} elsif($Global::halt_count) {
|
|
||||||
$Global::halt_exitstatus =
|
|
||||||
::min(undef_as_zero($Global::total_failed),101);
|
|
||||||
}
|
}
|
||||||
|
wait_and_exit($Global::halt_exitstatus);
|
||||||
|
} else {
|
||||||
|
wait_and_exit(min(undef_as_zero($Global::exitstatus),101));
|
||||||
}
|
}
|
||||||
wait_and_exit($Global::halt_exitstatus);
|
|
||||||
} else {
|
|
||||||
wait_and_exit(min(undef_as_zero($Global::exitstatus),101));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sub __PIPE_MODE__ {}
|
sub __PIPE_MODE__ {}
|
||||||
|
|
||||||
|
sub pipepart_setup {
|
||||||
|
if($opt::tee) {
|
||||||
|
# Prepend each command with
|
||||||
|
# cat file
|
||||||
|
my $cat_string = "cat ".::shell_quote_scalar($opt::a[0]);
|
||||||
|
for(1..max_jobs_running()) {
|
||||||
|
push @Global::cat_prepends, $cat_string;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if(not $opt::blocksize) {
|
||||||
|
# --blocksize with 10 jobs per jobslot
|
||||||
|
$opt::blocksize = -10;
|
||||||
|
}
|
||||||
|
if($opt::roundrobin) {
|
||||||
|
# --blocksize with 1 job per jobslot
|
||||||
|
$opt::blocksize = -1;
|
||||||
|
}
|
||||||
|
if($opt::blocksize < 0) {
|
||||||
|
my $size = 0;
|
||||||
|
# Compute size of -a
|
||||||
|
for(@opt::a) {
|
||||||
|
if(-f $_) {
|
||||||
|
$size += -s $_;
|
||||||
|
} elsif(-b $_) {
|
||||||
|
$size += size_of_block_dev($_);
|
||||||
|
} else {
|
||||||
|
::error("$_ is neither a file nor a block device");
|
||||||
|
wait_and_exit(255);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
# Run in total $job_slots*(- $blocksize) jobs
|
||||||
|
# Set --blocksize = size / no of proc / (- $blocksize)
|
||||||
|
$Global::dummy_jobs = 1;
|
||||||
|
$Global::blocksize = 1 +
|
||||||
|
int($size / max_jobs_running() / -$opt::blocksize);
|
||||||
|
}
|
||||||
|
@Global::cat_prepends = map { pipe_part_files($_) } @opt::a;
|
||||||
|
# Unget the empty arg as many times as there are parts
|
||||||
|
$Global::JobQueue->{'commandlinequeue'}{'arg_queue'}->unget(
|
||||||
|
map { [Arg->new("\0")] } @Global::cat_prepends
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub pipe_tee_setup {
|
||||||
|
# mkfifo t1..5
|
||||||
|
my @fifos;
|
||||||
|
for(1..max_jobs_running()) {
|
||||||
|
push @fifos, tmpfifo();
|
||||||
|
}
|
||||||
|
|
||||||
|
# cat foo | tee t1 t2 t3 t4 t5 > /dev/null
|
||||||
|
if(not fork()){
|
||||||
|
# Let tee inheirit our stdin
|
||||||
|
# and redirect stdout to null
|
||||||
|
open STDOUT, ">","/dev/null";
|
||||||
|
exec "tee",@fifos;
|
||||||
|
}
|
||||||
|
|
||||||
|
# cat t1 | grep 1
|
||||||
|
# cat t2 | grep 2
|
||||||
|
# cat t3 | grep 3
|
||||||
|
# cat t4 | grep 4
|
||||||
|
# cat t5 | grep 5
|
||||||
|
# Remove the tmpfifo as soon as it is open
|
||||||
|
@Global::cat_prepends = map { "(rm $_;cat) < $_" } @fifos;
|
||||||
|
}
|
||||||
|
|
||||||
sub pipe_part_files {
|
sub pipe_part_files {
|
||||||
# Input:
|
# Input:
|
||||||
|
@ -240,12 +296,12 @@ sub pipe_part_files {
|
||||||
my $header = find_header(\$buf,open_or_exit($file));
|
my $header = find_header(\$buf,open_or_exit($file));
|
||||||
# find positions
|
# find positions
|
||||||
my @pos = find_split_positions($file,$Global::blocksize,length $header);
|
my @pos = find_split_positions($file,$Global::blocksize,length $header);
|
||||||
# Make @cat_partials
|
# Make @cat_prepends
|
||||||
my @cat_partials = ();
|
my @cat_prepends = ();
|
||||||
for(my $i=0; $i<$#pos; $i++) {
|
for(my $i=0; $i<$#pos; $i++) {
|
||||||
push @cat_partials, cat_partial($file, 0, length($header), $pos[$i], $pos[$i+1]);
|
push @cat_prepends, cat_partial($file, 0, length($header), $pos[$i], $pos[$i+1]);
|
||||||
}
|
}
|
||||||
return @cat_partials;
|
return @cat_prepends;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub find_header {
|
sub find_header {
|
||||||
|
@ -336,22 +392,26 @@ sub cat_partial {
|
||||||
# $file = the file to read
|
# $file = the file to read
|
||||||
# ($start, $end, [$start2, $end2, ...]) = start byte, end byte
|
# ($start, $end, [$start2, $end2, ...]) = start byte, end byte
|
||||||
# Returns:
|
# Returns:
|
||||||
# Efficient perl command to copy $start..$end, $start2..$end2, ... to stdout
|
# Efficient command to copy $start..$end, $start2..$end2, ... to stdout
|
||||||
my($file, @start_end) = @_;
|
my($file, @start_end) = @_;
|
||||||
my($start, $i);
|
my($start, $i);
|
||||||
# Convert start_end to start_len
|
# Convert start_end to start_len
|
||||||
my @start_len = map { if(++$i % 2) { $start = $_; } else { $_-$start } } @start_end;
|
my @start_len = map {
|
||||||
my $script = spacefree(0,
|
if(++$i % 2) { $start = $_; } else { $_-$start }
|
||||||
q{
|
} @start_end;
|
||||||
while(@ARGV) {
|
my $script = spacefree
|
||||||
sysseek(STDIN,shift,0) || die;
|
(0,
|
||||||
$left = shift;
|
q{
|
||||||
while($read = sysread(STDIN,$buf, $left > 131072 ? 131072 : $left)){
|
while(@ARGV) {
|
||||||
$left -= $read;
|
sysseek(STDIN,shift,0) || die;
|
||||||
syswrite(STDOUT,$buf);
|
$left = shift;
|
||||||
}
|
while($read =
|
||||||
}
|
sysread(STDIN,$buf, $left > 131072 ? 131072 : $left)){
|
||||||
});
|
$left -= $read;
|
||||||
|
syswrite(STDOUT,$buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
return "<". shell_quote_scalar($file) .
|
return "<". shell_quote_scalar($file) .
|
||||||
" perl -e '$script' @start_len";
|
" perl -e '$script' @start_len";
|
||||||
}
|
}
|
||||||
|
@ -639,50 +699,6 @@ sub nindex {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
|
||||||
my $sleep = 1;
|
|
||||||
|
|
||||||
sub tee_write {
|
|
||||||
# Write the block to all jobs
|
|
||||||
#
|
|
||||||
# Input:
|
|
||||||
# $header_ref = ref to $header string
|
|
||||||
# $block_ref = ref to $block to be written
|
|
||||||
# $recstart = record start string
|
|
||||||
# $recend = record end string
|
|
||||||
# $endpos = end position of $block
|
|
||||||
# Uses:
|
|
||||||
# %Global::running
|
|
||||||
# Returns:
|
|
||||||
# $written = amount of bytes written
|
|
||||||
my ($header_ref,$buffer_ref,$recstart,$recend,$endpos) = @_;
|
|
||||||
my $written = 0;
|
|
||||||
my $done = 0;
|
|
||||||
my %done;
|
|
||||||
while(not $done) {
|
|
||||||
$done = 1;
|
|
||||||
for my $job (values %Global::running) {
|
|
||||||
if(not $done{$job}) {
|
|
||||||
$done = 0;
|
|
||||||
if($job->block_length() > 0) {
|
|
||||||
# Flush old block
|
|
||||||
$written += $job->non_blocking_write();
|
|
||||||
} else {
|
|
||||||
# Give a copy of the new block
|
|
||||||
$job->set_block($header_ref,$buffer_ref,$endpos,$recstart,$recend);
|
|
||||||
$job->set_virgin(0);
|
|
||||||
$written += $job->non_blocking_write();
|
|
||||||
# Mark this job as done
|
|
||||||
$done{$job} = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$sleep = ::reap_usleep($sleep);
|
|
||||||
}
|
|
||||||
return $written;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sub index64 {
|
sub index64 {
|
||||||
# Do index on strings > 2GB.
|
# Do index on strings > 2GB.
|
||||||
# index in Perl < v5.22 does not work for > 2GB
|
# index in Perl < v5.22 does not work for > 2GB
|
||||||
|
@ -795,10 +811,6 @@ sub write_record_to_pipe {
|
||||||
# Write the block to one of the already running jobs
|
# Write the block to one of the already running jobs
|
||||||
return round_robin_write($header_ref,$buffer_ref,$recstart,$recend,$endpos);
|
return round_robin_write($header_ref,$buffer_ref,$recstart,$recend,$endpos);
|
||||||
}
|
}
|
||||||
if($opt::tee) {
|
|
||||||
# Write the block to all jobs
|
|
||||||
return tee_write($header_ref,$buffer_ref,$recstart,$recend,$endpos);
|
|
||||||
}
|
|
||||||
# If no virgin found, backoff
|
# If no virgin found, backoff
|
||||||
my $sleep = 0.0001; # 0.01 ms - better performance on highend
|
my $sleep = 0.0001; # 0.01 ms - better performance on highend
|
||||||
while(not @Global::virgin_jobs) {
|
while(not @Global::virgin_jobs) {
|
||||||
|
@ -1349,7 +1361,7 @@ sub check_invalid_option_combinations {
|
||||||
|
|
||||||
sub init_globals {
|
sub init_globals {
|
||||||
# Defaults:
|
# Defaults:
|
||||||
$Global::version = 20170123;
|
$Global::version = 20170202;
|
||||||
$Global::progname = 'parallel';
|
$Global::progname = 'parallel';
|
||||||
$Global::infinity = 2**31;
|
$Global::infinity = 2**31;
|
||||||
$Global::debug = 0;
|
$Global::debug = 0;
|
||||||
|
@ -4119,17 +4131,17 @@ sub tmpname {
|
||||||
do {
|
do {
|
||||||
$tmpname = $ENV{'TMPDIR'}."/".$name.
|
$tmpname = $ENV{'TMPDIR'}."/".$name.
|
||||||
join"", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5);
|
join"", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5);
|
||||||
} while($Global::unlink{$tmpname}++ or -e $tmpname);
|
} while(-e $tmpname or $Global::unlink{$tmpname}++);
|
||||||
return $tmpname;
|
return $tmpname;
|
||||||
}
|
}
|
||||||
|
|
||||||
#sub tmpfifo {
|
sub tmpfifo {
|
||||||
# # Find an unused name and mkfifo on it
|
# Find an unused name and mkfifo on it
|
||||||
# use POSIX qw(mkfifo);
|
use POSIX qw(mkfifo);
|
||||||
# my $tmpfifo = tmpname("fif",@_);
|
my $tmpfifo = tmpname("fif",@_);
|
||||||
# mkfifo($tmpfifo,0600);
|
mkfifo($tmpfifo,0600);
|
||||||
# return $tmpfifo;
|
return $tmpfifo;
|
||||||
#}
|
}
|
||||||
|
|
||||||
sub rm {
|
sub rm {
|
||||||
# Remove file and remove it from %Global::unlink
|
# Remove file and remove it from %Global::unlink
|
||||||
|
@ -7129,7 +7141,8 @@ sub wrapped {
|
||||||
# * --cat
|
# * --cat
|
||||||
# * --fifo
|
# * --fifo
|
||||||
# * --sshlogin
|
# * --sshlogin
|
||||||
# * --pipepart (@Global::cat_partials)
|
# * --pipepart (@Global::cat_prepends)
|
||||||
|
# * --tee (@Global::cat_prepends)
|
||||||
# * --pipe
|
# * --pipe
|
||||||
# * --tmux
|
# * --tmux
|
||||||
# The ordering of the wrapping is important:
|
# The ordering of the wrapping is important:
|
||||||
|
@ -7141,7 +7154,7 @@ sub wrapped {
|
||||||
# $Global::shell
|
# $Global::shell
|
||||||
# $opt::cat
|
# $opt::cat
|
||||||
# $opt::fifo
|
# $opt::fifo
|
||||||
# @Global::cat_partials
|
# @Global::cat_prepends
|
||||||
# $opt::pipe
|
# $opt::pipe
|
||||||
# $opt::tmux
|
# $opt::tmux
|
||||||
# Returns:
|
# Returns:
|
||||||
|
@ -7201,15 +7214,21 @@ sub wrapped {
|
||||||
}
|
}
|
||||||
# Wrap with ssh + tranferring of files
|
# Wrap with ssh + tranferring of files
|
||||||
$command = $self->sshlogin_wrap($command);
|
$command = $self->sshlogin_wrap($command);
|
||||||
if(@Global::cat_partials) {
|
if(@Global::cat_prepends) {
|
||||||
# Prepend:
|
# --pipepart: prepend:
|
||||||
# < /tmp/foo perl -e 'while(@ARGV) {
|
# < /tmp/foo perl -e 'while(@ARGV) {
|
||||||
# sysseek(STDIN,shift,0) || die; $left = shift;
|
# sysseek(STDIN,shift,0) || die; $left = shift;
|
||||||
# while($read = sysread(STDIN,$buf, ($left > 131072 ? 131072 : $left))){
|
# while($read = sysread(STDIN,$buf, ($left > 131072 ? 131072 : $left))){
|
||||||
# $left -= $read; syswrite(STDOUT,$buf);
|
# $left -= $read; syswrite(STDOUT,$buf);
|
||||||
# }
|
# }
|
||||||
# }' 0 0 0 11 |
|
# }' 0 0 0 11 |
|
||||||
$command = (shift @Global::cat_partials). " | ($command)";
|
#
|
||||||
|
# --pipepart --tee: prepend:
|
||||||
|
# cat dash-a-file |
|
||||||
|
#
|
||||||
|
# --pipe --tee: prepend:
|
||||||
|
# cat fifo |
|
||||||
|
$command = (shift @Global::cat_prepends). " | ($command)";
|
||||||
} elsif($opt::pipe) {
|
} elsif($opt::pipe) {
|
||||||
# Wrap with EOF-detector to avoid starting $command if EOF.
|
# Wrap with EOF-detector to avoid starting $command if EOF.
|
||||||
$command = empty_input_wrapper($command);
|
$command = empty_input_wrapper($command);
|
||||||
|
|
|
@ -1772,7 +1772,7 @@ Here are a few examples:
|
||||||
Is the job sequence even or odd?
|
Is the job sequence even or odd?
|
||||||
--rpl '{odd} $_ = seq() % 2 ? "odd" : "even"'
|
--rpl '{odd} $_ = seq() % 2 ? "odd" : "even"'
|
||||||
Pad job sequence with leading zeros to get equal width
|
Pad job sequence with leading zeros to get equal width
|
||||||
--rpl '{0#} $f = "%0".int(1+log(total_jobs())/log(10))."d"; $_=sprintf($f,seq())'
|
--rpl '{0#} $f=1+int("".(log(total_jobs())/log(10))); $_=sprintf("%0${f}d",seq())'
|
||||||
Job sequence counting from 0
|
Job sequence counting from 0
|
||||||
--rpl '{#0} $_ = seq() - 1'
|
--rpl '{#0} $_ = seq() - 1'
|
||||||
Job slot counting from 2
|
Job slot counting from 2
|
||||||
|
@ -2170,10 +2170,16 @@ B<--tagstring> is ignored when using B<-u>, B<--onall>, and B<--nonall>.
|
||||||
|
|
||||||
=item B<--tee> (alpha testing)
|
=item B<--tee> (alpha testing)
|
||||||
|
|
||||||
Pipe all data to all jobs. Used with B<--pipe> and B<:::>.
|
Pipe all data to all jobs. Used with B<--pipe>/B<--pipepart> and
|
||||||
|
B<:::>.
|
||||||
|
|
||||||
seq 1000 | parallel --pipe --tee -v wc {} ::: -w -l -c
|
seq 1000 | parallel --pipe --tee -v wc {} ::: -w -l -c
|
||||||
|
|
||||||
|
How many numbers in 1..1000 contain 0..9, and how many bytes do they
|
||||||
|
fill:
|
||||||
|
|
||||||
|
seq 1000 | parallel --pipe --tee --tag 'grep {1} | wc {2}' ::: {0..9} ::: -l -c
|
||||||
|
|
||||||
|
|
||||||
=item B<--termseq> I<sequence>
|
=item B<--termseq> I<sequence>
|
||||||
|
|
||||||
|
|
|
@ -675,6 +675,7 @@ par_retries_replacement_string() {
|
||||||
par_tee() {
|
par_tee() {
|
||||||
export PARALLEL='-k --tee --pipe --tag'
|
export PARALLEL='-k --tee --pipe --tag'
|
||||||
seq 1000000 | parallel 'echo {%};LANG=C wc' ::: {1..5} ::: {a..b}
|
seq 1000000 | parallel 'echo {%};LANG=C wc' ::: {1..5} ::: {a..b}
|
||||||
|
seq 300000 | parallel 'grep {1} | LANG=C wc {2}' ::: {1..5} ::: -l -c
|
||||||
}
|
}
|
||||||
|
|
||||||
par_tagstring_pipe() {
|
par_tagstring_pipe() {
|
||||||
|
|
|
@ -239,6 +239,31 @@ par_tmux_fg() {
|
||||||
stdout parallel --tmux --fg sleep ::: 3 | perl -pe 's/.tmp\S+/tmp/'
|
stdout parallel --tmux --fg sleep ::: 3 | perl -pe 's/.tmp\S+/tmp/'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
par_pipe_tee() {
|
||||||
|
echo 'bug #45479: --pipe/--pipepart --tee'
|
||||||
|
echo '--pipe --tee'
|
||||||
|
|
||||||
|
random1G() {
|
||||||
|
< /dev/zero openssl enc -aes-128-ctr -K 1234 -iv 1234 2>/dev/null |
|
||||||
|
head -c 1G;
|
||||||
|
}
|
||||||
|
random1G | parallel --pipe --tee cat ::: {1..3} | LANG=C wc -c
|
||||||
|
}
|
||||||
|
|
||||||
|
par_pipepart_tee() {
|
||||||
|
echo 'bug #45479: --pipe/--pipepart --tee'
|
||||||
|
echo '--pipepart --tee'
|
||||||
|
|
||||||
|
random1G() {
|
||||||
|
< /dev/zero openssl enc -aes-128-ctr -K 1234 -iv 1234 2>/dev/null |
|
||||||
|
head -c 1G;
|
||||||
|
}
|
||||||
|
tmp=$(mktemp)
|
||||||
|
random1G >$tmp
|
||||||
|
parallel --pipepart --tee -a $tmp cat ::: {1..3} | LANG=C wc -c
|
||||||
|
rm $tmp
|
||||||
|
}
|
||||||
|
|
||||||
export -f $(compgen -A function | grep par_)
|
export -f $(compgen -A function | grep par_)
|
||||||
compgen -A function | grep par_ | sort |
|
compgen -A function | grep par_ | sort |
|
||||||
parallel --joblog /tmp/jl-`basename $0` -j10 --tag -k '{} 2>&1'
|
parallel --joblog /tmp/jl-`basename $0` -j10 --tag -k '{} 2>&1'
|
||||||
|
|
|
@ -74,6 +74,8 @@ perl -ne '$/="\n\n"; /^Output/../^[^O]\S/ and next; /^ / and print;' ../../src/
|
||||||
# Timings are often off
|
# Timings are often off
|
||||||
s/^(\d)$/9/;
|
s/^(\d)$/9/;
|
||||||
s/^(\d\d)$/99/;
|
s/^(\d\d)$/99/;
|
||||||
|
# Sometime these vars are not present
|
||||||
|
s/^PAM_KWALLET5*_LOGIN$//;
|
||||||
# Fails often due to race
|
# Fails often due to race
|
||||||
s/cat: input_file: No such file or directory\n//;
|
s/cat: input_file: No such file or directory\n//;
|
||||||
s{rsync: link_stat ".*/home/parallel/input_file.out" .*\n}{};
|
s{rsync: link_stat ".*/home/parallel/input_file.out" .*\n}{};
|
||||||
|
|
|
@ -1654,3 +1654,13 @@ par_tee 5 a 9
|
||||||
par_tee 5 a 1000000 1000000 6888896
|
par_tee 5 a 1000000 1000000 6888896
|
||||||
par_tee 5 b 10
|
par_tee 5 b 10
|
||||||
par_tee 5 b 1000000 1000000 6888896
|
par_tee 5 b 1000000 1000000 6888896
|
||||||
|
par_tee 1 -l 181902
|
||||||
|
par_tee 1 -c 1228633
|
||||||
|
par_tee 2 -l 181902
|
||||||
|
par_tee 2 -c 1228633
|
||||||
|
par_tee 3 -l 122854
|
||||||
|
par_tee 3 -c 815297
|
||||||
|
par_tee 4 -l 122853
|
||||||
|
par_tee 4 -c 815290
|
||||||
|
par_tee 5 -l 122853
|
||||||
|
par_tee 5 -c 815290
|
||||||
|
|
|
@ -492,8 +492,14 @@ par_memleak should give 1 == true
|
||||||
par_memleak Memory use should not depend very much on the total number of jobs run\n
|
par_memleak Memory use should not depend very much on the total number of jobs run\n
|
||||||
par_memleak Test if memory consumption(300 jobs) < memory consumption(30 jobs) * 110%
|
par_memleak Test if memory consumption(300 jobs) < memory consumption(30 jobs) * 110%
|
||||||
par_memleak 1
|
par_memleak 1
|
||||||
|
par_pipe_tee bug #45479: --pipe/--pipepart --tee
|
||||||
|
par_pipe_tee --pipe --tee
|
||||||
|
par_pipe_tee 3221225472
|
||||||
par_pipepart_spawn ### bug #46214: Using --pipepart doesnt spawn multiple jobs in version 20150922
|
par_pipepart_spawn ### bug #46214: Using --pipepart doesnt spawn multiple jobs in version 20150922
|
||||||
par_pipepart_spawn 1:local / 8 / 999
|
par_pipepart_spawn 1:local / 8 / 999
|
||||||
|
par_pipepart_tee bug #45479: --pipe/--pipepart --tee
|
||||||
|
par_pipepart_tee --pipepart --tee
|
||||||
|
par_pipepart_tee 3221225472
|
||||||
par_print_before_halt_on_error ### What is printed before the jobs are killed
|
par_print_before_halt_on_error ### What is printed before the jobs are killed
|
||||||
par_print_before_halt_on_error -2 exit code 0
|
par_print_before_halt_on_error -2 exit code 0
|
||||||
par_print_before_halt_on_error -2 0 0
|
par_print_before_halt_on_error -2 0 0
|
||||||
|
|
|
@ -852,8 +852,6 @@ MFLAGS
|
||||||
MOZ_NO_REMOTE
|
MOZ_NO_REMOTE
|
||||||
ORACLE_HOME
|
ORACLE_HOME
|
||||||
ORACLE_SID
|
ORACLE_SID
|
||||||
PAM_KWALLET5_LOGIN
|
|
||||||
PAM_KWALLET_LOGIN
|
|
||||||
PARALLEL
|
PARALLEL
|
||||||
PARALLEL_PID
|
PARALLEL_PID
|
||||||
PARALLEL_SEQ
|
PARALLEL_SEQ
|
||||||
|
|
Loading…
Reference in a new issue