parallel: PARALLEL_RSYNC_OPTS and --rsync-opts introduced.

parallel: --bar -m bugfix.
This commit is contained in:
Ole Tange 2017-08-24 00:38:49 +02:00
parent 5c9712ec2c
commit 897d9f5db9
6 changed files with 141 additions and 121 deletions

View file

@ -198,51 +198,19 @@ to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm> stable-bcc: Jesse Alama <jessealama@fastmail.fm>
Subject: GNU Parallel 20170822 ('La Rambla') released <<[stable]>> Subject: GNU Parallel 20170922 ('') released <<[stable]>>
GNU Parallel 20170822 ('La Rambla') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/ GNU Parallel 20170922 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/
<<No new functionality was introduced so this is a good candidate for a stable release.>> <<No new functionality was introduced so this is a good candidate for a stable release.>>
Haiku of the month: Haiku of the month:
--line-buffer <<>>
touches files on disk no more
faster than before
-- Ole Tange -- Ole Tange
New in this release: New in this release:
* --line-buffer no longer uses tempfiles. This is faster and makes it possible for a single process to output more data than there is free disk space.
* Vote for GNU Parallel's community ads on http://meta.unix.stackexchange.com/a/4356/2972 http://meta.askubuntu.com/a/16750/22307 http://meta.serverfault.com/a/9040/45704
* GNU Parallel was cited in: https://springerplus.springeropen.com/articles/10.1186/s40064-016-2022-y
* Edit images with GNU Parallel and ImageMagick https://fedoramagazine.org/edit-images-parallel-imagemagick/
* Running Bash Commands in Parallel https://dzone.com/articles/running-bash-commands-in-parallel-1
* Today I Learned: GNU parallel, plate tectonics https://medium.com/@nornagon/today-i-learned-gnu-parallel-plate-tectonics-9fcf24045e63
* GNU Parallel Tutorial https://www.upf.edu/web/sct-sit/gnu-parallel-tutorial
* Validating half a million TIFF files http://www.dpoc.ac.uk/2017/08/17/validating-half-a-million-tiff-files-part-two/
* Blender and GNU Parallel https://www.blendernation.com/2017/08/16/blender-gnu-parallel/
* Quick Introduction to GNU Parallel https://github.com/LangilleLab/microbiome_helper/wiki/Quick-Introduction-to-GNU-Parallel
* A simple distributed animation render example using GNU Parallel https://gitlab.com/skororu/scripts
* Do stuff on things, in parallel http://blogs.fluidinfo.com/terry/2017/08/05/do-stuff-on-things-in-parallel/
* Bash And GNU Parallel NGS Bidniz http://ino.pm/outreach/presentations/2014/03/genomics-wranglers/index.html
* Using for loop vs Gnu parallel for BLAST http://www.ettemalab.org/using-for-loop-vs-gnu-parallel-for-blast/
* 并行rsync https://gxnotes.com/article/175866.html
<<Citation not OK: BAMClipper: removing primers from alignments to minimize false-negative mutations in amplicon next-generation sequencing https://www.nature.com/articles/s41598-017-01703-6>> <<Citation not OK: BAMClipper: removing primers from alignments to minimize false-negative mutations in amplicon next-generation sequencing https://www.nature.com/articles/s41598-017-01703-6>>
<<Wrong citation https://iris.sissa.it/retrieve/handle/20.500.11767/36149/10823/And%C3%B2_tesi.pdf>> <<Wrong citation https://iris.sissa.it/retrieve/handle/20.500.11767/36149/10823/And%C3%B2_tesi.pdf>>

View file

