diff --git a/doc/FUTURE_IDEAS b/doc/FUTURE_IDEAS index db27f1e8..439c93d6 100644 --- a/doc/FUTURE_IDEAS +++ b/doc/FUTURE_IDEAS @@ -1,8 +1,10 @@ -parallel: Argument handling re-written to OO. -The code is quite messy, the implementation is fairly slow, but the -structure seems sound and it passes the testsuite. -basename {/} and {/.} implemented. -Flushing of STDERR and STDOUT after each job completes. +--command, -c, --file, and -f now obsoleted. --eta works again. +Bugfix in testsuite for --retries. +Lots of dead code removed. + +== Bug? == + +locate .gz | parallel -X find {} -size +1000 -size -2000 | parallel --workdir ... -S .. --trc {/}.bz2 'zcat {} | bzip2 > {/}.bz2' == Compare == diff --git a/src/parallel b/src/parallel index b683f218..508e7986 100755 --- a/src/parallel +++ b/src/parallel @@ -2496,13 +2496,11 @@ if($::opt_skip_first_line) { <$fh>; } -$Global::CommandLineQueue = CommandLineQueue->new(join(" ",@ARGV),\@fhlist,$Global::Xargs,$number_of_args,\@Global::ret_files); +$Global::CommandLineQueue = CommandLineQueue->new( + join(" ",@ARGV),\@fhlist,$Global::Xargs,$number_of_args,\@Global::ret_files); $Global::JobQueue = JobQueue->new($Global::CommandLineQueue); init_run_jobs(); -#if(defined $::opt_P) { -# compute_number_of_processes_for_sshlogins(); -#} my $sem; if($Global::semaphore) { # $Global::host{':'}{'max_no_of_running'} must be set @@ -2576,8 +2574,6 @@ sub get_options_from_array { "keep-order|keeporder|k" => \$::opt_k, "group|g" => \$::opt_g, "ungroup|u" => \$::opt_u, -# "command|c" => \$::opt_c, -# "file|f" => \$::opt_f, "null|0" => \$::opt_0, "quote|q" => \$::opt_q, "I=s" => \$::opt_I, @@ -2658,7 +2654,6 @@ sub parse_options { $Global::replace{'{/.}'} = '{/.}'; $/="\n"; $Global::ignore_empty = 0; - #$Global::argfile = *STDIN; $Global::interactive = 0; $Global::stderr_verbose = 0; $Global::default_simultaneous_sshlogins = 9; @@ -2720,13 +2715,7 @@ sub parse_options { if(defined $::opt_L and $::opt_L or defined $::opt_l) { $Global::max_lines = $::opt_l || $::opt_L || 1; $Global::max_number_of_args ||= $Global::max_lines; -# warn $Global::max_lines; -# warn $Global::max_number_of_args; } - if($::opt_s) { - #$Global::max_number_of_args ||= 10000; - } - if($::opt_n || $::opt_N) { } %Global::replace_rev = reverse %Global::replace; if(grep /^$Global::arg_sep$/o, @ARGV) { @@ -2739,16 +2728,6 @@ sub parse_options { @ARGV=convert_argfiles_from_command_line_to_multiple_opt_a(); } - if(($::opt_l || $::opt_L || $::opt_n || $::opt_N || $::opt_s || - $::opt_colsep) and not ($::opt_m or $::opt_X)) { - # The options --max-line, -l, -L, --max-args, -n, --max-chars, -s - # do not make sense without -X or -m - # so default to -X - # Needs to be done after :::: and @opt_a, as they can set $::opt_N - #$Global::Xargs = 1; - # TODO Somehow set context_replace or not - } - # Semaphore defaults # Must be done before computing number of processes and max_line_length # because when running as a semaphore GNU Parallel does not read args @@ -2770,7 +2749,6 @@ sub parse_options { } if(defined $::opt_eta) { $::opt_progress = $::opt_eta; - # use $CommandLineQueue->size(); when needed } if(@ARGV) { @@ -2935,54 +2913,6 @@ sub convert_argfiles_from_command_line_to_multiple_opt_a { return @new_argv; } -sub argfiles_xapply_style { - # Multiple -a => xapply style - # Convert the n files into one queue - # Every n'th entry is from the same file - # Set opt_N to read n entries per invocation - # Returns: N/A - #$Global::argfile = open_or_exit("/dev/null"); - $::opt_N = $#::opt_a+1; - $Global::max_number_of_args = $#::opt_a+1; -# my @fhlist = map { open_or_exit($_) } @::opt_a; -# $Global::arg_queue = RecordQueue->new(\@fhlist); -# ::my_dump($Global::arg_queue); - -# # read the files -# my @content; -# my $max_lineno = 0; -# my $in_fh = gensym; -# for (my $fileno = 0; $fileno <= $#::opt_a; $fileno++) { -# $in_fh = open_or_exit($::opt_a[$fileno]); -# $Global::arg_queue{$in_fh} ||= ArgQueue->new([$in_fh]); -# if($::opt_skip_first_line and $fileno == 0) { -# <$in_fh>; # Read first line and forget it -# } -# for (my $lineno=0; -# $content[$fileno][$lineno] = $Global::arg_queue{$in_fh}->get(); -# $lineno++) { -# $max_lineno = max($max_lineno,$lineno); -# } -# close $in_fh; -# } -# for (my $lineno=0; $lineno <= $max_lineno; $lineno++) { -# for (my $fileno = 0; $fileno <= $#::opt_a; $fileno++) { -# my $arg = $content[$fileno][$lineno]; -# if($Global::trim ne 'n') { -# $arg = trim($arg); -# } -# $Global::arg_queue ||= ArgQueue->new([$Global::argfile]); -# if(defined $arg) { -# $Global::arg_queue->unget($arg); -# } else { -# die; -# $Global::arg_queue->unget(Arg->new("")); -# } -# } -# } -# $Global::total_jobs += $max_lineno; -} - sub open_or_exit { # Returns: # file handle to read-opened file @@ -3159,6 +3089,7 @@ sub processes_available_by_system_limit { kill 9, $pid; waitpid($pid,0); } + #wait(); # Cleanup: Unget the command_lines (and args_refs) $Global::CommandLineQueue->unget(@command_lines); @@ -3614,14 +3545,14 @@ sub progress { my $eta = ""; if($::opt_eta) { my $completed = 0; - for(@workers) { $completed += ($Global::host{$_}->jobs_completed()||0) } + for(@workers) { $completed += $Global::host{$_}->jobs_completed() } if($completed) { $Private::first_completed ||= time; my $avgtime = (time-$Private::first_completed)/$completed; - my $this_eta = ($Global::total_jobs - $completed) * $avgtime; + my $this_eta = ($Global::JobQueue->total_jobs() - $completed) * $avgtime; $Private::eta ||= $this_eta; # Smooth the eta so it does not jump wildly - $Private::eta = 0.9 * $Private::eta + 0.1 * $this_eta; + $Private::eta = 0.98 * $Private::eta + 0.02 * $this_eta; $eta = sprintf("ETA: %ds ", $Private::eta); } } @@ -4276,7 +4207,7 @@ sub new { return bless { 'string' => $string, 'jobs_running' => undef, - 'jobs_completed' => undef, + 'jobs_completed' => 0, 'maxlength' => undef, 'max_jobs_running' => undef, 'ncpus' => undef, @@ -4606,6 +4537,7 @@ sub new { return bless { 'unget' => \@unget, 'commandlinequeue' => $commandlinequeue, + 'total_jobs' => undef, }, ref($class) || $class; } @@ -4638,6 +4570,21 @@ sub empty { return $empty; } +sub total_jobs { + my $self = shift; + if(not defined $self->{'total_jobs'}) { + my $job; + my @queue; + while($job = $self->get()) { + push @queue, $job; + } + $self->unget(@queue); + $self->{'total_jobs'} = $#queue+1; + } + return $self->{'total_jobs'}; +} + + package Job; sub new { @@ -4926,7 +4873,8 @@ sub sshcleanup { my @subworkdirs = parentdirs_of($file); $file = ::shell_quote_scalar($file); if(@subworkdirs) { - $removeworkdir = "; rmdir 2>/dev/null ".join(" ",map { ::shell_quote_scalar($workdir."/".$_) } @subworkdirs); + $removeworkdir = "; rmdir 2>/dev/null ". + join(" ",map { ::shell_quote_scalar($workdir."/".$_) } @subworkdirs); } my $relpath = ($file !~ m:^/:); # Is the path relative? my $cleandir = ($relpath ? $workdir : ""); @@ -5363,8 +5311,6 @@ sub number_of_replacements { $no_args = length($cmd); $context = length($command) - $no_args; } - #warn("Command:$command no_args:$no_args context:$context"); - #warn(%count); for my $k (keys %count) { if(defined $Global::replace{$k}) { # {} {/} {.} {/.} @@ -5374,7 +5320,6 @@ sub number_of_replacements { $context -= (length $k) * $count{$k}; } } - #die("Command:$command no_args:$no_args context:$context"); return ($sum,$no_args,$context,$number_of_context_groups,%count); } @@ -5411,7 +5356,6 @@ sub replace_placeholders { # Merge arguments from records into args CORE::push @args, @$record; } - #::my_dump(@args); for my $used (keys %{$self->{'replacecount'}}) { if($used =~ /^{(\d+)(\D*)}$/) { my $positional = $1; # number if any @@ -5936,7 +5880,6 @@ sub new { -d $parallel_locks or mkdir $parallel_locks; my $lockdir = "$parallel_locks/$id"; my $lockfile = $lockdir.".lock"; -# Carp::cluck($count); if($count < 1) { die "Semaphore count = $count"; } return bless { 'lockfile' => $lockfile, diff --git a/testsuite/tests-to-run/test32.sh b/testsuite/tests-to-run/test32.sh index c2c6d5bc..13acf177 100644 --- a/testsuite/tests-to-run/test32.sh +++ b/testsuite/tests-to-run/test32.sh @@ -1,20 +1,21 @@ #!/bin/bash echo '### Test of --retries' -seq 1 10 | parallel -k --retries 2 -v -S 4.3.2.1,: echo +seq 1 10 | stdout parallel -k --retries 2 -v -S 4.3.2.1,: echo echo '### Test of --retries - it should run 13 jobs in total' -seq 0 12 | parallel --progress -kj100% --retries 1 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \ - perl -e 'sleep 1;print "job{}\n";exit({})' | tail -n1 | \ - perl -ne '@a=(split /\//,$_); print $a[1]+$a[4]+$a[7],"\n"' +seq 0 12 | stdout parallel --progress -kj100% --retries 1 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \ + perl -e 'sleep 1;print "job{}\n";exit({})' | \ + perl -ne 'BEGIN{$/="\r";} @a=(split /\//,$_); END{print $a[1]+$a[4]+$a[7],"\n"}' echo '### Test of --retries - it should run 25 jobs in total' -seq 0 12 | parallel --progress -kj100% --retries 2 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \ - perl -e 'sleep 1;print "job{}\n";exit({})' | tail -n1 | \ - perl -ne '@a=(split /\//,$_); print $a[1]+$a[4]+$a[7],"\n"' +seq 0 12 | stdout parallel --progress -kj100% --retries 2 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \ + perl -e 'sleep 1;print "job{}\n";exit({})' | \ + perl -ne 'BEGIN{$/="\r";} @a=(split /\//,$_); END{print $a[1]+$a[4]+$a[7],"\n"}' echo '### Test of --retries - it should run 49 jobs in total' -seq 0 12 | parallel --progress -kj100% --retries 4 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \ - perl -e 'sleep 1;print "job{}\n";exit({})' | tail -n1 | \ - perl -ne '@a=(split /\//,$_); print $a[1]+$a[4]+$a[7],"\n"' +seq 0 12 | stdout parallel --progress -kj100% --retries 4 -S 12/nlv.pi.dk,1/:,parallel@server2 -vq \ + perl -e 'sleep 1;print "job{}\n";exit({})' | \ + perl -ne 'BEGIN{$/="\r";} @a=(split /\//,$_); END{print $a[1]+$a[4]+$a[7],"\n"}' +