parallel: Treat STDOUT and STDERR as fd{1} and fd{2}. Find all shell opened file descriptors.

This commit is contained in:
Ole Tange 2013-07-14 13:22:56 +02:00
parent 56e5118535
commit d3a9292b2a
2 changed files with 66 additions and 63 deletions

View file

@ -217,6 +217,8 @@ New in this release:
* http://www.brunokim.com.br/blog/?p=18 * http://www.brunokim.com.br/blog/?p=18
* http://timotheepoisot.fr/2013/07/08/parallel/
* http://www.open-open.com/news/view/371301 * http://www.open-open.com/news/view/371301
* Bug fixes and man page updates. * Bug fixes and man page updates.

View file

@ -33,7 +33,6 @@ use Getopt::Long;
# Used to ensure code quality # Used to ensure code quality
use strict; use strict;
$SIG{TERM} ||= sub { exit 0; }; # $SIG{TERM} is not set on Mac OS X
if(not $ENV{SHELL}) { if(not $ENV{SHELL}) {
# $ENV{SHELL} is sometimes not set on Mac OS X and Windows # $ENV{SHELL} is sometimes not set on Mac OS X and Windows
::warning("\$SHELL not set. Using /bin/sh.\n"); ::warning("\$SHELL not set. Using /bin/sh.\n");
@ -45,9 +44,10 @@ if(not $ENV{HOME}) {
$ENV{HOME} = "/tmp"; $ENV{HOME} = "/tmp";
} }
save_sig_stdin_stdout_stderr(); save_stdin_stdout_stderr();
save_original_signal_handler();
parse_options(); parse_options();
::debug("Open file descriptors: ".join(" ",keys %Global::fd)."\n");
my $number_of_args; my $number_of_args;
if($Global::max_number_of_args) { if($Global::max_number_of_args) {
$number_of_args=$Global::max_number_of_args; $number_of_args=$Global::max_number_of_args;
@ -481,12 +481,12 @@ sub write_record_to_pipe {
} }
$job->write($header_ref); $job->write($header_ref);
$job->write($record_ref); $job->write($record_ref);
my $fh = $job->stdin(); my $stdin_fh = $job->fd(0);
close $fh; close $stdin_fh;
exit(0); exit(0);
} }
my $fh = $job->stdin(); my $stdin_fh = $job->fd(0);
close $fh; close $stdin_fh;
return 1; return 1;
} }
@ -1007,7 +1007,7 @@ sub open_joblog {
} else { } else {
if($opt::joblog eq "-") { if($opt::joblog eq "-") {
# Use STDOUT as joblog # Use STDOUT as joblog
$Global::joblog = $Global::original_stdout $Global::joblog = $Global::fd{1};
} elsif(not open($Global::joblog, ">", $opt::joblog)) { } elsif(not open($Global::joblog, ">", $opt::joblog)) {
# Overwrite the joblog # Overwrite the joblog
::error("Cannot write to --joblog $opt::joblog.\n"); ::error("Cannot write to --joblog $opt::joblog.\n");
@ -1254,6 +1254,27 @@ sub shell_unquote {
sub __FILEHANDLES__ {} sub __FILEHANDLES__ {}
sub save_stdin_stdout_stderr {
    # Record the standard streams and any extra file descriptors that
    # were already open when we started (e.g. the shell ran us with
    # 3>/tmp/foo).
    # Sets:
    #   %Global::fd = file descriptor number => file handle
    #     (1..60 come from the scan below, 0 is the dup of STDIN)
    #   $Global::original_stderr = dup of STDERR
    #   $Global::original_stdin = dup of STDIN
    # Returns: N/A

    # Scan before the dups below: they allocate new descriptors, which
    # would likely show up in this scan as if the shell had opened them.
    # /dev/fd/62 and above are used by bash for <(cmd)
    foreach my $fd_no (1..60) {
        # ">&=" re-uses the existing descriptor instead of duplicating it
        open(my $shell_fh, ">&=", $fd_no) or next;
        $Global::fd{$fd_no} = $shell_fh;
    }
    open($Global::original_stderr, ">&", "STDERR")
        or ::die_bug("Can't dup STDERR: $!");
    open($Global::original_stdin, "<&", "STDIN")
        or ::die_bug("Can't dup STDIN: $!");
    $Global::fd{0} = $Global::original_stdin;
}
sub enough_file_handles { sub enough_file_handles {
# check that we have enough filehandles available for starting # check that we have enough filehandles available for starting
# another job # another job
@ -1308,19 +1329,6 @@ sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {}
# $Global::total_running = total number of running jobs # $Global::total_running = total number of running jobs
# $Global::total_started = total jobs started # $Global::total_started = total jobs started
sub save_sig_stdin_stdout_stderr {
    # Snapshot the signal handlers and the three standard streams so
    # they can be referred to after they have been changed.
    # Sets:
    #   %Global::original_sig = copy of %SIG as it was on entry
    #   $Global::original_stdout = dup of STDOUT
    #   $Global::original_stderr = dup of STDERR
    #   $Global::original_stdin = dup of STDIN
    # Returns: N/A
    %Global::original_sig = (%SIG);
    # Dummy until jobs really start
    $SIG{TERM} = sub {};
    open($Global::original_stdout, ">&", "STDOUT")
        or ::die_bug("Can't dup STDOUT: $!");
    open($Global::original_stderr, ">&", "STDERR")
        or ::die_bug("Can't dup STDERR: $!");
    open($Global::original_stdin, "<&", "STDIN")
        or ::die_bug("Can't dup STDIN: $!");
}
sub init_run_jobs { sub init_run_jobs {
$Global::total_running = 0; $Global::total_running = 0;
$Global::total_started = 0; $Global::total_started = 0;
@ -1468,8 +1476,8 @@ sub drain_job_queue {
if($opt::pipe) { if($opt::pipe) {
# When using --pipe sometimes file handles are not closed properly # When using --pipe sometimes file handles are not closed properly
for my $job (values %Global::running) { for my $job (values %Global::running) {
my $fh = $job->stdin(); my $stdin_fh = $job->fd(0);
close $fh; close $stdin_fh;
} }
} }
if($opt::progress) { if($opt::progress) {
@ -1865,6 +1873,14 @@ sub cleanup_basefile {
sub __SIGNAL_HANDLING__ {} sub __SIGNAL_HANDLING__ {}
sub save_original_signal_handler {
    # Keep a copy of the signal handlers in effect at startup and park
    # SIGTERM on a no-op handler.
    # Sets:
    #   %Global::original_sig = copy of %SIG (TERM is always set in it)
    # Returns: N/A
    if(not $SIG{TERM}) {
        # $SIG{TERM} is not set on Mac OS X
        $SIG{TERM} = sub { exit 0; };
    }
    %Global::original_sig = (%SIG);
    # Dummy until jobs really start
    $SIG{TERM} = sub {};
}
sub list_running_jobs { sub list_running_jobs {
# Returns: N/A # Returns: N/A
for my $v (values %Global::running) { for my $v (values %Global::running) {
@ -2256,8 +2272,10 @@ sub debug {
# Returns: N/A # Returns: N/A
$Global::debug or return; $Global::debug or return;
@_ = grep { defined $_ ? $_ : "" } @_; @_ = grep { defined $_ ? $_ : "" } @_;
if($Global::original_stdout) { if($Global::fd{1}) {
print $Global::original_stdout @_; # Original stdout was saved
my $stdout = $Global::fd{1};
print $stdout @_;
} else { } else {
print @_; print @_;
} }
@ -3587,18 +3605,22 @@ sub openresultsfile {
} }
open OUT, '>&', $outfh or ::die_bug("Can't redirect STDOUT: $!"); open OUT, '>&', $outfh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $errfh or ::die_bug("Can't dup STDOUT: $!"); open ERR, '>&', $errfh or ::die_bug("Can't dup STDOUT: $!");
$self->set_stdout($outfh); $self->set_fd(1,$outfh);
$self->set_stderr($errfh); $self->set_fd(2,$errfh);
} }
sub set_stdout { sub set_fd {
# Set file descriptor
my $self = shift; my $self = shift;
$self->{'stdout'} = shift; my $fd_no = shift;
$self->{'fd'}{$fd_no} = shift;
} }
sub stdout { sub fd {
# Get file descriptor
my $self = shift; my $self = shift;
return $self->{'stdout'}; my $fd_no = shift;
return $self->{'fd'}{$fd_no};
} }
sub set_stdoutfilename { sub set_stdoutfilename {
@ -3611,32 +3633,11 @@ sub stdoutfilename {
return $self->{'stdoutfilename'}; return $self->{'stdoutfilename'};
} }
sub stderr {
    # Get the file handle stored under 'stderr' for this object
    # Returns:
    #   the value last stored by set_stderr (undef if never set)
    my ($self) = @_;
    return $self->{'stderr'};
}
sub set_stderr {
    # Store a file handle under 'stderr' for this object
    # Input:
    #   $errfh = value to store
    my ($self, $errfh) = @_;
    $self->{'stderr'} = $errfh;
}
sub stdin {
    # Get the file handle stored under 'stdin' for this object
    # Returns:
    #   the value last stored by set_stdin (undef if never set)
    my ($self) = @_;
    return $self->{'stdin'};
}
sub set_stdin {
    # Store a file handle under 'stdin' for this object
    # Input:
    #   $stdin = value to store
    my ($self, $stdin) = @_;
    $self->{'stdin'} = $stdin;
}
sub write { sub write {
my $self = shift; my $self = shift;
my $remaining_ref = shift; my $remaining_ref = shift;
my $in = $self->{'stdin'}; my $stdin_fh = $self->fd(0);
syswrite($in,$$remaining_ref); syswrite($stdin_fh,$$remaining_ref);
} }
sub virgin { sub virgin {
@ -3676,7 +3677,7 @@ sub runtime {
# Returns: # Returns:
# Run time in seconds # Run time in seconds
my $self = shift; my $self = shift;
return int(($self->endtime() - $self->starttime())*1000)/1000; return sprintf("%.3f",int(($self->endtime() - $self->starttime())*1000)/1000);
} }
sub endtime { sub endtime {
@ -4159,8 +4160,8 @@ sub start {
open OUT, '>&', $outfh or ::die_bug("Can't redirect STDOUT: $!"); open OUT, '>&', $outfh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $errfh or ::die_bug("Can't dup STDOUT: $!"); open ERR, '>&', $errfh or ::die_bug("Can't dup STDOUT: $!");
$job->set_stdout($outfh); $job->set_fd(1,$outfh);
$job->set_stderr($errfh); $job->set_fd(2,$errfh);
} else { } else {
(*OUT,*ERR)=(*STDOUT,*STDERR); (*OUT,*ERR)=(*STDOUT,*STDERR);
} }
@ -4181,7 +4182,7 @@ sub start {
::debug("$Global::total_running processes. Starting (" ::debug("$Global::total_running processes. Starting ("
. $job->seq() . "): $command\n"); . $job->seq() . "): $command\n");
if($opt::pipe) { if($opt::pipe) {
my ($in); my ($stdin_fh);
# Wrap command with end-of-file detector, # Wrap command with end-of-file detector,
# so we do not spawn a program if there is no input. # so we do not spawn a program if there is no input.
# Exit value: # Exit value:
@ -4198,11 +4199,11 @@ sub start {
"($command);"; "($command);";
# The eval is needed to catch exception from open3 # The eval is needed to catch exception from open3
eval { eval {
$pid = ::open3($in, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || $pid = ::open3($stdin_fh, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) ||
::die_bug("open3-pipe"); ::die_bug("open3-pipe");
1; 1;
}; };
$job->set_stdin($in); $job->set_fd(0,$stdin_fh);
} elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1 } elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1
and $job->sshlogin()->string() eq ":") { and $job->sshlogin()->string() eq ":") {
# Give STDIN to the first job if using -a (but only if running # Give STDIN to the first job if using -a (but only if running
@ -4215,7 +4216,7 @@ sub start {
1; 1;
}; };
# Re-open to avoid complaining # Re-open to avoid complaining
open STDIN, "<&", $Global::original_stdin open(STDIN, "<&", $Global::original_stdin)
or ::die_bug("dup-\$Global::original_stdin: $!"); or ::die_bug("dup-\$Global::original_stdin: $!");
} elsif ($opt::tty and not $Global::tty_taken and -c "/dev/tty" and } elsif ($opt::tty and not $Global::tty_taken and -c "/dev/tty" and
open(my $devtty_fh, "<", "/dev/tty")) { open(my $devtty_fh, "<", "/dev/tty")) {
@ -4316,8 +4317,8 @@ sub print {
} }
# Only relevant for grouping # Only relevant for grouping
$Global::grouped or return; $Global::grouped or return;
my $out = $self->stdout(); my $out = $self->fd(1);
my $err = $self->stderr(); my $err = $self->fd(2);
my $command = $self->sshlogin_wrap(); my $command = $self->sshlogin_wrap();
if($Global::joblog) { if($Global::joblog) {