--retries works (possibly breaking something else).

This commit is contained in:
Ole Tange 2019-11-23 23:16:20 +01:00
parent 21fb3b6e60
commit f1e58af040
2 changed files with 111 additions and 59 deletions

View file

@ -3,7 +3,7 @@ Quote of the month:
Well anyway, It was blazingly fast and astonished by performance. guess I'll never use xargs. Well anyway, It was blazingly fast and astonished by performance. guess I'll never use xargs.
-- (Not) Akaming @_Akamig@twitter -- (Not) Akaming @_Akamig@twitter
GNU parallel has helped me kill a Hadoop cluster before. GNU parallel has helped me kill a Hadoop cluster before.
-- Travis Campbell @hcoyote@twitter -- Travis Campbell @hcoyote@twitter
Yeah, GNU parallel is a beast when used accordingly. Yeah, GNU parallel is a beast when used accordingly.
@ -43,7 +43,6 @@ GNU parallel has helped me kill a Hadoop cluster before.
gnu parallel and emacs). gnu parallel and emacs).
-- Peter Kjellström @nsccap@twitter -- Peter Kjellström @nsccap@twitter
GNU/Parallel umm... tempting. GNU/Parallel umm... tempting.
-- k-leb k @dcatdemon@twitter -- k-leb k @dcatdemon@twitter
@ -53,11 +52,9 @@ GNU parallel has helped me kill a Hadoop cluster before.
Not sure if it counts as an "alt" tool but GNU parallel really took my shell scripting game to the next level. Not sure if it counts as an "alt" tool but GNU parallel really took my shell scripting game to the next level.
-- @alinajaf@twitter -- @alinajaf@twitter
=== Used === === Used ===
[L]earning about parallel was amazing for me, it gives us many beautiful solutions. [L]earning about parallel was amazing for me, it gives us many beautiful solutions.
-- SergioAraujo@stackoverflow -- SergioAraujo@stackoverflow
I've said it before: The command line program GNU Parallel is a godsend. I've said it before: The command line program GNU Parallel is a godsend.
-- Jo Chr. Oterhals @oterhals -- Jo Chr. Oterhals @oterhals

View file

