#!/usr/bin/perl -w
# Copyright (C) 2004,2005,2006,2006,2008,2009,2010 Ole Tange,
# http://ole.tange.dk
#
# Copyright (C) 2010,2011 Ole Tange, http://ole.tange.dk and Free
# Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>
# or write to the Free Software Foundation, Inc., 51 Franklin St,
# Fifth Floor, Boston, MA 02110-1301 USA
use strict;
use Getopt::Long;
# ---- Main program ----
# Parse the options, wait until the machine is below the start limit,
# start (or attach to) the process, and then keep suspending/resuming
# it for as long as it is alive.
$Global::progname="niceload";
$Global::version = 20120622;
# require_order: option parsing stops at the first non-option, so the
# options of the command to run stay in @ARGV.
Getopt::Long::Configure("bundling","require_order");
get_options_from_array(\@ARGV) || die_usage();
if($::opt_version) {
    version();
    exit 0;
}
if($::opt_help) {
    help();
    exit 0;
}
if($::opt_factor and $::opt_suspend) {
    # You cannot have --suspend and --factor.
    # This is a usage error, so it must not exit with status 0.
    print STDERR
        "$Global::progname: --suspend and --factor cannot be combined.\n";
    die_usage();
}
if(not (defined $::opt_start_io or defined $::opt_run_io
        or defined $::opt_start_load or defined $::opt_run_load
        or defined $::opt_start_mem or defined $::opt_run_mem
        or defined $::opt_start_noswap or defined $::opt_run_noswap
        or defined $::opt_io or defined $::opt_load
        or defined $::opt_mem or defined $::opt_noswap)) {
    # No limit given at all: default is --runload=1
    $::opt_run_load = 1;
}
# The generic options (--io, --load, --mem, --noswap) are the fallback
# for the start-/run- variants that were not given explicitly.
if(not defined $::opt_start_io) { $::opt_start_io = $::opt_io; }
if(not defined $::opt_run_io) { $::opt_run_io = $::opt_io; }
if(not defined $::opt_start_load) { $::opt_start_load = $::opt_load; }
if(not defined $::opt_run_load) { $::opt_run_load = $::opt_load; }
if(not defined $::opt_start_mem) { $::opt_start_mem = $::opt_mem; }
if(not defined $::opt_run_mem) { $::opt_run_mem = $::opt_mem; }
if(not defined $::opt_start_noswap) { $::opt_start_noswap = $::opt_noswap; }
if(not defined $::opt_run_noswap) { $::opt_run_noswap = $::opt_noswap; }
my $limit = Limit->new();
my $process = Process->new($::opt_nice,@ARGV);
if($::opt_pid) {
    # Control an already running process
    $process->set_pid($::opt_pid);
} elsif (@ARGV) {
    # Wait until limit is below start_limit and run_limit
    while($limit->over_start_limit()
          or
          ($limit->hard() and $limit->over_run_limit())) {
        $limit->sleep_for_recheck();
    }
    $process->start();
}
while($process->is_alive()) {
    if($limit->over_run_limit()) {
        $process->suspend();
        $limit->sleep_for_recheck();
        if(not $limit->hard()) {
            # Soft limit: let the process run for a while even though
            # the limit is still exceeded.
            $process->resume();
            $limit->sleep_while_running();
        }
    } else {
        $process->resume();
        $limit->sleep_while_running();
    }
}
# $::exitstatus is unset if no process was ever started or attached
exit($::exitstatus || 0);
sub get_options_from_array {
    # Run GetOptions on @array
    # Input:
    #   $array_ref = reference to the array of arguments to parse
    # Returns:
    #   true if parsing worked
    #   false if parsing failed
    #   @array is changed (the parsed options are removed)
    my $array_ref = shift;
    # A bit of shuffling of @ARGV needed as GetOptionsFromArray is not
    # supported everywhere
    my @save_argv;
    my $this_is_ARGV = (\@::ARGV == $array_ref);
    if(not $this_is_ARGV) {
        @save_argv = @::ARGV;
        @::ARGV = @{$array_ref};
    }
    # Keep the result in a scalar: returning an array would give the
    # number of elements (always 1) when the caller uses the result in
    # scalar context, e.g. 'get_options_from_array(...) || die_usage()',
    # and a failed parse would look like success.
    my $retval = GetOptions
        ("debug|D" => \$::opt_debug,
         "factor|f=s" => \$::opt_factor,
         "hard|H" => \$::opt_hard,
         "soft|S" => \$::opt_soft,
         "si|sio|startio|start-io=s" => \$::opt_start_io,
         "ri|rio|runio|run-io=s" => \$::opt_run_io,
         "io|I=s" => \$::opt_io,
         "sl|startload|start-load=s" => \$::opt_start_load,
         "rl|runload|run-load=s" => \$::opt_run_load,
         "load|L|l=s" => \$::opt_load,
         "sm|startmem|start-mem=s" => \$::opt_start_mem,
         "rm|runmem|run-mem=s" => \$::opt_run_mem,
         "mem|M=s" => \$::opt_mem,
         "sn|startnoswap|start-noswap|start-no-swap" => \$::opt_start_noswap,
         "rn|runnoswap|run-noswap|run-no-swap" => \$::opt_run_noswap,
         "noswap|N" => \$::opt_noswap,
         "nice|n=i" => \$::opt_nice,
         "process|pid|p=s" => \$::opt_pid,
         "suspend|s=s" => \$::opt_suspend,
         "recheck|t=s" => \$::opt_recheck,
         "quote|q" => \$::opt_quote,
         "help|h" => \$::opt_help,
         "verbose|v" => \$::opt_verbose,
         "version|V" => \$::opt_version,
        );
    if(not $this_is_ARGV) {
        # Hand the unparsed rest back to the caller and restore @ARGV
        @{$array_ref} = @::ARGV;
        @::ARGV = @save_argv;
    }
    return $retval;
}
sub die_usage {
    # Print the usage text and end the program with exit status 1.
    # Returns: N/A (does not return)
    help();
    exit(1);
}
sub help {
    # Print the usage text on STDOUT.
    # The text starts with an empty line and ends with a newline.
    my @usage_lines =
        ("",
         "Usage:",
         "niceload [-v] [-n niceness] [-L loadavg] [-I io] [-N] [-M mem]",
         "[-s suspend_sec|-f factor] [-H] [-S]",
         "command or -p pid",
         "");
    print join("\n", @usage_lines);
}
sub die_bug {
    # Report an internal error on STDERR and exit with status 255.
    # Input:
    #   $bugid = identifier of the place that detected the problem
    # Returns: N/A (does not return)
    my $bugid = shift;
    # The contact address had been lost from the message, which left
    # the user with nobody to contact.
    print STDERR
        ("$Global::progname: This should not happen. You have found a bug.\n",
         "Please contact <parallel\@gnu.org> and include:\n",
         "* The version number: $Global::version\n",
         "* The bugid: $bugid\n",
         "* The command line being run\n",
         "* The files being read (put the files on a webserver if they are big)\n",
         "\n",
         "If you get the error on smaller/fewer files, please include those instead.\n");
    exit(255);
}
sub usleep {
    # Sleep this many milliseconds.
    # Input:
    #   number of milliseconds (may be fractional)
    my ($millisecs) = @_;
    ::debug("Sleeping ",$millisecs," millisecs\n");
    # 4-argument select with no file handles is a sub-second sleep;
    # its timeout is given in seconds.
    my $timeout = $millisecs/1000;
    select(undef, undef, undef, $timeout);
}
sub debug {
    # Print the arguments on STDERR, but only if --debug is given.
    $::opt_debug and print STDERR @_;
}
sub my_dump {
    # Returns:
    #   ascii expression of object if Data::Dump(er) is installed
    #   error code otherwise
    my @objects = @_;
    # First choice: Data::Dump
    eval "use Data::Dump qw(dump);";
    if(not $@) {
        return Data::Dump::dump(@objects);
    }
    # Data::Dump not installed. Second choice: Data::Dumper
    eval "use Data::Dumper;";
    if(not $@) {
        return Dumper(@objects);
    }
    my $message = "Neither Data::Dump nor Data::Dumper is installed\n".
        "Not dumping output\n";
    print STDERR $message;
    return $message;
}
sub version {
    # Print program name, version, copyright and license on STDOUT.
    # Returns: N/A
    # The license line ended in a dangling "or later "; it now names the
    # license URL.
    print join("\n",
               "GNU $Global::progname $Global::version",
               "Copyright (C) 2004,2005,2006,2007,2008,2009 Ole Tange",
               "Copyright (C) 2010,2011 Ole Tange and Free Software Foundation, Inc.",
               "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>",
               "This is free software: you are free to change and redistribute it.",
               "GNU $Global::progname comes with no warranty.",
               "",
               "Web site: http://www.gnu.org/software/parallel\n"
        );
}
sub multiply_binary_prefix {
    # Evaluate numbers with binary prefix
    # 13G = 13*1024*1024*1024 = 13958643712
    # Input:
    #   string with a number followed by an optional prefix
    #   (K, M, G, T, P, E, Z, Y, X - any case, optionally followed by 'i')
    # Returns:
    #   the value of the string after the prefixes have been expanded
    my $expression = shift;
    my $power = 0;
    for my $prefix (qw(K M G T P E Z Y X)) {
        # K => *1024, M => *1024*1024, and so on
        $power++;
        my $factor = "*1024" x $power;
        $expression =~ s/${prefix}i?/$factor/gi;
    }
    # NOTE(review): the expanded string is run through a string eval, so
    # anything given as a limit on the command line is executed as Perl.
    my $value = eval $expression;
    return $value;
}
sub max {
    # Returns:
    #   Maximum value of array (undefs are skipped)
    #   undef if there is no defined value
    my @candidates = grep { defined $_ } @_;
    # Start with the first defined value
    my $largest = shift @candidates;
    for my $value (@candidates) {
        if(not $largest > $value) {
            $largest = $value;
        }
    }
    return $largest;
}
package Process;
sub new {
    # Create a Process object. The command is not started.
    # Input:
    #   $nice = niceness to run the command with (false = do not use nice)
    #   @command = the command and its arguments
    # Returns:
    #   the new object
    my ($class, $nice, @command) = @_;
    if($nice) {
        # Run the command through nice(1)
        @command = ("nice", "-n", $nice, @command);
    }
    my $self = {
        'running' => 0, # Is the process running now?
        'command' => \@command,
    };
    return bless $self, ref($class) || $class;
}
sub set_pid {
    # Control an already running process instead of starting a command.
    # Input:
    #   $pid = process id of the process to control
    # The process is marked as running and $::exitstatus is set to 0.
    my ($self, $pid) = @_;
    $self->{'pid'} = $pid;
    push @{$self->{'pids'}}, $pid;
    $self->{'running'} = 1;
    $::exitstatus = 0;
}
sub start {
    # Start the program in a child process that puts itself in a
    # process group of its own.
    # Parent: records the pid, installs the signal handlers and returns.
    # Child: runs the command and exits with the command's exit status.
    # Dies if the child process cannot be created.
    my $self = shift;
    ::debug("Starting @{$self->{'command'}}\n");
    $self->{'running'} = 1;
    my $pid = fork;
    if(not defined $pid) {
        # fork failed. Without this check the parent would take the
        # child branch below: run the command itself and then exit.
        die "$Global::progname: Cannot fork: $!\n";
    }
    $self->{'pid'} = $pid;
    if($pid) {
        # Parent
        # set signal handler to kill children if parent is killed
        push @{$self->{'pids'}}, $self->{'pid'};
        $Global::process = $self;
        $SIG{CHLD} = \&REAPER;
        $SIG{INT}=\&kill_child_INT;
        $SIG{TSTP}=\&kill_child_TSTP;
        $SIG{CONT}=\&kill_child_CONT;
        sleep 1; # Give child time to setpgrp(0,0);
        $self->{'pgrp'} = getpgrp $self->{'pid'};
    } else {
        # Child
        setpgrp(0,0);
        ::debug("Child pid: $$, pgrp: ",getpgrp $$,"\n");
        ::debug("@{$self->{'command'}}\n");
        if($::opt_quote) {
            # List form: the arguments are passed on as they are
            system(@{$self->{'command'}});
        } else {
            # One string: the arguments are joined with spaces
            system("@{$self->{'command'}}");
        }
        $::exitstatus = $? >> 8;
        $::exitsignal = $? & 127;
        ::debug("Child exit $::exitstatus\n");
        exit($::exitstatus);
    }
}
use POSIX ":sys_wait_h";
sub REAPER {
    # SIGCHLD handler: collect all children that have exited, without
    # blocking, and record the exit status and signal of the last one
    # in $::exitstatus and $::exitsignal.
    while(1) {
        my $reaped = waitpid(-1, WNOHANG());
        last if not $reaped > 0;
        $::exitstatus = $? >> 8;
        $::exitsignal = $? & 127;
    }
    $SIG{CHLD} = \&REAPER; # install *after* calling waitpid
}
sub kill_child_CONT {
    # SIGCONT handler: pass the CONT on to the process group of the
    # started process.
    my $process = $Global::process;
    ::debug("SIGCONT received. Killing $process->{'pid'}\n");
    my $pgrp = getpgrp($process->{'pid'});
    # Negative id = the whole process group
    kill('CONT', -$pgrp);
}
sub kill_child_TSTP {
    # SIGTSTP handler: pass the TSTP on to the process group of the
    # started process, then stop our own process group and ourselves.
    my $process = $Global::process;
    ::debug("SIGTSTP received. Killing $process->{'pid'} and self ($$)\n");
    my $pgrp = getpgrp($process->{'pid'});
    # Negative id = the whole process group
    kill('TSTP', -$pgrp);
    kill('STOP', -$$);
    kill('STOP', $$);
}
sub kill_child_INT {
    # SIGINT handler: pass the INT on to the process group of the
    # started process and exit.
    my $process = $Global::process;
    ::debug("SIGINT received. Killing $process->{'pid'} Exit\n");
    my $pgrp = getpgrp($process->{'pid'});
    # Negative id = the whole process group
    kill('INT', -$pgrp);
    exit;
}
sub resume {
    # Let the process continue if it is currently suspended.
    # Does nothing (except debug output) if it is already running.
    my $self = shift;
    ::debug("Resume @{$self->{'pids'}}\n");
    if(not $self->{'running'}) {
        my @pids = @{$self->{'pids'}};
        # - = PID group
        kill("CONT", -$_) for @pids;
        # If using -p it is not in a group
        kill("CONT", $_) for @pids;
        $self->{'running'} = 1;
    }
}
sub suspend {
    # Stop the process if it is currently running.
    # Does nothing (except debug output) if it is already suspended.
    my $self = shift;
    ::debug("Suspend @{$self->{'pids'}}\n");
    if($self->{'running'}) {
        my @pids = @{$self->{'pids'}};
        # - = PID group
        kill("STOP", -$_) for @pids;
        # If using -p it is not in a group
        kill("STOP", $_) for @pids;
        $self->{'running'} = 0;
    }
}
sub is_alive {
    # The process is dead if none of the pids exist
    # Returns:
    #   number of pids that can be signalled (0 = dead)
    my $self = shift;
    my $alive = 0;
    for my $pid (@{$self->{'pids'}}) {
        # Signal 0 sends nothing: it only tests if the pid can be signalled
        $alive++ if kill(0, $pid);
    }
    ::debug("is_alive: $alive\n");
    return $alive;
}
package Limit;
sub new {
    # Create a Limit object from the command line options ($::opt_*).
    # Returns:
    #   the new object
    my $class = shift;
    my %limits = @_;
    # --soft overrules --hard
    my $hard = $::opt_soft ? 0 : $::opt_hard;
    # Limits that are not given are 0 (= not checked). The others may
    # have a binary prefix (e.g. 1G), which is expanded here.
    my @settings =
        (['runio', $::opt_run_io],
         ['startio', $::opt_start_io],
         ['runload', $::opt_run_load],
         ['startload', $::opt_start_load],
         ['runmem', $::opt_run_mem],
         ['startmem', $::opt_start_mem],
         ['runnoswap', $::opt_run_noswap],
         ['startnoswap', $::opt_start_noswap]);
    my %threshold;
    for my $setting (@settings) {
        my ($name, $given) = @{$setting};
        $threshold{$name} = $given ? ::multiply_binary_prefix($given) : 0;
    }
    my $self = {
        'hard' => $hard,
        %threshold,
        'factor' => $::opt_factor || 1,
        # --recheck sets both the sleep time while suspended and the
        # run time between checks. Default: 1
        'recheck' => $::opt_recheck || 1,
        'runtime' => $::opt_recheck || 1,
        'over_run_limit' => 1,
        'over_start_limit' => 1,
        'verbose' => $::opt_verbose,
    };
    return bless $self, ref($class) || $class;
}
sub over_run_limit {
    # Compute how much the system is over the run limits.
    # Only limits that are set (non-zero) are checked.
    # Returns:
    #   0 if no limit is exceeded
    #   > 0 otherwise (the sum of the excess for each limit)
    # Unless --recheck is given the recheck time is set to
    # factor * the returned value.
    my $self = shift;
    my $excess = 0;
    my $mem_limit = $self->{'runmem'};
    if($mem_limit) {
        # mem should be between 0-10ish
        # 100% available => 0 (1-1)
        # 50% available => 1 (2-1)
        # 10% available => 9 (10-1)
        my $available = $self->mem_status();
        ::debug("Run memory: $mem_limit/$available\n");
        $excess += ::max(1, $mem_limit/$available) - 1;
    }
    my $load_limit = $self->{'runload'};
    if($load_limit) {
        # load should be between 0-10ish
        # 0 load => 0
        $excess += ::max(0, $self->load_status() - $load_limit);
    }
    my $swap_limit = $self->{'runnoswap'};
    if($swap_limit) {
        # swap should be between 0-10ish
        # swap in or swap out or no swap = 0
        # else log(swapin*swapout)
        $excess += log(::max(1, $self->swap_status() - $swap_limit));
    }
    my $io_limit = $self->{'runio'};
    if($io_limit) {
        $excess += ::max(0, $self->io_status() - $io_limit);
    }
    $self->{'over_run_limit'} = $excess;
    $self->{'recheck'} = $self->{'factor'} * $excess unless $::opt_recheck;
    ::debug("over_run_limit: $excess\n");
    return $self->{'over_run_limit'};
}
sub over_start_limit {
    # Compute how much the system is over the start limits.
    # Only limits that are set (non-zero) are checked.
    # Returns:
    #   0 if no limit is exceeded
    #   > 0 otherwise (the sum of the excess for each limit)
    # Unless --recheck is given the recheck time is set to
    # factor * the returned value.
    my $self = shift;
    my $excess = 0;
    my $mem_limit = $self->{'startmem'};
    if($mem_limit) {
        # mem should be between 0-10ish
        # 100% available => 0 (1-1)
        # 50% available => 1 (2-1)
        # 10% available => 9 (10-1)
        my $available = $self->mem_status();
        ::debug("Start memory: $mem_limit/$available\n");
        $excess += ::max(1, $mem_limit/$available) - 1;
    }
    my $load_limit = $self->{'startload'};
    if($load_limit) {
        # load should be between 0-10ish
        # 0 load => 0
        $excess += ::max(0, $self->load_status() - $load_limit);
    }
    my $swap_limit = $self->{'startnoswap'};
    if($swap_limit) {
        # swap should be between 0-10ish
        # swap in or swap out or no swap = 0
        # else log(swapin*swapout)
        $excess += log(::max(1, $self->swap_status() - $swap_limit));
    }
    my $io_limit = $self->{'startio'};
    if($io_limit) {
        $excess += ::max(0, $self->io_status() - $io_limit);
    }
    $self->{'over_start_limit'} = $excess;
    $self->{'recheck'} = $self->{'factor'} * $excess unless $::opt_recheck;
    ::debug("over_start_limit: $excess\n");
    return $self->{'over_start_limit'};
}
sub hard {
    # Returns:
    #   the 'hard' setting of this object
    return $_[0]->{'hard'};
}
sub verbose {
    # Returns:
    #   the 'verbose' setting of this object
    return $_[0]->{'verbose'};
}
sub sleep_for_recheck {
    # Sleep for the recheck time (at least 0.5 second).
    # With --verbose the time is rounded down to 2 decimals and printed.
    my $self = shift;
    # Never sleep less than 0.5 sec
    $self->{'recheck'} = 0.5 if $self->{'recheck'} < 0.5;
    if($self->verbose()) {
        my $rounded = int($self->{'recheck'}*100)/100;
        $self->{'recheck'} = $rounded;
        print "Sleeping ${rounded}s\n";
    }
    my $seconds = $self->{'recheck'};
    ::debug("recheck in ${seconds}s\n");
    ::usleep(1); # For some reason this gets interrupted
    ::usleep(1000*$seconds);
}
sub sleep_while_running {
    # Sleep for the run time, i.e. let the process run undisturbed.
    # With --verbose the time is rounded down to 2 decimals and printed.
    my $self = shift;
    # The debug output shows the time before it is rounded
    ::debug("check in $self->{'runtime'}s\n");
    if($self->verbose()) {
        my $rounded = int($self->{'runtime'}*100)/100;
        $self->{'runtime'} = $rounded;
        print "Running ${rounded}s\n";
    }
    my $seconds = $self->{'runtime'};
    ::usleep(1); # For some reason this gets interrupted
    ::usleep(1000*$seconds);
}
sub load_status {
    # Returns:
    #   loadavg
    my $self = shift;
    # Cache for some seconds: the value is measured again when it is
    # older than the recheck time
    my $fresh = (defined $self->{'load_status'}
                 and
                 $self->{'load_status_cache_time'}+$self->{'recheck'} >= time);
    if(not $fresh) {
        $self->{'load_status'} = load_status_linux();
        $self->{'load_status_cache_time'} = time;
    }
    ::debug("load_status: $self->{'load_status'}\n");
    return $self->{'load_status'};
}
sub load_status_linux {
    # Read the load average from /proc/loadavg or, if that cannot be
    # opened, from the output of 'uptime'.
    # Returns:
    #   load average (first number found)
    #   undef if neither source can be opened
    # Ends the program through ::die_bug if the source cannot be parsed.
    my ($loadavg);
    if(open(my $proc, "<", "/proc/loadavg")) {
        # Linux specific (but fast)
        # The read from the file handle had been lost from the source,
        # which left a statement that does not compile.
        my $upString = <$proc>;
        close $proc;
        if(defined $upString and $upString =~ m/^(\d+\.\d+)/) {
            $loadavg = $1;
        } else {
            ::die_bug("proc_loadavg");
        }
    } elsif (open(my $uptime, "-|", "uptime")) {
        my $upString = <$uptime>;
        close $uptime;
        if(defined $upString and $upString =~ m/average.\s*(\d+\.\d+)/) {
            $loadavg = $1;
        } else {
            ::die_bug("uptime");
        }
    }
    return $loadavg;
}
sub swap_status {
    # Returns:
    #   (swap in)*(swap out) kb
    my $self = shift;
    # Cache for some seconds: the value is measured again when it is
    # older than the recheck time
    my $fresh = (defined $self->{'swap_status'}
                 and
                 $self->{'swap_status_cache_time'}+$self->{'recheck'} >= time);
    if(not $fresh) {
        my $measured = swap_status_linux();
        # Never store less than 0
        $self->{'swap_status'} = ::max($measured,0);
        $self->{'swap_status_cache_time'} = time;
    }
    ::debug("swap_status: $self->{'swap_status'}\n");
    return $self->{'swap_status'};
}
sub swap_status_linux {
    # Measure the swap activity with vmstat.
    # Returns:
    #   output of the pipeline: column 7 * column 8 of the last line
    #   of 'vmstat 1 2'
    my $pipeline = q{vmstat 1 2 | tail -n1 | awk '{print $7*$8}'};
    # Run swap_activity measuring.
    return qx{ $pipeline };
}
sub mem_status {
    # Returns:
    #   number of bytes (free+cache)
    my $self = shift;
    # Cache: the value is measured again when it is older than the
    # recheck time
    my $fresh = (defined $self->{'mem_status'}
                 and
                 $self->{'mem_status_cache_time'}+$self->{'recheck'} >= time);
    if(not $fresh) {
        $self->{'mem_status'} = mem_status_linux();
        $self->{'mem_status_cache_time'} = time;
    }
    ::debug("mem_status: $self->{'mem_status'}\n");
    return $self->{'mem_status'};
}
sub mem_status_linux {
    # Get the memory that is available for programs from 'free'.
    # Returns:
    #   number of bytes
    #
    # Older versions of free print:
    # total used free shared buffers cached
    # Mem: 3366496 2901664 464832 0 179228 1850692
    # -/+ buffers/cache: 871744 2494752
    # Swap: 6445476 1396860 5048616
    #
    # Newer versions (procps-ng) have no '-/+ buffers/cache' line.
    # Instead the Mem line ends with an 'available' column:
    # total used free shared buff/cache available
    # Mem: 3366496 871744 464832 0 2029920 2494752
    # Swap: 6445476 1396860 5048616
    #
    # Always taking line 3, column 4 would give the free swap for the
    # newer layout.
    # LC_ALL=C: the labels are matched by name
    my @free = `LC_ALL=C free`;
    for my $line (@free) {
        if($line =~ m{^-/\+ buffers/cache:\s+\d+\s+(\d+)}) {
            # Older layout: free including buffers and cache
            return $1*1024;
        }
    }
    if(@free and $free[0] =~ /\bavailable\b/) {
        for my $line (@free) {
            if($line =~ /^Mem:/) {
                # Newer layout: 'available' is the last column
                my @column = split(/\s+/,$line);
                return $column[-1]*1024;
            }
        }
    }
    # Unknown layout: use the same position as has always been used
    my $free = (split(/\s+/,$free[2]))[3];
    return $free*1024;
}
sub io_status {
    # Returns:
    #   max percent for all devices
    my $self = shift;
    # Cache: the value is measured again when it is older than the
    # recheck time
    my $fresh = (defined $self->{'io_status'}
                 and
                 $self->{'io_status_cache_time'}+$self->{'recheck'} >= time);
    if(not $fresh) {
        $self->{'io_status'} = io_status_linux();
        $self->{'io_status_cache_time'} = time;
    }
    ::debug("io_status: $self->{'io_status'}\n");
    return $self->{'io_status'};
}
sub io_status_linux {
    # Measure the disk utilization with 'iostat -x'.
    # Returns:
    #   (max %util of all devices)/10
    #   0 if no value was found
    #
    # Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
    # sda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
    #
    # Newer versions of iostat print 'Device' without the colon and
    # have other columns, so neither 'Device:' nor a fixed column
    # number can be relied on. %util is the last column.
    my @iostat_out = `LANG=C iostat -x 1 2`;
    # throw away all except the last Device-section
    my @util;
    for my $line (reverse @iostat_out) {
        $line =~ /Device/ and last;
        my @column = split(/\s+/,$line);
        # %util is the last column (undef for empty lines: skipped by max)
        push @util, pop @column;
    }
    my $io = ::max(@util);
    # No device lines (e.g. iostat is not installed): no IO
    defined $io or return 0;
    return $io/10;
}
# Dummy: both variables are only mentioned once more here.
# NOTE(review): presumably to avoid 'used only once' warnings - confirm.
$::exitstatus = 0;
$::exitsignal = 0;