parallel: -l 0 -L 0 -n 0, and -N 0 implemented. Passes some tests. Man page missing.

This commit is contained in:
Ole Tange 2011-02-23 16:01:18 +01:00
parent 1446047579
commit ff48e14301
6 changed files with 96 additions and 17 deletions

View file

@ -526,3 +526,51 @@ parallel -a table_file.tsv --colsep '\t' mycmd -o {2} {3} -i {1}
# Run traceroute in parallel, but keep the output order the same
parallel -k traceroute ::: foss.org.my debian.org freenetproject.org
Test of signal passing through ssh
#!/bin/bash
SERVER1=parallel-server3
SERVER2=parallel-server2
export BG_PROC
start_remote_sleep() {
parallel -D -u -S parallel@$SERVER2 sleep ::: 370 &
BG_PROC=$!
while ! ssh parallel@$SERVER2 ps -A -o cmd | grep -q '^sleep 370' ; do
sleep 0.3
done
}
stop_local_parallel() {
kill -9 $BG_PROC
}
check_and_stop_remote_sleep() {
ssh parallel@$SERVER2 ps -A -o cmd | grep '^sleep 370'
ssh parallel@$SERVER2 killall sleep
}
echo '### Test kill signals'
start_remote_sleep 2>/dev/null
kill -1 $BG_PROC
check_and_stop_remote_sleep
sub propagate_signal {
my $signal = shift;
# $signal = "KILL";
::debug("Sending $signal to ",keys %Global::running);
kill $signal, keys %Global::running;
if(defined $Global::original_sig{$signal}) {
&{$Global::original_sig{$signal}};
}
}
my %do_not_propagate = map { $_ => 1 } qw(TTOU TTIN CONT TSTP __WARN__ __DIE__);
for (keys %SIG) {
$do_not_propagate{$_} and next;
$SIG{$_} = eval 'sub { propagate_signal("'.$_.'"); };';
}

View file

