diff --git a/src/niceload b/src/niceload index 459f6178..8cd70b20 100755 --- a/src/niceload +++ b/src/niceload @@ -1,5 +1,26 @@ #!/usr/bin/perl -w +# Copyright (C) 2004,2005,2006,2006,2008,2009,2010 Ole Tange, +# http://ole.tange.dk +# +# Copyright (C) 2010,2011 Ole Tange, http://ole.tange.dk and Free +# Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see +# or write to the Free Software Foundation, Inc., 51 Franklin St, +# Fifth Floor, Boston, MA 02110-1301 USA + use strict; use Getopt::Long; $Global::progname="niceload"; diff --git a/src/parallel b/src/parallel index 59094f7d..3098f21d 100755 --- a/src/parallel +++ b/src/parallel @@ -1,5 +1,23 @@ #!/usr/bin/perl -w +# Copyright (C) 2007,2008,2009,2010,2011 Ole Tange and Free Software +# Foundation, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see +# or write to the Free Software Foundation, Inc., 51 Franklin St, +# Fifth Floor, Boston, MA 02110-1301 USA + # open3 used in Job::start use IPC::Open3; # &WNOHANG used in reaper @@ -125,6 +143,8 @@ if($::opt_halt_on_error) { wait_and_exit(min(undef_as_zero($Global::exitstatus),254)); } +sub __PIPE_MODE__ {} + sub spreadstdin { # read a record # Spawn a job and print the record to it. @@ -307,6 +327,8 @@ sub write_record_to_pipe { return; } +sub __SEM_MODE__ {} + sub acquire_semaphore { # Acquires semaphore. If needed: spawns to the background # Returns: @@ -334,6 +356,8 @@ sub acquire_semaphore { return $sem; } +sub __PARSE_OPTIONS__ {} + sub options_hash { # Returns a hash of the GetOptions config return @@ -794,29 +818,6 @@ sub read_args_from_command_line { return @new_argv; } -sub open_or_exit { - # Returns: - # file handle to read-opened file - # exits if file cannot be opened - my $file = shift; - if($file eq "-") { - $Global::stdin_in_opt_a = 1; - return ($Global::original_stdin || *STDIN); - } - if(ref $file eq "GLOB") { - # This is an open filehandle - return $file; - } - my $fh = gensym; - if(not open($fh,"<",$file)) { - print STDERR "$Global::progname: ". - "Cannot open input file `$file': ". - "No such file or directory\n"; - wait_and_exit(255); - } - return $fh; -} - sub cleanup { # Returns: N/A if(@::opt_basefile) { @@ -824,9 +825,7 @@ sub cleanup { } } -# -# Generating the command line -# +sub __QUOTING_ARGUMENTS_FOR_SHELL__ {} sub shell_quote { my @strings = (@_); @@ -865,7 +864,7 @@ sub shell_unquote { return wantarray ? @strings : "@strings"; } -sub __NUMBER_OF_PROCESSES_FILEHANDLES_MAX_LENGTH_OF_COMMAND_LINE__ {} +sub __FILEHANDLES__ {} sub enough_file_handles { # check that we have enough filehandles available for starting @@ -889,94 +888,30 @@ sub enough_file_handles { } } -# -# General useful library functions -# - -sub min { +sub open_or_exit { # Returns: - # Minimum value of array - my $min; - for (@_) { - # Skip undefs - defined $_ or next; - defined $min or do { $min = $_; next; }; # Set $_ to the first non-undef - $min = ($min < $_) ? $min : $_; + # file handle to read-opened file + # exits if file cannot be opened + my $file = shift; + if($file eq "-") { + $Global::stdin_in_opt_a = 1; + return ($Global::original_stdin || *STDIN); } - return $min; -} - -sub max { - # Returns: - # Maximum value of array - my $max; - for (@_) { - # Skip undefs - defined $_ or next; - defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef - $max = ($max > $_) ? $max : $_; + if(ref $file eq "GLOB") { + # This is an open filehandle + return $file; } - return $max; -} - -sub sum { - # Returns: - # Sum of values of array - my @args = @_; - my $sum = 0; - for (@args) { - # Skip undefs - $_ and do { $sum += $_; } + my $fh = gensym; + if(not open($fh,"<",$file)) { + print STDERR "$Global::progname: ". + "Cannot open input file `$file': ". + "No such file or directory\n"; + wait_and_exit(255); } - return $sum; + return $fh; } -sub undef_as_zero { - my $a = shift; - return $a ? $a : 0; -} - - -sub undef_as_empty { - my $a = shift; - return $a ? $a : ""; -} - -sub hostname { - if(not $Private::hostname) { - my $hostname = `hostname`; - chomp($hostname); - $Private::hostname = $hostname || "nohostname"; - } - return $Private::hostname; -} - -sub usleep { - # Sleep this many milliseconds. - my $secs = shift; - ::debug("Sleeping ",$secs," millisecs\n"); - select(undef, undef, undef, $secs/1000); -} - -sub multiply_binary_prefix { - # Evalualte numbers with binary prefix - # 13G = 13*1024*1024*1024 = 13958643712 - my $s = shift; - $s =~ s/Ki?/*1024/gi; - $s =~ s/Mi?/*1024*1024/gi; - $s =~ s/Gi?/*1024*1024*1024/gi; - $s =~ s/Ti?/*1024*1024*1024*1024/gi; - $s =~ s/Pi?/*1024*1024*1024*1024*1024/gi; - $s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi; - $s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi; - $s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi; - $s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi; - $s = eval $s; - return $s; -} - - -sub __RUNNING_AND_PRINTING_THE_JOBS__ {} +sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {} # Variable structure: # @@ -1004,6 +939,100 @@ sub init_run_jobs { } } +sub start_more_jobs { + # Returns: + # number of jobs started + my $jobs_started = 0; + if(not $Global::start_no_new_jobs) { + if($Global::max_procs_file) { + my $mtime = (stat($Global::max_procs_file))[9]; + if($mtime > $Global::max_procs_file_last_mod) { + $Global::max_procs_file_last_mod = $mtime; + for my $sshlogin (values %Global::host) { + $sshlogin->set_max_jobs_running(undef); + } + } + } + if($Global::max_load_file) { + my $mtime = (stat($Global::max_load_file))[9]; + if($mtime > $Global::max_load_file_last_mod) { + $Global::max_load_file_last_mod = $mtime; + for my $sshlogin (values %Global::host) { + $sshlogin->set_max_loadavg(undef); + } + } + } + + for my $sshlogin (values %Global::host) { + debug("Running jobs before on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n"); + if($::opt_load and $sshlogin->loadavg_too_high()) { + # The load is too high or unknown + next; + } + if($::opt_noswap and $sshlogin->swapping()) { + # The server is swapping + next; + } + while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { + if($Global::JobQueue->empty() and not $::opt_pipe) { + last; + } + debug($sshlogin->string()." has ".$sshlogin->jobs_running() + . " out of " . $sshlogin->max_jobs_running() + . " jobs running. Start another.\n"); + if(start_another_job($sshlogin) == 0) { + # No more jobs to start on this $sshlogin + debug("No jobs started on ".$sshlogin->string()."\n"); + last; + } + debug("Job started on ".$sshlogin->string()."\n"); + $sshlogin->inc_jobs_running(); + $jobs_started++; + } + debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running() + ." of ".$sshlogin->max_jobs_running() ."\n"); + } + } + return $jobs_started; +} + +sub start_another_job { + # Grab a job from Global::JobQueue, start it at sshlogin + # and remember the pid, the STDOUT and the STDERR handles + # Returns: + # 1 if another jobs was started + # 0 otherwise + my $sshlogin = shift; + # Do we have enough file handles to start another job? + if(enough_file_handles()) { + if($Global::JobQueue->empty() and not $::opt_pipe) { + # No more commands to run + debug("Not starting: JobQueue empty\n"); + return 0; + } else { + my $job = get_job_with_sshlogin($sshlogin); + if(not defined $job) { + # No command available for that sshlogin + debug("Not starting: no jobs available for ".$sshlogin->string()."\n"); + return 0; + } + debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); + if($job->start()) { + $Global::running{$job->pid()} = $job; + debug("Started as seq ".$job->seq(),"\n"); + return 1; + } else { + # If interactive says: Dont run the job, then skip it and run the next + return start_another_job($sshlogin); + } + } + } else { + # No more file handles + debug("Not starting: no more file handles\n"); + return 0; + } +} + sub drain_job_queue { # Returns: N/A $Private::first_completed ||= time; @@ -1250,102 +1279,6 @@ sub terminal_columns { return $Private::columns; } -sub start_more_jobs { - # Returns: - # number of jobs started - my $jobs_started = 0; - if(not $Global::start_no_new_jobs) { - if($Global::max_procs_file) { - my $mtime = (stat($Global::max_procs_file))[9]; - if($mtime > $Global::max_procs_file_last_mod) { - $Global::max_procs_file_last_mod = $mtime; - for my $sshlogin (values %Global::host) { - $sshlogin->set_max_jobs_running(undef); - } - } - } - if($Global::max_load_file) { - my $mtime = (stat($Global::max_load_file))[9]; - if($mtime > $Global::max_load_file_last_mod) { - $Global::max_load_file_last_mod = $mtime; - for my $sshlogin (values %Global::host) { - $sshlogin->set_max_loadavg(undef); - } - } - } - - for my $sshlogin (values %Global::host) { - debug("Running jobs before on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n"); - if($::opt_load and $sshlogin->loadavg_too_high()) { - # The load is too high or unknown - next; - } - if($::opt_noswap and $sshlogin->swapping()) { - # The server is swapping - next; - } - while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { - if($Global::JobQueue->empty() and not $::opt_pipe) { - last; - } - debug($sshlogin->string()." has ".$sshlogin->jobs_running() - . " out of " . $sshlogin->max_jobs_running() - . " jobs running. Start another.\n"); - if(start_another_job($sshlogin) == 0) { - # No more jobs to start on this $sshlogin - debug("No jobs started on ".$sshlogin->string()."\n"); - last; - } - debug("Job started on ".$sshlogin->string()."\n"); - $sshlogin->inc_jobs_running(); - $jobs_started++; - } - debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running() - ." of ".$sshlogin->max_jobs_running() ."\n"); - } - } - return $jobs_started; -} - -sub start_another_job { - # Grab a job from Global::JobQueue, start it at sshlogin - # and remember the pid, the STDOUT and the STDERR handles - # Returns: - # 1 if another jobs was started - # 0 otherwise - my $sshlogin = shift; - # Do we have enough file handles to start another job? - if(enough_file_handles()) { - if($Global::JobQueue->empty() and not $::opt_pipe) { - # No more commands to run - debug("Not starting: JobQueue empty\n"); - return 0; - } else { - my $job = get_job_with_sshlogin($sshlogin); - if(not defined $job) { - # No command available for that sshlogin - debug("Not starting: no jobs available for ".$sshlogin->string()."\n"); - return 0; - } - debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); - if($job->start()) { - $Global::running{$job->pid()} = $job; - debug("Started as seq ".$job->seq(),"\n"); - return 1; - } else { - # If interactive says: Dont run the job, then skip it and run the next - return start_another_job($sshlogin); - } - } - } else { - # No more file handles - debug("Not starting: no more file handles\n"); - return 0; - } -} - -sub __READING_AND_QUOTING_ARGUMENTS__ {} - sub get_job_with_sshlogin { # Returns: # next command to run with ssh command wrapping if remote @@ -1740,6 +1673,88 @@ sub show_limits { "press CTRL-D or CTRL-C\n"); } +sub __GENERIC_COMMON_FUNCTION__ {} + +sub min { + # Returns: + # Minimum value of array + my $min; + for (@_) { + # Skip undefs + defined $_ or next; + defined $min or do { $min = $_; next; }; # Set $_ to the first non-undef + $min = ($min < $_) ? $min : $_; + } + return $min; +} + +sub max { + # Returns: + # Maximum value of array + my $max; + for (@_) { + # Skip undefs + defined $_ or next; + defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef + $max = ($max > $_) ? $max : $_; + } + return $max; +} + +sub sum { + # Returns: + # Sum of values of array + my @args = @_; + my $sum = 0; + for (@args) { + # Skip undefs + $_ and do { $sum += $_; } + } + return $sum; +} + +sub undef_as_zero { + my $a = shift; + return $a ? $a : 0; +} + +sub undef_as_empty { + my $a = shift; + return $a ? $a : ""; +} + +sub hostname { + if(not $Private::hostname) { + my $hostname = `hostname`; + chomp($hostname); + $Private::hostname = $hostname || "nohostname"; + } + return $Private::hostname; +} + +sub usleep { + # Sleep this many milliseconds. + my $secs = shift; + ::debug("Sleeping ",$secs," millisecs\n"); + select(undef, undef, undef, $secs/1000); +} + +sub multiply_binary_prefix { + # Evalualte numbers with binary prefix + # 13G = 13*1024*1024*1024 = 13958643712 + my $s = shift; + $s =~ s/Ki?/*1024/gi; + $s =~ s/Mi?/*1024*1024/gi; + $s =~ s/Gi?/*1024*1024*1024/gi; + $s =~ s/Ti?/*1024*1024*1024*1024/gi; + $s =~ s/Pi?/*1024*1024*1024*1024*1024/gi; + $s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi; + $s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi; + $s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi; + $s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi; + $s = eval $s; + return $s; +} sub __DEBUGGING__ {} @@ -1813,9 +1828,8 @@ sub my_dump { } } -### -##### OO Parts below -### +sub __OBJECT_ORIENTED_PARTS__ {} + package SSHLogin; @@ -2266,7 +2280,6 @@ sub simultaneous_sshlogin_limit { return $ssh_limit; } - sub simultaneous_sshlogin { # Using $sshlogin try to see if we can do $wanted_processes # simultaneous logins @@ -2377,7 +2390,7 @@ sub ncpus { sub no_of_cpus { # Returns: # Number of physical CPUs - local $/="\n"; # If delmiiter is set, then $/ will be wrong + local $/="\n"; # If delimiter is set, then $/ will be wrong my $no_of_cpus = (no_of_cpus_freebsd() || no_of_cpus_darwin() || no_of_cpus_solaris() @@ -2611,7 +2624,6 @@ sub sshcommand_of_sshlogin { $self->{'serverlogin'} = $serverlogin; } - sub control_path_dir { # Returns: # path to directory @@ -2628,7 +2640,6 @@ sub control_path_dir { } - package JobQueue; sub new { @@ -3905,22 +3916,6 @@ sub new { }, ref($class) || $class; } -sub seq { - my $self = shift; - return $self->{'seq'}; -} - -sub set_seq { - my $self = shift; - $self->{'seq'} = shift; -} - -sub quote_args { - my $self = shift; - # If there is not command emulate |bash - return $self->{'command'}; -} - sub get { my $self = shift; if(@{$self->{'unget'}}) { @@ -3962,6 +3957,22 @@ sub empty { return $empty; } +sub seq { + my $self = shift; + return $self->{'seq'}; +} + +sub set_seq { + my $self = shift; + $self->{'seq'} = shift; +} + +sub quote_args { + my $self = shift; + # If there is not command emulate |bash + return $self->{'command'}; +} + sub size { my $self = shift; if(not $self->{'size'}) { @@ -3983,7 +3994,7 @@ package Limits::Command; # Maximal command line length (for -m and -X) sub max_length { - # Find the max_length of a command line + # Find the max_length of a command line and cache it # Returns: # number of chars on the longest command line allowed if(not $Limits::Command::line_max_len) { @@ -4008,6 +4019,7 @@ sub max_length { } sub real_max_length { + # Find the max_length of a command line # Returns: # The maximal command line length # Use an upper bound of 8 MB if the shell allows for for infinite long lengths @@ -4053,6 +4065,7 @@ sub is_acceptable_command_line_length { return not $?; } + package RecordQueue; sub new { @@ -4113,6 +4126,7 @@ sub arg_number { return $self->{'arg_number'}; } + package RecordColQueue; sub new { @@ -4172,7 +4186,6 @@ sub empty { } - package MultifileQueue; @Global::unget_argv=(); @@ -4202,6 +4215,23 @@ sub get { } } +sub unget { + my $self = shift; + ::debug("MultifileQueue-unget '@_'\n"); + unshift @{$self->{'unget'}}, @_; +} + +sub empty { + my $self = shift; + my $empty = (not @Global::unget_argv + and not @{$self->{'unget'}}); + for my $fh (@{$self->{'fhs'}}) { + $empty &&= eof($fh); + } + ::debug("MultifileQueue->empty $empty\n"); + return $empty; +} + sub xapply_get { my $self = shift; if(@{$self->{'unget'}}) { @@ -4369,22 +4399,6 @@ sub expand_combinations { return @p; } -sub unget { - my $self = shift; - ::debug("MultifileQueue-unget '@_'\n"); - unshift @{$self->{'unget'}}, @_; -} - -sub empty { - my $self = shift; - my $empty = (not @Global::unget_argv - and not @{$self->{'unget'}}); - for my $fh (@{$self->{'fhs'}}) { - $empty &&= eof($fh); - } - ::debug("MultifileQueue->empty $empty\n"); - return $empty; -} package Arg; @@ -4546,7 +4560,6 @@ sub release { ::debug("released $self->{'pid'}\n"); } - sub atomic_link_if_count_less_than { # Link $file1 to $file2 if nlinks to $file1 < $count my $self = shift;