parallel: Deal with {} as part of the command (not arg for the command).

This commit is contained in:
Ole Tange 2014-12-12 13:30:54 +01:00
parent 9493541236
commit 163712f709
4 changed files with 141 additions and 41 deletions

View file

@ -236,8 +236,12 @@ Haiku of the month:
New in this release: New in this release:
* A semibig refactoring of big functions. All non-trivial functions are now less than 100 lines. The refactoring makes this release beta quality.
* GNU Parallel was cited in: Parallel post-processing with MPI-Bash http://dl.acm.org/citation.cfm?id=2691137 * GNU Parallel was cited in: Parallel post-processing with MPI-Bash http://dl.acm.org/citation.cfm?id=2691137
* GNU Parallel was cited in: Distinguishing cause from effect using observational data: methods and benchmarks http://arxiv-web3.library.cornell.edu/pdf/1412.3773.pdf
* GNU Parallel: Open Source For You (OSFY) magazine, October 2013 edition http://www.shakthimaan.com/posts/2014/11/27/gnu-parallel/news.html * GNU Parallel: Open Source For You (OSFY) magazine, October 2013 edition http://www.shakthimaan.com/posts/2014/11/27/gnu-parallel/news.html
* コマンドを並列に実行するGNU parallelがとても便利 http://bicycle1885.hatenablog.com/entry/2014/08/10/143612 * コマンドを並列に実行するGNU parallelがとても便利 http://bicycle1885.hatenablog.com/entry/2014/08/10/143612

View file

