parallel: Converted ->fd to 4 file format. Passes testsuite.

This commit is contained in:
Ole Tange 2014-02-16 13:51:52 +01:00
parent 3447f2da85
commit 846085cfd3
3 changed files with 154 additions and 83 deletions

View file

@ -220,11 +220,18 @@ New in this release:
Systems for Forensic Applications Systems for Forensic Applications
http://lantana.tenet.res.in/website_files/thesis/Phd/srikanth.pdf http://lantana.tenet.res.in/website_files/thesis/Phd/srikanth.pdf
* GNU Parallel was cited in: Scaleable Code Clone Detection
http://scg.unibe.ch/archive/phd/schwarz-phd.pdf
* GNU Parallel was used (unfortunately without citation) in: * GNU Parallel was used (unfortunately without citation) in:
Performance and Energy Efficiency of Common Compression / Performance and Energy Efficiency of Common Compression /
Decompression Utilities Decompression Utilities
http://www.researchgate.net/publication/243962643_Performance_and_Energy_Efficiency_of_Common_CompressionDecompression_Utilities_An_Experimental_Study_in_Mobile_and_Workstation_Computer_Platforms/file/3deec51d1dbc0474f9.pdf http://www.researchgate.net/publication/243962643_Performance_and_Energy_Efficiency_of_Common_CompressionDecompression_Utilities_An_Experimental_Study_in_Mobile_and_Workstation_Computer_Platforms/file/3deec51d1dbc0474f9.pdf
* GNU Parallel was recommended (without citation) in: Redesigning the
Specificity of Protein-DNA Interactions with Rosetta
http://link.springer.com/protocol/10.1007/978-1-62703-968-0_17
* GNU Parallel is co-distributed with RepeatExplorer * GNU Parallel is co-distributed with RepeatExplorer
http://www.vcru.wisc.edu/simonlab/bioinformatics/programs/repeatexplorer/README.txt http://www.vcru.wisc.edu/simonlab/bioinformatics/programs/repeatexplorer/README.txt
@ -234,6 +241,9 @@ New in this release:
* PHP wrapper class for the GNU Parallel tool * PHP wrapper class for the GNU Parallel tool
https://github.com/geopal-solutions/gnu-parallel-wrapper/tree/master https://github.com/geopal-solutions/gnu-parallel-wrapper/tree/master
* Exploratory Data Analysis
http://www.slideshare.net/thinrhino/gnunify
* Single-Thread-Programme auf Multicore-Rechnern parallelisieren * Single-Thread-Programme auf Multicore-Rechnern parallelisieren
http://www.adlerweb.info/blog/2014/02/08/linux-single-thread-programme-auf-multicore-rechnern-parallelisieren http://www.adlerweb.info/blog/2014/02/08/linux-single-thread-programme-auf-multicore-rechnern-parallelisieren

View file

