parallel: move job failure and print_earlier into Job object.

This commit is contained in:
Ole Tange 2014-12-29 01:16:32 +01:00
parent fe08937ea9
commit 0992d1a7c9
4 changed files with 181 additions and 108 deletions

View file

@ -32,8 +32,8 @@ measure() {
CORES=$3
VERSION=$4
# Force cpuspeed at 1.7GHz
forever 'parallel sudo cpufreq-set -f 1700MHz -c{} ::: {0..7}' &
# Force cpuspeed at 1.7GHz - seems to give tighter results
forever 'sleep 10;parallel sudo cpufreq-set -f 1700MHz -c{} ::: {0..7}' &
PATH=/tmp/bin:$PATH
cd /tmp/bin
@ -53,4 +53,4 @@ _
evince /tmp/boxplot.pdf
}
measure 300 1000 8 1
measure 3000 1000 8 1

View file

@ -226,9 +226,9 @@ cc:Tim Cuthbertson <tim3d.junk@gmail.com>,
Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>,
Jesse Alama <jesse.alama@gmail.com>
Subject: GNU Parallel 20141222 ('Manila') released
Subject: GNU Parallel 20150122 ('Manila SQ8501') released
GNU Parallel 20141222 ('Manila') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/
GNU Parallel 20150122 ('Manila SQ8501') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/
Haiku of the month:

View file