@ -2186,17 +2186,23 @@ sub progress {
} }
sub get_job_with_sshlogin { sub get_job_with_sshlogin {
# Input:
# $sshlogin = which host should the job be run on?
# Uses:
# $opt::hostgroups
# $Global::JobQueue
# Returns: # Returns:
# next job object for $sshlogin if any available # $job = next job object for $sshlogin if any available
my $sshlogin = shift; my $sshlogin = shift;
my $job = undef; my $job;
if ($opt::hostgroups) { if ($opt::hostgroups) {
my @other_hostgroup_jobs = (); my @other_hostgroup_jobs = ();
while($job = $Global::JobQueue->get()) { while($job = $Global::JobQueue->get()) {
if($sshlogin->in_hostgroups($job->hostgroups())) { if($sshlogin->in_hostgroups($job->hostgroups())) {
# Found a job for this hostgroup # Found a job to be run on a hostgroup of this
# $sshlogin
last; last;
} else { } else {
# This job was not in the hostgroups of $sshlogin # This job was not in the hostgroups of $sshlogin
@ -2256,6 +2262,9 @@ sub get_job_with_sshlogin {
sub __REMOTE_SSH__ {} sub __REMOTE_SSH__ {}
sub read_sshloginfiles { sub read_sshloginfiles {
# Read a list of --slf's
# Input:
# @files = files or symbolic file names to read
# Returns: N/A # Returns: N/A
for my $s (@_) { for my $s (@_) {
read_sshloginfile(expand_slf_shorthand($s)); read_sshloginfile(expand_slf_shorthand($s));
@ -2263,6 +2272,11 @@ sub read_sshloginfiles {
} }
sub expand_slf_shorthand { sub expand_slf_shorthand {
# Expand --slf shorthand into a read file name
# Input:
# $file = file or symbolic file name to read
# Returns:
# $file = actual file name to read
my $file = shift; my $file = shift;
if($file eq "-") { if($file eq "-") {
# skip: It is stdin # skip: It is stdin
@ -2283,6 +2297,11 @@ sub expand_slf_shorthand {
} }
sub read_sshloginfile { sub read_sshloginfile {
# Read sshloginfile into @Global::sshlogin
# Input:
# $file = file to read
# Uses:
# @Global::sshlogin
# Returns: N/A # Returns: N/A
my $file = shift; my $file = shift;
my $close = 1; my $close = 1;
@ -2310,6 +2329,17 @@ sub read_sshloginfile {
} }
sub parse_sshlogin { sub parse_sshlogin {
# Parse @Global::sshlogin into %Global::host.
# Keep only hosts that are in one of the given ssh hostgroups.
# Uses:
# @Global::sshlogin
# $Global::minimal_command_line_length
# %Global::host
# $opt::transfer
# @opt::return
# $opt::cleanup
# @opt::basefile
# @opt::trc
# Returns: N/A # Returns: N/A
my @login; my @login;
if(not @Global::sshlogin) { @Global::sshlogin = (":"); } if(not @Global::sshlogin) { @Global::sshlogin = (":"); }
@ -2386,6 +2416,8 @@ sub parse_sshlogin {
sub remote_hosts { sub remote_hosts {
# Return sshlogins that are not ':' # Return sshlogins that are not ':'
# Uses:
# %Global::host
# Returns: # Returns:
# list of sshlogins with ':' removed # list of sshlogins with ':' removed
return grep !/^:$/, keys %Global::host; return grep !/^:$/, keys %Global::host;
@ -2394,6 +2426,9 @@ sub remote_hosts {
sub setup_basefile { sub setup_basefile {
# Transfer basefiles to each $sshlogin # Transfer basefiles to each $sshlogin
# This needs to be done before first jobs on $sshlogin is run # This needs to be done before first jobs on $sshlogin is run
# Uses:
# %Global::host
# @opt::basefile
# Returns: N/A # Returns: N/A
my $cmd = ""; my $cmd = "";
my $rsync_destdir; my $rsync_destdir;
@ -2416,6 +2451,9 @@ sub setup_basefile {
sub cleanup_basefile { sub cleanup_basefile {
# Remove the basefiles transferred # Remove the basefiles transferred
# Uses:
# %Global::host
# @opt::basefile
# Returns: N/A # Returns: N/A
my $cmd=""; my $cmd="";
my $workdir = Job->new("")->workdir(); my $workdir = Job->new("")->workdir();
@ -2431,9 +2469,58 @@ sub cleanup_basefile {
} }
sub filter_hosts { sub filter_hosts {
# Remove down --sshlogins from active duty.
# Find ncpus, ncores, maxlen, time-to-login for each host.
# Uses:
# %Global::host
# $Global::minimal_command_line_length
# $opt::use_cpus_instead_of_cores
# Returns:
# N/A
my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts); my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts);
for (parallelized_host_filtering()) { my ($ncores_ref, $ncpus_ref, $time_to_login_ref, $maxlen_ref,
$echo_ref, $down_hosts_ref) =
parse_host_filtering(parallelized_host_filtering());
%ncores = %$ncores_ref;
%ncpus = %$ncpus_ref;
%time_to_login = %$time_to_login_ref;
%maxlen = %$maxlen_ref;
%echo = %$echo_ref;
@down_hosts = @$down_hosts_ref;
delete @Global::host{@down_hosts};
@down_hosts and ::warning("Removed @down_hosts\n");
$Global::minimal_command_line_length = 8_000_000;
while (my ($sshlogin, $obj) = each %Global::host) {
if($sshlogin eq ":") { next }
$ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
$ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
$time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
$maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
if($opt::use_cpus_instead_of_cores) {
$obj->set_ncpus($ncpus{$sshlogin});
} else {
$obj->set_ncpus($ncores{$sshlogin});
}
$obj->set_time_to_login($time_to_login{$sshlogin});
$obj->set_maxlength($maxlen{$sshlogin});
$Global::minimal_command_line_length =
::min($Global::minimal_command_line_length,
int($maxlen{$sshlogin}/2));
::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin},
" ncores:", $ncores{$sshlogin},
" time_to_login:", $time_to_login{$sshlogin},
" maxlen:", $maxlen{$sshlogin},
" min_max_len:", $Global::minimal_command_line_length,"\n");
}
}
sub parse_host_filtering {
my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts);
for (@_) {
chomp; chomp;
my @col = split /\t/, $_; my @col = split /\t/, $_;
if(defined $col[6]) { if(defined $col[6]) {
@ -2455,7 +2542,6 @@ sub filter_hosts {
# Remove sshlogin # Remove sshlogin
::debug("init", "--filtered $host\n"); ::debug("init", "--filtered $host\n");
push(@down_hosts, $host); push(@down_hosts, $host);
@down_hosts = uniq(@down_hosts);
} elsif($col[6] eq "127") { } elsif($col[6] eq "127") {
# signal == 127: parallel not installed remote # signal == 127: parallel not installed remote
# Set ncpus and ncores = 1 # Set ncpus and ncores = 1
@ -2501,31 +2587,8 @@ sub filter_hosts {
::die_bug("host check unmatched short jobline ($col[0]): $_"); ::die_bug("host check unmatched short jobline ($col[0]): $_");
} }
} }
delete @Global::host{@down_hosts}; @down_hosts = uniq(@down_hosts);
@down_hosts and ::warning("Removed @down_hosts\n"); return(\%ncores, \%ncpus, \%time_to_login, \%maxlen, \%echo, \@down_hosts);
$Global::minimal_command_line_length = 8_000_000;
while (my ($sshlogin, $obj) = each %Global::host) {
if($sshlogin eq ":") { next }
$ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
$ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
$time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
$maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
if($opt::use_cpus_instead_of_cores) {
$obj->set_ncpus($ncpus{$sshlogin});
} else {
$obj->set_ncpus($ncores{$sshlogin});
}
$obj->set_time_to_login($time_to_login{$sshlogin});
$obj->set_maxlength($maxlen{$sshlogin});
$Global::minimal_command_line_length =
::min($Global::minimal_command_line_length,
int($maxlen{$sshlogin}/2));
::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin},
" ncores:", $ncores{$sshlogin},
" time_to_login:", $time_to_login{$sshlogin},
" maxlen:", $maxlen{$sshlogin},
" min_max_len:", $Global::minimal_command_line_length,"\n");
}
} }
sub parallelized_host_filtering { sub parallelized_host_filtering {
@ -6997,6 +7060,7 @@ sub new {
} }
# Replace replacement strings with {= perl expr =} # Replace replacement strings with {= perl expr =}
@command = merge_rpl_parts(@command); @command = merge_rpl_parts(@command);
# Protect matching inside {= perl expr =} # Protect matching inside {= perl expr =}
# by replacing {= and =} with \257< and \257> # by replacing {= and =} with \257< and \257>
for(@command) { for(@command) {
@ -7037,8 +7101,15 @@ sub new {
} }
} }
} }
# Add {} if no replacement strings in @command
my($replacecount_ref, $len_ref, @command) = my($replacecount_ref, $len_ref, @command) =
replacement_counts_and_lengths(@command); replacement_counts_and_lengths(@command);
if("@command" =~ /^\S*\257</) {
# Replacement string is (part of) the command (and not just argument)
# E.g. parallel {}, parallel my_{= s/_//=}, parallel {2}
# Do no quote (Otherwise it will fail if the input contains spaces)
$Global::noquote = 1;
}
return bless { return bless {
'unget' => \@unget, 'unget' => \@unget,
@ -7093,7 +7164,7 @@ sub replacement_counts_and_lengths {
# Count the number of different replacement strings. # Count the number of different replacement strings.
# Find the lengths of context for context groups and non-context # Find the lengths of context for context groups and non-context
# groups. # groups.
# If no {} found: add it to the @command # If no {} found in @command: add it to @command
# #
# Input: # Input:
# @command = command template # @command = command template
@ -7111,8 +7182,8 @@ sub replacement_counts_and_lengths {
for my $c (@cmd) { for my $c (@cmd) {
while($c =~ s/ \257<([^\257]*?)\257> /\000/x) { while($c =~ s/ \257<([^\257]*?)\257> /\000/x) {
# %replacecount = { "perlexpr" => number of times seen } # %replacecount = { "perlexpr" => number of times seen }
# e.g { "$_++" => 2 } # e.g { "s/a/b/" => 2 }
$replacecount{$1} ++; $replacecount{$1}++;
$sum++; $sum++;
} }
# Measure the length of the context around the {= perl expr =} # Measure the length of the context around the {= perl expr =}
@ -7133,13 +7204,13 @@ sub replacement_counts_and_lengths {
# %replacecount = { "perlexpr" => number of times seen } # %replacecount = { "perlexpr" => number of times seen }
# e.g { "$_++" => 2 } # e.g { "$_++" => 2 }
# But for tagstring we just need to mark it as seen # But for tagstring we just need to mark it as seen
$replacecount{$1}||=1; $replacecount{$1} ||= 1;
} }
} }
if($opt::bar) { if($opt::bar) {
# If the command does not contain {} force it to be computed # If the command does not contain {} force it to be computed
# as it is being used by --bar # as it is being used by --bar
$replacecount{""}||=1; $replacecount{""} ||= 1;
} }
$len{'context'} = 0+$contextlen; $len{'context'} = 0+$contextlen;
@ -7150,11 +7221,9 @@ sub replacement_counts_and_lengths {
" Non: ", $len{'noncontext'}, " Ctxgrp: ", $len{'contextgroups'}, " Non: ", $len{'noncontext'}, " Ctxgrp: ", $len{'contextgroups'},
" NonCtxGrp: ", $len{'noncontextgroups'}, "\n"); " NonCtxGrp: ", $len{'noncontextgroups'}, "\n");
if($sum == 0) { if($sum == 0) {
# Default command = {}
# If not replacement string: append {}
if(not @command) { if(not @command) {
# Default command = {}
@command = ("\257<\257>"); @command = ("\257<\257>");
$Global::noquote = 1;
} elsif(($opt::pipe or $opt::pipepart) } elsif(($opt::pipe or $opt::pipepart)
and not $opt::fifo and not $opt::cat) { and not $opt::fifo and not $opt::cat) {
# With --pipe / --pipe-part you can have no replacement # With --pipe / --pipe-part you can have no replacement

View file

@ -30,7 +30,7 @@ echo '### Test bug #43376: {%} and {#} with --pipe'
echo '**' echo '**'
echo '### {= and =} in different groups' echo '### {= and =} in different groups separated by space'
parallel echo {= s/a/b/ =} ::: a parallel echo {= s/a/b/ =} ::: a
parallel echo {= s/a/b/=} ::: a parallel echo {= s/a/b/=} ::: a
parallel echo {= s/a/b/=}{= s/a/b/=} ::: a parallel echo {= s/a/b/=}{= s/a/b/=} ::: a
@ -41,4 +41,15 @@ echo '### {= and =} in different groups'
parallel echo {={= =} ::: a parallel echo {={= =} ::: a
parallel echo {= {= =} ::: a parallel echo {= {= =} ::: a
parallel echo {= {= =} =} ::: a parallel echo {= {= =} =} ::: a
echo '**'
echo '### {} as part of the command'
echo p /bin/ls | parallel l{= s/p/s/ =}
echo /bin/ls-p | parallel --colsep '-' l{=2 s/p/s/ =} {1}
echo s /bin/ls | parallel l{}
echo /bin/ls | parallel ls {}
echo ls /bin/ls | parallel {}
echo ls /bin/ls | parallel
EOF EOF

View file

@ -26,8 +26,8 @@ echo '### Test bug #43376: {%} and {#} with --pipe'
1 1
echo '**' echo '**'
** **
echo '### {= and =} in different groups' echo '### {= and =} in different groups separated by space'
### {= and =} in different groups ### {= and =} in different groups separated by space
parallel echo {= s/a/b/ =} ::: a parallel echo {= s/a/b/ =} ::: a
b b
parallel echo {= s/a/b/=} ::: a parallel echo {= s/a/b/=} ::: a
@ -48,3 +48,19 @@ b {=a
{= a {= a
parallel echo {= {= =} =} ::: a parallel echo {= {= =} =} ::: a
{= a =} {= a =}
echo '**'
**
echo '### {} as part of the command'
### {} as part of the command
echo p /bin/ls | parallel l{= s/p/s/ =}
/bin/ls
echo /bin/ls-p | parallel --colsep '-' l{=2 s/p/s/ =} {1}
/bin/ls
echo s /bin/ls | parallel l{}
/bin/ls
echo /bin/ls | parallel ls {}
/bin/ls
echo ls /bin/ls | parallel {}
/bin/ls
echo ls /bin/ls | parallel
/bin/ls