niceload: Implemented --runmem, --startmem, --load, --hard, --soft. Rewritten to OO.

This commit is contained in:
Ole Tange 2011-07-18 18:29:37 +02:00
parent c2a2b34abf
commit ebeeb2755d
3 changed files with 382 additions and 159 deletions

View file

@ -1,5 +1,13 @@
= Notes on how to release new version = = Notes on how to release new version =
== alpha/beta/production ==
Update documentation:
Modified => alpha
Unmodified alpha since last version => beta
Unmodified beta since last version => production
== Update version == == Update version ==
=== configure.ac === === configure.ac ===
@ -163,9 +171,9 @@ cc:Peter Simons <simons@cryp.to>, Sandro Cazzaniga <kharec@mandriva.org>,
Christian Faulhammer <fauli@gentoo.org>, Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>, Christian Faulhammer <fauli@gentoo.org>, Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>,
Jesse Alama <jesse.alama@gmail.com> Jesse Alama <jesse.alama@gmail.com>
Subject: GNU Parallel 20110722 ('XXX') released Subject: GNU Parallel 20110722 ('Murdoch') released
GNU Parallel 20110722 ('XXX') has been released. It is GNU Parallel 20110722 ('Murdoch') has been released. It is
available for download at: http://ftp.gnu.org/gnu/parallel/ available for download at: http://ftp.gnu.org/gnu/parallel/
New in this release: New in this release:

View file

