From fd5622b2c6ca3cb6bb88351ffc188a06295c13db Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Fri, 15 May 2015 17:25:19 +0200 Subject: [PATCH] parallel: --fifo works on csh. --- src/parallel | 87 ++++++++++++++++----- src/parallel.pod | 10 ++- testsuite/tests-to-run/parallel-local-1s.sh | 10 +++ testsuite/tests-to-run/parallel-local7.sh | 2 +- testsuite/wanted-results/parallel-local-1s | 48 ++++++++++++ 5 files changed, 133 insertions(+), 24 deletions(-) create mode 100644 testsuite/tests-to-run/parallel-local-1s.sh create mode 100644 testsuite/wanted-results/parallel-local-1s diff --git a/src/parallel b/src/parallel index c39285f2..20e55878 100755 --- a/src/parallel +++ b/src/parallel @@ -933,6 +933,14 @@ sub parse_options { ::error("--timeout must be seconds or percentage."); wait_and_exit(255); } + if(defined $opt::fifo and $opt::cat) { + ::error("--fifo cannot be combined with --cat."); + ::wait_and_exit(255); + } + if((defined $opt::fifo or defined $opt::cat) + and not $opt::pipepart) { + $opt::pipe = 1; + } if(defined $opt::minversion) { print $Global::version,"\n"; if($Global::version < $opt::minversion) { @@ -1062,7 +1070,7 @@ sub parse_options { sub init_globals { # Defaults: - $Global::version = 20150509; + $Global::version = 20150515; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -4619,7 +4627,7 @@ sub compute_number_of_processes { # This is the child # The child takes one process slot # It will be killed later - $SIG{TERM} = $Global::original_sig{TERM}; + $SIG{'TERM'} = $Global::original_sig{'TERM'}; sleep 10000000; exit(0); } else { @@ -5704,7 +5712,8 @@ sub slot { sub cattail { # Returns: - # $cattail = perl program for: cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink] + # $cattail = perl program for: + # cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink] if(not $cattail) { $cattail = q{ # cat followed by tail (possibly with rm as soon at the file is opened) @@ -6284,6 +6293,46 @@ sub total_failed { } } +{ + my $script; + + sub fifo_wrap { + # Script to create a fifo, run a command on the fifo + # while copying STDIN to the fifo, and finally + # remove the fifo and return the exit code of the command. + if(not $script) { + # {} == $PARALLEL_TMP for --fifo + # To make it csh compatible a wrapper needs to: + # * mkfifo + # * spawn $command & + # * cat > fifo + # * waitpid to get the exit code from $command + # * be less than 1000 chars long + $script = "perl -e ". ::shell_quote_scalar + (::spacefree + (0, q{ + ($s,$c,$f) = @ARGV; + # mkfifo $PARALLEL_TMP + system "mkfifo", $f; + # spawn $shell -c $command & + $pid = fork || exec $s, "-c", $c; + open($o,">",$f) || die $!; + # cat > $PARALLEL_TMP + while(sysread(STDIN,$buf,32768)){ + syswrite $o, $buf; + } + close $o; + # waitpid to get the exit code from $command + waitpid $pid,0; + # Cleanup + unlink $f; + exit $?/256; + })); + } + return $script; + } +} + sub wrapped { # Wrap command with: # * --shellquote @@ -6337,29 +6386,27 @@ sub wrapped { } } if($opt::cat) { -# Append 'unlink {} without affecting $?' + # In '--cat' and '--fifo' {} == $PARALLEL_TMP. + # This is to make it possible to compute $PARALLEL_TMP on + # the fly when running remotely. + # $ENV{PARALLEL_TMP} is set in the remote wrapper before + # the command is run. + # + # Prepend 'cat > $PARALLEL_TMP;' + # Append 'unlink $PARALLEL_TMP without affecting $?' $command = - $self->{'commandline'}->replace_placeholders(["cat > \257<\257>; "], 0, 0). + 'cat > $PARALLEL_TMP;'. $command.";". postpone_exit_and_cleanup(). '$PARALLEL_TMP'; } elsif($opt::fifo) { # Prepend 'mkfifo {}; (' - # Append ') & _PID=$!; cat > {}; wait $_PID; ' - # (This makes it fail in csh, but give the correct exit code in bash) + # Append ') & cat > {}; wait; ' # Append 'unlink {} without affecting $?' - # Set $ENV{PARALLEL_TMP} when starting a job - # Set $ENV{PARALLEL_TMP} in the remote wrapper - # mkfifo $PARALLEL_TMP; - # {} = $PARALLEL_TMP; - # (...) & - # cat > $PARALLEL_TMP; wait \$_PID; cleanup $PARALLEL_TMP - # perl -e 'open($fifo,">",shift); while(read){print FIFO};unlink $fifo;waitpid($pid,0);exit $?' $! $PARALLEL_FIFO - $command = - "mkfifo \$PARALLEL_TMP\n (". - $command.";". - ') & _PID=$!; cat > $PARALLEL_TMP; wait $_PID; '. - postpone_exit_and_cleanup(). - '$PARALLEL_TMP'; + $command = fifo_wrap(). " ". + $Global::shell. " ". + ::shell_quote_scalar($command). + ' $PARALLEL_TMP'. + ';'; } # Wrap with ssh + tranferring of files $command = $self->sshlogin_wrap($command); diff --git a/src/parallel.pod b/src/parallel.pod index f6ec81d3..52e89b23 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -461,6 +461,8 @@ will give data to the program on stdin (standard input). With B<--cat> GNU B will create a temporary file with the name in {}, so you can do: B. +Implies B<--pipe> unless B<--pipepart> is used. + See also B<--fifo>. @@ -504,9 +506,9 @@ Compress temporary files. If the output is big and very compressible this will take up less disk space in $TMPDIR and possibly be faster due to less disk I/O. -GNU B will try B, B, B, B, -B, B, B, B, B in that order, and use the -first available. +GNU B will try B, B, B, B, +B, B, B, B, B, B, B in that +order, and use the first available. =item B<--compress-program> I @@ -654,6 +656,8 @@ with the name in {}, so you can do: B. Beware: If data is not read from the fifo, the job will block forever. +Implies B<--pipe> unless B<--pipepart> is used. + See also B<--cat>. diff --git a/testsuite/tests-to-run/parallel-local-1s.sh b/testsuite/tests-to-run/parallel-local-1s.sh new file mode 100644 index 00000000..a3b7bbbd --- /dev/null +++ b/testsuite/tests-to-run/parallel-local-1s.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +# Simple jobs that never fails +# Each should be taking 1-3s and be possible to run in parallel +# I.e.: No race conditions, no logins +cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj0 -k --joblog /tmp/jl-`basename $0` -L1 +echo '### Test --fifo under csh' + csh -c "seq 3000000 | parallel -k --pipe --fifo 'sleep .{#};cat {}|wc -c ; false; echo \$status; false'"; echo exit $? + +EOF diff --git a/testsuite/tests-to-run/parallel-local7.sh b/testsuite/tests-to-run/parallel-local7.sh index 7ed68ead..ffc45f32 100755 --- a/testsuite/tests-to-run/parallel-local7.sh +++ b/testsuite/tests-to-run/parallel-local7.sh @@ -7,7 +7,7 @@ par_tmux_filter() { export -f par_tmux_filter par_tmux() { - (stdout parallel --timeout 3 --tmux --delay .3 echo '{}{=$_="\\"x$_=}'; echo $?) | par_tmux_filter + (stdout parallel --timeout 3 --tmux --delay .4 echo '{}{=$_="\\"x$_=}'; echo $?) | par_tmux_filter } export -f par_tmux cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj0 --timeout 60 --retries 2 -k --joblog /tmp/jl-`basename $0` -L1 diff --git a/testsuite/wanted-results/parallel-local-1s b/testsuite/wanted-results/parallel-local-1s new file mode 100644 index 00000000..21a881be --- /dev/null +++ b/testsuite/wanted-results/parallel-local-1s @@ -0,0 +1,48 @@ +echo '### Test --fifo under csh' +### Test --fifo under csh + csh -c "seq 3000000 | parallel -k --pipe --fifo 'sleep .{#};cat {}|wc -c ; false; echo \$status; false'"; echo exit $? +1048571 +1 +1048579 +1 +1048572 +1 +1048579 +1 +1048579 +1 +1048572 +1 +1048580 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +1048576 +1 +868800 +1 +exit 22