From 94ce0d8f9ffae26c727a77f2668212027dd9d3f6 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Sat, 22 Mar 2014 12:38:45 +0100 Subject: [PATCH] parallel: Fix --line-buffer problem with missing \n for last record. Passes testsuite. --- src/optional/R/load-parallel-results.r | 125 ---------------------- src/parallel | 22 ++-- src/parallel.texi | 2 +- testsuite/tests-to-run/niceload03.sh | 5 +- testsuite/tests-to-run/parallel-local9.sh | 38 +++++-- testsuite/wanted-results/niceload03 | 11 +- testsuite/wanted-results/parallel-local5 | 8 +- testsuite/wanted-results/parallel-local9 | 17 +++ 8 files changed, 73 insertions(+), 155 deletions(-) delete mode 100644 src/optional/R/load-parallel-results.r diff --git a/src/optional/R/load-parallel-results.r b/src/optional/R/load-parallel-results.r deleted file mode 100644 index 15f70586..00000000 --- a/src/optional/R/load-parallel-results.r +++ /dev/null @@ -1,125 +0,0 @@ -## Copyright (C) 2014 Ole Tange, David Rosenberg, and Free Software -## Foundation, Inc. -## -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 3 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, but -## WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -## General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with this program; if not, see -## or write to the Free Software Foundation, Inc., 51 Franklin St, -## Fifth Floor, Boston, MA 02110-1301 USA -## -## -## LIBRARY FOR READING GNU PARALLEL RESULTS -## -## Example: -## parallel --results my/results/dir --header : \ -## 'printf "FOO={foo}\\tBAR={bar}\\n";paste <(seq {bar}) <(seq {bar} -1 1)' \ -## :::: <(echo foo; seq 100) <(echo bar; seq 10) -## -## dir="my/results/dir" -## filenametable <- load_parallel_results_filenames(dir); -## raw <- load_parallel_results_raw(filenametable) -## newlines <- load_parallel_results_split_on_newline(filenametable) -## rawdt <- raw_to_data.table(raw) -## rawdf <- raw_to_data.frame(raw) - -load_parallel_results_filenames <- function(resdir) { - ## Find files called .../stdout - stdoutnames <- list.files(path=resdir, pattern="stdout", recursive=T); - ## Find files called .../stderr - stderrnames <- list.files(path=resdir, pattern="stderr", recursive=T); - if(length(stdoutnames) == 0) { - ## Return empty data frame if no files found - return(data.frame()); - } - ## The argument names are every other dir level - ## The argument values are every other dir level - ## e.g. my/results/dir/age/18/chromosome/20/stdout - m <- matrix(unlist(strsplit(stdoutnames, "/")),nrow = length(stdoutnames),byrow=T); - filenametable <- as.table(m[,c(F,T)]); - ## Append the stdout and stderr filenames - filenametable <- cbind(filenametable, - paste(resdir,unlist(stdoutnames),sep="/"), - paste(resdir,unlist(stderrnames),sep="/")); - colnames(filenametable) <- c(strsplit(stdoutnames[1],"/")[[1]][c(T,F)],"stderr"); - return(filenametable); -} - -load_parallel_results_raw <- function(filenametable) { - ## Read the files given in column stdout - stdoutcontents <- - lapply(filenametable[,c("stdout")], - function(filename) { - return(readChar(filename, file.info(filename)$size)); - } ); - ## Read the files given in column stderr - stderrcontents <- - lapply(filenametable[,c("stderr")], - function(filename) { - return(readChar(filename, file.info(filename)$size)); - } ); - ## Replace filenames with file contents - filenametable[,c("stdout","stderr")] <- - c(as.character(stdoutcontents),as.character(stderrcontents)); - return(filenametable); -} - -load_parallel_results_split_on_newline <- function(filenametable,split="\n") { - raw <- load_parallel_results_raw(filenametable); - ## Keep all columns except stdout and stderr - varnames = setdiff(colnames(raw), c("stdout","stderr")) - ## Find the id of the non-stdout and non-stderr columns - header_cols = which(colnames(raw) %in% varnames) - ## Split stdout on \n - splits = strsplit(raw[,"stdout"], split) - ## Compute lengths of all the lines - lens = sapply(splits, length) - ## The arguments should be repeated as many times as there are lines - reps = rep(1:nrow(raw), lens) - ## Merge the repeating argument and the lines into a matrix - m = cbind(raw[reps, header_cols], unlist(splits)) - return(m) -} - -raw_to_data.table <- function(raw, ...) { - require(data.table) - ## Keep all columns except stdout and stderr - varnames = setdiff(colnames(raw), c("stdout","stderr")) - ## after data.table feature request the as.data.frame can be skipped - ## and will thus be much faster - ddt = as.data.table(as.data.frame(raw,stringsAsFactors=FALSE)) - ## ensure fread knows stdout is string and not filename by appending \n - ddt[, stdout := paste0(stdout,"\n")] - ## drop files with empty stdout - ddd = ddt[nchar(stdout)>1,fread(stdout, header=FALSE, ...), by=varnames] - return(ddd) -} - -raw_to_data.frame <- function(raw, ...) { - require(plyr) - ## Convert to data.frame without factors - raw = as.data.frame(raw, stringsAsFactors = FALSE) - ## Keep all columns except stdout and stderr - varnames = setdiff(colnames(raw), c("stdout","stderr")) - - dd = ddply(raw, .variables=varnames, function(row) { - ## Ignore empty stdouts - if (nchar(row[,"stdout"]) == 0) { - return(NULL) - } - ## Read stdout with read.table - con <- textConnection(row[,"stdout"], open = "r") - d = read.table(con, header=FALSE, ...) - return(d) - }) - - return(dd) -} diff --git a/src/parallel b/src/parallel index 766771a5..0621e6fc 100755 --- a/src/parallel +++ b/src/parallel @@ -316,8 +316,10 @@ sub spreadstdin { if(not $anything_written and not eof($in)) { # Nothing was written - maybe the block size < record size? # Increase blocksize exponentially + my $old_blocksize = $blocksize; $blocksize = ceil($blocksize * 1.3 + 1); - ::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n"); + ::warning("A record was longer than $old_blocksize. " . + "Increasing to --blocksize $blocksize\n"); } } # If there is anything left in the buffer write it @@ -626,7 +628,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A # Defaults: - $Global::version = 20140222; + $Global::version = 20140315; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -5025,16 +5027,8 @@ sub print { # Remove the printed part substr($$partial,0,$i+1)=""; } - } if(defined $self->{'exitstatus'}) { - # The job is finished, there might be line with no \n left - print $out_fd $$partial; - } - - if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) { - # decompress still running - } elsif(defined $self->{'exitstatus'}) { # If the job is dead: print the remaining partial line # read remaining if($$partial and ($opt::tag or defined $opt::tagstring)) { @@ -5044,8 +5038,12 @@ sub print { print $out_fd $$partial; # Release the memory $$partial = undef; - # then close fh - close $in_fh; + if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) { + # decompress still running + } else { + # decompress done: close fh + close $in_fh; + } } } else { my $buf; diff --git a/src/parallel.texi b/src/parallel.texi index 836a66ab..5ed17c41 100644 --- a/src/parallel.texi +++ b/src/parallel.texi @@ -3328,7 +3328,7 @@ this. To see this in action try: parallel -kP4 -n1 grep 1 > out.par ::: a b c d e f echo a b c d e f | xargs -P4 -n1 grep 1 > out.xargs-unbuf echo a b c d e f | xargs -P4 -n1 grep --line-buffered 1 > out.xargs-linebuf - echo a b c d e f | xargs -n1 grep --line-buffered 1 > out.xargs-serial + echo a b c d e f | xargs -n1 grep 1 > out.xargs-serial ls -l out* md5sum out* @end verbatim diff --git a/testsuite/tests-to-run/niceload03.sh b/testsuite/tests-to-run/niceload03.sh index 0d05e490..745b759b 100644 --- a/testsuite/tests-to-run/niceload03.sh +++ b/testsuite/tests-to-run/niceload03.sh @@ -21,7 +21,7 @@ echo '### niceload with no arguments should give no output' echo '### Test -t and -s' niceload -v -t 1 -s 2 sleep 4.5 -echo 'bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p' +echo 'bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p - Order may change, but not output' # This should take 10 seconds to run + delay from niceload # niceload killed after 1 sec => The delay from niceload should be no more than 1 second stdout /usr/bin/time -f %e perl -e 'for(1..100) { select(undef, undef, undef, 0.1); } print "done\n"' | int & @@ -29,7 +29,8 @@ echo 'bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p' export A=$!; sleep 2; kill -s TERM $A; - wait + wait; + echo Finished echo 'bug #38908: niceload: Ctrl-C should resume jobs if using -p' # This should take 10 seconds to run + delay from niceload diff --git a/testsuite/tests-to-run/parallel-local9.sh b/testsuite/tests-to-run/parallel-local9.sh index ad3ec2d7..ff28f53e 100644 --- a/testsuite/tests-to-run/parallel-local9.sh +++ b/testsuite/tests-to-run/parallel-local9.sh @@ -11,25 +11,51 @@ echo 'bug #41613: --compress --line-buffer no newline'; echo 'bug #41613: --compress --line-buffer no --tagstring'; diff - <(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'| + <(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'| parallel -N10 -L1 --pipe -j6 --block 20M --compress pv -qL 3000000 | perl -pe 's/(....).*/$1/') - <(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'| + <(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'| parallel -N10 -L1 --pipe -j6 --block 20M --compress --line-buffer pv -qL 3000000 | perl -pe 's/(....).*/$1/') >/dev/null - || echo 'Good: --line-buffer matters' + || (echo 'Good: --line-buffer matters'; false) && echo 'Bad: --line-buffer not working' echo 'bug #41613: --compress --line-buffer with --tagstring'; diff - <(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'| + <(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'| parallel -N10 -L1 --pipe -j6 --block 20M --compress --tagstring {#} pv -qL 3000000 | perl -pe 's/(....).*/$1/') - <(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'| + <(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'| parallel -N10 -L1 --pipe -j6 --block 20M --compress --tagstring {#} --line-buffer pv -qL 3000000 | perl -pe 's/(....).*/$1/') >/dev/null - || echo 'Good: --line-buffer matters' + || (echo 'Good: --line-buffer matters'; false) && echo 'Bad: --line-buffer not working' + +echo 'bug #41613: --compress --line-buffer - no newline'; + echo 'pipe compress tagstring' + perl -e 'print "O"'| parallel --compress --tagstring {#} --pipe --line-buffer cat + echo "K" + echo 'pipe compress notagstring' + perl -e 'print "O"'| parallel --compress --pipe --line-buffer cat + echo "K" + echo 'pipe nocompress tagstring' + perl -e 'print "O"'| parallel --tagstring {#} --pipe --line-buffer cat + echo "K" + echo 'pipe nocompress notagstring' + perl -e 'print "O"'| parallel --pipe --line-buffer cat + echo "K" + echo 'nopipe compress tagstring' + parallel --compress --tagstring {#} --line-buffer echo {} O ::: -n + echo "K" + echo 'nopipe compress notagstring' + parallel --compress --line-buffer echo {} O ::: -n + echo "K" + echo 'nopipe nocompress tagstring' + parallel --tagstring {#} --line-buffer echo {} O ::: -n + echo "K" + echo 'nopipe nocompress notagstring' + parallel --line-buffer echo {} O ::: -n + echo "K" echo 'bug #41412: --timeout + --delay causes deadlock'; seq 10 | parallel -j10 --timeout 1 --delay .3 echo; diff --git a/testsuite/wanted-results/niceload03 b/testsuite/wanted-results/niceload03 index 623b5b03..6160319e 100644 --- a/testsuite/wanted-results/niceload03 +++ b/testsuite/wanted-results/niceload03 @@ -9,13 +9,14 @@ Sleeping 1s Running 2s Sleeping 1s Running 2s -bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p -Sleeping 1s -Running 10s +bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p - Order may change, but not output done 10 +Finished +Sleeping 1s +Running 10s bug #38908: niceload: Ctrl-C should resume jobs if using -p -Sleeping 1s -Running 10s done 10 +Sleeping 1s +Running 10s diff --git a/testsuite/wanted-results/parallel-local5 b/testsuite/wanted-results/parallel-local5 index ff223c7a..f1852c98 100644 --- a/testsuite/wanted-results/parallel-local5 +++ b/testsuite/wanted-results/parallel-local5 @@ -1,6 +1,6 @@ ### Test --pipe ### Test 200M records with too small block -parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 260000001 +parallel: Warning: A record was longer than 200000000. Increasing to --blocksize 260000001 303111434 303111434 303111434 @@ -127,10 +127,10 @@ c 1>01a02a0a 2>0a12a34a 3>45a6a -parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 3 -parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 5 +parallel: Warning: A record was longer than 1. Increasing to --blocksize 3 +parallel: Warning: A record was longer than 3. Increasing to --blocksize 5 1>01a02a0a -parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 8 +parallel: Warning: A record was longer than 5. Increasing to --blocksize 8 2>0a12a34a 3>45a6a ### Test 10M records with too big block diff --git a/testsuite/wanted-results/parallel-local9 b/testsuite/wanted-results/parallel-local9 index 77fd3fe5..596151bd 100644 --- a/testsuite/wanted-results/parallel-local9 +++ b/testsuite/wanted-results/parallel-local9 @@ -4,6 +4,23 @@ bug #41613: --compress --line-buffer no --tagstring Good: --line-buffer matters bug #41613: --compress --line-buffer with --tagstring Good: --line-buffer matters +bug #41613: --compress --line-buffer - no newline +pipe compress tagstring +1 OK +pipe compress notagstring +OK +pipe nocompress tagstring +1 OK +pipe nocompress notagstring +OK +nopipe compress tagstring +1 OK +nopipe compress notagstring +OK +nopipe nocompress tagstring +1 OK +nopipe nocompress notagstring +OK bug #41412: --timeout + --delay causes deadlock 1 2