parallel: Fix --line-buffer problem with missing \n for last record. Passes testsuite.

This commit is contained in:
Ole Tange 2014-03-22 12:38:45 +01:00
parent a3f11158b5
commit 94ce0d8f9f
8 changed files with 73 additions and 155 deletions

View file

@ -1,125 +0,0 @@
## Copyright (C) 2014 Ole Tange, David Rosenberg, and Free Software
## Foundation, Inc.
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>
## or write to the Free Software Foundation, Inc., 51 Franklin St,
## Fifth Floor, Boston, MA 02110-1301 USA
##
##
## LIBRARY FOR READING GNU PARALLEL RESULTS
##
## Example:
## parallel --results my/results/dir --header : \
## 'printf "FOO={foo}\\tBAR={bar}\\n";paste <(seq {bar}) <(seq {bar} -1 1)' \
## :::: <(echo foo; seq 100) <(echo bar; seq 10)
##
## dir="my/results/dir"
## filenametable <- load_parallel_results_filenames(dir);
## raw <- load_parallel_results_raw(filenametable)
## newlines <- load_parallel_results_split_on_newline(filenametable)
## rawdt <- raw_to_data.table(raw)
## rawdf <- raw_to_data.frame(raw)
load_parallel_results_filenames <- function(resdir) {
## Find files called .../stdout
stdoutnames <- list.files(path=resdir, pattern="stdout", recursive=T);
## Find files called .../stderr
stderrnames <- list.files(path=resdir, pattern="stderr", recursive=T);
if(length(stdoutnames) == 0) {
## Return empty data frame if no files found
return(data.frame());
}
## The argument names are every other dir level
## The argument values are every other dir level
## e.g. my/results/dir/age/18/chromosome/20/stdout
m <- matrix(unlist(strsplit(stdoutnames, "/")),nrow = length(stdoutnames),byrow=T);
filenametable <- as.table(m[,c(F,T)]);
## Append the stdout and stderr filenames
filenametable <- cbind(filenametable,
paste(resdir,unlist(stdoutnames),sep="/"),
paste(resdir,unlist(stderrnames),sep="/"));
colnames(filenametable) <- c(strsplit(stdoutnames[1],"/")[[1]][c(T,F)],"stderr");
return(filenametable);
}
load_parallel_results_raw <- function(filenametable) {
## Read the files given in column stdout
stdoutcontents <-
lapply(filenametable[,c("stdout")],
function(filename) {
return(readChar(filename, file.info(filename)$size));
} );
## Read the files given in column stderr
stderrcontents <-
lapply(filenametable[,c("stderr")],
function(filename) {
return(readChar(filename, file.info(filename)$size));
} );
## Replace filenames with file contents
filenametable[,c("stdout","stderr")] <-
c(as.character(stdoutcontents),as.character(stderrcontents));
return(filenametable);
}
load_parallel_results_split_on_newline <- function(filenametable,split="\n") {
raw <- load_parallel_results_raw(filenametable);
## Keep all columns except stdout and stderr
varnames = setdiff(colnames(raw), c("stdout","stderr"))
## Find the id of the non-stdout and non-stderr columns
header_cols = which(colnames(raw) %in% varnames)
## Split stdout on \n
splits = strsplit(raw[,"stdout"], split)
## Compute lengths of all the lines
lens = sapply(splits, length)
## The arguments should be repeated as many times as there are lines
reps = rep(1:nrow(raw), lens)
## Merge the repeating argument and the lines into a matrix
m = cbind(raw[reps, header_cols], unlist(splits))
return(m)
}
raw_to_data.table <- function(raw, ...) {
require(data.table)
## Keep all columns except stdout and stderr
varnames = setdiff(colnames(raw), c("stdout","stderr"))
## after data.table feature request the as.data.frame can be skipped
## and will thus be much faster
ddt = as.data.table(as.data.frame(raw,stringsAsFactors=FALSE))
## ensure fread knows stdout is string and not filename by appending \n
ddt[, stdout := paste0(stdout,"\n")]
## drop files with empty stdout
ddd = ddt[nchar(stdout)>1,fread(stdout, header=FALSE, ...), by=varnames]
return(ddd)
}
raw_to_data.frame <- function(raw, ...) {
require(plyr)
## Convert to data.frame without factors
raw = as.data.frame(raw, stringsAsFactors = FALSE)
## Keep all columns except stdout and stderr
varnames = setdiff(colnames(raw), c("stdout","stderr"))
dd = ddply(raw, .variables=varnames, function(row) {
## Ignore empty stdouts
if (nchar(row[,"stdout"]) == 0) {
return(NULL)
}
## Read stdout with read.table
con <- textConnection(row[,"stdout"], open = "r")
d = read.table(con, header=FALSE, ...)
return(d)
})
return(dd)
}

