From 5b16a482e4da6e549162ffd9a9e3c9f866a2e8cc Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Tue, 30 Jul 2013 17:13:31 +0200 Subject: [PATCH] src/parallel: Inplemented --linebuffer. --- src/parallel | 69 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 10 deletions(-) diff --git a/src/parallel b/src/parallel index d6414ac1..da026ab6 100755 --- a/src/parallel +++ b/src/parallel @@ -585,6 +585,7 @@ sub options_hash { "group" => \$opt::group, "g" => \$opt::retired, "ungroup|u" => \$opt::u, + "linebuffer|linebuffered|line-buffer|line-buffered" => \$opt::linebuffer, "null|0" => \$opt::0, "quote|q" => \$opt::q, "I=s" => \$opt::I, @@ -2245,6 +2246,11 @@ sub reap_usleep { return $ms/2+0.001; } else { usleep($ms); + if($opt::linebuffer) { + for my $job (values %Global::running) { + $job->print(); + } + } # Sleep exponentially longer (1.1^n) if a job did not finish # though at most 1000 ms. return (($ms < 1000) ? ($ms * 1.1) : ($ms)); @@ -4454,7 +4460,10 @@ sub print { $Global::grouped or return; my $command = $self->sshlogin_wrap(); - if(($opt::dryrun or $Global::verbose) and $Global::grouped) { + if(($opt::dryrun or $Global::verbose) and $Global::grouped + and + not $self->{'verbose_printed'}) { + $self->{'verbose_printed'}++; if($Global::verbose <= 1) { print STDOUT $self->replaced(),"\n"; } else { @@ -4472,21 +4481,61 @@ sub print { my $out_fd = $Global::fd{$fdno}; my $in_fd = $self->fd($fdno); if(not $in_fd) { -# ::warning("File descriptor $fdno not defined\n"); + if(not $Job::file_descriptor_warning_printed{$fdno}++) { + ::warning("File descriptor $fdno not defined\n"); + } next; } - seek $in_fd, 0, 0; - if($Global::debug) { - print STDERR "File descriptor $fdno:\n"; - } + ::debug("File descriptor $fdno:"); if($opt::files) { $self->fd_file_name($fdno) and print $out_fd $self->fd_file_name($fdno),"\n"; + } elsif($opt::linebuffer) { + # Line buffered print out + my $partial = \$self->{'partial_line',$fdno}; + # This seek will clear EOF + seek $in_fd, tell($in_fd), 0; + while(read($in_fd,substr($$partial,length $$partial),1_000_000)) { + # Append to $$partial + # Find the last \n + my $i = rindex($$partial,"\n"); + if($i != -1) { + # One or more complete lines were found + if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) { + # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt + # This is a crappy way of ignoring it. + $$partial =~ s/^tcgetattr: Invalid argument\n//; + # Length of partial line has changed: Find the last \n again + $i = rindex($$partial,"\n"); + } + if($opt::tag or defined $opt::tagstring) { + # Replace ^ with $tag within the full line + my $tag = $self->tag(); + substr($$partial,0,$i+1) =~ s/^/$tag/gm; + # Length of partial line has changed: Find the last \n again + $i = rindex($$partial,"\n"); + } + # Print up to and including the last \n + print $out_fd substr($$partial,0,$i+1); + # Remove the printed part + substr($$partial,0,$i+1)=""; + } + } + + + if(defined $self->{'exitstatus'} and length $$partial > 0) { + # If the job is dead: print the remaining partial line + if($opt::tag or defined $opt::tagstring) { + my $tag = $self->tag(); + $$partial =~ s/^/$tag/gm; + } + print $out_fd $$partial; + $$partial = undef; + # then close fd + close $in_fd; + } } else { my $buf; seek $in_fd, 0, 0; - if($Global::debug) { - print STDOUT "OUT:\n"; - } if($opt::tag or defined $opt::tagstring) { my $tag = $self->tag(); if($fdno == 2) { @@ -4516,10 +4565,10 @@ sub print { print $out_fd $buf; } } + close $in_fd; } flush $out_fd; ::debug("<