@ -969,8 +969,8 @@ sub options_hash {
"noctrlc|no-ctrlc|no-ctrl-c" => \$opt::retired, "noctrlc|no-ctrlc|no-ctrl-c" => \$opt::retired,
"workdir|work-dir|wd=s" => \$opt::workdir, "workdir|work-dir|wd=s" => \$opt::workdir,
"W=s" => \$opt::retired, "W=s" => \$opt::retired,
"tmpdir=s" => \$opt::tmpdir, "rsync-opts|rsyncopts=s" => \$opt::rsync_opts,
"tempdir=s" => \$opt::tmpdir, "tmpdir|tempdir=s" => \$opt::tmpdir,
"use-compress-program|compress-program=s" => \$opt::compress_program, "use-compress-program|compress-program=s" => \$opt::compress_program,
"use-decompress-program|decompress-program=s" => \$opt::decompress_program, "use-decompress-program|decompress-program=s" => \$opt::decompress_program,
"compress" => \$opt::compress, "compress" => \$opt::compress,
@ -1128,6 +1128,8 @@ sub parse_options {
$Global::timeoutq = TimeoutQueue->new($opt::timeout); $Global::timeoutq = TimeoutQueue->new($opt::timeout);
} }
if(defined $opt::tmpdir) { $ENV{'TMPDIR'} = $opt::tmpdir; } if(defined $opt::tmpdir) { $ENV{'TMPDIR'} = $opt::tmpdir; }
$ENV{'PARALLEL_RSYNC_OPTS'} = $opt::rsync_opts ||
$ENV{'PARALLEL_RSYNC_OPTS'} || '-rlDzR';
$opt::nice ||= 0; $opt::nice ||= 0;
if(defined $opt::help) { usage(); exit(0); } if(defined $opt::help) { usage(); exit(0); }
if(defined $opt::sqlandworker) { if(defined $opt::sqlandworker) {
@ -6445,11 +6447,12 @@ sub rsync_transfer_cmd {
} }
$file = ::shell_quote_file($file); $file = ::shell_quote_file($file);
my $sshcmd = $self->sshcommand(); my $sshcmd = $self->sshcommand();
my $rsync_opt = "-rlDzR -e" . ::shell_quote_scalar($sshcmd); my $rsync_opts = $ENV{'PARALLEL_RSYNC_OPTS'}.
" -e".::shell_quote_scalar($sshcmd);
my $serverlogin = $self->serverlogin(); my $serverlogin = $self->serverlogin();
# Make dir if it does not exist # Make dir if it does not exist
return "$sshcmd $serverlogin -- mkdir -p $rsync_destdir && " . return "$sshcmd $serverlogin -- mkdir -p $rsync_destdir && " .
rsync()." $rsync_opt $file $serverlogin:$rsync_destdir"; rsync()." $rsync_opts $file $serverlogin:$rsync_destdir";
} }
sub cleanup_cmd { sub cleanup_cmd {
@ -6613,6 +6616,12 @@ sub total_jobs {
return $self->{'total_jobs'}; return $self->{'total_jobs'};
} }
sub flush_total_jobs {
# Unset total_jobs to force recomputing
my $self = shift;
$self->{'total_jobs'} = undef;
}
sub next_seq { sub next_seq {
my $self = shift; my $self = shift;
@ -7878,7 +7887,8 @@ sub sshreturn {
my $sshlogin = $self->sshlogin(); my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand(); my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin(); my $serverlogin = $sshlogin->serverlogin();
my $rsync_opt = "-rlDzR -e".::shell_quote_scalar($sshcmd); my $rsync_opts = $ENV{'PARALLEL_RSYNC_OPTS'}.
" -e".::shell_quote_scalar($sshcmd);
my $pre = ""; my $pre = "";
for my $file ($self->return()) { for my $file ($self->return()) {
$file =~ s:^\./::g; # Remove ./ if any $file =~ s:^\./::g; # Remove ./ if any
@ -7908,7 +7918,7 @@ sub sshreturn {
# --rsync-path="cd /home/tange/dir/subdir/; rsync" # --rsync-path="cd /home/tange/dir/subdir/; rsync"
# server:file.gz /home/tange/dir/subdir/ # server:file.gz /home/tange/dir/subdir/
$pre .= "mkdir -p $basedir$cd && ". $sshlogin->rsync(). $pre .= "mkdir -p $basedir$cd && ". $sshlogin->rsync().
" $rsync_cd $rsync_opt $serverlogin:". " $rsync_cd $rsync_opts $serverlogin:".
$basename . " ".$basedir.$cd.";"; $basename . " ".$basedir.$cd.";";
} }
return $pre; return $pre;
@ -9026,90 +9036,94 @@ sub slot {
return $self->{'slot'}; return $self->{'slot'};
} }
sub populate { {
# Add arguments from arg_queue until the number of arguments or my $already_spread;
# max line length is reached
# Uses:
# $Global::minimal_command_line_length
# $opt::cat
# $opt::fifo
# $Global::JobQueue
# $opt::m
# $opt::X
# $CommandLine::already_spread
# $Global::max_jobs_running
# Returns: N/A
my $self = shift;
my $next_arg;
my $max_len = $Global::minimal_command_line_length
|| Limits::Command::max_length();
if($opt::cat or $opt::fifo) { sub populate {
# Get the empty arg added by --pipepart (if any) # Add arguments from arg_queue until the number of arguments or
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->get(); # max line length is reached
# $PARALLEL_TMP will point to a tempfile that will be used as {} # Uses:
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}-> # $Global::minimal_command_line_length
unget([Arg->new('$PARALLEL_TMP')]); # $opt::cat
} # $opt::fifo
while (not $self->{'arg_queue'}->empty()) { # $Global::JobQueue
$next_arg = $self->{'arg_queue'}->get(); # $opt::m
if(not defined $next_arg) { # $opt::X
next; # $Global::max_jobs_running
# Returns: N/A
my $self = shift;
my $next_arg;
my $max_len = $Global::minimal_command_line_length
|| Limits::Command::max_length();
if($opt::cat or $opt::fifo) {
# Get the empty arg added by --pipepart (if any)
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->get();
# $PARALLEL_TMP will point to a tempfile that will be used as {}
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->
unget([Arg->new('$PARALLEL_TMP')]);
} }
$self->push($next_arg); while (not $self->{'arg_queue'}->empty()) {
if($self->len() >= $max_len) { $next_arg = $self->{'arg_queue'}->get();
# Command length is now > max_length if(not defined $next_arg) {
# If there are arguments: remove the last next;
# If there are no arguments: Error }
# TODO stuff about -x opt_x $self->push($next_arg);
if($self->len() >= $max_len) {
# Command length is now > max_length
# If there are arguments: remove the last
# If there are no arguments: Error
# TODO stuff about -x opt_x
if($self->number_of_args() > 1) {
# There is something to work on
$self->{'arg_queue'}->unget($self->pop());
last;
} else {
my $args = join(" ", map { $_->orig() } @$next_arg);
::error("Command line too long (".
$self->len(). " >= ".
$max_len.
") at input ".
$self->{'arg_queue'}->arg_number().
": ".
((length $args > 50) ?
(substr($args,0,50))."..." :
$args));
$self->{'arg_queue'}->unget($self->pop());
::wait_and_exit(255);
}
}
if(defined $self->{'max_number_of_args'}) {
if($self->number_of_args() >= $self->{'max_number_of_args'}) {
last;
}
}
}
if(($opt::m or $opt::X) and not $already_spread
and $self->{'arg_queue'}->empty() and $Global::max_jobs_running) {
# -m or -X and EOF => Spread the arguments over all jobslots
# (unless they are already spread)
$already_spread ||= 1;
if($self->number_of_args() > 1) { if($self->number_of_args() > 1) {
# There is something to work on $self->{'max_number_of_args'} =
$self->{'arg_queue'}->unget($self->pop()); ::ceil($self->number_of_args()/$Global::max_jobs_running);
last; $Global::JobQueue->{'commandlinequeue'}->{'max_number_of_args'} =
} else { $self->{'max_number_of_args'};
my $args = join(" ", map { $_->orig() } @$next_arg); $self->{'arg_queue'}->unget($self->pop_all());
::error("Command line too long (". while($self->number_of_args() < $self->{'max_number_of_args'}) {
$self->len(). " >= ". $self->push($self->{'arg_queue'}->get());
$max_len. }
") at input ".
$self->{'arg_queue'}->arg_number().
": ".
((length $args > 50) ?
(substr($args,0,50))."..." :
$args));
$self->{'arg_queue'}->unget($self->pop());
::wait_and_exit(255);
} }
$Global::JobQueue->flush_total_jobs();
} }
if(defined $self->{'max_number_of_args'}) { if($opt::sqlmaster) {
if($self->number_of_args() >= $self->{'max_number_of_args'}) { # Insert the V1..Vn for this $seq in SQL table instead of generating one
last; $Global::sql->insert_records($self->seq(), $self->{'command'},
} $self->{'arg_list_flat_orig'});
} }
} }
if(($opt::m or $opt::X) and not $CommandLine::already_spread
and $self->{'arg_queue'}->empty() and $Global::max_jobs_running) {
# -m or -X and EOF => Spread the arguments over all jobslots
# (unless they are already spread)
$CommandLine::already_spread ||= 1;
if($self->number_of_args() > 1) {
$self->{'max_number_of_args'} =
::ceil($self->number_of_args()/$Global::max_jobs_running);
$Global::JobQueue->{'commandlinequeue'}->{'max_number_of_args'} =
$self->{'max_number_of_args'};
$self->{'arg_queue'}->unget($self->pop_all());
while($self->number_of_args() < $self->{'max_number_of_args'}) {
$self->push($self->{'arg_queue'}->get());
}
}
}
if($opt::sqlmaster) {
# Insert the V1..Vn for this $seq in SQL table instead of generating one
$Global::sql->insert_records($self->seq(), $self->{'command'},
$self->{'arg_list_flat_orig'});
}
} }
sub push { sub push {

View file

@ -1042,9 +1042,9 @@ B<-l 0> is an alias for B<-l 1>.
Implies B<-X> unless B<-m>, B<--xargs>, or B<--pipe> is set. Implies B<-X> unless B<-m>, B<--xargs>, or B<--pipe> is set.
=item B<--line-buffer> (alpha testing) =item B<--line-buffer> (beta testing)
=item B<--lb> (alpha testing) =item B<--lb> (beta testing)
Buffer output on line basis. B<--group> will keep the output together Buffer output on line basis. B<--group> will keep the output together
for a whole job. B<--ungroup> allows output to mixup with half a line for a whole job. B<--ungroup> allows output to mixup with half a line
@ -1867,6 +1867,12 @@ You can even use multiple matches:
See also: B<{= perl expression =}> B<--parens> See also: B<{= perl expression =}> B<--parens>
=item B<--rsync-opts> I<options> (alpha testing)
Options to pass on to B<rsync>. Setting B<--rsync-opts> takes
precedence over setting the environment variable $PARALLEL_RSYNC_OPTS.
=item B<--max-chars>=I<max-chars> =item B<--max-chars>=I<max-chars>
=item B<-s> I<max-chars> =item B<-s> I<max-chars>
@ -4250,6 +4256,11 @@ by: B<kill -TERM $PARALLEL_PID>. This only works on the local
computer. computer.
=item $PARALLEL_RSYNC_OPTS
Options to pass on to B<rsync>. Defaults to: -rlDzR.
=item $PARALLEL_SHELL =item $PARALLEL_SHELL
Use this shell for the commands run by GNU Parallel: Use this shell for the commands run by GNU Parallel:

View file

@ -25,8 +25,9 @@ the destination variable and made into an array.
If I<variablename> contains multiple names separated by ',' or space, If I<variablename> contains multiple names separated by ',' or space,
the names will be the destination variables. the names will be the destination variables.
B<parset> is beta quality and not production ready, but please use it The last bug in B<parset> was discovered 2017-06-29, so the code is
for everyday use and report bugs. ready for being tested widely. Please use it for everyday use and
report bugs.
=head1 OPTIONS =head1 OPTIONS

View file

@ -66,6 +66,26 @@ par_load_csh() {
parallel --load 100% -S csh@lo echo ::: a parallel --load 100% -S csh@lo echo ::: a
} }
par_PARALLEL_RSYNC_OPTS() {
echo '### test rsync opts'
touch parallel_rsync_opts.test
parallel --rsync-opts -rlDzRRRR -vv -S parallel@lo --trc {}.out touch {}.out ::: parallel_rsync_opts.test |
perl -ne 's/(rsync .*?RRRR)/print $1/ge'
export PARALLEL_RSYNC_OPTS=-zzrrllddRRRR
parallel -vv -S parallel@lo --trc {}.out touch {}.out ::: parallel_rsync_opts.test |
perl -ne 's/(rsync .*?RRRR)/print $1/ge'
rm parallel_rsync_opts.test parallel_rsync_opts.test.out
echo
}
par_bar_m() {
echo '### test --bar -m'
stdout parallel --bar -P 2 -m sleep ::: 1 1 2 2 3 3 |
perl -pe 's/\r/\n/g'|
grep -E '^[0-9]+ *$' |
uniq
}
export -f $(compgen -A function | grep par_) export -f $(compgen -A function | grep par_)
#compgen -A function | grep par_ | sort | parallel --delay $D -j$P --tag -k '{} 2>&1' #compgen -A function | grep par_ | sort | parallel --delay $D -j$P --tag -k '{} 2>&1'
compgen -A function | grep par_ | sort | compgen -A function | grep par_ | sort |

View file

@ -1,3 +1,9 @@
par_PARALLEL_RSYNC_OPTS ### test rsync opts
par_PARALLEL_RSYNC_OPTS rsync --protocol 30 -rlDzRRRRrsync --protocol 30 --rsync-path=cd\ ././.\;\ rsync -rlDzRRRRrsync --protocol 30 -zzrrllddRRRRrsync --protocol 30 --rsync-path=cd\ ././.\;\ rsync -zzrrllddRRRR
par_bar_m ### test --bar -m
par_bar_m 0
par_bar_m 50
par_bar_m 100
par_keep_order ### Test --keep-order par_keep_order ### Test --keep-order
par_keep_order job0 par_keep_order job0
par_keep_order job1 par_keep_order job1