#!/usr/bin/perl -w # Copyright (C) 2004,2005,2006,2006,2008,2009,2010 Ole Tange, # http://ole.tange.dk # # Copyright (C) 2010,2011 Ole Tange, http://ole.tange.dk and Free # Software Foundation, Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see # or write to the Free Software Foundation, Inc., 51 Franklin St, # Fifth Floor, Boston, MA 02110-1301 USA use strict; use Getopt::Long; $Global::progname="niceload"; $Global::version = 20120322; Getopt::Long::Configure("bundling","require_order"); get_options_from_array(\@ARGV) || die_usage(); if($::opt_version) { version(); exit 0; } if($::opt_help) { help(); exit 0; } if($::opt_factor and $::opt_suspend) { # You cannot have --suspend and --factor help(); exit; } if(not (defined $::opt_start_io or defined $::opt_run_io or defined $::opt_start_load or defined $::opt_run_load or defined $::opt_start_mem or defined $::opt_run_mem or defined $::opt_start_noswap or defined $::opt_run_noswap or defined $::opt_io or defined $::opt_load or defined $::opt_mem or defined $::opt_noswap)) { # Default is --runload=1 $::opt_run_load = 1; } if(not defined $::opt_start_io) { $::opt_start_io = $::opt_io; } if(not defined $::opt_run_io) { $::opt_run_io = $::opt_io; } if(not defined $::opt_start_load) { $::opt_start_load = $::opt_load; } if(not defined $::opt_run_load) { $::opt_run_load = $::opt_load; } if(not defined $::opt_start_mem) { $::opt_start_mem = $::opt_mem; } if(not defined $::opt_run_mem) { $::opt_run_mem = $::opt_mem; } if(not defined $::opt_start_noswap) { $::opt_start_noswap = $::opt_noswap; } if(not defined $::opt_run_noswap) { $::opt_run_noswap = $::opt_noswap; } my $limit = Limit->new(); my $process = Process->new($::opt_nice,@ARGV); if($::opt_pid) { $process->set_pid($::opt_pid); } elsif (@ARGV) { # Wait until limit is below start_limit and run_limit while($limit->over_start_limit() or ($limit->hard() and $limit->over_run_limit())) { $limit->sleep_for_recheck(); } $process->start(); } while($process->is_alive()) { if($limit->over_run_limit()) { $process->suspend(); $limit->sleep_for_recheck(); if(not $limit->hard()) { $process->resume(); $limit->sleep_while_running(); } } else { $process->resume(); $limit->sleep_while_running(); } } exit($::exitstatus); sub get_options_from_array { # Run GetOptions on @array # Returns: # true if parsing worked # false if parsing failed # @array is changed my $array_ref = shift; # A bit of shuffling of @ARGV needed as GetOptionsFromArray is not # supported everywhere my @save_argv; my $this_is_ARGV = (\@::ARGV == $array_ref); if(not $this_is_ARGV) { @save_argv = @::ARGV; @::ARGV = @{$array_ref}; } my @retval = GetOptions ("debug|D" => \$::opt_debug, "factor|f=s" => \$::opt_factor, "hard|H" => \$::opt_hard, "soft|S" => \$::opt_soft, "si|sio|startio|start-io=s" => \$::opt_start_io, "ri|rio|runio|run-io=s" => \$::opt_run_io, "io|I=s" => \$::opt_io, "sl|startload|start-load=s" => \$::opt_start_load, "rl|runload|run-load=s" => \$::opt_run_load, "load|L|l=s" => \$::opt_load, "sm|startmem|start-mem=s" => \$::opt_start_mem, "rm|runmem|run-mem=s" => \$::opt_run_mem, "mem|M=s" => \$::opt_mem, "sn|startnoswap|start-noswap|start-no-swap" => \$::opt_start_noswap, "rn|runnoswap|run-noswap|run-no-swap" => \$::opt_run_noswap, "noswap|N" => \$::opt_noswap, "nice|n=i" => \$::opt_nice, "process|pid|p=s" => \$::opt_pid, "suspend|s=s" => \$::opt_suspend, "recheck|t=s" => \$::opt_recheck, "quote|q" => \$::opt_quote, "help|h" => \$::opt_help, "verbose|v" => \$::opt_verbose, "version|V" => \$::opt_version, ); if(not $this_is_ARGV) { @{$array_ref} = @::ARGV; @::ARGV = @save_argv; } return @retval; } sub die_usage { help(); exit 1; } sub help { print q{ Usage: niceload [-v] [-n niceness] [-L loadavg] [-I io] [-N] [-M mem] [-s suspend_sec|-f factor] [-H] [-S] command or -p pid }; } sub die_bug { my $bugid = shift; print STDERR ("$Global::progname: This should not happen. You have found a bug.\n", "Please contact and include:\n", "* The version number: $Global::version\n", "* The bugid: $bugid\n", "* The command line being run\n", "* The files being read (put the files on a webserver if they are big)\n", "\n", "If you get the error on smaller/fewer files, please include those instead.\n"); exit(255); } sub usleep { # Sleep this many milliseconds. my $secs = shift; ::debug("Sleeping ",$secs," millisecs\n"); select(undef, undef, undef, $secs/1000); } sub debug { if($::opt_debug) { print STDERR @_; } } sub my_dump { # Returns: # ascii expression of object if Data::Dump(er) is installed # error code otherwise my @dump_this = (@_); eval "use Data::Dump qw(dump);"; if ($@) { # Data::Dump not installed eval "use Data::Dumper;"; if ($@) { my $err = "Neither Data::Dump nor Data::Dumper is installed\n". "Not dumping output\n"; print STDERR $err; return $err; } else { return Dumper(@dump_this); } } else { eval "use Data::Dump qw(dump);"; return (Data::Dump::dump(@dump_this)); } } sub version { # Returns: N/A print join("\n", "GNU $Global::progname $Global::version", "Copyright (C) 2004,2005,2006,2007,2008,2009 Ole Tange", "Copyright (C) 2010,2011 Ole Tange and Free Software Foundation, Inc.", "License GPLv3+: GNU GPL version 3 or later ", "This is free software: you are free to change and redistribute it.", "GNU $Global::progname comes with no warranty.", "", "Web site: http://www.gnu.org/software/parallel\n" ); } sub multiply_binary_prefix { # Evalualte numbers with binary prefix # 13G = 13*1024*1024*1024 = 13958643712 my $s = shift; $s =~ s/Ki?/*1024/gi; $s =~ s/Mi?/*1024*1024/gi; $s =~ s/Gi?/*1024*1024*1024/gi; $s =~ s/Ti?/*1024*1024*1024*1024/gi; $s =~ s/Pi?/*1024*1024*1024*1024*1024/gi; $s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi; $s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi; $s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi; $s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi; $s = eval $s; return $s; } sub max { # Returns: # Maximum value of array my $max; for (@_) { # Skip undefs defined $_ or next; defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef $max = ($max > $_) ? $max : $_; } return $max; } package Process; sub new { my $class = shift; my $nice = shift; my @ARGV = @_; if($nice) { unshift(@ARGV, "nice", "-n", $nice); } return bless { 'running' => 0, # Is the process running now? 'command' => [@ARGV], }, ref($class) || $class; } sub set_pid { my $self = shift; $self->{'pid'} = shift; push(@{$self->{'pids'}},$self->{'pid'}); $self->{'running'} = 1; $::exitstatus = 0; } sub start { # Start the program my $self = shift; ::debug("Starting @{$self->{'command'}}\n"); $self->{'running'} = 1; if($self->{'pid'} = fork) { # set signal handler to kill children if parent is killed push @{$self->{'pids'}}, $self->{'pid'}; $Global::process = $self; $SIG{CHLD} = \&REAPER; $SIG{INT}=\&kill_child_INT; $SIG{TSTP}=\&kill_child_TSTP; $SIG{CONT}=\&kill_child_CONT; sleep 1; # Give child time to setpgrp(0,0); $self->{'pgrp'} = getpgrp $self->{'pid'}; } else { setpgrp(0,0); ::debug("Child pid: $$, pgrp: ",getpgrp $$,"\n"); ::debug("@{$self->{'command'}}\n"); if($::opt_quote) { system(@{$self->{'command'}}); } else { system("@{$self->{'command'}}"); } $::exitstatus = $? >> 8; $::exitsignal = $? & 127; ::debug("Child exit $::exitstatus\n"); exit($::exitstatus); } } use POSIX ":sys_wait_h"; sub REAPER { my $stiff; while (($stiff = waitpid(-1, &WNOHANG)) > 0) { # do something with $stiff if you want $::exitstatus = $? >> 8; $::exitsignal = $? & 127; } $SIG{CHLD} = \&REAPER; # install *after* calling waitpid } sub kill_child_CONT { my $self = $Global::process; ::debug("SIGCONT received. Killing $self->{'pid'}\n"); kill CONT => -getpgrp($self->{'pid'}); } sub kill_child_TSTP { my $self = $Global::process; ::debug("SIGTSTP received. Killing $self->{'pid'} and self ($$)\n"); kill TSTP => -getpgrp($self->{'pid'}); kill STOP => -$$; kill STOP => $$; } sub kill_child_INT { my $self = $Global::process; ::debug("SIGINT received. Killing $self->{'pid'} Exit\n"); kill INT => -getpgrp($self->{'pid'}); exit; } sub resume { my $self = shift; ::debug("Resume @{$self->{'pids'}}\n"); if(not $self->{'running'}) { # - = PID group map { kill "CONT", -$_ } @{$self->{'pids'}}; # If using -p it is not in a group map { kill "CONT", $_ } @{$self->{'pids'}}; $self->{'running'} = 1; } } sub suspend { my $self = shift; ::debug("Suspend @{$self->{'pids'}}\n"); if($self->{'running'}) { # - = PID group map { kill "STOP", -$_ } @{$self->{'pids'}}; # If using -p it is not in a group map { kill "STOP", $_ } @{$self->{'pids'}}; $self->{'running'} = 0; } } sub is_alive { # The process is dead if none of the pids exist my $self = shift; my ($exists) = 0; for my $pid (@{$self->{'pids'}}) { if(kill 0 => $pid) { $exists++ } } ::debug("is_alive: $exists\n"); return $exists; } package Limit; sub new { my $class = shift; my %limits = @_; my $hard = $::opt_soft ? 0 : $::opt_hard; my $runio = $::opt_run_io ? ::multiply_binary_prefix($::opt_run_io) : 0; my $startio = $::opt_start_io ? ::multiply_binary_prefix($::opt_start_io) : 0; my $runload = $::opt_run_load ? ::multiply_binary_prefix($::opt_run_load) : 0; my $startload = $::opt_start_load ? ::multiply_binary_prefix($::opt_start_load) : 0; my $runmem = $::opt_run_mem ? ::multiply_binary_prefix($::opt_run_mem) : 0; my $startmem = $::opt_start_mem ? ::multiply_binary_prefix($::opt_start_mem) : 0; my $runnoswap = $::opt_run_noswap ? ::multiply_binary_prefix($::opt_run_noswap) : 0; my $startnoswap = $::opt_start_noswap ? ::multiply_binary_prefix($::opt_start_noswap) : 0; return bless { 'hard' => $hard, 'recheck' => 1, # Default 'runtime' => 1, # Default 'runio' => $runio, 'startio' => $startio, 'runload' => $runload, 'startload' => $startload, 'runmem' => $runmem, 'startmem' => $startmem, 'runnoswap' => $runnoswap, 'startnoswap' => $startnoswap, 'factor' => $::opt_factor || 1, 'recheck' => $::opt_recheck || 1, 'runtime' => $::opt_recheck || 1, 'over_run_limit' => 1, 'over_start_limit' => 1, 'verbose' => $::opt_verbose, }, ref($class) || $class; } sub over_run_limit { my $self = shift; my $status = 0; if($self->{'runmem'}) { # mem should be between 0-10ish # 100% available => 0 (1-1) # 50% available => 1 (2-1) # 10% available => 9 (10-1) my $mem = $self->mem_status(); ::debug("Run memory: $self->{'runmem'}/$mem\n"); $status += (::max(1,$self->{'runmem'}/$mem)-1); } if($self->{'runload'}) { # load should be between 0-10ish # 0 load => 0 my $load = $self->load_status(); $status += ::max(0,$load - $self->{'runload'}); } if($self->{'runnoswap'}) { # swap should be between 0-10ish # swap in or swap out or no swap = 0 # else log(swapin*swapout) my $swap = $self->swap_status(); $status += log(::max(1, $swap - $self->{'runnoswap'})); } if($self->{'runio'}) { my $io = $self->io_status(); $status += ::max(0,$io - $self->{'runio'}); } $self->{'over_run_limit'} = $status; if(not $::opt_recheck) { $self->{'recheck'} = $self->{'factor'} * $self->{'over_run_limit'}; } ::debug("over_run_limit: $status\n"); return $self->{'over_run_limit'}; } sub over_start_limit { my $self = shift; my $status = 0; if($self->{'startmem'}) { # mem should be between 0-10ish # 100% available => 0 (1-1) # 50% available => 1 (2-1) # 10% available => 9 (10-1) my $mem = $self->mem_status(); ::debug("Start memory: $self->{'startmem'}/$mem\n"); $status += (::max(1,$self->{'startmem'}/$mem)-1); } if($self->{'startload'}) { # load should be between 0-10ish # 0 load => 0 my $load = $self->load_status(); $status += ::max(0,$load - $self->{'startload'}); } if($self->{'startnoswap'}) { # swap should be between 0-10ish # swap in or swap out or no swap = 0 # else log(swapin*swapout) my $swap = $self->swap_status(); $status += log(::max(1, $swap - $self->{'startnoswap'})); } if($self->{'startio'}) { my $io = $self->io_status(); $status += ::max(0,$io - $self->{'startio'}); } $self->{'over_start_limit'} = $status; if(not $::opt_recheck) { $self->{'recheck'} = $self->{'factor'} * $self->{'over_start_limit'}; } ::debug("over_start_limit: $status\n"); return $self->{'over_start_limit'}; } sub hard { my $self = shift; return $self->{'hard'}; } sub verbose { my $self = shift; return $self->{'verbose'}; } sub sleep_for_recheck { my $self = shift; if($self->{'recheck'} < 0.5) { # Never sleep less than 0.5 sec $self->{'recheck'} = 0.5; } if($self->verbose()) { $self->{'recheck'} = int($self->{'recheck'}*100)/100; print "Sleeping $self->{'recheck'}s\n"; } ::debug("recheck in $self->{'recheck'}s\n"); ::usleep(1); # For some reason this gets interrupted ::usleep(1000*$self->{'recheck'}); } sub sleep_while_running { my $self = shift; ::debug("check in $self->{'runtime'}s\n"); if($self->verbose()) { $self->{'runtime'} = int($self->{'runtime'}*100)/100; print "Running $self->{'runtime'}s\n"; } ::usleep(1); # For some reason this gets interrupted ::usleep(1000*$self->{'runtime'}); } sub load_status { # Returns: # loadavg my $self = shift; # Cache for some seconds if(not defined $self->{'load_status'} or $self->{'load_status_cache_time'}+$self->{'recheck'} < time) { $self->{'load_status'} = load_status_linux(); $self->{'load_status_cache_time'} = time; } ::debug("load_status: $self->{'load_status'}\n"); return $self->{'load_status'}; } sub load_status_linux { my ($loadavg); if(open(IN,"/proc/loadavg")) { # Linux specific (but fast) my $upString = ; if($upString =~ m/^(\d+\.\d+)/) { $loadavg = $1; } else { ::die_bug("proc_loadavg"); } close IN; } elsif (open(IN,"uptime|")) { my $upString = ; if($upString =~ m/average.\s*(\d+\.\d+)/) { $loadavg = $1; } else { ::die_bug("uptime"); } close IN; } return $loadavg; } sub swap_status { # Returns: # (swap in)*(swap out) kb my $self = shift; # Cache for some seconds if(not defined $self->{'swap_status'} or $self->{'swap_status_cache_time'}+$self->{'recheck'} < time) { my $status = swap_status_linux(); $self->{'swap_status'} = ::max($status,0); $self->{'swap_status_cache_time'} = time; } ::debug("swap_status: $self->{'swap_status'}\n"); return $self->{'swap_status'}; } sub swap_status_linux { my $swap_activity; $swap_activity = "vmstat 1 2 | tail -n1 | awk '{print \$7*\$8}'"; # Run swap_activity measuring. return qx{ $swap_activity }; } sub mem_status { # Returns: # number of bytes (free+cache) my $self = shift; # Cache for one second if(not defined $self->{'mem_status'} or $self->{'mem_status_cache_time'}+$self->{'recheck'} < time) { $self->{'mem_status'} = mem_status_linux(); $self->{'mem_status_cache_time'} = time; } ::debug("mem_status: $self->{'mem_status'}\n"); return $self->{'mem_status'}; } sub mem_status_linux { # total used free shared buffers cached # Mem: 3366496 2901664 464832 0 179228 1850692 # -/+ buffers/cache: 871744 2494752 # Swap: 6445476 1396860 5048616 my @free = `free`; my $free = (split(/\s+/,$free[2]))[3]; return $free*1024; } sub io_status { # Returns: # max percent for all devices my $self = shift; # Cache for one second if(not defined $self->{'io_status'} or $self->{'io_status_cache_time'}+$self->{'recheck'} < time) { $self->{'io_status'} = io_status_linux(); $self->{'io_status_cache_time'} = time; } ::debug("io_status: $self->{'io_status'}\n"); return $self->{'io_status'}; } sub io_status_linux { # Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util # sda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 my @iostat_out = `LANG=C iostat -x 1 2`; # throw away all execpt the last Device:-section my @iostat; for(reverse @iostat_out) { /Device:/ and last; push @iostat, (split(/\s+/,$_))[13]; } my $io = ::max(@iostat); return $io/10; } $::exitsignal = $::exitstatus = 0; # Dummy