parallel: Prepare for better error handling of parallel running of --basefile and --cleanup.

This commit is contained in:
Ole Tange 2016-08-04 00:01:41 +02:00
parent a04e3e6740
commit 223cdb4754

View file

@ -2950,7 +2950,7 @@ sub setup_basefile {
# %Global::host # %Global::host
# @opt::basefile # @opt::basefile
# Returns: N/A # Returns: N/A
my $cmd = ""; my @cmd;
my $rsync_destdir; my $rsync_destdir;
my $workdir; my $workdir;
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
@ -2965,12 +2965,18 @@ sub setup_basefile {
my $dummyjob = Job->new($dummycmdline); my $dummyjob = Job->new($dummycmdline);
$workdir = $dummyjob->workdir(); $workdir = $dummyjob->workdir();
} }
$cmd .= $sshlogin->rsync_transfer_cmd($file,$workdir) . "&"; push @cmd, $sshlogin->rsync_transfer_cmd($file,$workdir);
} }
} }
$cmd .= "wait;"; debug("init", "basesetup: @cmd\n");
debug("init", "basesetup: $cmd\n"); my ($exitstatus,$stdout_ref,$stderr_ref) =
print `$cmd`; run_parallel((join "\n",@cmd),"-j0","--retries",5);
if($exitstatus) {
my @stdout = @$stdout_ref;
my @stderr = @$stderr_ref;
::error("Copying of --basefile failed: @stdout@stderr");
::wait_and_exit(255);
}
} }
sub cleanup_basefile { sub cleanup_basefile {
@ -2979,7 +2985,7 @@ sub cleanup_basefile {
# %Global::host # %Global::host
# @opt::basefile # @opt::basefile
# Returns: N/A # Returns: N/A
my $cmd = ""; my @cmd;
my $workdir; my $workdir;
if(not $workdir) { if(not $workdir) {
my $dummycmdline = CommandLine->new(1,"true",0,0,0,0,0,{},{},{}); my $dummycmdline = CommandLine->new(1,"true",0,0,0,0,0,{},{},{});
@ -2989,12 +2995,69 @@ sub cleanup_basefile {
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
if($sshlogin->string() eq ":") { next } if($sshlogin->string() eq ":") { next }
for my $file (@opt::basefile) { for my $file (@opt::basefile) {
$cmd .= $sshlogin->cleanup_cmd($file,$workdir)."&"; push @cmd, $sshlogin->cleanup_cmd($file,$workdir);
} }
} }
$cmd .= "wait;"; debug("init", "basecleanup: @cmd\n");
debug("init", "basecleanup: $cmd\n"); my ($exitstatus,$stdout_ref,$stderr_ref) =
print `$cmd`; run_parallel(join("\n",@cmd),"-j0","--retries",5);
if($exitstatus) {
my @stdout = @$stdout_ref;
my @stderr = @$stderr_ref;
::error("Cleanup of --basefile failed: @stdout@stderr");
::wait_and_exit(255);
}
}
sub run_parallel {
my ($stdin,@args) = @_;
my $cmd = join "",map { " $_ & " } split /\n/, $stdin;
print $Global::original_stderr ` $cmd wait` ;
return 0
}
sub _run_parallel {
# Run GNU Parallel
# This should ideally just fork an internal copy
# and not start it through a shell
# Input:
# $stdin = data to provide on stdin for GNU Parallel
# @args = command line arguments
# Returns:
# $exitstatus = exitcode of GNU Parallel run
# \@stdout = standard output
# \@stderr = standard error
my ($stdin,@args) = @_;
my ($exitstatus,@stdout,@stderr);
my ($stdin_fh,$stdout_fh)=(gensym(),gensym());
my ($stderr_fh, $stderrname) = ::tmpfile(SUFFIX => ".par");
unlink $stderrname;
my $pid = ::open3($stdin_fh,$stdout_fh,$stderr_fh,
$0,qw(--plain --shell /bin/sh --will-cite), @args);
if(my $writerpid = fork()) {
close $stdin_fh;
@stdout = <$stdout_fh>;
# Now stdout is closed:
# These pids should be dead or die very soon
while(kill 0, $writerpid) { ::usleep(1); }
die;
# reap $writerpid;
# while(kill 0, $pid) { ::usleep(1); }
# reap $writerpid;
$exitstatus = $?;
seek $stderr_fh, 0, 0;
@stderr = <$stderr_fh>;
close $stdout_fh;
close $stderr_fh;
} else {
close $stdout_fh;
close $stderr_fh;
print $stdin_fh $stdin;
close $stdin_fh;
exit(0);
}
return ($exitstatus,\@stdout,\@stderr);
} }
sub filter_hosts { sub filter_hosts {