parallel --pipe: Do not pipe 0-length input to children

This commit is contained in:
Ole Tange 2011-01-27 18:53:57 +01:00
parent c9d95b0627
commit b0afcf9fa4
3 changed files with 21 additions and 25 deletions

View file

@ -1,3 +1,6 @@
Use recsep as splitstring instead of match
my @vals = (split / /, $string)[0,2,5,7];
test if block size is too small to match a record sep in both -N mode and normal test if block size is too small to match a record sep in both -N mode and normal

View file

@ -80,15 +80,9 @@ if($::opt_halt_on_error) {
sub spreadstdin { sub spreadstdin {
# read a record # read a record
# print it to the first jobs that is ready # print it to the first jobs that is ready
my $first = 0;
my $second = 0;
my $sleep = 0.1; my $sleep = 0.1;
my $max_sleep = 0.1;
my $record; my $record;
my $partial = "";
my $buf = ""; my $buf = "";
my $rec_start = undef;
my $rec_end = undef;
my ($recstart,$recend,$recerror); my ($recstart,$recend,$recerror);
if(defined($::opt_recstart) and defined($::opt_recend)) { if(defined($::opt_recstart) and defined($::opt_recend)) {
# If both --recstart and --recend is given then both must match # If both --recstart and --recend is given then both must match
@ -117,30 +111,19 @@ sub spreadstdin {
while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) { while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) {
$record = $1; $record = $1;
::debug("Read record -N: ".length($record)."\n"); ::debug("Read record -N: ".length($record)."\n");
if($::opt_remove_rec_sep) { write_record_to_pipe(\$record,$recstart,$recend);
# Remove record separator
$record =~ s/$recend$recstart//gos;
$record =~ s/^$recstart//os;
$record =~ s/$recend$//os;
}
write_record_to_pipe(\$record);
} }
} else { } else {
# Find the last recend-recstart in $buf
if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) { if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
$record = $1; $record = $1;
if($::opt_remove_rec_sep) { ::debug("Matched record: ".length($record)."/".length($buf)."\n");
# Remove record separator write_record_to_pipe(\$record,$recstart,$recend);
$record =~ s/$recend$recstart//gos;
$record =~ s/^$recstart//os;
$record =~ s/$recend$//os;
}
::debug("Read record: ".length($record)."\n");
write_record_to_pipe(\$record);
} }
} }
} }
# If there is anything left in the buffer write it # If there is anything left in the buffer write it
write_record_to_pipe(\$buf); write_record_to_pipe(\$buf,$recstart,$recend);
::debug("Done reading STDIN\n"); ::debug("Done reading STDIN\n");
flush_and_close_pipes(); flush_and_close_pipes();
@ -157,7 +140,10 @@ sub flush_and_close_pipes {
do_not_reap(); do_not_reap();
for my $job (values %Global::running) { for my $job (values %Global::running) {
if($job->remaining()) { if($job->remaining()) {
$job->complete_write(); if($job->complete_write()) {
# Some data was written
$sleep = 0.1;
}
$flush_done = 0; $flush_done = 0;
} }
} }
@ -175,6 +161,15 @@ sub flush_and_close_pipes {
sub write_record_to_pipe { sub write_record_to_pipe {
my $record_ref = shift; my $record_ref = shift;
my $recstart = shift;
my $recend = shift;
if(length $$record_ref == 0) { return; }
if($::opt_remove_rec_sep) {
# Remove record separator
$$record_ref =~ s/$recend$recstart//gos;
$$record_ref =~ s/^$recstart//os;
$$record_ref =~ s/$recend$//os;
}
my $sleep = 0.1; # 0.1 ms my $sleep = 0.1; # 0.1 ms
write_record: while(1) { write_record: while(1) {
# Sorting according to sequence is necessary for -k to work # Sorting according to sequence is necessary for -k to work

View file

@ -81,8 +81,6 @@ ole
11 11
12 12
ole ole
ole
ole
### Test --recstart + --recend ### Test --recstart + --recend
3c20e43c58152da30261c5827a1f9084 - 3c20e43c58152da30261c5827a1f9084 -
### Race condition bug - 1 - would block ### Race condition bug - 1 - would block