From 846085cfd32b18e5b0a159ae256f720f2ff0cdc4 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Sun, 16 Feb 2014 13:51:52 +0100 Subject: [PATCH] parallel: Converted ->fd to 4 file format. Passes testsuite. --- doc/release_new_version | 10 ++ src/parallel | 217 ++++++++++++++++++++------------ testsuite/wanted-results/test13 | 10 +- 3 files changed, 154 insertions(+), 83 deletions(-) diff --git a/doc/release_new_version b/doc/release_new_version index 7329c210..bf5f5bee 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -220,11 +220,18 @@ New in this release: Systems for Forensic Applications http://lantana.tenet.res.in/website_files/thesis/Phd/srikanth.pdf +* GNU Parallel was cited in: Scaleable Code Clone Detection + http://scg.unibe.ch/archive/phd/schwarz-phd.pdf + * GNU Parallel was used (unfortunately without citation) in: Performance and Energy Efficiency of Common Compression / Decompression Utilities http://www.researchgate.net/publication/243962643_Performance_and_Energy_Efficiency_of_Common_CompressionDecompression_Utilities_An_Experimental_Study_in_Mobile_and_Workstation_Computer_Platforms/file/3deec51d1dbc0474f9.pdf +* GNU Parallel was recommended (without citation) in: Redesigning the + Specificity of Protein-DNA Interactions with Rosetta + http://link.springer.com/protocol/10.1007/978-1-62703-968-0_17 + * GNU Parallel is co-distributed with RepeatExplorer http://www.vcru.wisc.edu/simonlab/bioinformatics/programs/repeatexplorer/README.txt @@ -234,6 +241,9 @@ New in this release: * PHP wrapper class for the GNU Parallel tool https://github.com/geopal-solutions/gnu-parallel-wrapper/tree/master +* Exploratory Data Analysis + http://www.slideshare.net/thinrhino/gnunify + * Single-Thread-Programme auf Multicore-Rechnern parallelisieren http://www.adlerweb.info/blog/2014/02/08/linux-single-thread-programme-auf-multicore-rechnern-parallelisieren diff --git a/src/parallel b/src/parallel index d49704e7..0e293c6a 100755 --- a/src/parallel +++ b/src/parallel @@ -137,8 +137,8 @@ if($opt::pipe and @opt::a) { } @Global::tee_jobs = @jobs; $command = "tee".join("",map {" >((".$_->replaced().")". - " >".$_->fd_file_name(1). - " 2>".$_->fd_file_name(2).")" } @jobs) + " >".$_->fd(1,"name"). + " 2>".$_->fd(2,"name").")" } @jobs) ." >/dev/null"; $Global::JobQueue = JobQueue->new( $command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files); @@ -173,10 +173,10 @@ drain_job_queue(); reaper(); if($opt::pipe and @opt::a) { for my $job (@Global::tee_jobs) { - unlink $job->fd_file_name(2); - $job->set_fd_file_name(2,""); + unlink $job->fd(2,"name"); + $job->set_fd(2,"name",""); $job->print(); - unlink $job->fd_file_name(1); + unlink $job->fd(1,"name"); } } cleanup(); @@ -325,8 +325,7 @@ sub spreadstdin { $Global::start_no_new_jobs ||= 1; if($opt::roundrobin) { for my $job (values %Global::running) { - my $fh = $job->fd(0); - close $fh; + close $job->fd(0,"w"); } my %incomplete_jobs = %Global::running; my $sleep = 1; @@ -420,12 +419,10 @@ sub write_record_to_pipe { } $job->write($header_ref); $job->write($record_ref); - my $stdin_fh = $job->fd(0); - close $stdin_fh; + close $job->fd(0,"w"); exit(0); } - my $stdin_fh = $job->fd(0); - close $stdin_fh; + close $job->fd(0,"w"); return 1; } @@ -1532,8 +1529,7 @@ sub drain_job_queue { if($opt::pipe) { # When using --pipe sometimes file handles are not closed properly for my $job (values %Global::running) { - my $stdin_fh = $job->fd(0); - close $stdin_fh; + close $job->fd(0,"w"); } } if($opt::progress) { @@ -4035,11 +4031,69 @@ sub seq { return $self->{'commandline'}->seq(); } +sub cattail { + # Returns: + # $cattail = perl program for: cattail "decompress program" [file_to_decompress or stdin] + my $cattail = q{ + # cat followed by tail. + # SIGHUP says there will be appended no more, so just finish after this round. + + use Fcntl; + + $SIG{HUP} = sub { $Global::sighup = 1; }; + + $|=1; + + $cmd = shift; + + if(@ARGV) { + open(IN,"<",$ARGV[0]) || die("Cannot open $ARGV[0]"); + } else { + *IN = *STDIN; + } + + my $flags; + fcntl(IN, F_GETFL, $flags) || die $!; # Get the current flags on the filehandle + $flags |= O_NONBLOCK; # Add non-blocking to the flags + fcntl(IN, F_SETFL, $flags) || die $!; # Set the flags on the filehandle + + open(OUT,"|-",$cmd) || die; + + while(1) { + # clear EOF + seek(IN,0,1); + $read = sysread(IN,$buf,1_000_000); + if($read) { + # Blocking print + syswrite(OUT,$buf); + # Something printed: Wait less next time + $sleep /= 2; + } else { + if($Global::sighup) { + # SIGHUP received: There will never be more to read => exit + exit; + } + # Nothing read: Wait longer next time + $sleep = 1.05*$sleep + 0.01; + usleep($sleep); + } + } + + sub usleep { + # Sleep this many milliseconds. + my $secs = shift; + select(undef, undef, undef, $secs/1000); + } +}; + return $cattail; +} + sub openoutputfiles { # Open files for STDOUT and STDERR - # Set file handles in $self->fd and possibly $self->fd_input + # Set file handles in $self->fd + # TODO non-blocking $opt::linebuffer my $self = shift; - my ($outfh, $errfh, $outname, $errname, $unlink_out, $unlink_err); + my ($outfhw, $errfhw, $outrfhr, $errfhr, $outname, $errname, $unlink_out, $unlink_err); if($opt::results) { my $args_as_dirname = $self->{'commandline'}->args_as_dirname(); # prefix/name1/val1/name2/val2/ @@ -4047,96 +4101,113 @@ sub openoutputfiles { File::Path::mkpath($dir); # prefix/name1/val1/name2/val2/stdout $outname = "$dir/stdout"; - if(not open($outfh, "+>", $outname)) { + if(not open($outfhw, "+>", $outname)) { ::error("Cannot write to `$outname'.\n"); ::wait_and_exit(255); } # prefix/name1/val1/name2/val2/stderr $errname = "$dir/stderr"; - if(not open($errfh, "+>", $errname)) { + if(not open($errfhw, "+>", $errname)) { ::error("Cannot write to `$errname'.\n"); ::wait_and_exit(255); } + $self->set_fd(1,"unlink",""); + $self->set_fd(2,"unlink",""); } elsif($Global::grouped) { # To group we create temporary files for STDOUT and STDERR # To avoid the cleanup unlink the files immediately (but keep them open) if(@Global::tee_jobs) { # files must be removed when the tee is done } elsif($opt::files) { - ($outfh, $outname) = ::tempfile(SUFFIX => ".par"); - ($errfh, $errname) = ::tempfile(SUFFIX => ".par"); + ($outfhw, $outname) = ::tempfile(SUFFIX => ".par"); + ($errfhw, $errname) = ::tempfile(SUFFIX => ".par"); # --files => only remove stderr - $unlink_err = 1; + $self->set_fd(1,"unlink",""); + $self->set_fd(2,"unlink",$errname); } else { - ($outfh, $outname) = ::tempfile(SUFFIX => ".par"); - ($errfh, $errname) = ::tempfile(SUFFIX => ".par"); - $unlink_out = 1; - $unlink_err = 1; + ($outfhw, $outname) = ::tempfile(SUFFIX => ".par"); + ($errfhw, $errname) = ::tempfile(SUFFIX => ".par"); + $self->set_fd(1,"unlink",$outname); + $self->set_fd(2,"unlink",$errname); } } else { # --ungroup - $outfh = *STDOUT; - $errfh = *STDERR; + open($outfhw,">&",$Global::fd{1}) || die; + open($errfhw,">&",$Global::fd{2}) || die; # File name must be empty as it will otherwise be printed $outname = ""; $errname = ""; - $unlink_out = 1; - $unlink_err = 1; + $self->set_fd(1,"unlink",$outname); + $self->set_fd(2,"unlink",$errname); } - $self->set_fd_file_name(1,$outname); - $self->set_fd_file_name(2,$errname); + # Set writing FD + $self->set_fd(1,'w',$outfhw); + $self->set_fd(2,'w',$errfhw); + $self->set_fd(1,'name',$outname); + $self->set_fd(2,'name',$errname); if($opt::compress) { # Send stdout to stdin for $opt::compress_program(1) # Send stderr to stdin for $opt::compress_program(2) - $self->set_fd_input(1, $outfh); - $self->set_fd_input(2, $errfh); - my $rm = $unlink_out ? "rm" : "true"; - ::debug("| ($rm $outname; $opt::compress_program) > $outname\n"); - open(my $coutfh,"|-","($rm $outname; $opt::compress_program) > $outname") || die $?; - $rm = $unlink_err ? "rm" : "true"; - ::debug("| ($rm $errname; $opt::compress_program) > $errname\n"); - open(my $cerrfh,"|-","($rm $errname; $opt::compress_program) > $errname") || die $?; - $self->set_fd(1,$coutfh); - $self->set_fd(2,$cerrfh); + # cattail get pid i $pid = $self->fd($fdno,'rpid'); + my $cattail = cattail(); + for my $fdno (1,2) { + my $wpid = open(my $fdw,"|-","$opt::compress_program >". + $self->fd($fdno,'name')) || die $?; + $self->set_fd($fdno,'w',$fdw); + $self->set_fd($fdno,'wpid',$wpid); + my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail, + $opt::decompress_program, $self->fd($fdno,'name')) || die $?; + $self->set_fd($fdno,'r',$fdr); + $self->set_fd($fdno,'rpid',$rpid); + # Unlink if required + # TODO this is disabled for now + # unlink $self->fd($fdno,"unlink"); + + } } else { - $self->set_fd(1,$outfh); - $self->set_fd(2,$errfh); - $unlink_out && unlink($outname); - $unlink_err && unlink($errname); + # Set reading FD + for my $fdno (1,2) { + my $fdw = $self->fd($fdno,'w'); + # Duplicate filehandle, so fdw can be closed seperately + open(my $fdr,"<&",$fdw) || die; + $self->set_fd($fdno,'r',$fdr); + # Unlink if required + unlink $self->fd($fdno,"unlink"); + } } } sub set_fd { # Set file descriptor - my ($self, $fd_no, $fd) = @_; - $self->{'fd'}{$fd_no} = $fd; + my ($self, $fd_no, $key, $fd) = @_; + $self->{'fd'}{$fd_no,$key} = $fd; } sub fd { # Get file descriptor - my ($self, $fd_no) = @_; - return $self->{'fd'}{$fd_no}; + my ($self, $fd_no, $key) = @_; + return $self->{'fd'}{$fd_no,$key}; } -sub set_fd_input { +sub _set_fd_input { my ($self, $fd_no, $fd_input) = @_; $self->{'fd_input'}{$fd_no} = $fd_input; } -sub fd_input { +sub _fd_input { # Get input file descriptor my ($self, $fd_no) = @_; return $self->{'fd_input'}{$fd_no}; } -sub set_fd_file_name { +sub _set_fd_file_name { # Set file name for a file descriptor my $self = shift; my $fd_no = shift; $self->{'fd_file_name',$fd_no} = shift; } -sub fd_file_name { +sub _fd_file_name { # Get file name for a file descriptor my $self = shift; my $fd_no = shift; @@ -4146,7 +4217,7 @@ sub fd_file_name { sub write { my $self = shift; my $remaining_ref = shift; - my $stdin_fh = $self->fd(0); + my $stdin_fh = $self->fd(0,"w"); syswrite($stdin_fh,$$remaining_ref); } @@ -4181,7 +4252,7 @@ sub non_block_write { # use Fcntl; # my $flags = ''; for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) { - my $in = $self->fd(0); + my $in = $self->fd(0,"w"); # fcntl($in, F_GETFL, $flags) # or die "Couldn't get flags for HANDLE : $!\n"; # $flags |= O_NONBLOCK; @@ -4701,7 +4772,7 @@ sub start { my $pid; $job->openoutputfiles(); - my($stdout_fh,$stderr_fh) = ($job->fd(1),$job->fd(2)); + my($stdout_fh,$stderr_fh) = ($job->fd(1,"w"),$job->fd(2,"w")); local (*IN,*OUT,*ERR); open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!"); open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!"); @@ -4743,7 +4814,7 @@ sub start { ::die_bug("open3-pipe"); 1; }; - $job->set_fd(0,$stdin_fh); + $job->set_fd(0,"w",$stdin_fh); } elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1 and $job->sshlogin()->string() eq ":") { # Give STDIN to the first job if using -a (but only if running @@ -4857,13 +4928,13 @@ sub print { if($opt::pipe and $self->virgin()) { # Nothing was printed to this job: # cleanup tmp files if --files was set - unlink $self->fd_file_name(1); + unlink $self->fd(1,"name"); return; } if($opt::dryrun) { # Nothing was printed to this job: # cleanup tmp files if --files was set - unlink $self->fd_file_name(1); + unlink $self->fd(1,"name"); } if($Global::joblog) { $self->print_joblog() } @@ -4893,7 +4964,7 @@ sub print { # Sort by file descriptor numerically: 1,2,3,..,9,10,11 $fdno == 0 and next; my $out_fd = $Global::fd{$fdno}; - my $in_fd = $self->fd($fdno); + my $in_fd = $self->fd($fdno,"r"); if(not $in_fd) { if(not $Job::file_descriptor_warning_printed{$fdno}++) { # ::warning("File descriptor $fdno not defined\n"); @@ -4903,9 +4974,9 @@ sub print { ::debug("File descriptor $fdno:"); if($opt::files) { # If --compress: $in_fd must be closed first. - close($in_fd); - if($fdno == 1 and $self->fd_file_name($fdno)) { - print $out_fd $self->fd_file_name($fdno),"\n"; + close $self->fd($fdno,"w"); + if($fdno == 1 and $self->fd($fdno,"name")) { + print $out_fd $self->fd($fdno,"name"),"\n"; } } elsif($opt::linebuffer) { # Line buffered print out @@ -4952,21 +5023,13 @@ sub print { } } else { my $buf; - if($opt::compress) { - # Close for writing so the compressor can finish up - # (Are we really sure it finishes? Potential bug) - close($in_fd); - # Save STDIN file handle - open(my $stdin_copy, "<&", "STDIN") or ::die_bug("Can't dup STDIN: $!"); - my $fd = $self->fd_input($fdno); - # Replace STDIN with the file on disk - open(STDIN, "<&", $fd); - ::debug("$opt::decompress_program |"); - # Start decompression with the fake STDIN - open($in_fd, "-|", $opt::decompress_program) || - ::die_bug("Decompressing failed ($opt::decompress_program): $!"); - # Put STDIN file handle back - open(STDIN, "<&", $stdin_copy) or ::die_bug("Can't dup STDIN: $!"); + close $self->fd($fdno,"w"); + if($self->fd($fdno,"wpid")) { + # This is --compress + # Wait for $compress_program to finish + waitpid($self->fd($fdno,"wpid"),0); + # Then tell cattail this the last reading + CORE::kill "HUP", $self->fd($fdno,"rpid"); } else { # Seek to start seek $in_fd, 0, 0; diff --git a/testsuite/wanted-results/test13 b/testsuite/wanted-results/test13 index 7f57e00e..9f9a5e9e 100644 --- a/testsuite/wanted-results/test13 +++ b/testsuite/wanted-results/test13 @@ -9,6 +9,7 @@ begin 5 6 7 +parallel: Warning: No more file handles. Raising ulimit -n or /etc/security/limits.conf may help. 8 9 10 @@ -17,7 +18,6 @@ begin 13 14 15 -parallel: Warning: No more file handles. Raising ulimit -n or /etc/security/limits.conf may help. 16 17 18 @@ -51,8 +51,6 @@ job2 14 15 16 -17 -18 2 3 4 @@ -62,7 +60,8 @@ job2 8 9 parallel: SIGTERM received. No new jobs will be started. -parallel: Waiting for these 9 jobs to finish. Send SIGTERM again to stop now. +parallel: Waiting for these 8 jobs to finish. Send SIGTERM again to stop now. +parallel: Warning: No more file handles. Raising ulimit -n or /etc/security/limits.conf may help. parallel: sleep 3; echo 10 parallel: sleep 3; echo 11 parallel: sleep 3; echo 12 @@ -70,8 +69,7 @@ parallel: sleep 3; echo 13 parallel: sleep 3; echo 14 parallel: sleep 3; echo 15 parallel: sleep 3; echo 16 -parallel: sleep 3; echo 17 -parallel: sleep 3; echo 18 +parallel: sleep 3; echo 9 ### Test bug: empty line for | sh with -k a b