parallel: Keep --latest-line for jobs not visible yet.

This commit is contained in:
Ole Tange 2022-11-03 22:56:09 +01:00
parent 5d6f3a4f98
commit 3fa3bb6c5f
2 changed files with 99 additions and 44 deletions

View file

@ -80,7 +80,7 @@ sub set_input_source_header($$) {
for my $s (@opt::a) { for my $s (@opt::a) {
# ::: are put into files and given a filehandle # ::: are put into files and given a filehandle
# ignore these and only keep the filenames. # ignore these and only keep the filenames.
fileno $s and next; fileno $s and next;
for(@$command_ref, @Global::ret_files, for(@$command_ref, @Global::ret_files,
@Global::transfer_files, $opt::tagstring, @Global::transfer_files, $opt::tagstring,
$opt::workdir, $opt::results, $opt::retries, $opt::workdir, $opt::results, $opt::retries,
@ -2236,10 +2236,10 @@ sub parse_options(@) {
if(defined $opt::tagstring) { if(defined $opt::tagstring) {
$opt::tagstring = unquote_printf($opt::tagstring); $opt::tagstring = unquote_printf($opt::tagstring);
if($opt::tagstring =~ if($opt::tagstring =~
/\Q$Global::parensleft\E.*\Q$Global::parensright\E/ /\Q$Global::parensleft\E.*\S+.*\Q$Global::parensright\E/
and and
$Global::linebuffer) { $Global::linebuffer) {
# --tagstring contains {= =} and --linebuffer => # --tagstring contains {= ... =} and --linebuffer =>
# recompute replacement string for each use (do not cache) # recompute replacement string for each use (do not cache)
$Global::cache_replacement_eval = 0; $Global::cache_replacement_eval = 0;
} }
@ -5452,7 +5452,7 @@ sub reaper() {
# Update average runtime for timeout only for successful jobs # Update average runtime for timeout only for successful jobs
$Global::timeoutq->update_median_runtime($job->runtime()); $Global::timeoutq->update_median_runtime($job->runtime());
} }
if($opt::keeporder) { if($opt::keeporder and not $opt::latestline) {
$job->print_earlier_jobs(); $job->print_earlier_jobs();
} else { } else {
$job->print(); $job->print();
@ -5500,6 +5500,7 @@ sub kill_sleep_seq(@) {
if(not @term_seq) { if(not @term_seq) {
@term_seq = ("TERM",200,"TERM",100,"TERM",50,"KILL",25); @term_seq = ("TERM",200,"TERM",100,"TERM",50,"KILL",25);
} }
# for each signal+waittime: kill process groups still not dead
while(@term_seq) { while(@term_seq) {
@pgrps = kill_sleep(shift @term_seq, shift @term_seq, @pgrps); @pgrps = kill_sleep(shift @term_seq, shift @term_seq, @pgrps);
} }
@ -6642,7 +6643,7 @@ sub reap_usleep() {
exit_if_disk_full(); exit_if_disk_full();
if($Global::linebuffer) { if($Global::linebuffer) {
my $something_printed = 0; my $something_printed = 0;
if($opt::keeporder) { if($opt::keeporder and not $opt::latestline) {
for my $job (values %Global::running) { for my $job (values %Global::running) {
$something_printed += $job->print_earlier_jobs(); $something_printed += $job->print_earlier_jobs();
} }
@ -6956,7 +6957,7 @@ sub new($$) {
} elsif($s =~ s/^(\\[\[\]box0-9a-f.]+)//i) { } elsif($s =~ s/^(\\[\[\]box0-9a-f.]+)//i) {
# RFC2673 allows for: # RFC2673 allows for:
# \[b11010000011101] \[o64072/14] \[xd074/14] \[208.116.0.0/14] # \[b11010000011101] \[o64072/14] \[xd074/14] \[208.116.0.0/14]
$host = $1; $host = $1;
} elsif($s =~ s/^\[([0-9a-f:]+)\]//i } elsif($s =~ s/^\[([0-9a-f:]+)\]//i
or or
$s =~ s/^([0-9a-f:]+)//i) { $s =~ s/^([0-9a-f:]+)//i) {
@ -6980,7 +6981,7 @@ sub new($$) {
# 2001:db8::1#80 # 2001:db8::1#80
$port = $1; $port = $1;
} }
if($s and $s ne ':') { if($s and $s ne ':') {
::die_bug("SSHLogin parser failed on '$origs' => '$s'"); ::die_bug("SSHLogin parser failed on '$origs' => '$s'");
} }
@ -8999,12 +9000,12 @@ sub replaced($) {
{ {
my $next_available_row; my $next_available_row;
sub row($) { sub row($) {
my $self = shift; my $self = shift;
if(not defined $self->{'row'}) { if(not defined $self->{'row'}) {
if($opt::keeporder) { if($opt::keeporder) {
$self->{'row'} = $self->seq(); $self->{'row'} = $self->seq()-1;
} else { } else {
$self->{'row'} = $next_available_row++; $self->{'row'} = $next_available_row++;
} }
@ -9125,7 +9126,21 @@ sub openoutputfiles($) {
my $self = shift; my $self = shift;
my ($outfhw, $errfhw, $outname, $errname); my ($outfhw, $errfhw, $outname, $errname);
if($Global::linebuffer and not if($opt::latestline) {
# Do not save to files: Use non-blocking pipe
my ($outfhr, $errfhr);
pipe($outfhr, $outfhw) || die;
$self->set_fh(1,'w',$outfhw);
$self->set_fh(2,'w',$outfhw);
$self->set_fh(1,'r',$outfhr);
$self->set_fh(2,'r',$outfhr);
# Make it possible to read non-blocking from the pipe
for my $fdno (1,2) {
::set_fh_non_blocking($self->fh($fdno,'r'));
}
# Return immediately because we do not need setting filenames
return;
} elsif($Global::linebuffer and not
($opt::keeporder or $opt::files or $opt::results or ($opt::keeporder or $opt::files or $opt::results or
$opt::compress or $opt::compress_program or $opt::compress or $opt::compress_program or
$opt::decompress_program)) { $opt::decompress_program)) {
@ -11020,7 +11035,13 @@ sub print($) {
} }
my $returnsize = $self->returnsize(); my $returnsize = $self->returnsize();
for my $fdno (sort { $a <=> $b } keys %Global::fh) { my @fdno;
if($opt::latestline) {
@fdno = (1);
} else {
@fdno = (sort { $a <=> $b } keys %Global::fh);
}
for my $fdno (@fdno) {
# Sort by file descriptor numerically: 1,2,3,..,9,10,11 # Sort by file descriptor numerically: 1,2,3,..,9,10,11
$fdno == 0 and next; $fdno == 0 and next;
my $out_fh = $Global::fh{$fdno}; my $out_fh = $Global::fh{$fdno};
@ -11282,26 +11303,42 @@ sub print_files($) {
# --bar # --bar
{ {
my ($up,$init,$currow,$maxrow); my ($up,$init,$currow,$maxrow);
my ($minvisible,%print_later,%notvisible);
sub print_linebuffer($) { sub print_linebuffer($) {
sub print_latest_line($) { sub print_latest_line($) {
my $self = shift; my $self = shift;
my $out_fh = shift; my $out_fh = shift;
my $str = shift;
my $row = $self->row(); my $row = $self->row();
# Is row visible?
if(not ($minvisible <= $row
and
$row < $minvisible -1 + ::terminal_rows())) {
return;
}
my ($color,$reset_color) = $self->color(); my ($color,$reset_color) = $self->color();
my $tag = $self->tag(); # Strings with TABs give the wrong length.
my $untabify_tag = $self->untabtag(); # Untabify strings, add " " till full terminal width to overwrite
my $untabify_str = $str; # earlier longer line.
$untabify_str =~ s/\t/" "x(8-($-[0]%8))/eg;
my $termcol = ::terminal_columns(); my $termcol = ::terminal_columns();
my $taglen = $termcol; my $untabify_tag = $self->untabtag();
my $strlen = $termcol-(length $untabify_tag)-(length $untabify_str); my $taglen = ::min(length $untabify_tag,$termcol);
$untabify_str = substr($untabify_str,0,$strlen); my $strlen = $termcol - $taglen;
my $untabify_str = $self->{$out_fh,'latestline'};
$untabify_str =~ s/\t/" "x(8-($-[0]%8))/eg;
my $strspc = $strlen - length $untabify_str;
if($strspc > 0) {
# Line is shorter than terminal width: fill with spc
$untabify_str = $untabify_str." "x$strspc;
} else {
# Line is longer than terminal width: chop + add ">"
$untabify_str = substr($untabify_str,0,$strlen-1).">";
}
$untabify_tag = substr($untabify_tag,0,$taglen); $untabify_tag = substr($untabify_tag,0,$taglen);
$maxrow = $row > $maxrow ? $row : $maxrow; $maxrow = $row > $maxrow ? $row : $maxrow;
print($out_fh printf($out_fh
"%s%s%s%.${taglen}s%s%.${strlen}s%s%s",
"$up"x($currow - $row), "$up"x($currow - $row),
"\n"x($row - $currow), "\n"x($row - $currow),
"\r", $untabify_tag, "\r", $untabify_tag,
@ -11324,6 +11361,7 @@ sub print_files($) {
::set_fh_blocking($self->fh($fdno,'r')); ::set_fh_blocking($self->fh($fdno,'r'));
} }
} }
if($opt::latestline) { $print_later{$self->row()} = $self; }
} }
if(not $init) { if(not $init) {
$init = 1; $init = 1;
@ -11331,8 +11369,9 @@ sub print_files($) {
# cursor_up cuu1 = up one line # cursor_up cuu1 = up one line
$up = `tput cuu1 </dev/tty`; $up = `tput cuu1 </dev/tty`;
chomp($up); chomp($up);
$currow = 1; $currow = 0;
$maxrow = 1; $maxrow = 0;
$minvisible = 0;
} }
} }
if(not $self->virgin()) { if(not $self->virgin()) {
@ -11372,13 +11411,14 @@ sub print_files($) {
$i = ((rindex($buf,"\n")+1) || (rindex($buf,"\r")+1)); $i = ((rindex($buf,"\n")+1) || (rindex($buf,"\r")+1));
if($i) { if($i) {
if($opt::latestline) { if($opt::latestline) {
# Remove the final \n/\r # Keep the latest full line
my $l = join('', @$halfline_ref, my $l = join('', @$halfline_ref,
substr($buf,0,$i-1)); substr($buf,0,$i-1));
my $j = ((rindex($l,"\n")+1) || my $j = ((rindex($l,"\n")+1) ||
(rindex($l,"\r")+1)); (rindex($l,"\r")+1));
$self->print_latest_line($out_fh,substr($l,$j)); $self->{$out_fh,'latestline'} = substr($l,$j);
# Remove the printed part by keeping the unprinted # Remove the processed part
# by keeping the unprocessed part
@$halfline_ref = (substr($buf,$i)); @$halfline_ref = (substr($buf,$i));
} else { } else {
# One or more complete lines were found # One or more complete lines were found
@ -11402,21 +11442,17 @@ sub print_files($) {
# TODO --recend that can be partially in # TODO --recend that can be partially in
# @$halfline_ref # @$halfline_ref
substr($buf,0,$i-1) =~ substr($buf,0,$i-1) =~
s/(?<=[\n\r])(?=.|$)/$tag/gs; s/([\n\r])(?=.|$)/$1$tag/gs;
# The length changed,
# so find the new ending pos
$i = ::max((rindex($buf,"\n")+1),
(rindex($buf,"\r")+1));
} else { } else {
# Replace with freshly computed tag-value # Replace with freshly computed tag-value
unshift @$halfline_ref, $self->tag(); unshift @$halfline_ref, $self->tag();
substr($buf,0,$i-1) =~ substr($buf,0,$i-1) =~
s/(?<=[\n\r])(?=.|$)/$self->tag()/gse; s/([\n\r])(?=.|$)/$1.$self->tag()/gse;
# The length changed,
# so find the new ending pos
$i = ::max((rindex($buf,"\n")+1),
(rindex($buf,"\r")+1));
} }
# The length changed,
# so find the new ending pos
$i = ::max((rindex($buf,"\n")+1),
(rindex($buf,"\r")+1));
# Print the partial line (halfline) # Print the partial line (halfline)
# and the last half # and the last half
print $out_fh @$halfline_ref, substr($buf,0,$i); print $out_fh @$halfline_ref, substr($buf,0,$i);
@ -11439,8 +11475,26 @@ sub print_files($) {
} }
} }
$self->add_returnsize($outputlength); $self->add_returnsize($outputlength);
if($opt::latestline) {
$self->print_latest_line($out_fh);
}
} }
if(defined $self->{'exitstatus'}) { if(defined $self->{'exitstatus'}) {
if($opt::latestline) {
# Print latest line from jobs that are already scrolled by
while($print_later{$minvisible}) {
$print_later{$minvisible}->print_latest_line($out_fh);
delete $print_later{$minvisible};
$minvisible++;
}
# Print latest line from jobs that are on screen now
for(my $row = $minvisible;
$row < $minvisible -1 + ::terminal_rows();
$row++) {
$print_later{$row} and
$print_later{$row}->print_latest_line($out_fh);
}
}
if($opt::files or ($opt::results and not $Global::csvsep)) { if($opt::files or ($opt::results and not $Global::csvsep)) {
$self->add_returnsize(-s $self->fh($fdno,"name")); $self->add_returnsize(-s $self->fh($fdno,"name"));
} else { } else {
@ -11711,11 +11765,13 @@ sub tag($) {
sub untabtag($) { sub untabtag($) {
# tag with \t replaced with spaces # tag with \t replaced with spaces
my $self = shift; my $self = shift;
if(not defined $self->{'untab'}{$self->{'tag'}}) { my $tag = $self->tag();
$self->{'untab'}{$self->{'tag'}} = $self->{'tag'}; if(not defined $self->{'untab'}{$tag}) {
$self->{'untab'}{$self->{'tag'}} =~ s/\t/" "x(8-($-[0]%8))/eg; my $t = $tag;
$t =~ s/\t/" "x(8-($-[0]%8))/eg;
$self->{'untab'}{$tag} = $t;
} }
return $self->{'untab'}{$self->{'tag'}}; return $self->{'untab'}{$tag};
} }
{ {

View file

@ -1580,9 +1580,8 @@ See also: B<--memfree> B<--load>
=item B<--ll> (alpha testing) =item B<--ll> (alpha testing)
Print the lastest line of each running job. Print the lastest line. Each job gets a single line that is updated
with the lastest output from the job.
This only works if the currently running jobs fit on the screen.
Example: Example:
@ -1591,7 +1590,7 @@ Example:
perl -ne '$|=1; for(split//){ print; select($a,$a,$a,0.03);}' perl -ne '$|=1; for(split//){ print; select($a,$a,$a,0.03);}'
} }
export -f slow_seq export -f slow_seq
parallel --shuf -j10 --ll --tag --bar --color slow_seq {} ::: {1..100} parallel --shuf -j99 --ll --tag --bar --color slow_seq {} ::: {1..300}
See also: B<--line-buffer> See also: B<--line-buffer>