From 0b31661e8e6a94aeb7bc9af030ebf943120eb0d8 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Fri, 3 Dec 2010 16:48:49 +0100 Subject: [PATCH] parallel: --load implemented but not tested. Testsuite passes. --- src/parallel | 670 +++++++++++++++++++++++---------------------------- 1 file changed, 299 insertions(+), 371 deletions(-) diff --git a/src/parallel b/src/parallel index d1ddfdcd..15ba736b 100755 --- a/src/parallel +++ b/src/parallel @@ -507,6 +507,11 @@ B<-L> instead. Implies B<-X> unless B<-m> is set. +=item B<--load>=I (unimplemented) + +Do not start new jobs unless the load is less than I. + + =item B<--controlmaster> (experimental) =item B<-M> (experimental) @@ -2548,10 +2553,6 @@ $Global::JobQueue = JobQueue->new($Global::CommandLineQueue); init_run_jobs(); my $sem; if($Global::semaphore) { - # $Global::host{':'}{'max_no_of_running'} must be set - #if(not defined $Global::host{':'}{'max_no_of_running'}) { - #compute_number_of_processes_for_sshlogins(); - #} $sem = acquire_semaphore(); } $SIG{TERM} = \&start_no_new_jobs; @@ -2626,6 +2627,7 @@ sub get_options_from_array { "basenamereplace=s" => \$::opt_basenamereplace, "basenameextensionreplace=s" => \$::opt_basenameextensionreplace, "jobs|j=s" => \$::opt_P, + "load=s" => \$::opt_load, "max-line-length-allowed" => \$::opt_max_line_length_allowed, "number-of-cpus" => \$::opt_number_of_cpus, "number-of-cores" => \$::opt_number_of_cores, @@ -2820,11 +2822,7 @@ sub parse_options { print STDERR ("Warning: using -X or -m with --sshlogin may fail\n"); } - # Needs to be done after setting $Global::command and $Global::command_line_max_len - # as '-m' influences the number of commands that needs to be run - if(defined $::opt_P) { -# compute_number_of_processes_for_sshlogins(); - } else { + if(not defined $::opt_P) { for my $sshlogin (values %Global::host) { $sshlogin->set_max_jobs_running($Global::default_simultaneous_sshlogins); } @@ -3035,168 +3033,6 @@ sub shell_unquote { sub __NUMBER_OF_PROCESSES_FILEHANDLES_MAX_LENGTH_OF_COMMAND_LINE__ {} -# Number of parallel processes to run - - -sub compute_number_of_processes { - # Number of processes wanted and limited by system resources - # Returns: - # Number of processes - my $opt_P = shift; - my $sshlogin = shift; - my $wanted_processes = user_requested_processes($opt_P,$sshlogin); - if(not defined $wanted_processes) { - $wanted_processes = $Global::default_simultaneous_sshlogins; - } - debug("Wanted procs: $wanted_processes\n"); - my $system_limit = - processes_available_by_system_limit($wanted_processes,$sshlogin); - debug("Limited to procs: $system_limit\n"); - return $system_limit; -} - -sub processes_available_by_system_limit { - # If the wanted number of processes is bigger than the system limits: - # Limit them to the system limits - # Limits are: File handles, number of input lines, processes, - # and taking > 1 second to spawn 10 extra processes - # Returns: - # Number of processes - - my $wanted_processes = shift; - my $sshlogin = shift; - my $system_limit=0; - my @command_lines=(); - my ($next_command_line, $args_ref); - my $more_filehandles=1; - my $max_system_proc_reached=0; - my $slow_spawining_warning_printed=0; - my $time = time; - my %fh; - my @children; - do_not_reap(); - - # Reserve filehandles - # perl uses 7 filehandles for something? - # parallel uses 1 for memory_usage - for my $i (1..8) { - open($fh{"init-$i"},"empty() or $Global::semaphore) - and $more_filehandles - and not $max_system_proc_reached) { - $system_limit++; - - if(not $Global::semaphore) { - # If there are no more command lines, then we have a process - # per command line, so no need to go further - - ($next_command_line) = $Global::CommandLineQueue->get(); - push(@command_lines, $next_command_line); - } - - # Every simultaneous process uses 2 filehandles when grouping - $more_filehandles = open($fh{$system_limit*2}," 2 and not $slow_spawining_warning_printed) { - # It took more than 2 second to fork ten processes. - # Give the user a warning. He can press Ctrl-C if this - # sucks. - print STDERR ("Warning: Starting 10 extra processes takes > 2 sec.\n", - "Consider adjusting -j. Press CTRL-C to stop.\n"); - $slow_spawining_warning_printed = 1; - } - } - if($system_limit < $wanted_processes and not $more_filehandles) { - print STDERR ("Warning: Only enough filehandles to run ", - $system_limit, " jobs in parallel. ", - "Raising ulimit -n may help\n"); - } - if($system_limit < $wanted_processes and $max_system_proc_reached) { - print STDERR ("Warning: Only enough available processes to run ", - $system_limit, " jobs in parallel.\n"); - } - # Cleanup: Close the files - for (values %fh) { close $_ } - # Cleanup: Kill the children - for my $pid (@children) { - kill 9, $pid; - waitpid($pid,0); - } - - #wait(); - # Cleanup: Unget the command_lines (and args_refs) - $Global::CommandLineQueue->unget(@command_lines); - if($sshlogin->string() ne ":" and - $system_limit > $Global::default_simultaneous_sshlogins) { - $system_limit = - simultaneous_sshlogin_limit($sshlogin,$system_limit); - } - return $system_limit; -} - -sub simultaneous_sshlogin { - # Using $sshlogin try to see if we can do $wanted_processes - # simultaneous logins - # (ssh host echo simultaneouslogin & ssh host echo simultaneouslogin & ...)|grep simul|wc -l - # Returns: - # Number of succesful logins - my $sshlogin = shift; - my $wanted_processes = shift; - my $sshcmd = $sshlogin->sshcommand(); - my $serverlogin = $sshlogin->serverlogin(); - my $cmd = "$sshcmd $serverlogin echo simultaneouslogin 2>&1 &"x$wanted_processes; - debug("Trying $wanted_processes logins at $serverlogin"); - open (SIMUL, "($cmd)|grep simultaneouslogin | wc -l|") or die; - my $ssh_limit = ; - close SIMUL; - chomp $ssh_limit; - return $ssh_limit; -} - -sub simultaneous_sshlogin_limit { - # Test by logging in wanted number of times simultaneously - # Returns: - # min($wanted_processes,$working_simultaneous_ssh_logins-1) - my $sshlogin = shift; - my $wanted_processes = shift; - my $sshcmd = $sshlogin->sshcommand(); - my $serverlogin = $sshlogin->serverlogin(); - # Try twice because it guesses wrong sometimes - # Choose the minimal - my $ssh_limit = - min(simultaneous_sshlogin($sshlogin,$wanted_processes), - simultaneous_sshlogin($sshlogin,$wanted_processes)); - if($ssh_limit < $wanted_processes) { - print STDERR - ("Warning: ssh to $serverlogin only allows ", - "for $ssh_limit simultaneous logins.\n", - "You may raise this by changing ", - "/etc/ssh/sshd_config:MaxStartup on $serverlogin\n", - "Using only ",$ssh_limit-1," connections ", - "to avoid race conditions\n"); - } - # Race condition can cause problem if using all sshs. - if($ssh_limit > 1) { $ssh_limit -= 1; } - return $ssh_limit; -} sub enough_file_handles { # check that we have enough filehandles available for starting @@ -3220,194 +3056,6 @@ sub enough_file_handles { } } -sub user_requested_processes { - # Parse the number of processes that the user asked for using -j - # Returns: - # the number of processes to run on this sshlogin - my $opt_P = shift; - my $sshlogin = shift; - my $processes; - if(defined $opt_P) { - if($opt_P =~ /^\+(\d+)$/) { - # E.g. -P +2 - my $j = $1; - $processes = - $sshlogin->ncpus() + $j; - } elsif ($opt_P =~ /^-(\d+)$/) { - # E.g. -P -2 - my $j = $1; - $processes = - $sshlogin->ncpus() - $j; - } elsif ($opt_P =~ /^(\d+)\%$/) { - my $j = $1; - $processes = - $sshlogin->ncpus() * $j / 100; - } elsif ($opt_P =~ /^(\d+)$/) { - $processes = $1; - if($processes == 0) { - # -P 0 = infinity (or at least close) - $processes = 2**31; - } - } elsif (-f $opt_P) { - $Global::max_procs_file = $opt_P; - $Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9]; - if(open(IN, $Global::max_procs_file)) { - my $opt_P_file = join("",); - close IN; - $processes = user_requested_processes($opt_P_file); - } else { - print STDERR "Cannot open $opt_P\n"; - exit(255); - } - } else { - print STDERR "Parsing of --jobs/-j/--max-procs/-P failed\n"; - die_usage(); - } - if($processes < 1) { - $processes = 1; - } - } - return $processes; -} - - -sub _no_of_cpus { - # Returns: - # Number of physical CPUs - if(not $Private::no_of_cpus) { - local $/="\n"; # If delimiter is set, then $/ will be wrong - my $no_of_cpus = (no_of_cpus_freebsd() - || no_of_cpus_darwin() - || no_of_cpus_solaris() - || no_of_cpus_gnu_linux() - ); - if($no_of_cpus) { - $Private::no_of_cpus = $no_of_cpus; - } else { - warn("Cannot figure out number of cpus. Using 1"); - $Private::no_of_cpus = 1; - } - } - return $Private::no_of_cpus; -} - -sub _no_of_cores { - # Returns: - # Number of CPU cores - if(not $Private::no_of_cores) { - local $/="\n"; # If delimiter is set, then $/ will be wrong - my $no_of_cores = (no_of_cores_freebsd() - || no_of_cores_darwin() - || no_of_cores_solaris() - || no_of_cores_gnu_linux() - ); - if($no_of_cores) { - $Private::no_of_cores = $no_of_cores; - } else { - warn("Cannot figure out number of CPU cores. Using 1"); - $Private::no_of_cores = 1; - } - } - return $Private::no_of_cores; -} - -sub _no_of_cpus_gnu_linux { - # Returns: - # Number of physical CPUs on GNU/Linux - my $no_of_cpus; - if(-e "/proc/cpuinfo") { - $no_of_cpus = 0; - my %seen; - open(IN,"cat /proc/cpuinfo|") || return undef; - while() { - if(/^physical id.*[:](.*)/ and not $seen{$1}++) { - $no_of_cpus++; - } - } - close IN; - } - return $no_of_cpus; -} - -sub _no_of_cores_gnu_linux { - # Returns: - # Number of CPU cores on GNU/Linux - my $no_of_cores; - if(-e "/proc/cpuinfo") { - $no_of_cores = 0; - open(IN,"cat /proc/cpuinfo|") || return undef; - while() { - /^processor.*[:]/ and $no_of_cores++; - } - close IN; - } - return $no_of_cores; -} - -sub _no_of_cpus_darwin { - # Returns: - # Number of physical CPUs on Mac Darwin - my $no_of_cpus = `sysctl -a hw 2>/dev/null | grep -w physicalcpu | awk '{ print \$2 }'`; - return $no_of_cpus; -} - -sub _no_of_cores_darwin { - # Returns: - # Number of CPU cores on Mac Darwin - my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`; - return $no_of_cores; -} - -sub _no_of_cpus_freebsd { - # Returns: - # Number of physical CPUs on FreeBSD - my $no_of_cpus = `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`; - return $no_of_cpus; -} - -sub _no_of_cores_freebsd { - # Returns: - # Number of CPU cores on FreeBSD - my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`; - return $no_of_cores; -} - -sub _no_of_cpus_solaris { - # Returns: - # Number of physical CPUs on Solaris - if(-x "/usr/sbin/psrinfo") { - my @psrinfo = `/usr/sbin/psrinfo`; - if($#psrinfo >= 0) { - return $#psrinfo +1; - } - } - if(-x "/usr/sbin/prtconf") { - my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`; - if($#prtconf >= 0) { - return $#prtconf +1; - } - } - return undef; -} - -sub no_of_cores_solaris { - # Returns: - # Number of CPU cores on Solaris - if(-x "/usr/sbin/psrinfo") { - my @psrinfo = `/usr/sbin/psrinfo`; - if($#psrinfo >= 0) { - return $#psrinfo +1; - } - } - if(-x "/usr/sbin/prtconf") { - my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`; - if($#prtconf >= 0) { - return $#prtconf +1; - } - } - return undef; -} - # # General useful library functions # @@ -3752,7 +3400,11 @@ sub start_more_jobs { } for my $sshlogin (values %Global::host) { debug("Running jobs on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n"); - while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { + while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running() + and + (($::opt_load and $sshlogin->loadavg() < $sshlogin->max_loadavg()) + or + 1)) { if($Global::JobQueue->empty()) { last; } @@ -3807,10 +3459,6 @@ sub start_another_job { } } - - - - sub __READING_AND_QUOTING_ARGUMENTS__ {} sub get_job_with_sshlogin { @@ -3867,9 +3515,6 @@ sub get_job_with_sshlogin { return $job; } - - - sub __REMOTE_SSH__ {} sub read_sshloginfile { @@ -3939,8 +3584,6 @@ sub remote_hosts { return grep !/^:$/, keys %Global::host; } - - sub setup_basefile { # Transfer basefiles to each $sshlogin # This needs to be done before first jobs on $sshlogin is run @@ -4221,7 +3864,6 @@ sub my_size { } } - sub my_dump { # Returns: # ascii expression of object if Data::Dump(er) is installed @@ -4320,20 +3962,306 @@ sub set_max_jobs_running { $self->{'max_jobs_running'} = shift; } +sub loadavg { + my $self = shift; + # TODO add some caching so we do not compute this more than + # once per second + my $uptime = $self->sshcommand() . " " . $self->serverlogin() . " uptime"; + my $loadavg; + # load average: 0.76, 1.53, 1.45 + if($uptime =~ /load average: (\d+.\d+)/) { + $loadavg = $1; + } else { + die "Cannot find loadaverage from ".$self->string(); + } + return $loadavg; +} + +sub max_loadavg { + my $self = shift; + if(not defined $self->{'max_loadavg'}) { + $self->{'max_loadavg'} = + $self->compute_max_loadavg($::opt_load); + } + return $self->{'max_loadavg'}; +} + +sub compute_max_loadavg { + # Parse the max loadaverage that the user asked for using --load + # Returns: + # max loadaverage + my $self = shift; + my $loadspec = shift; + my $load; + if(defined $loadspec) { + if($loadspec =~ /^\+(\d+)$/) { + # E.g. --load +2 + my $j = $1; + $load = + $self->ncpus() + $j; + } elsif ($loadspec =~ /^-(\d+)$/) { + # E.g. --load -2 + my $j = $1; + $load = + $self->ncpus() - $j; + } elsif ($loadspec =~ /^(\d+)\%$/) { + my $j = $1; + $load = + $self->ncpus() * $j / 100; + } elsif ($loadspec =~ /^(\d+)$/) { + $load = $1; + if($load == 0) { + # --load 0 = infinity (or at least close) + $load = 2**31; + } + } elsif (-f $loadspec) { + # TODO this needs to be done for $loadspec + die; + $Global::max_procs_file = $loadspec; + $Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9]; + if(open(IN, $Global::max_procs_file)) { + my $opt_P_file = join("",); + close IN; + $load = $self->compute_max_loadavg($opt_P_file); + } else { + print STDERR "Cannot open $loadspec\n"; + exit(255); + } + } else { + print STDERR "Parsing of --load failed\n"; + ::die_usage(); + } + if($load < 0.01) { + $load = 0.01; + } + } + return $load; +} + sub max_jobs_running { my $self = shift; if(not defined $self->{'max_jobs_running'}) { $self->{'max_jobs_running'} = - ::compute_number_of_processes($::opt_P,$self); + $self->compute_number_of_processes($::opt_P); } return $self->{'max_jobs_running'}; } +sub compute_number_of_processes { + # Number of processes wanted and limited by system resources + # Returns: + # Number of processes + my $self = shift; + my $opt_P = shift; + my $wanted_processes = $self->user_requested_processes($opt_P); + if(not defined $wanted_processes) { + $wanted_processes = $Global::default_simultaneous_sshlogins; + } + ::debug("Wanted procs: $wanted_processes\n"); + my $system_limit = + $self->processes_available_by_system_limit($wanted_processes); + ::debug("Limited to procs: $system_limit\n"); + return $system_limit; +} + +sub processes_available_by_system_limit { + # If the wanted number of processes is bigger than the system limits: + # Limit them to the system limits + # Limits are: File handles, number of input lines, processes, + # and taking > 1 second to spawn 10 extra processes + # Returns: + # Number of processes + my $self = shift; + my $wanted_processes = shift; + + my $system_limit=0; + my @command_lines=(); + my ($next_command_line, $args_ref); + my $more_filehandles=1; + my $max_system_proc_reached=0; + my $slow_spawining_warning_printed=0; + my $time = time; + my %fh; + my @children; + ::do_not_reap(); + + # Reserve filehandles + # perl uses 7 filehandles for something? + # parallel uses 1 for memory_usage + for my $i (1..8) { + open($fh{"init-$i"},"empty() or $Global::semaphore) + and $more_filehandles + and not $max_system_proc_reached) { + $system_limit++; + + if(not $Global::semaphore) { + # If there are no more command lines, then we have a process + # per command line, so no need to go further + + ($next_command_line) = $Global::CommandLineQueue->get(); + push(@command_lines, $next_command_line); + } + + # Every simultaneous process uses 2 filehandles when grouping + $more_filehandles = open($fh{$system_limit*2}," 2 and not $slow_spawining_warning_printed) { + # It took more than 2 second to fork ten processes. + # Give the user a warning. He can press Ctrl-C if this + # sucks. + print STDERR ("Warning: Starting 10 extra processes takes > 2 sec.\n", + "Consider adjusting -j. Press CTRL-C to stop.\n"); + $slow_spawining_warning_printed = 1; + } + } + if($system_limit < $wanted_processes and not $more_filehandles) { + print STDERR ("Warning: Only enough filehandles to run ", + $system_limit, " jobs in parallel. ", + "Raising ulimit -n may help\n"); + } + if($system_limit < $wanted_processes and $max_system_proc_reached) { + print STDERR ("Warning: Only enough available processes to run ", + $system_limit, " jobs in parallel.\n"); + } + # Cleanup: Close the files + for (values %fh) { close $_ } + # Cleanup: Kill the children + for my $pid (@children) { + kill 9, $pid; + waitpid($pid,0); + } + + #wait(); + # Cleanup: Unget the command_lines (and args_refs) + $Global::CommandLineQueue->unget(@command_lines); + if($self->string() ne ":" and + $system_limit > $Global::default_simultaneous_sshlogins) { + $system_limit = + $self->simultaneous_sshlogin_limit($system_limit); + } + return $system_limit; +} + +sub simultaneous_sshlogin_limit { + # Test by logging in wanted number of times simultaneously + # Returns: + # min($wanted_processes,$working_simultaneous_ssh_logins-1) + my $self = shift; + my $wanted_processes = shift; + # Try twice because it guesses wrong sometimes + # Choose the minimal + my $ssh_limit = + ::min($self->simultaneous_sshlogin($wanted_processes), + $self->simultaneous_sshlogin($wanted_processes)); + if($ssh_limit < $wanted_processes) { + my $serverlogin = $self->serverlogin(); + print STDERR + ("Warning: ssh to $serverlogin only allows ", + "for $ssh_limit simultaneous logins.\n", + "You may raise this by changing ", + "/etc/ssh/sshd_config:MaxStartup on $serverlogin\n", + "Using only ",$ssh_limit-1," connections ", + "to avoid race conditions\n"); + } + # Race condition can cause problem if using all sshs. + if($ssh_limit > 1) { $ssh_limit -= 1; } + return $ssh_limit; +} + + +sub simultaneous_sshlogin { + # Using $sshlogin try to see if we can do $wanted_processes + # simultaneous logins + # (ssh host echo simultaneouslogin & ssh host echo simultaneouslogin & ...)|grep simul|wc -l + # Returns: + # Number of succesful logins + my $self = shift; + my $wanted_processes = shift; + my $sshcmd = $self->sshcommand(); + my $serverlogin = $self->serverlogin(); + my $cmd = "$sshcmd $serverlogin echo simultaneouslogin 2>&1 &"x$wanted_processes; + ::debug("Trying $wanted_processes logins at $serverlogin"); + open (SIMUL, "($cmd)|grep simultaneouslogin | wc -l|") or die; + my $ssh_limit = ; + close SIMUL; + chomp $ssh_limit; + return $ssh_limit; +} + sub set_ncpus { my $self = shift; $self->{'ncpus'} = shift; } +sub user_requested_processes { + # Parse the number of processes that the user asked for using -j + # Returns: + # the number of processes to run on this sshlogin + my $self = shift; + my $opt_P = shift; + my $processes; + if(defined $opt_P) { + if($opt_P =~ /^\+(\d+)$/) { + # E.g. -P +2 + my $j = $1; + $processes = + $self->ncpus() + $j; + } elsif ($opt_P =~ /^-(\d+)$/) { + # E.g. -P -2 + my $j = $1; + $processes = + $self->ncpus() - $j; + } elsif ($opt_P =~ /^(\d+)\%$/) { + my $j = $1; + $processes = + $self->ncpus() * $j / 100; + } elsif ($opt_P =~ /^(\d+)$/) { + $processes = $1; + if($processes == 0) { + # -P 0 = infinity (or at least close) + $processes = 2**31; + } + } elsif (-f $opt_P) { + $Global::max_procs_file = $opt_P; + $Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9]; + if(open(IN, $Global::max_procs_file)) { + my $opt_P_file = join("",); + close IN; + $processes = $self->user_requested_processes($opt_P_file); + } else { + print STDERR "Cannot open $opt_P\n"; + exit(255); + } + } else { + print STDERR "Parsing of --jobs/-j/--max-procs/-P failed\n"; + ::die_usage(); + } + if($processes < 1) { + $processes = 1; + } + } + return $processes; +} + sub ncpus { my $self = shift; if(not defined $self->{'ncpus'}) {