parallel: Code cleanup

This commit is contained in:
Ole Tange 2010-12-04 04:06:27 +01:00
parent 0b31661e8e
commit e907f723e7
2 changed files with 56 additions and 79 deletions

View file

@ -2990,25 +2990,23 @@ sub cleanup {
sub shell_quote { sub shell_quote {
# Quote the string so shell will not expand any special chars
# Returns:
# string quoted with \ as needed by the shell
my (@strings) = (@_); my (@strings) = (@_);
my $arg; my $arg;
for $arg (@strings) { for $arg (@strings) {
if($::oodebug and not defined $arg) { $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;
Carp::cluck($arg); $a =~ s/[\n]/'\n'/g; # filenames with '\n' is quoted using \'
} # $arg =~ s/\\/\\\\/g;
$arg =~ s/\\/\\\\/g; # $arg =~ s/([\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;
$arg =~ s/([\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g; # $arg =~ s/([\002-\011\013-\032])/\\$1/g;
$arg =~ s/([\002-\011\013-\032])/\\$1/g; # $arg =~ s/([\n])/'\n'/g; # filenames with '\n' is quoted using \'
$arg =~ s/([\n])/'\n'/g; # filenames with '\n' is quoted using \'
} }
return wantarray ? @strings : "@strings"; return wantarray ? @strings : "@strings";
} }
sub shell_quote_scalar { sub shell_quote_scalar {
# Shell quote a scalar # Quote the string so shell will not expand any special chars
# Returns:
# string quoted with \ as needed by the shell
my $a = shift; my $a = shift;
$a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g; $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;
$a =~ s/[\n]/'\n'/g; # filenames with '\n' is quoted using \' $a =~ s/[\n]/'\n'/g; # filenames with '\n' is quoted using \'
@ -3032,8 +3030,6 @@ sub shell_unquote {
sub __NUMBER_OF_PROCESSES_FILEHANDLES_MAX_LENGTH_OF_COMMAND_LINE__ {} sub __NUMBER_OF_PROCESSES_FILEHANDLES_MAX_LENGTH_OF_COMMAND_LINE__ {}
sub enough_file_handles { sub enough_file_handles {
# check that we have enough filehandles available for starting # check that we have enough filehandles available for starting
# another job # another job
@ -3122,35 +3118,10 @@ sub __RUNNING_AND_PRINTING_THE_JOBS__ {}
# Variable structure: # Variable structure:
# #
# $Global::failed{$clean_command}{'count'}{$sshlogin} = number of times failed on this sshlogin # $Global::running{$pid} = Pointer to Job-object
# $Global::failed{$clean_command}{'seq'} = original sequence number # $Global::host{$sshlogin} = Pointer to SSHLogin-object
# $Global::running{$pid}{'seq'} = printsequence
# $Global::running{$pid}{sshlogin} = server to run on
# $Global::running{$pid}{'exitstatus'} = exit status
# $Global::running{$pid}{'out'} = stdout filehandle
# $Global::running{$pid}{'err'} = stderr filehandle
# $Global::running{$pid}{'command'} = command being run (including rsync/ssh and args)
# $Global::running{$pid}{'cleancommand'} = command being run (excluding rsync/ssh but including args)
# $Global::host{$sshlogin}{'no_of_running'} = number of currently running jobs
# $Global::host{$sshlogin}{'completed'} = number of completed jobs
# $Global::host{$sshlogin}{'ncpus'} = number of CPUs (or CPU cores)
# $Global::host{$sshlogin}{'maxlength'} = max line length (currently buggy for remote)
# $Global::host{$sshlogin}{'max_no_of_running'} = max parallel running jobs
# $Global::host{$sshlogin}{'sshcmd'} = command to use as ssh
# $Global::host{$sshlogin}{'serverlogin'} = username@hostname
# $Global::total_running = total number of running jobs # $Global::total_running = total number of running jobs
# $Global::total_started = total jobs started # $Global::total_started = total jobs started
# $Global::total_jobs = total jobs to be started at all
# $Global::total_completed = total jobs completed
# @Global::unget_lines = raw argument lines - needs quoting and splitting
#
# Flow:
# Get_line: Line is read from file or stdin. Delimiter is chopped
# Get_line_argv: Line is read from ARGV - no delimiter
# Get column: Multiple -a or --colsep
# Get column: @ARGV
# Quote column:
# get_quoted_args
sub init_run_jobs { sub init_run_jobs {
# Remember the original STDOUT and STDERR # Remember the original STDOUT and STDERR
@ -3160,7 +3131,6 @@ sub init_run_jobs {
open $Global::original_stdin, "<&STDIN" or die "Can't dup STDIN: $!"; open $Global::original_stdin, "<&STDIN" or die "Can't dup STDIN: $!";
$Global::total_running = 0; $Global::total_running = 0;
$Global::total_started = 0; $Global::total_started = 0;
$Global::total_completed = 0;
$Global::tty_taken = 0; $Global::tty_taken = 0;
$SIG{USR1} = \&list_running_jobs; $SIG{USR1} = \&list_running_jobs;
$SIG{USR2} = \&toggle_progress; $SIG{USR2} = \&toggle_progress;
@ -3684,45 +3654,24 @@ sub reaper {
next; next;
} }
# Ignore processes that we did not start # Ignore processes that we did not start
$Global::running{$stiff} or next; my $job = $Global::running{$stiff};
$Global::running{$stiff}->set_exitstatus($? >> 8); $job or next;
debug("died (".$Global::running{$stiff}->exitstatus()."): ".$Global::running{$stiff}->seq()); $job->set_exitstatus($? >> 8);
debug("died (".$job->exitstatus()."): ".$job->seq());
if($stiff == $Global::tty_taken) { if($stiff == $Global::tty_taken) {
# The process that died had the tty => release it # The process that died had the tty => release it
$Global::tty_taken = 0; $Global::tty_taken = 0;
} }
my $retry_job = 0;
if ($::opt_retries) {
my $job = $Global::running{$stiff};
my $clean_command = $job->replaced();
if(not $job->exitstatus()) {
# Completed with success. If there is a recorded failure: forget it
$job->reset_failed_here();
} else {
# The job failed. Should it be retried?
$job->add_failed_here();
if($job->total_failed() == $::opt_retries) {
# This has been retried enough
$retry_job = 0;
} else {
# This command should be retried
$Global::JobQueue->unget($job);
$retry_job = 1;
::debug("Retry ".$job->seq()."\n");
}
}
}
if(not $retry_job) { if(not $job->should_be_retried()) {
# Force printing now if the job failed and we are going to exit # Force printing now if the job failed and we are going to exit
my $print_now = ($Global::running{$stiff}->exitstatus() and my $print_now = ($job->exitstatus() and
$::opt_halt_on_error and $::opt_halt_on_error == 2); $::opt_halt_on_error and $::opt_halt_on_error == 2);
if($Global::keeporder and not $print_now) { if($Global::keeporder and not $print_now) {
$Private::print_later{$Global::running{$stiff}->seq()} = $Private::print_later{$job->seq()} = $job;
$Global::running{$stiff};
$Private::job_end_sequence ||= 1; $Private::job_end_sequence ||= 1;
debug("Looking for: $Private::job_end_sequence ". debug("Looking for: $Private::job_end_sequence ".
"Current: ".$Global::running{$stiff}->seq()."\n"); "Current: ".$job->seq()."\n");
while($Private::print_later{$Private::job_end_sequence}) { while($Private::print_later{$Private::job_end_sequence}) {
debug("Found job end $Private::job_end_sequence"); debug("Found job end $Private::job_end_sequence");
$Private::print_later{$Private::job_end_sequence}->print(); $Private::print_later{$Private::job_end_sequence}->print();
@ -3730,9 +3679,9 @@ sub reaper {
$Private::job_end_sequence++; $Private::job_end_sequence++;
} }
} else { } else {
$Global::running{$stiff}->print(); $job->print();
} }
if($Global::running{$stiff}->exitstatus()) { if($job->exitstatus()) {
# The jobs had a exit status <> 0, so error # The jobs had a exit status <> 0, so error
$Global::exitstatus++; $Global::exitstatus++;
if($::opt_halt_on_error) { if($::opt_halt_on_error) {
@ -3741,23 +3690,22 @@ sub reaper {
print STDERR ("$Global::progname: Starting no more jobs. ", print STDERR ("$Global::progname: Starting no more jobs. ",
"Waiting for ", scalar(keys %Global::running), "Waiting for ", scalar(keys %Global::running),
" jobs to finish. This job failed:\n", " jobs to finish. This job failed:\n",
$Global::running{$stiff}->replaced(),"\n"); $job->replaced(),"\n");
$Global::start_no_new_jobs++; $Global::start_no_new_jobs++;
$Global::halt_on_error_exitstatus = $Global::running{$stiff}->exitstatus(); $Global::halt_on_error_exitstatus = $job->exitstatus();
} elsif($::opt_halt_on_error == 2) { } elsif($::opt_halt_on_error == 2) {
# If halt on error == 2 we should exit immediately # If halt on error == 2 we should exit immediately
print STDERR ("$Global::progname: This job failed:\n", print STDERR ("$Global::progname: This job failed:\n",
$Global::running{$stiff}->replaced(),"\n"); $job->replaced(),"\n");
exit ($Global::running{$stiff}->exitstatus()); exit ($job->exitstatus());
} }
} }
} }
} }
my $sshlogin = $Global::running{$stiff}->sshlogin(); my $sshlogin = $job->sshlogin();
$sshlogin->dec_jobs_running(); $sshlogin->dec_jobs_running();
$sshlogin->inc_jobs_completed(); $sshlogin->inc_jobs_completed();
$Global::total_running--; $Global::total_running--;
$Global::total_completed++;
delete $Global::running{$stiff}; delete $Global::running{$stiff};
start_more_jobs(); start_more_jobs();
} }
@ -4996,6 +4944,34 @@ sub start {
return $job; return $job;
} }
sub should_be_retried {
# Should this job be retried?
# Returns
# 0 - do not retry
# 1 - job queued for retry
my $self = shift;
if (not $::opt_retries) {
return 0;
}
if(not $self->exitstatus()) {
# Completed with success. If there is a recorded failure: forget it
$self->reset_failed_here();
return 0
} else {
# The job failed. Should it be retried?
$self->add_failed_here();
if($self->total_failed() == $::opt_retries) {
# This has been retried enough
return 0;
} else {
# This command should be retried
$Global::JobQueue->unget($self);
::debug("Retry ".$self->seq()."\n");
return 1;
}
}
}
sub print { sub print {
# Print the output of the jobs # Print the output of the jobs
# Returns: N/A # Returns: N/A

View file

@ -25,6 +25,7 @@ parallel -k -X echo {2.} ::: /a/number1.c a/number2.c number3.c /a/number4 a/num
SERVER1=parallel-server3 SERVER1=parallel-server3
SERVER2=parallel-server2 SERVER2=parallel-server2
rm -rf tmp
echo "### Test combined --return {/}_{/.}_{#/.}_{#/}_{#.}" echo "### Test combined --return {/}_{/.}_{#/.}_{#/}_{#.}"
stdout parallel -k -Xv --cleanup --return tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file -S parallel@$SERVER2 \ stdout parallel -k -Xv --cleanup --return tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file -S parallel@$SERVER2 \