mirror of
https://git.savannah.gnu.org/git/parallel.git
synced 2024-11-26 07:57:58 +00:00
Fixed bug #44358: 2 GB records cause problems for -N.
This commit is contained in:
parent
c445232b23
commit
2dee8e452b
109
src/parallel
109
src/parallel
|
@ -251,7 +251,7 @@ sub find_split_positions {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
# If match $recend$recstart => Record position
|
# If match $recend$recstart => Record position
|
||||||
my $i = index($buf,$recendrecstart);
|
my $i = index64(\$buf,$recendrecstart);
|
||||||
if($i != -1) {
|
if($i != -1) {
|
||||||
push(@pos,$pos+$i);
|
push(@pos,$pos+$i);
|
||||||
# Start looking for next record _after_ this match
|
# Start looking for next record _after_ this match
|
||||||
|
@ -301,8 +301,8 @@ sub spreadstdin {
|
||||||
my $recendrecstart = $recend.$recstart;
|
my $recendrecstart = $recend.$recstart;
|
||||||
my $chunk_number = 1;
|
my $chunk_number = 1;
|
||||||
my $one_time_through;
|
my $one_time_through;
|
||||||
my $two_gb = (2<<30)-1;
|
my $two_gb = (1<<31)-1;
|
||||||
my $blocksize = ::min($opt::blocksize,$two_gb);
|
my $blocksize = $opt::blocksize;
|
||||||
my $in = *STDIN;
|
my $in = *STDIN;
|
||||||
my $header = find_header(\$buf,$in);
|
my $header = find_header(\$buf,$in);
|
||||||
while(1) {
|
while(1) {
|
||||||
|
@ -323,10 +323,10 @@ sub spreadstdin {
|
||||||
if($Global::max_lines and not $Global::max_number_of_args) {
|
if($Global::max_lines and not $Global::max_number_of_args) {
|
||||||
# Read n-line records
|
# Read n-line records
|
||||||
my $n_lines = $buf =~ tr/\n/\n/;
|
my $n_lines = $buf =~ tr/\n/\n/;
|
||||||
my $last_newline_pos = rindex($buf,"\n");
|
my $last_newline_pos = rindex64(\$buf,"\n");
|
||||||
while($n_lines % $Global::max_lines) {
|
while($n_lines % $Global::max_lines) {
|
||||||
$n_lines--;
|
$n_lines--;
|
||||||
$last_newline_pos = rindex($buf,"\n",$last_newline_pos-1);
|
$last_newline_pos = rindex64(\$buf,"\n",$last_newline_pos-1);
|
||||||
}
|
}
|
||||||
# Chop at $last_newline_pos as that is where n-line record ends
|
# Chop at $last_newline_pos as that is where n-line record ends
|
||||||
$anything_written +=
|
$anything_written +=
|
||||||
|
@ -368,19 +368,8 @@ sub spreadstdin {
|
||||||
substr($buf,0,$i) = "";
|
substr($buf,0,$i) = "";
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
my $i;
|
|
||||||
if(length $buf < $two_gb) {
|
|
||||||
# Find the last recend+recstart in $buf
|
# Find the last recend+recstart in $buf
|
||||||
$i = rindex($buf,$recendrecstart);
|
my $i = rindex64(\$buf,$recendrecstart);
|
||||||
} else {
|
|
||||||
# Find the last recend+recstart in the last 2 GB of $buf
|
|
||||||
# rindex does not work on > 2GB
|
|
||||||
my $over2gb = (length $buf)-$two_gb;
|
|
||||||
$i = rindex(substr($buf,$over2gb,$two_gb),$recendrecstart);
|
|
||||||
if($i != -1) {
|
|
||||||
$i += $over2gb;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if($i != -1) {
|
if($i != -1) {
|
||||||
$i += length $recend; # find the actual splitting location
|
$i += length $recend; # find the actual splitting location
|
||||||
$anything_written +=
|
$anything_written +=
|
||||||
|
@ -471,8 +460,9 @@ sub nindex {
|
||||||
# the position where the Nth copy is found
|
# the position where the Nth copy is found
|
||||||
my ($buf_ref, $str, $n) = @_;
|
my ($buf_ref, $str, $n) = @_;
|
||||||
my $i = 0;
|
my $i = 0;
|
||||||
|
my $two_gb = (1<<31)-1;
|
||||||
for(1..$n) {
|
for(1..$n) {
|
||||||
$i = index($$buf_ref,$str,$i+1);
|
$i = index64($buf_ref,$str,$i+1);
|
||||||
if($i == -1) { last }
|
if($i == -1) { last }
|
||||||
}
|
}
|
||||||
return $i;
|
return $i;
|
||||||
|
@ -520,6 +510,79 @@ sub nindex {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub index64 {
    # Do index on strings > 2GB.
    # index in Perl < v5.22 does not work for > 2GB
    # Input:
    #   $ref = reference to the string to search in
    #   $match = string to search for
    #   $pos = position to start searching from (default: 0)
    #   $block_size = max size of each piece handed to the built-in index
    #                 (default: 2 GB - 1). Optional; mainly useful for
    #                 exercising the chunked search on small strings.
    # Output:
    #   as index: position of the first $match at or after $pos,
    #   -1 if there is none
    my ($ref, $match, $pos, $block_size) = @_;
    $pos ||= 0;
    $block_size ||= (1<<31)-1;
    my $strlen = length($$ref);
    # No point in doing extra work if we don't need to.
    if($strlen < $block_size) {
        return index($$ref, $match, $pos);
    }

    # Clamp $pos to the string the same way the built-in index does.
    # Without this a negative $pos would be taken by substr as an
    # offset counted from the end of the string.
    if($pos < 0) { $pos = 0; }
    if($pos > $strlen) { $pos = $strlen; }
    my $matchlen = length($match);
    # A window shorter than $match can never contain it
    my $window = $matchlen > $block_size ? $matchlen : $block_size;
    # Consecutive windows overlap by $matchlen-1 chars, so a match
    # straddling a window border is seen by the next window.
    # $step is always >= 1, so the loop is guaranteed to advance.
    my $step = $window - $matchlen + 1;
    my $offset = $pos;
    # Stop when $match can no longer fit between $offset and the end
    while($offset + $matchlen <= $strlen) {
        my $ret = index(substr($$ref, $offset, $window), $match);
        if($ret != -1) {
            return $ret + $offset;
        }
        $offset += $step;
    }
    return -1;
}
|
||||||
|
|
||||||
|
sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB
    # Input:
    #   $ref = reference to the string to search in
    #   $match = string to search for
    #   $pos = rightmost position the match may start at
    #          (default: search from the end)
    #   $block_size = max size of each piece handed to the built-in rindex
    #                 (default: 2 GB - 1). Optional; mainly useful for
    #                 exercising the chunked search on small strings.
    # Output:
    #   as rindex: position of the last $match starting at or before
    #   $pos, -1 if there is none
    my ($ref, $match, $pos, $block_size) = @_;
    $block_size ||= (1<<31)-1;
    my $strlen = length($$ref);
    # Default: search from end
    $pos = defined $pos ? $pos : $strlen;
    # No point in doing extra work if we don't need to.
    if($strlen < $block_size) {
        return rindex($$ref, $match, $pos);
    }

    my $matchlen = length($match);
    # A match starting at $pos ends at $pos+$matchlen: nothing beyond
    # that point (or beyond the end of the string) is to be searched.
    my $end = $pos + $matchlen;
    if($end > $strlen) { $end = $strlen; }
    if($end < 0) { $end = 0; }
    # A window shorter than $match can never contain it
    my $window = $matchlen > $block_size ? $matchlen : $block_size;
    # Search windows [$wstart,$wend) from right to left
    my $wend = $end;
    # Stop when $match can no longer fit before $wend
    while($wend >= $matchlen) {
        my $wstart = $wend - $window;
        # The leftmost window is shorter: it starts at the
        # beginning of the string
        if($wstart < 0) { $wstart = 0; }
        my $ret = rindex(substr($$ref, $wstart, $wend - $wstart), $match);
        if($ret != -1) {
            return $ret + $wstart;
        }
        # The beginning of the string has been searched: give up
        if($wstart == 0) { last; }
        # Every start position >= $wstart is ruled out. The next window
        # must end where a match starting at $wstart-1 would end. This
        # is always < $wend, so the loop is guaranteed to terminate.
        $wend = $wstart + $matchlen - 1;
    }
    return -1;
}
|
||||||
|
|
||||||
sub write_record_to_pipe {
|
sub write_record_to_pipe {
|
||||||
# Fork then
|
# Fork then
|
||||||
# Write record from pos 0 .. $endpos to pipe
|
# Write record from pos 0 .. $endpos to pipe
|
||||||
|
@ -831,6 +894,10 @@ sub parse_options {
|
||||||
not defined $opt::recend) { $opt::recend = "\n"; }
|
not defined $opt::recend) { $opt::recend = "\n"; }
|
||||||
if(not defined $opt::blocksize) { $opt::blocksize = "1M"; }
|
if(not defined $opt::blocksize) { $opt::blocksize = "1M"; }
|
||||||
$opt::blocksize = multiply_binary_prefix($opt::blocksize);
|
$opt::blocksize = multiply_binary_prefix($opt::blocksize);
|
||||||
|
if($opt::blocksize > (1<<31)-1) {
|
||||||
|
warning("--blocksize >= 2G causes problems. Using 2G-1\n");
|
||||||
|
$opt::blocksize = (1<<31)-1;
|
||||||
|
}
|
||||||
$opt::memfree = multiply_binary_prefix($opt::memfree);
|
$opt::memfree = multiply_binary_prefix($opt::memfree);
|
||||||
if(defined $opt::controlmaster) { $opt::noctrlc = 1; }
|
if(defined $opt::controlmaster) { $opt::noctrlc = 1; }
|
||||||
if(defined $opt::halt and
|
if(defined $opt::halt and
|
||||||
|
@ -6771,7 +6838,7 @@ sub linebuffer_print {
|
||||||
while(read($in_fh,substr($$partial,length $$partial),3276800)) {
|
while(read($in_fh,substr($$partial,length $$partial),3276800)) {
|
||||||
# Append to $$partial
|
# Append to $$partial
|
||||||
# Find the last \n
|
# Find the last \n
|
||||||
my $i = rindex($$partial,"\n");
|
my $i = ::rindex64($partial,"\n");
|
||||||
if($i != -1) {
|
if($i != -1) {
|
||||||
# One or more complete lines were found
|
# One or more complete lines were found
|
||||||
if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) {
|
if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) {
|
||||||
|
@ -6779,14 +6846,14 @@ sub linebuffer_print {
|
||||||
# This is a crappy way of ignoring it.
|
# This is a crappy way of ignoring it.
|
||||||
$$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
|
$$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
|
||||||
# Length of partial line has changed: Find the last \n again
|
# Length of partial line has changed: Find the last \n again
|
||||||
$i = rindex($$partial,"\n");
|
$i = ::rindex64($partial,"\n");
|
||||||
}
|
}
|
||||||
if($opt::tag or defined $opt::tagstring) {
|
if($opt::tag or defined $opt::tagstring) {
|
||||||
# Replace ^ with $tag within the full line
|
# Replace ^ with $tag within the full line
|
||||||
my $tag = $self->tag();
|
my $tag = $self->tag();
|
||||||
substr($$partial,0,$i+1) =~ s/^/$tag/gm;
|
substr($$partial,0,$i+1) =~ s/^/$tag/gm;
|
||||||
# Length of partial line has changed: Find the last \n again
|
# Length of partial line has changed: Find the last \n again
|
||||||
$i = rindex($$partial,"\n");
|
$i = ::rindex64($partial,"\n");
|
||||||
}
|
}
|
||||||
# Print up to and including the last \n
|
# Print up to and including the last \n
|
||||||
print $out_fd substr($$partial,0,$i+1);
|
print $out_fd substr($$partial,0,$i+1);
|
||||||
|
|
|
@ -1,6 +1,12 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj0 -k --joblog /tmp/jl-`basename $0` -L1
|
cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj0 -k --joblog /tmp/jl-`basename $0` -L1
|
||||||
|
echo '### bug #44358: 2 GB records cause problems for -N2'
|
||||||
|
(yes "`echo {1..100}`" | head -c 5000000000; echo FOO;
|
||||||
|
yes "`echo {1..100}`" | head -c 3000000000; echo FOO;
|
||||||
|
yes "`echo {1..100}`" | head -c 1000000000;) |
|
||||||
|
parallel --pipe --recend FOO'\n' --block 1g -k LANG=c wc -c
|
||||||
|
|
||||||
echo "### --line-buffer"
|
echo "### --line-buffer"
|
||||||
seq 10 | parallel -j20 --line-buffer 'seq {} 10 | pv -qL 10' > /tmp/parallel_l$$;
|
seq 10 | parallel -j20 --line-buffer 'seq {} 10 | pv -qL 10' > /tmp/parallel_l$$;
|
||||||
seq 10 | parallel -j20 'seq {} 10 | pv -qL 10' > /tmp/parallel_$$;
|
seq 10 | parallel -j20 'seq {} 10 | pv -qL 10' > /tmp/parallel_$$;
|
||||||
|
|
|
@ -29,6 +29,12 @@ ls | parallel -kv rm -- {.}/abc-{.}-{} 2>&1
|
||||||
|
|
||||||
# -L1 will join lines ending in ' '
|
# -L1 will join lines ending in ' '
|
||||||
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | nice parallel -vj0 -k -L1
|
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | nice parallel -vj0 -k -L1
|
||||||
|
echo '### bug #44358: 2 GB records cause problems for -N2'
|
||||||
|
(yes "`echo {1..100}`" | head -c 5000000000; echo FOO;
|
||||||
|
yes "`echo {1..100}`" | head -c 3000000000; echo FOO;
|
||||||
|
yes "`echo {1..100}`" | head -c 1000000000;) |
|
||||||
|
parallel --pipe --recend FOO'\n' -N2 --block 1g -k LANG=c wc -c
|
||||||
|
|
||||||
echo '### Test compress'
|
echo '### Test compress'
|
||||||
seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10'
|
seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10'
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,12 @@ seq 1 1000000 >/tmp/parallel-seq
|
||||||
shuf --random-source=/tmp/parallel-seq /tmp/parallel-seq >/tmp/blocktest
|
shuf --random-source=/tmp/parallel-seq /tmp/parallel-seq >/tmp/blocktest
|
||||||
|
|
||||||
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | parallel -vj2 -k --joblog /tmp/jl-`basename $0` -L1
|
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | parallel -vj2 -k --joblog /tmp/jl-`basename $0` -L1
|
||||||
|
echo '### -L >4GB'
|
||||||
|
(head -c 5000000000 /dev/zero; echo FOO;
|
||||||
|
head -c 3000000000 /dev/zero; echo FOO;
|
||||||
|
head -c 1000000000 /dev/zero;) |
|
||||||
|
parallel --pipe -L2 --block 1g -k LANG=c wc -c
|
||||||
|
|
||||||
echo '### Test 200M records with too small block';
|
echo '### Test 200M records with too small block';
|
||||||
(
|
(
|
||||||
echo start;
|
echo start;
|
||||||
|
|
|
@ -1,3 +1,12 @@
|
||||||
|
echo '### bug #44358: 2 GB records cause problems for -N2'
|
||||||
|
### bug #44358: 2 GB records cause problems for -N2
|
||||||
|
(yes "`echo {1..100}`" | head -c 5000000000; echo FOO; yes "`echo {1..100}`" | head -c 3000000000; echo FOO; yes "`echo {1..100}`" | head -c 1000000000;) | parallel --pipe --recend FOO'\n' --block 1g -k LANG=c wc -c
|
||||||
|
5000000004
|
||||||
|
3000000004
|
||||||
|
1000000000
|
||||||
|
parallel: Warning: A record was longer than 1000000000. Increasing to --blocksize 1300000001
|
||||||
|
parallel: Warning: A record was longer than 1300000001. Increasing to --blocksize 1690000003
|
||||||
|
parallel: Warning: A record was longer than 1690000003. Increasing to --blocksize 2147483647
|
||||||
echo "### --line-buffer"
|
echo "### --line-buffer"
|
||||||
### --line-buffer
|
### --line-buffer
|
||||||
seq 10 | parallel -j20 --line-buffer 'seq {} 10 | pv -qL 10' > /tmp/parallel_l$$; seq 10 | parallel -j20 'seq {} 10 | pv -qL 10' > /tmp/parallel_$$; cat /tmp/parallel_l$$ | wc; diff /tmp/parallel_$$ /tmp/parallel_l$$ >/dev/null ; echo These must diff: $?; rm /tmp/parallel_l$$ /tmp/parallel_$$
|
seq 10 | parallel -j20 --line-buffer 'seq {} 10 | pv -qL 10' > /tmp/parallel_l$$; seq 10 | parallel -j20 'seq {} 10 | pv -qL 10' > /tmp/parallel_$$; cat /tmp/parallel_l$$ | wc; diff /tmp/parallel_$$ /tmp/parallel_l$$ >/dev/null ; echo These must diff: $?; rm /tmp/parallel_l$$ /tmp/parallel_$$
|
||||||
|
|
|
@ -55,6 +55,11 @@ rm -- 2-col/abc-2-col-2-col.txt
|
||||||
rm -- a/abc-a-a
|
rm -- a/abc-a-a
|
||||||
rm -- b/abc-b-b
|
rm -- b/abc-b-b
|
||||||
rm -- \ä\¸\\å\›\½\ \(Zh\Å\<5C>nggu\Ã\³\)/abc-\ä\¸\\å\›\½\ \(Zh\Å\<5C>nggu\Ã\³\)-\ä\¸\\å\›\½\ \(Zh\Å\<5C>nggu\Ã\³\)
|
rm -- \ä\¸\\å\›\½\ \(Zh\Å\<5C>nggu\Ã\³\)/abc-\ä\¸\\å\›\½\ \(Zh\Å\<5C>nggu\Ã\³\)-\ä\¸\\å\›\½\ \(Zh\Å\<5C>nggu\Ã\³\)
|
||||||
|
echo '### bug #44358: 2 GB records cause problems for -N2'
|
||||||
|
### bug #44358: 2 GB records cause problems for -N2
|
||||||
|
(yes "`echo {1..100}`" | head -c 5000000000; echo FOO; yes "`echo {1..100}`" | head -c 3000000000; echo FOO; yes "`echo {1..100}`" | head -c 1000000000;) | parallel --pipe --recend FOO'\n' -N2 --block 1g -k LANG=c wc -c
|
||||||
|
8000000008
|
||||||
|
1000000000
|
||||||
echo '### Test compress'
|
echo '### Test compress'
|
||||||
### Test compress
|
### Test compress
|
||||||
seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10'
|
seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10'
|
||||||
|
|
|
@ -1,4 +1,9 @@
|
||||||
### Test --pipe
|
### Test --pipe
|
||||||
|
echo '### -L >4GB'
|
||||||
|
### -L >4GB
|
||||||
|
(head -c 5000000000 /dev/zero; echo FOO; head -c 3000000000 /dev/zero; echo FOO; head -c 1000000000 /dev/zero;) | parallel --pipe -L2 --block 1g -k LANG=c wc -c
|
||||||
|
8000000008
|
||||||
|
1000000000
|
||||||
echo '### Test 200M records with too small block'; ( echo start; seq 1 44 | parallel -uj1 cat /tmp/blocktest\;true; echo end; echo start; seq 1 44 | parallel -uj1 cat /tmp/blocktest\;true; echo end; echo start; seq 1 44 | parallel -uj1 cat /tmp/blocktest\;true; echo end; ) | stdout parallel -k --block 200m -j2 --pipe --recend 'end\n' wc -c | egrep -v '^0$'
|
echo '### Test 200M records with too small block'; ( echo start; seq 1 44 | parallel -uj1 cat /tmp/blocktest\;true; echo end; echo start; seq 1 44 | parallel -uj1 cat /tmp/blocktest\;true; echo end; echo start; seq 1 44 | parallel -uj1 cat /tmp/blocktest\;true; echo end; ) | stdout parallel -k --block 200m -j2 --pipe --recend 'end\n' wc -c | egrep -v '^0$'
|
||||||
### Test 200M records with too small block
|
### Test 200M records with too small block
|
||||||
parallel: Warning: A record was longer than 200000000. Increasing to --blocksize 260000001
|
parallel: Warning: A record was longer than 200000000. Increasing to --blocksize 260000001
|
||||||
|
|
Loading…
Reference in a new issue