env_parallel.fish: Recursive env_parallel possible.

parallel: --rr = --round-robin.
Ole Tange 2018-07-24 19:50:31 +02:00
parent 29fbb5f85f
commit 2dc37640a6
8 changed files with 146 additions and 74 deletions

NEWS

@ -1,3 +1,30 @@
20180722
* The quoting engine has been changed. Instead of using \-quoting GNU
Parallel now uses '-quoting in bash/ash/dash/ksh. This should
improve compatibility with different locales. This is a big change
causing this release to be alpha quality.
* The CPU calculation has changed. By default GNU Parallel uses the
number of CPU threads as the number of CPUs. This can be changed to
the number of CPU cores or number of CPU sockets with
--use-cores-instead-of-threads or --use-sockets-instead-of-threads.
* The detected number of sockets, cores, and threads can be shown with
--number-of-sockets, --number-of-cores, and --number-of-threads.
* env_parallel now supports mksh using env_parallel.mksh.
* GNU Parallel is distributed as part of Snippy
https://github.com/tseemann/snippy
* GNU Parallel: Ejecutar comandos simultáneamente en Linux (in English: Run commands simultaneously on Linux)
https://esgeeks.com/gnu-parallel-ejecutar-comandos-simultaneo-linux/
* Parallel download genomic data with GNU-Parallel
https://digibio.blogspot.com/search/label/GNU-Parallel
20180622
* Deal better with multibyte chars by forcing LC_ALL=C.
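
The CPU-detection switches from the 20180722 entry above can be tried directly; an illustrative session (my example, not part of the NEWS file):

  parallel --number-of-sockets
  parallel --number-of-cores
  parallel --number-of-threads
  # use CPU cores rather than CPU threads as the CPU count for -j/--jobs
  parallel --use-cores-instead-of-threads echo ::: a b c
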
@ -203,8 +230,7 @@
* Initial support for XDG Base Directory $XDG_* environment variables.
* Validating half a million TIFF files. Part
Two
* Validating half a million TIFF files. Part Two
https://www.dpoc.ac.uk/2017/08/17/validating-half-a-million-tiff-files-part-two/
* Turing data science class: Cluster computing

src/env_parallel.fish

@ -121,7 +121,7 @@ function env_parallel
#
begin;
for v in (set -n | \
grep -Ev '^(PARALLEL_TMP)$' | \
grep -Ev '^(PARALLEL_TMP|PARALLEL_ENV)$' | \
grep -E "^$_grep_REGEXP\$" | grep -vE "^$_ignore_UNDERSCORE\$");
# Separate variables with the string: \000
# array_name1 val1\0
@ -168,8 +168,16 @@ function env_parallel
s/\n/\001/g'
end;
)
# If --record-env: exit
perl -e 'exit grep { /^--record-env$/ } @ARGV' -- $argv; and parallel $argv;
# --session
perl -e 'exit grep { /^--session$/ } @ARGV' -- $argv; or begin;
setenv PARALLEL_IGNORED_NAMES (
functions -n | perl -ne 's/,/\n/g; /^(env_parallel)$/ and next; print';
set -n;
)
end;
# If --record-env or --session: exit
perl -e 'exit grep { /^(--record-env|--session)$/ } @ARGV' -- $argv; and parallel $argv;
set _parallel_exit_CODE $status
set -e PARALLEL_ENV
return $_parallel_exit_CODE
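
A minimal sketch of the intended --session flow (my illustration, bash syntax shown for brevity; env_parallel.fish behaves the same way): names present when --session runs are recorded in $PARALLEL_IGNORED_NAMES, so only aliases, functions, and variables defined afterwards are exported, which is presumably what enables the recursive env_parallel mentioned in the commit message.

  # after sourcing env_parallel for the shell in use
  env_parallel --session            # record current names in $PARALLEL_IGNORED_NAMES
  myfunc() { echo "got $1"; }       # defined after --session, so it is exported
  env_parallel myfunc ::: 1 2 3
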

src/niceload

@ -24,7 +24,7 @@
use strict;
use Getopt::Long;
$Global::progname="niceload";
$Global::version = 20180722;
$Global::version = 20180723;
Getopt::Long::Configure("bundling","require_order");
get_options_from_array(\@ARGV) || die_usage();
if($opt::version) {

src/parallel

@ -1049,7 +1049,7 @@ sub options_hash {
"plain" => \$opt::plain,
"profile|J=s" => \@opt::profile,
"pipe|spreadstdin" => \$opt::pipe,
"robin|round-robin|roundrobin" => \$opt::roundrobin,
"rr|robin|round-robin|roundrobin" => \$opt::roundrobin,
"recstart=s" => \$opt::recstart,
"recend=s" => \$opt::recend,
"regexp|regex" => \$opt::regexp,
@ -1265,8 +1265,11 @@ sub parse_options {
}
$opt::memfree = multiply_binary_prefix($opt::memfree);
check_invalid_option_combinations();
if((defined $opt::fifo or defined $opt::cat)
if((defined $opt::fifo or defined $opt::cat
or
defined $opt::roundrobin)
and not $opt::pipepart) {
# --fifo, --cat, and --round-robin => --pipe if not --pipepart
$opt::pipe = 1;
}
if(defined $opt::minversion) {
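
Since --round-robin now implies --pipe (unless --pipepart is given) and gains the --rr alias, a short usage sketch (my example, not from the commit):

  # blocks read from stdin are distributed among the 4 running jobs;
  # --pipe is implied by --rr
  seq 1000000 | parallel --rr -j4 'wc -l'
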
@ -1400,7 +1403,14 @@ sub parse_options {
#
# This problem has been covered by others - though no solution has
# been found:
#
# "Is it alright to compromise or even deliberately ignore the
# happiness of the maintainers so that we can enjoy free and open
# source software?"
# Slide 8 from:
# https://www.slideshare.net/NadiaEghbal/consider-the-maintainer
#
# For a long time NumPy did not have funding:
# https://www.numfocus.org/blog/why-is-numpy-only-now-getting-funded/
#
# Before implementing the citation notice it was discussed with
@ -1445,13 +1455,16 @@ sub parse_options {
parse_halt();
if($ENV{'PARALLEL_ENV'}) {
if(defined $ENV{'PARALLEL_ENV'}) {
# Read environment and set $Global::parallel_env
# Must be done before is_acceptable_command_line_length()
my $penv = $ENV{'PARALLEL_ENV'};
if(not ($opt::onall or $opt::nonall)) {
# unset $PARALLEL_ENV: It should not be given to children
# because it takes up a lot of env space
delete $ENV{'PARALLEL_ENV'};
# Except when --(n)onall/nonall is set
# delete $ENV{'PARALLEL_ENV'};
}
if(-e $penv) {
# This is a file/fifo: Replace envvar with content of file
open(my $parallel_env, "<", $penv) ||
@ -1497,7 +1510,7 @@ sub check_invalid_option_combinations {
::wait_and_exit(255);
}
if(defined $opt::retries and defined $opt::roundrobin) {
::error("--retries cannot be combined with --roundrobin.");
::error("--retries cannot be combined with --round-robin.");
::wait_and_exit(255);
}
if(defined $opt::pipepart and
@ -1541,7 +1554,7 @@ sub check_invalid_option_combinations {
sub init_globals {
# Defaults:
$Global::version = 20180722;
$Global::version = 20180723;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
@ -7953,7 +7966,9 @@ sub wrapped {
# --pipepart: prepend:
# < /tmp/foo perl -e 'while(@ARGV) {
# sysseek(STDIN,shift,0) || die; $left = shift;
# while($read = sysread(STDIN,$buf, ($left > 131072 ? 131072 : $left))){
# while($read =
# sysread(STDIN,$buf,
# ($left > 131072 ? 131072 : $left))) {
# $left -= $read; syswrite(STDOUT,$buf);
# }
# }' 0 0 0 11 |
@ -8100,7 +8115,7 @@ sub base64_wrap {
my $eval_string = shift;
return
"perl -e ".
::shell_quote_scalar(base64_zip_eval())." ".
::Q(base64_zip_eval())." ".
join" ",::shell_quote(string_zip_base64($eval_string));
}
@ -8144,14 +8159,16 @@ sub sshlogin_wrap {
if(not $monitor_parent_sshd_script) {
$monitor_parent_sshd_script =
# This will be packed in ', so only use "
::spacefree(0,'$shell = "'.($ENV{'PARALLEL_SHELL'} || '$ENV{SHELL}').'";'.
'$tmpdir = "'.::perl_quote_scalar($ENV{'TMPDIR'}).'";'.
::spacefree(0,'$shell = "'.($ENV{'PARALLEL_SHELL'}
|| '$ENV{SHELL}').'";'.
'$tmpdir = "'.::pQ($ENV{'TMPDIR'}).'";'.
'$nice = '.$opt::nice.';'.
q{
# Set $PARALLEL_TMP to a non-existent file name in $TMPDIR
do {
$ENV{PARALLEL_TMP} = $tmpdir."/par".
join"", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5);
join"", map
{ (0..9,"a".."z","A".."Z")[rand(62)] } (1..5);
} while(-e $ENV{PARALLEL_TMP});
$SIG{CHLD} = sub { $done = 1; };
$pid = fork;
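
The $PARALLEL_TMP naming scheme above can be exercised on its own; a one-liner sketch (illustrative only, not part of the wrapper):

  # print "par" followed by 5 random characters from [0-9a-zA-Z]
  perl -le 'print "par", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5)'
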
@ -8198,7 +8215,8 @@ sub sshlogin_wrap {
local $/ = "\n";
# --env _
# Include all vars that are not in a clean environment
if(open(my $vars_fh, "<", $Global::config_dir . "/ignored_vars")) {
if(open(my $vars_fh,
"<", $Global::config_dir . "/ignored_vars")) {
my @ignore = <$vars_fh>;
chomp @ignore;
my %ignore;
@ -8212,8 +8230,10 @@ sub sshlogin_wrap {
::wait_and_exit(255);
}
}
# Duplicate vars as BASH functions to include post-shellshock functions (v1+v2)
# So --env myfunc should look for BASH_FUNC_myfunc() and BASH_FUNC_myfunc%%
# Duplicate vars as BASH functions to include
# post-shellshock functions (v1+v2)
# So --env myfunc should look for
# BASH_FUNC_myfunc() and BASH_FUNC_myfunc%%
push(@vars, "PARALLEL_PID", "PARALLEL_SEQ",
map { ("BASH_FUNC_$_()", "BASH_FUNC_$_%%") } @vars);
# Keep only defined variables
@ -8244,17 +8264,19 @@ sub sshlogin_wrap {
if(@bashfunc) {
# Functions are not supported for all shells
if($Global::shell !~ m:(bash|rbash|zsh|rzsh|dash|ksh):) {
::warning("Shell functions may not be supported in $Global::shell.");
::warning("Shell functions may not be supported in ".
"$Global::shell.");
}
$bashfuncset =
'@bash_functions=qw('."@bash_functions".");".
::spacefree(1,'$shell="'.($ENV{'PARALLEL_SHELL'} || '$ENV{SHELL}').'";'.q{
::spacefree(1,'$shell="'.($ENV{'PARALLEL_SHELL'} ||
'$ENV{SHELL}').'";'.q{
if($shell=~/csh/) {
print STDERR "CSH/TCSH DO NOT SUPPORT newlines IN VARIABLES/FUNCTIONS. Unset @bash_functions\n";
exec "false";
}
}).
"\n".'$bashfunc = "'.::perl_quote_scalar("@bashfunc").'";';
"\n".'$bashfunc = "'.::pQ("@bashfunc").'";';
} else {
$bashfuncset = '$bashfunc = "";'
}
@ -8286,11 +8308,13 @@ sub sshlogin_wrap {
$command = "cd ".::Q($wd)." || exit 255; ".$command;
}
if(@opt::env) {
# Prepend with environment setter, which sets functions in zsh
# Prepend with environment setter,
# which sets functions in zsh
my ($csh_friendly,$envset,$bashfuncset) = env_as_eval();
my $perl_code = $envset.$bashfuncset.
'@ARGV="'.::perl_quote_scalar($command).'";'.
"exec\"$Global::shell\",\"-c\",\(\$bashfunc.\"\@ARGV\"\)\;die\"exec:\$\!\\n\"\;";
('@ARGV="'.::pQ($command).'";'.
'exec"'.$Global::shell.'",'.
'"-c",($bashfunc."@ARGV");die"exec:$!\n";');
if(length $perl_code > 999
or
not $csh_friendly
@ -8300,7 +8324,7 @@ sub sshlogin_wrap {
# csh does not deal well with $ENV with \n
$self->{'sshlogin_wrap'} = base64_wrap($perl_code);
} else {
$self->{'sshlogin_wrap'} = "perl -e ".::shell_quote_scalar($perl_code);
$self->{'sshlogin_wrap'} = "perl -e ".::Q($perl_code);
}
} else {
$self->{'sshlogin_wrap'} = $command;
@ -8311,16 +8335,14 @@ sub sshlogin_wrap {
# Create remote workdir if needed. Then cd to it.
my $wd = ::pQ($self->workdir());
$pwd = qq{system("mkdir","-p","--","$wd"); chdir "$wd" ||}.
qq{print(STDERR "parallel: Cannot chdir to $wd\\n") && exit 255;};
qq{print(STDERR "parallel: Cannot chdir to $wd\\n") &&}.
qq{exit 255;};
}
my ($csh_friendly,$envset,$bashfuncset) = env_as_eval();
my $remote_command = $pwd.$envset.$bashfuncset.
'@ARGV="'.::perl_quote_scalar($command).'";'.
monitor_parent_sshd_script();
$quoted_remote_command = "perl -e ".
::shell_quote_scalar($remote_command);
my $dq_remote_command =
::shell_quote_scalar($quoted_remote_command);
'@ARGV="'.::pQ($command).'";'. monitor_parent_sshd_script();
$quoted_remote_command = "perl -e ". ::Q($remote_command);
my $dq_remote_command = ::Q($quoted_remote_command);
if(length $dq_remote_command > 999
or
not $csh_friendly
@ -8329,8 +8351,7 @@ sub sshlogin_wrap {
# csh does not deal well with > 1000 chars in one word
# csh does not deal well with $ENV with \n
$quoted_remote_command =
"perl -e ".
::shell_quote_scalar(::shell_quote_scalar(base64_zip_eval()))." ".
"perl -e ". ::Q(::Q(base64_zip_eval()))." ".
join" ", ::shell_quote(::shell_quote(string_zip_base64($remote_command)));
} else {
$quoted_remote_command = $dq_remote_command;
@ -8445,18 +8466,20 @@ sub sshreturn {
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $rsync_opts = $ENV{'PARALLEL_RSYNC_OPTS'}.
" -e".::shell_quote_scalar($sshcmd);
my $rsync_opts = $ENV{'PARALLEL_RSYNC_OPTS'}." -e".::Q($sshcmd);
my $pre = "";
for my $file ($self->return()) {
$file =~ s:^\./::g; # Remove ./ if any
my $relpath = ($file !~ m:^/:) || ($file =~ m:/\./:); # Is the path relative or /./?
# Remove ./ if any
$file =~ s:^\./::g;
# Is the path relative or /./?
my $relpath = ($file !~ m:^/:) || ($file =~ m:/\./:);
my $cd = "";
my $wd = "";
if($relpath) {
# rsync -avR /foo/./bar/baz.c remote:/tmp/
# == (on old systems)
# rsync -avR --rsync-path="cd /foo; rsync" remote:bar/baz.c /tmp/
# rsync -avR --rsync-path="cd /foo; rsync"
# remote:bar/baz.c /tmp/
$wd = ::shell_quote_file($self->workdir()."/");
}
# Only load File::Basename if actually needed
@ -8467,7 +8490,7 @@ sub sshreturn {
my $nobasedir = $file;
$nobasedir =~ s:.*/\./::;
$cd = ::shell_quote_file(::dirname($nobasedir));
my $rsync_cd = '--rsync-path='.::shell_quote_scalar("cd $wd$cd; rsync");
my $rsync_cd = '--rsync-path='.::Q("cd $wd$cd; rsync");
my $basename =
::shell_quote_scalar(::shell_quote_file(::basename($file)));
# --return
@ -8494,11 +8517,12 @@ sub sshcleanup {
my $cleancmd = "";
for my $file ($self->remote_cleanup()) {
# TODO @subworkdirs not used?
my @subworkdirs = parentdirs_of($file);
$cleancmd .= $sshlogin->cleanup_cmd($file,$workdir).";";
}
if(defined $opt::workdir and $opt::workdir eq "...") {
$cleancmd .= "$sshcmd $serverlogin -- rm -rf " . ::shell_quote_scalar($workdir).';';
$cleancmd .= "$sshcmd $serverlogin -- rm -rf " . ::Q($workdir).';';
}
return $cleancmd;
}
@ -8542,7 +8566,9 @@ sub workdir {
$part eq "" and next;
$parent .= "/".$part;
my ($parent_dev, $parent_ino) = (stat($parent))[0,1];
if($parent_dev == $home_dev and $parent_ino == $home_ino) {
if($parent_dev == $home_dev
and
$parent_ino == $home_ino) {
# dev and ino is the same: We found the homedir.
$workdir = join("/",@dir_parts);
last;
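
The dev/ino comparison above is the usual way to check that two paths name the same directory; a standalone sketch (my illustration, simplified from workdir()):

  perl -e '($hdev,$hino) = (stat $ENV{HOME})[0,1];
           ($dev,$ino)   = (stat ".")[0,1];
           print "cwd is \$HOME\n" if $dev == $hdev && $ino == $hino;'
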
@ -8607,7 +8633,8 @@ sub start {
# The eval is needed to catch exception from open3
eval {
if(not $pid = ::open3($stdin_fh, ">&OUT", ">&ERR", "-")) {
# Each child gets its own process group to make it safe to killall
# Each child gets its own process group
# to make it safe to killall
eval{ setpgrp(0,0) };
eval{ setpriority(0,0,$opt::nice) };
exec($Global::shell,"-c",$command)
@ -8639,7 +8666,8 @@ sub start {
"exec '$Global::shell', '-c', \@ARGV");
# The eval is needed to catch exception from open3
eval {
$pid = ::open3($stdin_fh, ">&OUT", ">&ERR", @setpgrp_wrap, $command)
$pid = ::open3($stdin_fh, ">&OUT", ">&ERR",
@setpgrp_wrap, $command)
|| ::die_bug("open3-$stdin_fh");
1;
};
@ -8813,7 +8841,7 @@ sub interactive_start {
# ; causes problems
# ascii 194-245 annoys tmux
$title =~ tr/[\011-\016;\302-\365]/ /s;
$title = ::shell_quote_scalar($title);
$title = ::Q($title);
my $l_act = length($actual_command);
my $l_tit = length($title);
@ -8821,7 +8849,7 @@ sub interactive_start {
# The line to run contains a 118 chars extra code + the title 2x
my $l_tot = 2 * $l_tit + $l_act + $l_fifo;
my $quoted_space75 = ::shell_quote_scalar(" ")x75;
my $quoted_space75 = ::Q(" ")x75;
while($l_tit < 1000 and
(
(890 < $l_tot and $l_tot < 1350)
@ -8982,7 +9010,7 @@ sub print {
if($Global::joblog and defined $self->{'exitstatus'}) {
# Add to joblog when finished
$self->print_joblog();
# Printing is only relevant for grouped/--line-buffer output.
# Printing is only relevant for grouped/--line-buffer output
$opt::ungroup and return;
}
}
@ -9003,7 +9031,8 @@ sub print {
}
next;
}
::debug("print", "File descriptor $fdno (", $self->fh($fdno,"name"), "):\n");
::debug("print",
"File descriptor $fdno (", $self->fh($fdno,"name"), "):\n");
if($opt::linebuffer) {
# Line buffered print out
$self->print_linebuffer($fdno,$in_fh,$out_fd);
@ -9064,7 +9093,8 @@ sub print {
print $Global::csv_fh
(map { $$_ }
combine_ref("Seq", "Host", "Starttime", "JobRuntime",
"Send", "Receive", "Exitval", "Signal", "Command",
"Send", "Receive", "Exitval", "Signal",
"Command",
@V,
"Stdout","Stderr"
)),"\n";
@ -9470,7 +9500,8 @@ sub set_exitsignal {
# --halt now,fail=X% or soon,fail=X%
# --halt now,done=X% or soon,done=X%
$Global::halt_exitstatus =
::ceil($Global::total_failed / $total_jobs * 100);
::ceil($Global::total_failed / $total_jobs
* 100);
} elsif($Global::halt_count) {
# --halt now,fail=X or soon,fail=X
# --halt now,done=X or soon,done=X
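
A worked example of the percentage case (my numbers, not from the source): with --halt now,fail=10% and 3 failed jobs out of 12, the exit status becomes ceil(3/12 * 100) = 25.
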
@ -9484,7 +9515,8 @@ sub set_exitsignal {
$Global::halt_exitstatus =
($job->exitstatus()
or
$job->exitsignal() ? $job->exitsignal() + 128 : 0);
$job->exitsignal() ?
$job->exitsignal() + 128 : 0);
}
}
::debug("halt","Pct: ",$Global::halt_pct,
@ -9634,7 +9666,8 @@ sub slot {
}
if(defined $self->{'max_number_of_args'}) {
if($self->number_of_args() >= $self->{'max_number_of_args'}) {
if($self->number_of_args() >=
$self->{'max_number_of_args'}) {
last;
}
}
@ -9646,11 +9679,13 @@ sub slot {
$already_spread ||= 1;
if($self->number_of_args() > 1) {
$self->{'max_number_of_args'} =
::ceil($self->number_of_args()/$Global::max_jobs_running);
::ceil($self->number_of_args() /
$Global::max_jobs_running);
$Global::JobQueue->{'commandlinequeue'}->{'max_number_of_args'} =
$self->{'max_number_of_args'};
$self->{'arg_queue'}->unget($self->pop_all());
while($self->number_of_args() < $self->{'max_number_of_args'}) {
while($self->number_of_args() <
$self->{'max_number_of_args'}) {
$self->push($self->{'arg_queue'}->get());
}
}
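
A worked example of the spreading logic (my numbers): with 100 arguments left and $Global::max_jobs_running = 8, max_number_of_args becomes ceil(100/8) = 13, so the queued arguments are re-split into chunks of at most 13 per job.
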
@ -9658,7 +9693,8 @@ sub slot {
}
if($opt::sqlmaster) {
# Insert the V1..Vn for this $seq in SQL table instead of generating one
# Insert the V1..Vn for this $seq in SQL table
# instead of generating one
$Global::sql->insert_records($self->seq(), $self->{'command'},
$self->{'arg_list_flat_orig'});
}

src/parallel.pod

@ -1052,7 +1052,7 @@ to see the difference:
If used with B<--onall> or B<--nonall> the output will be grouped by
sshlogin in sorted order.
If used with B<--pipe --roundrobin> and the same input, the jobslots
If used with B<--pipe --round-robin> and the same input, the jobslots
will get the same blocks in the same order in every run.
@ -1458,20 +1458,20 @@ on remote computers).
Print the number of physical CPU cores and exit.
=item B<--number-of-cores> (alpha testing)
=item B<--number-of-cores> (beta testing)
Print the number of physical CPU cores and exit (used by GNU B<parallel> itself
to determine the number of physical CPU cores on remote computers).
=item B<--number-of-sockets> (alpha testing)
=item B<--number-of-sockets> (beta testing)
Print the number of filled CPU sockets and exit (used by GNU
B<parallel> itself to determine the number of filled CPU sockets on
remote computers).
=item B<--number-of-threads> (alpha testing)
=item B<--number-of-threads> (beta testing)
Print the number of hyperthreaded CPU cores and exit (used by GNU
B<parallel> itself to determine the number of hyperthreaded CPU cores
@ -1617,9 +1617,9 @@ it to the command.
Only used with B<--pipe>.
=item B<--results> I<name> (beta testing)
=item B<--results> I<name>
=item B<--res> I<name> (beta testing)
=item B<--res> I<name>
Save the output into files.
@ -1883,6 +1883,8 @@ with B<--sshlogin>.
=item B<--round>
=item B<--rr>
Normally B<--pipe> will give a single block to each instance of the
command. With B<--round-robin> all blocks will be written at random to
commands already running. This is useful if the command takes a long
@ -2056,7 +2058,7 @@ Use the replacement string I<replace-str> instead of B<{#}> for
job sequence number.
=item B<--session> (beta testing)
=item B<--session>
Record names in current environment in B<$PARALLEL_IGNORED_NAMES> and
exit. Only used with B<env_parallel>. Aliases, functions, and
@ -2569,9 +2571,9 @@ Use the replacement string I<replace-str> instead of B<{.}> for input
line without extension.
=item B<--use-sockets-instead-of-threads> (alpha testing)
=item B<--use-sockets-instead-of-threads> (beta testing)
=item B<--use-cores-instead-of-threads> (alpha testing)
=item B<--use-cores-instead-of-threads> (beta testing)
=item B<--use-cpus-instead-of-cores> (obsolete)

src/sql

@ -576,7 +576,7 @@ $Global::Initfile && unlink $Global::Initfile;
exit ($err);
sub parse_options {
$Global::version = 20180722;
$Global::version = 20180723;
$Global::progname = 'sql';
# This must be done first as this may exec myself

View file

@ -148,5 +148,5 @@ export -f $(compgen -A function | egrep 'p_|par_')
# Tested that -j0 in parallel is fastest (up to 15 jobs)
# more than 3 jobs: sqlite locks
compgen -A function | grep par_ | sort |
stdout parallel --timeout 200 -vj3 -k --tag --joblog /tmp/jl-`basename $0` p_wrapper \
stdout parallel --timeout 250 -vj40% -k --tag --joblog /tmp/jl-`basename $0` p_wrapper \
:::: - ::: \$MYSQL \$PG \$SQLITE

View file

@ -4,7 +4,7 @@ par_pipe_retries 165668 165668 1048571
par_pipe_retries localhost-:
par_pipe_retries 134362 134362 940534
par_pipe_retries localhost-:
par_pipe_retries parallel: Error: --retries cannot be combined with --roundrobin.
par_pipe_retries parallel: Error: --retries cannot be combined with --round-robin.
par_lsh ### --ssh lsh
par_lsh parallel: Warning: Could not figure out number of cpus on lo (). Using 1.
par_lsh OK