diff --git a/doc/FUTURE_IDEAS b/doc/FUTURE_IDEAS index 8d3ea651..a941a515 100644 --- a/doc/FUTURE_IDEAS +++ b/doc/FUTURE_IDEAS @@ -1,3 +1,8 @@ +test if block size is too small to match a record sep in both -N mode and normal + + + + --remove-rec-sep test alternation with --recend 'a|b' echo 12a34a45a6 | src/parallel -k --pipe --recend a -N1 --rrs 'echo -n "$PARALLEL_SEQ>"; cat; echo' diff --git a/src/parallel b/src/parallel index 99bdcd48..d4cd34e5 100755 --- a/src/parallel +++ b/src/parallel @@ -110,46 +110,38 @@ sub spreadstdin { $recstart = "(?:".$recstart.")"; $recend = "(?:".$recend.")"; - while(read(STDIN,$buf,$::opt_blocksize)) { - $record = $partial.$buf; + while(read(STDIN,substr($buf,length $buf,0),$::opt_blocksize)) { + # substr above = append to $buf if($Global::max_number_of_args) { # -N => (start..*?end){n} - while($record =~ s/(($recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$1/os) { - $partial = $3; - ::debug("Read record: ".length($record)."\n"); + while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) { + $record = $1; + ::debug("Read record -N: ".length($record)."\n"); if($::opt_remove_rec_sep) { # Remove record separator + $record =~ s/$recend$recstart//gos; $record =~ s/^$recstart//os; $record =~ s/$recend$//os; - $record =~ s/$recstart$recend//gos; } - write_record_to_pipe($record); - $record = $partial; - } - if(eof STDIN) { - # There is no partial record at the end of file - write_record_to_pipe($record); + write_record_to_pipe(\$record); } } else { - if($record =~ s/(.*$recend)($recstart.*?)$/$1/os) { - $partial = $2; - } else { - print $Global::original_stderr $recerror,"\n"; - } - if($::opt_remove_rec_sep) { - # Remove record separator - $record =~ s/^$recstart//os; - $record =~ s/$recend$//os; - $record =~ s/$recstart$recend//gos; - } - ::debug("Read record: ".length($record)."\n"); - write_record_to_pipe($record); - if(eof STDIN) { - # There is no partial record at the end of file - write_record_to_pipe($partial); + if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) { + $record = $1; + if($::opt_remove_rec_sep) { + # Remove record separator + $record =~ s/$recend$recstart//gos; + $record =~ s/^$recstart//os; + $record =~ s/$recend$//os; + } + ::debug("Read record: ".length($record)."\n"); + write_record_to_pipe(\$record); } } } + # If there is anything left in the buffer write it + write_record_to_pipe(\$buf); + ::debug("Done reading STDIN\n"); flush_and_close_pipes(); ::debug("Done flushing to children\n"); @@ -182,7 +174,7 @@ sub flush_and_close_pipes { } sub write_record_to_pipe { - my $record = shift; + my $record_ref = shift; my $sleep = 0.1; # 0.1 ms write_record: while(1) { # Sorting according to sequence is necessary for -k to work @@ -191,7 +183,10 @@ sub write_record_to_pipe { ::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n"); if($job->remaining()) { # Part of the job's last record has not finished being written - $job->complete_write(); + if($job->complete_write()) { + # Something got written + $sleep = 0.1; + } } else { if($job->datawritten() > 0) { # There is no data remaining and we have written data before: @@ -201,7 +196,7 @@ sub write_record_to_pipe { my $fh = $job->stdin(); close $fh; } else { - $job->write($record); + $job->write($record_ref); $sleep = 0.1; last write_record; } @@ -2127,6 +2122,7 @@ sub no_of_cpus_freebsd { (`sysctl -a dev.cpu | grep \%parent | awk '{ print \$2 }' | uniq | wc -l | awk '{ print \$1 }'` or `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`); + chomp $no_of_cpus; return $no_of_cpus; } @@ -2137,6 +2133,7 @@ sub no_of_cores_freebsd { (`sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'` or `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`); + chomp $no_of_cores; return $no_of_cores; } @@ -2425,14 +2422,16 @@ sub set_stdin { sub write { my $self = shift; - my $remaining = shift; - if(length($remaining)) { - $self->{'remaining'} .= $remaining; + my $remaining_ref = shift; + if(length($$remaining_ref)) { + $self->{'remaining'} .= $$remaining_ref; $self->complete_write(); } } sub complete_write { + # Returns: + # number of bytes written (see syswrite) my $self = shift; my $in = $self->{'stdin'}; my $len = syswrite($in,$self->{'remaining'}); @@ -2444,6 +2443,7 @@ sub complete_write { substr($self->{'remaining'},0,$len) = ""; $self->{'datawritten'} += $len; } + return $len; } sub remaining { diff --git a/testsuite/wanted-results/test51 b/testsuite/wanted-results/test51 index a9280d93..ac0f8f99 100644 --- a/testsuite/wanted-results/test51 +++ b/testsuite/wanted-results/test51 @@ -17,16 +17,16 @@ ### Test --rrs --recend single 1>123445 - 2>6 + ### Test --rrs -N1 --recend alternate 1>123445 - 2>6 + ### Test --rrs -N1 --recend single 1>12a34 - 2>45a6 + ### Test -N even 1 2 @@ -84,7 +84,7 @@ ole ole ole ### Test --recstart + --recend -0d53a19cdc880668a7c745794dcafbf1 - +3c20e43c58152da30261c5827a1f9084 - ### Race condition bug - 1 - would block ### Race condition bug - 2 - would block ### Test --block size=1