parallel: Generalized openoutputfiles().

This commit is contained in:
Ole Tange 2013-08-21 21:24:47 +02:00
parent 2c587e4055
commit 26fd31c2de

View file

@ -262,8 +262,26 @@ if($opt::nonall or $opt::onall) {
wait_and_exit(min(undef_as_zero($Global::exitstatus),254)); wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
} }
$Global::JobQueue = JobQueue->new( $Global::JobQueue = JobQueue->new(
$command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files); $command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files);
if($opt::pipe and @opt::a) {
# Disabled for now
# ... | parallel --pipe cmd ::: arg1 arg2
# The command to run is:
# tee >((cmd arg1) >/tmp/tmp1 2>/tmp/err1) >((cmd arg2) >/tmp/tmp2 2>/tmp/err2) >/dev/null
my @jobs;
# Get all jobs
while(not $Global::JobQueue->empty()) {
push @jobs, $Global::JobQueue->get();
}
$command = "tee ".join("",map {">((".$_->replaced().") >/tmp/tmp) "} @jobs)
." >/dev/null";
$Global::JobQueue = JobQueue->new(
$command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files);
}
if($opt::eta) { if($opt::eta) {
# Count the number of jobs before starting any # Count the number of jobs before starting any
$Global::JobQueue->total_jobs(); $Global::JobQueue->total_jobs();
@ -423,7 +441,7 @@ sub spreadstdin {
$blocksize = ceil($blocksize * 1.3 + 1); $blocksize = ceil($blocksize * 1.3 + 1);
::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n"); ::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n");
} }
} }
# If there is anything left in the buffer write it # If there is anything left in the buffer write it
substr($buf,0,0) = ""; substr($buf,0,0) = "";
write_record_to_pipe($chunk_number++,\$header,\$buf,$recstart,$recend,length $buf); write_record_to_pipe($chunk_number++,\$header,\$buf,$recstart,$recend,length $buf);
@ -3673,29 +3691,43 @@ sub seq {
return $self->{'commandline'}->seq(); return $self->{'commandline'}->seq();
} }
sub openresultsfile { sub openoutputfiles {
my $self = shift; my $self = shift;
my $args_as_dirname = $self->{'commandline'}->args_as_dirname(); my ($outfh,$errfh,$outname,$errname);
my ($outfh,$errfh,$name,$dir); if($opt::results) {
# prefix/name1/val1/name2/val2/ my $args_as_dirname = $self->{'commandline'}->args_as_dirname();
$dir = $opt::results."/".$args_as_dirname; # prefix/name1/val1/name2/val2/
File::Path::mkpath($dir); my $dir = $opt::results."/".$args_as_dirname;
# prefix/name1/val1/name2/val2/stdout File::Path::mkpath($dir);
$name = "$dir/stdout"; # prefix/name1/val1/name2/val2/stdout
if(not open($outfh,"+>",$name)) { $outname = "$dir/stdout";
::error("Cannot write to `$name'.\n"); if(not open($outfh,"+>",$outname)) {
::wait_and_exit(255); ::error("Cannot write to `$outname'.\n");
::wait_and_exit(255);
}
# prefix/name1/val1/name2/val2/stderr
$errname = "$dir/stderr";
if(not open($errfh,"+>",$errname)) {
::error("Cannot write to `$errname'.\n");
::wait_and_exit(255);
}
} elsif($Global::grouped) {
# To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open)
($outfh, $outname) = ::tempfile(SUFFIX => ".par");
$opt::files or unlink $outname;
($errfh, $errname) = ::tempfile(SUFFIX => ".par");
unlink $errname;
$errname = "";
} else {
# --ungroup
$outfh = *STDOUT;
$errfh = *STDERR;
$outname = "";
$errname = "";
} }
$self->set_fd_file_name(1,$name); $self->set_fd_file_name(1,$outname);
# prefix/name1/val1/name2/val2/stderr $self->set_fd_file_name(2,$errname);
$name = "$dir/stderr";
if(not open($errfh,"+>",$name)) {
::error("Cannot write to `$name'.\n");
::wait_and_exit(255);
}
$self->set_fd_file_name(2,$name);
open OUT, '>&', $outfh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $errfh or ::die_bug("Can't dup STDOUT: $!");
$self->set_fd(1,$outfh); $self->set_fd(1,$outfh);
$self->set_fd(2,$errfh); $self->set_fd(2,$errfh);
} }
@ -4305,34 +4337,19 @@ sub start {
} }
} }
local (*IN,*OUT,*ERR);
my $pid; my $pid;
if($opt::results) { $job->openoutputfiles();
$job->openresultsfile(); my($stdout_fh,$stderr_fh) = ($job->fd(1),$job->fd(2));
} elsif($Global::grouped) { local (*IN,*OUT,*ERR);
my ($outfh,$errfh,$name); open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!");
# To group we create temporary files for STDOUT and STDERR open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!");
# To avoid the cleanup unlink the files immediately (but keep them open)
($outfh, $name) = ::tempfile(SUFFIX => ".par");
$job->set_fd_file_name(1,$name);
$opt::files or unlink $name;
($errfh, $name) = ::tempfile(SUFFIX => ".par");
unlink $name;
open OUT, '>&', $outfh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $errfh or ::die_bug("Can't dup STDOUT: $!");
$job->set_fd(1,$outfh);
$job->set_fd(2,$errfh);
} else {
(*OUT,*ERR)=(*STDOUT,*STDERR);
}
if(($opt::dryrun or $Global::verbose) and not $Global::grouped) { if(($opt::dryrun or $Global::verbose) and not $Global::grouped) {
if($Global::verbose <= 1) { if($Global::verbose <= 1) {
print OUT $job->replaced(),"\n"; print $stdout_fh $job->replaced(),"\n";
} else { } else {
# Verbose level > 1: Print the rsync and stuff # Verbose level > 1: Print the rsync and stuff
print OUT $command,"\n"; print $stdout_fh $command,"\n";
} }
} }
if($opt::dryrun) { if($opt::dryrun) {