Fixed bug --pipe --linebuffer --round does not distribute if the consumer is fast.

This commit is contained in:
Ole Tange 2014-11-14 01:00:56 +01:00
parent 1dd2d6a601
commit dce64026cc
4 changed files with 149 additions and 125 deletions

View file

@ -471,39 +471,46 @@ sub nindex {
return $i;
}
sub round_robin_write {
# Input:
# $header_ref = ref to $header string
# $block_ref = ref to $block to be written
# $recstart = record start string
# $recend = record end string
# $endpos = end position of $block
# Uses:
# %Global::running
my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_;
my $something_written = 0;
my $block_passed = 0;
my $sleep = 1;
while(not $block_passed) {
# Continue flushing existing buffers
# until one is empty and a new block is passed
while(my ($pid,$job) = each %Global::running) {
if($job->stdin_buffer_length() > 0) {
$something_written += $job->non_block_write();
} else {
$job->set_stdin_buffer($header_ref,$block_ref,$endpos,$recstart,$recend);
$block_passed = 1;
$job->set_virgin(0);
$something_written += $job->non_block_write();
last;
}
}
$sleep = ::reap_usleep($sleep);
}
start_more_jobs();
return $something_written;
}
{
my @robin_queue;
sub round_robin_write {
# Input:
# $header_ref = ref to $header string
# $block_ref = ref to $block to be written
# $recstart = record start string
# $recend = record end string
# $endpos = end position of $block
# Uses:
# %Global::running
my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_;
my $something_written = 0;
my $block_passed = 0;
my $sleep = 1;
while(not $block_passed) {
# Continue flushing existing buffers
# until one is empty and a new block is passed
# Make a queue to spread the blocks evenly
if(not @robin_queue) {
push @robin_queue, values %Global::running;
}
while(my $job = shift @robin_queue) {
if($job->stdin_buffer_length() > 0) {
$something_written += $job->non_block_write();
} else {
$job->set_stdin_buffer($header_ref,$block_ref,$endpos,$recstart,$recend);
$block_passed = 1;
$job->set_virgin(0);
$something_written += $job->non_block_write();
last;
}
}
$sleep = ::reap_usleep($sleep);
}
# start_more_jobs();
return $something_written;
}
}
sub write_record_to_pipe {
# Fork then
@ -608,7 +615,7 @@ sub options_hash {
"keep-order|keeporder|k" => \$opt::keeporder,
"group" => \$opt::group,
"g" => \$opt::retired,
"ungroup|u" => \$opt::u,
"ungroup|u" => \$opt::ungroup,
"linebuffer|linebuffered|line-buffer|line-buffered" => \$opt::linebuffer,
"tmux" => \$opt::tmux,
"null|0" => \$opt::0,
@ -780,7 +787,6 @@ sub parse_options {
$Global::infinity = 2**31;
$Global::debug = 0;
$Global::verbose = 0;
$Global::grouped = 1;
$Global::quoting = 0;
# Read only table with default --rpl values
%Global::replace =
@ -832,8 +838,6 @@ sub parse_options {
$Global::shell = $ENV{'PARALLEL_SHELL'} || parent_shell($$) || $ENV{'SHELL'} || "/bin/sh";
if(defined $opt::X) { $Global::ContextReplace = 1; }
if(defined $opt::silent) { $Global::verbose = 0; }
if(defined $opt::group) { $Global::grouped = 1; }
if(defined $opt::u) { $Global::grouped = 0; }
if(defined $opt::0) { $/ = "\0"; }
if(defined $opt::d) { my $e="sprintf \"$opt::d\""; $/ = eval $e; }
if(defined $opt::p) { $Global::interactive = $opt::p; }
@ -944,7 +948,7 @@ sub parse_options {
$opt::jobs = 1;
}
if(not defined $opt::group) {
$Global::grouped = 0;
$opt::ungroup = 0;
}
}
if(@opt::trc) {
@ -1564,13 +1568,13 @@ sub enough_file_handles {
# Check that we have enough filehandles available for starting
# another job
# Uses:
# $Global::grouped
# $opt::ungroup
# %Global::fd
# Returns:
# 1 if ungrouped (thus not needing extra filehandles)
# 0 if too few filehandles
# 1 if enough filehandles
if($Global::grouped) {
if(not $opt::ungroup) {
my %fh;
my $enough_filehandles = 1;
# perl uses 7 filehandles for something?
@ -2576,7 +2580,7 @@ sub onall {
join(" ",
((defined $opt::jobs) ? "-P $opt::jobs" : ""),
((defined $opt::linebuffer) ? "--linebuffer" : ""),
((defined $opt::u) ? "-u" : ""),
((defined $opt::ungroup) ? "-u" : ""),
((defined $opt::group) ? "-g" : ""),
((defined $opt::keeporder) ? "--keeporder" : ""),
((defined $opt::D) ? "-D $opt::D" : ""),
@ -2585,7 +2589,7 @@ sub onall {
);
my $suboptions =
join(" ",
((defined $opt::u) ? "-u" : ""),
((defined $opt::ungroup) ? "-u" : ""),
((defined $opt::linebuffer) ? "--linebuffer" : ""),
((defined $opt::group) ? "-g" : ""),
((defined $opt::files) ? "--files" : ""),
@ -5027,7 +5031,7 @@ sub openoutputfiles {
}
$self->set_fh(1,"unlink","");
$self->set_fh(2,"unlink","");
} elsif($Global::grouped) {
} elsif(not $opt::ungroup) {
# To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open)
if(@Global::tee_jobs) {
@ -5075,7 +5079,7 @@ sub openoutputfiles {
$self->set_fh($fdno,'r',$fdr);
$self->set_fh($fdno,'rpid',$rpid);
}
} elsif($Global::grouped) {
} elsif(not $opt::ungroup) {
# Set reading FD if using --group (--ungroup does not need)
for my $fdno (1,2) {
# Re-open the file for reading
@ -5838,7 +5842,7 @@ sub start {
open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!");
if(($opt::dryrun or $Global::verbose) and not $Global::grouped) {
if(($opt::dryrun or $Global::verbose) and $opt::ungroup) {
if($Global::verbose <= 1) {
print $stdout_fh $job->replaced(),"\n";
} else {
@ -6017,15 +6021,17 @@ sub print {
if($opt::pipe and $self->virgin()) {
# Skip --joblog, --dryrun, --verbose
} else {
if($Global::joblog) { $self->print_joblog() }
if($Global::joblog and defined $self->{'exitstatus'}) {
# Add to joblog when finished
$self->print_joblog();
}
# Printing is only relevant for grouped output.
$Global::grouped or return;
# Printing is only relevant for grouped/--line-buffer output.
$opt::ungroup and return;
# Check for disk full
exit_if_disk_full();
my $command = $self->wrapped();
if(($opt::dryrun or $Global::verbose) and $Global::grouped
if(($opt::dryrun or $Global::verbose)
and
not $self->{'verbose_printed'}) {
$self->{'verbose_printed'}++;
@ -6033,7 +6039,7 @@ sub print {
print STDOUT $self->replaced(),"\n";
} else {
# Verbose level > 1: Print the rsync and stuff
print STDOUT $command,"\n";
print STDOUT $self->wrapped(),"\n";
}
# If STDOUT and STDERR are merged,
# we want the command to be printed first
@ -6058,7 +6064,8 @@ sub print {
close $self->fh($fdno,"w");
close $in_fh;
if($opt::pipe and $self->virgin()) {
# Nothing was printed to this job: # cleanup unused tmp files if --files was set
# Nothing was printed to this job:
# cleanup unused tmp files if --files was set
for my $fdno (1,2) {
unlink $self->fh($fdno,"name");
unlink $self->fh($fdno,"unlink");
@ -6068,74 +6075,7 @@ sub print {
}
} elsif($opt::linebuffer) {
# Line buffered print out
my $partial = \$self->{'partial_line',$fdno};
if(defined $self->{'exitstatus'}) {
# If the job is dead: close printing fh. Needed for --compress
close $self->fh($fdno,"w");
if($opt::compress && $opt::linebuffer) {
# Blocked reading in final round
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
for my $fdno (1,2) {
my $fdr = $self->fh($fdno,'r');
my $flags;
fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags &= ~&O_NONBLOCK; # Remove non-blocking to the flags
fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
}
}
# This seek will clear EOF
seek $in_fh, tell($in_fh), 0;
# The read is non-blocking: The $in_fh is set to non-blocking.
# 32768 --tag = 5.1s
# 327680 --tag = 4.4s
# 1024000 --tag = 4.4s
# 3276800 --tag = 4.3s
# 32768000 --tag = 4.7s
# 10240000 --tag = 4.3s
while(read($in_fh,substr($$partial,length $$partial),3276800)) {
# Append to $$partial
# Find the last \n
my $i = rindex($$partial,"\n");
if($i != -1) {
# One or more complete lines were found
if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) {
# OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
# This is a crappy way of ignoring it.
$$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
# Length of partial line has changed: Find the last \n again
$i = rindex($$partial,"\n");
}
if($opt::tag or defined $opt::tagstring) {
# Replace ^ with $tag within the full line
my $tag = $self->tag();
substr($$partial,0,$i+1) =~ s/^/$tag/gm;
# Length of partial line has changed: Find the last \n again
$i = rindex($$partial,"\n");
}
# Print up to and including the last \n
print $out_fd substr($$partial,0,$i+1);
# Remove the printed part
substr($$partial,0,$i+1)="";
}
}
if(defined $self->{'exitstatus'}) {
# If the job is dead: print the remaining partial line
# read remaining
if($$partial and ($opt::tag or defined $opt::tagstring)) {
my $tag = $self->tag();
$$partial =~ s/^/$tag/gm;
}
print $out_fd $$partial;
# Release the memory
$$partial = undef;
if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) {
# decompress still running
} else {
# decompress done: close fh
close $in_fh;
}
}
$self->linebuffer_print($fdno,$in_fh,$out_fd);
} else {
my $buf;
close $self->fh($fdno,"w");
@ -6179,6 +6119,80 @@ sub print {
::debug("print", "<<joboutput @command\n");
}
sub linebuffer_print {
my $self = shift;
my ($fdno,$in_fh,$out_fd) = @_;
my $partial = \$self->{'partial_line',$fdno};
if(defined $self->{'exitstatus'}) {
# If the job is dead: close printing fh. Needed for --compress
close $self->fh($fdno,"w");
if($opt::compress) {
# Blocked reading in final round
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
for my $fdno (1,2) {
my $fdr = $self->fh($fdno,'r');
my $flags;
fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags &= ~&O_NONBLOCK; # Remove non-blocking to the flags
fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
}
}
# This seek will clear EOF
seek $in_fh, tell($in_fh), 0;
# The read is non-blocking: The $in_fh is set to non-blocking.
# 32768 --tag = 5.1s
# 327680 --tag = 4.4s
# 1024000 --tag = 4.4s
# 3276800 --tag = 4.3s
# 32768000 --tag = 4.7s
# 10240000 --tag = 4.3s
while(read($in_fh,substr($$partial,length $$partial),3276800)) {
# Append to $$partial
# Find the last \n
my $i = rindex($$partial,"\n");
if($i != -1) {
# One or more complete lines were found
if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) {
# OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
# This is a crappy way of ignoring it.
$$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
# Length of partial line has changed: Find the last \n again
$i = rindex($$partial,"\n");
}
if($opt::tag or defined $opt::tagstring) {
# Replace ^ with $tag within the full line
my $tag = $self->tag();
substr($$partial,0,$i+1) =~ s/^/$tag/gm;
# Length of partial line has changed: Find the last \n again
$i = rindex($$partial,"\n");
}
# Print up to and including the last \n
print $out_fd substr($$partial,0,$i+1);
# Remove the printed part
substr($$partial,0,$i+1)="";
}
}
if(defined $self->{'exitstatus'}) {
# If the job is dead: print the remaining partial line
# read remaining
if($$partial and ($opt::tag or defined $opt::tagstring)) {
my $tag = $self->tag();
$$partial =~ s/^/$tag/gm;
}
print $out_fd $$partial;
# Release the memory
$$partial = undef;
if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) {
# decompress still running
} else {
# decompress done: close fh
close $in_fh;
}
}
}
sub print_joblog {
my $self = shift;
my $cmd;

View file

@ -34,6 +34,9 @@ echo '### test round-robin';
echo '### bug #43600: --pipe --linebuffer --round does not work'
seq 10000000000 | parallel --pipe --linebuffer --round cat | head
echo '### Check that 4 processes are really used'
seq 1000000 | parallel -j4 --pipe --round --line-buf wc |sort
echo '### --version must have higher priority than retired options'
$NICEPAR --version -g -Y -U -W -T | tail

View file

@ -13,7 +13,12 @@ echo 'ssh "$@"; echo "$@" >>/tmp/myssh2-run' >/tmp/myssh2
chmod 755 /tmp/myssh1 /tmp/myssh2
seq 1 100 | parallel --sshdelay 0.05 --sshlogin "/tmp/myssh1 $SSHLOGIN1,/tmp/myssh2 $SSHLOGIN2" -k echo
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/\;s/\$SSHLOGIN1/$SSHLOGIN1/\;s/\$SSHLOGIN2/$SSHLOGIN2/\;s/\$SSHLOGIN3/$SSHLOGIN3/ | parallel -vj2 -k -L1
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/\;s/\$SSHLOGIN1/$SSHLOGIN1/\;s/\$SSHLOGIN2/$SSHLOGIN2/\;s/\$SSHLOGIN3/$SSHLOGIN3/ | parallel -vj1 -k -L1
echo '### bug #41964: --controlmaster not seems to reuse OpenSSH connections to the same host'
(parallel -S redhat9.tange.dk true ::: {1..20}; echo No --controlmaster - finish last) &
(parallel -M -S redhat9.tange.dk true ::: {1..20}; echo With --controlmaster - finish first) &
wait
echo '### --filter-hosts - OK, non-such-user, connection refused, wrong host'
parallel --nonall --filter-hosts -S localhost,NoUser@localhost,154.54.72.206,"ssh 5.5.5.5" hostname
@ -29,9 +34,4 @@ echo '### test --filter-hosts with server w/o ssh, non-existing server'
echo '### Missing: test --filter-hosts proxied through the one host'
echo '### bug #41964: --controlmaster not seems to reuse OpenSSH connections to the same host'
(parallel -S redhat9.tange.dk true ::: {1..20}; echo No --controlmaster - finish last) &
(parallel -M -S redhat9.tange.dk true ::: {1..20}; echo With --controlmaster - finish first) &
wait
EOF

View file

@ -63,6 +63,13 @@ echo '### bug #43600: --pipe --linebuffer --round does not work'
8
9
10
echo '### Check that 4 processes are really used'
### Check that 4 processes are really used
seq 1000000 | parallel -j4 --pipe --round --line-buf wc |sort
149797 149797 1048579
235145 235145 1646016
299593 299593 2097151
315465 315465 2097150
echo '### --version must have higher priority than retired options'
### --version must have higher priority than retired options
$NICEPAR --version -g -Y -U -W -T | tail