parallel: --shebang-wrap for FreeBSD. cattail refactored.

This commit is contained in:
Ole Tange 2014-06-23 02:08:24 +02:00
parent d8161294b2
commit 21753cbbd7

View file

@ -1171,10 +1171,27 @@ sub read_options {
exec "$0 --skip-first-line -a $argfile @ARGV"; exec "$0 --skip-first-line -a $argfile @ARGV";
} }
if($opt::shebang_wrap) { if($opt::shebang_wrap) {
my $parser = shift @ARGV; my @options;
my $scriptfile = shell_quote_scalar(shift @ARGV); my @parser;
if ($^O eq 'freebsd') {
# FreeBSD's #! puts different values in @ARGV than Linux' does.
my @nooptions = @ARGV;
get_options_from_array(\@nooptions);
while($#ARGV > $#nooptions) {
push @options, shift @ARGV;
}
while(@ARGV and $ARGV[0] ne ":::") {
push @parser, shift @ARGV;
}
if(@ARGV and $ARGV[0] eq ":::") {
shift @ARGV;
}
} else {
@options = shift @ARGV;
}
my $script = shell_quote_scalar(shift @ARGV);
# exec myself to split $ARGV[0] into separate fields # exec myself to split $ARGV[0] into separate fields
exec "$0 --internal-pipe-means-argfiles $parser $scriptfile ::: @ARGV"; exec "$0 --internal-pipe-means-argfiles @options @parser $script ::: @ARGV";
} }
} }
@ -4315,65 +4332,73 @@ sub slot {
return $self->{'commandline'}->slot(); return $self->{'commandline'}->slot();
} }
sub cattail { {
# Returns: my($cattail);
# $cattail = perl program for: cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink]
my $cattail = q{
# cat followed by tail.
# If $writerpid dead: finish after this round
use Fcntl;
$|=1; sub cattail {
# Returns:
# $cattail = perl program for: cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink]
if(not $cattail) {
$cattail = q{
# cat followed by tail.
# If $writerpid dead: finish after this round
use Fcntl;
my ($cmd, $writerpid, $read_file, $unlink_file) = @ARGV; $|=1;
if($read_file) {
open(IN,"<",$read_file) || die("cattail: Cannot open $read_file"); my ($cmd, $writerpid, $read_file, $unlink_file) = @ARGV;
} else { if($read_file) {
*IN = *STDIN; open(IN,"<",$read_file) || die("cattail: Cannot open $read_file");
} else {
*IN = *STDIN;
}
my $flags;
fcntl(IN, F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= O_NONBLOCK; # Add non-blocking to the flags
fcntl(IN, F_SETFL, $flags) || die $!; # Set the flags on the filehandle
open(OUT,"|-",$cmd) || die("cattail: Cannot run $cmd");
while(1) {
# clear EOF
seek(IN,0,1);
my $writer_running = kill 0, $writerpid;
$read = sysread(IN,$buf,32768);
if($read) {
# We can unlink the file now: The writer has written something
-e $unlink_file and unlink $unlink_file;
# Blocking print
while($buf) {
my $bytes_written = syswrite(OUT,$buf);
# syswrite may be interrupted by SIGHUP
substr($buf,0,$bytes_written) = "";
}
# Something printed: Wait less next time
$sleep /= 2;
} else {
if(eof(IN) and not $writer_running) {
# Writer dead: There will never be more to read => exit
exit;
}
# TODO This could probably be done more efficiently using select(2)
# Nothing read: Wait longer before next read
# Up to 30 milliseconds
$sleep = ($sleep < 30) ? ($sleep * 1.001 + 0.01) : ($sleep);
usleep($sleep);
}
}
sub usleep {
# Sleep this many milliseconds.
my $secs = shift;
select(undef, undef, undef, $secs/1000);
}
};
$cattail =~ s/#.*//mg;
$cattail =~ s/\s+/ /g;
}
return $cattail;
} }
my $flags;
fcntl(IN, F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= O_NONBLOCK; # Add non-blocking to the flags
fcntl(IN, F_SETFL, $flags) || die $!; # Set the flags on the filehandle
open(OUT,"|-",$cmd) || die("cattail: Cannot run $cmd");
while(1) {
# clear EOF
seek(IN,0,1);
my $writer_running = kill 0, $writerpid;
$read = sysread(IN,$buf,32768);
if($read) {
# We can unlink the file now: The writer has written something
-e $unlink_file and unlink $unlink_file;
# Blocking print
while($buf) {
my $bytes_written = syswrite(OUT,$buf);
# syswrite may be interrupted by SIGHUP
substr($buf,0,$bytes_written) = "";
}
# Something printed: Wait less next time
$sleep /= 2;
} else {
if(eof(IN) and not $writer_running) {
# Writer dead: There will never be more to read => exit
exit;
}
# TODO This could probably be done more efficiently using select(2)
# Nothing read: Wait longer before next read
# Up to 30 milliseconds
$sleep = ($sleep < 30) ? ($sleep * 1.001 + 0.01) : ($sleep);
usleep($sleep);
}
}
sub usleep {
# Sleep this many milliseconds.
my $secs = shift;
select(undef, undef, undef, $secs/1000);
}
};
return $cattail;
} }
sub openoutputfiles { sub openoutputfiles {
@ -4836,6 +4861,7 @@ sub sshlogin_wrap {
if($opt::pipe and $opt::ctrlc if($opt::pipe and $opt::ctrlc
or or
not $opt::pipe and not $opt::noctrlc) { not $opt::pipe and not $opt::noctrlc) {
# TODO Determine if this is needed
# Propagating CTRL-C to kill remote jobs requires # Propagating CTRL-C to kill remote jobs requires
# remote jobs to be run with a terminal. # remote jobs to be run with a terminal.
$ssh_options = "-tt -oLogLevel=quiet"; $ssh_options = "-tt -oLogLevel=quiet";
@ -4850,13 +4876,40 @@ sub sshlogin_wrap {
my $wd = ::shell_quote_file($self->workdir()); my $wd = ::shell_quote_file($self->workdir());
$remote_pre .= ::shell_quote_scalar("mkdir -p ") . $wd . $remote_pre .= ::shell_quote_scalar("mkdir -p ") . $wd .
::shell_quote_scalar("; cd ") . $wd . ::shell_quote_scalar("; cd ") . $wd .
::shell_quote_scalar(qq{ || exit 255;}); # exit 255 (instead of exec false) would be the correct thing,
# but that fails on tcsh
::shell_quote_scalar(qq{ || exec false;});
} }
my $signal_script = "perl -e '".
q{
use IO::Poll;
$SIG{CHLD} = sub {exit ($?&127 ? 128+($?&127) : 1+$?>>8)};
$p = IO::Poll->new;
$p->mask(STDOUT, POLLHUP);
$pid=fork; unless($pid) {setpgrp; exec $ENV{SHELL}, "-c", @ARGV; die "exec: $!\n"}
$p->poll;
kill SIGHUP, -$pid unless $done;
wait; exit ($?&127 ? 128+($?&127) : 1+$?>>8)
} . "' ";
# q{
# use IO::Poll;
# $SIG{CHLD} = sub {$done = 1};
# $p = IO::Poll->new;
# $p->mask(STDOUT, POLLHUP);
# $pid=fork; unless($pid) {setpgrp; exec $ENV{SHELL}, "-c", @ARGV; die "exec: $!\n"}
# $p->poll;
# kill SIGHUP, -$pid unless $done;
# wait; exit ($?&127 ? 128+($?&127) : 1+$?>>8)
# } . "' ";
$signal_script =~ s/\s+/ /g;
$self->{'sshlogin_wrap'} = $self->{'sshlogin_wrap'} =
($pre ($pre
. "$sshcmd $ssh_options $serverlogin $parallel_env " . "$sshcmd $ssh_options $serverlogin $parallel_env "
. $remote_pre . $remote_pre
. ::shell_quote_scalar($next_command_line) . ";" # . ::shell_quote_scalar($signal_script . ::shell_quote_scalar($next_command_line))
. ::shell_quote_scalar($next_command_line)
. ";"
. $post); . $post);
} }
} }
@ -5488,7 +5541,7 @@ sub set_exitsignal {
or or
tell $disk_full_fh == $pos) { tell $disk_full_fh == $pos) {
::error("Output is incomplete. Cannot append to buffer file in \$TMPDIR. Is the disk full?\n"); ::error("Output is incomplete. Cannot append to buffer file in \$TMPDIR. Is the disk full?\n");
::error("Change \$TMPDIR with --tmpdir.\n"); ::error("Change \$TMPDIR with --tmpdir or use --compress.\n");
::wait_and_exit(255); ::wait_and_exit(255);
} }
truncate $disk_full_fh, $pos; truncate $disk_full_fh, $pos;
@ -7091,6 +7144,6 @@ sub mkdir_or_die {
} }
# Keep perl -w happy # Keep perl -w happy
$opt::x = $Semaphore::timeout = $Semaphore::wait = $opt::shebang = $Global::no_more_file_handles_warned = $opt::x = $Semaphore::timeout = $Semaphore::wait = $Global::no_more_file_handles_warned =
$Job::file_descriptor_warning_printed = 0; $Job::file_descriptor_warning_printed = $Global::max_slot_number = 0;