parallel: --memfree prototype.

This commit is contained in:
Ole Tange 2014-12-15 11:09:43 +01:00
parent f2402e1e83
commit 962ad80ccd

View file

@ -688,6 +688,7 @@ sub options_hash {
"xapply" => \$opt::xapply,
"bibtex" => \$opt::bibtex,
"nn|nonotice|no-notice" => \$opt::no_notice,
"memfree=s" => \$opt::memfree,
# xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$opt::jobs,
"delimiter|d=s" => \$opt::d,
@ -816,6 +817,7 @@ sub parse_options {
not defined $opt::recend) { $opt::recend = "\n"; }
if(not defined $opt::blocksize) { $opt::blocksize = "1M"; }
$opt::blocksize = multiply_binary_prefix($opt::blocksize);
$opt::memfree = multiply_binary_prefix($opt::memfree);
if(defined $opt::controlmaster) { $opt::noctrlc = 1; }
if(defined $opt::halt_on_error and
$opt::halt_on_error=~/%/) { $opt::halt_on_error /= 100; }
@ -1508,9 +1510,10 @@ sub shell_quote_scalar {
# $shell_quoted = string quoted with \ as needed by the shell
my $a = $_[0];
if(defined $a) {
# $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g;
# Solaris sh wants ^ quoted.
# $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g;
# This is 1% faster than the above
$a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go;
$a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go;
$a =~ s/[\n]/'\n'/go; # filenames with '\n' is quoted using \'
}
return $a;
@ -1737,6 +1740,10 @@ sub init_run_jobs {
debug("run", "Running jobs before on ", $sshlogin->string(), ": ",
$sshlogin->jobs_running(), "\n");
if ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) {
# It has been too short since last start
next;
}
if($opt::load and $sshlogin->loadavg_too_high()) {
# The load is too high or unknown
next;
@ -1745,12 +1752,13 @@ sub init_run_jobs {
# The server is swapping
next;
}
if($sshlogin->too_fast_remote_login()) {
# It has been too short since
if($opt::memfree and $sshlogin->memfree() < $opt::memfree) {
# The server has not enough mem free
::debug("mem", "Not starting job: not enough mem\n");
next;
}
if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) {
# It has been too short since last start
if($sshlogin->too_fast_remote_login()) {
# It has been too short since
next;
}
debug("run", $sshlogin->string(), " has ", $sshlogin->jobs_running(),
@ -3286,6 +3294,9 @@ sub reap_usleep {
if($opt::timeout) {
$Global::timeoutq->process_timeouts();
}
if($opt::memfree) {
kill_youngster_if_not_enough_mem();
}
usleep($ms);
Job::exit_if_disk_full();
if($opt::linebuffer) {
@ -3326,6 +3337,33 @@ sub now {
return (int(TimeHiRestime()*1000))/1000;
}
sub kill_youngster_if_not_enough_mem {
    # For every $sshlogin that has running jobs: while the free
    # memory is below 50% of --memfree, kill the running job with
    # the highest sequence number (the youngest) on that $sshlogin.
    # NOTE(review): putting the killed job back in the queue is not
    # done here - presumably it happens when the job is reaped;
    # confirm against the retry code.
    #
    # Uses:
    #   %Global::running = the jobs currently running
    #   $opt::memfree = wanted free memory in bytes
    use Tie::RefHash;
    # The keys are the sshlogin objects themselves (not their
    # stringification), which is why the hash is tied to Tie::RefHash
    my %running_on;
    tie %running_on, 'Tie::RefHash';
    for my $running (values %Global::running) {
        push @{$running_on{$running->sshlogin()}}, $running;
    }
    my $limit = $opt::memfree * 0.5;
    for my $sshlogin (keys %running_on) {
        # Youngest job (highest seq) first
        for my $job (sort { $b->seq() <=> $a->seq() } @{$running_on{$sshlogin}}) {
            # Stop killing as soon as there is enough memory again
            last unless $sshlogin->memfree() < $limit;
            ::debug("mem","\n",map { $_->seq()." " } (sort { $b->seq() <=> $a->seq() } @{$running_on{$sshlogin}}));
            ::debug("mem","\n", $job->seq(), "killed ",
                    $sshlogin->memfree()," < ",$limit);
            $job->kill();
            $sshlogin->memfree_recompute();
        }
        ::debug("mem","Free mem OK ", $sshlogin->memfree()," > ",$limit);
    }
}
sub multiply_binary_prefix {
# Evaluate numbers with binary prefix
# Ki=2^10, Mi=2^20, Gi=2^30, Ti=2^40, Pi=2^50, Ei=2^60, Zi=2^70, Yi=2^80
@ -3607,6 +3645,83 @@ sub set_max_jobs_running {
$self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'};
}
sub memfree {
    # Free memory for this sshlogin, measured afresh on every call.
    #
    # Input:
    #   $self = object whose 'memfree' field is refreshed by
    #     memfree_recompute() and whose 'last_memfree' field is set
    #     to the time of this call
    # Returns:
    #   $memfree in bytes
    #   1 if $self->{'memfree'} is undefined after the recompute
    my $self = shift;
    # Recompute exactly once per call: every recompute runs an
    # external command, so running it twice in a row only costs time.
    # Caching by age is currently disabled; 'last_memfree' is kept
    # up to date so it can be re-enabled with a test like:
    #   if(time - $self->{'last_memfree'} >= 1) { ... }
    $self->{'last_memfree'} = time;
    $self->memfree_recompute();
    return (not defined $self->{'memfree'} or $self->{'memfree'});
}
sub memfree_recompute {
    # Run the command from memfreescript() and store whatever it
    # prints in $self->{'memfree'}.
    #
    # Input:
    #   $self = object that gets its 'memfree' field overwritten
    # Returns:
    #   the output of the command (the newly stored value)
    my ($self) = @_;
    # TODO add sshlogin and backgrounding
    my $memfree_cmd = memfreescript();
    $self->{'memfree'} = qx{$memfree_cmd};
    #::debug("mem","New free:",$self->{'memfree'}," ");
}
{
# Cache for the generated command: filled in by the first call to
# memfreescript() and reused by all later calls.
my $script;
sub memfreescript {
# Build (once) and return a command that starts perl with a script
# that picks the snippet matching $^O and prints the free memory.
# If $^O matches none of the entries the script prints nothing.
#
# Uses:
# $Global::envvar = string put in front of the perl command
# Returns:
# shellscript for giving available memory in bytes
if(not $script) {
# Perl snippets indexed by the value of $^O they are meant for.
# Tabs and newlines in the snippets are replaced by spaces further
# down, so a snippet must not contain '#' comments: such a comment
# would swallow the rest of the generated script.
my %script_of = (
# Second number after "buffers/cache:" in the output of free,
# multiplied by 1024.
# $ free
# total used free shared buffers cached
# Mem: 8075152 4922780 3152372 338856 233356 1658604
# -/+ buffers/cache: 3030820 5044332
# Swap: 8286204 116924 8169280
"linux" => q{ print (1024*((grep /buffers.cache/, `free`)[0] =~ /buffers.cache:\s+\S+\s+(\S+)/)[0]) },
# Fifth number on the last line of vmstat, multiplied by 1024.
# $ vmstat 1 1
# procs memory page faults cpu
# r b w avm free re at pi po fr de sr in sy cs us sy id
# 1 0 0 242793 389737 5 1 0 0 0 0 0 107 978 60 1 1 99
"hpux" => q{ print (((reverse `vmstat 1 1`)[0] =~ /(?:\d+\D+){4}(\d+)/)[0]*1024) },
# Fifth number on the last line of vmstat, multiplied by 1024.
# $ vmstat 1 2
# kthr memory page disk faults cpu
# r b w swap free re mf pi po fr de sr s3 s4 -- -- in sy cs us sy id
# 0 0 0 6496720 5170320 68 260 8 2 1 0 0 -0 3 0 0 309 1371 255 1 2 97
# 0 0 0 6434088 5072656 7 15 8 0 0 0 0 0 261 0 0 1889 1899 3222 0 8 92
#
# The last free is really free
"solaris" => q{ print (((reverse `vmstat 1 2`)[0] =~ /(?:\d+\D+){4}(\d+)/)[0]*1024) },
# Read all "name: value" lines from sysctl -a, then print
# hw.pagesize * (cache + inactive + free page counts).
"freebsd" => q{
for(qx{/sbin/sysctl -a}) {
if (/^([^:]+):\s+(.+)\s*$/s) {
$sysctl->{$1} = $2;
}
}
print $sysctl->{"hw.pagesize"} *
($sysctl->{"vm.stats.vm.v_cache_count"}
+ $sysctl->{"vm.stats.vm.v_inactive_count"}
+ $sysctl->{"vm.stats.vm.v_free_count"});
},
);
my $perlscript = "";
# Make a perl script that detects the OS ($^O) and runs
# the appropriate command
for my $os (keys %script_of) {
$perlscript .= 'if($^O eq "'.$os.'") { '.$script_of{$os}.'}';
}
# Flatten the script to a single line before quoting it
$perlscript =~ s/[\t\n]/ /g;
# Quote the script so it can be given as one argument to perl -e
$perlscript = "perl -e " . ::shell_quote_scalar($perlscript);
$script = $Global::envvar. " " .$perlscript;
}
return $script
}
}
sub swapping {
my $self = shift;
my $swapping = $self->swap_activity();
@ -6170,6 +6285,7 @@ sub should_be_retried {
} else {
# This command should be retried
$self->set_endtime(undef);
$self->reset_exitstatus();
$Global::JobQueue->unget($self);
::debug("run", "Retry ", $self->seq(), "\n");
return 1;
@ -6438,6 +6554,11 @@ sub set_exitstatus {
}
}
sub reset_exitstatus {
    # Forget the stored exit status: the 'exitstatus' field is set
    # to undef. The key is kept and no other field is touched.
    #
    # Input:
    #   $self = object (hash ref) holding the 'exitstatus' field
    my ($self) = @_;
    $self->{'exitstatus'} = undef;
}
sub exitsignal {
my $self = shift;
return $self->{'exitsignal'};
@ -7011,7 +7132,7 @@ sub replaced {
if($quote) {
@target = ::shell_quote(@target);
}
::debug("replace", "%replace=",::my_dump(%replace),"\n");
# ::debug("replace", "%replace=",::my_dump(%replace),"\n");
if(%replace) {
# Substitute the replace strings with the replacement values
# Must be sorted by length if a short word is a substring of a long word