parallel: Implemented {#}. Passes testsuite.

This commit is contained in:
Ole Tange 2011-04-08 21:57:19 +02:00
parent a14fabad36
commit 7b4f747877
7 changed files with 137 additions and 54 deletions

View file

@ -1,3 +1,30 @@
Bug: parallel --colsep ')' echo {1}
job->start():
$jobslot = Global::jobslot->$sshlogin
sub get_jobslot {
    # Hand out a job slot id for an sshlogin.
    # A slot id previously released for this sshlogin is reused if one
    # is available; otherwise the next unused global id is allocated.
    # Input:
    #   $sshlogin = key into %Global::jobslots
    # Uses:
    #   %Global::jobslots = sshlogin => array ref of free slot ids
    #   $Global::max_jobslot_id = highest slot id handed out so far
    # Returns:
    #   $jobslot_id = the slot id to use
    my $sshlogin = shift;
    # The free list is an array ref stored in the hash, so it must be
    # dereferenced; pop on a hash slice is not valid Perl.
    my $jobslot_id = pop @{$Global::jobslots{$sshlogin}};
    if(not defined $jobslot_id) {
        # No free slot for this sshlogin: allocate a new id
        $jobslot_id = ++$Global::max_jobslot_id;
    }
    return $jobslot_id;
}
sub release_jobslot {
    # Give a job slot id back so it can be reused for the same sshlogin.
    # Input:
    #   $sshlogin = key into %Global::jobslots
    #   $jobslot_id = the slot id that is no longer in use
    # Uses:
    #   %Global::jobslots = sshlogin => array ref of free slot ids
    # Returns: N/A
    my $sshlogin = shift;
    my $jobslot_id = shift;
    # The free list is an array ref stored in the hash, so it must be
    # dereferenced; push on a hash slice is not valid Perl.
    push @{$Global::jobslots{$sshlogin}}, $jobslot_id;
    return;
}
Test sshlogins in parallel. Assume parallel is in path
seq 1 10 | parallel -I {o} 'seq 1 255 | parallel echo ssh -oNoHostAuthenticationForLocalhost=true 127.0.{o}.{}' >/tmp/sshloginfile
seq 1 1000 | parallel --sshloginfile /tmp/sshloginfile echo
ssh -F /tmp/
Example: Example:
Chop mbox into emails Chop mbox into emails
Parallel sort Parallel sort

View file

@ -306,6 +306,7 @@ sub get_options_from_array {
"extensionreplace|U=s" => \$::opt_U, "extensionreplace|U=s" => \$::opt_U,
"basenamereplace|bnr=s" => \$::opt_basenamereplace, "basenamereplace|bnr=s" => \$::opt_basenamereplace,
"basenameextensionreplace=s" => \$::opt_basenameextensionreplace, "basenameextensionreplace=s" => \$::opt_basenameextensionreplace,
"seqreplace=s" => \$::opt_seqreplace,
"jobs|j=s" => \$::opt_P, "jobs|j=s" => \$::opt_P,
"load=f" => \$::opt_load, "load=f" => \$::opt_load,
"max-line-length-allowed" => \$::opt_max_line_length_allowed, "max-line-length-allowed" => \$::opt_max_line_length_allowed,
@ -361,7 +362,7 @@ sub get_options_from_array {
"interactive|p" => \$::opt_p, "interactive|p" => \$::opt_p,
"verbose|t" => \$::opt_verbose, "verbose|t" => \$::opt_verbose,
"version|V" => \$::opt_version, "version|V" => \$::opt_version,
"show-limits" => \$::opt_show_limits, "show-limits|showlimits" => \$::opt_show_limits,
"exit|x" => \$::opt_x, "exit|x" => \$::opt_x,
# Semaphore # Semaphore
"semaphore" => \$::opt_semaphore, "semaphore" => \$::opt_semaphore,
@ -384,7 +385,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20110324; $Global::version = 20110408;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -396,6 +397,7 @@ sub parse_options {
$Global::replace{'{.}'} = '{.}'; $Global::replace{'{.}'} = '{.}';
$Global::replace{'{/}'} = '{/}'; $Global::replace{'{/}'} = '{/}';
$Global::replace{'{/.}'} = '{/.}'; $Global::replace{'{/.}'} = '{/.}';
$Global::replace{'{#}'} = '{#}';
$/="\n"; $/="\n";
$Global::ignore_empty = 0; $Global::ignore_empty = 0;
$Global::interactive = 0; $Global::interactive = 0;
@ -433,6 +435,9 @@ sub parse_options {
if(defined $::opt_basenameextensionreplace) { if(defined $::opt_basenameextensionreplace) {
$Global::replace{'{/.}'} = $::opt_basenameextensionreplace; $Global::replace{'{/.}'} = $::opt_basenameextensionreplace;
} }
if(defined $::opt_seqreplace) {
$Global::replace{'{#}'} = $::opt_seqreplace;
}
if(defined $::opt_E) { $Global::end_of_file_string = $::opt_E; } if(defined $::opt_E) { $Global::end_of_file_string = $::opt_E; }
if(defined $::opt_n) { $Global::max_number_of_args = $::opt_n; } if(defined $::opt_n) { $Global::max_number_of_args = $::opt_n; }
if(defined $::opt_N) { $Global::max_number_of_args = $::opt_N; } if(defined $::opt_N) { $Global::max_number_of_args = $::opt_N; }
@ -556,7 +561,7 @@ sub parse_options {
} }
if(not defined $::opt_P) { if(not defined $::opt_P) {
$::opt_P = "+0"; $::opt_P = "100%";
} }
if($::opt_joblog) { if($::opt_joblog) {
if(not open($Global::joblog,">$::opt_joblog")) { if(not open($Global::joblog,">$::opt_joblog")) {
@ -565,7 +570,7 @@ sub parse_options {
} else { } else {
print $Global::joblog print $Global::joblog
join("\t", "Seq", "Host", "Starttime", "Runtime", join("\t", "Seq", "Host", "Starttime", "Runtime",
"Send", "Receive", "Status", "Command" "Send", "Receive", "Exitval", "Command"
). "\n"; ). "\n";
} }
} }
@ -2375,7 +2380,7 @@ sub new {
'unget' => \@unget, 'unget' => \@unget,
'commandlinequeue' => $commandlinequeue, 'commandlinequeue' => $commandlinequeue,
'total_jobs' => undef, 'total_jobs' => undef,
'next_seq' => 1, # 'next_seq' => 1,
}, ref($class) || $class; }, ref($class) || $class;
} }
@ -2389,8 +2394,6 @@ sub get {
my $commandline = $self->{'commandlinequeue'}->get(); my $commandline = $self->{'commandlinequeue'}->get();
if(defined $commandline) { if(defined $commandline) {
my $job = Job->new($commandline); my $job = Job->new($commandline);
$job->set_seq($self->{'next_seq'});
$self->{'next_seq'}++;
return $job; return $job;
} else { } else {
return undef; return undef;
@ -2427,7 +2430,7 @@ sub total_jobs {
sub next_seq { sub next_seq {
my $self = shift; my $self = shift;
return $self->{'next_seq'}; return $self->{'commandlinequeue'}->seq();
} }
sub quote_args { sub quote_args {
@ -2444,7 +2447,6 @@ sub new {
return bless { return bless {
'commandline' => $commandline, # The commandline with no args 'commandline' => $commandline, # The commandline with no args
'workdir' => undef, # --workdir 'workdir' => undef, # --workdir
'seq' => undef, # $PARALLEL_SEQ
'stdin' => undef, # filehandle for stdin (used for --spreadstdin) 'stdin' => undef, # filehandle for stdin (used for --spreadstdin)
'stdout' => undef, # filehandle for stdout (used for --group) 'stdout' => undef, # filehandle for stdout (used for --group)
'stdoutfilename' => undef, # filename for writing stdout to (used for --files) 'stdoutfilename' => undef, # filename for writing stdout to (used for --files)
@ -2465,24 +2467,18 @@ sub new {
sub replaced { sub replaced {
my $self = shift; my $self = shift;
$self->{'commandline'} or Carp::croak("cmdline empty");
return $self->{'commandline'}->replaced(); return $self->{'commandline'}->replaced();
} }
sub set_seq {
my $self = shift;
my $seq = shift;
$self->{'seq'} = $seq;
}
sub seq { sub seq {
my $self = shift; my $self = shift;
return $self->{'seq'}; return $self->{'commandline'}->seq();
} }
sub set_stdout { sub set_stdout {
my $self = shift; my $self = shift;
my $stdout = shift; $self->{'stdout'} = shift;
$self->{'stdout'} = $stdout;
} }
sub stdout { sub stdout {
@ -2492,8 +2488,7 @@ sub stdout {
sub set_stdoutfilename { sub set_stdoutfilename {
my $self = shift; my $self = shift;
my $stdoutfilename = shift; $self->{'stdoutfilename'} = shift;
$self->{'stdoutfilename'} = $stdoutfilename;
} }
sub stdoutfilename { sub stdoutfilename {
@ -2508,8 +2503,7 @@ sub stderr {
sub set_stderr { sub set_stderr {
my $self = shift; my $self = shift;
my $stderr = shift; $self->{'stderr'} = shift;
$self->{'stderr'} = $stderr;
} }
sub stdin { sub stdin {
@ -2573,8 +2567,7 @@ sub pid {
sub set_pid { sub set_pid {
my $self = shift; my $self = shift;
my $pid = shift; $self->{'pid'} = shift;
$self->{'pid'} = $pid;
} }
sub starttime { sub starttime {
@ -2876,7 +2869,7 @@ sub workdir {
$workdir =~ s:/+$::; # Remove ending / if any $workdir =~ s:/+$::; # Remove ending / if any
$workdir =~ s:^\./::g; # Remove starting ./ if any $workdir =~ s:^\./::g; # Remove starting ./ if any
} else { } else {
$workdir = ".parallel/tmp/".::hostname()."-".$$."-".$self->{'seq'}; $workdir = ".parallel/tmp/".::hostname()."-".$$."-".$self->seq();
} }
} else { } else {
$workdir = "."; $workdir = ".";
@ -3098,6 +3091,7 @@ package CommandLine;
sub new { sub new {
my $class = shift; my $class = shift;
my $seq = shift;
my $command = ::undef_as_empty(shift); my $command = ::undef_as_empty(shift);
my $arg_queue = shift; my $arg_queue = shift;
my $context_replace = shift; my $context_replace = shift;
@ -3135,6 +3129,7 @@ sub new {
return bless { return bless {
'command' => $command, 'command' => $command,
'seq' => $seq,
'len' => $len, 'len' => $len,
'arg_list' => [], 'arg_list' => [],
'arg_queue' => $arg_queue, 'arg_queue' => $arg_queue,
@ -3148,6 +3143,11 @@ sub new {
}, ref($class) || $class; }, ref($class) || $class;
} }
sub seq {
    # Returns:
    #   the value stored in this object's 'seq' field
    #   (the sequence number given to the constructor)
    my ($self) = @_;
    return $self->{'seq'};
}
sub populate { sub populate {
# Add arguments from arg_queue until the number of arguments or # Add arguments from arg_queue until the number of arguments or
# max line length is reached # max line length is reached
@ -3372,10 +3372,10 @@ sub number_of_replacements {
} }
for my $k (keys %count) { for my $k (keys %count) {
if(defined $Global::replace{$k}) { if(defined $Global::replace{$k}) {
# {} {/} {.} {/.} # {} {/} {.} {/.} {#}
$context -= (length $Global::replace{$k}) * $count{$k}; $context -= (length $Global::replace{$k}) * $count{$k};
} else { } else {
# {#} # {n}
$context -= (length $k) * $count{$k}; $context -= (length $k) * $count{$k};
} }
} }
@ -3430,7 +3430,7 @@ sub replace_placeholders {
# we have a matching argument for {n} # we have a matching argument for {n}
$replace_single{$used} = $args[$positional-1]->replace($replacementfunction); $replace_single{$used} = $args[$positional-1]->replace($replacementfunction);
} }
} elsif($used =~ /^{\D*}$/) { } elsif($used =~ /^(\{\}|\{\/\}|\{\.\}|\{\/\.\})$/) {
# Add to the multireplacement # Add to the multireplacement
my $replacementfunction = $used; # {} {/} {.} or {/.} my $replacementfunction = $used; # {} {/} {.} or {/.}
CORE::push @used_multi, $replacementfunction; CORE::push @used_multi, $replacementfunction;
@ -3444,6 +3444,8 @@ sub replace_placeholders {
map { $args[$_]->replace($replacementfunction) } map { $args[$_]->replace($replacementfunction) }
0 .. $#args); 0 .. $#args);
} }
} elsif($used eq '{#}') {
$replace_single{$Global::replace{$used}} = $self->seq();
} else { } else {
::die_bug('replace_placeholders20110309'); ::die_bug('replace_placeholders20110309');
} }
@ -3540,9 +3542,20 @@ sub new {
'max_number_of_args' => $max_number_of_args, 'max_number_of_args' => $max_number_of_args,
'size' => undef, 'size' => undef,
'return_files' => $return_files, 'return_files' => $return_files,
'seq' => 1,
}, ref($class) || $class; }, ref($class) || $class;
} }
sub seq {
    # Returns:
    #   the value stored in this queue's 'seq' field, i.e. the sequence
    #   number that get() hands to the next CommandLine it creates
    my ($self) = @_;
    return $self->{'seq'};
}
sub set_seq {
    # Store a new value in this queue's 'seq' field.
    # Input:
    #   $new_seq = the sequence number to store
    my ($self, $new_seq) = @_;
    $self->{'seq'} = $new_seq;
}
sub quote_args { sub quote_args {
my $self = shift; my $self = shift;
# If there is not command emulate |bash # If there is not command emulate |bash
@ -3556,13 +3569,15 @@ sub get {
return ($cmd_line); return ($cmd_line);
} else { } else {
my $cmd_line; my $cmd_line;
$cmd_line = CommandLine->new($self->{'command'}, $cmd_line = CommandLine->new($self->seq(),
$self->{'command'},
$self->{'arg_queue'}, $self->{'arg_queue'},
$self->{'context_replace'}, $self->{'context_replace'},
$self->{'max_number_of_args'}, $self->{'max_number_of_args'},
$self->{'return_files'}, $self->{'return_files'},
); );
$cmd_line->populate(); $cmd_line->populate();
$self->set_seq($self->seq()+1);
::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n"); ::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n");
if(not $::opt_pipe and $cmd_line->number_of_args() == 0) { if(not $::opt_pipe and $cmd_line->number_of_args() == 0) {
# We did not get more args - maybe at EOF string? # We did not get more args - maybe at EOF string?

View file

@ -106,6 +106,13 @@ B<{/.}> can be used the same places as B<{}>. The replacement string
B<{/.}> can be changed with B<--basenameextensionreplace>. B<{/.}> can be changed with B<--basenameextensionreplace>.
=item B<{#}> (alpha testing)
Sequence number of the job to run. The same as $PARALLEL_SEQ.
The replacement string B<{#}> can be changed with B<--seqreplace>.
=item B<{>I<n>B<}> =item B<{>I<n>B<}>
Argument from argument file I<n> or the I<n>'th argument. See B<-a> Argument from argument file I<n> or the I<n>'th argument. See B<-a>
@ -230,7 +237,8 @@ I<file> will be transferred the same way as B<--transfer>.
=item B<--bnr> I<replace-str> =item B<--bnr> I<replace-str>
Use the replacement string I<replace-str> instead of B<{/}> for basename of input line. Use the replacement string I<replace-str> instead of B<{/}> for
basename of input line.
=item B<--basenameextensionreplace> I<replace-str> =item B<--basenameextensionreplace> I<replace-str>
@ -865,6 +873,12 @@ If the semaphore is not released within secs seconds, take it anyway.
Implies B<--semaphore>. Implies B<--semaphore>.
=item B<--seqreplace> I<replace-str>
Use the replacement string I<replace-str> instead of B<{#}> for
job sequence number.
=item B<--skip-first-line> =item B<--skip-first-line>
Do not use the first line of input (used by GNU B<parallel> itself Do not use the first line of input (used by GNU B<parallel> itself
@ -1453,6 +1467,14 @@ B<find . -type f | parallel -k -j150% -n 1000 -m grep -H -n STRING {}>
This will run 1.5 jobs per core, and give 1000 arguments to B<grep>. This will run 1.5 jobs per core, and give 1000 arguments to B<grep>.
To grep a big file in parallel use B<--pipe>:
B<cat bigfile | parallel --pipe grep foo>
Depending on your disks and CPUs it may be faster to read larger blocks:
B<cat bigfile | parallel --pipe --block 10M grep foo>
=head1 EXAMPLE: Using remote computers =head1 EXAMPLE: Using remote computers

View file

@ -6,22 +6,22 @@ seq 1 1000000 >/tmp/parallel-seq
shuf --random-source=/tmp/parallel-seq /tmp/parallel-seq >/tmp/blocktest shuf --random-source=/tmp/parallel-seq /tmp/parallel-seq >/tmp/blocktest
echo '### Test -N with multiple jobslots and multiple args' echo '### Test -N with multiple jobslots and multiple args'
seq 1 1 | parallel -j2 -k -N 3 --pipe 'cat;echo a;sleep 0.1' seq 1 1 | parallel -j2 -k -N 3 --pipe 'cat;echo a' | uniq
seq 1 2 | parallel -j2 -k -N 3 --pipe 'cat;echo bb;sleep 0.1' seq 1 2 | parallel -j2 -k -N 3 --pipe 'cat;echo bb' | uniq
seq 1 3 | parallel -j2 -k -N 3 --pipe 'cat;echo ccc;sleep 0.1' seq 1 3 | parallel -j2 -k -N 3 --pipe 'cat;echo ccc' | uniq
seq 1 4 | parallel -j2 -k -N 3 --pipe 'cat;echo dddd;sleep 0.1' seq 1 4 | parallel -j2 -k -N 3 --pipe 'cat;echo dddd' | uniq
seq 1 5 | parallel -j2 -k -N 3 --pipe 'cat;echo eeeee;sleep 0.1' seq 1 5 | parallel -j2 -k -N 3 --pipe 'cat;echo eeeee' | uniq
seq 1 6 | parallel -j2 -k -N 3 --pipe 'cat;echo ffffff;sleep 0.1' seq 1 6 | parallel -j2 -k -N 3 --pipe 'cat;echo ffffff' | uniq
seq 1 7 | parallel -j2 -k -N 3 --pipe 'cat;echo ggggggg;sleep 0.1' seq 1 7 | parallel -j2 -k -N 3 --pipe 'cat;echo ggggggg' | uniq
seq 1 8 | parallel -j2 -k -N 3 --pipe 'cat;echo hhhhhhhh;sleep 0.1' seq 1 8 | parallel -j2 -k -N 3 --pipe 'cat;echo hhhhhhhh' | uniq
seq 1 9 | parallel -j2 -k -N 3 --pipe 'cat;echo iiiiiiiii;sleep 0.1' seq 1 9 | parallel -j2 -k -N 3 --pipe 'cat;echo iiiiiiiii' | uniq
seq 1 10 | parallel -j2 -k -N 3 --pipe 'cat;echo jjjjjjjjjj;sleep 0.1' seq 1 10 | parallel -j2 -k -N 3 --pipe 'cat;echo jjjjjjjjjj' | uniq
echo '### Test -l -N -L and -n with multiple jobslots and multiple args' echo '### Test -l -N -L and -n with multiple jobslots and multiple args'
seq 1 5 | parallel -kj2 -l 2 --pipe "cat; echo a; sleep 0.1" seq 1 5 | parallel -kj2 -l 2 --pipe "cat; echo a" | uniq
seq 1 5 | parallel -kj2 -N 2 --pipe "cat; echo b; sleep 0.1" seq 1 5 | parallel -kj2 -N 2 --pipe "cat; echo b" | uniq
seq 1 5 | parallel -kj2 -L 2 --pipe "cat; echo c; sleep 0.1" seq 1 5 | parallel -kj2 -L 2 --pipe "cat; echo c" | uniq
seq 1 5 | parallel -kj2 -n 2 --pipe "cat; echo d; sleep 0.1" seq 1 5 | parallel -kj2 -n 2 --pipe "cat; echo d" | uniq
echo '### Test output is the same for different block size' echo '### Test output is the same for different block size'
echo -n 01a02a0a0a12a34a45a6a | echo -n 01a02a0a0a12a34a45a6a |

View file

@ -0,0 +1,7 @@
#!/bin/bash
# Tests for the {#} replacement string (job sequence number) and the
# --seqreplace option. Output is compared against a recorded expected output.

echo '### Test {#}'
# Each of the 10 jobs echoes its own sequence number; -k keeps output in
# input order.
seq 1 10 | parallel -k echo {#}
echo '### Test --seqreplace and line too long'
# --seqreplace I makes every "I" in the command a sequence-number
# placeholder, so the 130000-character command grows with the number of
# digits in the sequence number and eventually exceeds the maximum command
# line length. `stdout` is an external helper not defined here -- presumably
# it merges stderr into stdout so the error message is captured; verify.
seq 1 100 | stdout parallel -k --seqreplace I echo $(perl -e 'print "I"x130000') \|wc

View file

@ -2,16 +2,13 @@
### Test -N with multiple jobslots and multiple args ### Test -N with multiple jobslots and multiple args
1 1
a a
a
1 1
2 2
bb bb
bb
1 1
2 2
3 3
ccc ccc
ccc
1 1
2 2
3 3
@ -43,7 +40,6 @@ ggggggg
ggggggg ggggggg
7 7
ggggggg ggggggg
ggggggg
1 1
2 2
3 3
@ -55,7 +51,6 @@ hhhhhhhh
7 7
8 8
hhhhhhhh hhhhhhhh
hhhhhhhh
1 1
2 2
3 3
@ -68,7 +63,6 @@ iiiiiiiii
8 8
9 9
iiiiiiiii iiiiiiiii
iiiiiiiii
1 1
2 2
3 3
@ -92,7 +86,6 @@ a
a a
5 5
a a
a
1 1
2 2
b b
@ -101,7 +94,6 @@ b
b b
5 5
b b
b
1 1
2 2
c c
@ -110,7 +102,6 @@ c
c c
5 5
c c
c
1 1
2 2
d d
@ -119,7 +110,6 @@ d
d d
5 5
d d
d
### Test output is the same for different block size ### Test output is the same for different block size
1>01a02a0a 1>01a02a0a
2>0a12a34a 2>0a12a34a
@ -147,6 +137,7 @@ d
3>45 3>45
4>6 4>6
5>
### Test --rrs -N1 --recend single ### Test --rrs -N1 --recend single
1>12a34 1>12a34
2>45a6 2>45a6

View file

@ -0,0 +1,21 @@
### Test {#}
1
2
3
4
5
6
7
8
9
10
### Test --seqreplace and line too long
1 1 130001
1 1 130001
1 1 130001
1 1 130001
1 1 130001
1 1 130001
1 1 130001
1 1 130001
Command line too long (260009 >= 131071) at number 9: 10...