diff --git a/src/parallel b/src/parallel index 55229df5..5e47745a 100755 --- a/src/parallel +++ b/src/parallel @@ -80,7 +80,7 @@ sub set_input_source_header($$) { for my $s (@opt::a) { # ::: are put into files and given a filehandle # ignore these and only keep the filenames. - fileno $s and next; + fileno $s and next; for(@$command_ref, @Global::ret_files, @Global::transfer_files, $opt::tagstring, $opt::workdir, $opt::results, $opt::retries, @@ -2236,10 +2236,10 @@ sub parse_options(@) { if(defined $opt::tagstring) { $opt::tagstring = unquote_printf($opt::tagstring); if($opt::tagstring =~ - /\Q$Global::parensleft\E.*\Q$Global::parensright\E/ + /\Q$Global::parensleft\E.*\S+.*\Q$Global::parensright\E/ and $Global::linebuffer) { - # --tagstring contains {= =} and --linebuffer => + # --tagstring contains {= ... =} and --linebuffer => # recompute replacement string for each use (do not cache) $Global::cache_replacement_eval = 0; } @@ -5452,7 +5452,7 @@ sub reaper() { # Update average runtime for timeout only for successful jobs $Global::timeoutq->update_median_runtime($job->runtime()); } - if($opt::keeporder) { + if($opt::keeporder and not $opt::latestline) { $job->print_earlier_jobs(); } else { $job->print(); @@ -5500,6 +5500,7 @@ sub kill_sleep_seq(@) { if(not @term_seq) { @term_seq = ("TERM",200,"TERM",100,"TERM",50,"KILL",25); } + # for each signal+waittime: kill process groups still not dead while(@term_seq) { @pgrps = kill_sleep(shift @term_seq, shift @term_seq, @pgrps); } @@ -6642,7 +6643,7 @@ sub reap_usleep() { exit_if_disk_full(); if($Global::linebuffer) { my $something_printed = 0; - if($opt::keeporder) { + if($opt::keeporder and not $opt::latestline) { for my $job (values %Global::running) { $something_printed += $job->print_earlier_jobs(); } @@ -6956,7 +6957,7 @@ sub new($$) { } elsif($s =~ s/^(\\[\[\]box0-9a-f.]+)//i) { # RFC2673 allows for: # \[b11010000011101] \[o64072/14] \[xd074/14] \[208.116.0.0/14] - $host = $1; + $host = $1; } elsif($s =~ s/^\[([0-9a-f:]+)\]//i or $s =~ s/^([0-9a-f:]+)//i) { @@ -6980,7 +6981,7 @@ sub new($$) { # 2001:db8::1#80 $port = $1; } - + if($s and $s ne ':') { ::die_bug("SSHLogin parser failed on '$origs' => '$s'"); } @@ -8999,12 +9000,12 @@ sub replaced($) { { my $next_available_row; - + sub row($) { my $self = shift; if(not defined $self->{'row'}) { if($opt::keeporder) { - $self->{'row'} = $self->seq(); + $self->{'row'} = $self->seq()-1; } else { $self->{'row'} = $next_available_row++; } @@ -9125,7 +9126,21 @@ sub openoutputfiles($) { my $self = shift; my ($outfhw, $errfhw, $outname, $errname); - if($Global::linebuffer and not + if($opt::latestline) { + # Do not save to files: Use non-blocking pipe + my ($outfhr, $errfhr); + pipe($outfhr, $outfhw) || die; + $self->set_fh(1,'w',$outfhw); + $self->set_fh(2,'w',$outfhw); + $self->set_fh(1,'r',$outfhr); + $self->set_fh(2,'r',$outfhr); + # Make it possible to read non-blocking from the pipe + for my $fdno (1,2) { + ::set_fh_non_blocking($self->fh($fdno,'r')); + } + # Return immediately because we do not need setting filenames + return; + } elsif($Global::linebuffer and not ($opt::keeporder or $opt::files or $opt::results or $opt::compress or $opt::compress_program or $opt::decompress_program)) { @@ -11020,7 +11035,13 @@ sub print($) { } my $returnsize = $self->returnsize(); - for my $fdno (sort { $a <=> $b } keys %Global::fh) { + my @fdno; + if($opt::latestline) { + @fdno = (1); + } else { + @fdno = (sort { $a <=> $b } keys %Global::fh); + } + for my $fdno (@fdno) { # Sort by file descriptor numerically: 1,2,3,..,9,10,11 $fdno == 0 and next; my $out_fh = $Global::fh{$fdno}; @@ -11282,26 +11303,42 @@ sub print_files($) { # --bar { my ($up,$init,$currow,$maxrow); + my ($minvisible,%print_later,%notvisible); sub print_linebuffer($) { sub print_latest_line($) { my $self = shift; my $out_fh = shift; - my $str = shift; my $row = $self->row(); + # Is row visible? + if(not ($minvisible <= $row + and + $row < $minvisible -1 + ::terminal_rows())) { + return; + } my ($color,$reset_color) = $self->color(); - my $tag = $self->tag(); - my $untabify_tag = $self->untabtag(); - my $untabify_str = $str; - $untabify_str =~ s/\t/" "x(8-($-[0]%8))/eg; + # Strings with TABs give the wrong length. + # Untabify strings, add " " till full terminal width to overwrite + # earlier longer line. my $termcol = ::terminal_columns(); - my $taglen = $termcol; - my $strlen = $termcol-(length $untabify_tag)-(length $untabify_str); - $untabify_str = substr($untabify_str,0,$strlen); + my $untabify_tag = $self->untabtag(); + my $taglen = ::min(length $untabify_tag,$termcol); + my $strlen = $termcol - $taglen; + my $untabify_str = $self->{$out_fh,'latestline'}; + $untabify_str =~ s/\t/" "x(8-($-[0]%8))/eg; + my $strspc = $strlen - length $untabify_str; + if($strspc > 0) { + # Line is shorter than terminal width: fill with spc + $untabify_str = $untabify_str." "x$strspc; + } else { + # Line is longer than terminal width: chop + add ">" + $untabify_str = substr($untabify_str,0,$strlen-1).">"; + } $untabify_tag = substr($untabify_tag,0,$taglen); - + $maxrow = $row > $maxrow ? $row : $maxrow; - print($out_fh + printf($out_fh + "%s%s%s%.${taglen}s%s%.${strlen}s%s%s", "$up"x($currow - $row), "\n"x($row - $currow), "\r", $untabify_tag, @@ -11324,6 +11361,7 @@ sub print_files($) { ::set_fh_blocking($self->fh($fdno,'r')); } } + if($opt::latestline) { $print_later{$self->row()} = $self; } } if(not $init) { $init = 1; @@ -11331,8 +11369,9 @@ sub print_files($) { # cursor_up cuu1 = up one line $up = `tput cuu1 virgin()) { @@ -11372,13 +11411,14 @@ sub print_files($) { $i = ((rindex($buf,"\n")+1) || (rindex($buf,"\r")+1)); if($i) { if($opt::latestline) { - # Remove the final \n/\r + # Keep the latest full line my $l = join('', @$halfline_ref, substr($buf,0,$i-1)); my $j = ((rindex($l,"\n")+1) || (rindex($l,"\r")+1)); - $self->print_latest_line($out_fh,substr($l,$j)); - # Remove the printed part by keeping the unprinted + $self->{$out_fh,'latestline'} = substr($l,$j); + # Remove the processed part + # by keeping the unprocessed part @$halfline_ref = (substr($buf,$i)); } else { # One or more complete lines were found @@ -11402,21 +11442,17 @@ sub print_files($) { # TODO --recend that can be partially in # @$halfline_ref substr($buf,0,$i-1) =~ - s/(?<=[\n\r])(?=.|$)/$tag/gs; - # The length changed, - # so find the new ending pos - $i = ::max((rindex($buf,"\n")+1), - (rindex($buf,"\r")+1)); + s/([\n\r])(?=.|$)/$1$tag/gs; } else { # Replace with freshly computed tag-value unshift @$halfline_ref, $self->tag(); substr($buf,0,$i-1) =~ - s/(?<=[\n\r])(?=.|$)/$self->tag()/gse; - # The length changed, - # so find the new ending pos - $i = ::max((rindex($buf,"\n")+1), - (rindex($buf,"\r")+1)); + s/([\n\r])(?=.|$)/$1.$self->tag()/gse; } + # The length changed, + # so find the new ending pos + $i = ::max((rindex($buf,"\n")+1), + (rindex($buf,"\r")+1)); # Print the partial line (halfline) # and the last half print $out_fh @$halfline_ref, substr($buf,0,$i); @@ -11439,8 +11475,26 @@ sub print_files($) { } } $self->add_returnsize($outputlength); + if($opt::latestline) { + $self->print_latest_line($out_fh); + } } if(defined $self->{'exitstatus'}) { + if($opt::latestline) { + # Print latest line from jobs that are already scrolled by + while($print_later{$minvisible}) { + $print_later{$minvisible}->print_latest_line($out_fh); + delete $print_later{$minvisible}; + $minvisible++; + } + # Print latest line from jobs that are on screen now + for(my $row = $minvisible; + $row < $minvisible -1 + ::terminal_rows(); + $row++) { + $print_later{$row} and + $print_later{$row}->print_latest_line($out_fh); + } + } if($opt::files or ($opt::results and not $Global::csvsep)) { $self->add_returnsize(-s $self->fh($fdno,"name")); } else { @@ -11711,11 +11765,13 @@ sub tag($) { sub untabtag($) { # tag with \t replaced with spaces my $self = shift; - if(not defined $self->{'untab'}{$self->{'tag'}}) { - $self->{'untab'}{$self->{'tag'}} = $self->{'tag'}; - $self->{'untab'}{$self->{'tag'}} =~ s/\t/" "x(8-($-[0]%8))/eg; + my $tag = $self->tag(); + if(not defined $self->{'untab'}{$tag}) { + my $t = $tag; + $t =~ s/\t/" "x(8-($-[0]%8))/eg; + $self->{'untab'}{$tag} = $t; } - return $self->{'untab'}{$self->{'tag'}}; + return $self->{'untab'}{$tag}; } { diff --git a/src/parallel.pod b/src/parallel.pod index 4baf9964..05095f45 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -1580,9 +1580,8 @@ See also: B<--memfree> B<--load> =item B<--ll> (alpha testing) -Print the lastest line of each running job. - -This only works if the currently running jobs fit on the screen. +Print the lastest line. Each job gets a single line that is updated +with the lastest output from the job. Example: @@ -1591,7 +1590,7 @@ Example: perl -ne '$|=1; for(split//){ print; select($a,$a,$a,0.03);}' } export -f slow_seq - parallel --shuf -j10 --ll --tag --bar --color slow_seq {} ::: {1..100} + parallel --shuf -j99 --ll --tag --bar --color slow_seq {} ::: {1..300} See also: B<--line-buffer>