parallel: Code reorg. Copyright notice added.

niceload: Copyright notice added.
Passes testsuite.
This commit is contained in:
Ole Tange 2011-07-29 14:55:04 +02:00
parent 6d242f975e
commit 67e3d3ea77
2 changed files with 281 additions and 247 deletions

View file

@ -1,5 +1,26 @@
#!/usr/bin/perl -w #!/usr/bin/perl -w
# Copyright (C) 2004,2005,2006,2006,2008,2009,2010 Ole Tange,
# http://ole.tange.dk
#
# Copyright (C) 2010,2011 Ole Tange, http://ole.tange.dk and Free
# Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>
# or write to the Free Software Foundation, Inc., 51 Franklin St,
# Fifth Floor, Boston, MA 02110-1301 USA
use strict; use strict;
use Getopt::Long; use Getopt::Long;
$Global::progname="niceload"; $Global::progname="niceload";

View file

@ -1,5 +1,23 @@
#!/usr/bin/perl -w #!/usr/bin/perl -w
# Copyright (C) 2007,2008,2009,2010,2011 Ole Tange and Free Software
# Foundation, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>
# or write to the Free Software Foundation, Inc., 51 Franklin St,
# Fifth Floor, Boston, MA 02110-1301 USA
# open3 used in Job::start # open3 used in Job::start
use IPC::Open3; use IPC::Open3;
# &WNOHANG used in reaper # &WNOHANG used in reaper
@ -125,6 +143,8 @@ if($::opt_halt_on_error) {
wait_and_exit(min(undef_as_zero($Global::exitstatus),254)); wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
} }
sub __PIPE_MODE__ {}
sub spreadstdin { sub spreadstdin {
# read a record # read a record
# Spawn a job and print the record to it. # Spawn a job and print the record to it.
@ -307,6 +327,8 @@ sub write_record_to_pipe {
return; return;
} }
sub __SEM_MODE__ {}
sub acquire_semaphore { sub acquire_semaphore {
# Acquires semaphore. If needed: spawns to the background # Acquires semaphore. If needed: spawns to the background
# Returns: # Returns:
@ -334,6 +356,8 @@ sub acquire_semaphore {
return $sem; return $sem;
} }
sub __PARSE_OPTIONS__ {}
sub options_hash { sub options_hash {
# Returns a hash of the GetOptions config # Returns a hash of the GetOptions config
return return
@ -794,29 +818,6 @@ sub read_args_from_command_line {
return @new_argv; return @new_argv;
} }
sub open_or_exit {
# Returns:
# file handle to read-opened file
# exits if file cannot be opened
my $file = shift;
if($file eq "-") {
$Global::stdin_in_opt_a = 1;
return ($Global::original_stdin || *STDIN);
}
if(ref $file eq "GLOB") {
# This is an open filehandle
return $file;
}
my $fh = gensym;
if(not open($fh,"<",$file)) {
print STDERR "$Global::progname: ".
"Cannot open input file `$file': ".
"No such file or directory\n";
wait_and_exit(255);
}
return $fh;
}
sub cleanup { sub cleanup {
# Returns: N/A # Returns: N/A
if(@::opt_basefile) { if(@::opt_basefile) {
@ -824,9 +825,7 @@ sub cleanup {
} }
} }
# sub __QUOTING_ARGUMENTS_FOR_SHELL__ {}
# Generating the command line
#
sub shell_quote { sub shell_quote {
my @strings = (@_); my @strings = (@_);
@ -865,7 +864,7 @@ sub shell_unquote {
return wantarray ? @strings : "@strings"; return wantarray ? @strings : "@strings";
} }
sub __NUMBER_OF_PROCESSES_FILEHANDLES_MAX_LENGTH_OF_COMMAND_LINE__ {} sub __FILEHANDLES__ {}
sub enough_file_handles { sub enough_file_handles {
# check that we have enough filehandles available for starting # check that we have enough filehandles available for starting
@ -889,94 +888,30 @@ sub enough_file_handles {
} }
} }
# sub open_or_exit {
# General useful library functions
#
sub min {
# Returns: # Returns:
# Minimum value of array # file handle to read-opened file
my $min; # exits if file cannot be opened
for (@_) { my $file = shift;
# Skip undefs if($file eq "-") {
defined $_ or next; $Global::stdin_in_opt_a = 1;
defined $min or do { $min = $_; next; }; # Set $_ to the first non-undef return ($Global::original_stdin || *STDIN);
$min = ($min < $_) ? $min : $_;
} }
return $min; if(ref $file eq "GLOB") {
# This is an open filehandle
return $file;
}
my $fh = gensym;
if(not open($fh,"<",$file)) {
print STDERR "$Global::progname: ".
"Cannot open input file `$file': ".
"No such file or directory\n";
wait_and_exit(255);
}
return $fh;
} }
sub max { sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {}
# Returns:
# Maximum value of array
my $max;
for (@_) {
# Skip undefs
defined $_ or next;
defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef
$max = ($max > $_) ? $max : $_;
}
return $max;
}
sub sum {
# Returns:
# Sum of values of array
my @args = @_;
my $sum = 0;
for (@args) {
# Skip undefs
$_ and do { $sum += $_; }
}
return $sum;
}
sub undef_as_zero {
my $a = shift;
return $a ? $a : 0;
}
sub undef_as_empty {
my $a = shift;
return $a ? $a : "";
}
sub hostname {
if(not $Private::hostname) {
my $hostname = `hostname`;
chomp($hostname);
$Private::hostname = $hostname || "nohostname";
}
return $Private::hostname;
}
sub usleep {
# Sleep this many milliseconds.
my $secs = shift;
::debug("Sleeping ",$secs," millisecs\n");
select(undef, undef, undef, $secs/1000);
}
sub multiply_binary_prefix {
# Evalualte numbers with binary prefix
# 13G = 13*1024*1024*1024 = 13958643712
my $s = shift;
$s =~ s/Ki?/*1024/gi;
$s =~ s/Mi?/*1024*1024/gi;
$s =~ s/Gi?/*1024*1024*1024/gi;
$s =~ s/Ti?/*1024*1024*1024*1024/gi;
$s =~ s/Pi?/*1024*1024*1024*1024*1024/gi;
$s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s = eval $s;
return $s;
}
sub __RUNNING_AND_PRINTING_THE_JOBS__ {}
# Variable structure: # Variable structure:
# #
@ -1004,6 +939,100 @@ sub init_run_jobs {
} }
} }
sub start_more_jobs {
# Returns:
# number of jobs started
my $jobs_started = 0;
if(not $Global::start_no_new_jobs) {
if($Global::max_procs_file) {
my $mtime = (stat($Global::max_procs_file))[9];
if($mtime > $Global::max_procs_file_last_mod) {
$Global::max_procs_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_jobs_running(undef);
}
}
}
if($Global::max_load_file) {
my $mtime = (stat($Global::max_load_file))[9];
if($mtime > $Global::max_load_file_last_mod) {
$Global::max_load_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_loadavg(undef);
}
}
}
for my $sshlogin (values %Global::host) {
debug("Running jobs before on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n");
if($::opt_load and $sshlogin->loadavg_too_high()) {
# The load is too high or unknown
next;
}
if($::opt_noswap and $sshlogin->swapping()) {
# The server is swapping
next;
}
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($Global::JobQueue->empty() and not $::opt_pipe) {
last;
}
debug($sshlogin->string()." has ".$sshlogin->jobs_running()
. " out of " . $sshlogin->max_jobs_running()
. " jobs running. Start another.\n");
if(start_another_job($sshlogin) == 0) {
# No more jobs to start on this $sshlogin
debug("No jobs started on ".$sshlogin->string()."\n");
last;
}
debug("Job started on ".$sshlogin->string()."\n");
$sshlogin->inc_jobs_running();
$jobs_started++;
}
debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running()
." of ".$sshlogin->max_jobs_running() ."\n");
}
}
return $jobs_started;
}
sub start_another_job {
# Grab a job from Global::JobQueue, start it at sshlogin
# and remember the pid, the STDOUT and the STDERR handles
# Returns:
# 1 if another jobs was started
# 0 otherwise
my $sshlogin = shift;
# Do we have enough file handles to start another job?
if(enough_file_handles()) {
if($Global::JobQueue->empty() and not $::opt_pipe) {
# No more commands to run
debug("Not starting: JobQueue empty\n");
return 0;
} else {
my $job = get_job_with_sshlogin($sshlogin);
if(not defined $job) {
# No command available for that sshlogin
debug("Not starting: no jobs available for ".$sshlogin->string()."\n");
return 0;
}
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n");
if($job->start()) {
$Global::running{$job->pid()} = $job;
debug("Started as seq ".$job->seq(),"\n");
return 1;
} else {
# If interactive says: Dont run the job, then skip it and run the next
return start_another_job($sshlogin);
}
}
} else {
# No more file handles
debug("Not starting: no more file handles\n");
return 0;
}
}
sub drain_job_queue { sub drain_job_queue {
# Returns: N/A # Returns: N/A
$Private::first_completed ||= time; $Private::first_completed ||= time;
@ -1250,102 +1279,6 @@ sub terminal_columns {
return $Private::columns; return $Private::columns;
} }
sub start_more_jobs {
# Returns:
# number of jobs started
my $jobs_started = 0;
if(not $Global::start_no_new_jobs) {
if($Global::max_procs_file) {
my $mtime = (stat($Global::max_procs_file))[9];
if($mtime > $Global::max_procs_file_last_mod) {
$Global::max_procs_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_jobs_running(undef);
}
}
}
if($Global::max_load_file) {
my $mtime = (stat($Global::max_load_file))[9];
if($mtime > $Global::max_load_file_last_mod) {
$Global::max_load_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_loadavg(undef);
}
}
}
for my $sshlogin (values %Global::host) {
debug("Running jobs before on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n");
if($::opt_load and $sshlogin->loadavg_too_high()) {
# The load is too high or unknown
next;
}
if($::opt_noswap and $sshlogin->swapping()) {
# The server is swapping
next;
}
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($Global::JobQueue->empty() and not $::opt_pipe) {
last;
}
debug($sshlogin->string()." has ".$sshlogin->jobs_running()
. " out of " . $sshlogin->max_jobs_running()
. " jobs running. Start another.\n");
if(start_another_job($sshlogin) == 0) {
# No more jobs to start on this $sshlogin
debug("No jobs started on ".$sshlogin->string()."\n");
last;
}
debug("Job started on ".$sshlogin->string()."\n");
$sshlogin->inc_jobs_running();
$jobs_started++;
}
debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running()
." of ".$sshlogin->max_jobs_running() ."\n");
}
}
return $jobs_started;
}
sub start_another_job {
# Grab a job from Global::JobQueue, start it at sshlogin
# and remember the pid, the STDOUT and the STDERR handles
# Returns:
# 1 if another jobs was started
# 0 otherwise
my $sshlogin = shift;
# Do we have enough file handles to start another job?
if(enough_file_handles()) {
if($Global::JobQueue->empty() and not $::opt_pipe) {
# No more commands to run
debug("Not starting: JobQueue empty\n");
return 0;
} else {
my $job = get_job_with_sshlogin($sshlogin);
if(not defined $job) {
# No command available for that sshlogin
debug("Not starting: no jobs available for ".$sshlogin->string()."\n");
return 0;
}
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n");
if($job->start()) {
$Global::running{$job->pid()} = $job;
debug("Started as seq ".$job->seq(),"\n");
return 1;
} else {
# If interactive says: Dont run the job, then skip it and run the next
return start_another_job($sshlogin);
}
}
} else {
# No more file handles
debug("Not starting: no more file handles\n");
return 0;
}
}
sub __READING_AND_QUOTING_ARGUMENTS__ {}
sub get_job_with_sshlogin { sub get_job_with_sshlogin {
# Returns: # Returns:
# next command to run with ssh command wrapping if remote # next command to run with ssh command wrapping if remote
@ -1740,6 +1673,88 @@ sub show_limits {
"press CTRL-D or CTRL-C\n"); "press CTRL-D or CTRL-C\n");
} }
sub __GENERIC_COMMON_FUNCTION__ {}
sub min {
# Returns:
# Minimum value of array
my $min;
for (@_) {
# Skip undefs
defined $_ or next;
defined $min or do { $min = $_; next; }; # Set $_ to the first non-undef
$min = ($min < $_) ? $min : $_;
}
return $min;
}
sub max {
# Returns:
# Maximum value of array
my $max;
for (@_) {
# Skip undefs
defined $_ or next;
defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef
$max = ($max > $_) ? $max : $_;
}
return $max;
}
sub sum {
# Returns:
# Sum of values of array
my @args = @_;
my $sum = 0;
for (@args) {
# Skip undefs
$_ and do { $sum += $_; }
}
return $sum;
}
sub undef_as_zero {
my $a = shift;
return $a ? $a : 0;
}
sub undef_as_empty {
my $a = shift;
return $a ? $a : "";
}
sub hostname {
if(not $Private::hostname) {
my $hostname = `hostname`;
chomp($hostname);
$Private::hostname = $hostname || "nohostname";
}
return $Private::hostname;
}
sub usleep {
# Sleep this many milliseconds.
my $secs = shift;
::debug("Sleeping ",$secs," millisecs\n");
select(undef, undef, undef, $secs/1000);
}
sub multiply_binary_prefix {
# Evalualte numbers with binary prefix
# 13G = 13*1024*1024*1024 = 13958643712
my $s = shift;
$s =~ s/Ki?/*1024/gi;
$s =~ s/Mi?/*1024*1024/gi;
$s =~ s/Gi?/*1024*1024*1024/gi;
$s =~ s/Ti?/*1024*1024*1024*1024/gi;
$s =~ s/Pi?/*1024*1024*1024*1024*1024/gi;
$s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s = eval $s;
return $s;
}
sub __DEBUGGING__ {} sub __DEBUGGING__ {}
@ -1813,9 +1828,8 @@ sub my_dump {
} }
} }
### sub __OBJECT_ORIENTED_PARTS__ {}
##### OO Parts below
###
package SSHLogin; package SSHLogin;
@ -2266,7 +2280,6 @@ sub simultaneous_sshlogin_limit {
return $ssh_limit; return $ssh_limit;
} }
sub simultaneous_sshlogin { sub simultaneous_sshlogin {
# Using $sshlogin try to see if we can do $wanted_processes # Using $sshlogin try to see if we can do $wanted_processes
# simultaneous logins # simultaneous logins
@ -2377,7 +2390,7 @@ sub ncpus {
sub no_of_cpus { sub no_of_cpus {
# Returns: # Returns:
# Number of physical CPUs # Number of physical CPUs
local $/="\n"; # If delmiiter is set, then $/ will be wrong local $/="\n"; # If delimiter is set, then $/ will be wrong
my $no_of_cpus = (no_of_cpus_freebsd() my $no_of_cpus = (no_of_cpus_freebsd()
|| no_of_cpus_darwin() || no_of_cpus_darwin()
|| no_of_cpus_solaris() || no_of_cpus_solaris()
@ -2611,7 +2624,6 @@ sub sshcommand_of_sshlogin {
$self->{'serverlogin'} = $serverlogin; $self->{'serverlogin'} = $serverlogin;
} }
sub control_path_dir { sub control_path_dir {
# Returns: # Returns:
# path to directory # path to directory
@ -2628,7 +2640,6 @@ sub control_path_dir {
} }
package JobQueue; package JobQueue;
sub new { sub new {
@ -3905,22 +3916,6 @@ sub new {
}, ref($class) || $class; }, ref($class) || $class;
} }
sub seq {
my $self = shift;
return $self->{'seq'};
}
sub set_seq {
my $self = shift;
$self->{'seq'} = shift;
}
sub quote_args {
my $self = shift;
# If there is not command emulate |bash
return $self->{'command'};
}
sub get { sub get {
my $self = shift; my $self = shift;
if(@{$self->{'unget'}}) { if(@{$self->{'unget'}}) {
@ -3962,6 +3957,22 @@ sub empty {
return $empty; return $empty;
} }
sub seq {
my $self = shift;
return $self->{'seq'};
}
sub set_seq {
my $self = shift;
$self->{'seq'} = shift;
}
sub quote_args {
my $self = shift;
# If there is not command emulate |bash
return $self->{'command'};
}
sub size { sub size {
my $self = shift; my $self = shift;
if(not $self->{'size'}) { if(not $self->{'size'}) {
@ -3983,7 +3994,7 @@ package Limits::Command;
# Maximal command line length (for -m and -X) # Maximal command line length (for -m and -X)
sub max_length { sub max_length {
# Find the max_length of a command line # Find the max_length of a command line and cache it
# Returns: # Returns:
# number of chars on the longest command line allowed # number of chars on the longest command line allowed
if(not $Limits::Command::line_max_len) { if(not $Limits::Command::line_max_len) {
@ -4008,6 +4019,7 @@ sub max_length {
} }
sub real_max_length { sub real_max_length {
# Find the max_length of a command line
# Returns: # Returns:
# The maximal command line length # The maximal command line length
# Use an upper bound of 8 MB if the shell allows for for infinite long lengths # Use an upper bound of 8 MB if the shell allows for for infinite long lengths
@ -4053,6 +4065,7 @@ sub is_acceptable_command_line_length {
return not $?; return not $?;
} }
package RecordQueue; package RecordQueue;
sub new { sub new {
@ -4113,6 +4126,7 @@ sub arg_number {
return $self->{'arg_number'}; return $self->{'arg_number'};
} }
package RecordColQueue; package RecordColQueue;
sub new { sub new {
@ -4172,7 +4186,6 @@ sub empty {
} }
package MultifileQueue; package MultifileQueue;
@Global::unget_argv=(); @Global::unget_argv=();
@ -4202,6 +4215,23 @@ sub get {
} }
} }
sub unget {
my $self = shift;
::debug("MultifileQueue-unget '@_'\n");
unshift @{$self->{'unget'}}, @_;
}
sub empty {
my $self = shift;
my $empty = (not @Global::unget_argv
and not @{$self->{'unget'}});
for my $fh (@{$self->{'fhs'}}) {
$empty &&= eof($fh);
}
::debug("MultifileQueue->empty $empty\n");
return $empty;
}
sub xapply_get { sub xapply_get {
my $self = shift; my $self = shift;
if(@{$self->{'unget'}}) { if(@{$self->{'unget'}}) {
@ -4369,22 +4399,6 @@ sub expand_combinations {
return @p; return @p;
} }
sub unget {
my $self = shift;
::debug("MultifileQueue-unget '@_'\n");
unshift @{$self->{'unget'}}, @_;
}
sub empty {
my $self = shift;
my $empty = (not @Global::unget_argv
and not @{$self->{'unget'}});
for my $fh (@{$self->{'fhs'}}) {
$empty &&= eof($fh);
}
::debug("MultifileQueue->empty $empty\n");
return $empty;
}
package Arg; package Arg;
@ -4546,7 +4560,6 @@ sub release {
::debug("released $self->{'pid'}\n"); ::debug("released $self->{'pid'}\n");
} }
sub atomic_link_if_count_less_than { sub atomic_link_if_count_less_than {
# Link $file1 to $file2 if nlinks to $file1 < $count # Link $file1 to $file2 if nlinks to $file1 < $count
my $self = shift; my $self = shift;