Fixed bug #48290: round-robin does not distribute data based on business.

This commit is contained in:
Ole Tange 2016-07-04 12:55:16 +02:00
parent a62b528382
commit 805d924c16
3 changed files with 56 additions and 20 deletions

View file

@ -2029,6 +2029,35 @@ sub open_or_exit {
return $fh; return $fh;
} }
sub set_fh_blocking {
# Set filehandle as blocking
# Inputs:
# $fh = filehandle to be blocking
# Returns:
# N/A
my $fh = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
my $flags;
fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags &= ~&O_NONBLOCK; # Remove non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
sub set_fh_non_blocking {
# Set filehandle as non-blocking
# Inputs:
# $fh = filehandle to be blocking
# Returns:
# N/A
my $fh = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
my $flags;
fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {} sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {}
# Variable structure: # Variable structure:
@ -6344,7 +6373,11 @@ sub openoutputfiles {
$self->grouped(); $self->grouped();
} }
if($opt::linebuffer) { if($opt::linebuffer) {
$self->set_non_blocking(); # Make it possible to read non-blocking from
# the buffer files
for my $fdno (1,2) {
::set_fh_non_blocking($self->fh($fdno,'r'));
}
} }
} }
@ -6424,18 +6457,6 @@ sub filter_through_compress {
} }
} }
sub set_non_blocking {
my $self = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
for my $fdno (1,2) {
my $fdr = $self->fh($fdno,'r');
my $flags;
fcntl($fdr, &::F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= &::O_NONBLOCK; # Add non-blocking to the flags
fcntl($fdr, &::F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
}
sub max_file_name_length { sub max_file_name_length {
# Figure out the max length of a subdir # Figure out the max length of a subdir
# TODO and the max total length # TODO and the max total length
@ -6552,8 +6573,8 @@ sub non_blocking_write {
my $rv = syswrite($in, my $rv = syswrite($in,
substr($self->{'block'},$self->{'block_pos'})); substr($self->{'block'},$self->{'block_pos'}));
if (!defined($rv) && $! == EAGAIN) { if (!defined($rv) && $! == EAGAIN) {
# would block # would block - but would have written
$something_written = 0; $something_written = 1;
} elsif ($self->{'block_pos'}+$rv != $self->{'block_length'}) { } elsif ($self->{'block_pos'}+$rv != $self->{'block_length'}) {
# incomplete write # incomplete write
# Remove the written part # Remove the written part
@ -7586,6 +7607,9 @@ sub start {
if($opt::pipe) { if($opt::pipe) {
my ($stdin_fh) = ::gensym(); my ($stdin_fh) = ::gensym();
$pid = open3_setpgrp($stdin_fh,$stdout_fh,$stderr_fh,$command); $pid = open3_setpgrp($stdin_fh,$stdout_fh,$stderr_fh,$command);
if($opt::roundrobin) {
::set_fh_non_blocking($stdin_fh);
}
$job->set_fh(0,"w",$stdin_fh); $job->set_fh(0,"w",$stdin_fh);
} elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1 } elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1
and $job->sshlogin()->string() eq ":") { and $job->sshlogin()->string() eq ":") {
@ -7978,11 +8002,7 @@ sub linebuffer_print {
# Blocked reading in final round # Blocked reading in final round
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
for my $fdno (1,2) { for my $fdno (1,2) {
my $fdr = $self->fh($fdno,'r'); ::set_fh_blocking($self->fh($fdno,'r'));
my $flags;
fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags &= ~&O_NONBLOCK; # Remove non-blocking to the flags
fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
} }
} }
} }

View file

@ -23,5 +23,8 @@ echo "### Test --tmpdir running full. bug #40733 was caused by this"
echo '**' echo '**'
echo "### bug #48290: round-robin does not distribute data based on business"
echo "Jobslot 1 is 8 times slower than jobslot 8 and should get much less data"
seq 10000000 | parallel --tagstring {%} --linebuffer --compress -j8 --roundrobin --pipe --block 300k 'pv -qL {%}00000'| perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$a} <=> $s{$b} } keys %s}'
EOF EOF

View file

@ -19,3 +19,16 @@ parallel: Error: Output is incomplete. Cannot append to buffer file in /tmp/shm/
parallel: Error: Change $TMPDIR with --tmpdir or use --compress. parallel: Error: Change $TMPDIR with --tmpdir or use --compress.
echo '**' echo '**'
** **
echo "### bug #48290: round-robin does not distribute data based on business"
### bug #48290: round-robin does not distribute data based on business
echo "Jobslot 1 is 8 times slower than jobslot 8 and should get much less data"
Jobslot 1 is 8 times slower than jobslot 8 and should get much less data
seq 10000000 | parallel --tagstring {%} --linebuffer --compress -j8 --roundrobin --pipe --block 300k 'pv -qL {%}00000'| perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$a} <=> $s{$b} } keys %s}'
1
2
3
5
4
7
6
8