parallel: Code re-org to remove $Private::variables.

This commit is contained in:
Ole Tange 2014-08-27 23:29:15 +02:00
parent 53883d3e31
commit 75d055970f
5 changed files with 159 additions and 121 deletions

View file

@ -224,9 +224,9 @@ cc:Tim Cuthbertson <tim3d.junk@gmail.com>,
Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>, Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>,
Jesse Alama <jesse.alama@gmail.com> Jesse Alama <jesse.alama@gmail.com>
Subject: GNU Parallel 20140822 ('Williams') released Subject: GNU Parallel 20140922 ('Attenborough') released
GNU Parallel 20140822 ('Williams') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/ GNU Parallel 20140922 ('Attenborough') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/
Haiku of the month: Haiku of the month:

View file

@ -724,13 +724,12 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20140823; $Global::version = 20140826;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
$Global::verbose = 0; $Global::verbose = 0;
$Global::grouped = 1; $Global::grouped = 1;
$Global::keeporder = 0;
$Global::quoting = 0; $Global::quoting = 0;
# Read only table with default --rpl values # Read only table with default --rpl values
%Global::replace = %Global::replace =
@ -782,7 +781,6 @@ sub parse_options {
$Global::shell = parent_shell($$) || $ENV{'SHELL'} || "/bin/sh"; $Global::shell = parent_shell($$) || $ENV{'SHELL'} || "/bin/sh";
if(defined $opt::X) { $Global::ContextReplace = 1; } if(defined $opt::X) { $Global::ContextReplace = 1; }
if(defined $opt::silent) { $Global::verbose = 0; } if(defined $opt::silent) { $Global::verbose = 0; }
if(defined $opt::keeporder) { $Global::keeporder = 1; }
if(defined $opt::group) { $Global::grouped = 1; } if(defined $opt::group) { $Global::grouped = 1; }
if(defined $opt::u) { $Global::grouped = 0; } if(defined $opt::u) { $Global::grouped = 0; }
if(defined $opt::0) { $/ = "\0"; } if(defined $opt::0) { $/ = "\0"; }
@ -1803,7 +1801,6 @@ sub init_progress {
sub drain_job_queue { sub drain_job_queue {
# Returns: N/A # Returns: N/A
$Private::first_completed ||= time;
if($opt::progress) { if($opt::progress) {
print $Global::original_stderr init_progress(); print $Global::original_stderr init_progress();
} }
@ -1876,9 +1873,20 @@ sub toggle_progress {
sub progress { sub progress {
# Returns: # Returns:
# list of workers # $workerlist = list of workers
# header that will fit on the screen # $header = that will fit on the screen
# status message that will fit on the screen # $status = message that will fit on the screen
if($opt::bar) {
return ("workerlist" => "", "header" => "", "status" => bar());
}
my $eta = "";
my ($status,$header)=("","");
if($opt::eta) {
my($total, $completed, $left, $pctcomplete, $avgtime, $this_eta) =
compute_eta();
$eta = sprintf("ETA: %ds Left: %d AVG: %.2fs ",
$this_eta, $left, $avgtime);
}
my $termcols = terminal_columns(); my $termcols = terminal_columns();
my @workers = sort keys %Global::host; my @workers = sort keys %Global::host;
my %sshlogin = map { $_ eq ":" ? ($_=>"local") : ($_=>$_) } @workers; my %sshlogin = map { $_ eq ":" ? ($_=>"local") : ($_=>$_) } @workers;
@ -1891,60 +1899,6 @@ sub progress {
($Global::host{$w}->ncpus() || "-")." / ". ($Global::host{$w}->ncpus() || "-")." / ".
$Global::host{$w}->max_jobs_running()."\n"; $Global::host{$w}->max_jobs_running()."\n";
} }
my $eta = "";
my ($status,$header)=("","");
if($opt::eta or $opt::bar) {
my $completed = 0;
for(@workers) { $completed += $Global::host{$_}->jobs_completed() }
if($completed) {
my $total = $Global::JobQueue->total_jobs();
my $left = $total - $completed;
my $pctcomplete = $completed / $total;
my $timepassed = (time - $Private::first_completed);
my $avgtime = $timepassed / $completed;
$Private::smoothed_avg_time ||= $avgtime;
# Smooth the eta so it does not jump wildly
$Private::smoothed_avg_time = (1 - $pctcomplete) *
$Private::smoothed_avg_time + $pctcomplete * $avgtime;
my $this_eta;
$Private::last_time ||= $timepassed;
if($timepassed != $Private::last_time
or not defined $Private::last_eta) {
$Private::last_time = $timepassed;
$this_eta = $left * $Private::smoothed_avg_time;
$Private::last_eta = $this_eta;
} else {
$this_eta = $Private::last_eta;
}
$eta = sprintf("ETA: %ds Left: %d AVG: %.2fs ", $this_eta, $left, $avgtime);
if($opt::bar) {
my $arg = $Global::newest_job ?
$Global::newest_job->{'commandline'}->replace_placeholders(["\257<\257>"],0,0) : "";
# [\011\013\014] messes up display in the terminal
$arg =~ tr/[\011-\016\033\302-\365]//d;
my $bar_text =
sprintf("%d%% %d:%d=%ds %s",
$pctcomplete*100, $completed, $left, $this_eta, $arg);
my $rev = "\033[7m";
my $reset = "\033[0m";
my $terminal_width = terminal_columns();
my $s = sprintf("%-${terminal_width}s",
substr($bar_text." "x$terminal_width,
0,$terminal_width));
my $width = int($terminal_width * $pctcomplete);
substr($s,$width,0) = $reset;
my $zenity = sprintf("%-${terminal_width}s",
substr("# " . int($this_eta) . " sec $arg",
0,$terminal_width));
$s = "\r" . $zenity . "\r" . $pctcomplete*100 . # Prefix with zenity header
"\r" . $rev . $s . $reset;
$status = $s;
}
}
}
if($opt::bar) {
return ("workerlist" => "", "header" => "", "status" => $status);
}
$status = "x"x($termcols+1); $status = "x"x($termcols+1);
if(length $status > $termcols) { if(length $status > $termcols) {
# sshlogin1:XX/XX/XX%/XX.Xs sshlogin2:XX/XX/XX%/XX.Xs sshlogin3:XX/XX/XX%/XX.Xs # sshlogin1:XX/XX/XX%/XX.Xs sshlogin2:XX/XX/XX%/XX.Xs sshlogin3:XX/XX/XX%/XX.Xs
@ -2060,13 +2014,78 @@ sub progress {
} }
{ {
my $columns; my ($total,$first_completed,$smoothed_avg_time);
sub compute_eta {
# Calculate important numbers for ETA
# Returns:
# $total = number of jobs in total
# $completed = number of jobs completed
# $left = number of jobs left
# $pctcomplete = percent of jobs completed
# $avgtime = averaged time
# $eta = smoothed eta
$total ||= $Global::JobQueue->total_jobs();
my $completed = 0;
for(values %Global::host) { $completed += $_->jobs_completed() }
my $left = $total - $completed;
if(not $completed) {
return($total, $completed, $left, 0, 0, 0);
}
my $pctcomplete = $completed / $total;
$first_completed ||= time;
my $timepassed = (time - $first_completed);
my $avgtime = $timepassed / $completed;
$smoothed_avg_time ||= $avgtime;
# Smooth the eta so it does not jump wildly
$smoothed_avg_time = (1 - $pctcomplete) * $smoothed_avg_time +
$pctcomplete * $avgtime;
my $eta = int($left * $smoothed_avg_time);
return($total, $completed, $left, $pctcomplete, $avgtime, $eta);
}
}
{
my ($rev,$reset);
sub bar {
# Return:
# $status = bar with eta, completed jobs, arg and pct
$rev ||= "\033[7m";
$reset ||= "\033[0m";
my($total, $completed, $left, $pctcomplete, $avgtime, $eta) =
compute_eta();
my $arg = $Global::newest_job ?
$Global::newest_job->{'commandline'}->replace_placeholders(["\257<\257>"],0,0) : "";
# These chars mess up display in the terminal
$arg =~ tr/[\011-\016\033\302-\365]//d;
my $bar_text =
sprintf("%d%% %d:%d=%ds %s",
$pctcomplete*100, $completed, $left, $eta, $arg);
my $terminal_width = terminal_columns();
my $s = sprintf("%-${terminal_width}s",
substr($bar_text." "x$terminal_width,
0,$terminal_width));
my $width = int($terminal_width * $pctcomplete);
substr($s,$width,0) = $reset;
my $zenity = sprintf("%-${terminal_width}s",
substr("# $eta sec $arg",
0,$terminal_width));
$s = "\r" . $zenity . "\r" . $pctcomplete*100 . # Prefix with zenity header
"\r" . $rev . $s . $reset;
return $s;
}
}
{
my ($columns,$last_column_time);
sub terminal_columns { sub terminal_columns {
# Get the number of columns of the display # Get the number of columns of the display
# Returns: # Returns:
# number of columns of the screen # number of columns of the screen
if(not $columns) { if(not $columns or $last_column_time < time) {
$last_column_time = time;
$columns = $ENV{'COLUMNS'}; $columns = $ENV{'COLUMNS'};
if(not $columns) { if(not $columns) {
my $resize = qx{ resize 2>/dev/null }; my $resize = qx{ resize 2>/dev/null };
@ -2571,52 +2590,15 @@ sub reaper {
# Force printing now if the job failed and we are going to exit # Force printing now if the job failed and we are going to exit
my $print_now = ($opt::halt_on_error and $opt::halt_on_error == 2 my $print_now = ($opt::halt_on_error and $opt::halt_on_error == 2
and $job->exitstatus()); and $job->exitstatus());
if($Global::keeporder and not $print_now) { if($opt::keeporder and not $print_now) {
$Private::print_later{$job->seq()} = $job; print_earlier_jobs($job);
$Private::job_end_sequence ||= 1;
debug("run", "Looking for: $Private::job_end_sequence ",
"Current: ", $job->seq(), "\n");
for(my $j = $Private::print_later{$Private::job_end_sequence};
$j or vec($Global::job_already_run,$Private::job_end_sequence,1);
$Private::job_end_sequence++,
$j = $Private::print_later{$Private::job_end_sequence}) {
debug("run", "Found job end $Private::job_end_sequence");
if($j) {
$j->print();
delete $Private::print_later{$Private::job_end_sequence};
}
}
} else { } else {
$job->print(); $job->print();
} }
if($job->exitstatus()) { if($job->exitstatus()) {
# The jobs had a exit status <> 0, so error process_failed_job($job);
$Global::exitstatus++; }
$Global::total_failed++;
if($opt::halt_on_error) {
if($opt::halt_on_error == 1
or
($opt::halt_on_error < 1 and $Global::total_failed > 3
and
$Global::total_failed / $Global::total_started > $opt::halt_on_error)) {
# If halt on error == 1 or --halt 10%
# we should gracefully exit
print $Global::original_stderr
("$Global::progname: Starting no more jobs. ",
"Waiting for ", scalar(keys %Global::running),
" jobs to finish. This job failed:\n",
$job->replaced(),"\n");
$Global::start_no_new_jobs ||= 1;
$Global::halt_on_error_exitstatus = $job->exitstatus();
} elsif($opt::halt_on_error == 2) {
# If halt on error == 2 we should exit immediately
print $Global::original_stderr
("$Global::progname: This job failed:\n",
$job->replaced(),"\n");
exit ($job->exitstatus());
}
}
}
} }
my $sshlogin = $job->sshlogin(); my $sshlogin = $job->sshlogin();
$sshlogin->dec_jobs_running(); $sshlogin->dec_jobs_running();
@ -2629,6 +2611,61 @@ sub reaper {
return $children_reaped; return $children_reaped;
} }
sub process_failed_job {
# The jobs had a exit status <> 0, so error
# Returns: N/A
my $job = shift;
$Global::exitstatus++;
$Global::total_failed++;
if($opt::halt_on_error) {
if($opt::halt_on_error == 1
or
($opt::halt_on_error < 1 and $Global::total_failed > 3
and
$Global::total_failed / $Global::total_started > $opt::halt_on_error)) {
# If halt on error == 1 or --halt 10%
# we should gracefully exit
print $Global::original_stderr
("$Global::progname: Starting no more jobs. ",
"Waiting for ", scalar(keys %Global::running),
" jobs to finish. This job failed:\n",
$job->replaced(),"\n");
$Global::start_no_new_jobs ||= 1;
$Global::halt_on_error_exitstatus = $job->exitstatus();
} elsif($opt::halt_on_error == 2) {
# If halt on error == 2 we should exit immediately
print $Global::original_stderr
("$Global::progname: This job failed:\n",
$job->replaced(),"\n");
exit ($job->exitstatus());
}
}
}
{
my (%print_later,$job_end_sequence);
sub print_earlier_jobs {
# Print jobs completed earlier
# Returns: N/A
my $job = shift;
$print_later{$job->seq()} = $job;
$job_end_sequence ||= 1;
debug("run", "Looking for: $job_end_sequence ",
"Current: ", $job->seq(), "\n");
for(my $j = $print_later{$job_end_sequence};
$j or vec($Global::job_already_run,$job_end_sequence,1);
$job_end_sequence++,
$j = $print_later{$job_end_sequence}) {
debug("run", "Found job end $job_end_sequence");
if($j) {
$j->print();
delete $print_later{$job_end_sequence};
}
}
}
}
sub __USAGE__ {} sub __USAGE__ {}
sub wait_and_exit { sub wait_and_exit {
@ -2879,13 +2916,16 @@ sub undef_as_empty {
return $a ? $a : ""; return $a ? $a : "";
} }
sub hostname { {
if(not $Private::hostname) { my $hostname;
my $hostname = `hostname`; sub hostname {
chomp($hostname); if(not $hostname) {
$Private::hostname = $hostname || "nohostname"; $hostname = `hostname`;
chomp($hostname);
$hostname ||= "nohostname";
}
return $hostname;
} }
return $Private::hostname;
} }
sub which { sub which {

View file

@ -2313,7 +2313,7 @@ To see the difference between file A and file B look at the file
=head1 EXAMPLE: Speeding up fast jobs =head1 EXAMPLE: Speeding up fast jobs
Starting a job on the local machine takes around 3 ms. This can be a Starting a job on the local machine takes around 10 ms. This can be a
big overhead if the job takes very few ms to run. Often you can group big overhead if the job takes very few ms to run. Often you can group
small jobs together using B<-X> which will make the overhead less small jobs together using B<-X> which will make the overhead less
significant. Compare the speed of these: significant. Compare the speed of these:
@ -2327,10 +2327,9 @@ B<parallel> to spawn multiple GNU B<parallel>s:
seq -w 0 999999 | parallel -j10 --pipe parallel -j0 touch pict{}.jpg seq -w 0 999999 | parallel -j10 --pipe parallel -j0 touch pict{}.jpg
If B<-j0> normally spawns 506 jobs, then the above will try to spawn If B<-j0> normally spawns 252 jobs, then the above will try to spawn
5060 jobs. It is likely that you this way will hit the limit of number 2520 jobs. On a normal GNU/Linux system you can spawn 32000 jobs using
of processes and/or filehandles. Look at 'ulimit -n' and 'ulimit -u' this technique with no problems.
to raise these limits.
=head1 EXAMPLE: Using shell variables =head1 EXAMPLE: Using shell variables

View file

@ -139,7 +139,7 @@ make[0]: Entering directory `/tmp/parallel-00000000/src'
make install-exec-hook make install-exec-hook
make[0]: Entering directory `/tmp/parallel-00000000/src' make[0]: Entering directory `/tmp/parallel-00000000/src'
rm /usr/local/bin/sem || true rm /usr/local/bin/sem || true
ln -s /usr/local/bin/parallel /usr/local/bin/sem ln -s parallel /usr/local/bin/sem
make[0]: Leaving directory `/tmp/parallel-00000000/src' make[0]: Leaving directory `/tmp/parallel-00000000/src'
pod2html --title "GNU Parallel" ./parallel.pod > ./parallel.htmln \ pod2html --title "GNU Parallel" ./parallel.pod > ./parallel.htmln \
&& mv ./parallel.htmln ./parallel.html \ && mv ./parallel.htmln ./parallel.html \

View file

@ -8,7 +8,7 @@ echo '### Test of --eta with no jobs'
Computers / CPU cores / Max jobs to run Computers / CPU cores / Max jobs to run
1:local / 8 / 1 1:local / 8 / 1
0 ETA: 0s Left: 0 AVG: 0.00s 0
echo '### Test of --progress' echo '### Test of --progress'
### Test of --progress ### Test of --progress
seq 1 10 | stdout parallel --progress "sleep 1; echo {}" | wc -l seq 1 10 | stdout parallel --progress "sleep 1; echo {}" | wc -l
@ -29,7 +29,6 @@ Computers / CPU cores / Max jobs to run
1:local / 8 / 1 1:local / 8 / 1
Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete
local:1/0/100%/0.0s 1 2
echo '### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed' echo '### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed'
### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed ### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed
parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $? parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $?