parallel: Calculation of total_jobs() made better.

parallel: Buffer size for --linebuffer increased for better performance.
parallel: --(n)onall now respects --memfree.
This commit is contained in:
Ole Tange 2019-01-19 02:20:44 +01:00
parent 4234c24658
commit 8700e957fc
3 changed files with 51 additions and 34 deletions

View file

@ -3014,7 +3014,7 @@ sub progress() {
{ {
my ($total, $first_completed, $smoothed_avg_time, $last_eta); my ($first_completed, $smoothed_avg_time, $last_eta);
sub compute_eta { sub compute_eta {
# Calculate important numbers for ETA # Calculate important numbers for ETA
@ -3025,13 +3025,14 @@ sub progress() {
# $pctcomplete = percent of jobs completed # $pctcomplete = percent of jobs completed
# $avgtime = averaged time # $avgtime = averaged time
# $eta = smoothed eta # $eta = smoothed eta
$total = $Global::JobQueue->total_jobs();
my $completed = $Global::total_completed; my $completed = $Global::total_completed;
# In rare cases with -X, $completed will be > total_jobs()
my $total = ::max($Global::JobQueue->total_jobs(),$completed);
my $left = $total - $completed; my $left = $total - $completed;
if(not $completed) { if(not $completed) {
return($total, $completed, $left, 0, 0, 0); return($total, $completed, $left, 0, 0, 0);
} }
my $pctcomplete = $completed / $total; my $pctcomplete = ::min($completed / $total,100);
$first_completed ||= time; $first_completed ||= time;
my $timepassed = (time - $first_completed); my $timepassed = (time - $first_completed);
my $avgtime = $timepassed / $completed; my $avgtime = $timepassed / $completed;
@ -3771,6 +3772,7 @@ sub onall($@) {
# -P should only go to the first, and -S should not be copied at all. # -P should only go to the first, and -S should not be copied at all.
my $options = my $options =
join(" ", join(" ",
((defined $opt::memfree) ? "--memfree ".$opt::memfree : ""),
((defined $opt::D) ? "-D $opt::D" : ""), ((defined $opt::D) ? "-D $opt::D" : ""),
((defined $opt::group) ? "-g" : ""), ((defined $opt::group) ? "-g" : ""),
((defined $opt::jobs) ? "-P $opt::jobs" : ""), ((defined $opt::jobs) ? "-P $opt::jobs" : ""),
@ -5748,8 +5750,13 @@ sub set_last_login_at($$) {
sub loadavg_too_high($) { sub loadavg_too_high($) {
my $self = shift; my $self = shift;
my $loadavg = $self->loadavg(); my $loadavg = $self->loadavg();
return (not defined $loadavg or if(defined $loadavg) {
$loadavg > $self->max_loadavg()); ::debug("load", "Load $loadavg > ",$self->max_loadavg());
return $loadavg >= $self->max_loadavg();
} else {
# Unknown load: Assume load is too high
return 1;
}
} }
{ {
@ -5878,7 +5885,7 @@ sub loadavg($) {
} }
# As the command can take long to run if run remote # As the command can take long to run if run remote
# save it to a tmp file before moving it to the correct file # save it to a tmp file before moving it to the correct file
::debug("load", "Cmd: ", $cmd,"\n"); ::debug("load", "Update load\n");
my $file = $self->{'loadavg_file'}; my $file = $self->{'loadavg_file'};
# tmpfile on same filesystem as $file # tmpfile on same filesystem as $file
my $tmpfile = $file.$$; my $tmpfile = $file.$$;
@ -7110,11 +7117,16 @@ sub total_jobs($) {
} }
} }
$record_queue->unget(@arg_records); $record_queue->unget(@arg_records);
$self->{'total_jobs'} = # $#arg_records = number of args - 1
::ceil((1+$#arg_records+$self->{'this_job_no'}) # We have read one @arg_record for this job (so add 1 more)
/ ::max($Global::max_number_of_args,1)); my $num_args = $#arg_records + 2;
# This job is not started so -1
my $started_jobs = $self->{'this_job_no'} - 1;
my $max_args = ::max($Global::max_number_of_args,1);
$self->{'total_jobs'} = ::ceil($num_args / $max_args)
+ $started_jobs;
::debug("init","Total jobs: ".$self->{'total_jobs'}. ::debug("init","Total jobs: ".$self->{'total_jobs'}.
" (".(1+$#arg_records)."+".$self->{'this_job_no'}.")\n"); " ($num_args/$max_args + $started_jobs)\n");
} }
} }
return $self->{'total_jobs'}; return $self->{'total_jobs'};
@ -7123,6 +7135,7 @@ sub total_jobs($) {
sub flush_total_jobs($) { sub flush_total_jobs($) {
# Unset total_jobs to force recomputing # Unset total_jobs to force recomputing
my $self = shift; my $self = shift;
::debug("init","flush Total jobs: ");
$self->{'total_jobs'} = undef; $self->{'total_jobs'} = undef;
} }
@ -9181,7 +9194,9 @@ sub print_linebuffer($) {
my $outputlength = 0; my $outputlength = 0;
my $halfline_ref = $self->{'halfline'}{$fdno}; my $halfline_ref = $self->{'halfline'}{$fdno};
my ($buf,$i,$rv); my ($buf,$i,$rv);
while($rv = sysread($in_fh, $buf, 131072)) { # 1310720 gives 1.2 GB/s
# 131072 gives 0.9 GB/s
while($rv = sysread($in_fh, $buf,1310720)) {
$outputlength += $rv; $outputlength += $rv;
# TODO --recend # TODO --recend
# Treat both \n and \r as line end # Treat both \n and \r as line end
@ -9191,7 +9206,7 @@ sub print_linebuffer($) {
if($opt::tag or defined $opt::tagstring) { if($opt::tag or defined $opt::tagstring) {
# Replace ^ with $tag within the full line # Replace ^ with $tag within the full line
my $tag = $self->tag(); my $tag = $self->tag();
# TODO --recend # TODO --recend that can be partially in @$halfline_ref
substr($buf,0,$i-1) =~ s/(?<=[\n\r])/$tag/gm; substr($buf,0,$i-1) =~ s/(?<=[\n\r])/$tag/gm;
# The length changed, so find the new ending pos # The length changed, so find the new ending pos
$i = (rindex($buf,"\n")+1) || (rindex($buf,"\r")+1); $i = (rindex($buf,"\n")+1) || (rindex($buf,"\r")+1);
@ -10712,7 +10727,7 @@ sub get($) {
sub unget($) { sub unget($) {
my $self = shift; my $self = shift;
::debug("run", "RecordQueue-unget '@_'\n"); ::debug("run", "RecordQueue-unget\n");
$self->{'arg_number'} -= @_; $self->{'arg_number'} -= @_;
unshift @{$self->{'unget'}}, @_; unshift @{$self->{'unget'}}, @_;
} }

View file

@ -10,7 +10,7 @@ P="$P_ALL"
# tru64 takes 22s to run 4 parallels # tru64 takes 22s to run 4 parallels
MAXTIME=50 MAXTIME=50
RETRIES=2 RETRIES=3
MAXPROC=${maxproc:-11} MAXPROC=${maxproc:-11}
MAXINNERPROC=${maxinnerproc:-3} MAXINNERPROC=${maxinnerproc:-3}
@ -31,16 +31,17 @@ doit() {
export MAXTIME export MAXTIME
export RETRIES export RETRIES
export MAXPROC export MAXPROC
export RET_TIME_K="-k --retries $RETRIES --timeout $MAXTIME" export RET_TIME_K="--memfree 150m -k --retries $RETRIES --timeout $MAXTIME"
LC_ALL=C LC_ALL=C
MAXPROC=$(echo $(seq 300 | parallel -j0 echo {%} | sort -n | tail -n1) /$MAXINNERPROC | bc)
echo MAXTIME=$MAXTIME RETRIES=$RETRIES MAXPROC=$MAXPROC MAXINNERPROC=$MAXINNERPROC echo MAXTIME=$MAXTIME RETRIES=$RETRIES MAXPROC=$MAXPROC MAXINNERPROC=$MAXINNERPROC
echo '### Filter out working servers' echo '### Filter out working servers'
# syllable often gives false positive # syllable often gives false positive
parallel --timeout $MAXTIME -j10 ssh syllable true ::: {1..10} 2>/dev/null >/dev/null & parallel --timeout $MAXTIME -j10 ssh syllable true ::: {1..10} 2>/dev/null >/dev/null &
POLAR_ALL="`bin/parallel -j0 -k --timeout 10 echo {} ::: $P`" POLAR_ALL="`bin/parallel --memfree 200m -j0 -k --timeout 10 echo {} ::: $P`"
POLAR="`bin/parallel -j0 -k --timeout 10 $PARALLEL_SSH {} echo {} ::: $P`" POLAR="`bin/parallel --memfree 200m -j0 -k --timeout 10 $PARALLEL_SSH {} echo {} ::: $P`"
diff <(echo "$POLAR_ALL") <(echo "$POLAR") diff <(echo "$POLAR_ALL") <(echo "$POLAR")
S_POLAR=`bin/parallel -j0 $RET_TIME_K echo -S 1/{} ::: $POLAR` S_POLAR=`bin/parallel -j0 $RET_TIME_K echo -S 1/{} ::: $POLAR`
@ -73,8 +74,9 @@ doit() {
export -f par_nonall export -f par_nonall
echo '### Copy commands to servers' echo '### Copy commands to servers'
# Dont copy stdout - it depends on /bin/bash
env_parallel -vj$MAXPROC $RET_TIME_K --delay 0.03 --tag copy {2} {1} {1/} \ env_parallel -vj$MAXPROC $RET_TIME_K --delay 0.03 --tag copy {2} {1} {1/} \
::: bin/{parallel,env_parallel,env_parallel.*[^~],parcat,stdout} \ ::: bin/{parallel,env_parallel,env_parallel.*[^~],parcat} \
::: $POLAR minix ::: $POLAR minix
echo Done copying echo Done copying

View file

@ -1,4 +1,4 @@
MAXTIME=50 RETRIES=2 MAXPROC=11 MAXINNERPROC=3 MAXTIME=50 RETRIES=3 MAXPROC=100 MAXINNERPROC=3
### Filter out working servers ### Filter out working servers
1d0 1d0
< openstep < openstep
@ -453,7 +453,7 @@ hpux Works on hpux64
hpux-ia64 Works on hpux-ia64 hpux-ia64 Works on hpux-ia64
hurd Works on hurd hurd Works on hurd
irix /usr/freeware/bin/bash: line 1: /home/t/tange/setupenv: No such file or directory irix /usr/freeware/bin/bash: line 1: /home/t/tange/setupenv: No such file or directory
irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2458. irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2358.
macosx Works on macosx.polarhome.com macosx Works on macosx.polarhome.com
mandriva Works on mandriva.polarhome.com mandriva Works on mandriva.polarhome.com
miros Works on miros.polarhome.com miros Works on miros.polarhome.com
@ -499,8 +499,8 @@ hpux-ia64 1
hurd hurd
hurd 1 hurd 1
irix /usr/freeware/bin/bash: line 1: /home/t/tange/setupenv: No such file or directory irix /usr/freeware/bin/bash: line 1: /home/t/tange/setupenv: No such file or directory
irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2458. irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2358.
irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2458. irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2358.
macosx 2 macosx 2
macosx 2 macosx 2
mandriva 1 mandriva 1
@ -557,8 +557,8 @@ hpux-ia64 1
hurd 1 hurd 1
hurd 1 hurd 1
irix /usr/freeware/bin/bash: line 1: /home/t/tange/setupenv: No such file or directory irix /usr/freeware/bin/bash: line 1: /home/t/tange/setupenv: No such file or directory
irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2458. irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2358.
irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2458. irix Unknown open() mode '>&' at /home/t/tange/bin/parallel line 2358.
macosx 2 macosx 2
macosx 2 macosx 2
mandriva 1 mandriva 1
@ -656,7 +656,7 @@ scosysv OK readonly tmp
solaris Error in tempfile() using /XXXXXXXX.arg: Parent directory (/) is not writable solaris Error in tempfile() using /XXXXXXXX.arg: Parent directory (/) is not writable
solaris at /home/t/tange/bin/parallel line 0000 solaris at /home/t/tange/bin/parallel line 0000
solaris OK readonly tmp solaris OK readonly tmp
solaris-x86 Error in tempfile() using /XXXXXXXX.arg: Could not create temp file /XXXXXXXX.arg: Permission denied at /home/tange/bin/parallel line 0000 solaris-x86 Error in tempfile() using /XXXXXXXX.arg: Could not create temp file /XXXXXXXX.arg: Permission denied at ~/bin/parallel line 0000
solaris-x86 OK readonly tmp solaris-x86 OK readonly tmp
suse Error in tempfile() using /XXXXXXXX.arg: Could not create temp file /XXXXXXXX.arg: Permission denied at /home/t/tange/bin/parallel line 0000. suse Error in tempfile() using /XXXXXXXX.arg: Could not create temp file /XXXXXXXX.arg: Permission denied at /home/t/tange/bin/parallel line 0000.
suse OK readonly tmp suse OK readonly tmp
@ -1043,15 +1043,15 @@ solaris /home/t/tange/.cshrc
solaris /home/t/tange/.tcshrc solaris /home/t/tange/.tcshrc
solaris install-OK solaris install-OK
solaris-x86 Installed env_parallel in: solaris-x86 Installed env_parallel in:
solaris-x86 /home/tange/.bashrc solaris-x86 ~/.bashrc
solaris-x86 /home/tange/.shrc solaris-x86 ~/.shrc
solaris-x86 /home/tange/.zshenv solaris-x86 ~/.zshenv
solaris-x86 /home/tange/.config/fish/config.fish solaris-x86 ~/.config/fish/config.fish
solaris-x86 /home/tange/.kshrc solaris-x86 ~/.kshrc
solaris-x86 /home/tange/.mkshrc solaris-x86 ~/.mkshrc
solaris-x86 /home/tange/.profile solaris-x86 ~/.profile
solaris-x86 /home/tange/.cshrc solaris-x86 ~/.cshrc
solaris-x86 /home/tange/.tcshrc solaris-x86 ~/.tcshrc
solaris-x86 install-OK solaris-x86 install-OK
sshwithpass minix Installed env_parallel in: sshwithpass minix Installed env_parallel in:
sshwithpass minix /home/t/tange/.bashrc sshwithpass minix /home/t/tange/.bashrc