Merge branch 'master' of ssh://git.sv.gnu.org/srv/git/parallel

This commit is contained in:
Ole Tange 2013-07-18 22:10:38 +02:00
commit 9e3777be3c

View file

@ -332,8 +332,8 @@ sub spreadstdin {
$recend = "(?:".$recend.")"; $recend = "(?:".$recend.")";
} else { } else {
# $recstart/$recend = printf strings (\n) # $recstart/$recend = printf strings (\n)
$recstart =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee; $recstart =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
$recend =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee; $recend =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
} }
my $recendrecstart = $recend.$recstart; my $recendrecstart = $recend.$recstart;
my $chunk_number = 1; my $chunk_number = 1;
@ -430,6 +430,23 @@ sub spreadstdin {
::debug("Done reading input\n"); ::debug("Done reading input\n");
$Global::start_no_new_jobs ||= 1; $Global::start_no_new_jobs ||= 1;
if($opt::roundrobin) {
for my $job (values %Global::running) {
my $fh = $job->stdin();
close $fh;
}
my %incomplete_jobs = %Global::running;
while(keys %incomplete_jobs) {
for my $pid (keys %incomplete_jobs) {
my $job = $incomplete_jobs{$pid};
if(length $job->{'rest'} == 0) {
delete $incomplete_jobs{$pid}
} else {
$job->non_block_write();
}
}
}
}
} }
sub nindex { sub nindex {
@ -447,13 +464,41 @@ sub nindex {
return $i; return $i;
} }
sub round_robin_write {
my $block = shift;
# $Global::total_running
my $something_written = 0;
while($block) {
while(my ($pid,$job) = each %Global::running) {
if(length $job->{'rest'} > 0) {
$something_written += $job->non_block_write();
} else {
$job->{'rest'} = $block;
$block = "";
$job->set_virgin(0);
$something_written += $job->non_block_write();
last;
}
}
}
# http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm
start_more_jobs();
}
sub write_record_to_pipe { sub write_record_to_pipe {
# Fork then # Fork then
# Write record from pos 0 .. $endpos to pipe # Write record from pos 0 .. $endpos to pipe
my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_; my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_;
if($endpos == 0) { return 0; } if($endpos == 0) { return 0; }
if(vec($Global::job_already_run,$chunk_number,1)) { return 1; } if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
# Find the minimal seq $job that has no data written == virgin if($opt::roundrobin) {
my $block = $$record_ref;
substr($block,$endpos,length $block) = "";
round_robin_write($block);
return;
}
# If no virgin found, backoff # If no virgin found, backoff
my $sleep = 0.0001; # 0.01 ms - better performance on highend my $sleep = 0.0001; # 0.01 ms - better performance on highend
while(not @Global::virgin_jobs) { while(not @Global::virgin_jobs) {
@ -595,6 +640,7 @@ sub options_hash {
"plain" => \$opt::plain, "plain" => \$opt::plain,
"profile|J=s" => \@opt::profile, "profile|J=s" => \@opt::profile,
"pipe|spreadstdin" => \$opt::pipe, "pipe|spreadstdin" => \$opt::pipe,
"robin|round-robin|roundrobin" => \$opt::roundrobin,
"recstart=s" => \$opt::recstart, "recstart=s" => \$opt::recstart,
"recend=s" => \$opt::recend, "recend=s" => \$opt::recend,
"regexp|regex" => \$opt::regexp, "regexp|regex" => \$opt::regexp,
@ -3643,6 +3689,33 @@ sub write {
syswrite($stdin_fh,$$remaining_ref); syswrite($stdin_fh,$$remaining_ref);
} }
sub non_block_write {
my $self = shift;
my $something_written = 0;
use POSIX qw(:errno_h);
my $in = $self->{'stdin'};
my $rv = syswrite($in, $self->{'rest'});
::debug("Wrote $rv of :".$self->{'rest'}.":\n");
if (!defined($rv) && $! == EAGAIN) {
# would block
$something_written = 0;
} elsif ($rv != length $self->{'rest'}) {
# incomplete write
# Remove the written part
substr($self->{'rest'},0,$rv) = "";
$something_written = 1;
} else {
# successfully wrote
substr($self->{'rest'},0,$rv) = "";
$something_written = 1;
}
::debug("Rest :".$self->{'rest'}.":\n");
::debug("Non-block: $something_written");
return $something_written;
}
sub virgin { sub virgin {
my $self = shift; my $self = shift;
return $self->{'virgin'}; return $self->{'virgin'};