From 5b78d119b5b13623325e6f2244cb65cada2d1f78 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Mon, 29 Dec 2014 01:16:32 +0100 Subject: [PATCH] parallel: move job failure and print_earlier into Job object. --- doc/boxplot-runtime | 6 +- doc/release_new_version | 4 +- src/parallel | 273 +++++++++++++++++++++++++--------------- src/parallel.pod | 6 +- 4 files changed, 181 insertions(+), 108 deletions(-) diff --git a/doc/boxplot-runtime b/doc/boxplot-runtime index 593810bf..b1cace2d 100644 --- a/doc/boxplot-runtime +++ b/doc/boxplot-runtime @@ -32,8 +32,8 @@ measure() { CORES=$3 VERSION=$4 - # Force cpuspeed at 1.7GHz - forever 'parallel sudo cpufreq-set -f 1700MHz -c{} ::: {0..7}' & + # Force cpuspeed at 1.7GHz - seems to give tighter results + forever 'sleep 10;parallel sudo cpufreq-set -f 1700MHz -c{} ::: {0..7}' & PATH=/tmp/bin:$PATH cd /tmp/bin @@ -53,4 +53,4 @@ _ evince /tmp/boxplot.pdf } -measure 300 1000 8 1 +measure 3000 1000 8 1 diff --git a/doc/release_new_version b/doc/release_new_version index ea2ecdec..194b8dec 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -226,9 +226,9 @@ cc:Tim Cuthbertson , Ryoichiro Suzuki , Jesse Alama -Subject: GNU Parallel 20141222 ('Manila') released +Subject: GNU Parallel 20150122 ('Manila SQ8501') released -GNU Parallel 20141222 ('Manila') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/ +GNU Parallel 20150122 ('Manila SQ8501') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/ Haiku of the month: diff --git a/src/parallel b/src/parallel index 36dfcf60..cb945cfd 100755 --- a/src/parallel +++ b/src/parallel @@ -158,6 +158,7 @@ if($Global::semaphore) { $sem->release(); } for(keys %Global::sshmaster) { + # If 'ssh -M's are running: kill them kill "TERM", $_; } ::debug("init", "Halt\n"); @@ -986,7 +987,7 @@ sub init_globals { # Modifiable copy of %Global::replace %Global::rpl = %Global::replace; $Global::parens = "{==}"; - $/="\n"; + $/ = "\n"; $Global::ignore_empty = 0; $Global::interactive = 0; $Global::stderr_verbose = 0; @@ -1186,7 +1187,8 @@ sub parse_env_var { if(not @qcsh) { push @qcsh, "true"; } if(not @qbash) { push @qbash, "true"; } # Create lines like: - # echo $SHELL | grep "/t\\{0,1\\}csh" >/dev/null && setenv V1 val1 && setenv V2 val2 || export V1=val1 && export V2=val2 ; echo "$V1$V2" + # echo $SHELL | grep "/t\\{0,1\\}csh" >/dev/null && setenv V1 val1 && + # setenv V2 val2 || export V1=val1 && export V2=val2 ; echo "$V1$V2" if(@vars) { $Global::envvar .= join"", @@ -1641,6 +1643,9 @@ sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {} # $Global::total_started = total jobs started sub init_run_jobs { + # Set Global variables and progress signal handlers + # Do the copying of basefiles + # Returns: N/A $Global::total_running = 0; $Global::total_started = 0; $Global::tty_taken = 0; @@ -1654,6 +1659,13 @@ sub init_run_jobs { my %last_mtime; sub changed_procs_file { + # If --jobs is a file and it is modfied: + # Force recomputing of max_jobs_running for each $sshlogin + # Uses: + # $Global::max_procs_file + # $Global::max_procs_file_last_mod + # %Global::host + # Returns: N/A if($Global::max_procs_file) { # --jobs filename my $mtime = (stat($Global::max_procs_file))[9]; @@ -1666,7 +1678,18 @@ sub init_run_jobs { } } } + sub changed_sshloginfile { + # If --slf is changed: + # reload --slf + # filter_hosts + # setup_basefile + # Uses: + # @opt::sshloginfile + # @Global::sshlogin + # %Global::host + # $opt::filter_hosts + # Returns: N/A if(@opt::sshloginfile) { # Is --sshloginfile changed? for my $slf (@opt::sshloginfile) { @@ -1702,12 +1725,8 @@ sub init_run_jobs { # * not server swapping # * not too short time since last remote login # Uses: - # $Global::max_procs_file - # $Global::max_procs_file_last_mod # %Global::host - # @opt::sshloginfile # $Global::start_no_new_jobs - # $opt::filter_hosts # $Global::JobQueue # $opt::pipe # $opt::load @@ -1892,7 +1911,7 @@ sub drain_job_queue { if($opt::progress) { ::status(init_progress()); } - my $last_header=""; + my $last_header = ""; my $sleep = 0.2; do { while($Global::total_running > 0) { @@ -2461,7 +2480,7 @@ sub cleanup_basefile { # %Global::host # @opt::basefile # Returns: N/A - my $cmd=""; + my $cmd = ""; my $workdir = Job->new("")->workdir(); for my $sshlogin (values %Global::host) { if($sshlogin->string() eq ":") { next } @@ -2481,49 +2500,50 @@ sub filter_hosts { # %Global::host # $Global::minimal_command_line_length # $opt::use_cpus_instead_of_cores - # Returns: - # N/A - my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts); + # Returns: N/A my ($ncores_ref, $ncpus_ref, $time_to_login_ref, $maxlen_ref, $echo_ref, $down_hosts_ref) = parse_host_filtering(parallelized_host_filtering()); - %ncores = %$ncores_ref; - %ncpus = %$ncpus_ref; - %time_to_login = %$time_to_login_ref; - %maxlen = %$maxlen_ref; - %echo = %$echo_ref; - @down_hosts = @$down_hosts_ref; - delete @Global::host{@down_hosts}; - @down_hosts and ::warning("Removed @down_hosts\n"); + delete @Global::host{@$down_hosts_ref}; + @$down_hosts_ref and ::warning("Removed @$down_hosts_ref\n"); $Global::minimal_command_line_length = 8_000_000; while (my ($sshlogin, $obj) = each %Global::host) { if($sshlogin eq ":") { next } - $ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin()); - $ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin()); - $time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin()); - $maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin()); + $ncpus_ref->{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin()); + $ncores_ref->{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin()); + $time_to_login_ref->{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin()); + $maxlen_ref->{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin()); if($opt::use_cpus_instead_of_cores) { - $obj->set_ncpus($ncpus{$sshlogin}); + $obj->set_ncpus($ncpus_ref->{$sshlogin}); } else { - $obj->set_ncpus($ncores{$sshlogin}); + $obj->set_ncpus($ncores_ref->{$sshlogin}); } - $obj->set_time_to_login($time_to_login{$sshlogin}); - $obj->set_maxlength($maxlen{$sshlogin}); + $obj->set_time_to_login($time_to_login_ref->{$sshlogin}); + $obj->set_maxlength($maxlen_ref->{$sshlogin}); $Global::minimal_command_line_length = ::min($Global::minimal_command_line_length, - int($maxlen{$sshlogin}/2)); - ::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin}, - " ncores:", $ncores{$sshlogin}, - " time_to_login:", $time_to_login{$sshlogin}, - " maxlen:", $maxlen{$sshlogin}, + int($maxlen_ref->{$sshlogin}/2)); + ::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus_ref->{$sshlogin}, + " ncores:", $ncores_ref->{$sshlogin}, + " time_to_login:", $time_to_login_ref->{$sshlogin}, + " maxlen:", $maxlen_ref->{$sshlogin}, " min_max_len:", $Global::minimal_command_line_length,"\n"); } } sub parse_host_filtering { + # Input: + # @lines = output from parallelized_host_filtering() + # Returns: + # \%ncores = number of cores of {host} + # \%ncpus = number of cpus of {host} + # \%time_to_login = time_to_login on {host} + # \%maxlen = max command len on {host} + # \%echo = echo received from {host} + # \@down_hosts = list of hosts with no answer my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts); for (@_) { @@ -2651,7 +2671,46 @@ sub parallelized_host_filtering { } sub onall { + # Runs @command on all hosts. + # Uses parallel to run @command on each host. + # --jobs = number of hosts to run on simultaneously. + # For each host a parallel command with the args will be running. + # Uses: + # $Global::quoting + # @opt::basefile + # $opt::jobs + # $opt::linebuffer + # $opt::ungroup + # $opt::group + # $opt::keeporder + # $opt::D + # $opt::plain + # $opt::max_chars + # $opt::linebuffer + # $opt::files + # $opt::colsep + # $opt::timeout + # $opt::plain + # $opt::retries + # $opt::max_chars + # $opt::arg_sep + # $opt::arg_file_sep + # @opt::v + # @opt::env + # %Global::host + # $Global::exitstatus + # $Global::debug + # $Global::joblog + # $opt::tag + # $opt::joblog + # Input: + # @command = command to run on all hosts + # Returns: N/A sub tmp_joblog { + # Input: + # $joblog = filename of joblog - undef if none + # Returns: + # $tmpfile = temp file for joblog - undef if none my $joblog = shift; if(not defined $joblog) { return undef; @@ -2665,7 +2724,7 @@ sub onall { @command = shell_quote_empty(@command); } - # Copy all @fhlist into tempfiles + # Copy all @fhlist (-a and :::) into tempfiles my @argfiles = (); for my $fh (@fhlist) { my ($outfh, $name) = ::tmpfile(SUFFIX => ".all", UNLINK => 1); @@ -2765,6 +2824,7 @@ sub save_original_signal_handler { } sub list_running_jobs { + # Print running jobs on tty # Returns: N/A for my $v (values %Global::running) { ::status("$Global::progname: ",$v->replaced(),"\n"); @@ -2772,6 +2832,7 @@ sub list_running_jobs { } sub start_no_new_jobs { + # Start no more jobs # Returns: N/A $SIG{TERM} = $Global::original_sig{TERM}; ::status @@ -2786,11 +2847,22 @@ sub reaper { # A job finished. # Print the output. # Start another job + # Uses: + # %Global::sshmaster + # %Global::running + # $Global::tty_taken + # @Global::slots + # $opt::timeout + # $Global::timeoutq + # $opt::halt_on_error + # $opt::keeporder + # $Global::total_running # Returns: N/A my $stiff; my $children_reaped = 0; debug("run", "Reaper "); while (($stiff = waitpid(-1, &WNOHANG)) > 0) { + # $stiff = pid of dead process $children_reaped++; if($Global::sshmaster{$stiff}) { # This is one of the ssh -M: ignore @@ -2820,12 +2892,12 @@ sub reaper { my $print_now = ($opt::halt_on_error and $opt::halt_on_error == 2 and $job->exitstatus()); if($opt::keeporder and not $print_now) { - print_earlier_jobs($job); + $job->print_earlier_jobs(); } else { $job->print(); } if($job->exitstatus()) { - process_failed_job($job); + $job->fail(); } } @@ -2840,61 +2912,6 @@ sub reaper { return $children_reaped; } -sub process_failed_job { - # The jobs had a exit status <> 0, so error - # Returns: N/A - my $job = shift; - $Global::exitstatus++; - $Global::total_failed++; - if($opt::halt_on_error) { - if($opt::halt_on_error == 1 - or - ($opt::halt_on_error < 1 and $Global::total_failed > 3 - and - $Global::total_failed / $Global::total_started > $opt::halt_on_error)) { - # If halt on error == 1 or --halt 10% - # we should gracefully exit - ::status - ("$Global::progname: Starting no more jobs. ", - "Waiting for ", scalar(keys %Global::running), - " jobs to finish. This job failed:\n", - $job->replaced(),"\n"); - $Global::start_no_new_jobs ||= 1; - $Global::halt_on_error_exitstatus = $job->exitstatus(); - } elsif($opt::halt_on_error == 2) { - # If halt on error == 2 we should exit immediately - ::status - ("$Global::progname: This job failed:\n", - $job->replaced(),"\n"); - exit ($job->exitstatus()); - } - } -} - -{ - my (%print_later,$job_end_sequence); - - sub print_earlier_jobs { - # Print jobs completed earlier - # Returns: N/A - my $job = shift; - $print_later{$job->seq()} = $job; - $job_end_sequence ||= 1; - debug("run", "Looking for: $job_end_sequence ", - "Current: ", $job->seq(), "\n"); - for(my $j = $print_later{$job_end_sequence}; - $j or vec($Global::job_already_run,$job_end_sequence,1); - $job_end_sequence++, - $j = $print_later{$job_end_sequence}) { - debug("run", "Found job end $job_end_sequence"); - if($j) { - $j->print(); - delete $print_later{$job_end_sequence}; - } - } - } -} - sub __USAGE__ {} sub wait_and_exit { @@ -4516,7 +4533,7 @@ sub ncpus { sub no_of_cpus { # Returns: # Number of physical CPUs - local $/="\n"; # If delimiter is set, then $/ will be wrong + local $/ = "\n"; # If delimiter is set, then $/ will be wrong my $no_of_cpus; if ($^O eq 'linux') { $no_of_cpus = no_of_cpus_gnu_linux() || no_of_cores_gnu_linux(); @@ -4574,7 +4591,7 @@ sub no_of_cpus { sub no_of_cores { # Returns: # Number of CPU cores - local $/="\n"; # If delimiter is set, then $/ will be wrong + local $/ = "\n"; # If delimiter is set, then $/ will be wrong my $no_of_cores; if ($^O eq 'linux') { $no_of_cores = no_of_cores_gnu_linux(); @@ -5440,11 +5457,11 @@ sub max_file_name_length { my $upper = 8_000_000; my $len = 8; - my $dir="x"x$len; + my $dir = "x"x$len; do { rmdir($testdir."/".$dir); $len *= 16; - $dir="x"x$len; + $dir = "x"x$len; } while (mkdir $testdir."/".$dir); # Then search for the actual max length between $len/16 and $len my $min = $len/16; @@ -5453,7 +5470,7 @@ sub max_file_name_length { # If we are within 5 chars of the exact value: # it is not worth the extra time to find the exact value my $test = int(($min+$max)/2); - $dir="x"x$test; + $dir = "x"x$test; if(mkdir $testdir."/".$dir) { rmdir($testdir."/".$dir); $min = $test; @@ -5543,7 +5560,7 @@ sub non_block_write { $something_written = $rv; } else { # successfully wrote everything - my $a=""; + my $a = ""; $self->set_stdin_buffer(\$a,\$a,"",""); $something_written = $rv; } @@ -6367,6 +6384,30 @@ sub should_be_retried { } } +{ + my (%print_later,$job_end_sequence); + + sub print_earlier_jobs { + # Print jobs completed earlier + # Returns: N/A + my $job = shift; + $print_later{$job->seq()} = $job; + $job_end_sequence ||= 1; + ::debug("run", "Looking for: $job_end_sequence ", + "Current: ", $job->seq(), "\n"); + for(my $j = $print_later{$job_end_sequence}; + $j or vec($Global::job_already_run,$job_end_sequence,1); + $job_end_sequence++, + $j = $print_later{$job_end_sequence}) { + ::debug("run", "Found job end $job_end_sequence"); + if($j) { + $j->print(); + delete $print_later{$job_end_sequence}; + } + } + } +} + sub print { # Print the output of the jobs # Returns: N/A @@ -6480,8 +6521,8 @@ sub linebuffer_print { # 327680 --tag = 4.4s # 1024000 --tag = 4.4s # 3276800 --tag = 4.3s - # 32768000 --tag = 4.7s # 10240000 --tag = 4.3s + # 32768000 --tag = 4.7s while(read($in_fh,substr($$partial,length $$partial),3276800)) { # Append to $$partial # Find the last \n @@ -6505,7 +6546,7 @@ sub linebuffer_print { # Print up to and including the last \n print $out_fd substr($$partial,0,$i+1); # Remove the printed part - substr($$partial,0,$i+1)=""; + substr($$partial,0,$i+1) = ""; } } if(defined $self->{'exitstatus'}) { @@ -6685,6 +6726,36 @@ sub set_exitsignal { } } +sub fail { + # The jobs had a exit status <> 0, so error + # Returns: N/A + my $job = shift; + $Global::exitstatus++; + $Global::total_failed++; + if($opt::halt_on_error) { + if($opt::halt_on_error == 1 + or + ($opt::halt_on_error < 1 and $Global::total_failed > 3 + and + $Global::total_failed / $Global::total_started > $opt::halt_on_error)) { + # If halt on error == 1 or --halt 10% + # we should gracefully exit + ::status + ("$Global::progname: Starting no more jobs. ", + "Waiting for ", scalar(keys %Global::running), + " jobs to finish. This job failed:\n", + $job->replaced(),"\n"); + $Global::start_no_new_jobs ||= 1; + $Global::halt_on_error_exitstatus = $job->exitstatus(); + } elsif($opt::halt_on_error == 2) { + # If halt on error == 2 we should exit immediately + ::status + ("$Global::progname: This job failed:\n", + $job->replaced(),"\n"); + exit ($job->exitstatus()); + } + } +} package CommandLine; @@ -6730,7 +6801,7 @@ sub seq { sub slot { # Find the number of a free job slot and return it # Uses: - # @Global::slots + # @Global::slots - list with free jobslots # Returns: # $jobslot = number of jobslot my $self = shift; @@ -8152,8 +8223,8 @@ sub new { my $class = shift; my $id = shift; my $count = shift; - $id=~s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex - $id="id-".$id; # To distinguish it from a process id + $id =~ s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex + $id = "id-".$id; # To distinguish it from a process id my $parallel_dir = $ENV{'HOME'}."/.parallel"; -d $parallel_dir or mkdir_or_die($parallel_dir); my $parallel_locks = $parallel_dir."/semaphores"; diff --git a/src/parallel.pod b/src/parallel.pod index 201762ed..b4234fed 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -983,12 +983,14 @@ servers. Run all the jobs on all computers given with B<--sshlogin>. GNU B will log into B<--jobs> number of computers in parallel and run one job at a time on the computer. The order of the jobs will -not be changed, but some computers may finish before others. B<-j> -adjusts how many computers to log into in parallel. +not be changed, but some computers may finish before others. When using B<--group> the output will be grouped by each server, so all the output from one server will be grouped together. +B<--joblog> will contain an entry for each job on each server, so +there will be several job sequence 1. + =item B<--output-as-files>