--command, -c, --file, and -f are now obsolete. --eta works again.

Bugfix in testsuite for --retries.
Lots of dead code removed.
This commit is contained in:
Ole Tange 2010-11-29 23:59:16 +01:00
parent 46cc1980fa
commit 4d22781652
3 changed files with 43 additions and 97 deletions

View file

@ -1,8 +1,10 @@
parallel: Argument handling re-written to OO.
The code is quite messy and the implementation is fairly slow, but the
structure seems sound and it passes the testsuite.
basename {/} and {/.} implemented.
Flushing of STDERR and STDOUT after each job completes.
--command, -c, --file, and -f are now obsolete. --eta works again.
Bugfix in testsuite for --retries.
Lots of dead code removed.
== Bug? ==
locate .gz | parallel -X find {} -size +1000 -size -2000 | parallel --workdir ... -S .. --trc {/}.bz2 'zcat {} | bzip2 > {/}.bz2'
== Compare ==

View file

@ -2496,13 +2496,11 @@ if($::opt_skip_first_line) {
<$fh>;
}
$Global::CommandLineQueue = CommandLineQueue->new(join(" ",@ARGV),\@fhlist,$Global::Xargs,$number_of_args,\@Global::ret_files);
$Global::CommandLineQueue = CommandLineQueue->new(
join(" ",@ARGV),\@fhlist,$Global::Xargs,$number_of_args,\@Global::ret_files);
$Global::JobQueue = JobQueue->new($Global::CommandLineQueue);
init_run_jobs();
#if(defined $::opt_P) {
# compute_number_of_processes_for_sshlogins();
#}
my $sem;
if($Global::semaphore) {
# $Global::host{':'}{'max_no_of_running'} must be set
@ -2576,8 +2574,6 @@ sub get_options_from_array {
"keep-order|keeporder|k" => \$::opt_k,
"group|g" => \$::opt_g,
"ungroup|u" => \$::opt_u,
# "command|c" => \$::opt_c,
# "file|f" => \$::opt_f,
"null|0" => \$::opt_0,
"quote|q" => \$::opt_q,
"I=s" => \$::opt_I,
@ -2658,7 +2654,6 @@ sub parse_options {
$Global::replace{'{/.}'} = '{/.}';
$/="\n";
$Global::ignore_empty = 0;
#$Global::argfile = *STDIN;
$Global::interactive = 0;
$Global::stderr_verbose = 0;
$Global::default_simultaneous_sshlogins = 9;
@ -2720,13 +2715,7 @@ sub parse_options {
if(defined $::opt_L and $::opt_L or defined $::opt_l) {
$Global::max_lines = $::opt_l || $::opt_L || 1;
$Global::max_number_of_args ||= $Global::max_lines;
# warn $Global::max_lines;
# warn $Global::max_number_of_args;
}
if($::opt_s) {
#$Global::max_number_of_args ||= 10000;
}
if($::opt_n || $::opt_N) { }
%Global::replace_rev = reverse %Global::replace;
if(grep /^$Global::arg_sep$/o, @ARGV) {
@ -2739,16 +2728,6 @@ sub parse_options {
@ARGV=convert_argfiles_from_command_line_to_multiple_opt_a();
}
if(($::opt_l || $::opt_L || $::opt_n || $::opt_N || $::opt_s ||
$::opt_colsep) and not ($::opt_m or $::opt_X)) {
# The options --max-line, -l, -L, --max-args, -n, --max-chars, -s
# do not make sense without -X or -m
# so default to -X
# Needs to be done after :::: and @opt_a, as they can set $::opt_N
#$Global::Xargs = 1;
# TODO Somehow set context_replace or not
}
# Semaphore defaults
# Must be done before computing number of processes and max_line_length
# because when running as a semaphore GNU Parallel does not read args
@ -2770,7 +2749,6 @@ sub parse_options {
}
if(defined $::opt_eta) {
$::opt_progress = $::opt_eta;
# use $CommandLineQueue->size(); when needed
}
if(@ARGV) {
@ -2935,54 +2913,6 @@ sub convert_argfiles_from_command_line_to_multiple_opt_a {
return @new_argv;
}
sub argfiles_xapply_style {
    # Multiple -a => xapply style
    # With n argument files, every invocation should consume n entries
    # (one per file), so both the -N setting and the maximal number of
    # arguments are set to the number of files given in @::opt_a.
    # The earlier body that read all files into memory and interleaved
    # the entries into one queue was already disabled and is left out.
    # Returns: N/A
    my $number_of_argfiles = scalar @::opt_a;
    $::opt_N = $number_of_argfiles;
    $Global::max_number_of_args = $number_of_argfiles;
}
sub open_or_exit {
# Returns:
# file handle to read-opened file
@ -3159,6 +3089,7 @@ sub processes_available_by_system_limit {
kill 9, $pid;
waitpid($pid,0);
}
#wait();
# Cleanup: Unget the command_lines (and args_refs)
$Global::CommandLineQueue->unget(@command_lines);
@ -3614,14 +3545,14 @@ sub progress {
my $eta = "";
if($::opt_eta) {
my $completed = 0;
for(@workers) { $completed += ($Global::host{$_}->jobs_completed()||0) }
for(@workers) { $completed += $Global::host{$_}->jobs_completed() }
if($completed) {
$Private::first_completed ||= time;
my $avgtime = (time-$Private::first_completed)/$completed;
my $this_eta = ($Global::total_jobs - $completed) * $avgtime;
my $this_eta = ($Global::JobQueue->total_jobs() - $completed) * $avgtime;
$Private::eta ||= $this_eta;
# Smooth the eta so it does not jump wildly
$Private::eta = 0.9 * $Private::eta + 0.1 * $this_eta;
$Private::eta = 0.98 * $Private::eta + 0.02 * $this_eta;
$eta = sprintf("ETA: %ds ", $Private::eta);
}
}
@ -4276,7 +4207,7 @@ sub new {
return bless {
'string' => $string,
'jobs_running' => undef,
'jobs_completed' => undef,
'jobs_completed' => 0,
'maxlength' => undef,
'max_jobs_running' => undef,
'ncpus' => undef,
@ -4606,6 +4537,7 @@ sub new {
return bless {
'unget' => \@unget,
'commandlinequeue' => $commandlinequeue,
'total_jobs' => undef,
}, ref($class) || $class;
}
@ -4638,6 +4570,21 @@ sub empty {
return $empty;
}
sub total_jobs {
    # Count the jobs in the queue.
    # The count is computed on the first call by taking every job out
    # with get() and putting them all back with unget(); the result is
    # cached in $self->{'total_jobs'} and reused on later calls.
    # Returns:
    #   number of jobs that were in the queue at the first call
    my $self = shift;
    return $self->{'total_jobs'} if defined $self->{'total_jobs'};
    my @drained;
    while(my $job = $self->get()) {
        push @drained, $job;
    }
    # Put the jobs back so the queue can still be consumed afterwards
    $self->unget(@drained);
    $self->{'total_jobs'} = scalar @drained;
    return $self->{'total_jobs'};
}
package Job;
sub new {
@ -4926,7 +4873,8 @@ sub sshcleanup {
my @subworkdirs = parentdirs_of($file);
$file = ::shell_quote_scalar($file);
if(@subworkdirs) {
$removeworkdir = "; rmdir 2>/dev/null ".join(" ",map { ::shell_quote_scalar($workdir."/".$_) } @subworkdirs);
$removeworkdir = "; rmdir 2>/dev/null ".
join(" ",map { ::shell_quote_scalar($workdir."/".$_) } @subworkdirs);
}
my $relpath = ($file !~ m:^/:); # Is the path relative?
my $cleandir = ($relpath ? $workdir : "");
@ -5363,8 +5311,6 @@ sub number_of_replacements {
$no_args = length($cmd);
$context = length($command) - $no_args;
}
#warn("Command:$command no_args:$no_args context:$context");
#warn(%count);
for my $k (keys %count) {
if(defined $Global::replace{$k}) {
# {} {/} {.} {/.}
@ -5374,7 +5320,6 @@ sub number_of_replacements {
$context -= (length $k) * $count{$k};
}
}
#die("Command:$command no_args:$no_args context:$context");
return ($sum,$no_args,$context,$number_of_context_groups,%count);
}
@ -5411,7 +5356,6 @@ sub replace_placeholders {
# Merge arguments from records into args
CORE::push @args, @$record;
}
#::my_dump(@args);
for my $used (keys %{$self->{'replacecount'}}) {
if($used =~ /^{(\d+)(\D*)}$/) {
my $positional = $1; # number if any
@ -5936,7 +5880,6 @@ sub new {
-d $parallel_locks or mkdir $parallel_locks;
my $lockdir = "$parallel_locks/$id";
my $lockfile = $lockdir.".lock";
# Carp::cluck($count);
if($count < 1) { die "Semaphore count = $count"; }
return bless {
'lockfile' => $lockfile,

View file

@ -1,20 +1,21 @@
#!/bin/bash
# Tests of GNU Parallel's --retries option.
#
# NOTE(review): this listing appears to be a diff view that shows the
# previous and the revised form of each command back to back - confirm
# before treating it as a runnable script.
#
# The counting tests pipe parallel's output into a perl one-liner that
# splits each record on "/" and prints the sum of fields 1, 4 and 7
# (presumably per-host job counters from --progress - verify). The echo
# line before each test states the total that is expected.
# In the form using BEGIN{$/="\r";} perl reads carriage-return separated
# records and the END block prints the sum taken from the last record
# read, so no 'tail -n1' is needed.
# NOTE(review): 'stdout' is an external helper not defined here;
# presumably it merges stderr into stdout - verify.

echo '### Test of --retries'
seq 1 10 | parallel -k --retries 2 -v -S 4.3.2.1,: echo
seq 1 10 | stdout parallel -k --retries 2 -v -S 4.3.2.1,: echo
echo '### Test of --retries - it should run 13 jobs in total'
seq 0 12 | parallel --progress -kj100% --retries 1 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \
perl -e 'sleep 1;print "job{}\n";exit({})' | tail -n1 | \
perl -ne '@a=(split /\//,$_); print $a[1]+$a[4]+$a[7],"\n"'
seq 0 12 | stdout parallel --progress -kj100% --retries 1 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \
perl -e 'sleep 1;print "job{}\n";exit({})' | \
perl -ne 'BEGIN{$/="\r";} @a=(split /\//,$_); END{print $a[1]+$a[4]+$a[7],"\n"}'
echo '### Test of --retries - it should run 25 jobs in total'
seq 0 12 | parallel --progress -kj100% --retries 2 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \
perl -e 'sleep 1;print "job{}\n";exit({})' | tail -n1 | \
perl -ne '@a=(split /\//,$_); print $a[1]+$a[4]+$a[7],"\n"'
seq 0 12 | stdout parallel --progress -kj100% --retries 2 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \
perl -e 'sleep 1;print "job{}\n";exit({})' | \
perl -ne 'BEGIN{$/="\r";} @a=(split /\//,$_); END{print $a[1]+$a[4]+$a[7],"\n"}'
echo '### Test of --retries - it should run 49 jobs in total'
seq 0 12 | parallel --progress -kj100% --retries 4 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \
perl -e 'sleep 1;print "job{}\n";exit({})' | tail -n1 | \
perl -ne '@a=(split /\//,$_); print $a[1]+$a[4]+$a[7],"\n"'
seq 0 12 | stdout parallel --progress -kj100% --retries 4 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \
perl -e 'sleep 1;print "job{}\n";exit({})' | \
perl -ne 'BEGIN{$/="\r";} @a=(split /\//,$_); END{print $a[1]+$a[4]+$a[7],"\n"}'