diff --git a/doc/release_new_version b/doc/release_new_version index f3e558e4..555fe9d1 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -1,5 +1,13 @@ = Notes on how to release new version = +== alpha/beta/production == + +Update documentation: + +Modified => alpha +Unmodified alpha since last version => beta +Unmodified beta since last version => production + == Update version == === configure.ac === @@ -163,9 +171,9 @@ cc:Peter Simons , Sandro Cazzaniga , Christian Faulhammer , Ryoichiro Suzuki , Jesse Alama -Subject: GNU Parallel 20110722 ('XXX') released +Subject: GNU Parallel 20110722 ('Murdoch') released -GNU Parallel 20110722 ('XXX') has been released. It is +GNU Parallel 20110722 ('Murdoch') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/ New in this release: diff --git a/src/niceload b/src/niceload index 88a42570..8b35420d 100755 --- a/src/niceload +++ b/src/niceload @@ -12,10 +12,10 @@ B [-v] [-h] [-n nice] [-l load] [-t time] [-s time|-f factor] -p=PID =head1 DESCRIPTION -GNU B will run a program when the load average is below a -certain limit. When the limit is reached the program will be suspended -for some time. Then resumed again for some time. Then the load load -average is checked again and we start over. +GNU B will slow down a program when the load average is +above a certain limit. When the limit is reached the program will be +suspended for some time. Then resumed again for some time. Then the +load average is checked again and we start over. If the load is 3.00 then the default settings will run a program like this: @@ -51,9 +51,9 @@ Max load. The maximal load average before suspending command. Default is 1.00. -=item B<-m> I +=item B<--rm> I -=item B<--mem> I +=item B<--runmem> I Required free mem. I is computed as free memory + cache. @@ -296,7 +296,7 @@ B(1), B(1) use strict; use Getopt::Long; $Global::progname="niceload"; -$Global::version = 20110622; +$Global::version = 20110718; Getopt::Long::Configure("bundling","require_order"); get_options_from_array(\@ARGV) || die_usage(); if($::opt_version) { @@ -313,54 +313,6 @@ if($::opt_factor and $::opt_suspend) { exit; } -my $nice = $::opt_nice || 0; # -n=0 Nice level (Default: 0) -my $max_load = $::opt_load || 1; # -l=1 Max acceptable load average (Default: 1) -my $check_time = $::opt_recheck || 1; # -t=1 Seconds between checking load average (Default: 1) -my $min_mem = $::opt_mem ? multiply_binary_prefix($::opt_mem) : undef; - - -my $wait_factor; -my $wait_time = 1; -if($::opt_suspend) { - # --suspend=sec Seconds to suspend process when load average is too high - $wait_time = $::opt_suspend; -} else { - # --factor=1 compute wait_time dynamically as (load - limit) * factor - $wait_factor=$::opt_factor || 1; -} -my $processid = $::opt_pid; # Control this PID (Default: control the command) -my $verbose = $::opt_verbose || $::opt_debug; -my @program = @ARGV; -$SIG{CHLD} = \&REAPER; - -if($processid) { - $Child::fork = $processid; - $::opt_verbose and print STDERR "Control $processid\n"; - init_signal_handling_attached_child(); - my $child_pgrp = getpgrp $Child::fork; - suspend_resume($min_mem,$max_load,$check_time,$wait_time,$wait_factor,$child_pgrp); -} elsif(@ARGV) { - if($Child::fork = fork) { - sleep 1; # Give child time to setpgrp(0,0); - init_signal_handling_my_child(); - my $child_pgrp = getpgrp $Child::fork; - suspend_resume($min_mem,$max_load,$check_time,$wait_time,$wait_factor,$child_pgrp); - } else { - setpgrp(0,0); - debug("Child pid: $$, pgrp: ",getpgrp $$,"\n"); - if($nice) { - unshift(@program,"nice","-n",$nice); - } - debug("@program\n"); - system(@program); - debug("Child exit\n"); - exit; - } -} else { - help(); - exit; -} - sub get_options_from_array { # Run GetOptions on @array # Returns: @@ -381,7 +333,8 @@ sub get_options_from_array { "factor|f=s" => \$::opt_factor, "hard|H" => \$::opt_hard, "load|l=s" => \$::opt_load, - "free|memory|mem|m=s" => \$::opt_mem, + "sm|startmem|start-mem=s" => \$::opt_start_mem, + "rm|runmem|run-mem=s" => \$::opt_run_mem, "nice|n=i" => \$::opt_nice, "noswap|N" => \$::opt_noswap, "process|pid|p=s" => \$::opt_pid, @@ -413,12 +366,42 @@ Usage: }; } +sub usleep { + # Sleep this many milliseconds. + my $secs = shift; + ::debug("Sleeping ",$secs," millisecs\n"); + select(undef, undef, undef, $secs/1000); +} + sub debug { if($::opt_debug) { print STDERR @_; } } +sub my_dump { + # Returns: + # ascii expression of object if Data::Dump(er) is installed + # error code otherwise + my @dump_this = (@_); + eval "use Data::Dump qw(dump);"; + if ($@) { + # Data::Dump not installed + eval "use Data::Dumper;"; + if ($@) { + my $err = "Neither Data::Dump nor Data::Dumper is installed\n". + "Not dumping output\n"; + print STDERR $err; + return $err; + } else { + return Dumper(@dump_this); + } + } else { + eval "use Data::Dump qw(dump);"; + return (Data::Dump::dump(@dump_this)); + } +} + sub version { # Returns: N/A print join("\n", @@ -432,21 +415,99 @@ sub version { "Web site: http://www.gnu.org/software/parallel\n" ); } - -sub init_signal_handling_attached_child { - $SIG{INT}=\&sigint_attached_child; + +sub multiply_binary_prefix { + # Evalualte numbers with binary prefix + # 13G = 13*1024*1024*1024 = 13958643712 + my $s = shift; + $s =~ s/Ki?/*1024/gi; + $s =~ s/Mi?/*1024*1024/gi; + $s =~ s/Gi?/*1024*1024*1024/gi; + $s =~ s/Ti?/*1024*1024*1024*1024/gi; + $s =~ s/Pi?/*1024*1024*1024*1024*1024/gi; + $s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi; + $s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi; + $s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi; + $s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi; + $s = eval $s; + return $s; } -sub sigint_attached_child { - # Let the attached child continue when detaching - kill_child_CONT(); - exit; +sub max { + # Returns: + # Maximum value of array + my $max; + for (@_) { + # Skip undefs + defined $_ or next; + defined $max or do { $max = $_; next; }; # Set $_ to the first non-undef + $max = ($max > $_) ? $max : $_; + } + return $max; } -sub init_signal_handling_my_child { - $SIG{INT}=\&kill_child_INT; - $SIG{TSTP}=\&kill_child_TSTP; - $SIG{CONT}=\&kill_child_CONT; +my $limit = Limit->new(); +my $process = Process->new($::opt_nice,@ARGV); +if(not $::opt_pid) { + # Wait until limit is below start_limit and run_limit + while($limit->over_start_limit() + or + ($limit->hard() and $limit->over_run_limit())) { + $limit->sleep_for_recheck(); + } +} +$process->start(); +while($process->is_running()) { + if($limit->over_run_limit()) { + $process->suspend(); + $limit->sleep_for_recheck(); + if(not $limit->hard()) { + $process->resume(); + $limit->sleep_while_running(); + } + } else { + $process->resume(); + $limit->sleep_while_running(); + } +} + +package Process; + +sub new { + my $class = shift; + my $nice = shift; + my @ARGV = @_; + if($nice) { + unshift(@ARGV, "nice", "-n", $nice); + } + return bless { + 'running' => 0, # Is the process running now? + 'command' => [@ARGV], + }, ref($class) || $class; +} + +sub start { + # Start the program + my $self = shift; + ::debug("Starting @{$self->{'command'}}\n"); + if($self->{'pid'} = fork) { + # set signal handler to kill children if parent is killed + push @{$self->{'pids'}}, $self->{'pid'}; + $Global::process = $self; + $SIG{CHLD} = \&REAPER; + $SIG{INT}=\&kill_child_INT; + $SIG{TSTP}=\&kill_child_TSTP; + $SIG{CONT}=\&kill_child_CONT; + sleep 1; # Give child time to setpgrp(0,0); + $self->{'pgrp'} = getpgrp $self->{'pid'}; + } else { + setpgrp(0,0); + ::debug("Child pid: $$, pgrp: ",getpgrp $$,"\n"); + ::debug("@{$self->{'command'}}\n"); + system("@{$self->{'command'}}"); + ::debug("Child exit\n"); + exit; + } } use POSIX ":sys_wait_h"; @@ -460,71 +521,200 @@ sub REAPER { } sub kill_child_CONT { - debug("SIGCONT received. Killing $Child::fork\n"); - kill CONT => -getpgrp($Child::fork); + my $self = $Global::process; + ::debug("SIGCONT received. Killing $self->{'pid'}\n"); + kill CONT => -getpgrp($self->{'pid'}); } sub kill_child_TSTP { - debug("SIGTSTP received. Killing $Child::fork and self\n"); - kill TSTP => -getpgrp($Child::fork); + my $self = $Global::process; + ::debug("SIGTSTP received. Killing $self->{'pid'} and self\n"); + kill TSTP => -getpgrp($self->{'pid'}); kill STOP => -$$; } sub kill_child_INT { - debug("SIGINT received. Killing $Child::fork Exit\n"); - kill INT => -getpgrp($Child::fork); + my $self = $Global::process; + ::debug("SIGINT received. Killing $self->{'pid'} Exit\n"); + kill INT => -getpgrp($self->{'pid'}); exit; } -sub suspend_resume { - my ($min_mem,$max_load,$check_time,$wait_time,$wait_factor,@pids) = @_; - debug("suspend_resume these @pids\n"); - resume_pids(@pids); - while (pids_exist(@pids)) { - my ($loadavg, $mem_free, $swap, $resume); - if (defined $max_load and - ($loadavg = loadavg()) > $max_load) { - if($wait_factor) { - $wait_time = ($loadavg - $max_load) * $wait_factor; - } - $::opt_verbose and print STDERR "niceload: load $loadavg. Suspending for $wait_time seconds\n"; - suspend_pids(@pids); - sleep 1; # for some reason this statement is skipped - sleep $wait_time; - } elsif (defined($min_mem) and - ($mem_free = mem_free()) < $min_mem) { - $::opt_verbose and print STDERR "niceload: mem free $mem_free. Suspending for $wait_time seconds\n"; - suspend_pids(@pids); - sleep 1; # for some reason this statement is skipped - sleep $wait_time; - } elsif (defined($::opt_noswap) and - (swap_activity()) != 0) { - $::opt_verbose and print STDERR "niceload: swapping. Suspending for $wait_time seconds\n"; - suspend_pids(@pids); - sleep 1; # for some reason this statement is skipped - sleep $wait_time; - } else { - $resume = 1; - } - if(not $::opt_hard or $resume) { - resume_pids(@pids); - $::opt_verbose and print STDERR "niceload: running for $check_time second(s)\n"; - sleep($check_time); - } +sub resume { + my $self = shift; + if(not $self->{'running'}) { + # - = PID group + map { kill "CONT", -$_ } @{$self->{'pids'}}; + $self->{'running'} = 1; } } - -sub pids_exist { - my (@pids) = @_; - my ($exists) = 0; - for my $pid (@pids) { - if(-e "/proc/".$pid) { $exists++ } - #if(kill 0 => $Child::fork) { $exists++ } + +sub suspend { + my $self = shift; + if($self->{'running'}) { + # - = PID group + map { kill "STOP", -$_ } @{$self->{'pids'}}; + $self->{'running'} = 0; } +} + +sub is_running { + # The process is dead if none of the pids exist + my $self = shift; + my ($exists) = 0; + for my $pid (@{$self->{'pids'}}) { + if(kill 0 => $pid) { $exists++ } + } + ::debug("is_running: $exists\n"); return $exists; } -sub loadavg { + +package Limit; + +sub new { + my $class = shift; + my %limits = @_; + my $hard = $::opt_soft ? 0 : $::opt_hard; + my $startmem = $::opt_start_mem ? ::multiply_binary_prefix($::opt_start_mem) : 0; + my $runmem = $::opt_run_mem ? ::multiply_binary_prefix($::opt_run_mem) : 0; + + return bless { + 'hard' => $hard, + 'recheck' => 1, # Default + 'runtime' => 1, # Default + 'load' => $::opt_load, + 'startmem' => $startmem, + 'runmem' => $runmem, + 'swap' => $::opt_noswap, + 'factor' => $::opt_factor || 1, + 'recheck' => $::opt_recheck || 1, + 'runtime' => 1, + 'over_run_limit' => 1, + 'over_start_limit' => 1, + 'verbose' => $::opt_verbose, + }, ref($class) || $class; +} + +sub over_run_limit { + my $self = shift; + my $status = 0; + if($self->{'runmem'}) { + # mem should be between 0-10ish + # 100% available => 0 (1-1) + # 50% available => 1 (2-1) + # 10% available => 9 (10-1) + my $mem = $self->mem_status(); +# $status += (::max(1,$self->{'runmem'}/$mem)-1)*10; + ::debug("Run memory: $self->{'runmem'}/$mem\n"); + $status += (::max(1,$self->{'runmem'}/$mem)-1); + } + $status += $self->over_general_limit(); + $self->{'over_run_limit'} = $status; + if(not $::opt_recheck) { + $self->{'recheck'} = $self->{'factor'} * $self->{'over_run_limit'}; + } + ::debug("over_run_limit: $status\n"); + return $self->{'over_run_limit'}; +} + +sub over_start_limit { + my $self = shift; + my $status = 0; + if($self->{'startmem'}) { + # mem should be between 0-10ish + # 100% available => 0 (1-1) + # 50% available => 1 (2-1) + # 10% available => 9 (10-1) + my $mem = $self->mem_status(); +# $status += (::max(1,$self->{'startmem'}/$mem)-1)*10; + $status += (::max(1,$self->{'startmem'}/$mem)-1); + } + $self->{'over_start_limit'} = $status; + if(not $::opt_recheck) { + # Wait at least 0.5s. Otherwise niceload might cause the load + $self->{'recheck'} = $self->{'factor'} * $self->{'over_start_limit'}; + } + ::debug("over_start_limit: $status\n"); + return $self->{'over_start_limit'}; +} + +sub over_general_limit { + # Return: + # 0 if under all limits + # >0 if over limit + my $self = shift; + my $status = 0; + if($self->{'load'}) { + # load should be between 0-10ish + # 0 load => 0 + my $load = $self->load_status(); + $status += ::max(0,$load - $self->{'load'}); + } + if($self->{'swap'}) { + # swap should be between 0-10ish + # swap in or swap out or no swap = 0 + # else log(swapin*swapout) + my $swap = $self->swap_status(); + $status += log(::max(1, $swap - $self->{'swap'})); + } + if($self->{'io'}) { + my $io = $self->io_status(); + $status += max(0,$io - $self->{'io'}); + } + return $status; +} + +sub hard { + my $self = shift; + return $self->{'hard'}; +} + +sub verbose { + my $self = shift; + return $self->{'verbose'}; +} + +sub sleep_for_recheck { + my $self = shift; + if($self->{'recheck'} < 0.5) { + # Never sleep less than 0.5 sec + $self->{'recheck'} = 0.5; + } + if($self->verbose()) { + $self->{'recheck'} = int($self->{'recheck'}*100)/100; + print "Sleeping $self->{'recheck'}s\n"; + } + ::debug("recheck in $self->{'recheck'}s\n"); + ::usleep(1000*$self->{'recheck'}); +} + +sub sleep_while_running { + my $self = shift; + ::debug("check in $self->{'runtime'}s\n"); + if($self->verbose()) { + $self->{'runtime'} = int($self->{'runtime'}*100)/100; + print "Running $self->{'runtime'}s\n"; + } + ::usleep(1); # For some reason this gets interrupted + ::usleep(1000*$self->{'runtime'}); +} + +sub load_status { + # Returns: + # loadavg + my $self = shift; + # Cache for some seconds + if(not defined $self->{'load_status'} or + $self->{'load_status_cache_time'}+$self->{'recheck'} < time) { + $self->{'load_status'} = load_status_linux(); + $self->{'load_status_cache_time'} = time; + } + ::debug("load_status: $self->{'load_status'}\n"); + return $self->{'load_status'}; +} + +sub load_status_linux { my ($loadavg); if(open(IN,"/proc/loadavg")) { # Linux specific (but fast) @@ -547,7 +737,43 @@ sub loadavg { return $loadavg; } -sub mem_free { +sub swap_status { + # Returns: + # (swap in)*(swap out) kb + my $self = shift; + # Cache for some seconds + if(not defined $self->{'swap_status'} or + $self->{'swap_status_cache_time'}+$self->{'recheck'} < time) { + my $status = swap_status_linux(); + $self->{'swap_status'} = ::max($status-$self->{'swap'},0); + $self->{'swap_status_cache_time'} = time; + } + ::debug("swap_status: $self->{'swap_status'}\n"); + return $self->{'swap_status'}; +} + +sub swap_status_linux { + my $swap_activity; + $swap_activity = "vmstat 1 2 | tail -n1 | awk '{print \$7*\$8}'"; + # Run swap_activity measuring. + return qx{ $swap_activity }; +} + +sub mem_status { + # Returns: + # number of bytes (free+cache) + my $self = shift; + # Cache for one second + if(not defined $self->{'mem_status'} or + $self->{'mem_status_cache_time'}+$self->{'recheck'} < time) { + $self->{'mem_status'} = mem_status_linux(); + $self->{'mem_status_cache_time'} = time; + } + ::debug("mem_status: $self->{'mem_status'}\n"); + return $self->{'mem_status'}; +} + +sub mem_status_linux { # total used free shared buffers cached # Mem: 3366496 2901664 464832 0 179228 1850692 # -/+ buffers/cache: 871744 2494752 @@ -557,48 +783,36 @@ sub mem_free { return $free*1024; } -sub swap_activity { - my $swap_activity; - $swap_activity = "vmstat 1 2 | tail -n1 | awk '{print \$7*\$8}'"; - # Run swap_activity measuring. - return qx{ $swap_activity }; -} - -sub suspend_pids { - my @pids = @_; - signal_pids("STOP",@pids); -} - -sub resume_pids { - my @pids = @_; - signal_pids("CONT",@pids); -} - -sub signal_pids { - my ($signal,@pids) = @_; - - # local $SIG{$signal} = 'IGNORE'; - for my $pid (@pids) { - kill $signal => -$pid; # stop PID group +sub io_status { + # Returns: + # max percent for all devices + my $self = shift; + # Cache for one second + if(not defined $self->{'io_status'} or + $self->{'io_status_cache_time'}+$self->{'recheck'} < time) { + $self->{'io_status'} = io_status_linux(); + $self->{'io_status_cache_time'} = time; } + return $self->{'io_status'}; } -sub multiply_binary_prefix { - # Evalualte numbers with binary prefix - # 13G = 13*1024*1024*1024 = 13958643712 - my $s = shift; - $s =~ s/Ki?/*1024/gi; - $s =~ s/Mi?/*1024*1024/gi; - $s =~ s/Gi?/*1024*1024*1024/gi; - $s =~ s/Ti?/*1024*1024*1024*1024/gi; - $s =~ s/Pi?/*1024*1024*1024*1024*1024/gi; - $s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi; - $s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi; - $s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi; - $s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi; - $s = eval $s; - return $s; +sub io_status_linux { +# Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util +# sda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 +# sdb 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 +# sdd 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 +# sde 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 +# sdf 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 +# dm-0 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 +# sdg 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 + my @iostat_out = `LANG=C iostat -x 1 2`; + # throw away all execpt the last Device:-section + my @iostat = grep (/1/../Device:/, reverse @iostat_out); + print @iostat; + + my $io = (split(/\s+/,$iostat[2]))[3]; + return $io*1024; } # Keep -w happy -$::opt_soft = 1; +# = 1; diff --git a/src/parallel.pod b/src/parallel.pod index fcca00ff..d28c136f 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -60,7 +60,8 @@ http://www.youtube.com/watch?v=1ntxT-47VPA =item I Command to execute. If I or the following arguments contain -{} every instance will be substituted with the input line. +replacement strings (such as B<{}>) every instance will be substituted +with the input. If I is given, GNU B will behave similar to B. If I is not given GNU B will behave similar to B.