From f1e58af040b66887a321c482b189708400626082 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Sat, 23 Nov 2019 23:16:20 +0100 Subject: [PATCH] --retries works (possibly breaking something else). --- doc/haikus | 5 +- src/parallel | 165 ++++++++++++++++++++++++++++++++++----------------- 2 files changed, 111 insertions(+), 59 deletions(-) diff --git a/doc/haikus b/doc/haikus index 1ba4770b..965937bd 100644 --- a/doc/haikus +++ b/doc/haikus @@ -3,7 +3,7 @@ Quote of the month: Well anyway, It was blazingly fast and astonished by performance. guess I'll never use xargs. -- (Not) Akaming @_Akamig@twitter -GNU parallel has helped me kill a Hadoop cluster before. + GNU parallel has helped me kill a Hadoop cluster before. -- Travis Campbell @hcoyote@twitter Yeah, GNU parallel is a beast when used accordingly. @@ -43,7 +43,6 @@ GNU parallel has helped me kill a Hadoop cluster before. gnu parallel and emacs). -- Peter Kjellström @nsccap@twitter - GNU/Parallel umm... tempting. -- k-leb k @dcatdemon@twitter @@ -53,11 +52,9 @@ GNU parallel has helped me kill a Hadoop cluster before. Not sure if it counts as an "alt" tool but GNU parallel really took my shell scripting game to the next level. -- @alinajaf@twitter - === Used === [L]earning about parallel was amazing for me, it gives us many beautiful solutions. -- SergioAraujo@stackoverflow - I've said it before: The command line program GNU Parallel is a godsend. -- Jo Chr. Oterhals @oterhals diff --git a/src/parallel b/src/parallel index 8b8359cb..f2c8c4e6 100755 --- a/src/parallel +++ b/src/parallel @@ -949,18 +949,30 @@ sub spreadstdin() { my $blocksize = $Global::blocksize; my $in = *STDIN; my $header = find_header(\$buf,$in); + my $timeout = 3; while(1) { my $anything_written = 0; my $buflen = length $buf; my $readsize = ($buflen < $blocksize) ? $blocksize-$buflen : $blocksize; # If $buf < $blocksize, append so it is $blocksize long after reading. # Otherwise append a full $blocksize + local $SIG{ALRM} = sub { + ::set_fh_non_blocking($in); + read($in,substr($buf,$buflen,0),$readsize); + ::set_fh_blocking($in); +# warn("ee $buf"); + alarm $timeout; + }; + # alarm $timeout; if(not read($in,substr($buf,$buflen,0),$readsize)) { +# warn("ff"); # End-of-file $chunk_number != 1 and last; # Force the while-loop once if everything was read by header reading $one_time_through++ and last; } + # warn("yy"); + # alarm 0; if($opt::r) { # Remove empty lines $buf =~ s/^\s*\n//gm; @@ -2071,7 +2083,7 @@ sub check_invalid_option_combinations() { sub init_globals() { # Defaults: - $Global::version = 20191122; + $Global::version = 20191023; $Global::progname = 'parallel'; $::name = "GNU Parallel"; $Global::infinity = 2**31; @@ -7954,6 +7966,29 @@ sub free_slot($) { } } +{ + my $which_sh; + + sub startshell($) { + my $self = shift; + + # Shell must support 'exec >foo 2>bar' + # Using sh always will cause functions to not be exported + # Could perl be used instead? + $which_sh = $Global::cshell ? "/bin/sh" : $Global::shell; + my ($pid, $stdin); + if($pid = open($stdin, "|-", $which_sh)) { + my $fileno = fileno($stdin); + # Assume we get pid, fileno from spawner child + $self->{'pid'} = $pid; + open(my $stdin_fh, ">&=$fileno") or die ($fileno); + $self->set_fh(0,"w",$stdin_fh); + } else { + die(); + } + } +} + sub openoutputfiles($) { # Open files for STDOUT and STDERR # Set file handles in $self->fh @@ -7964,20 +7999,21 @@ sub openoutputfiles($) { ($opt::keeporder or $opt::files or $opt::results or $opt::compress or $opt::compress_program or $opt::decompress_program)) { - # Do not save to files: Use non-blocking pipe my ($outfhr, $errfhr); - pipe($outfhr, $outfhw) || die; - pipe($errfhr, $errfhw) || die; - $self->set_fh(1,'w',$outfhw); - $self->set_fh(2,'w',$errfhw); + $outname = ::tmpfifo(); + $errname = ::tmpfifo(); + open($outfhr,"+<",$outname); + open($errfhr,"+<",$errname); $self->set_fh(1,'r',$outfhr); $self->set_fh(2,'r',$errfhr); + open($outfhw,"+>",$outname); + open($errfhw,"+>",$errname); + $self->set_fh(1,'w',$outfhw); + $self->set_fh(2,'w',$errfhw); # Make it possible to read non-blocking from the pipe for my $fdno (1,2) { ::set_fh_non_blocking($self->fh($fdno,'r')); } - # Return immediately because we do not need setting filenames - return; } elsif($opt::results and not $Global::csvsep) { my $out = $self->{'commandline'}->results_out(); my $seqname; @@ -8037,8 +8073,8 @@ sub openoutputfiles($) { } } else { # --ungroup - open($outfhw,">&",$Global::fd{1}) || die; - open($errfhw,">&",$Global::fd{2}) || die; +# open($outfhw,">&",$Global::fd{1}) || die; +# open($errfhw,">&",$Global::fd{2}) || die; # File name must be empty as it will otherwise be printed $outname = ""; $errname = ""; @@ -8046,15 +8082,31 @@ sub openoutputfiles($) { $self->set_fh(2,"unlink",$errname); } # Set writing FD - $self->set_fh(1,'w',$outfhw); - $self->set_fh(2,'w',$errfhw); +# $self->set_fh(1,'w',$outfhw); +# $self->set_fh(2,'w',$errfhw); $self->set_fh(1,'name',$outname); $self->set_fh(2,'name',$errname); + if($opt::compress) { $self->filter_through_compress(); } elsif(not $opt::ungroup) { $self->grouped(); } + my $in = $self->fh(0,'w'); + if($outname) { + syswrite($in,"exec >$outname\n"); + # Must be unlinked by worker child + my $n = $self->fh(1,"unlink"); + if(-e $n) { syswrite($in,"rm $n\n"); } + } elsif($errname) { + syswrite($in,"exec 2>$errname\n"); + # Must be unlinked by worker child + my $n = $self->fh(2,"unlink"); + if(-e $n) { syswrite($in,"rm $n\n"); } + } +# close $outfhw; +# close $errfhw; + if($opt::linebuffer) { # Make it possible to read non-blocking from # the buffer files @@ -8114,7 +8166,7 @@ sub grouped($) { ::die_bug("fdr: Cannot open ".$self->fh($fdno,'name')); $self->set_fh($fdno,'r',$fdr); # Unlink if not debugging - $Global::debug or ::rm($self->fh($fdno,"unlink")); +# $Global::debug or ::rm($self->fh($fdno,"unlink")); } } @@ -9278,15 +9330,23 @@ sub start($) { open OUT, '>&', $stdout_fh or ::die_bug("Can't dup STDOUT: $!"); open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDERR: $!"); # The eval is needed to catch exception from open3 - eval { - if(not $pid = ::open3($stdin_fh, ">&OUT", ">&ERR", "-")) { - # Each child gets its own process group to make it safe to killall - eval{ setpgrp(0,0) }; - eval{ setpriority(0,0,$opt::nice) }; - exec($Global::shell,"-c",$command) - || ::die_bug("open3-$stdin_fh $command"); - } - }; +# eval { +# if(not $pid = ::open3($stdin_fh, ">&OUT", ">&ERR", "-")) { +# # Each child gets its own process group to make it safe to killall +# eval{ setpgrp(0,0) }; +# eval{ setpriority(0,0,$opt::nice) }; +# exec($Global::shell,"-c",$command) +# || ::die_bug("open3-$stdin_fh $command"); +# } +# }; +# my ($in,$out,$err); +# if($pid = open($in, "|-", "/bin/sh")) { +# $fileno = fileno($in); +# } + # $pid $fileno +# $stdin_fh = $self->fh("w",0) +# print $stdin_fh $command; + return $pid; } @@ -9366,7 +9426,7 @@ sub start($) { eval $redefine_eval; } - sub open3_setpgrp { + sub _open3_setpgrp { my $setgprp_cache = $Global::cache_dir . "/tmp/sshlogin/" . ::hostname() . "/setpgrp_func"; if(-e $setgprp_cache) { @@ -9395,55 +9455,47 @@ sub start($) { # $job->skip() was called $command = "true"; } + $ENV{'PARALLEL_SEQ'} = $job->seq(); + $ENV{'PARALLEL_PID'} = $$; + $ENV{'PARALLEL_TMP'} = ::tmpname("par"); + $job->startshell(); $job->openoutputfiles(); $job->print_verbose_dryrun(); # Call slot to store the slot value $job->slot(); - my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w")); if($opt::dryrun or $opt::sqlmaster) { $command = "true"; } - $ENV{'PARALLEL_SEQ'} = $job->seq(); - $ENV{'PARALLEL_PID'} = $$; - $ENV{'PARALLEL_TMP'} = ::tmpname("par"); $job->add_rm($ENV{'PARALLEL_TMP'}); ::debug("run", $Global::total_running, " processes . Starting (", $job->seq(), "): $command\n"); + + my ($stdin_fh) = $job->fh(0,"w"); + if ($opt::tty and -c "/dev/tty" and + open(my $devtty_fh, "<", "/dev/tty")) { + # Give /dev/tty to the command if no one else is using it + close $devtty_fh; + syswrite($stdin_fh,"exec < /dev/tty\n"); + } + my @setpgrp = ('exec perl','-e', + ::Q("eval\{setpgrp\}\;eval\{setpriority\(0,0,$opt::nice\)\}\;". + "exec '$Global::shell', '-c', \@ARGV")); + syswrite($stdin_fh,"@setpgrp ".::Q($command)."\n"); +# print $stdin_fh "@setpgrp ",::Q($command),"\n"; + ::debug("run", "Run: $command\n"); if($opt::pipe) { - my ($stdin_fh) = ::gensym(); - $pid = open3_setpgrp($stdin_fh,$stdout_fh,$stderr_fh,$command); if($opt::roundrobin and not $opt::keeporder) { # --keep-order will make sure the order will be reproducible ::set_fh_non_blocking($stdin_fh); } - $job->set_fh(0,"w",$stdin_fh); if($opt::tee or $opt::shard or $opt::bin) { $job->set_virgin(0); } - } elsif ($opt::tty and -c "/dev/tty" and - open(my $devtty_fh, "<", "/dev/tty")) { - # Give /dev/tty to the command if no one else is using it - # The eval is needed to catch exception from open3 - local (*IN,*OUT,*ERR); - open OUT, '>&', $stdout_fh or ::die_bug("Can't dup STDOUT: $!"); - open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDERR: $!"); - *IN = $devtty_fh; - # The eval is needed to catch exception from open3 - my @wrap = ('perl','-e', - "eval\{setpriority\(0,0,$opt::nice\)\}\;". - "exec '$Global::shell', '-c', \@ARGV"); - eval { - $pid = ::open3("<&IN", ">&OUT", ">&ERR", @wrap, $command) - || ::die_bug("open3-/dev/tty"); - 1; - }; - close $devtty_fh; - $job->set_virgin(0); } else { - $pid = open3_setpgrp(::gensym(),$stdout_fh,$stderr_fh,$command); + # Close stdin if not pipe input + close $stdin_fh; $job->set_virgin(0); } - if($pid) { + if($job->{'pid'}) { # A job was started $Global::total_running++; $Global::total_started++; - $job->set_pid($pid); $job->set_starttime(); $Global::running{$job->pid()} = $job; if($opt::timeout) { @@ -9999,9 +10051,12 @@ sub print_tag(@) { sub free_ressources() { my $self = shift; if(not $opt::ungroup) { + my $fh; for my $fdno (sort { $a <=> $b } keys %Global::fd) { - close $self->fh($fdno,"w"); - close $self->fh($fdno,"r"); + $fh = $self->fh($fdno,"w"); + $fh and close $fh; + $fh = $self->fh($fdno,"r"); + $fh and close $fh; } } } @@ -10010,7 +10065,7 @@ sub print_normal($) { my $self = shift; my ($fdno,$in_fh,$out_fd) = @_; my $buf; - close $self->fh($fdno,"w"); + #close $self->fh($fdno,"w"); if($? and $opt::compress) { ::error($opt::compress_program." failed."); $self->set_exitstatus(255);