@ -949,18 +949,30 @@ sub spreadstdin() {
my $blocksize = $Global::blocksize; my $blocksize = $Global::blocksize;
my $in = *STDIN; my $in = *STDIN;
my $header = find_header(\$buf,$in); my $header = find_header(\$buf,$in);
my $timeout = 3;
while(1) { while(1) {
my $anything_written = 0; my $anything_written = 0;
my $buflen = length $buf; my $buflen = length $buf;
my $readsize = ($buflen < $blocksize) ? $blocksize-$buflen : $blocksize; my $readsize = ($buflen < $blocksize) ? $blocksize-$buflen : $blocksize;
# If $buf < $blocksize, append so it is $blocksize long after reading. # If $buf < $blocksize, append so it is $blocksize long after reading.
# Otherwise append a full $blocksize # Otherwise append a full $blocksize
local $SIG{ALRM} = sub {
::set_fh_non_blocking($in);
read($in,substr($buf,$buflen,0),$readsize);
::set_fh_blocking($in);
# warn("ee $buf");
alarm $timeout;
};
# alarm $timeout;
if(not read($in,substr($buf,$buflen,0),$readsize)) { if(not read($in,substr($buf,$buflen,0),$readsize)) {
# warn("ff");
# End-of-file # End-of-file
$chunk_number != 1 and last; $chunk_number != 1 and last;
# Force the while-loop once if everything was read by header reading # Force the while-loop once if everything was read by header reading
$one_time_through++ and last; $one_time_through++ and last;
} }
# warn("yy");
# alarm 0;
if($opt::r) { if($opt::r) {
# Remove empty lines # Remove empty lines
$buf =~ s/^\s*\n//gm; $buf =~ s/^\s*\n//gm;
@ -2071,7 +2083,7 @@ sub check_invalid_option_combinations() {
sub init_globals() { sub init_globals() {
# Defaults: # Defaults:
$Global::version = 20191122; $Global::version = 20191023;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$::name = "GNU Parallel"; $::name = "GNU Parallel";
$Global::infinity = 2**31; $Global::infinity = 2**31;
@ -7954,6 +7966,29 @@ sub free_slot($) {
} }
} }
{
my $which_sh;
sub startshell($) {
my $self = shift;
# Shell must support 'exec >foo 2>bar'
# Using sh always will cause functions to not be exported
# Could perl be used instead?
$which_sh = $Global::cshell ? "/bin/sh" : $Global::shell;
my ($pid, $stdin);
if($pid = open($stdin, "|-", $which_sh)) {
my $fileno = fileno($stdin);
# Assume we get pid, fileno from spawner child
$self->{'pid'} = $pid;
open(my $stdin_fh, ">&=$fileno") or die ($fileno);
$self->set_fh(0,"w",$stdin_fh);
} else {
die();
}
}
}
sub openoutputfiles($) { sub openoutputfiles($) {
# Open files for STDOUT and STDERR # Open files for STDOUT and STDERR
# Set file handles in $self->fh # Set file handles in $self->fh
@ -7964,20 +7999,21 @@ sub openoutputfiles($) {
($opt::keeporder or $opt::files or $opt::results or ($opt::keeporder or $opt::files or $opt::results or
$opt::compress or $opt::compress_program or $opt::compress or $opt::compress_program or
$opt::decompress_program)) { $opt::decompress_program)) {
# Do not save to files: Use non-blocking pipe
my ($outfhr, $errfhr); my ($outfhr, $errfhr);
pipe($outfhr, $outfhw) || die; $outname = ::tmpfifo();
pipe($errfhr, $errfhw) || die; $errname = ::tmpfifo();
$self->set_fh(1,'w',$outfhw); open($outfhr,"+<",$outname);
$self->set_fh(2,'w',$errfhw); open($errfhr,"+<",$errname);
$self->set_fh(1,'r',$outfhr); $self->set_fh(1,'r',$outfhr);
$self->set_fh(2,'r',$errfhr); $self->set_fh(2,'r',$errfhr);
open($outfhw,"+>",$outname);
open($errfhw,"+>",$errname);
$self->set_fh(1,'w',$outfhw);
$self->set_fh(2,'w',$errfhw);
# Make it possible to read non-blocking from the pipe # Make it possible to read non-blocking from the pipe
for my $fdno (1,2) { for my $fdno (1,2) {
::set_fh_non_blocking($self->fh($fdno,'r')); ::set_fh_non_blocking($self->fh($fdno,'r'));
} }
# Return immediately because we do not need setting filenames
return;
} elsif($opt::results and not $Global::csvsep) { } elsif($opt::results and not $Global::csvsep) {
my $out = $self->{'commandline'}->results_out(); my $out = $self->{'commandline'}->results_out();
my $seqname; my $seqname;
@ -8037,8 +8073,8 @@ sub openoutputfiles($) {
} }
} else { } else {
# --ungroup # --ungroup
open($outfhw,">&",$Global::fd{1}) || die; # open($outfhw,">&",$Global::fd{1}) || die;
open($errfhw,">&",$Global::fd{2}) || die; # open($errfhw,">&",$Global::fd{2}) || die;
# File name must be empty as it will otherwise be printed # File name must be empty as it will otherwise be printed
$outname = ""; $outname = "";
$errname = ""; $errname = "";
@ -8046,15 +8082,31 @@ sub openoutputfiles($) {
$self->set_fh(2,"unlink",$errname); $self->set_fh(2,"unlink",$errname);
} }
# Set writing FD # Set writing FD
$self->set_fh(1,'w',$outfhw); # $self->set_fh(1,'w',$outfhw);
$self->set_fh(2,'w',$errfhw); # $self->set_fh(2,'w',$errfhw);
$self->set_fh(1,'name',$outname); $self->set_fh(1,'name',$outname);
$self->set_fh(2,'name',$errname); $self->set_fh(2,'name',$errname);
if($opt::compress) { if($opt::compress) {
$self->filter_through_compress(); $self->filter_through_compress();
} elsif(not $opt::ungroup) { } elsif(not $opt::ungroup) {
$self->grouped(); $self->grouped();
} }
my $in = $self->fh(0,'w');
if($outname) {
syswrite($in,"exec >$outname\n");
# Must be unlinked by worker child
my $n = $self->fh(1,"unlink");
if(-e $n) { syswrite($in,"rm $n\n"); }
} elsif($errname) {
syswrite($in,"exec 2>$errname\n");
# Must be unlinked by worker child
my $n = $self->fh(2,"unlink");
if(-e $n) { syswrite($in,"rm $n\n"); }
}
# close $outfhw;
# close $errfhw;
if($opt::linebuffer) { if($opt::linebuffer) {
# Make it possible to read non-blocking from # Make it possible to read non-blocking from
# the buffer files # the buffer files
@ -8114,7 +8166,7 @@ sub grouped($) {
::die_bug("fdr: Cannot open ".$self->fh($fdno,'name')); ::die_bug("fdr: Cannot open ".$self->fh($fdno,'name'));
$self->set_fh($fdno,'r',$fdr); $self->set_fh($fdno,'r',$fdr);
# Unlink if not debugging # Unlink if not debugging
$Global::debug or ::rm($self->fh($fdno,"unlink")); # $Global::debug or ::rm($self->fh($fdno,"unlink"));
} }
} }
@ -9278,15 +9330,23 @@ sub start($) {
open OUT, '>&', $stdout_fh or ::die_bug("Can't dup STDOUT: $!"); open OUT, '>&', $stdout_fh or ::die_bug("Can't dup STDOUT: $!");
open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDERR: $!"); open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDERR: $!");
# The eval is needed to catch exception from open3 # The eval is needed to catch exception from open3
eval { # eval {
if(not $pid = ::open3($stdin_fh, ">&OUT", ">&ERR", "-")) { # if(not $pid = ::open3($stdin_fh, ">&OUT", ">&ERR", "-")) {
# Each child gets its own process group to make it safe to killall # # Each child gets its own process group to make it safe to killall
eval{ setpgrp(0,0) }; # eval{ setpgrp(0,0) };
eval{ setpriority(0,0,$opt::nice) }; # eval{ setpriority(0,0,$opt::nice) };
exec($Global::shell,"-c",$command) # exec($Global::shell,"-c",$command)
|| ::die_bug("open3-$stdin_fh $command"); # || ::die_bug("open3-$stdin_fh $command");
} # }
}; # };
# my ($in,$out,$err);
# if($pid = open($in, "|-", "/bin/sh")) {
# $fileno = fileno($in);
# }
# $pid $fileno
# $stdin_fh = $self->fh("w",0)
# print $stdin_fh $command;
return $pid; return $pid;
} }
@ -9366,7 +9426,7 @@ sub start($) {
eval $redefine_eval; eval $redefine_eval;
} }
sub open3_setpgrp { sub _open3_setpgrp {
my $setgprp_cache = $Global::cache_dir . "/tmp/sshlogin/" . my $setgprp_cache = $Global::cache_dir . "/tmp/sshlogin/" .
::hostname() . "/setpgrp_func"; ::hostname() . "/setpgrp_func";
if(-e $setgprp_cache) { if(-e $setgprp_cache) {
@ -9395,55 +9455,47 @@ sub start($) {
# $job->skip() was called # $job->skip() was called
$command = "true"; $command = "true";
} }
$ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$;
$ENV{'PARALLEL_TMP'} = ::tmpname("par");
$job->startshell();
$job->openoutputfiles(); $job->openoutputfiles();
$job->print_verbose_dryrun(); $job->print_verbose_dryrun();
# Call slot to store the slot value # Call slot to store the slot value
$job->slot(); $job->slot();
my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w"));
if($opt::dryrun or $opt::sqlmaster) { $command = "true"; } if($opt::dryrun or $opt::sqlmaster) { $command = "true"; }
$ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$;
$ENV{'PARALLEL_TMP'} = ::tmpname("par");
$job->add_rm($ENV{'PARALLEL_TMP'}); $job->add_rm($ENV{'PARALLEL_TMP'});
::debug("run", $Global::total_running, " processes . Starting (", ::debug("run", $Global::total_running, " processes . Starting (",
$job->seq(), "): $command\n"); $job->seq(), "): $command\n");
my ($stdin_fh) = $job->fh(0,"w");
if ($opt::tty and -c "/dev/tty" and
open(my $devtty_fh, "<", "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it
close $devtty_fh;
syswrite($stdin_fh,"exec < /dev/tty\n");
}
my @setpgrp = ('exec perl','-e',
::Q("eval\{setpgrp\}\;eval\{setpriority\(0,0,$opt::nice\)\}\;".
"exec '$Global::shell', '-c', \@ARGV"));
syswrite($stdin_fh,"@setpgrp ".::Q($command)."\n");
# print $stdin_fh "@setpgrp ",::Q($command),"\n";
::debug("run", "Run: $command\n");
if($opt::pipe) { if($opt::pipe) {
my ($stdin_fh) = ::gensym();
$pid = open3_setpgrp($stdin_fh,$stdout_fh,$stderr_fh,$command);
if($opt::roundrobin and not $opt::keeporder) { if($opt::roundrobin and not $opt::keeporder) {
# --keep-order will make sure the order will be reproducible # --keep-order will make sure the order will be reproducible
::set_fh_non_blocking($stdin_fh); ::set_fh_non_blocking($stdin_fh);
} }
$job->set_fh(0,"w",$stdin_fh);
if($opt::tee or $opt::shard or $opt::bin) { $job->set_virgin(0); } if($opt::tee or $opt::shard or $opt::bin) { $job->set_virgin(0); }
} elsif ($opt::tty and -c "/dev/tty" and
open(my $devtty_fh, "<", "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it
# The eval is needed to catch exception from open3
local (*IN,*OUT,*ERR);
open OUT, '>&', $stdout_fh or ::die_bug("Can't dup STDOUT: $!");
open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDERR: $!");
*IN = $devtty_fh;
# The eval is needed to catch exception from open3
my @wrap = ('perl','-e',
"eval\{setpriority\(0,0,$opt::nice\)\}\;".
"exec '$Global::shell', '-c', \@ARGV");
eval {
$pid = ::open3("<&IN", ">&OUT", ">&ERR", @wrap, $command)
|| ::die_bug("open3-/dev/tty");
1;
};
close $devtty_fh;
$job->set_virgin(0);
} else { } else {
$pid = open3_setpgrp(::gensym(),$stdout_fh,$stderr_fh,$command); # Close stdin if not pipe input
close $stdin_fh;
$job->set_virgin(0); $job->set_virgin(0);
} }
if($pid) { if($job->{'pid'}) {
# A job was started # A job was started
$Global::total_running++; $Global::total_running++;
$Global::total_started++; $Global::total_started++;
$job->set_pid($pid);
$job->set_starttime(); $job->set_starttime();
$Global::running{$job->pid()} = $job; $Global::running{$job->pid()} = $job;
if($opt::timeout) { if($opt::timeout) {
@ -9999,9 +10051,12 @@ sub print_tag(@) {
sub free_ressources() { sub free_ressources() {
my $self = shift; my $self = shift;
if(not $opt::ungroup) { if(not $opt::ungroup) {
my $fh;
for my $fdno (sort { $a <=> $b } keys %Global::fd) { for my $fdno (sort { $a <=> $b } keys %Global::fd) {
close $self->fh($fdno,"w"); $fh = $self->fh($fdno,"w");
close $self->fh($fdno,"r"); $fh and close $fh;
$fh = $self->fh($fdno,"r");
$fh and close $fh;
} }
} }
} }
@ -10010,7 +10065,7 @@ sub print_normal($) {
my $self = shift; my $self = shift;
my ($fdno,$in_fh,$out_fd) = @_; my ($fdno,$in_fh,$out_fd) = @_;
my $buf; my $buf;
close $self->fh($fdno,"w"); #close $self->fh($fdno,"w");
if($? and $opt::compress) { if($? and $opt::compress) {
::error($opt::compress_program." failed."); ::error($opt::compress_program." failed.");
$self->set_exitstatus(255); $self->set_exitstatus(255);