parallel: More efficient --pipe -L. Passes testsuite-local.

This commit is contained in:
Ole Tange 2013-02-02 22:20:36 +01:00
parent edac400fdf
commit 193935f0e5
8 changed files with 136 additions and 31 deletions

View file

@ -316,24 +316,20 @@ sub spreadstdin {
for my $in (@fhlist) { for my $in (@fhlist) {
piperead: while(1) { piperead: while(1) {
eof($in) and $force_one_time_through++ and last piperead; eof($in) and $force_one_time_through++ and last piperead;
if($Global::max_lines) { # if(0 and $Global::max_lines and $Global::max_number_of_args) {
# Read $Global::max_number_of_args records of $Global::max_lines lines # # Read $Global::max_number_of_args records of $Global::max_lines lines
my @lines; # my @lines;
my $blocksize = length $buf; # my $read_n_lines = ($Global::max_number_of_args || 1) * $Global::max_lines;
my $read_n_lines = ($Global::max_number_of_args || 1) * $Global::max_lines; # for(my $t = 0; !eof($in) and $t < $read_n_lines; $t++) {
do { # my $l = <$in>;
for(my $t = 0; !eof($in) and $t < $read_n_lines; $t++) { # push @lines, $l;
my $l = <$in>; # }
push @lines, $l; # substr($buf,length $buf,0) = join("",@lines);
$blocksize += length($l); # } else {
}
} while($blocksize < $opt::blocksize and !eof($in));
substr($buf,length $buf,0) = join("",@lines);
} else {
# Read a block # Read a block
read($in,substr($buf,length $buf,0),$opt::blocksize); read($in,substr($buf,length $buf,0),$opt::blocksize);
# substr above = append to $buf # substr above = append to $buf
} # }
if($opt::r) { if($opt::r) {
# Remove empty lines # Remove empty lines
@ -342,10 +338,23 @@ sub spreadstdin {
next; next;
} }
} }
if($opt::regexp) { if($Global::max_lines and not $Global::max_number_of_args) {
# Read n-line records
my $n_lines = $buf=~tr/\n/\n/;
my $last_newline_pos = rindex($buf,"\n");
while($n_lines % $Global::max_lines) {
$n_lines--;
$last_newline_pos = rindex($buf,"\n",$last_newline_pos-1);
}
# Chop at $last_newline_pos
write_record_to_pipe(\$header,\$buf,$recstart,$recend,$last_newline_pos+1);
substr($buf,0,$last_newline_pos+1) = "";
} elsif($opt::regexp) {
if($Global::max_number_of_args) { if($Global::max_number_of_args) {
# -N => (start..*?end){n} # -N => (start..*?end){n}
while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) { # -L -N => (start..*?end){n*l}
my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
while($buf =~ s/((?:$recstart.*?$recend){$read_n_lines})($recstart.*)$/$2/os) {
write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1);
} }
} else { } else {
@ -358,7 +367,8 @@ sub spreadstdin {
if($Global::max_number_of_args) { if($Global::max_number_of_args) {
# -N => (start..*?end){n} # -N => (start..*?end){n}
my $i = 0; my $i = 0;
while(($i = nindex(\$buf,$recendrecstart,$Global::max_number_of_args)) != -1) { my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
while(($i = nindex(\$buf,$recendrecstart,$read_n_lines)) != -1) {
$i += length $recend; # find the actual splitting location $i += length $recend; # find the actual splitting location
write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i);
substr($buf,0,$i) = ""; substr($buf,0,$i) = "";

View file

@ -42,13 +42,13 @@ echo "bug #36657: --load does not work with custom ssh";
parallel --load=1000% -S "/usr/bin/ssh localhost" echo ::: OK parallel --load=1000% -S "/usr/bin/ssh localhost" echo ::: OK
echo "bug #34958: --pipe with record size measured in lines"; echo "bug #34958: --pipe with record size measured in lines";
seq 10 | parallel -k --pipe -L 4 cat\;echo FOO | uniq seq 10 | parallel -k --pipe -L 4 cat\;echo bug 34958-1
echo "bug #37325: Inefficiency of --pipe -L"; echo "bug #37325: Inefficiency of --pipe -L";
seq 2000 | parallel -k --pipe --block 1k -L 4 wc\;echo FOO | uniq seq 2000 | parallel -k --pipe --block 1k -L 4 wc\;echo FOO | uniq
echo "bug #34958: --pipe with record size measured in lines"; echo "bug #34958: --pipe with record size measured in lines";
seq 10 | parallel -k --pipe -l 4 cat\;echo FOO | uniq seq 10 | parallel -k --pipe -l 4 cat\;echo bug 34958-2
echo "### Test --results"; echo "### Test --results";
mkdir -p /tmp/parallel_results_test; mkdir -p /tmp/parallel_results_test;

View file

@ -45,7 +45,7 @@ echo "bug #37956: --colsep does not default to '\t' as specified in the man page
printf "A\tB\n1\tone" | parallel --header : echo {B} {A} printf "A\tB\n1\tone" | parallel --header : echo {B} {A}
echo '### Test --tollef' echo '### Test --tollef'
parallel -k --tollef echo -- 1 2 3 ::: a b c parallel -k --tollef echo -- 1 2 3 ::: a b c | sort
echo '### Test --tollef --gnu' echo '### Test --tollef --gnu'
parallel -k --tollef --gnu echo ::: 1 2 3 -- a b c parallel -k --tollef --gnu echo ::: 1 2 3 -- a b c

View file

@ -6,4 +6,18 @@ echo '### bug #36595: silent loss of input with --pipe and --sshlogin'
echo 'bug #36707: --controlmaster eats jobs' echo 'bug #36707: --controlmaster eats jobs'
seq 2 | parallel -k --controlmaster --sshlogin localhost echo OK{} seq 2 | parallel -k --controlmaster --sshlogin localhost echo OK{}
echo '### -L -n with pipe'
seq 14 | parallel --pipe -k -L 3 -n 2 'cat;echo 6 Ln line record'
echo '### -L -N with pipe'
seq 14 | parallel --pipe -k -L 3 -N 2 'cat;echo 6 LN line record'
echo '### -l -N with pipe'
seq 14 | parallel --pipe -k -l 3 -N 2 'cat;echo 6 lN line record'
echo '### -l -n with pipe'
seq 14 | parallel --pipe -k -l 3 -n 2 'cat;echo 6 ln line record'
EOF EOF

View file

@ -33,7 +33,7 @@ seq 1 9 | parallel -j2 -k -N 3 --pipe 'cat;echo iiiiiiiii'
seq 1 10 | parallel -j2 -k -N 3 --pipe 'cat;echo jjjjjjjjjj' seq 1 10 | parallel -j2 -k -N 3 --pipe 'cat;echo jjjjjjjjjj'
echo '### Test -l -N -L and -n with multiple jobslots and multiple args' echo '### Test -l -N -L and -n with multiple jobslots and multiple args'
seq 1 12 | parallel -kj20 -l 2 --block 5 --pipe "cat; echo a" seq 1 12 | parallel -kj20 -l 2 --block 8 --pipe "cat; echo a"
seq 1 5 | parallel -kj2 -N 2 --pipe "cat; echo b" seq 1 5 | parallel -kj2 -N 2 --pipe "cat; echo b"
seq 1 5 | parallel -kj2 -n 2 --pipe "cat; echo d" seq 1 5 | parallel -kj2 -n 2 --pipe "cat; echo d"

View file

@ -97,17 +97,18 @@ bug #34958: --pipe with record size measured in lines
6 6
7 7
8 8
bug 34958-1
9 9
10 10
FOO bug 34958-1
bug #37325: Inefficiency of --pipe -L bug #37325: Inefficiency of --pipe -L
280 280 1012 276 276 996
FOO
248 248 992
FOO FOO
252 252 1008 252 252 1008
FOO FOO
252 252 1008 244 244 997
FOO
244 244 1005
FOO FOO
200 200 1000 200 200 1000
FOO FOO
@ -117,22 +118,21 @@ FOO
FOO FOO
200 200 1000 200 200 1000
FOO FOO
172 172 860 180 180 900
FOO FOO
bug #34958: --pipe with record size measured in lines bug #34958: --pipe with record size measured in lines
1 1
2 2
3 3
4 4
FOO
5 5
6 6
7 7
8 8
FOO bug 34958-2
9 9
10 10
FOO bug 34958-2
### Test --results ### Test --results
I III I III
I IIII I IIII

View file

@ -7,3 +7,83 @@ bug #36707: --controlmaster eats jobs
seq 2 | parallel -k --controlmaster --sshlogin localhost echo OK{} seq 2 | parallel -k --controlmaster --sshlogin localhost echo OK{}
OK1 OK1
OK2 OK2
echo '### -L -n with pipe'
### -L -n with pipe
seq 14 | parallel --pipe -k -L 3 -n 2 'cat;echo 6 Ln line record'
1
2
3
4
5
6
6 Ln line record
7
8
9
10
11
12
6 Ln line record
13
14
6 Ln line record
echo '### -L -N with pipe'
### -L -N with pipe
seq 14 | parallel --pipe -k -L 3 -N 2 'cat;echo 6 LN line record'
1
2
3
4
5
6
6 LN line record
7
8
9
10
11
12
6 LN line record
13
14
6 LN line record
echo '### -l -N with pipe'
### -l -N with pipe
seq 14 | parallel --pipe -k -l 3 -N 2 'cat;echo 6 lN line record'
1
2
3
4
5
6
6 lN line record
7
8
9
10
11
12
6 lN line record
13
14
6 lN line record
echo '### -l -n with pipe'
### -l -n with pipe
seq 14 | parallel --pipe -k -l 3 -n 2 'cat;echo 6 ln line record'
1
2
3
4
5
6
6 ln line record
7
8
9
10
11
12
6 ln line record
13
14
6 ln line record

View file

@ -119,6 +119,7 @@ d
2 2
3 3
4 4
c
5 5
c c
### Test output is the same for different block size ### Test output is the same for different block size