Small optimization for large --blocksize for --pipe.

Fixed bug in FreeBSD cpu/core detection.
This commit is contained in:
Ole Tange 2011-01-27 00:29:28 +01:00
parent 6f78fafc20
commit c9d95b0627
3 changed files with 43 additions and 38 deletions

View file

@ -1,3 +1,8 @@
test if block size is too small to match a record sep in both -N mode and normal
--remove-rec-sep --remove-rec-sep
test alternation with --recend 'a|b' test alternation with --recend 'a|b'
echo 12a34a45a6 | src/parallel -k --pipe --recend a -N1 --rrs 'echo -n "$PARALLEL_SEQ>"; cat; echo' echo 12a34a45a6 | src/parallel -k --pipe --recend a -N1 --rrs 'echo -n "$PARALLEL_SEQ>"; cat; echo'

View file

@ -110,46 +110,38 @@ sub spreadstdin {
$recstart = "(?:".$recstart.")"; $recstart = "(?:".$recstart.")";
$recend = "(?:".$recend.")"; $recend = "(?:".$recend.")";
while(read(STDIN,$buf,$::opt_blocksize)) { while(read(STDIN,substr($buf,length $buf,0),$::opt_blocksize)) {
$record = $partial.$buf; # substr above = append to $buf
if($Global::max_number_of_args) { if($Global::max_number_of_args) {
# -N => (start..*?end){n} # -N => (start..*?end){n}
while($record =~ s/(($recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$1/os) { while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) {
$partial = $3; $record = $1;
::debug("Read record: ".length($record)."\n"); ::debug("Read record -N: ".length($record)."\n");
if($::opt_remove_rec_sep) { if($::opt_remove_rec_sep) {
# Remove record separator # Remove record separator
$record =~ s/$recend$recstart//gos;
$record =~ s/^$recstart//os; $record =~ s/^$recstart//os;
$record =~ s/$recend$//os; $record =~ s/$recend$//os;
$record =~ s/$recstart$recend//gos;
} }
write_record_to_pipe($record); write_record_to_pipe(\$record);
$record = $partial;
}
if(eof STDIN) {
# There is no partial record at the end of file
write_record_to_pipe($record);
} }
} else { } else {
if($record =~ s/(.*$recend)($recstart.*?)$/$1/os) { if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
$partial = $2; $record = $1;
} else {
print $Global::original_stderr $recerror,"\n";
}
if($::opt_remove_rec_sep) { if($::opt_remove_rec_sep) {
# Remove record separator # Remove record separator
$record =~ s/$recend$recstart//gos;
$record =~ s/^$recstart//os; $record =~ s/^$recstart//os;
$record =~ s/$recend$//os; $record =~ s/$recend$//os;
$record =~ s/$recstart$recend//gos;
} }
::debug("Read record: ".length($record)."\n"); ::debug("Read record: ".length($record)."\n");
write_record_to_pipe($record); write_record_to_pipe(\$record);
if(eof STDIN) {
# There is no partial record at the end of file
write_record_to_pipe($partial);
} }
} }
} }
# If there is anything left in the buffer write it
write_record_to_pipe(\$buf);
::debug("Done reading STDIN\n"); ::debug("Done reading STDIN\n");
flush_and_close_pipes(); flush_and_close_pipes();
::debug("Done flushing to children\n"); ::debug("Done flushing to children\n");
@ -182,7 +174,7 @@ sub flush_and_close_pipes {
} }
sub write_record_to_pipe { sub write_record_to_pipe {
my $record = shift; my $record_ref = shift;
my $sleep = 0.1; # 0.1 ms my $sleep = 0.1; # 0.1 ms
write_record: while(1) { write_record: while(1) {
# Sorting according to sequence is necessary for -k to work # Sorting according to sequence is necessary for -k to work
@ -191,7 +183,10 @@ sub write_record_to_pipe {
::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n"); ::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n");
if($job->remaining()) { if($job->remaining()) {
# Part of the job's last record has not finished being written # Part of the job's last record has not finished being written
$job->complete_write(); if($job->complete_write()) {
# Something got written
$sleep = 0.1;
}
} else { } else {
if($job->datawritten() > 0) { if($job->datawritten() > 0) {
# There is no data remaining and we have written data before: # There is no data remaining and we have written data before:
@ -201,7 +196,7 @@ sub write_record_to_pipe {
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
} else { } else {
$job->write($record); $job->write($record_ref);
$sleep = 0.1; $sleep = 0.1;
last write_record; last write_record;
} }
@ -2127,6 +2122,7 @@ sub no_of_cpus_freebsd {
(`sysctl -a dev.cpu | grep \%parent | awk '{ print \$2 }' | uniq | wc -l | awk '{ print \$1 }'` (`sysctl -a dev.cpu | grep \%parent | awk '{ print \$2 }' | uniq | wc -l | awk '{ print \$1 }'`
or or
`sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`); `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`);
chomp $no_of_cpus;
return $no_of_cpus; return $no_of_cpus;
} }
@ -2137,6 +2133,7 @@ sub no_of_cores_freebsd {
(`sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'` (`sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`
or or
`sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`); `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`);
chomp $no_of_cores;
return $no_of_cores; return $no_of_cores;
} }
@ -2425,14 +2422,16 @@ sub set_stdin {
sub write { sub write {
my $self = shift; my $self = shift;
my $remaining = shift; my $remaining_ref = shift;
if(length($remaining)) { if(length($$remaining_ref)) {
$self->{'remaining'} .= $remaining; $self->{'remaining'} .= $$remaining_ref;
$self->complete_write(); $self->complete_write();
} }
} }
sub complete_write { sub complete_write {
# Returns:
# number of bytes written (see syswrite)
my $self = shift; my $self = shift;
my $in = $self->{'stdin'}; my $in = $self->{'stdin'};
my $len = syswrite($in,$self->{'remaining'}); my $len = syswrite($in,$self->{'remaining'});
@ -2444,6 +2443,7 @@ sub complete_write {
substr($self->{'remaining'},0,$len) = ""; substr($self->{'remaining'},0,$len) = "";
$self->{'datawritten'} += $len; $self->{'datawritten'} += $len;
} }
return $len;
} }
sub remaining { sub remaining {

View file

@ -17,16 +17,16 @@
### Test --rrs --recend single ### Test --rrs --recend single
1>123445 1>123445
2>6 2>6
### Test --rrs -N1 --recend alternate ### Test --rrs -N1 --recend alternate
1>123445 1>123445
2>6 2>6
### Test --rrs -N1 --recend single ### Test --rrs -N1 --recend single
1>12a34 1>12a34
2>45a6 2>45a6
### Test -N even ### Test -N even
1 1
2 2
@ -84,7 +84,7 @@ ole
ole ole
ole ole
### Test --recstart + --recend ### Test --recstart + --recend
0d53a19cdc880668a7c745794dcafbf1 - 3c20e43c58152da30261c5827a1f9084 -
### Race condition bug - 1 - would block ### Race condition bug - 1 - would block
### Race condition bug - 2 - would block ### Race condition bug - 2 - would block
### Test --block size=1 ### Test --block size=1