parallel: --fifo works on csh.

This commit is contained in:
Ole Tange 2015-05-15 17:25:19 +02:00
parent 847fbe125a
commit fd5622b2c6
5 changed files with 133 additions and 24 deletions

View file

@ -933,6 +933,14 @@ sub parse_options {
::error("--timeout must be seconds or percentage.");
wait_and_exit(255);
}
if(defined $opt::fifo and $opt::cat) {
::error("--fifo cannot be combined with --cat.");
::wait_and_exit(255);
}
if((defined $opt::fifo or defined $opt::cat)
and not $opt::pipepart) {
$opt::pipe = 1;
}
if(defined $opt::minversion) {
print $Global::version,"\n";
if($Global::version < $opt::minversion) {
@ -1062,7 +1070,7 @@ sub parse_options {
sub init_globals {
# Defaults:
$Global::version = 20150509;
$Global::version = 20150515;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
@ -4619,7 +4627,7 @@ sub compute_number_of_processes {
# This is the child
# The child takes one process slot
# It will be killed later
$SIG{TERM} = $Global::original_sig{TERM};
$SIG{'TERM'} = $Global::original_sig{'TERM'};
sleep 10000000;
exit(0);
} else {
@ -5704,7 +5712,8 @@ sub slot {
sub cattail {
# Returns:
# $cattail = perl program for: cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink]
# $cattail = perl program for:
# cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink]
if(not $cattail) {
$cattail = q{
# cat followed by tail (possibly with rm as soon at the file is opened)
@ -6284,6 +6293,46 @@ sub total_failed {
}
}
{
my $script;
sub fifo_wrap {
# Script to create a fifo, run a command on the fifo
# while copying STDIN to the fifo, and finally
# remove the fifo and return the exit code of the command.
if(not $script) {
# {} == $PARALLEL_TMP for --fifo
# To make it csh compatible a wrapper needs to:
# * mkfifo
# * spawn $command &
# * cat > fifo
# * waitpid to get the exit code from $command
# * be less than 1000 chars long
$script = "perl -e ". ::shell_quote_scalar
(::spacefree
(0, q{
($s,$c,$f) = @ARGV;
# mkfifo $PARALLEL_TMP
system "mkfifo", $f;
# spawn $shell -c $command &
$pid = fork || exec $s, "-c", $c;
open($o,">",$f) || die $!;
# cat > $PARALLEL_TMP
while(sysread(STDIN,$buf,32768)){
syswrite $o, $buf;
}
close $o;
# waitpid to get the exit code from $command
waitpid $pid,0;
# Cleanup
unlink $f;
exit $?/256;
}));
}
return $script;
}
}
sub wrapped {
# Wrap command with:
# * --shellquote
@ -6337,29 +6386,27 @@ sub wrapped {
}
}
if($opt::cat) {
# Append 'unlink {} without affecting $?'
# In '--cat' and '--fifo' {} == $PARALLEL_TMP.
# This is to make it possible to compute $PARALLEL_TMP on
# the fly when running remotely.
# $ENV{PARALLEL_TMP} is set in the remote wrapper before
# the command is run.
#
# Prepend 'cat > $PARALLEL_TMP;'
# Append 'unlink $PARALLEL_TMP without affecting $?'
$command =
$self->{'commandline'}->replace_placeholders(["cat > \257<\257>; "], 0, 0).
'cat > $PARALLEL_TMP;'.
$command.";". postpone_exit_and_cleanup().
'$PARALLEL_TMP';
} elsif($opt::fifo) {
# Prepend 'mkfifo {}; ('
# Append ') & _PID=$!; cat > {}; wait $_PID; '
# (This makes it fail in csh, but give the correct exit code in bash)
# Append ') & cat > {}; wait; '
# Append 'unlink {} without affecting $?'
# Set $ENV{PARALLEL_TMP} when starting a job
# Set $ENV{PARALLEL_TMP} in the remote wrapper
# mkfifo $PARALLEL_TMP;
# {} = $PARALLEL_TMP;
# (...) &
# cat > $PARALLEL_TMP; wait \$_PID; cleanup $PARALLEL_TMP
# perl -e 'open($fifo,">",shift); while(read){print FIFO};unlink $fifo;waitpid($pid,0);exit $?' $! $PARALLEL_FIFO
$command =
"mkfifo \$PARALLEL_TMP\n (".
$command.";".
') & _PID=$!; cat > $PARALLEL_TMP; wait $_PID; '.
postpone_exit_and_cleanup().
'$PARALLEL_TMP';
$command = fifo_wrap(). " ".
$Global::shell. " ".
::shell_quote_scalar($command).
' $PARALLEL_TMP'.
';';
}
# Wrap with ssh + tranferring of files
$command = $self->sshlogin_wrap($command);

View file

@ -461,6 +461,8 @@ will give data to the program on stdin (standard input). With B<--cat>
GNU B<parallel> will create a temporary file with the name in {}, so
you can do: B<parallel --pipe --cat wc {}>.
Implies B<--pipe> unless B<--pipepart> is used.
See also B<--fifo>.
@ -504,9 +506,9 @@ Compress temporary files. If the output is big and very compressible
this will take up less disk space in $TMPDIR and possibly be faster
due to less disk I/O.
GNU B<parallel> will try B<lzop>, B<pigz>, B<gzip>, B<pbzip2>,
B<plzip>, B<bzip2>, B<lzma>, B<lzip>, B<xz> in that order, and use the
first available.
GNU B<parallel> will try B<lz4>, B<pigz>, B<lzop>, B<plzip>,
B<pbzip2>, B<pxz>, B<gzip>, B<lzma>, B<xz>, B<bzip2>, B<lzip> in that
order, and use the first available.
=item B<--compress-program> I<prg>
@ -654,6 +656,8 @@ with the name in {}, so you can do: B<parallel --pipe --fifo wc {}>.
Beware: If data is not read from the fifo, the job will block forever.
Implies B<--pipe> unless B<--pipepart> is used.
See also B<--cat>.

View file

@ -0,0 +1,10 @@
#!/bin/bash
# Simple jobs that never fails
# Each should be taking 1-3s and be possible to run in parallel
# I.e.: No race conditions, no logins
cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj0 -k --joblog /tmp/jl-`basename $0` -L1
echo '### Test --fifo under csh'
csh -c "seq 3000000 | parallel -k --pipe --fifo 'sleep .{#};cat {}|wc -c ; false; echo \$status; false'"; echo exit $?
EOF

View file

@ -7,7 +7,7 @@ par_tmux_filter() {
export -f par_tmux_filter
par_tmux() {
(stdout parallel --timeout 3 --tmux --delay .3 echo '{}{=$_="\\"x$_=}'; echo $?) | par_tmux_filter
(stdout parallel --timeout 3 --tmux --delay .4 echo '{}{=$_="\\"x$_=}'; echo $?) | par_tmux_filter
}
export -f par_tmux
cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj0 --timeout 60 --retries 2 -k --joblog /tmp/jl-`basename $0` -L1

View file

@ -0,0 +1,48 @@
echo '### Test --fifo under csh'
### Test --fifo under csh
csh -c "seq 3000000 | parallel -k --pipe --fifo 'sleep .{#};cat {}|wc -c ; false; echo \$status; false'"; echo exit $?
1048571
1
1048579
1
1048572
1
1048579
1
1048579
1
1048572
1
1048580
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
1048576
1
868800
1
exit 22