parallel: --load implemented but not tested. Testsuite passes.

This commit is contained in:
Ole Tange 2010-12-03 16:48:49 +01:00
parent baba462a0d
commit 0b31661e8e

View file

@ -507,6 +507,11 @@ B<-L> instead.
Implies B<-X> unless B<-m> is set.
=item B<--load>=I<max-load> (unimplemented)
Do not start new jobs unless the load is less than I<max-load>.
=item B<--controlmaster> (experimental)
=item B<-M> (experimental)
@ -2548,10 +2553,6 @@ $Global::JobQueue = JobQueue->new($Global::CommandLineQueue);
init_run_jobs();
my $sem;
if($Global::semaphore) {
# $Global::host{':'}{'max_no_of_running'} must be set
#if(not defined $Global::host{':'}{'max_no_of_running'}) {
#compute_number_of_processes_for_sshlogins();
#}
$sem = acquire_semaphore();
}
$SIG{TERM} = \&start_no_new_jobs;
@ -2626,6 +2627,7 @@ sub get_options_from_array {
"basenamereplace=s" => \$::opt_basenamereplace,
"basenameextensionreplace=s" => \$::opt_basenameextensionreplace,
"jobs|j=s" => \$::opt_P,
"load=s" => \$::opt_load,
"max-line-length-allowed" => \$::opt_max_line_length_allowed,
"number-of-cpus" => \$::opt_number_of_cpus,
"number-of-cores" => \$::opt_number_of_cores,
@ -2820,11 +2822,7 @@ sub parse_options {
print STDERR ("Warning: using -X or -m with --sshlogin may fail\n");
}
# Needs to be done after setting $Global::command and $Global::command_line_max_len
# as '-m' influences the number of commands that needs to be run
if(defined $::opt_P) {
# compute_number_of_processes_for_sshlogins();
} else {
if(not defined $::opt_P) {
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_jobs_running($Global::default_simultaneous_sshlogins);
}
@ -3035,168 +3033,6 @@ sub shell_unquote {
sub __NUMBER_OF_PROCESSES_FILEHANDLES_MAX_LENGTH_OF_COMMAND_LINE__ {}
# Number of parallel processes to run
sub compute_number_of_processes {
    # Decide how many jobs to run simultaneously on one sshlogin:
    # the number the user asked for (or the default), capped by what
    # processes_available_by_system_limit() reports.
    # Input:
    #   the -j/-P specification (may be undef) and the sshlogin object
    # Returns:
    #   Number of processes
    my ($jobs_spec, $login) = @_;
    my $requested = user_requested_processes($jobs_spec,$login);
    # No usable -j value: fall back to the global default
    $requested = $Global::default_simultaneous_sshlogins
        unless defined $requested;
    debug("Wanted procs: $requested\n");
    my $allowed = processes_available_by_system_limit($requested,$login);
    debug("Limited to procs: $allowed\n");
    return $allowed;
}
sub processes_available_by_system_limit {
# Probe how many jobs can actually be run at the same time.
# If the wanted number of processes is bigger than the system limits:
# Limit them to the system limits
# Limits are: File handles, number of input lines, processes,
# and (warning only) taking > 2 seconds to spawn 10 extra processes
# Input:
# $wanted_processes = upper bound of processes to test for
# $sshlogin = sshlogin object the limit is computed for
# Returns:
# Number of processes
my $wanted_processes = shift;
my $sshlogin = shift;
my $system_limit=0;
my @command_lines=();
my ($next_command_line, $args_ref);
my $more_filehandles=1;
my $max_system_proc_reached=0;
my $slow_spawining_warning_printed=0;
my $time = time;
# Filehandles opened for the probe; all are closed again below
my %fh;
# PIDs of the placeholder children forked for the probe
my @children;
do_not_reap();
# Reserve filehandles
# perl uses 7 filehandles for something?
# parallel uses 1 for memory_usage
for my $i (1..8) {
open($fh{"init-$i"},"</dev/null");
}
# Each round claims one command line, two filehandles and one process.
# Stop at the wanted number or when any of the resources runs out.
while($system_limit < $wanted_processes
and (not $Global::CommandLineQueue->empty() or $Global::semaphore)
and $more_filehandles
and not $max_system_proc_reached) {
$system_limit++;
if(not $Global::semaphore) {
# If there are no more command lines, then we have a process
# per command line, so no need to go further
($next_command_line) = $Global::CommandLineQueue->get();
push(@command_lines, $next_command_line);
}
# Every simultaneous process uses 2 filehandles when grouping
$more_filehandles = open($fh{$system_limit*2},"</dev/null")
&& open($fh{$system_limit*2+1},"</dev/null");
# System process limit
# Restart the stopwatch every 10th process
$system_limit % 10 or $time=time;
my $child;
if($child = fork()) {
# Parent: remember the child so it can be killed during cleanup
push (@children,$child);
} elsif(defined $child) {
# The child takes one process slot
# It will be killed later
$SIG{TERM} = $Global::original_sigterm;
sleep 100000;
wait_and_exit(0);
} else {
# fork() returned undef: no more processes can be created
$max_system_proc_reached = 1;
}
debug("Time to fork ten procs: ", time-$time, " (processes so far: ", $system_limit,")\n");
if(time-$time > 2 and not $slow_spawining_warning_printed) {
# It took more than 2 second to fork ten processes.
# Give the user a warning. He can press Ctrl-C if this
# sucks.
print STDERR ("Warning: Starting 10 extra processes takes > 2 sec.\n",
"Consider adjusting -j. Press CTRL-C to stop.\n");
$slow_spawining_warning_printed = 1;
}
}
# Tell the user which resource stopped the probe short of the wanted number
if($system_limit < $wanted_processes and not $more_filehandles) {
print STDERR ("Warning: Only enough filehandles to run ",
$system_limit, " jobs in parallel. ",
"Raising ulimit -n may help\n");
}
if($system_limit < $wanted_processes and $max_system_proc_reached) {
print STDERR ("Warning: Only enough available processes to run ",
$system_limit, " jobs in parallel.\n");
}
# Cleanup: Close the files
for (values %fh) { close $_ }
# Cleanup: Kill the children
for my $pid (@children) {
kill 9, $pid;
waitpid($pid,0);
}
#wait();
# Cleanup: Unget the command_lines (and args_refs)
$Global::CommandLineQueue->unget(@command_lines);
# For every sshlogin except ":" also test how many simultaneous ssh
# logins work, but only when more than the default number is wanted
if($sshlogin->string() ne ":" and
$system_limit > $Global::default_simultaneous_sshlogins) {
$system_limit =
simultaneous_sshlogin_limit($sshlogin,$system_limit);
}
return $system_limit;
}
sub simultaneous_sshlogin {
    # Using $sshlogin try to see if we can do $wanted_processes
    # simultaneous logins
    # (ssh host echo simultaneouslogin & ssh host echo simultaneouslogin & ...)|grep simul|wc -l
    # Input:
    #   $sshlogin = sshlogin object to test
    #   $wanted_processes = number of logins to start at the same time
    # Returns:
    #   Number of successful logins as a plain integer
    #   (0 if no count could be read from the pipeline)
    # Dies if the test pipeline cannot be started.
    my $sshlogin = shift;
    my $wanted_processes = shift;
    my $sshcmd = $sshlogin->sshcommand();
    my $serverlogin = $sshlogin->serverlogin();
    my $cmd = "$sshcmd $serverlogin echo simultaneouslogin 2>&1 &"x$wanted_processes;
    debug("Trying $wanted_processes logins at $serverlogin");
    # Lexical handle instead of the global bareword SIMUL
    open(my $simul_fh, "-|", "($cmd)|grep simultaneouslogin | wc -l")
        or die "Cannot start simultaneous login test for $serverlogin: $!";
    my $wc_output = <$simul_fh>;
    close $simul_fh;
    # Some wc implementations pad the count with blanks and the read
    # may return nothing at all: keep only the digits.
    my $ssh_limit = 0;
    if(defined $wc_output and $wc_output =~ /(\d+)/) {
        $ssh_limit = $1;
    }
    return $ssh_limit;
}
sub simultaneous_sshlogin_limit {
    # Test by logging in wanted number of times simultaneously
    # Input:
    #   $sshlogin = sshlogin object to test
    #   $wanted_processes = number of simultaneous logins wanted
    # Returns:
    #   min($wanted_processes,$working_simultaneous_ssh_logins-1)
    #   (the working number itself when it is 0 or 1)
    my $sshlogin = shift;
    my $wanted_processes = shift;
    my $serverlogin = $sshlogin->serverlogin();
    # Try twice because it guesses wrong sometimes
    # Choose the minimal
    my $ssh_limit =
        min(simultaneous_sshlogin($sshlogin,$wanted_processes),
            simultaneous_sshlogin($sshlogin,$wanted_processes));
    # Race condition can cause problem if using all sshs.
    # Compute the number really used before warning, so the warning
    # reports the same number that is returned.
    my $usable_limit = $ssh_limit > 1 ? $ssh_limit - 1 : $ssh_limit;
    if($ssh_limit < $wanted_processes) {
        print STDERR
            ("Warning: ssh to $serverlogin only allows ",
             "for $ssh_limit simultaneous logins.\n",
             "You may raise this by changing ",
             "/etc/ssh/sshd_config:MaxStartup on $serverlogin\n",
             "Using only ",$usable_limit," connections ",
             "to avoid race conditions\n");
    }
    return $usable_limit;
}
sub enough_file_handles {
# check that we have enough filehandles available for starting
@ -3220,194 +3056,6 @@ sub enough_file_handles {
}
}
sub user_requested_processes {
    # Parse the number of processes that the user asked for using -j
    # Accepted forms: "+N" (ncpus+N), "-N" (ncpus-N), "N%" (percent of
    # ncpus), "N" (0 = as many as possible), or the name of a file
    # containing one of these forms.
    # Input:
    #   $opt_P = the -j/-P value (may be undef)
    #   $sshlogin = sshlogin object; only used for the ncpus-relative forms
    # Returns:
    #   the number of processes to run on this sshlogin
    #   (at least 1; undef if $opt_P is undef)
    my $opt_P = shift;
    my $sshlogin = shift;
    my $processes;
    if(defined $opt_P) {
        if($opt_P =~ /^\+(\d+)$/) {
            # E.g. -P +2
            my $j = $1;
            $processes =
                $sshlogin->ncpus() + $j;
        } elsif ($opt_P =~ /^-(\d+)$/) {
            # E.g. -P -2
            my $j = $1;
            $processes =
                $sshlogin->ncpus() - $j;
        } elsif ($opt_P =~ /^(\d+)\%$/) {
            # E.g. -P 50%
            my $j = $1;
            $processes =
                $sshlogin->ncpus() * $j / 100;
        } elsif ($opt_P =~ /^(\d+)$/) {
            $processes = $1;
            if($processes == 0) {
                # -P 0 = infinity (or at least close)
                $processes = 2**31;
            }
        } elsif (-f $opt_P) {
            # The value names a file: remember it and its mtime, then
            # parse the content of the file
            $Global::max_procs_file = $opt_P;
            $Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9];
            # 3-arg open: the filename can never be taken as an open mode
            if(open(my $in_fh, "<", $Global::max_procs_file)) {
                my $opt_P_file = join("",<$in_fh>);
                close $in_fh;
                # Pass $sshlogin on: the file may contain +N, -N or N%
                $processes = user_requested_processes($opt_P_file,$sshlogin);
            } else {
                print STDERR "Cannot open $opt_P\n";
                exit(255);
            }
        } else {
            print STDERR "Parsing of --jobs/-j/--max-procs/-P failed\n";
            die_usage();
        }
        if($processes < 1) {
            $processes = 1;
        }
    }
    return $processes;
}
sub _no_of_cpus {
    # Find the number of physical CPUs by trying the per-OS probes in
    # turn. The answer is cached in $Private::no_of_cpus.
    # Returns:
    #   Number of physical CPUs (1 if no probe gave an answer)
    return $Private::no_of_cpus if $Private::no_of_cpus;
    local $/="\n"; # If delimiter is set, then $/ will be wrong
    my $probed = no_of_cpus_freebsd();
    $probed ||= no_of_cpus_darwin();
    $probed ||= no_of_cpus_solaris();
    $probed ||= no_of_cpus_gnu_linux();
    if(not $probed) {
        warn("Cannot figure out number of cpus. Using 1");
        $probed = 1;
    }
    $Private::no_of_cpus = $probed;
    return $Private::no_of_cpus;
}
sub _no_of_cores {
    # Find the number of CPU cores by trying the per-OS probes in
    # turn. The answer is cached in $Private::no_of_cores.
    # Returns:
    #   Number of CPU cores (1 if no probe gave an answer)
    return $Private::no_of_cores if $Private::no_of_cores;
    local $/="\n"; # If delimiter is set, then $/ will be wrong
    my $probed = no_of_cores_freebsd();
    $probed ||= no_of_cores_darwin();
    $probed ||= no_of_cores_solaris();
    $probed ||= no_of_cores_gnu_linux();
    if(not $probed) {
        warn("Cannot figure out number of CPU cores. Using 1");
        $probed = 1;
    }
    $Private::no_of_cores = $probed;
    return $Private::no_of_cores;
}
sub _no_of_cpus_gnu_linux {
    # Count the distinct "physical id" values in /proc/cpuinfo.
    # Returns:
    #   Number of physical CPUs on GNU/Linux
    #   undef if /proc/cpuinfo does not exist or cannot be read
    return undef unless -e "/proc/cpuinfo";
    open(IN,"cat /proc/cpuinfo|") or return undef;
    my %id_seen;
    my $physical_cpus = 0;
    while(<IN>) {
        /^physical id.*[:](.*)/ or next;
        # Count every physical id only the first time it shows up
        $id_seen{$1}++ or $physical_cpus++;
    }
    close IN;
    return $physical_cpus;
}
sub _no_of_cores_gnu_linux {
    # Count the "processor" entries in /proc/cpuinfo.
    # Returns:
    #   Number of CPU cores on GNU/Linux
    #   undef if /proc/cpuinfo does not exist or cannot be read
    return undef unless -e "/proc/cpuinfo";
    open(IN,"cat /proc/cpuinfo|") or return undef;
    my $processor_entries = 0;
    while(<IN>) {
        $processor_entries++ if /^processor.*[:]/;
    }
    close IN;
    return $processor_entries;
}
sub _no_of_cpus_darwin {
    # Ask sysctl for the 'physicalcpu' value.
    # Returns:
    #   Number of physical CPUs on Mac Darwin
    #   (raw command output; empty if sysctl gave no such value)
    my $sysctl_output = qx{sysctl -a hw 2>/dev/null | grep -w physicalcpu | awk '{ print \$2 }'};
    return $sysctl_output;
}
sub _no_of_cores_darwin {
    # Ask sysctl for the 'logicalcpu' value.
    # Returns:
    #   Number of CPU cores on Mac Darwin
    #   (raw command output; empty if sysctl gave no such value)
    my $sysctl_output = qx{sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'};
    return $sysctl_output;
}
sub _no_of_cpus_freebsd {
    # Ask sysctl for hw.ncpu and keep the second field of the answer.
    # Returns:
    #   Number of physical CPUs on FreeBSD
    #   (raw command output; empty if sysctl gave no answer)
    my $sysctl_output = qx{sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'};
    return $sysctl_output;
}
sub _no_of_cores_freebsd {
    # Returns:
    #   Number of CPU cores on FreeBSD
    #   (raw command output; empty if neither probe gave an answer)
    # 'logicalcpu' is the name used in the Darwin probe, so ask for
    # hw.ncpu first and keep the old probe as fallback.
    # NOTE(review): assumes hw.ncpu is the core count on FreeBSD - confirm.
    my $no_of_cores =
        (`sysctl -n hw.ncpu 2>/dev/null`
         ||
         `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`);
    return $no_of_cores;
}
sub _no_of_cpus_solaris {
    # Count the output lines of psrinfo, or failing that the
    # 'cpu..instance' lines of prtconf.
    # Returns:
    #   Number of physical CPUs on Solaris
    #   undef if neither tool gave any output
    if(-x "/usr/sbin/psrinfo") {
        my @psrinfo_lines = `/usr/sbin/psrinfo`;
        return scalar(@psrinfo_lines) if @psrinfo_lines;
    }
    if(-x "/usr/sbin/prtconf") {
        my @cpu_lines = `/usr/sbin/prtconf | grep cpu..instance`;
        return scalar(@cpu_lines) if @cpu_lines;
    }
    return undef;
}
sub no_of_cores_solaris {
    # Count the output lines of psrinfo, or failing that the
    # 'cpu..instance' lines of prtconf.
    # Returns:
    #   Number of CPU cores on Solaris
    #   undef if neither tool gave any output
    if(-x "/usr/sbin/psrinfo") {
        my @psrinfo_lines = `/usr/sbin/psrinfo`;
        return scalar(@psrinfo_lines) if @psrinfo_lines;
    }
    if(-x "/usr/sbin/prtconf") {
        my @cpu_lines = `/usr/sbin/prtconf | grep cpu..instance`;
        return scalar(@cpu_lines) if @cpu_lines;
    }
    return undef;
}
#
# General useful library functions
#
@ -3752,7 +3400,11 @@ sub start_more_jobs {
}
for my $sshlogin (values %Global::host) {
debug("Running jobs on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n");
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()
and
(($::opt_load and $sshlogin->loadavg() < $sshlogin->max_loadavg())
or
1)) {
if($Global::JobQueue->empty()) {
last;
}
@ -3807,10 +3459,6 @@ sub start_another_job {
}
}
sub __READING_AND_QUOTING_ARGUMENTS__ {}
sub get_job_with_sshlogin {
@ -3867,9 +3515,6 @@ sub get_job_with_sshlogin {
return $job;
}
sub __REMOTE_SSH__ {}
sub read_sshloginfile {
@ -3939,8 +3584,6 @@ sub remote_hosts {
return grep !/^:$/, keys %Global::host;
}
sub setup_basefile {
# Transfer basefiles to each $sshlogin
# This needs to be done before first jobs on $sshlogin is run
@ -4221,7 +3864,6 @@ sub my_size {
}
}
sub my_dump {
# Returns:
# ascii expression of object if Data::Dump(er) is installed
@ -4320,20 +3962,306 @@ sub set_max_jobs_running {
$self->{'max_jobs_running'} = shift;
}
sub loadavg {
    # Get the current load average of this sshlogin by running 'uptime'
    # (through ssh unless the sshlogin is ":").
    # Returns:
    #   the first load average reported by uptime
    # Dies if no load average can be found in the output.
    my $self = shift;
    # TODO add some caching so we do not compute this more than
    # once per second
    my $cmd = "uptime";
    if($self->string() ne ":") {
        $cmd = $self->sshcommand() . " " . $self->serverlogin() . " " . $cmd;
    }
    # Actually run the command: matching against the command string
    # itself can never find a load average.
    my $uptime = `$cmd`;
    my $loadavg;
    # load average: 0.76, 1.53, 1.45
    # Some systems write 'load averages:' and some locales use ','
    # as decimal separator.
    if(defined $uptime and $uptime =~ /load averages?: (\d+[.,]\d+)/) {
        $loadavg = $1;
        $loadavg =~ tr/,/./;
    } else {
        die "Cannot find loadaverage from ".$self->string();
    }
    return $loadavg;
}
sub max_loadavg {
    # The highest load average at which new jobs may still be started
    # on this sshlogin. Computed from --load on first use and then
    # kept in the object.
    # Returns:
    #   max loadaverage (undef if compute_max_loadavg returns undef)
    my $self = shift;
    return $self->{'max_loadavg'} if defined $self->{'max_loadavg'};
    my $limit = $self->compute_max_loadavg($::opt_load);
    $self->{'max_loadavg'} = $limit;
    return $limit;
}
sub compute_max_loadavg {
    # Parse the max loadaverage that the user asked for using --load
    # Accepted forms: "+N" (ncpus+N), "-N" (ncpus-N), "N%" (percent of
    # ncpus) and "N" or "N.M" (absolute load; 0 = no limit).
    # Input:
    #   $loadspec = the --load value (may be undef)
    # Returns:
    #   max loadaverage (at least 0.01; undef if $loadspec is undef)
    my $self = shift;
    my $loadspec = shift;
    my $load;
    if(defined $loadspec) {
        if($loadspec =~ /^\+(\d+)$/) {
            # E.g. --load +2
            my $j = $1;
            $load =
                $self->ncpus() + $j;
        } elsif ($loadspec =~ /^-(\d+)$/) {
            # E.g. --load -2
            my $j = $1;
            $load =
                $self->ncpus() - $j;
        } elsif ($loadspec =~ /^(\d+)\%$/) {
            # E.g. --load 50%
            my $j = $1;
            $load =
                $self->ncpus() * $j / 100;
        } elsif ($loadspec =~ /^(\d+(?:\.\d+)?)$/) {
            # E.g. --load 2 or --load 1.5 (load averages are fractional)
            $load = $1;
            if($load == 0) {
                # --load 0 = infinity (or at least close)
                $load = 2**31;
            }
        } elsif (-f $loadspec) {
            # TODO reading the max load from a file is not implemented.
            # Fail with an explanation and leave the globals of the
            # --jobs file alone.
            die "--load from a file is not implemented: $loadspec\n";
        } else {
            print STDERR "Parsing of --load failed\n";
            ::die_usage();
        }
        if($load < 0.01) {
            $load = 0.01;
        }
    }
    return $load;
}
sub max_jobs_running {
    # The number of jobs that may run simultaneously on this sshlogin.
    # Computed from -j/-P on first use and then kept in the object.
    # Returns:
    #   max number of simultaneously running jobs
    my $self = shift;
    if(not defined $self->{'max_jobs_running'}) {
        # Store the result of the method; do not call the old
        # main-package function of the same name.
        $self->{'max_jobs_running'} =
            $self->compute_number_of_processes($::opt_P);
    }
    return $self->{'max_jobs_running'};
}
sub compute_number_of_processes {
    # Decide how many jobs to run simultaneously on this sshlogin:
    # the number the user asked for (or the default), capped by what
    # processes_available_by_system_limit() reports.
    # Input:
    #   the -j/-P specification (may be undef)
    # Returns:
    #   Number of processes
    my ($self, $jobs_spec) = @_;
    my $requested = $self->user_requested_processes($jobs_spec);
    # No usable -j value: fall back to the global default
    $requested = $Global::default_simultaneous_sshlogins
        unless defined $requested;
    ::debug("Wanted procs: $requested\n");
    my $allowed = $self->processes_available_by_system_limit($requested);
    ::debug("Limited to procs: $allowed\n");
    return $allowed;
}
sub processes_available_by_system_limit {
    # Probe how many jobs can actually be run at the same time.
    # If the wanted number of processes is bigger than the system limits:
    # Limit them to the system limits
    # Limits are: File handles, number of input lines, processes,
    # and (warning only) taking > 2 seconds to spawn 10 extra processes
    # Input:
    #   $wanted_processes = upper bound of processes to test for
    # Returns:
    #   Number of processes
    my $self = shift;
    my $wanted_processes = shift;
    my $system_limit=0;
    my @command_lines=();
    my $next_command_line;
    my $more_filehandles=1;
    my $max_system_proc_reached=0;
    my $slow_spawining_warning_printed=0;
    my $time = time;
    # Filehandles opened for the probe; all are closed again below
    my %fh;
    # PIDs of the placeholder children forked for the probe
    my @children;
    ::do_not_reap();

    # Reserve filehandles
    # perl uses 7 filehandles for something?
    # parallel uses 1 for memory_usage
    for my $i (1..8) {
        open($fh{"init-$i"},"</dev/null");
    }
    # Each round claims one command line, two filehandles and one process.
    # Stop at the wanted number or when any of the resources runs out.
    while($system_limit < $wanted_processes
          and (not $Global::CommandLineQueue->empty() or $Global::semaphore)
          and $more_filehandles
          and not $max_system_proc_reached) {
        $system_limit++;
        if(not $Global::semaphore) {
            # If there are no more command lines, then we have a process
            # per command line, so no need to go further
            ($next_command_line) = $Global::CommandLineQueue->get();
            push(@command_lines, $next_command_line);
        }

        # Every simultaneous process uses 2 filehandles when grouping
        $more_filehandles = open($fh{$system_limit*2},"</dev/null")
            && open($fh{$system_limit*2+1},"</dev/null");

        # System process limit
        # Restart the stopwatch every 10th process
        $system_limit % 10 or $time=time;
        my $child;
        if($child = fork()) {
            # Parent: remember the child so it can be killed during cleanup
            push (@children,$child);
        } elsif(defined $child) {
            # The child takes one process slot
            # It will be killed later
            $SIG{TERM} = $Global::original_sigterm;
            sleep 100000;
            # Qualified like every other main-package call in this class
            ::wait_and_exit(0);
        } else {
            # fork() returned undef: no more processes can be created
            $max_system_proc_reached = 1;
        }
        ::debug("Time to fork ten procs: ", time-$time, " (processes so far: ", $system_limit,")\n");
        if(time-$time > 2 and not $slow_spawining_warning_printed) {
            # It took more than 2 second to fork ten processes.
            # Give the user a warning. He can press Ctrl-C if this
            # sucks.
            print STDERR ("Warning: Starting 10 extra processes takes > 2 sec.\n",
                          "Consider adjusting -j. Press CTRL-C to stop.\n");
            $slow_spawining_warning_printed = 1;
        }
    }
    # Tell the user which resource stopped the probe short of the wanted number
    if($system_limit < $wanted_processes and not $more_filehandles) {
        print STDERR ("Warning: Only enough filehandles to run ",
                      $system_limit, " jobs in parallel. ",
                      "Raising ulimit -n may help\n");
    }
    if($system_limit < $wanted_processes and $max_system_proc_reached) {
        print STDERR ("Warning: Only enough available processes to run ",
                      $system_limit, " jobs in parallel.\n");
    }
    # Cleanup: Close the files
    for (values %fh) { close $_ }
    # Cleanup: Kill the children
    for my $pid (@children) {
        kill 9, $pid;
        waitpid($pid,0);
    }
    #wait();
    # Cleanup: Unget the command_lines
    $Global::CommandLineQueue->unget(@command_lines);
    # For every sshlogin except ":" also test how many simultaneous ssh
    # logins work, but only when more than the default number is wanted
    if($self->string() ne ":" and
       $system_limit > $Global::default_simultaneous_sshlogins) {
        $system_limit =
            $self->simultaneous_sshlogin_limit($system_limit);
    }
    return $system_limit;
}
sub simultaneous_sshlogin_limit {
    # Test by logging in wanted number of times simultaneously
    # Input:
    #   $wanted_processes = number of simultaneous logins wanted
    # Returns:
    #   min($wanted_processes,$working_simultaneous_ssh_logins-1)
    #   (the working number itself when it is 0 or 1)
    my $self = shift;
    my $wanted_processes = shift;
    # Try twice because it guesses wrong sometimes
    # Choose the minimal
    my $ssh_limit =
        ::min($self->simultaneous_sshlogin($wanted_processes),
              $self->simultaneous_sshlogin($wanted_processes));
    # Race condition can cause problem if using all sshs.
    # Compute the number really used before warning, so the warning
    # reports the same number that is returned.
    my $usable_limit = $ssh_limit > 1 ? $ssh_limit - 1 : $ssh_limit;
    if($ssh_limit < $wanted_processes) {
        my $serverlogin = $self->serverlogin();
        print STDERR
            ("Warning: ssh to $serverlogin only allows ",
             "for $ssh_limit simultaneous logins.\n",
             "You may raise this by changing ",
             "/etc/ssh/sshd_config:MaxStartup on $serverlogin\n",
             "Using only ",$usable_limit," connections ",
             "to avoid race conditions\n");
    }
    return $usable_limit;
}
sub simultaneous_sshlogin {
    # Using this sshlogin try to see if we can do $wanted_processes
    # simultaneous logins
    # (ssh host echo simultaneouslogin & ssh host echo simultaneouslogin & ...)|grep simul|wc -l
    # Input:
    #   $wanted_processes = number of logins to start at the same time
    # Returns:
    #   Number of successful logins as a plain integer
    #   (0 if no count could be read from the pipeline)
    # Dies if the test pipeline cannot be started.
    my $self = shift;
    my $wanted_processes = shift;
    my $sshcmd = $self->sshcommand();
    my $serverlogin = $self->serverlogin();
    my $cmd = "$sshcmd $serverlogin echo simultaneouslogin 2>&1 &"x$wanted_processes;
    ::debug("Trying $wanted_processes logins at $serverlogin");
    # Lexical handle instead of the global bareword SIMUL
    open(my $simul_fh, "-|", "($cmd)|grep simultaneouslogin | wc -l")
        or die "Cannot start simultaneous login test for $serverlogin: $!";
    my $wc_output = <$simul_fh>;
    close $simul_fh;
    # Some wc implementations pad the count with blanks and the read
    # may return nothing at all: keep only the digits.
    my $ssh_limit = 0;
    if(defined $wc_output and $wc_output =~ /(\d+)/) {
        $ssh_limit = $1;
    }
    return $ssh_limit;
}
sub set_ncpus {
    # Store the number of CPUs of this sshlogin in the object.
    # Input:
    #   the number of CPUs
    my ($self, $cpu_count) = @_;
    $self->{'ncpus'} = $cpu_count;
}
sub user_requested_processes {
    # Parse the number of processes that the user asked for using -j
    # Accepted forms: "+N" (ncpus+N), "-N" (ncpus-N), "N%" (percent of
    # ncpus), "N" (0 = as many as possible), or the name of a file
    # containing one of these forms.
    # Input:
    #   $opt_P = the -j/-P value (may be undef)
    # Returns:
    #   the number of processes to run on this sshlogin
    #   (at least 1; undef if $opt_P is undef)
    my $self = shift;
    my $opt_P = shift;
    my $processes;
    if(defined $opt_P) {
        if($opt_P =~ /^\+(\d+)$/) {
            # E.g. -P +2
            my $j = $1;
            $processes =
                $self->ncpus() + $j;
        } elsif ($opt_P =~ /^-(\d+)$/) {
            # E.g. -P -2
            my $j = $1;
            $processes =
                $self->ncpus() - $j;
        } elsif ($opt_P =~ /^(\d+)\%$/) {
            # E.g. -P 50%
            my $j = $1;
            $processes =
                $self->ncpus() * $j / 100;
        } elsif ($opt_P =~ /^(\d+)$/) {
            $processes = $1;
            if($processes == 0) {
                # -P 0 = infinity (or at least close)
                $processes = 2**31;
            }
        } elsif (-f $opt_P) {
            # The value names a file: remember it and its mtime, then
            # parse the content of the file
            $Global::max_procs_file = $opt_P;
            $Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9];
            # 3-arg open on a lexical handle: the filename can never be
            # taken as an open mode and the global IN is left alone
            if(open(my $in_fh, "<", $Global::max_procs_file)) {
                my $opt_P_file = join("",<$in_fh>);
                close $in_fh;
                $processes = $self->user_requested_processes($opt_P_file);
            } else {
                print STDERR "Cannot open $opt_P\n";
                exit(255);
            }
        } else {
            print STDERR "Parsing of --jobs/-j/--max-procs/-P failed\n";
            ::die_usage();
        }
        if($processes < 1) {
            $processes = 1;
        }
    }
    return $processes;
}
sub ncpus {
my $self = shift;
if(not defined $self->{'ncpus'}) {