@ -11,7 +11,8 @@ use strict;
use Carp;
$::oodebug=0;
$Global::original_sigterm = $SIG{TERM} || sub { exit 0; }; # $SIG{TERM} is not set on Mac OS X
$SIG{TERM} ||= sub { exit 0; }; # $SIG{TERM} is not set on Mac OS X
%Global::original_sig = %SIG;
$SIG{TERM} = sub {}; # Dummy until jobs really start
open $Global::original_stderr, ">&STDERR" or die "Can't dup STDERR: $!";
@ -428,8 +429,8 @@ sub parse_options {
$Global::replace{'{/.}'} = $::opt_basenameextensionreplace;
}
if(defined $::opt_E and $::opt_E) { $Global::end_of_file_string = $::opt_E; }
if(defined $::opt_n and $::opt_n) { $Global::max_number_of_args = $::opt_n; }
if(defined $::opt_N and $::opt_N) { $Global::max_number_of_args = $::opt_N; }
if(defined $::opt_n) { $Global::max_number_of_args = $::opt_n; }
if(defined $::opt_N) { $Global::max_number_of_args = $::opt_N; }
if(defined $::opt_tmpdir) { $ENV{'TMPDIR'} = $::opt_tmpdir; }
if(defined $::opt_help) { die_usage(); }
if(defined $::opt_colsep) { $Global::trim = 'lr'; }
@ -483,10 +484,15 @@ sub parse_options {
$Global::arg_sep = "--";
}
}
if(defined $::opt_L and $::opt_L or defined $::opt_l) {
$Global::max_lines = $::opt_l || $::opt_L || 1;
for my $nlines ($::opt_L, $::opt_l) {
if(defined $nlines) {
if($nlines eq "") {
$nlines = 1;
}
$Global::max_lines = $nlines;
$Global::max_number_of_args ||= $Global::max_lines;
}
}
%Global::replace_rev = reverse %Global::replace;
if(grep /^$Global::arg_sep$/o, @ARGV) {
@ -1360,7 +1366,7 @@ sub list_running_jobs {
sub start_no_new_jobs {
# Returns: N/A
$SIG{TERM} = $Global::original_sigterm;
$SIG{TERM} = $Global::original_sig{TERM};
print $Global::original_stderr
("$Global::progname: SIGTERM received. No new jobs will be started.\n",
"$Global::progname: Waiting for these ", scalar(keys %Global::running),
@ -1903,7 +1909,7 @@ sub processes_available_by_system_limit {
} elsif(defined $child) {
# The child takes one process slot
# It will be killed later
$SIG{TERM} = $Global::original_sigterm;
$SIG{TERM} = $Global::original_sig{TERM};
sleep 10000000;
exit(0);
} else {
@ -2067,17 +2073,15 @@ sub ncpus {
my $ncpu;
if($::opt_use_cpus_instead_of_cores) {
$ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cpus);
chomp($ncpu);
} else {
$ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cores);
chomp($ncpu);
}
if($ncpu =~ /^[0-9]+$/) {
if($ncpu =~ /^\s*[0-9]+\s*$/s) {
$self->{'ncpus'} = $ncpu;
} else {
print $Global::original_stderr
("parallel: Warning: Could not figure out ",
"number of cpus on $serverlogin. Using 1\n");
"number of cpus on $serverlogin ($ncpu). Using 1\n");
$self->{'ncpus'} = 1;
}
}
@ -3651,12 +3655,20 @@ sub new {
}
sub get {
# Returns:
# reference to array of Arg-objects
my $self = shift;
if(@{$self->{'unget'}}) {
return shift @{$self->{'unget'}};
}
$self->{'arg_number'}++;
return $self->{'arg_sub_queue'}->get();
my $ret = $self->{'arg_sub_queue'}->get();
if(defined $Global::max_number_of_args
and $Global::max_number_of_args == 0) {
return [];
} else {
return $ret;
}
}
sub unget {
@ -3693,6 +3705,8 @@ sub new {
}
sub get {
# Returns:
# reference to array of Arg-objects
my $self = shift;
if(@{$self->{'unget'}}) {
return shift @{$self->{'unget'}};

View file

@ -18,10 +18,10 @@ seq 1 9 | parallel -j2 -k -N 3 --pipe 'cat;echo iiiiiiiii;sleep 0.1'
seq 1 10 | parallel -j2 -k -N 3 --pipe 'cat;echo jjjjjjjjjj;sleep 0.1'
echo '### Test -l -N -L and -n with multiple jobslots and multiple args'
seq 1 5 | parallel -kj2 -l 2 --pipe cat \; echo a
seq 1 5 | parallel -kj2 -N 2 --pipe cat \; echo b
seq 1 5 | parallel -kj2 -L 2 --pipe cat \; echo c
seq 1 5 | parallel -kj2 -n 2 --pipe cat \; echo d
seq 1 5 | parallel -kj2 -l 2 --pipe "cat; echo a; sleep 0.1"
seq 1 5 | parallel -kj2 -N 2 --pipe "cat; echo b; sleep 0.1"
seq 1 5 | parallel -kj2 -L 2 --pipe "cat; echo c; sleep 0.1"
seq 1 5 | parallel -kj2 -n 2 --pipe "cat; echo d; sleep 0.1"
echo '### Test output is the same for different block size'
echo -n 01a02a0a0a12a34a45a6a |

View file

@ -0,0 +1,7 @@
#!/bin/bash
echo '### Test 0-arguments'
seq 1 2 | parallel -n0 echo n0
seq 1 2 | parallel -L0 echo L0
seq 1 2 | parallel -l0 echo l0
seq 1 2 | parallel -N0 echo N0

View file

@ -110,6 +110,7 @@ c
c
5
c
c
1
2
d

View file

@ -0,0 +1,9 @@
### Test 0-arguments
n0
n0
L0
L0
l0
l0
N0
N0