parallel: 225MB/s with: parallel -j4 --block 100M --pipe --round wc -l

This commit is contained in:
Ole Tange 2013-07-24 15:25:40 +02:00
parent 7166c90f98
commit a73f2e3c4d
2 changed files with 69 additions and 34 deletions

View file

@ -436,15 +436,21 @@ sub spreadstdin {
close $fh; close $fh;
} }
my %incomplete_jobs = %Global::running; my %incomplete_jobs = %Global::running;
my $sleep = 1;
while(keys %incomplete_jobs) { while(keys %incomplete_jobs) {
my $something_written = 0;
for my $pid (keys %incomplete_jobs) { for my $pid (keys %incomplete_jobs) {
my $job = $incomplete_jobs{$pid}; my $job = $incomplete_jobs{$pid};
if(length $job->{'rest'} == 0) { if($job->stdin_buffer_length()) {
delete $incomplete_jobs{$pid} $something_written += $job->non_block_write();
} else { } else {
$job->non_block_write(); delete $incomplete_jobs{$pid}
} }
} }
if($something_written) {
$sleep = $sleep/2;
}
$sleep = ::reap_usleep($sleep);
} }
} }
} }
@ -465,16 +471,16 @@ sub nindex {
} }
sub round_robin_write { sub round_robin_write {
my $block = shift; my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_;
# $Global::total_running
my $something_written = 0; my $something_written = 0;
while($block) { my $block_passed = 0;
while(not $block_passed) {
while(my ($pid,$job) = each %Global::running) { while(my ($pid,$job) = each %Global::running) {
if(length $job->{'rest'} > 0) { if($job->stdin_buffer_length() > 0) {
$something_written += $job->non_block_write(); $something_written += $job->non_block_write();
} else { } else {
$job->{'rest'} = $block; $job->set_stdin_buffer($block_ref,$endpos,$recstart,$recend);
$block = ""; $block_passed = 1;
$job->set_virgin(0); $job->set_virgin(0);
$something_written += $job->non_block_write(); $something_written += $job->non_block_write();
last; last;
@ -484,6 +490,7 @@ sub round_robin_write {
# http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm # http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm
start_more_jobs(); start_more_jobs();
return $something_written;
} }
@ -494,10 +501,7 @@ sub write_record_to_pipe {
if($endpos == 0) { return 0; } if($endpos == 0) { return 0; }
if(vec($Global::job_already_run,$chunk_number,1)) { return 1; } if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
if($opt::roundrobin) { if($opt::roundrobin) {
my $block = $$record_ref; return round_robin_write($header_ref,$record_ref,$recstart,$recend,$endpos);
substr($block,$endpos,length $block) = "";
round_robin_write($block);
return;
} }
# If no virgin found, backoff # If no virgin found, backoff
my $sleep = 0.0001; # 0.01 ms - better performance on highend my $sleep = 0.0001; # 0.01 ms - better performance on highend
@ -519,10 +523,7 @@ sub write_record_to_pipe {
substr($$record_ref,$endpos,length $$record_ref) = ""; substr($$record_ref,$endpos,length $$record_ref) = "";
# Remove rec_sep # Remove rec_sep
if($opt::remove_rec_sep) { if($opt::remove_rec_sep) {
# Remove record separator remove_rec_sep($record_ref,$recstart,$recend);
$$record_ref =~ s/$recend$recstart//gos;
$$record_ref =~ s/^$recstart//os;
$$record_ref =~ s/$recend$//os;
} }
$job->write($header_ref); $job->write($header_ref);
$job->write($record_ref); $job->write($record_ref);
@ -3688,28 +3689,62 @@ sub write {
syswrite($in,$$remaining_ref); syswrite($in,$$remaining_ref);
} }
sub set_stdin_buffer {
my $self = shift;
my ($block_ref,$endpos,$recstart,$recend) = @_;
$self->{'stdin_buffer'} = substr($$block_ref,0,$endpos);
if($opt::remove_rec_sep) {
remove_rec_sep(\$self->{'stdin_buffer'},$recstart,$recend);
}
$self->{'stdin_buffer_length'} = length $self->{'stdin_buffer'};
$self->{'stdin_buffer_pos'} = 0;
}
sub stdin_buffer_length {
my $self = shift;
return $self->{'stdin_buffer_length'};
}
sub remove_rec_sep {
my ($block_ref,$recstart,$recend) = @_;
# Remove record separator
$$block_ref =~ s/$recend$recstart//gos;
$$block_ref =~ s/^$recstart//os;
$$block_ref =~ s/$recend$//os;
}
sub non_block_write { sub non_block_write {
my $self = shift; my $self = shift;
my $something_written = 0; my $something_written = 0;
use POSIX qw(:errno_h); use POSIX qw(:errno_h);
use Fcntl;
my $in = $self->{'stdin'}; my $flags = '';
my $rv = syswrite($in, $self->{'rest'});
::debug("Wrote $rv of :".$self->{'rest'}.":\n");
if (!defined($rv) && $! == EAGAIN) { for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) {
# would block my $in = $self->{'stdin'};
$something_written = 0; # fcntl($in, F_GETFL, $flags)
} elsif ($rv != length $self->{'rest'}) { # or die "Couldn't get flags for HANDLE : $!\n";
# incomplete write # $flags |= O_NONBLOCK;
# Remove the written part # fcntl($in, F_SETFL, $flags)
substr($self->{'rest'},0,$rv) = ""; # or die "Couldn't set flags for HANDLE: $!\n";
$something_written = 1; my $rv = syswrite($in, $buf);
} else { if (!defined($rv) && $! == EAGAIN) {
# successfully wrote # would block
substr($self->{'rest'},0,$rv) = ""; $something_written = 0;
$something_written = 1; } elsif ($self->{'stdin_buffer_pos'}+$rv != $self->{'stdin_buffer_length'}) {
# incomplete write
# Remove the written part
$self->{'stdin_buffer_pos'} += $rv;
$something_written = $rv;
} else {
# successfully wrote everything
my $a="";
$self->set_stdin_buffer(\$a,0,"","");
$something_written = $rv;
}
} }
::debug("Rest :".$self->{'rest'}.":\n");
::debug("Non-block: $something_written"); ::debug("Non-block: $something_written");
return $something_written; return $something_written;
} }

View file

@ -295,7 +295,7 @@ B<sql -s '\t' :myalias 'SELECT * FROM foo;' | parallel --colsep '\t' do_stuff {4
=head2 Retry if the connection fails =head2 Retry if the connection fails
If the access to the database fails occationally B<--retries> can help If the access to the database fails occasionally B<--retries> can help
make sure the query succeeds: make sure the query succeeds:
B<sql --retries 5 :myalias 'SELECT * FROM really_big_foo;'> B<sql --retries 5 :myalias 'SELECT * FROM really_big_foo;'>