From 21753cbbd725d4b194f6a3cd736e3062a43342a5 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Mon, 23 Jun 2014 02:08:24 +0200 Subject: [PATCH] parallel: --shebang-wrap for FreeBSD. cattail refactored. --- src/parallel | 183 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 118 insertions(+), 65 deletions(-) diff --git a/src/parallel b/src/parallel index 86d5a859..db0c758a 100755 --- a/src/parallel +++ b/src/parallel @@ -1171,10 +1171,27 @@ sub read_options { exec "$0 --skip-first-line -a $argfile @ARGV"; } if($opt::shebang_wrap) { - my $parser = shift @ARGV; - my $scriptfile = shell_quote_scalar(shift @ARGV); + my @options; + my @parser; + if ($^O eq 'freebsd') { + # FreeBSD's #! puts different values in @ARGV than Linux' does. + my @nooptions = @ARGV; + get_options_from_array(\@nooptions); + while($#ARGV > $#nooptions) { + push @options, shift @ARGV; + } + while(@ARGV and $ARGV[0] ne ":::") { + push @parser, shift @ARGV; + } + if(@ARGV and $ARGV[0] eq ":::") { + shift @ARGV; + } + } else { + @options = shift @ARGV; + } + my $script = shell_quote_scalar(shift @ARGV); # exec myself to split $ARGV[0] into separate fields - exec "$0 --internal-pipe-means-argfiles $parser $scriptfile ::: @ARGV"; + exec "$0 --internal-pipe-means-argfiles @options @parser $script ::: @ARGV"; } } @@ -4315,65 +4332,73 @@ sub slot { return $self->{'commandline'}->slot(); } -sub cattail { - # Returns: - # $cattail = perl program for: cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink] - my $cattail = q{ - # cat followed by tail. - # If $writerpid dead: finish after this round - use Fcntl; - - $|=1; +{ + my($cattail); - my ($cmd, $writerpid, $read_file, $unlink_file) = @ARGV; - if($read_file) { - open(IN,"<",$read_file) || die("cattail: Cannot open $read_file"); - } else { - *IN = *STDIN; + sub cattail { + # Returns: + # $cattail = perl program for: cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink] + if(not $cattail) { + $cattail = q{ + # cat followed by tail. + # If $writerpid dead: finish after this round + use Fcntl; + + $|=1; + + my ($cmd, $writerpid, $read_file, $unlink_file) = @ARGV; + if($read_file) { + open(IN,"<",$read_file) || die("cattail: Cannot open $read_file"); + } else { + *IN = *STDIN; + } + + my $flags; + fcntl(IN, F_GETFL, $flags) || die $!; # Get the current flags on the filehandle + $flags |= O_NONBLOCK; # Add non-blocking to the flags + fcntl(IN, F_SETFL, $flags) || die $!; # Set the flags on the filehandle + open(OUT,"|-",$cmd) || die("cattail: Cannot run $cmd"); + + while(1) { + # clear EOF + seek(IN,0,1); + my $writer_running = kill 0, $writerpid; + $read = sysread(IN,$buf,32768); + if($read) { + # We can unlink the file now: The writer has written something + -e $unlink_file and unlink $unlink_file; + # Blocking print + while($buf) { + my $bytes_written = syswrite(OUT,$buf); + # syswrite may be interrupted by SIGHUP + substr($buf,0,$bytes_written) = ""; + } + # Something printed: Wait less next time + $sleep /= 2; + } else { + if(eof(IN) and not $writer_running) { + # Writer dead: There will never be more to read => exit + exit; + } + # TODO This could probably be done more efficiently using select(2) + # Nothing read: Wait longer before next read + # Up to 30 milliseconds + $sleep = ($sleep < 30) ? ($sleep * 1.001 + 0.01) : ($sleep); + usleep($sleep); + } + } + + sub usleep { + # Sleep this many milliseconds. + my $secs = shift; + select(undef, undef, undef, $secs/1000); + } + }; + $cattail =~ s/#.*//mg; + $cattail =~ s/\s+/ /g; + } + return $cattail; } - - my $flags; - fcntl(IN, F_GETFL, $flags) || die $!; # Get the current flags on the filehandle - $flags |= O_NONBLOCK; # Add non-blocking to the flags - fcntl(IN, F_SETFL, $flags) || die $!; # Set the flags on the filehandle - open(OUT,"|-",$cmd) || die("cattail: Cannot run $cmd"); - - while(1) { - # clear EOF - seek(IN,0,1); - my $writer_running = kill 0, $writerpid; - $read = sysread(IN,$buf,32768); - if($read) { - # We can unlink the file now: The writer has written something - -e $unlink_file and unlink $unlink_file; - # Blocking print - while($buf) { - my $bytes_written = syswrite(OUT,$buf); - # syswrite may be interrupted by SIGHUP - substr($buf,0,$bytes_written) = ""; - } - # Something printed: Wait less next time - $sleep /= 2; - } else { - if(eof(IN) and not $writer_running) { - # Writer dead: There will never be more to read => exit - exit; - } - # TODO This could probably be done more efficiently using select(2) - # Nothing read: Wait longer before next read - # Up to 30 milliseconds - $sleep = ($sleep < 30) ? ($sleep * 1.001 + 0.01) : ($sleep); - usleep($sleep); - } - } - - sub usleep { - # Sleep this many milliseconds. - my $secs = shift; - select(undef, undef, undef, $secs/1000); - } -}; - return $cattail; } sub openoutputfiles { @@ -4836,6 +4861,7 @@ sub sshlogin_wrap { if($opt::pipe and $opt::ctrlc or not $opt::pipe and not $opt::noctrlc) { + # TODO Determine if this is needed # Propagating CTRL-C to kill remote jobs requires # remote jobs to be run with a terminal. $ssh_options = "-tt -oLogLevel=quiet"; @@ -4850,13 +4876,40 @@ sub sshlogin_wrap { my $wd = ::shell_quote_file($self->workdir()); $remote_pre .= ::shell_quote_scalar("mkdir -p ") . $wd . ::shell_quote_scalar("; cd ") . $wd . - ::shell_quote_scalar(qq{ || exit 255;}); + # exit 255 (instead of exec false) would be the correct thing, + # but that fails on tcsh + ::shell_quote_scalar(qq{ || exec false;}); } + my $signal_script = "perl -e '". + q{ + use IO::Poll; + $SIG{CHLD} = sub {exit ($?&127 ? 128+($?&127) : 1+$?>>8)}; + $p = IO::Poll->new; + $p->mask(STDOUT, POLLHUP); + $pid=fork; unless($pid) {setpgrp; exec $ENV{SHELL}, "-c", @ARGV; die "exec: $!\n"} + $p->poll; + kill SIGHUP, -$pid unless $done; + wait; exit ($?&127 ? 128+($?&127) : 1+$?>>8) + } . "' "; +# q{ +# use IO::Poll; +# $SIG{CHLD} = sub {$done = 1}; +# $p = IO::Poll->new; +# $p->mask(STDOUT, POLLHUP); +# $pid=fork; unless($pid) {setpgrp; exec $ENV{SHELL}, "-c", @ARGV; die "exec: $!\n"} +# $p->poll; +# kill SIGHUP, -$pid unless $done; +# wait; exit ($?&127 ? 128+($?&127) : 1+$?>>8) +# } . "' "; + $signal_script =~ s/\s+/ /g; + $self->{'sshlogin_wrap'} = ($pre . "$sshcmd $ssh_options $serverlogin $parallel_env " . $remote_pre - . ::shell_quote_scalar($next_command_line) . ";" +# . ::shell_quote_scalar($signal_script . ::shell_quote_scalar($next_command_line)) + . ::shell_quote_scalar($next_command_line) + . ";" . $post); } } @@ -5488,7 +5541,7 @@ sub set_exitsignal { or tell $disk_full_fh == $pos) { ::error("Output is incomplete. Cannot append to buffer file in \$TMPDIR. Is the disk full?\n"); - ::error("Change \$TMPDIR with --tmpdir.\n"); + ::error("Change \$TMPDIR with --tmpdir or use --compress.\n"); ::wait_and_exit(255); } truncate $disk_full_fh, $pos; @@ -7091,6 +7144,6 @@ sub mkdir_or_die { } # Keep perl -w happy -$opt::x = $Semaphore::timeout = $Semaphore::wait = $opt::shebang = $Global::no_more_file_handles_warned = -$Job::file_descriptor_warning_printed = 0; +$opt::x = $Semaphore::timeout = $Semaphore::wait = $Global::no_more_file_handles_warned = +$Job::file_descriptor_warning_printed = $Global::max_slot_number = 0;