@ -12,10 +12,10 @@ B<niceload> [-v] [-h] [-n nice] [-l load] [-t time] [-s time|-f factor] -p=PID
=head1 DESCRIPTION =head1 DESCRIPTION
GNU B<niceload> will run a program when the load average is below a GNU B<niceload> will slow down a program when the load average is
certain limit. When the limit is reached the program will be suspended above a certain limit. When the limit is reached the program will be
for some time. Then resumed again for some time. Then the load load suspended for some time. Then resumed again for some time. Then the
average is checked again and we start over. load average is checked again and we start over.
If the load is 3.00 then the default settings will run a program If the load is 3.00 then the default settings will run a program
like this: like this:
@ -51,9 +51,9 @@ Max load. The maximal load average before suspending command. Default
is 1.00. is 1.00.
=item B<-m> I<memory> =item B<--rm> I<memory>
=item B<--mem> I<memory> =item B<--runmem> I<memory>
Required free mem. I<memory> is computed as free memory + cache. Required free mem. I<memory> is computed as free memory + cache.
@ -296,7 +296,7 @@ B<parallel>(1), B<nice>(1)
use strict; use strict;
use Getopt::Long; use Getopt::Long;
$Global::progname="niceload"; $Global::progname="niceload";
$Global::version = 20110622; $Global::version = 20110718;
Getopt::Long::Configure("bundling","require_order"); Getopt::Long::Configure("bundling","require_order");
get_options_from_array(\@ARGV) || die_usage(); get_options_from_array(\@ARGV) || die_usage();
if($::opt_version) { if($::opt_version) {
@ -313,54 +313,6 @@ if($::opt_factor and $::opt_suspend) {
exit; exit;
} }
my $nice = $::opt_nice || 0; # -n=0 Nice level (Default: 0)
my $max_load = $::opt_load || 1; # -l=1 Max acceptable load average (Default: 1)
my $check_time = $::opt_recheck || 1; # -t=1 Seconds between checking load average (Default: 1)
my $min_mem = $::opt_mem ? multiply_binary_prefix($::opt_mem) : undef;
my $wait_factor;
my $wait_time = 1;
if($::opt_suspend) {
# --suspend=sec Seconds to suspend process when load average is too high
$wait_time = $::opt_suspend;
} else {
# --factor=1 compute wait_time dynamically as (load - limit) * factor
$wait_factor=$::opt_factor || 1;
}
my $processid = $::opt_pid; # Control this PID (Default: control the command)
my $verbose = $::opt_verbose || $::opt_debug;
my @program = @ARGV;
$SIG{CHLD} = \&REAPER;
if($processid) {
$Child::fork = $processid;
$::opt_verbose and print STDERR "Control $processid\n";
init_signal_handling_attached_child();
my $child_pgrp = getpgrp $Child::fork;
suspend_resume($min_mem,$max_load,$check_time,$wait_time,$wait_factor,$child_pgrp);
} elsif(@ARGV) {
if($Child::fork = fork) {
sleep 1; # Give child time to setpgrp(0,0);
init_signal_handling_my_child();
my $child_pgrp = getpgrp $Child::fork;
suspend_resume($min_mem,$max_load,$check_time,$wait_time,$wait_factor,$child_pgrp);
} else {
setpgrp(0,0);
debug("Child pid: $$, pgrp: ",getpgrp $$,"\n");
if($nice) {
unshift(@program,"nice","-n",$nice);
}
debug("@program\n");
system(@program);
debug("Child exit\n");
exit;
}
} else {
help();
exit;
}
sub get_options_from_array { sub get_options_from_array {
# Run GetOptions on @array # Run GetOptions on @array
# Returns: # Returns:
@ -381,7 +333,8 @@ sub get_options_from_array {
"factor|f=s" => \$::opt_factor, "factor|f=s" => \$::opt_factor,
"hard|H" => \$::opt_hard, "hard|H" => \$::opt_hard,
"load|l=s" => \$::opt_load, "load|l=s" => \$::opt_load,
"free|memory|mem|m=s" => \$::opt_mem, "sm|startmem|start-mem=s" => \$::opt_start_mem,
"rm|runmem|run-mem=s" => \$::opt_run_mem,
"nice|n=i" => \$::opt_nice, "nice|n=i" => \$::opt_nice,
"noswap|N" => \$::opt_noswap, "noswap|N" => \$::opt_noswap,
"process|pid|p=s" => \$::opt_pid, "process|pid|p=s" => \$::opt_pid,
@ -413,12 +366,42 @@ Usage:
}; };
} }
sub usleep {
    # Pause for the given number of milliseconds.
    # Uses the 4-argument select(), whose timeout is given in seconds,
    # so fractions of a second are possible.
    my ($millisecs) = @_;
    ::debug("Sleeping ",$millisecs," millisecs\n");
    my $timeout_secs = $millisecs/1000;
    select(undef, undef, undef, $timeout_secs);
}
sub debug { sub debug {
if($::opt_debug) { if($::opt_debug) {
print STDERR @_; print STDERR @_;
} }
} }
sub my_dump {
    # Dump the arguments as text for debugging.
    # Data::Dump is preferred; Data::Dumper is the fallback.
    # Returns:
    #   ascii expression of object if Data::Dump(er) is installed
    #   error message otherwise (the message is also printed on STDERR)
    my @dump_this = (@_);
    # Block eval + require loads the module once without compiling a
    # string, and nothing needs to be imported into this namespace.
    if(eval { require Data::Dump; 1 }) {
        return (Data::Dump::dump(@dump_this));
    }
    # Data::Dump not installed
    if(eval { require Data::Dumper; 1 }) {
        return Data::Dumper::Dumper(@dump_this);
    }
    my $err = "Neither Data::Dump nor Data::Dumper is installed\n".
        "Not dumping output\n";
    print STDERR $err;
    return $err;
}
sub version { sub version {
# Returns: N/A # Returns: N/A
print join("\n", print join("\n",
@ -433,20 +416,98 @@ sub version {
); );
} }
sub init_signal_handling_attached_child { sub multiply_binary_prefix {
$SIG{INT}=\&sigint_attached_child; # Evalualte numbers with binary prefix
# 13G = 13*1024*1024*1024 = 13958643712
my $s = shift;
$s =~ s/Ki?/*1024/gi;
$s =~ s/Mi?/*1024*1024/gi;
$s =~ s/Gi?/*1024*1024*1024/gi;
$s =~ s/Ti?/*1024*1024*1024*1024/gi;
$s =~ s/Pi?/*1024*1024*1024*1024*1024/gi;
$s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s = eval $s;
return $s;
} }
sub sigint_attached_child { sub max {
# Let the attached child continue when detaching # Returns:
kill_child_CONT(); # Maximum value of array
exit; my $max;
for (@_) {
# Skip undefs
defined $_ or next;
defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef
$max = ($max > $_) ? $max : $_;
}
return $max;
} }
sub init_signal_handling_my_child { my $limit = Limit->new();
$SIG{INT}=\&kill_child_INT; my $process = Process->new($::opt_nice,@ARGV);
$SIG{TSTP}=\&kill_child_TSTP; if(not $::opt_pid) {
$SIG{CONT}=\&kill_child_CONT; # Wait until limit is below start_limit and run_limit
while($limit->over_start_limit()
or
($limit->hard() and $limit->over_run_limit())) {
$limit->sleep_for_recheck();
}
}
# Start the command, then keep checking the limits for as long as it lives.
$process->start();
while($process->is_running()) {
    my $over = $limit->over_run_limit();
    if($over) {
        # Over the limit: stop the command and wait before looking again
        $process->suspend();
        $limit->sleep_for_recheck();
    }
    if(not $over or not $limit->hard()) {
        # Under the limit, or soft mode: let the command run for a while
        $process->resume();
        $limit->sleep_while_running();
    }
}
package Process;
sub new {
    # Create an object describing a command that is not started yet.
    # Input:
    #   $nice = nice level. If true the command is prefixed with
    #           "nice -n $nice"
    #   remaining arguments = the command and its arguments
    # Returns:
    #   blessed hash with 'running' (0) and 'command' (array ref)
    my ($class, $nice, @command) = @_;
    @command = ("nice", "-n", $nice, @command) if $nice;
    my $self = {
        'running' => 0, # Is the process running now?
        'command' => \@command,
    };
    return bless $self, ref($class) || $class;
}
sub start {
# Start the program
my $self = shift;
::debug("Starting @{$self->{'command'}}\n");
if($self->{'pid'} = fork) {
# set signal handler to kill children if parent is killed
push @{$self->{'pids'}}, $self->{'pid'};
$Global::process = $self;
$SIG{CHLD} = \&REAPER;
$SIG{INT}=\&kill_child_INT;
$SIG{TSTP}=\&kill_child_TSTP;
$SIG{CONT}=\&kill_child_CONT;
sleep 1; # Give child time to setpgrp(0,0);
$self->{'pgrp'} = getpgrp $self->{'pid'};
} else {
setpgrp(0,0);
::debug("Child pid: $$, pgrp: ",getpgrp $$,"\n");
::debug("@{$self->{'command'}}\n");
system("@{$self->{'command'}}");
::debug("Child exit\n");
exit;
}
} }
use POSIX ":sys_wait_h"; use POSIX ":sys_wait_h";
@ -460,71 +521,200 @@ sub REAPER {
} }
sub kill_child_CONT { sub kill_child_CONT {
debug("SIGCONT received. Killing $Child::fork\n"); my $self = $Global::process;
kill CONT => -getpgrp($Child::fork); ::debug("SIGCONT received. Killing $self->{'pid'}\n");
kill CONT => -getpgrp($self->{'pid'});
} }
sub kill_child_TSTP { sub kill_child_TSTP {
debug("SIGTSTP received. Killing $Child::fork and self\n"); my $self = $Global::process;
kill TSTP => -getpgrp($Child::fork); ::debug("SIGTSTP received. Killing $self->{'pid'} and self\n");
kill TSTP => -getpgrp($self->{'pid'});
kill STOP => -$$; kill STOP => -$$;
} }
sub kill_child_INT { sub kill_child_INT {
debug("SIGINT received. Killing $Child::fork Exit\n"); my $self = $Global::process;
kill INT => -getpgrp($Child::fork); ::debug("SIGINT received. Killing $self->{'pid'} Exit\n");
kill INT => -getpgrp($self->{'pid'});
exit; exit;
} }
sub suspend_resume { sub resume {
my ($min_mem,$max_load,$check_time,$wait_time,$wait_factor,@pids) = @_; my $self = shift;
debug("suspend_resume these @pids\n"); if(not $self->{'running'}) {
resume_pids(@pids); # - = PID group
while (pids_exist(@pids)) { map { kill "CONT", -$_ } @{$self->{'pids'}};
my ($loadavg, $mem_free, $swap, $resume); $self->{'running'} = 1;
if (defined $max_load and
($loadavg = loadavg()) > $max_load) {
if($wait_factor) {
$wait_time = ($loadavg - $max_load) * $wait_factor;
}
$::opt_verbose and print STDERR "niceload: load $loadavg. Suspending for $wait_time seconds\n";
suspend_pids(@pids);
sleep 1; # for some reason this statement is skipped
sleep $wait_time;
} elsif (defined($min_mem) and
($mem_free = mem_free()) < $min_mem) {
$::opt_verbose and print STDERR "niceload: mem free $mem_free. Suspending for $wait_time seconds\n";
suspend_pids(@pids);
sleep 1; # for some reason this statement is skipped
sleep $wait_time;
} elsif (defined($::opt_noswap) and
(swap_activity()) != 0) {
$::opt_verbose and print STDERR "niceload: swapping. Suspending for $wait_time seconds\n";
suspend_pids(@pids);
sleep 1; # for some reason this statement is skipped
sleep $wait_time;
} else {
$resume = 1;
}
if(not $::opt_hard or $resume) {
resume_pids(@pids);
$::opt_verbose and print STDERR "niceload: running for $check_time second(s)\n";
sleep($check_time);
}
} }
} }
sub pids_exist { sub suspend {
my (@pids) = @_; my $self = shift;
my ($exists) = 0; if($self->{'running'}) {
for my $pid (@pids) { # - = PID group
if(-e "/proc/".$pid) { $exists++ } map { kill "STOP", -$_ } @{$self->{'pids'}};
#if(kill 0 => $Child::fork) { $exists++ } $self->{'running'} = 0;
} }
}
sub is_running {
# The process is dead if none of the pids exist
my $self = shift;
my ($exists) = 0;
for my $pid (@{$self->{'pids'}}) {
if(kill 0 => $pid) { $exists++ }
}
::debug("is_running: $exists\n");
return $exists; return $exists;
} }
sub loadavg {
package Limit;
sub new {
    # Build a Limit object from the global command line options.
    # Reads:
    #   $::opt_soft, $::opt_hard, $::opt_start_mem, $::opt_run_mem,
    #   $::opt_load, $::opt_noswap, $::opt_factor, $::opt_recheck,
    #   $::opt_verbose
    # Returns:
    #   blessed hash holding the limits and the sleep times
    my $class = shift;
    # --soft wins over --hard
    my $hard = $::opt_soft ? 0 : $::opt_hard;
    # Memory limits may be given with a binary prefix (e.g. 1G); 0 = no limit
    my $startmem = $::opt_start_mem ? ::multiply_binary_prefix($::opt_start_mem) : 0;
    my $runmem = $::opt_run_mem ? ::multiply_binary_prefix($::opt_run_mem) : 0;
    # Each key is listed once: a repeated key in a hash constructor
    # silently overwrites the earlier value.
    return bless {
        'hard' => $hard,
        'load' => $::opt_load,
        'startmem' => $startmem,
        'runmem' => $runmem,
        'swap' => $::opt_noswap,
        'factor' => $::opt_factor || 1,
        'recheck' => $::opt_recheck || 1, # Default: 1
        'runtime' => 1, # Default
        # Status of the latest check; starts at 1
        'over_run_limit' => 1,
        'over_start_limit' => 1,
        'verbose' => $::opt_verbose,
    }, ref($class) || $class;
}
sub over_run_limit {
    # Compute how far the system is over the limits that apply while
    # the command is running (run memory + the general limits).
    # Side effects:
    #   stores the result in $self->{'over_run_limit'}
    #   unless --recheck was given: sets $self->{'recheck'} to
    #   factor * result
    # Returns:
    #   0 if under all limits
    #   >0 if over limit
    my ($self) = @_;
    my $over = 0;
    if($self->{'runmem'}) {
        # Ratio wanted/available, shifted so enough memory gives 0:
        #   available >= wanted    => 0 (1-1)
        #   available = wanted/2   => 1 (2-1)
        #   available = wanted/10  => 9 (10-1)
        my $available = $self->mem_status();
        ::debug("Run memory: $self->{'runmem'}/$available\n");
        my $ratio = $self->{'runmem'}/$available;
        $over += (::max(1,$ratio)-1);
    }
    $over += $self->over_general_limit();
    $self->{'over_run_limit'} = $over;
    unless($::opt_recheck) {
        $self->{'recheck'} = $self->{'factor'} * $over;
    }
    ::debug("over_run_limit: $over\n");
    return $over;
}
sub over_start_limit {
    # Compute how far the system is over the limit that must be met
    # before the command is started (start memory only).
    # Side effects:
    #   stores the result in $self->{'over_start_limit'}
    #   unless --recheck was given: sets $self->{'recheck'} to
    #   factor * result
    # Returns:
    #   0 if under the limit
    #   >0 if over limit
    my ($self) = @_;
    my $over = 0;
    if($self->{'startmem'}) {
        # Ratio wanted/available, shifted so enough memory gives 0:
        #   available >= wanted    => 0 (1-1)
        #   available = wanted/2   => 1 (2-1)
        #   available = wanted/10  => 9 (10-1)
        my $available = $self->mem_status();
        my $ratio = $self->{'startmem'}/$available;
        $over += (::max(1,$ratio)-1);
    }
    $self->{'over_start_limit'} = $over;
    unless($::opt_recheck) {
        # The wait computed here may be tiny or 0
        $self->{'recheck'} = $self->{'factor'} * $over;
    }
    ::debug("over_start_limit: $over\n");
    return $over;
}
sub over_general_limit {
    # Sum how far load, swap and io are over their limits.
    # Only the limits that are set (true) in the object are checked.
    # Return:
    #   0 if under all limits
    #   >0 if over limit
    my $self = shift;
    my $status = 0;
    if($self->{'load'}) {
        # load should be between 0-10ish
        # load at or below the limit => 0
        my $load = $self->load_status();
        $status += ::max(0,$load - $self->{'load'});
    }
    if($self->{'swap'}) {
        # swap should be between 0-10ish
        # log() of anything <= 1 is clamped to log(1) = 0
        # NOTE(review): swap_status() appears to subtract
        # $self->{'swap'} already - confirm that subtracting it again
        # here is intended.
        my $swap = $self->swap_status();
        $status += log(::max(1, $swap - $self->{'swap'}));
    }
    if($self->{'io'}) {
        # max() is defined in main::, so from this package it has to
        # be called as ::max(); a bare max() would be looked up as
        # Limit::max and die at run time.
        my $io = $self->io_status();
        $status += ::max(0,$io - $self->{'io'});
    }
    return $status;
}
sub hard {
    # Returns:
    #   the 'hard' setting stored in the object
    my ($self) = @_;
    return $self->{'hard'};
}
sub verbose {
    # Returns:
    #   the 'verbose' setting stored in the object
    my ($self) = @_;
    return $self->{'verbose'};
}
sub sleep_for_recheck {
    # Sleep $self->{'recheck'} seconds before the limits are checked again.
    # Side effects:
    #   'recheck' is raised to 0.5 if it is smaller
    #   in verbose mode 'recheck' is cut to 2 decimals and printed
    my ($self) = @_;
    my $min_secs = 0.5;
    # Never sleep less than 0.5 sec
    $self->{'recheck'} = $min_secs if $self->{'recheck'} < $min_secs;
    if($self->verbose()) {
        $self->{'recheck'} = int($self->{'recheck'}*100)/100;
        print "Sleeping $self->{'recheck'}s\n";
    }
    ::debug("recheck in $self->{'recheck'}s\n");
    my $millisecs = 1000*$self->{'recheck'};
    ::usleep($millisecs);
}
sub sleep_while_running {
    # Sleep $self->{'runtime'} seconds while the command is running.
    # Side effects:
    #   in verbose mode 'runtime' is cut to 2 decimals and printed
    my ($self) = @_;
    ::debug("check in $self->{'runtime'}s\n");
    if($self->verbose()) {
        my $two_decimals = int($self->{'runtime'}*100)/100;
        $self->{'runtime'} = $two_decimals;
        print "Running $self->{'runtime'}s\n";
    }
    # The first sleep gets interrupted for some reason, so a 1 ms
    # sleep is sacrificed before the real one.
    ::usleep(1);
    my $millisecs = 1000*$self->{'runtime'};
    ::usleep($millisecs);
}
sub load_status {
    # Returns:
    #   loadavg (cached value if it is at most 'recheck' seconds old)
    my ($self) = @_;
    # Refresh when nothing is cached or the cached value is too old
    my $expired = (not defined $self->{'load_status'}
                   or
                   $self->{'load_status_cache_time'}+$self->{'recheck'} < time);
    if($expired) {
        $self->{'load_status'} = load_status_linux();
        $self->{'load_status_cache_time'} = time;
    }
    ::debug("load_status: $self->{'load_status'}\n");
    return $self->{'load_status'};
}
sub load_status_linux {
my ($loadavg); my ($loadavg);
if(open(IN,"/proc/loadavg")) { if(open(IN,"/proc/loadavg")) {
# Linux specific (but fast) # Linux specific (but fast)
@ -547,7 +737,43 @@ sub loadavg {
return $loadavg; return $loadavg;
} }
sub mem_free { sub swap_status {
# Returns:
# (swap in)*(swap out) kb
my $self = shift;
# Cache for some seconds
if(not defined $self->{'swap_status'} or
$self->{'swap_status_cache_time'}+$self->{'recheck'} < time) {
my $status = swap_status_linux();
$self->{'swap_status'} = ::max($status-$self->{'swap'},0);
$self->{'swap_status_cache_time'} = time;
}
::debug("swap_status: $self->{'swap_status'}\n");
return $self->{'swap_status'};
}
sub swap_status_linux {
    # Run vmstat for two samples and multiply columns 7 and 8 of the
    # last line with awk.
    # Returns:
    #   the output of the pipeline (as produced by qx)
    my $measure_cmd = q{vmstat 1 2 | tail -n1 | awk '{print $7*$8}'};
    return qx{ $measure_cmd };
}
sub mem_status {
    # Returns:
    #   number of bytes (free+cache)
    #   (cached value if it is at most 'recheck' seconds old)
    my ($self) = @_;
    # Refresh when nothing is cached or the cached value is too old
    my $expired = (not defined $self->{'mem_status'}
                   or
                   $self->{'mem_status_cache_time'}+$self->{'recheck'} < time);
    if($expired) {
        $self->{'mem_status'} = mem_status_linux();
        $self->{'mem_status_cache_time'} = time;
    }
    ::debug("mem_status: $self->{'mem_status'}\n");
    return $self->{'mem_status'};
}
sub mem_status_linux {
# total used free shared buffers cached # total used free shared buffers cached
# Mem: 3366496 2901664 464832 0 179228 1850692 # Mem: 3366496 2901664 464832 0 179228 1850692
# -/+ buffers/cache: 871744 2494752 # -/+ buffers/cache: 871744 2494752
@ -557,48 +783,36 @@ sub mem_free {
return $free*1024; return $free*1024;
} }
sub swap_activity { sub io_status {
my $swap_activity; # Returns:
$swap_activity = "vmstat 1 2 | tail -n1 | awk '{print \$7*\$8}'"; # max percent for all devices
# Run swap_activity measuring. my $self = shift;
return qx{ $swap_activity }; # Cache for one second
} if(not defined $self->{'io_status'} or
$self->{'io_status_cache_time'}+$self->{'recheck'} < time) {
sub suspend_pids { $self->{'io_status'} = io_status_linux();
my @pids = @_; $self->{'io_status_cache_time'} = time;
signal_pids("STOP",@pids);
}
sub resume_pids {
my @pids = @_;
signal_pids("CONT",@pids);
}
sub signal_pids {
my ($signal,@pids) = @_;
# local $SIG{$signal} = 'IGNORE';
for my $pid (@pids) {
kill $signal => -$pid; # stop PID group
} }
return $self->{'io_status'};
} }
sub multiply_binary_prefix { sub io_status_linux {
# Evalualte numbers with binary prefix # Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
# 13G = 13*1024*1024*1024 = 13958643712 # sda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
my $s = shift; # sdb 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
$s =~ s/Ki?/*1024/gi; # sdd 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
$s =~ s/Mi?/*1024*1024/gi; # sde 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
$s =~ s/Gi?/*1024*1024*1024/gi; # sdf 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
$s =~ s/Ti?/*1024*1024*1024*1024/gi; # dm-0 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
$s =~ s/Pi?/*1024*1024*1024*1024*1024/gi; # sdg 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
$s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi; my @iostat_out = `LANG=C iostat -x 1 2`;
$s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi; # throw away all execpt the last Device:-section
$s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi; my @iostat = grep (/1/../Device:/, reverse @iostat_out);
$s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi; print @iostat;
$s = eval $s;
return $s; my $io = (split(/\s+/,$iostat[2]))[3];
return $io*1024;
} }
# Keep -w happy # Keep -w happy
$::opt_soft = 1; # = 1;

View file

@ -60,7 +60,8 @@ http://www.youtube.com/watch?v=1ntxT-47VPA
=item I<command> =item I<command>
Command to execute. If I<command> or the following arguments contain Command to execute. If I<command> or the following arguments contain
{} every instance will be substituted with the input line. replacement strings (such as B<{}>) every instance will be substituted
with the input.
If I<command> is given, GNU B<parallel> will behave similar to B<xargs>. If If I<command> is given, GNU B<parallel> will behave similar to B<xargs>. If
I<command> is not given GNU B<parallel> will behave similar to B<cat | sh>. I<command> is not given GNU B<parallel> will behave similar to B<cat | sh>.