parallel: Increase --blocksize exponentially if it is < 1 record.

This commit is contained in:
Ole Tange 2013-02-05 22:17:36 +01:00
parent 9e0541c95a
commit 819c82de3c

View file

@ -283,22 +283,19 @@ sub spreadstdin {
} }
} }
} }
my ($recstart,$recend,$recerror); my ($recstart,$recend);
if(defined($opt::recstart) and defined($opt::recend)) { if(defined($opt::recstart) and defined($opt::recend)) {
# If both --recstart and --recend is given then both must match # If both --recstart and --recend is given then both must match
$recstart = $opt::recstart; $recstart = $opt::recstart;
$recend = $opt::recend; $recend = $opt::recend;
$recerror = "parallel: Warning: --recend and --recstart unmatched. Increasing --blocksize.";
} elsif(defined($opt::recstart)) { } elsif(defined($opt::recstart)) {
# If --recstart is given it must match start of record # If --recstart is given it must match start of record
$recstart = $opt::recstart; $recstart = $opt::recstart;
$recend = ""; $recend = "";
$recerror = "parallel: Warning: --recstart unmatched. Increasing --blocksize.";
} elsif(defined($opt::recend)) { } elsif(defined($opt::recend)) {
# If --recend is given then it must match end of record # If --recend is given then it must match end of record
$recstart = ""; $recstart = "";
$recend = $opt::recend; $recend = $opt::recend;
$recerror = "parallel: Warning: --recend unmatched. Increasing --blocksize.";
} }
if($opt::regexp) { if($opt::regexp) {
@ -313,10 +310,12 @@ sub spreadstdin {
my $recendrecstart = $recend.$recstart; my $recendrecstart = $recend.$recstart;
# Force the while-loop once if everything was read by header reading # Force the while-loop once if everything was read by header reading
my $force_one_time_through = 0; my $force_one_time_through = 0;
my $blocksize = $opt::blocksize;
for my $in (@fhlist) { for my $in (@fhlist) {
piperead: while(1) { piperead: while(1) {
my $anything_written = 0;
eof($in) and $force_one_time_through++ and last piperead; eof($in) and $force_one_time_through++ and last piperead;
read($in,substr($buf,length $buf,0),$opt::blocksize); read($in,substr($buf,length $buf,0),$blocksize);
if($opt::r) { if($opt::r) {
# Remove empty lines # Remove empty lines
@ -334,7 +333,7 @@ sub spreadstdin {
$last_newline_pos = rindex($buf,"\n",$last_newline_pos-1); $last_newline_pos = rindex($buf,"\n",$last_newline_pos-1);
} }
# Chop at $last_newline_pos as that is where n-line record ends # Chop at $last_newline_pos as that is where n-line record ends
write_record_to_pipe(\$header,\$buf,$recstart,$recend,$last_newline_pos+1); $anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$last_newline_pos+1);
substr($buf,0,$last_newline_pos+1) = ""; substr($buf,0,$last_newline_pos+1) = "";
} elsif($opt::regexp) { } elsif($opt::regexp) {
if($Global::max_number_of_args) { if($Global::max_number_of_args) {
@ -342,12 +341,12 @@ sub spreadstdin {
# -L -N => (start..*?end){n*l} # -L -N => (start..*?end){n*l}
my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1); my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
while($buf =~ s/((?:$recstart.*?$recend){$read_n_lines})($recstart.*)$/$2/os) { while($buf =~ s/((?:$recstart.*?$recend){$read_n_lines})($recstart.*)$/$2/os) {
write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); $anything_written += write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1);
} }
} else { } else {
# Find the last recend-recstart in $buf # Find the last recend-recstart in $buf
if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) { if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); $anything_written += write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1);
} }
} }
} else { } else {
@ -357,7 +356,7 @@ sub spreadstdin {
my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1); my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
while(($i = nindex(\$buf,$recendrecstart,$read_n_lines)) != -1) { while(($i = nindex(\$buf,$recendrecstart,$read_n_lines)) != -1) {
$i += length $recend; # find the actual splitting location $i += length $recend; # find the actual splitting location
write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); $anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i);
substr($buf,0,$i) = ""; substr($buf,0,$i) = "";
} }
} else { } else {
@ -365,14 +364,17 @@ sub spreadstdin {
my $i = rindex($buf,$recendrecstart); my $i = rindex($buf,$recendrecstart);
if($i != -1) { if($i != -1) {
$i += length $recend; # find the actual splitting location $i += length $recend; # find the actual splitting location
write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); $anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i);
substr($buf,0,$i) = ""; substr($buf,0,$i) = "";
} }
} }
} }
# If stuff not written: if(not $anything_written) {
# Warn $recerror if not already done # Nothing was written - maybe the block size < record size?
# blocksize *= 1.1 # Increase blocksize exponentially
$blocksize = ceil($blocksize * 1.3);
::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n");
}
} }
} }
@ -407,7 +409,7 @@ sub write_record_to_pipe {
my $recstart = shift; my $recstart = shift;
my $recend = shift; my $recend = shift;
my $endpos = shift; my $endpos = shift;
if(length $$record_ref == 0) { return; } if($endpos == 0) { return 0; }
# Find the minimal seq $job that has no data written == virgin # Find the minimal seq $job that has no data written == virgin
# If no virgin found, backoff # If no virgin found, backoff
my $sleep = 0.0001; # 0.01 ms - better performance on highend my $sleep = 0.0001; # 0.01 ms - better performance on highend
@ -424,23 +426,23 @@ sub write_record_to_pipe {
} else { } else {
# Chop of at $endpos as we do not know how many rec_sep will # Chop of at $endpos as we do not know how many rec_sep will
# be removed. # be removed.
my $record = substr($$record_ref,0,$endpos); substr($$record_ref,$endpos,length $$record_ref) = "";
# Remove rec_sep # Remove rec_sep
if($opt::remove_rec_sep) { if($opt::remove_rec_sep) {
# Remove record separator # Remove record separator
$record =~ s/$recend$recstart//gos; $$record_ref =~ s/$recend$recstart//gos;
$record =~ s/^$recstart//os; $$record_ref =~ s/^$recstart//os;
$record =~ s/$recend$//os; $$record_ref =~ s/$recend$//os;
} }
$job->write($header_ref); $job->write($header_ref);
$job->write(\$record); $job->write($record_ref);
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
exit(0); exit(0);
} }
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
return; return 1;
} }
sub __SEM_MODE__ {} sub __SEM_MODE__ {}