From a73f2e3c4d5eb64796ea9dfccf9f1ea5e36c1b97 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Wed, 24 Jul 2013 15:25:40 +0200 Subject: [PATCH] parallel: 225MB/s with: parallel -j4 --block 100M --pipe --round wc -l --- src/parallel | 101 ++++++++++++++++++++++++++++++++++----------------- src/sql | 2 +- 2 files changed, 69 insertions(+), 34 deletions(-) diff --git a/src/parallel b/src/parallel index d4e31333..7a41f036 100755 --- a/src/parallel +++ b/src/parallel @@ -436,15 +436,21 @@ sub spreadstdin { close $fh; } my %incomplete_jobs = %Global::running; + my $sleep = 1; while(keys %incomplete_jobs) { + my $something_written = 0; for my $pid (keys %incomplete_jobs) { my $job = $incomplete_jobs{$pid}; - if(length $job->{'rest'} == 0) { - delete $incomplete_jobs{$pid} + if($job->stdin_buffer_length()) { + $something_written += $job->non_block_write(); } else { - $job->non_block_write(); + delete $incomplete_jobs{$pid} } } + if($something_written) { + $sleep = $sleep/2; + } + $sleep = ::reap_usleep($sleep); } } } @@ -465,16 +471,16 @@ sub nindex { } sub round_robin_write { - my $block = shift; - # $Global::total_running + my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_; my $something_written = 0; - while($block) { + my $block_passed = 0; + while(not $block_passed) { while(my ($pid,$job) = each %Global::running) { - if(length $job->{'rest'} > 0) { + if($job->stdin_buffer_length() > 0) { $something_written += $job->non_block_write(); } else { - $job->{'rest'} = $block; - $block = ""; + $job->set_stdin_buffer($block_ref,$endpos,$recstart,$recend); + $block_passed = 1; $job->set_virgin(0); $something_written += $job->non_block_write(); last; @@ -484,6 +490,7 @@ sub round_robin_write { # http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm start_more_jobs(); + return $something_written; } @@ -494,10 +501,7 @@ sub write_record_to_pipe { if($endpos == 0) { return 0; } if(vec($Global::job_already_run,$chunk_number,1)) { return 1; } if($opt::roundrobin) { - my $block = $$record_ref; - substr($block,$endpos,length $block) = ""; - round_robin_write($block); - return; + return round_robin_write($header_ref,$record_ref,$recstart,$recend,$endpos); } # If no virgin found, backoff my $sleep = 0.0001; # 0.01 ms - better performance on highend @@ -519,10 +523,7 @@ sub write_record_to_pipe { substr($$record_ref,$endpos,length $$record_ref) = ""; # Remove rec_sep if($opt::remove_rec_sep) { - # Remove record separator - $$record_ref =~ s/$recend$recstart//gos; - $$record_ref =~ s/^$recstart//os; - $$record_ref =~ s/$recend$//os; + remove_rec_sep($record_ref,$recstart,$recend); } $job->write($header_ref); $job->write($record_ref); @@ -3688,28 +3689,62 @@ sub write { syswrite($in,$$remaining_ref); } +sub set_stdin_buffer { + my $self = shift; + my ($block_ref,$endpos,$recstart,$recend) = @_; + $self->{'stdin_buffer'} = substr($$block_ref,0,$endpos); + if($opt::remove_rec_sep) { + remove_rec_sep(\$self->{'stdin_buffer'},$recstart,$recend); + } + $self->{'stdin_buffer_length'} = length $self->{'stdin_buffer'}; + $self->{'stdin_buffer_pos'} = 0; +} + +sub stdin_buffer_length { + my $self = shift; + return $self->{'stdin_buffer_length'}; +} + +sub remove_rec_sep { + my ($block_ref,$recstart,$recend) = @_; + # Remove record separator + $$block_ref =~ s/$recend$recstart//gos; + $$block_ref =~ s/^$recstart//os; + $$block_ref =~ s/$recend$//os; +} + sub non_block_write { my $self = shift; my $something_written = 0; use POSIX qw(:errno_h); + use Fcntl; - my $in = $self->{'stdin'}; - my $rv = syswrite($in, $self->{'rest'}); - ::debug("Wrote $rv of :".$self->{'rest'}.":\n"); - if (!defined($rv) && $! == EAGAIN) { - # would block - $something_written = 0; - } elsif ($rv != length $self->{'rest'}) { - # incomplete write - # Remove the written part - substr($self->{'rest'},0,$rv) = ""; - $something_written = 1; - } else { - # successfully wrote - substr($self->{'rest'},0,$rv) = ""; - $something_written = 1; + my $flags = ''; + + + for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) { + my $in = $self->{'stdin'}; +# fcntl($in, F_GETFL, $flags) +# or die "Couldn't get flags for HANDLE : $!\n"; +# $flags |= O_NONBLOCK; +# fcntl($in, F_SETFL, $flags) +# or die "Couldn't set flags for HANDLE: $!\n"; + my $rv = syswrite($in, $buf); + if (!defined($rv) && $! == EAGAIN) { + # would block + $something_written = 0; + } elsif ($self->{'stdin_buffer_pos'}+$rv != $self->{'stdin_buffer_length'}) { + # incomplete write + # Remove the written part + $self->{'stdin_buffer_pos'} += $rv; + $something_written = $rv; + } else { + # successfully wrote everything + my $a=""; + $self->set_stdin_buffer(\$a,0,"",""); + $something_written = $rv; + } } - ::debug("Rest :".$self->{'rest'}.":\n"); ::debug("Non-block: $something_written"); return $something_written; } diff --git a/src/sql b/src/sql index 2bd7cef1..6aa05c5b 100755 --- a/src/sql +++ b/src/sql @@ -295,7 +295,7 @@ B can help +If the access to the database fails occasionally B<--retries> can help make sure the query succeeds: B