From 7fba9278f8fc95057bef878aed1258dafdc805a3 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Mon, 15 Dec 2014 11:09:43 +0100 Subject: [PATCH] parallel: --memfree prototype. --- src/parallel | 135 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 128 insertions(+), 7 deletions(-) diff --git a/src/parallel b/src/parallel index c7110c95..2359c7e8 100755 --- a/src/parallel +++ b/src/parallel @@ -687,6 +687,7 @@ sub options_hash { "xapply" => \$opt::xapply, "bibtex" => \$opt::bibtex, "nn|nonotice|no-notice" => \$opt::no_notice, + "memfree=s" => \$opt::memfree, # xargs-compatibility - implemented, man, testsuite "max-procs|P=s" => \$opt::jobs, "delimiter|d=s" => \$opt::d, @@ -815,6 +816,7 @@ sub parse_options { not defined $opt::recend) { $opt::recend = "\n"; } if(not defined $opt::blocksize) { $opt::blocksize = "1M"; } $opt::blocksize = multiply_binary_prefix($opt::blocksize); + $opt::memfree = multiply_binary_prefix($opt::memfree); if(defined $opt::controlmaster) { $opt::noctrlc = 1; } if(defined $opt::halt_on_error and $opt::halt_on_error=~/%/) { $opt::halt_on_error /= 100; } @@ -1507,9 +1509,10 @@ sub shell_quote_scalar { # $shell_quoted = string quoted with \ as needed by the shell my $a = $_[0]; if(defined $a) { - # $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g; + # Solaris sh wants ^ quoted. + # $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g; # This is 1% faster than the above - $a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go; + $a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go; $a =~ s/[\n]/'\n'/go; # filenames with '\n' is quoted using \' } return $a; @@ -1736,6 +1739,10 @@ sub init_run_jobs { debug("run", "Running jobs before on ", $sshlogin->string(), ": ", $sshlogin->jobs_running(), "\n"); if ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { + if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) { + # It has been too short since last start + next; + } if($opt::load and $sshlogin->loadavg_too_high()) { # The load is too high or unknown next; @@ -1744,12 +1751,13 @@ sub init_run_jobs { # The server is swapping next; } - if($sshlogin->too_fast_remote_login()) { - # It has been too short since + if($opt::memfree and $sshlogin->memfree() < $opt::memfree) { + # The server has not enough mem free + ::debug("mem", "Not starting job: not enough mem\n"); next; } - if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) { - # It has been too short since last start + if($sshlogin->too_fast_remote_login()) { + # It has been too short since next; } debug("run", $sshlogin->string(), " has ", $sshlogin->jobs_running(), @@ -3285,6 +3293,9 @@ sub reap_usleep { if($opt::timeout) { $Global::timeoutq->process_timeouts(); } + if($opt::memfree) { + kill_youngster_if_not_enough_mem(); + } usleep($ms); Job::exit_if_disk_full(); if($opt::linebuffer) { @@ -3325,6 +3336,33 @@ sub now { return (int(TimeHiRestime()*1000))/1000; } +sub kill_youngster_if_not_enough_mem { + # Check each $sshlogin if there is enough mem. + # If less than 50% enough free mem: kill off the youngest child + # Put the child back in the queue. + my %jobs_of; + use Tie::RefHash; + tie %jobs_of, 'Tie::RefHash'; + + for my $job (values %Global::running) { + push @{$jobs_of{$job->sshlogin()}}, $job; + } + for my $sshlogin (keys %jobs_of) { + for my $job (sort { $b->seq() <=> $a->seq() } @{$jobs_of{$sshlogin}}) { + if($sshlogin->memfree() < $opt::memfree * 0.5) { + ::debug("mem","\n",map { $_->seq()." " } (sort { $b->seq() <=> $a->seq() } @{$jobs_of{$sshlogin}})); + ::debug("mem","\n", $job->seq(), "killed ", + $sshlogin->memfree()," < ",$opt::memfree * 0.5); + $job->kill(); + $sshlogin->memfree_recompute(); + } else { + last; + } + } + ::debug("mem","Free mem OK ", $sshlogin->memfree()," > ",$opt::memfree * 0.5); + } +} + sub multiply_binary_prefix { # Evalualte numbers with binary prefix # Ki=2^10, Mi=2^20, Gi=2^30, Ti=2^40, Pi=2^50, Ei=2^70, Zi=2^80, Yi=2^80 @@ -3606,6 +3644,83 @@ sub set_max_jobs_running { $self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'}; } +sub memfree { + # Returns: + # $memfree in bytes + my $self = shift; + if(not $self->{'memfree'}) { + $self->memfree_recompute(); + } +# if(time - $self->{'last_memfree'} >= 1) { + # More than 10 seconds old: Recompute + $self->{'last_memfree'} = time; + $self->memfree_recompute(); +# } + return (not defined $self->{'memfree'} or $self->{'memfree'}) +} + +sub memfree_recompute { + my $self = shift; + my $script = memfreescript(); + + # TODO add sshlogin and backgrounding + $self->{'memfree'} = `$script`; + #::debug("mem","New free:",$self->{'memfree'}," "); +} + +{ + my $script; + + sub memfreescript { + # Returns: + # shellscript for giving available memory in bytes + if(not $script) { + my %script_of = ( + # $ free + # total used free shared buffers cached + # Mem: 8075152 4922780 3152372 338856 233356 1658604 + # -/+ buffers/cache: 3030820 5044332 + # Swap: 8286204 116924 8169280 + "linux" => q{ print (1024*((grep /buffers.cache/, `free`)[0] =~ /buffers.cache:\s+\S+\s+(\S+)/)[0]) }, + # $ vmstat 1 1 + # procs memory page faults cpu + # r b w avm free re at pi po fr de sr in sy cs us sy id + # 1 0 0 242793 389737 5 1 0 0 0 0 0 107 978 60 1 1 99 + "hpux" => q{ print (((reverse `vmstat 1 1`)[0] =~ /(?:\d+\D+){4}(\d+)/)[0]*1024) }, + # $ vmstat 1 2 + # kthr memory page disk faults cpu + # r b w swap free re mf pi po fr de sr s3 s4 -- -- in sy cs us sy id + # 0 0 0 6496720 5170320 68 260 8 2 1 0 0 -0 3 0 0 309 1371 255 1 2 97 + # 0 0 0 6434088 5072656 7 15 8 0 0 0 0 0 261 0 0 1889 1899 3222 0 8 92 + # + # The last free is really free + "solaris" => q{ print (((reverse `vmstat 1 2`)[0] =~ /(?:\d+\D+){4}(\d+)/)[0]*1024) }, + "freebsd" => q{ + for(qx{/sbin/sysctl -a}) { + if (/^([^:]+):\s+(.+)\s*$/s) { + $sysctl->{$1} = $2; + } + } + print $sysctl->{"hw.pagesize"} * + ($sysctl->{"vm.stats.vm.v_cache_count"} + + $sysctl->{"vm.stats.vm.v_inactive_count"} + + $sysctl->{"vm.stats.vm.v_free_count"}); + }, + ); + my $perlscript = ""; + # Make a perl script that detects the OS ($^O) and runs + # the appropriate command + for my $os (keys %script_of) { + $perlscript .= 'if($^O eq "'.$os.'") { '.$script_of{$os}.'}'; + } + $perlscript =~ s/[\t\n]/ /g; + $perlscript = "perl -e " . ::shell_quote_scalar($perlscript); + $script = $Global::envvar. " " .$perlscript; + } + return $script + } +} + sub swapping { my $self = shift; my $swapping = $self->swap_activity(); @@ -6239,6 +6354,7 @@ sub should_be_retried { } else { # This command should be retried $self->set_endtime(undef); + $self->reset_exitstatus(); $Global::JobQueue->unget($self); ::debug("run", "Retry ", $self->seq(), "\n"); return 1; @@ -6507,6 +6623,11 @@ sub set_exitstatus { } } +sub reset_exitstatus { + my $self = shift; + $self->{'exitstatus'} = undef; +} + sub exitsignal { my $self = shift; return $self->{'exitsignal'}; @@ -7080,7 +7201,7 @@ sub replaced { if($quote) { @target = ::shell_quote(@target); } - ::debug("replace", "%replace=",::my_dump(%replace),"\n"); + # ::debug("replace", "%replace=",::my_dump(%replace),"\n"); if(%replace) { # Substitute the replace strings with the replacement values # Must be sorted by length if a short word is a substring of a long word