parallel: --filter implemented.

This commit is contained in:
Ole Tange 2021-02-17 16:04:04 +01:00
parent 0753d4348d
commit c7d42df6c6

View file

@ -56,7 +56,8 @@ sub set_input_source_header($$) {
# Replace {colname} with {2} # Replace {colname} with {2}
for(@$command_ref, @Global::ret_files, for(@$command_ref, @Global::ret_files,
@Global::transfer_files, $opt::tagstring, @Global::transfer_files, $opt::tagstring,
$opt::workdir, $opt::results, $opt::retries) { $opt::workdir, $opt::results, $opt::retries,
@opt::filter) {
# Skip if undefined # Skip if undefined
$_ or next; $_ or next;
s:\{$s(|/|//|\.|/\.)\}:\{$id$1\}:g; s:\{$s(|/|//|\.|/\.)\}:\{$id$1\}:g;
@ -1577,15 +1578,6 @@ sub options_hash() {
"session" => \$opt::session, "session" => \$opt::session,
"plain" => \$opt::plain, "plain" => \$opt::plain,
"profile|J=s" => \@opt::profile, "profile|J=s" => \@opt::profile,
"pipe|spreadstdin" => \$opt::pipe,
"robin|round-robin|roundrobin" => \$opt::roundrobin,
"recstart=s" => \$opt::recstart,
"recend=s" => \$opt::recend,
"regexp|regex" => \$opt::regexp,
"remove-rec-sep|removerecsep|rrs" => \$opt::remove_rec_sep,
"files|output-as-files|outputasfiles" => \$opt::files,
"block|block-size|blocksize=s" => \$opt::blocksize,
"blocktimeout|block-timeout|bt=s" => \$opt::blocktimeout,
"tollef" => \$opt::tollef, "tollef" => \$opt::tollef,
"gnu" => \$opt::gnu, "gnu" => \$opt::gnu,
"link|xapply" => \$opt::link, "link|xapply" => \$opt::link,
@ -1639,6 +1631,16 @@ sub options_hash() {
"Y" => \$opt::retired, "Y" => \$opt::retired,
"skip-first-line" => \$opt::skip_first_line, "skip-first-line" => \$opt::skip_first_line,
"bug" => \$opt::bug, "bug" => \$opt::bug,
# --pipe
"pipe|spreadstdin" => \$opt::pipe,
"robin|round-robin|roundrobin" => \$opt::roundrobin,
"recstart=s" => \$opt::recstart,
"recend=s" => \$opt::recend,
"regexp|regex" => \$opt::regexp,
"remove-rec-sep|removerecsep|rrs" => \$opt::remove_rec_sep,
"files|output-as-files|outputasfiles" => \$opt::files,
"block|block-size|blocksize=s" => \$opt::blocksize,
"blocktimeout|block-timeout|bt=s" => \$opt::blocktimeout,
"header=s" => \$opt::header, "header=s" => \$opt::header,
"cat" => \$opt::cat, "cat" => \$opt::cat,
"fifo" => \$opt::fifo, "fifo" => \$opt::fifo,
@ -1647,8 +1649,10 @@ sub options_hash() {
"shard=s" => \$opt::shard, "shard=s" => \$opt::shard,
"bin=s" => \$opt::bin, "bin=s" => \$opt::bin,
"groupby|group-by=s" => \$opt::groupby, "groupby|group-by=s" => \$opt::groupby,
#
"hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups, "hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups,
"embed" => \$opt::embed, "embed" => \$opt::embed,
"filter=s" => \@opt::filter,
); );
} }
@ -2151,7 +2155,7 @@ sub check_invalid_option_combinations() {
sub init_globals() { sub init_globals() {
# Defaults: # Defaults:
$Global::version = 20210122; $Global::version = 20210201;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$::name = "GNU Parallel"; $::name = "GNU Parallel";
$Global::infinity = 2**31; $Global::infinity = 2**31;
@ -4111,7 +4115,7 @@ sub setup_basefile() {
::wait_and_exit(255); ::wait_and_exit(255);
} }
if(not $workdir) { if(not $workdir) {
my $dummycmdline = CommandLine->new(1,["true"],{},0,0,[],[],{},{},{}); my $dummycmdline = CommandLine->new(1,["true"],{},0,0,[],[],[],[],{},{});
my $dummyjob = Job->new($dummycmdline); my $dummyjob = Job->new($dummycmdline);
$workdir = $dummyjob->workdir(); $workdir = $dummyjob->workdir();
} }
@ -4138,7 +4142,7 @@ sub cleanup_basefile() {
my @cmd; my @cmd;
my $workdir; my $workdir;
if(not $workdir) { if(not $workdir) {
my $dummycmdline = CommandLine->new(1,"true",0,0,0,0,0,{},{},{}); my $dummycmdline = CommandLine->new(1,["true"],{},0,0,[],[],[],[],{},{});
my $dummyjob = Job->new($dummycmdline); my $dummyjob = Job->new($dummycmdline);
$workdir = $dummyjob->workdir(); $workdir = $dummyjob->workdir();
} }
@ -4341,7 +4345,7 @@ sub parse_host_filtering() {
$maxlen{$col[0]} = $col[1]; $maxlen{$col[0]} = $col[1];
} elsif(not $echo{$col[0]}) { } elsif(not $echo{$col[0]}) {
$echo{$col[0]} = $col[1]; $echo{$col[0]} = $col[1];
} elsif(m/perl: warning:|LANGUAGE =|LC_ALL =|LANG =|are supported and installed/) { } elsif(m/perl: warning:|LANGUAGE =|LC_ALL =|LANG =|are supported and installed|Disconnected from|Received disconnect from/) {
# Skip these: # Skip these:
# perl: warning: Setting locale failed. # perl: warning: Setting locale failed.
# perl: warning: Please check that your locale settings: # perl: warning: Please check that your locale settings:
@ -4350,6 +4354,7 @@ sub parse_host_filtering() {
# LANG = "en_US.UTF-8" # LANG = "en_US.UTF-8"
# are supported and installed on your system. # are supported and installed on your system.
# perl: warning: Falling back to the standard locale ("C"). # perl: warning: Falling back to the standard locale ("C").
# Disconnected from 127.0.0.1 port 22
} else { } else {
::die_bug("host check too many col0: $_"); ::die_bug("host check too many col0: $_");
} }
@ -4384,7 +4389,7 @@ sub parallelized_host_filtering() {
# perl -e '$a=`$command`; print $? ? "$default_value" : $a' # perl -e '$a=`$command`; print $? ? "$default_value" : $a'
my $wcmd = q(perl -e '$a=`).$command.q(`;). my $wcmd = q(perl -e '$a=`).$command.q(`;).
q(print $? ? ").::pQ($default_value).q(" : $a'); q(print $? ? ").::pQ($default_value).q(" : $a');
my $commandline = CommandLine->new(1,[$wcmd],{},0,0,[],[],{},{},{}); my $commandline = CommandLine->new(1,[$wcmd],{},0,0,[],[],[],[],{},{});
my $job = Job->new($commandline); my $job = Job->new($commandline);
$job->set_sshlogin($sshlogin); $job->set_sshlogin($sshlogin);
$job->wrapped(); $job->wrapped();
@ -9411,6 +9416,21 @@ sub sshlogin_wrap($) {
return $self->{'sshlogin_wrap'}{$command}; return $self->{'sshlogin_wrap'}{$command};
} }
sub filter($) {
# Replace replacement strings in filter(s) and evaluate them
# Returns:
# $run - 1=yes, undef=no
my $self = shift;
my $run = 1;
if(@opt::filter) {
for my $eval ($self->{'commandline'}->
replace_placeholders(\@opt::filter,0,0)) {
$run &&= eval $eval;
}
}
return $run;
}
sub transfer($) { sub transfer($) {
# Files to transfer # Files to transfer
# Non-quoted and with {...} substituted # Non-quoted and with {...} substituted
@ -9787,8 +9807,10 @@ sub start($) {
} }
# Must be run after $job->interactive_start(): # Must be run after $job->interactive_start():
# $job->interactive_start() may call $job->skip() # $job->interactive_start() may call $job->skip()
if($job->{'commandline'}{'skip'}) { if($job->{'commandline'}{'skip'}
# $job->skip() was called or
not $job->filter()) {
# $job->skip() was called or job filtered
$command = "true"; $command = "true";
} }
$job->openoutputfiles(); $job->openoutputfiles();
@ -11428,7 +11450,8 @@ sub new($) {
# @command, --transferfile, --return, # @command, --transferfile, --return,
# --tagstring, --workdir, --results # --tagstring, --workdir, --results
for(@command, @$transfer_files, @$return_files, for(@command, @$transfer_files, @$return_files,
$opt::tagstring, $opt::workdir, $opt::results, $opt::retries) { $opt::tagstring, $opt::workdir, $opt::results, $opt::retries,
@opt::filter) {
# Skip if undefined # Skip if undefined
$_ or next; $_ or next;
# Escape \257 => \257\256 # Escape \257 => \257\256
@ -11639,6 +11662,7 @@ sub replacement_counts_and_lengths($$@) {
$noncontextlen += length $c; $noncontextlen += length $c;
} }
for(@$transfer_files, @$return_files, for(@$transfer_files, @$return_files,
@opt::filter,
$opt::tagstring, $opt::workdir, $opt::results, $opt::retries) { $opt::tagstring, $opt::workdir, $opt::results, $opt::retries) {
# Options that can contain replacement strings # Options that can contain replacement strings
$_ or next; $_ or next;
@ -13521,7 +13545,7 @@ sub main() {
while(start_more_jobs()) {} while(start_more_jobs()) {}
spreadstdin(); spreadstdin();
} else { } else {
# Reap the finished jobs - start more # Reap the finished jobs and start more
while(reapers() + start_more_jobs()) {} while(reapers() + start_more_jobs()) {}
} }
::debug("init", "Start draining\n"); ::debug("init", "Start draining\n");