diff --git a/src/parallel b/src/parallel index 6de56b50..d4e31333 100755 --- a/src/parallel +++ b/src/parallel @@ -332,8 +332,8 @@ sub spreadstdin { $recend = "(?:".$recend.")"; } else { # $recstart/$recend = printf strings (\n) - $recstart =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee; - $recend =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee; + $recstart =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee; + $recend =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee; } my $recendrecstart = $recend.$recstart; my $chunk_number = 1; @@ -430,6 +430,23 @@ sub spreadstdin { ::debug("Done reading input\n"); $Global::start_no_new_jobs ||= 1; + if($opt::roundrobin) { + for my $job (values %Global::running) { + my $fh = $job->stdin(); + close $fh; + } + my %incomplete_jobs = %Global::running; + while(keys %incomplete_jobs) { + for my $pid (keys %incomplete_jobs) { + my $job = $incomplete_jobs{$pid}; + if(length $job->{'rest'} == 0) { + delete $incomplete_jobs{$pid} + } else { + $job->non_block_write(); + } + } + } + } } sub nindex { @@ -447,13 +464,41 @@ sub nindex { return $i; } +sub round_robin_write { + my $block = shift; + # $Global::total_running + my $something_written = 0; + while($block) { + while(my ($pid,$job) = each %Global::running) { + if(length $job->{'rest'} > 0) { + $something_written += $job->non_block_write(); + } else { + $job->{'rest'} = $block; + $block = ""; + $job->set_virgin(0); + $something_written += $job->non_block_write(); + last; + } + } + } + + # http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm + start_more_jobs(); +} + + sub write_record_to_pipe { # Fork then # Write record from pos 0 .. $endpos to pipe my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_; if($endpos == 0) { return 0; } if(vec($Global::job_already_run,$chunk_number,1)) { return 1; } - # Find the minimal seq $job that has no data written == virgin + if($opt::roundrobin) { + my $block = $$record_ref; + substr($block,$endpos,length $block) = ""; + round_robin_write($block); + return; + } # If no virgin found, backoff my $sleep = 0.0001; # 0.01 ms - better performance on highend while(not @Global::virgin_jobs) { @@ -595,6 +640,7 @@ sub options_hash { "plain" => \$opt::plain, "profile|J=s" => \@opt::profile, "pipe|spreadstdin" => \$opt::pipe, + "robin|round-robin|roundrobin" => \$opt::roundrobin, "recstart=s" => \$opt::recstart, "recend=s" => \$opt::recend, "regexp|regex" => \$opt::regexp, @@ -3642,6 +3688,33 @@ sub write { syswrite($in,$$remaining_ref); } +sub non_block_write { + my $self = shift; + my $something_written = 0; + use POSIX qw(:errno_h); + + my $in = $self->{'stdin'}; + my $rv = syswrite($in, $self->{'rest'}); + ::debug("Wrote $rv of :".$self->{'rest'}.":\n"); + if (!defined($rv) && $! == EAGAIN) { + # would block + $something_written = 0; + } elsif ($rv != length $self->{'rest'}) { + # incomplete write + # Remove the written part + substr($self->{'rest'},0,$rv) = ""; + $something_written = 1; + } else { + # successfully wrote + substr($self->{'rest'},0,$rv) = ""; + $something_written = 1; + } + ::debug("Rest :".$self->{'rest'}.":\n"); + ::debug("Non-block: $something_written"); + return $something_written; +} + + sub virgin { my $self = shift; return $self->{'virgin'};