View file

@ -316,8 +316,10 @@ sub spreadstdin {
if(not $anything_written and not eof($in)) {
# Nothing was written - maybe the block size < record size?
# Increase blocksize exponentially
my $old_blocksize = $blocksize;
$blocksize = ceil($blocksize * 1.3 + 1);
::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n");
::warning("A record was longer than $old_blocksize. " .
"Increasing to --blocksize $blocksize\n");
}
}
# If there is anything left in the buffer write it
@ -626,7 +628,7 @@ sub get_options_from_array {
sub parse_options {
# Returns: N/A
# Defaults:
$Global::version = 20140222;
$Global::version = 20140315;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
@ -5025,16 +5027,8 @@ sub print {
# Remove the printed part
substr($$partial,0,$i+1)="";
}
}
if(defined $self->{'exitstatus'}) {
# The job is finished, there might be line with no \n left
print $out_fd $$partial;
}
if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) {
# decompress still running
} elsif(defined $self->{'exitstatus'}) {
# If the job is dead: print the remaining partial line
# read remaining
if($$partial and ($opt::tag or defined $opt::tagstring)) {
@ -5044,8 +5038,12 @@ sub print {
print $out_fd $$partial;
# Release the memory
$$partial = undef;
# then close fh
close $in_fh;
if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) {
# decompress still running
} else {
# decompress done: close fh
close $in_fh;
}
}
} else {
my $buf;

View file

@ -3328,7 +3328,7 @@ this. To see this in action try:
parallel -kP4 -n1 grep 1 > out.par ::: a b c d e f
echo a b c d e f | xargs -P4 -n1 grep 1 > out.xargs-unbuf
echo a b c d e f | xargs -P4 -n1 grep --line-buffered 1 > out.xargs-linebuf
echo a b c d e f | xargs -n1 grep --line-buffered 1 > out.xargs-serial
echo a b c d e f | xargs -n1 grep 1 > out.xargs-serial
ls -l out*
md5sum out*
@end verbatim

View file

@ -21,7 +21,7 @@ echo '### niceload with no arguments should give no output'
echo '### Test -t and -s'
niceload -v -t 1 -s 2 sleep 4.5
echo 'bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p'
echo 'bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p - Order may change, but not output'
# This should take 10 seconds to run + delay from niceload
# niceload killed after 1 sec => The delay from niceload should be no more than 1 second
stdout /usr/bin/time -f %e perl -e 'for(1..100) { select(undef, undef, undef, 0.1); } print "done\n"' | int &
@ -29,7 +29,8 @@ echo 'bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p'
export A=$!;
sleep 2;
kill -s TERM $A;
wait
wait;
echo Finished
echo 'bug #38908: niceload: Ctrl-C should resume jobs if using -p'
# This should take 10 seconds to run + delay from niceload

View file

@ -11,25 +11,51 @@ echo 'bug #41613: --compress --line-buffer no newline';
echo 'bug #41613: --compress --line-buffer no --tagstring';
diff
<(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'|
<(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'|
parallel -N10 -L1 --pipe -j6 --block 20M --compress
pv -qL 3000000 | perl -pe 's/(....).*/$1/')
<(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'|
<(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'|
parallel -N10 -L1 --pipe -j6 --block 20M --compress --line-buffer
pv -qL 3000000 | perl -pe 's/(....).*/$1/')
>/dev/null
|| echo 'Good: --line-buffer matters'
|| (echo 'Good: --line-buffer matters'; false) && echo 'Bad: --line-buffer not working'
echo 'bug #41613: --compress --line-buffer with --tagstring';
diff
<(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'|
<(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'|
parallel -N10 -L1 --pipe -j6 --block 20M --compress --tagstring {#}
pv -qL 3000000 | perl -pe 's/(....).*/$1/')
<(perl -e 'for("x011".."x110"){print "$_\t", (map { rand } (1..100000)),"\n"}'|
<(nice perl -e 'for("x011".."x110"){print "$_\t", ("\n", map { rand } (1..100000)) }'|
parallel -N10 -L1 --pipe -j6 --block 20M --compress --tagstring {#} --line-buffer
pv -qL 3000000 | perl -pe 's/(....).*/$1/')
>/dev/null
|| echo 'Good: --line-buffer matters'
|| (echo 'Good: --line-buffer matters'; false) && echo 'Bad: --line-buffer not working'
echo 'bug #41613: --compress --line-buffer - no newline';
echo 'pipe compress tagstring'
perl -e 'print "O"'| parallel --compress --tagstring {#} --pipe --line-buffer cat
echo "K"
echo 'pipe compress notagstring'
perl -e 'print "O"'| parallel --compress --pipe --line-buffer cat
echo "K"
echo 'pipe nocompress tagstring'
perl -e 'print "O"'| parallel --tagstring {#} --pipe --line-buffer cat
echo "K"
echo 'pipe nocompress notagstring'
perl -e 'print "O"'| parallel --pipe --line-buffer cat
echo "K"
echo 'nopipe compress tagstring'
parallel --compress --tagstring {#} --line-buffer echo {} O ::: -n
echo "K"
echo 'nopipe compress notagstring'
parallel --compress --line-buffer echo {} O ::: -n
echo "K"
echo 'nopipe nocompress tagstring'
parallel --tagstring {#} --line-buffer echo {} O ::: -n
echo "K"
echo 'nopipe nocompress notagstring'
parallel --line-buffer echo {} O ::: -n
echo "K"
echo 'bug #41412: --timeout + --delay causes deadlock';
seq 10 | parallel -j10 --timeout 1 --delay .3 echo;

View file

@ -9,13 +9,14 @@ Sleeping 1s
Running 2s
Sleeping 1s
Running 2s
bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p
Sleeping 1s
Running 10s
bug #38908: niceload: Ctrl-C/TERM should resume jobs if using -p - Order may change, but not output
done
10
Finished
Sleeping 1s
Running 10s
bug #38908: niceload: Ctrl-C should resume jobs if using -p
Sleeping 1s
Running 10s
done
10
Sleeping 1s
Running 10s

View file

@ -1,6 +1,6 @@
### Test --pipe
### Test 200M records with too small block
parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 260000001
parallel: Warning: A record was longer than 200000000. Increasing to --blocksize 260000001
303111434
303111434
303111434
@ -127,10 +127,10 @@ c
1>01a02a0a
2>0a12a34a
3>45a6a
parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 3
parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 5
parallel: Warning: A record was longer than 1. Increasing to --blocksize 3
parallel: Warning: A record was longer than 3. Increasing to --blocksize 5
1>01a02a0a
parallel: Warning: A full record was not matched in a block. Increasing to --blocksize 8
parallel: Warning: A record was longer than 5. Increasing to --blocksize 8
2>0a12a34a
3>45a6a
### Test 10M records with too big block

View file

@ -4,6 +4,23 @@ bug #41613: --compress --line-buffer no --tagstring
Good: --line-buffer matters
bug #41613: --compress --line-buffer with --tagstring
Good: --line-buffer matters
bug #41613: --compress --line-buffer - no newline
pipe compress tagstring
1 OK
pipe compress notagstring
OK
pipe nocompress tagstring
1 OK
pipe nocompress notagstring
OK
nopipe compress tagstring
1 OK
nopipe compress notagstring
OK
nopipe nocompress tagstring
1 OK
nopipe nocompress notagstring
OK
bug #41412: --timeout + --delay causes deadlock
1
2