@ -137,8 +137,8 @@ if($opt::pipe and @opt::a) {
} }
@Global::tee_jobs = @jobs; @Global::tee_jobs = @jobs;
$command = "tee".join("",map {" >((".$_->replaced().")". $command = "tee".join("",map {" >((".$_->replaced().")".
" >".$_->fd_file_name(1). " >".$_->fd(1,"name").
" 2>".$_->fd_file_name(2).")" } @jobs) " 2>".$_->fd(2,"name").")" } @jobs)
." >/dev/null"; ." >/dev/null";
$Global::JobQueue = JobQueue->new( $Global::JobQueue = JobQueue->new(
$command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files); $command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files);
@ -173,10 +173,10 @@ drain_job_queue();
reaper(); reaper();
if($opt::pipe and @opt::a) { if($opt::pipe and @opt::a) {
for my $job (@Global::tee_jobs) { for my $job (@Global::tee_jobs) {
unlink $job->fd_file_name(2); unlink $job->fd(2,"name");
$job->set_fd_file_name(2,""); $job->set_fd(2,"name","");
$job->print(); $job->print();
unlink $job->fd_file_name(1); unlink $job->fd(1,"name");
} }
} }
cleanup(); cleanup();
@ -325,8 +325,7 @@ sub spreadstdin {
$Global::start_no_new_jobs ||= 1; $Global::start_no_new_jobs ||= 1;
if($opt::roundrobin) { if($opt::roundrobin) {
for my $job (values %Global::running) { for my $job (values %Global::running) {
my $fh = $job->fd(0); close $job->fd(0,"w");
close $fh;
} }
my %incomplete_jobs = %Global::running; my %incomplete_jobs = %Global::running;
my $sleep = 1; my $sleep = 1;
@ -420,12 +419,10 @@ sub write_record_to_pipe {
} }
$job->write($header_ref); $job->write($header_ref);
$job->write($record_ref); $job->write($record_ref);
my $stdin_fh = $job->fd(0); close $job->fd(0,"w");
close $stdin_fh;
exit(0); exit(0);
} }
my $stdin_fh = $job->fd(0); close $job->fd(0,"w");
close $stdin_fh;
return 1; return 1;
} }
@ -1532,8 +1529,7 @@ sub drain_job_queue {
if($opt::pipe) { if($opt::pipe) {
# When using --pipe sometimes file handles are not closed properly # When using --pipe sometimes file handles are not closed properly
for my $job (values %Global::running) { for my $job (values %Global::running) {
my $stdin_fh = $job->fd(0); close $job->fd(0,"w");
close $stdin_fh;
} }
} }
if($opt::progress) { if($opt::progress) {
@ -4035,11 +4031,69 @@ sub seq {
return $self->{'commandline'}->seq(); return $self->{'commandline'}->seq();
} }
sub cattail {
    # Returns:
    #   $cattail = perl program for:
    #     cattail "decompress program" [file_to_decompress or stdin]
    #
    # The returned text is the source of a stand-alone perl program (meant
    # to be run as: perl -e $cattail CMD [FILE]). It copies FILE (or STDIN
    # if no FILE is given) into the stdin of CMD and then keeps polling the
    # input for data that is appended later, like 'cat' followed by 'tail -f'.
    # Sending SIGHUP to the program tells it that nothing more will be
    # appended; it then drains what is left and exits.
    my $cattail = q{
        # cat followed by tail.
        # SIGHUP says nothing more will be appended, so finish after one
        # more read round.
        use Fcntl;
        $SIG{HUP} = sub { $Global::sighup = 1; };
        $|=1;
        my $cmd = shift;
        if(@ARGV) {
            open(IN,"<",$ARGV[0]) || die("Cannot open $ARGV[0]");
        } else {
            *IN = *STDIN;
        }
        # F_GETFL hands the flags back as the return value of fcntl
        # (the third argument is ignored), so they must be captured here.
        # A flag value of 0 is returned as "0 but true", so || die is safe.
        my $flags = fcntl(IN, F_GETFL, 0) || die $!;
        # Add non-blocking to the flags
        $flags |= O_NONBLOCK;
        # Set the flags on the filehandle
        fcntl(IN, F_SETFL, $flags) || die $!;
        open(OUT,"|-",$cmd) || die;
        my ($read, $buf);
        # Time to wait between polls (in milliseconds)
        my $sleep = 0;
        while(1) {
            # Remember whether SIGHUP had arrived *before* this read.
            # Only then does an empty read prove that there is nothing
            # left: a SIGHUP arriving after the read may have been preceded
            # by a final append that this read did not see.
            my $hup_before_read = $Global::sighup;
            # clear EOF
            seek(IN,0,1);
            $read = sysread(IN,$buf,1_000_000);
            if($read) {
                # Blocking print. syswrite may write only part of the
                # buffer or be interrupted by SIGHUP: keep going until
                # everything that was read has been passed on.
                my $written = 0;
                while($written < $read) {
                    my $n = syswrite(OUT,$buf,$read-$written,$written);
                    if(not defined $n) {
                        next if $!{EINTR};
                        die("Cannot write to $cmd: $!");
                    }
                    $written += $n;
                }
                # Something printed: Wait less next time
                $sleep /= 2;
            } else {
                if($hup_before_read) {
                    # SIGHUP received and a later read found nothing:
                    # There will never be more to read => exit.
                    # Close the pipe explicitly so the command sees EOF
                    # and is waited for before we exit.
                    close(OUT);
                    exit;
                }
                # Nothing read: Wait longer next time
                $sleep = 1.05*$sleep + 0.01;
                usleep($sleep);
            }
        }

        sub usleep {
            # Sleep this many milliseconds.
            my $secs = shift;
            select(undef, undef, undef, $secs/1000);
        }
    };
    return $cattail;
}
sub openoutputfiles { sub openoutputfiles {
# Open files for STDOUT and STDERR # Open files for STDOUT and STDERR
# Set file handles in $self->fd and possibly $self->fd_input # Set file handles in $self->fd
# TODO non-blocking $opt::linebuffer
my $self = shift; my $self = shift;
my ($outfh, $errfh, $outname, $errname, $unlink_out, $unlink_err); my ($outfhw, $errfhw, $outrfhr, $errfhr, $outname, $errname, $unlink_out, $unlink_err);
if($opt::results) { if($opt::results) {
my $args_as_dirname = $self->{'commandline'}->args_as_dirname(); my $args_as_dirname = $self->{'commandline'}->args_as_dirname();
# prefix/name1/val1/name2/val2/ # prefix/name1/val1/name2/val2/
@ -4047,96 +4101,113 @@ sub openoutputfiles {
File::Path::mkpath($dir); File::Path::mkpath($dir);
# prefix/name1/val1/name2/val2/stdout # prefix/name1/val1/name2/val2/stdout
$outname = "$dir/stdout"; $outname = "$dir/stdout";
if(not open($outfh, "+>", $outname)) { if(not open($outfhw, "+>", $outname)) {
::error("Cannot write to `$outname'.\n"); ::error("Cannot write to `$outname'.\n");
::wait_and_exit(255); ::wait_and_exit(255);
} }
# prefix/name1/val1/name2/val2/stderr # prefix/name1/val1/name2/val2/stderr
$errname = "$dir/stderr"; $errname = "$dir/stderr";
if(not open($errfh, "+>", $errname)) { if(not open($errfhw, "+>", $errname)) {
::error("Cannot write to `$errname'.\n"); ::error("Cannot write to `$errname'.\n");
::wait_and_exit(255); ::wait_and_exit(255);
} }
$self->set_fd(1,"unlink","");
$self->set_fd(2,"unlink","");
} elsif($Global::grouped) { } elsif($Global::grouped) {
# To group we create temporary files for STDOUT and STDERR # To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open) # To avoid the cleanup unlink the files immediately (but keep them open)
if(@Global::tee_jobs) { if(@Global::tee_jobs) {
# files must be removed when the tee is done # files must be removed when the tee is done
} elsif($opt::files) { } elsif($opt::files) {
($outfh, $outname) = ::tempfile(SUFFIX => ".par"); ($outfhw, $outname) = ::tempfile(SUFFIX => ".par");
($errfh, $errname) = ::tempfile(SUFFIX => ".par"); ($errfhw, $errname) = ::tempfile(SUFFIX => ".par");
# --files => only remove stderr # --files => only remove stderr
$unlink_err = 1; $self->set_fd(1,"unlink","");
$self->set_fd(2,"unlink",$errname);
} else { } else {
($outfh, $outname) = ::tempfile(SUFFIX => ".par"); ($outfhw, $outname) = ::tempfile(SUFFIX => ".par");
($errfh, $errname) = ::tempfile(SUFFIX => ".par"); ($errfhw, $errname) = ::tempfile(SUFFIX => ".par");
$unlink_out = 1; $self->set_fd(1,"unlink",$outname);
$unlink_err = 1; $self->set_fd(2,"unlink",$errname);
} }
} else { } else {
# --ungroup # --ungroup
$outfh = *STDOUT; open($outfhw,">&",$Global::fd{1}) || die;
$errfh = *STDERR; open($errfhw,">&",$Global::fd{2}) || die;
# File name must be empty as it will otherwise be printed # File name must be empty as it will otherwise be printed
$outname = ""; $outname = "";
$errname = ""; $errname = "";
$unlink_out = 1; $self->set_fd(1,"unlink",$outname);
$unlink_err = 1; $self->set_fd(2,"unlink",$errname);
} }
$self->set_fd_file_name(1,$outname); # Set writing FD
$self->set_fd_file_name(2,$errname); $self->set_fd(1,'w',$outfhw);
$self->set_fd(2,'w',$errfhw);
$self->set_fd(1,'name',$outname);
$self->set_fd(2,'name',$errname);
if($opt::compress) { if($opt::compress) {
# Send stdout to stdin for $opt::compress_program(1) # Send stdout to stdin for $opt::compress_program(1)
# Send stderr to stdin for $opt::compress_program(2) # Send stderr to stdin for $opt::compress_program(2)
$self->set_fd_input(1, $outfh); # The pid of cattail can be fetched with: $pid = $self->fd($fdno,'rpid');
$self->set_fd_input(2, $errfh); my $cattail = cattail();
my $rm = $unlink_out ? "rm" : "true"; for my $fdno (1,2) {
::debug("| ($rm $outname; $opt::compress_program) > $outname\n"); my $wpid = open(my $fdw,"|-","$opt::compress_program >".
open(my $coutfh,"|-","($rm $outname; $opt::compress_program) > $outname") || die $?; $self->fd($fdno,'name')) || die $?;
$rm = $unlink_err ? "rm" : "true"; $self->set_fd($fdno,'w',$fdw);
::debug("| ($rm $errname; $opt::compress_program) > $errname\n"); $self->set_fd($fdno,'wpid',$wpid);
open(my $cerrfh,"|-","($rm $errname; $opt::compress_program) > $errname") || die $?; my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail,
$self->set_fd(1,$coutfh); $opt::decompress_program, $self->fd($fdno,'name')) || die $?;
$self->set_fd(2,$cerrfh); $self->set_fd($fdno,'r',$fdr);
$self->set_fd($fdno,'rpid',$rpid);
# Unlink if required
# TODO this is disabled for now
# unlink $self->fd($fdno,"unlink");
}
} else { } else {
$self->set_fd(1,$outfh); # Set reading FD
$self->set_fd(2,$errfh); for my $fdno (1,2) {
$unlink_out && unlink($outname); my $fdw = $self->fd($fdno,'w');
$unlink_err && unlink($errname); # Duplicate filehandle, so fdw can be closed seperately
open(my $fdr,"<&",$fdw) || die;
$self->set_fd($fdno,'r',$fdr);
# Unlink if required
unlink $self->fd($fdno,"unlink");
}
} }
} }
sub set_fd { sub set_fd {
# Set file descriptor # Set file descriptor
my ($self, $fd_no, $fd) = @_; my ($self, $fd_no, $key, $fd) = @_;
$self->{'fd'}{$fd_no} = $fd; $self->{'fd'}{$fd_no,$key} = $fd;
} }
sub fd { sub fd {
# Get file descriptor # Get file descriptor
my ($self, $fd_no) = @_; my ($self, $fd_no, $key) = @_;
return $self->{'fd'}{$fd_no}; return $self->{'fd'}{$fd_no,$key};
} }
sub set_fd_input { sub _set_fd_input {
my ($self, $fd_no, $fd_input) = @_; my ($self, $fd_no, $fd_input) = @_;
$self->{'fd_input'}{$fd_no} = $fd_input; $self->{'fd_input'}{$fd_no} = $fd_input;
} }
sub fd_input { sub _fd_input {
# Get input file descriptor # Get input file descriptor
my ($self, $fd_no) = @_; my ($self, $fd_no) = @_;
return $self->{'fd_input'}{$fd_no}; return $self->{'fd_input'}{$fd_no};
} }
sub set_fd_file_name { sub _set_fd_file_name {
# Set file name for a file descriptor # Set file name for a file descriptor
my $self = shift; my $self = shift;
my $fd_no = shift; my $fd_no = shift;
$self->{'fd_file_name',$fd_no} = shift; $self->{'fd_file_name',$fd_no} = shift;
} }
sub fd_file_name { sub _fd_file_name {
# Get file name for a file descriptor # Get file name for a file descriptor
my $self = shift; my $self = shift;
my $fd_no = shift; my $fd_no = shift;
@ -4146,7 +4217,7 @@ sub fd_file_name {
sub write { sub write {
my $self = shift; my $self = shift;
my $remaining_ref = shift; my $remaining_ref = shift;
my $stdin_fh = $self->fd(0); my $stdin_fh = $self->fd(0,"w");
syswrite($stdin_fh,$$remaining_ref); syswrite($stdin_fh,$$remaining_ref);
} }
@ -4181,7 +4252,7 @@ sub non_block_write {
# use Fcntl; # use Fcntl;
# my $flags = ''; # my $flags = '';
for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) { for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) {
my $in = $self->fd(0); my $in = $self->fd(0,"w");
# fcntl($in, F_GETFL, $flags) # fcntl($in, F_GETFL, $flags)
# or die "Couldn't get flags for HANDLE : $!\n"; # or die "Couldn't get flags for HANDLE : $!\n";
# $flags |= O_NONBLOCK; # $flags |= O_NONBLOCK;
@ -4701,7 +4772,7 @@ sub start {
my $pid; my $pid;
$job->openoutputfiles(); $job->openoutputfiles();
my($stdout_fh,$stderr_fh) = ($job->fd(1),$job->fd(2)); my($stdout_fh,$stderr_fh) = ($job->fd(1,"w"),$job->fd(2,"w"));
local (*IN,*OUT,*ERR); local (*IN,*OUT,*ERR);
open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!"); open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!"); open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!");
@ -4743,7 +4814,7 @@ sub start {
::die_bug("open3-pipe"); ::die_bug("open3-pipe");
1; 1;
}; };
$job->set_fd(0,$stdin_fh); $job->set_fd(0,"w",$stdin_fh);
} elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1 } elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1
and $job->sshlogin()->string() eq ":") { and $job->sshlogin()->string() eq ":") {
# Give STDIN to the first job if using -a (but only if running # Give STDIN to the first job if using -a (but only if running
@ -4857,13 +4928,13 @@ sub print {
if($opt::pipe and $self->virgin()) { if($opt::pipe and $self->virgin()) {
# Nothing was printed to this job: # Nothing was printed to this job:
# cleanup tmp files if --files was set # cleanup tmp files if --files was set
unlink $self->fd_file_name(1); unlink $self->fd(1,"name");
return; return;
} }
if($opt::dryrun) { if($opt::dryrun) {
# Nothing was printed to this job: # Nothing was printed to this job:
# cleanup tmp files if --files was set # cleanup tmp files if --files was set
unlink $self->fd_file_name(1); unlink $self->fd(1,"name");
} }
if($Global::joblog) { $self->print_joblog() } if($Global::joblog) { $self->print_joblog() }
@ -4893,7 +4964,7 @@ sub print {
# Sort by file descriptor numerically: 1,2,3,..,9,10,11 # Sort by file descriptor numerically: 1,2,3,..,9,10,11
$fdno == 0 and next; $fdno == 0 and next;
my $out_fd = $Global::fd{$fdno}; my $out_fd = $Global::fd{$fdno};
my $in_fd = $self->fd($fdno); my $in_fd = $self->fd($fdno,"r");
if(not $in_fd) { if(not $in_fd) {
if(not $Job::file_descriptor_warning_printed{$fdno}++) { if(not $Job::file_descriptor_warning_printed{$fdno}++) {
# ::warning("File descriptor $fdno not defined\n"); # ::warning("File descriptor $fdno not defined\n");
@ -4903,9 +4974,9 @@ sub print {
::debug("File descriptor $fdno:"); ::debug("File descriptor $fdno:");
if($opt::files) { if($opt::files) {
# If --compress: $in_fd must be closed first. # If --compress: $in_fd must be closed first.
close($in_fd); close $self->fd($fdno,"w");
if($fdno == 1 and $self->fd_file_name($fdno)) { if($fdno == 1 and $self->fd($fdno,"name")) {
print $out_fd $self->fd_file_name($fdno),"\n"; print $out_fd $self->fd($fdno,"name"),"\n";
} }
} elsif($opt::linebuffer) { } elsif($opt::linebuffer) {
# Line buffered print out # Line buffered print out
@ -4952,21 +5023,13 @@ sub print {
} }
} else { } else {
my $buf; my $buf;
if($opt::compress) { close $self->fd($fdno,"w");
# Close for writing so the compressor can finish up if($self->fd($fdno,"wpid")) {
# (Are we really sure it finishes? Potential bug) # This is --compress
close($in_fd); # Wait for $compress_program to finish
# Save STDIN file handle waitpid($self->fd($fdno,"wpid"),0);
open(my $stdin_copy, "<&", "STDIN") or ::die_bug("Can't dup STDIN: $!"); # Then tell cattail this is the last reading
my $fd = $self->fd_input($fdno); CORE::kill "HUP", $self->fd($fdno,"rpid");
# Replace STDIN with the file on disk
open(STDIN, "<&", $fd);
::debug("$opt::decompress_program |");
# Start decompression with the fake STDIN
open($in_fd, "-|", $opt::decompress_program) ||
::die_bug("Decompressing failed ($opt::decompress_program): $!");
# Put STDIN file handle back
open(STDIN, "<&", $stdin_copy) or ::die_bug("Can't dup STDIN: $!");
} else { } else {
# Seek to start # Seek to start
seek $in_fd, 0, 0; seek $in_fd, 0, 0;

View file

@ -9,6 +9,7 @@ begin
5 5
6 6
7 7
parallel: Warning: No more file handles. Raising ulimit -n or /etc/security/limits.conf may help.
8 8
9 9
10 10
@ -17,7 +18,6 @@ begin
13 13
14 14
15 15
parallel: Warning: No more file handles. Raising ulimit -n or /etc/security/limits.conf may help.
16 16
17 17
18 18
@ -51,8 +51,6 @@ job2
14 14
15 15
16 16
17
18
2 2
3 3
4 4
@ -62,7 +60,8 @@ job2
8 8
9 9
parallel: SIGTERM received. No new jobs will be started. parallel: SIGTERM received. No new jobs will be started.
parallel: Waiting for these 9 jobs to finish. Send SIGTERM again to stop now. parallel: Waiting for these 8 jobs to finish. Send SIGTERM again to stop now.
parallel: Warning: No more file handles. Raising ulimit -n or /etc/security/limits.conf may help.
parallel: sleep 3; echo 10 parallel: sleep 3; echo 10
parallel: sleep 3; echo 11 parallel: sleep 3; echo 11
parallel: sleep 3; echo 12 parallel: sleep 3; echo 12
@ -70,8 +69,7 @@ parallel: sleep 3; echo 13
parallel: sleep 3; echo 14 parallel: sleep 3; echo 14
parallel: sleep 3; echo 15 parallel: sleep 3; echo 15
parallel: sleep 3; echo 16 parallel: sleep 3; echo 16
parallel: sleep 3; echo 17 parallel: sleep 3; echo 9
parallel: sleep 3; echo 18
### Test bug: empty line for | sh with -k ### Test bug: empty line for | sh with -k
a a
b b