From 7b4f747877baf83892e2d2894ccf66d2ea33bdc7 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Fri, 8 Apr 2011 21:57:19 +0200 Subject: [PATCH] parallel: Implemented {#}. Passes testsuite. --- doc/FUTURE_IDEAS | 27 ++++++++++++ src/parallel | 73 +++++++++++++++++++------------- src/parallel.pod | 24 ++++++++++- testsuite/tests-to-run/test51.sh | 28 ++++++------ testsuite/tests-to-run/test56.sh | 7 +++ testsuite/wanted-results/test51 | 11 +---- testsuite/wanted-results/test56 | 21 +++++++++ 7 files changed, 137 insertions(+), 54 deletions(-) create mode 100644 testsuite/tests-to-run/test56.sh create mode 100644 testsuite/wanted-results/test56 diff --git a/doc/FUTURE_IDEAS b/doc/FUTURE_IDEAS index 8f8fb5a7..382dfde8 100644 --- a/doc/FUTURE_IDEAS +++ b/doc/FUTURE_IDEAS @@ -1,3 +1,30 @@ +Bug: parallel --colsep ')' echo {1} + +job->start(): +$jobslot = Global::jobslot->$sshlogin + +sub get_jobslot { + my $sshlogin = shift; + my $jobslot_id = pop @Global::jobslots{$sshlogin}; + if not defined $jobslot_id { + $jobslot_id = ++$Global::max_jobslot_id; + } + return $jobslot_id; +} + +sub release_jobslot { + my $sshlogin = shift; + my $jobslot_id = shift; + push @Global::jobslots{$sshlogin}, $jobslot_id; +} + +Test sshlogins in parallel. Assume parallel is in path + +seq 1 10 | parallel -I {o} 'seq 1 255 | parallel echo ssh -oNoHostAuthenticationForLocalhost=true 127.0.{o}.{}' >/tmp/sshloginfile +seq 1 1000 | parallel --sshloginfile /tmp/sshloginfile echo + +ssh -F /tmp/ + Example: Chop mbox into emails Parallel sort diff --git a/src/parallel b/src/parallel index cc393a4f..fed937a3 100755 --- a/src/parallel +++ b/src/parallel @@ -306,6 +306,7 @@ sub get_options_from_array { "extensionreplace|U=s" => \$::opt_U, "basenamereplace|bnr=s" => \$::opt_basenamereplace, "basenameextensionreplace=s" => \$::opt_basenameextensionreplace, + "seqreplace=s" => \$::opt_seqreplace, "jobs|j=s" => \$::opt_P, "load=f" => \$::opt_load, "max-line-length-allowed" => \$::opt_max_line_length_allowed, @@ -361,7 +362,7 @@ sub get_options_from_array { "interactive|p" => \$::opt_p, "verbose|t" => \$::opt_verbose, "version|V" => \$::opt_version, - "show-limits" => \$::opt_show_limits, + "show-limits|showlimits" => \$::opt_show_limits, "exit|x" => \$::opt_x, # Semaphore "semaphore" => \$::opt_semaphore, @@ -384,7 +385,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A # Defaults: - $Global::version = 20110324; + $Global::version = 20110408; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -396,6 +397,7 @@ sub parse_options { $Global::replace{'{.}'} = '{.}'; $Global::replace{'{/}'} = '{/}'; $Global::replace{'{/.}'} = '{/.}'; + $Global::replace{'{#}'} = '{#}'; $/="\n"; $Global::ignore_empty = 0; $Global::interactive = 0; @@ -433,6 +435,9 @@ sub parse_options { if(defined $::opt_basenameextensionreplace) { $Global::replace{'{/.}'} = $::opt_basenameextensionreplace; } + if(defined $::opt_seqreplace) { + $Global::replace{'{#}'} = $::opt_seqreplace; + } if(defined $::opt_E) { $Global::end_of_file_string = $::opt_E; } if(defined $::opt_n) { $Global::max_number_of_args = $::opt_n; } if(defined $::opt_N) { $Global::max_number_of_args = $::opt_N; } @@ -556,7 +561,7 @@ sub parse_options { } if(not defined $::opt_P) { - $::opt_P = "+0"; + $::opt_P = "100%"; } if($::opt_joblog) { if(not open($Global::joblog,">$::opt_joblog")) { @@ -565,7 +570,7 @@ sub parse_options { } else { print $Global::joblog join("\t", "Seq", "Host", "Starttime", "Runtime", - "Send", "Receive", "Status", "Command" + "Send", "Receive", "Exitval", "Command" ). "\n"; } } @@ -2375,7 +2380,7 @@ sub new { 'unget' => \@unget, 'commandlinequeue' => $commandlinequeue, 'total_jobs' => undef, - 'next_seq' => 1, +# 'next_seq' => 1, }, ref($class) || $class; } @@ -2389,8 +2394,6 @@ sub get { my $commandline = $self->{'commandlinequeue'}->get(); if(defined $commandline) { my $job = Job->new($commandline); - $job->set_seq($self->{'next_seq'}); - $self->{'next_seq'}++; return $job; } else { return undef; @@ -2427,7 +2430,7 @@ sub total_jobs { sub next_seq { my $self = shift; - return $self->{'next_seq'}; + return $self->{'commandlinequeue'}->seq(); } sub quote_args { @@ -2444,7 +2447,6 @@ sub new { return bless { 'commandline' => $commandline, # The commandline with no args 'workdir' => undef, # --workdir - 'seq' => undef, # $PARALLEL_SEQ 'stdin' => undef, # filehandle for stdin (used for --spreadstdin) 'stdout' => undef, # filehandle for stdout (used for --group) 'stdoutfilename' => undef, # filename for writing stdout to (used for --files) @@ -2465,24 +2467,18 @@ sub new { sub replaced { my $self = shift; + $self->{'commandline'} or Carp::croak("cmdline empty"); return $self->{'commandline'}->replaced(); } -sub set_seq { - my $self = shift; - my $seq = shift; - $self->{'seq'} = $seq; -} - sub seq { my $self = shift; - return $self->{'seq'}; + return $self->{'commandline'}->seq(); } sub set_stdout { my $self = shift; - my $stdout = shift; - $self->{'stdout'} = $stdout; + $self->{'stdout'} = shift; } sub stdout { @@ -2492,8 +2488,7 @@ sub stdout { sub set_stdoutfilename { my $self = shift; - my $stdoutfilename = shift; - $self->{'stdoutfilename'} = $stdoutfilename; + $self->{'stdoutfilename'} = shift; } sub stdoutfilename { @@ -2508,8 +2503,7 @@ sub stderr { sub set_stderr { my $self = shift; - my $stderr = shift; - $self->{'stderr'} = $stderr; + $self->{'stderr'} = shift; } sub stdin { @@ -2573,8 +2567,7 @@ sub pid { sub set_pid { my $self = shift; - my $pid = shift; - $self->{'pid'} = $pid; + $self->{'pid'} = shift; } sub starttime { @@ -2876,7 +2869,7 @@ sub workdir { $workdir =~ s:/+$::; # Remove ending / if any $workdir =~ s:^\./::g; # Remove starting ./ if any } else { - $workdir = ".parallel/tmp/".::hostname()."-".$$."-".$self->{'seq'}; + $workdir = ".parallel/tmp/".::hostname()."-".$$."-".$self->seq(); } } else { $workdir = "."; @@ -3098,6 +3091,7 @@ package CommandLine; sub new { my $class = shift; + my $seq = shift; my $command = ::undef_as_empty(shift); my $arg_queue = shift; my $context_replace = shift; @@ -3135,6 +3129,7 @@ sub new { return bless { 'command' => $command, + 'seq' => $seq, 'len' => $len, 'arg_list' => [], 'arg_queue' => $arg_queue, @@ -3148,6 +3143,11 @@ sub new { }, ref($class) || $class; } +sub seq { + my $self = shift; + return $self->{'seq'}; +} + sub populate { # Add arguments from arg_queue until the number of arguments or # max line length is reached @@ -3372,10 +3372,10 @@ sub number_of_replacements { } for my $k (keys %count) { if(defined $Global::replace{$k}) { - # {} {/} {.} {/.} + # {} {/} {.} {/.} {#} $context -= (length $Global::replace{$k}) * $count{$k}; } else { - # {#} + # {n} $context -= (length $k) * $count{$k}; } } @@ -3430,7 +3430,7 @@ sub replace_placeholders { # we have a matching argument for {n} $replace_single{$used} = $args[$positional-1]->replace($replacementfunction); } - } elsif($used =~ /^{\D*}$/) { + } elsif($used =~ /^(\{\}|\{\/\}|\{\.\}|\{\/\.\})$/) { # Add to the multireplacement my $replacementfunction = $used; # {} {/} {.} or {/.} CORE::push @used_multi, $replacementfunction; @@ -3444,6 +3444,8 @@ sub replace_placeholders { map { $args[$_]->replace($replacementfunction) } 0 .. $#args); } + } elsif($used eq '{#}') { + $replace_single{$Global::replace{$used}} = $self->seq(); } else { ::die_bug('replace_placeholders20110309'); } @@ -3540,9 +3542,20 @@ sub new { 'max_number_of_args' => $max_number_of_args, 'size' => undef, 'return_files' => $return_files, + 'seq' => 1, }, ref($class) || $class; } +sub seq { + my $self = shift; + return $self->{'seq'}; +} + +sub set_seq { + my $self = shift; + $self->{'seq'} = shift; +} + sub quote_args { my $self = shift; # If there is not command emulate |bash @@ -3556,13 +3569,15 @@ sub get { return ($cmd_line); } else { my $cmd_line; - $cmd_line = CommandLine->new($self->{'command'}, + $cmd_line = CommandLine->new($self->seq(), + $self->{'command'}, $self->{'arg_queue'}, $self->{'context_replace'}, $self->{'max_number_of_args'}, $self->{'return_files'}, ); $cmd_line->populate(); + $self->set_seq($self->seq()+1); ::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n"); if(not $::opt_pipe and $cmd_line->number_of_args() == 0) { # We did not get more args - maybe at EOF string? diff --git a/src/parallel.pod b/src/parallel.pod index 8a1cf174..e8fc7f7b 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -106,6 +106,13 @@ B<{/.}> can be used the same places as B<{}>. The replacement string B<{/.}> can be changed with B<--basenameextensionreplace>. +=item B<{#}> (alpha testing) + +Sequence number of the job to run. The same as $PARALLEL_SEQ. + +The replacement string B<{#}> can be changed with B<--seqreplace>. + + =item B<{>IB<}> Argument from argument file I or the I'th argument. See B<-a> @@ -230,7 +237,8 @@ I will be transferred the same way as B<--transfer>. =item B<--bnr> I -Use the replacement string I instead of B<{/}> for basename of input line. +Use the replacement string I instead of B<{/}> for +basename of input line. =item B<--basenameextensionreplace> I @@ -865,6 +873,12 @@ If the semaphore is not released within secs seconds, take it anyway. Implies B<--semaphore>. +=item B<--seqreplace> I + +Use the replacement string I instead of B<{#}> for +job sequence number. + + =item B<--skip-first-line> Do not use the first line of input (used by GNU B itself @@ -1453,6 +1467,14 @@ B This will run 1.5 job per core, and give 1000 arguments to B. +To grep a big file in parallel use B<--pipe>: + +B + +Depending on your disks and CPUs it may be faster to read larger blocks: + +B + =head1 EXAMPLE: Using remote computers diff --git a/testsuite/tests-to-run/test51.sh b/testsuite/tests-to-run/test51.sh index 4bc8fc8c..d64deb54 100644 --- a/testsuite/tests-to-run/test51.sh +++ b/testsuite/tests-to-run/test51.sh @@ -6,22 +6,22 @@ seq 1 1000000 >/tmp/parallel-seq shuf --random-source=/tmp/parallel-seq /tmp/parallel-seq >/tmp/blocktest echo '### Test -N with multiple jobslots and multiple args' -seq 1 1 | parallel -j2 -k -N 3 --pipe 'cat;echo a;sleep 0.1' -seq 1 2 | parallel -j2 -k -N 3 --pipe 'cat;echo bb;sleep 0.1' -seq 1 3 | parallel -j2 -k -N 3 --pipe 'cat;echo ccc;sleep 0.1' -seq 1 4 | parallel -j2 -k -N 3 --pipe 'cat;echo dddd;sleep 0.1' -seq 1 5 | parallel -j2 -k -N 3 --pipe 'cat;echo eeeee;sleep 0.1' -seq 1 6 | parallel -j2 -k -N 3 --pipe 'cat;echo ffffff;sleep 0.1' -seq 1 7 | parallel -j2 -k -N 3 --pipe 'cat;echo ggggggg;sleep 0.1' -seq 1 8 | parallel -j2 -k -N 3 --pipe 'cat;echo hhhhhhhh;sleep 0.1' -seq 1 9 | parallel -j2 -k -N 3 --pipe 'cat;echo iiiiiiiii;sleep 0.1' -seq 1 10 | parallel -j2 -k -N 3 --pipe 'cat;echo jjjjjjjjjj;sleep 0.1' +seq 1 1 | parallel -j2 -k -N 3 --pipe 'cat;echo a' | uniq +seq 1 2 | parallel -j2 -k -N 3 --pipe 'cat;echo bb' | uniq +seq 1 3 | parallel -j2 -k -N 3 --pipe 'cat;echo ccc' | uniq +seq 1 4 | parallel -j2 -k -N 3 --pipe 'cat;echo dddd' | uniq +seq 1 5 | parallel -j2 -k -N 3 --pipe 'cat;echo eeeee' | uniq +seq 1 6 | parallel -j2 -k -N 3 --pipe 'cat;echo ffffff' | uniq +seq 1 7 | parallel -j2 -k -N 3 --pipe 'cat;echo ggggggg' | uniq +seq 1 8 | parallel -j2 -k -N 3 --pipe 'cat;echo hhhhhhhh' | uniq +seq 1 9 | parallel -j2 -k -N 3 --pipe 'cat;echo iiiiiiiii' | uniq +seq 1 10 | parallel -j2 -k -N 3 --pipe 'cat;echo jjjjjjjjjj' | uniq echo '### Test -l -N -L and -n with multiple jobslots and multiple args' -seq 1 5 | parallel -kj2 -l 2 --pipe "cat; echo a; sleep 0.1" -seq 1 5 | parallel -kj2 -N 2 --pipe "cat; echo b; sleep 0.1" -seq 1 5 | parallel -kj2 -L 2 --pipe "cat; echo c; sleep 0.1" -seq 1 5 | parallel -kj2 -n 2 --pipe "cat; echo d; sleep 0.1" +seq 1 5 | parallel -kj2 -l 2 --pipe "cat; echo a" | uniq +seq 1 5 | parallel -kj2 -N 2 --pipe "cat; echo b" | uniq +seq 1 5 | parallel -kj2 -L 2 --pipe "cat; echo c" | uniq +seq 1 5 | parallel -kj2 -n 2 --pipe "cat; echo d" | uniq echo '### Test output is the same for different block size' echo -n 01a02a0a0a12a34a45a6a | diff --git a/testsuite/tests-to-run/test56.sh b/testsuite/tests-to-run/test56.sh new file mode 100644 index 00000000..00f84790 --- /dev/null +++ b/testsuite/tests-to-run/test56.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +echo '### Test {#}' +seq 1 10 | parallel -k echo {#} + +echo '### Test --seqreplace and line too long' +seq 1 100 | stdout parallel -k --seqreplace I echo $(perl -e 'print "I"x130000') \|wc diff --git a/testsuite/wanted-results/test51 b/testsuite/wanted-results/test51 index 34729c9f..bfc1399d 100644 --- a/testsuite/wanted-results/test51 +++ b/testsuite/wanted-results/test51 @@ -2,16 +2,13 @@ ### Test -N with multiple jobslots and multiple args 1 a -a 1 2 bb -bb 1 2 3 ccc -ccc 1 2 3 @@ -43,7 +40,6 @@ ggggggg ggggggg 7 ggggggg -ggggggg 1 2 3 @@ -55,7 +51,6 @@ hhhhhhhh 7 8 hhhhhhhh -hhhhhhhh 1 2 3 @@ -68,7 +63,6 @@ iiiiiiiii 8 9 iiiiiiiii -iiiiiiiii 1 2 3 @@ -92,7 +86,6 @@ a a 5 a -a 1 2 b @@ -101,7 +94,6 @@ b b 5 b -b 1 2 c @@ -110,7 +102,6 @@ c c 5 c -c 1 2 d @@ -119,7 +110,6 @@ d d 5 d -d ### Test output is the same for different block size 1>01a02a0a 2>0a12a34a @@ -147,6 +137,7 @@ d 3>45 4>6 +5> ### Test --rrs -N1 --recend single 1>12a34 2>45a6 diff --git a/testsuite/wanted-results/test56 b/testsuite/wanted-results/test56 new file mode 100644 index 00000000..bf4a06ee --- /dev/null +++ b/testsuite/wanted-results/test56 @@ -0,0 +1,21 @@ +### Test {#} +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +### Test --seqreplace and line too long + 1 1 130001 + 1 1 130001 + 1 1 130001 + 1 1 130001 + 1 1 130001 + 1 1 130001 + 1 1 130001 + 1 1 130001 +Command line too long (260009 >= 131071) at number 9: 10...