From 819c82de3c163b020463d65f059a839c5ac85d91 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Tue, 5 Feb 2013 22:17:36 +0100 Subject: [PATCH] parallel: Increase --blocksize exponentially if it is < 1 record. --- src/parallel | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/parallel b/src/parallel index b074a0a3..639996d7 100755 --- a/src/parallel +++ b/src/parallel @@ -283,22 +283,19 @@ sub spreadstdin { } } } - my ($recstart,$recend,$recerror); + my ($recstart,$recend); if(defined($opt::recstart) and defined($opt::recend)) { # If both --recstart and --recend is given then both must match $recstart = $opt::recstart; $recend = $opt::recend; - $recerror = "parallel: Warning: --recend and --recstart unmatched. Increasing --blocksize."; } elsif(defined($opt::recstart)) { # If --recstart is given it must match start of record $recstart = $opt::recstart; $recend = ""; - $recerror = "parallel: Warning: --recstart unmatched. Increasing --blocksize."; } elsif(defined($opt::recend)) { # If --recend is given then it must match end of record $recstart = ""; $recend = $opt::recend; - $recerror = "parallel: Warning: --recend unmatched. Increasing --blocksize."; } if($opt::regexp) { @@ -313,10 +310,12 @@ sub spreadstdin { my $recendrecstart = $recend.$recstart; # Force the while-loop once if everything was read by header reading my $force_one_time_through = 0; + my $blocksize = $opt::blocksize; for my $in (@fhlist) { piperead: while(1) { + my $anything_written = 0; eof($in) and $force_one_time_through++ and last piperead; - read($in,substr($buf,length $buf,0),$opt::blocksize); + read($in,substr($buf,length $buf,0),$blocksize); if($opt::r) { # Remove empty lines @@ -334,7 +333,7 @@ sub spreadstdin { $last_newline_pos = rindex($buf,"\n",$last_newline_pos-1); } # Chop at $last_newline_pos as that is where n-line record ends - write_record_to_pipe(\$header,\$buf,$recstart,$recend,$last_newline_pos+1); + $anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$last_newline_pos+1); substr($buf,0,$last_newline_pos+1) = ""; } elsif($opt::regexp) { if($Global::max_number_of_args) { @@ -342,12 +341,12 @@ sub spreadstdin { # -L -N => (start..*?end){n*l} my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1); while($buf =~ s/((?:$recstart.*?$recend){$read_n_lines})($recstart.*)$/$2/os) { - write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); + $anything_written += write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); } } else { # Find the last recend-recstart in $buf if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) { - write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); + $anything_written += write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); } } } else { @@ -357,7 +356,7 @@ sub spreadstdin { my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1); while(($i = nindex(\$buf,$recendrecstart,$read_n_lines)) != -1) { $i += length $recend; # find the actual splitting location - write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); + $anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); substr($buf,0,$i) = ""; } } else { @@ -365,14 +364,17 @@ sub spreadstdin { my $i = rindex($buf,$recendrecstart); if($i != -1) { $i += length $recend; # find the actual splitting location - write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); + $anything_written += write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); substr($buf,0,$i) = ""; } } } - # If stuff not written: - # Warn $recerror if not already done - # blocksize *= 1.1 + if(not $anything_written) { + # Nothing was written - maybe the block size < record size? + # Increase blocksize exponentially + $blocksize = ceil($blocksize * 1.3); + ::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n"); + } } } @@ -407,7 +409,7 @@ sub write_record_to_pipe { my $recstart = shift; my $recend = shift; my $endpos = shift; - if(length $$record_ref == 0) { return; } + if($endpos == 0) { return 0; } # Find the minimal seq $job that has no data written == virgin # If no virgin found, backoff my $sleep = 0.0001; # 0.01 ms - better performance on highend @@ -424,23 +426,23 @@ sub write_record_to_pipe { } else { # Chop of at $endpos as we do not know how many rec_sep will # be removed. - my $record = substr($$record_ref,0,$endpos); + substr($$record_ref,$endpos,length $$record_ref) = ""; # Remove rec_sep if($opt::remove_rec_sep) { # Remove record separator - $record =~ s/$recend$recstart//gos; - $record =~ s/^$recstart//os; - $record =~ s/$recend$//os; + $$record_ref =~ s/$recend$recstart//gos; + $$record_ref =~ s/^$recstart//os; + $$record_ref =~ s/$recend$//os; } $job->write($header_ref); - $job->write(\$record); + $job->write($record_ref); my $fh = $job->stdin(); close $fh; exit(0); } my $fh = $job->stdin(); close $fh; - return; + return 1; } sub __SEM_MODE__ {}