@ -158,6 +158,7 @@ if($Global::semaphore) {
$sem->release();
}
for(keys %Global::sshmaster) {
# If 'ssh -M's are running: kill them
kill "TERM", $_;
}
::debug("init", "Halt\n");
@ -986,7 +987,7 @@ sub init_globals {
# Modifiable copy of %Global::replace
%Global::rpl = %Global::replace;
$Global::parens = "{==}";
$/="\n";
$/ = "\n";
$Global::ignore_empty = 0;
$Global::interactive = 0;
$Global::stderr_verbose = 0;
@ -1186,7 +1187,8 @@ sub parse_env_var {
if(not @qcsh) { push @qcsh, "true"; }
if(not @qbash) { push @qbash, "true"; }
# Create lines like:
# echo $SHELL | grep "/t\\{0,1\\}csh" >/dev/null && setenv V1 val1 && setenv V2 val2 || export V1=val1 && export V2=val2 ; echo "$V1$V2"
# echo $SHELL | grep "/t\\{0,1\\}csh" >/dev/null && setenv V1 val1 &&
# setenv V2 val2 || export V1=val1 && export V2=val2 ; echo "$V1$V2"
if(@vars) {
$Global::envvar .=
join"",
@ -1641,6 +1643,9 @@ sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {}
# $Global::total_started = total jobs started
sub init_run_jobs {
# Set Global variables and progress signal handlers
# Do the copying of basefiles
# Returns: N/A
$Global::total_running = 0;
$Global::total_started = 0;
$Global::tty_taken = 0;
@ -1654,6 +1659,13 @@ sub init_run_jobs {
my %last_mtime;
sub changed_procs_file {
# If --jobs is a file and it is modfied:
# Force recomputing of max_jobs_running for each $sshlogin
# Uses:
# $Global::max_procs_file
# $Global::max_procs_file_last_mod
# %Global::host
# Returns: N/A
if($Global::max_procs_file) {
# --jobs filename
my $mtime = (stat($Global::max_procs_file))[9];
@ -1666,7 +1678,18 @@ sub init_run_jobs {
}
}
}
sub changed_sshloginfile {
# If --slf is changed:
# reload --slf
# filter_hosts
# setup_basefile
# Uses:
# @opt::sshloginfile
# @Global::sshlogin
# %Global::host
# $opt::filter_hosts
# Returns: N/A
if(@opt::sshloginfile) {
# Is --sshloginfile changed?
for my $slf (@opt::sshloginfile) {
@ -1702,12 +1725,8 @@ sub init_run_jobs {
# * not server swapping
# * not too short time since last remote login
# Uses:
# $Global::max_procs_file
# $Global::max_procs_file_last_mod
# %Global::host
# @opt::sshloginfile
# $Global::start_no_new_jobs
# $opt::filter_hosts
# $Global::JobQueue
# $opt::pipe
# $opt::load
@ -1892,7 +1911,7 @@ sub drain_job_queue {
if($opt::progress) {
::status(init_progress());
}
my $last_header="";
my $last_header = "";
my $sleep = 0.2;
do {
while($Global::total_running > 0) {
@ -2461,7 +2480,7 @@ sub cleanup_basefile {
# %Global::host
# @opt::basefile
# Returns: N/A
my $cmd="";
my $cmd = "";
my $workdir = Job->new("")->workdir();
for my $sshlogin (values %Global::host) {
if($sshlogin->string() eq ":") { next }
@ -2481,49 +2500,50 @@ sub filter_hosts {
# %Global::host
# $Global::minimal_command_line_length
# $opt::use_cpus_instead_of_cores
# Returns:
# N/A
my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts);
# Returns: N/A
my ($ncores_ref, $ncpus_ref, $time_to_login_ref, $maxlen_ref,
$echo_ref, $down_hosts_ref) =
parse_host_filtering(parallelized_host_filtering());
%ncores = %$ncores_ref;
%ncpus = %$ncpus_ref;
%time_to_login = %$time_to_login_ref;
%maxlen = %$maxlen_ref;
%echo = %$echo_ref;
@down_hosts = @$down_hosts_ref;
delete @Global::host{@down_hosts};
@down_hosts and ::warning("Removed @down_hosts\n");
delete @Global::host{@$down_hosts_ref};
@$down_hosts_ref and ::warning("Removed @$down_hosts_ref\n");
$Global::minimal_command_line_length = 8_000_000;
while (my ($sshlogin, $obj) = each %Global::host) {
if($sshlogin eq ":") { next }
$ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
$ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
$time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
$maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
$ncpus_ref->{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
$ncores_ref->{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
$time_to_login_ref->{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
$maxlen_ref->{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
if($opt::use_cpus_instead_of_cores) {
$obj->set_ncpus($ncpus{$sshlogin});
$obj->set_ncpus($ncpus_ref->{$sshlogin});
} else {
$obj->set_ncpus($ncores{$sshlogin});
$obj->set_ncpus($ncores_ref->{$sshlogin});
}
$obj->set_time_to_login($time_to_login{$sshlogin});
$obj->set_maxlength($maxlen{$sshlogin});
$obj->set_time_to_login($time_to_login_ref->{$sshlogin});
$obj->set_maxlength($maxlen_ref->{$sshlogin});
$Global::minimal_command_line_length =
::min($Global::minimal_command_line_length,
int($maxlen{$sshlogin}/2));
::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin},
" ncores:", $ncores{$sshlogin},
" time_to_login:", $time_to_login{$sshlogin},
" maxlen:", $maxlen{$sshlogin},
int($maxlen_ref->{$sshlogin}/2));
::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus_ref->{$sshlogin},
" ncores:", $ncores_ref->{$sshlogin},
" time_to_login:", $time_to_login_ref->{$sshlogin},
" maxlen:", $maxlen_ref->{$sshlogin},
" min_max_len:", $Global::minimal_command_line_length,"\n");
}
}
sub parse_host_filtering {
# Input:
# @lines = output from parallelized_host_filtering()
# Returns:
# \%ncores = number of cores of {host}
# \%ncpus = number of cpus of {host}
# \%time_to_login = time_to_login on {host}
# \%maxlen = max command len on {host}
# \%echo = echo received from {host}
# \@down_hosts = list of hosts with no answer
my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts);
for (@_) {
@ -2651,7 +2671,46 @@ sub parallelized_host_filtering {
}
sub onall {
# Runs @command on all hosts.
# Uses parallel to run @command on each host.
# --jobs = number of hosts to run on simultaneously.
# For each host a parallel command with the args will be running.
# Uses:
# $Global::quoting
# @opt::basefile
# $opt::jobs
# $opt::linebuffer
# $opt::ungroup
# $opt::group
# $opt::keeporder
# $opt::D
# $opt::plain
# $opt::max_chars
# $opt::linebuffer
# $opt::files
# $opt::colsep
# $opt::timeout
# $opt::plain
# $opt::retries
# $opt::max_chars
# $opt::arg_sep
# $opt::arg_file_sep
# @opt::v
# @opt::env
# %Global::host
# $Global::exitstatus
# $Global::debug
# $Global::joblog
# $opt::tag
# $opt::joblog
# Input:
# @command = command to run on all hosts
# Returns: N/A
sub tmp_joblog {
# Input:
# $joblog = filename of joblog - undef if none
# Returns:
# $tmpfile = temp file for joblog - undef if none
my $joblog = shift;
if(not defined $joblog) {
return undef;
@ -2665,7 +2724,7 @@ sub onall {
@command = shell_quote_empty(@command);
}
# Copy all @fhlist into tempfiles
# Copy all @fhlist (-a and :::) into tempfiles
my @argfiles = ();
for my $fh (@fhlist) {
my ($outfh, $name) = ::tmpfile(SUFFIX => ".all", UNLINK => 1);
@ -2765,6 +2824,7 @@ sub save_original_signal_handler {
}
sub list_running_jobs {
# Print running jobs on tty
# Returns: N/A
for my $v (values %Global::running) {
::status("$Global::progname: ",$v->replaced(),"\n");
@ -2772,6 +2832,7 @@ sub list_running_jobs {
}
sub start_no_new_jobs {
# Start no more jobs
# Returns: N/A
$SIG{TERM} = $Global::original_sig{TERM};
::status
@ -2786,11 +2847,22 @@ sub reaper {
# A job finished.
# Print the output.
# Start another job
# Uses:
# %Global::sshmaster
# %Global::running
# $Global::tty_taken
# @Global::slots
# $opt::timeout
# $Global::timeoutq
# $opt::halt_on_error
# $opt::keeporder
# $Global::total_running
# Returns: N/A
my $stiff;
my $children_reaped = 0;
debug("run", "Reaper ");
while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
# $stiff = pid of dead process
$children_reaped++;
if($Global::sshmaster{$stiff}) {
# This is one of the ssh -M: ignore
@ -2820,12 +2892,12 @@ sub reaper {
my $print_now = ($opt::halt_on_error and $opt::halt_on_error == 2
and $job->exitstatus());
if($opt::keeporder and not $print_now) {
print_earlier_jobs($job);
$job->print_earlier_jobs();
} else {
$job->print();
}
if($job->exitstatus()) {
process_failed_job($job);
$job->fail();
}
}
@ -2840,61 +2912,6 @@ sub reaper {
return $children_reaped;
}
sub process_failed_job {
# The jobs had a exit status <> 0, so error
# Returns: N/A
my $job = shift;
$Global::exitstatus++;
$Global::total_failed++;
if($opt::halt_on_error) {
if($opt::halt_on_error == 1
or
($opt::halt_on_error < 1 and $Global::total_failed > 3
and
$Global::total_failed / $Global::total_started > $opt::halt_on_error)) {
# If halt on error == 1 or --halt 10%
# we should gracefully exit
::status
("$Global::progname: Starting no more jobs. ",
"Waiting for ", scalar(keys %Global::running),
" jobs to finish. This job failed:\n",
$job->replaced(),"\n");
$Global::start_no_new_jobs ||= 1;
$Global::halt_on_error_exitstatus = $job->exitstatus();
} elsif($opt::halt_on_error == 2) {
# If halt on error == 2 we should exit immediately
::status
("$Global::progname: This job failed:\n",
$job->replaced(),"\n");
exit ($job->exitstatus());
}
}
}
{
my (%print_later,$job_end_sequence);
sub print_earlier_jobs {
# Print jobs completed earlier
# Returns: N/A
my $job = shift;
$print_later{$job->seq()} = $job;
$job_end_sequence ||= 1;
debug("run", "Looking for: $job_end_sequence ",
"Current: ", $job->seq(), "\n");
for(my $j = $print_later{$job_end_sequence};
$j or vec($Global::job_already_run,$job_end_sequence,1);
$job_end_sequence++,
$j = $print_later{$job_end_sequence}) {
debug("run", "Found job end $job_end_sequence");
if($j) {
$j->print();
delete $print_later{$job_end_sequence};
}
}
}
}
sub __USAGE__ {}
sub wait_and_exit {
@ -4516,7 +4533,7 @@ sub ncpus {
sub no_of_cpus {
# Returns:
# Number of physical CPUs
local $/="\n"; # If delimiter is set, then $/ will be wrong
local $/ = "\n"; # If delimiter is set, then $/ will be wrong
my $no_of_cpus;
if ($^O eq 'linux') {
$no_of_cpus = no_of_cpus_gnu_linux() || no_of_cores_gnu_linux();
@ -4574,7 +4591,7 @@ sub no_of_cpus {
sub no_of_cores {
# Returns:
# Number of CPU cores
local $/="\n"; # If delimiter is set, then $/ will be wrong
local $/ = "\n"; # If delimiter is set, then $/ will be wrong
my $no_of_cores;
if ($^O eq 'linux') {
$no_of_cores = no_of_cores_gnu_linux();
@ -5440,11 +5457,11 @@ sub max_file_name_length {
my $upper = 8_000_000;
my $len = 8;
my $dir="x"x$len;
my $dir = "x"x$len;
do {
rmdir($testdir."/".$dir);
$len *= 16;
$dir="x"x$len;
$dir = "x"x$len;
} while (mkdir $testdir."/".$dir);
# Then search for the actual max length between $len/16 and $len
my $min = $len/16;
@ -5453,7 +5470,7 @@ sub max_file_name_length {
# If we are within 5 chars of the exact value:
# it is not worth the extra time to find the exact value
my $test = int(($min+$max)/2);
$dir="x"x$test;
$dir = "x"x$test;
if(mkdir $testdir."/".$dir) {
rmdir($testdir."/".$dir);
$min = $test;
@ -5543,7 +5560,7 @@ sub non_block_write {
$something_written = $rv;
} else {
# successfully wrote everything
my $a="";
my $a = "";
$self->set_stdin_buffer(\$a,\$a,"","");
$something_written = $rv;
}
@ -6367,6 +6384,30 @@ sub should_be_retried {
}
}
{
my (%print_later,$job_end_sequence);
sub print_earlier_jobs {
# Print jobs completed earlier
# Returns: N/A
my $job = shift;
$print_later{$job->seq()} = $job;
$job_end_sequence ||= 1;
::debug("run", "Looking for: $job_end_sequence ",
"Current: ", $job->seq(), "\n");
for(my $j = $print_later{$job_end_sequence};
$j or vec($Global::job_already_run,$job_end_sequence,1);
$job_end_sequence++,
$j = $print_later{$job_end_sequence}) {
::debug("run", "Found job end $job_end_sequence");
if($j) {
$j->print();
delete $print_later{$job_end_sequence};
}
}
}
}
sub print {
# Print the output of the jobs
# Returns: N/A
@ -6480,8 +6521,8 @@ sub linebuffer_print {
# 327680 --tag = 4.4s
# 1024000 --tag = 4.4s
# 3276800 --tag = 4.3s
# 32768000 --tag = 4.7s
# 10240000 --tag = 4.3s
# 32768000 --tag = 4.7s
while(read($in_fh,substr($$partial,length $$partial),3276800)) {
# Append to $$partial
# Find the last \n
@ -6505,7 +6546,7 @@ sub linebuffer_print {
# Print up to and including the last \n
print $out_fd substr($$partial,0,$i+1);
# Remove the printed part
substr($$partial,0,$i+1)="";
substr($$partial,0,$i+1) = "";
}
}
if(defined $self->{'exitstatus'}) {
@ -6685,6 +6726,36 @@ sub set_exitsignal {
}
}
sub fail {
# The jobs had a exit status <> 0, so error
# Returns: N/A
my $job = shift;
$Global::exitstatus++;
$Global::total_failed++;
if($opt::halt_on_error) {
if($opt::halt_on_error == 1
or
($opt::halt_on_error < 1 and $Global::total_failed > 3
and
$Global::total_failed / $Global::total_started > $opt::halt_on_error)) {
# If halt on error == 1 or --halt 10%
# we should gracefully exit
::status
("$Global::progname: Starting no more jobs. ",
"Waiting for ", scalar(keys %Global::running),
" jobs to finish. This job failed:\n",
$job->replaced(),"\n");
$Global::start_no_new_jobs ||= 1;
$Global::halt_on_error_exitstatus = $job->exitstatus();
} elsif($opt::halt_on_error == 2) {
# If halt on error == 2 we should exit immediately
::status
("$Global::progname: This job failed:\n",
$job->replaced(),"\n");
exit ($job->exitstatus());
}
}
}
package CommandLine;
@ -6730,7 +6801,7 @@ sub seq {
sub slot {
# Find the number of a free job slot and return it
# Uses:
# @Global::slots
# @Global::slots - list with free jobslots
# Returns:
# $jobslot = number of jobslot
my $self = shift;
@ -8152,8 +8223,8 @@ sub new {
my $class = shift;
my $id = shift;
my $count = shift;
$id=~s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex
$id="id-".$id; # To distinguish it from a process id
$id =~ s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex
$id = "id-".$id; # To distinguish it from a process id
my $parallel_dir = $ENV{'HOME'}."/.parallel";
-d $parallel_dir or mkdir_or_die($parallel_dir);
my $parallel_locks = $parallel_dir."/semaphores";

View file

@ -983,12 +983,14 @@ servers.
Run all the jobs on all computers given with B<--sshlogin>. GNU
B<parallel> will log into B<--jobs> number of computers in parallel
and run one job at a time on the computer. The order of the jobs will
not be changed, but some computers may finish before others. B<-j>
adjusts how many computers to log into in parallel.
not be changed, but some computers may finish before others.
When using B<--group> the output will be grouped by each server, so
all the output from one server will be grouped together.
B<--joblog> will contain an entry for each job on each server, so
there will be several job sequence 1.
=item B<--output-as-files>