mirror of
https://git.savannah.gnu.org/git/parallel.git
synced 2025-01-11 00:57:53 +00:00
parallel: initial --round-robin (blocking and inefficient).
This commit is contained in:
parent
895d84af65
commit
7166c90f98
79
src/parallel
79
src/parallel
|
@ -332,8 +332,8 @@ sub spreadstdin {
|
||||||
$recend = "(?:".$recend.")";
|
$recend = "(?:".$recend.")";
|
||||||
} else {
|
} else {
|
||||||
# $recstart/$recend = printf strings (\n)
|
# $recstart/$recend = printf strings (\n)
|
||||||
$recstart =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee;
|
$recstart =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
|
||||||
$recend =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee;
|
$recend =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
|
||||||
}
|
}
|
||||||
my $recendrecstart = $recend.$recstart;
|
my $recendrecstart = $recend.$recstart;
|
||||||
my $chunk_number = 1;
|
my $chunk_number = 1;
|
||||||
|
@ -430,6 +430,23 @@ sub spreadstdin {
|
||||||
|
|
||||||
::debug("Done reading input\n");
|
::debug("Done reading input\n");
|
||||||
$Global::start_no_new_jobs ||= 1;
|
$Global::start_no_new_jobs ||= 1;
|
||||||
|
if($opt::roundrobin) {
|
||||||
|
for my $job (values %Global::running) {
|
||||||
|
my $fh = $job->stdin();
|
||||||
|
close $fh;
|
||||||
|
}
|
||||||
|
my %incomplete_jobs = %Global::running;
|
||||||
|
while(keys %incomplete_jobs) {
|
||||||
|
for my $pid (keys %incomplete_jobs) {
|
||||||
|
my $job = $incomplete_jobs{$pid};
|
||||||
|
if(length $job->{'rest'} == 0) {
|
||||||
|
delete $incomplete_jobs{$pid}
|
||||||
|
} else {
|
||||||
|
$job->non_block_write();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sub nindex {
|
sub nindex {
|
||||||
|
@ -447,13 +464,41 @@ sub nindex {
|
||||||
return $i;
|
return $i;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub round_robin_write {
|
||||||
|
my $block = shift;
|
||||||
|
# $Global::total_running
|
||||||
|
my $something_written = 0;
|
||||||
|
while($block) {
|
||||||
|
while(my ($pid,$job) = each %Global::running) {
|
||||||
|
if(length $job->{'rest'} > 0) {
|
||||||
|
$something_written += $job->non_block_write();
|
||||||
|
} else {
|
||||||
|
$job->{'rest'} = $block;
|
||||||
|
$block = "";
|
||||||
|
$job->set_virgin(0);
|
||||||
|
$something_written += $job->non_block_write();
|
||||||
|
last;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm
|
||||||
|
start_more_jobs();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
sub write_record_to_pipe {
|
sub write_record_to_pipe {
|
||||||
# Fork then
|
# Fork then
|
||||||
# Write record from pos 0 .. $endpos to pipe
|
# Write record from pos 0 .. $endpos to pipe
|
||||||
my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_;
|
my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_;
|
||||||
if($endpos == 0) { return 0; }
|
if($endpos == 0) { return 0; }
|
||||||
if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
|
if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
|
||||||
# Find the minimal seq $job that has no data written == virgin
|
if($opt::roundrobin) {
|
||||||
|
my $block = $$record_ref;
|
||||||
|
substr($block,$endpos,length $block) = "";
|
||||||
|
round_robin_write($block);
|
||||||
|
return;
|
||||||
|
}
|
||||||
# If no virgin found, backoff
|
# If no virgin found, backoff
|
||||||
my $sleep = 0.0001; # 0.01 ms - better performance on highend
|
my $sleep = 0.0001; # 0.01 ms - better performance on highend
|
||||||
while(not @Global::virgin_jobs) {
|
while(not @Global::virgin_jobs) {
|
||||||
|
@ -595,6 +640,7 @@ sub options_hash {
|
||||||
"plain" => \$opt::plain,
|
"plain" => \$opt::plain,
|
||||||
"profile|J=s" => \@opt::profile,
|
"profile|J=s" => \@opt::profile,
|
||||||
"pipe|spreadstdin" => \$opt::pipe,
|
"pipe|spreadstdin" => \$opt::pipe,
|
||||||
|
"robin|round-robin|roundrobin" => \$opt::roundrobin,
|
||||||
"recstart=s" => \$opt::recstart,
|
"recstart=s" => \$opt::recstart,
|
||||||
"recend=s" => \$opt::recend,
|
"recend=s" => \$opt::recend,
|
||||||
"regexp|regex" => \$opt::regexp,
|
"regexp|regex" => \$opt::regexp,
|
||||||
|
@ -3642,6 +3688,33 @@ sub write {
|
||||||
syswrite($in,$$remaining_ref);
|
syswrite($in,$$remaining_ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub non_block_write {
|
||||||
|
my $self = shift;
|
||||||
|
my $something_written = 0;
|
||||||
|
use POSIX qw(:errno_h);
|
||||||
|
|
||||||
|
my $in = $self->{'stdin'};
|
||||||
|
my $rv = syswrite($in, $self->{'rest'});
|
||||||
|
::debug("Wrote $rv of :".$self->{'rest'}.":\n");
|
||||||
|
if (!defined($rv) && $! == EAGAIN) {
|
||||||
|
# would block
|
||||||
|
$something_written = 0;
|
||||||
|
} elsif ($rv != length $self->{'rest'}) {
|
||||||
|
# incomplete write
|
||||||
|
# Remove the written part
|
||||||
|
substr($self->{'rest'},0,$rv) = "";
|
||||||
|
$something_written = 1;
|
||||||
|
} else {
|
||||||
|
# successfully wrote
|
||||||
|
substr($self->{'rest'},0,$rv) = "";
|
||||||
|
$something_written = 1;
|
||||||
|
}
|
||||||
|
::debug("Rest :".$self->{'rest'}.":\n");
|
||||||
|
::debug("Non-block: $something_written");
|
||||||
|
return $something_written;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
sub virgin {
|
sub virgin {
|
||||||
my $self = shift;
|
my $self = shift;
|
||||||
return $self->{'virgin'};
|
return $self->{'virgin'};
|
||||||
|
|
Loading…
Reference in a new issue