From 1e62be2464e7977979dc3ba6e52ca11d5077ea21 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Mon, 28 Aug 2017 23:18:43 +0200 Subject: [PATCH] parallel: --limit scripts. --- doc/release_new_version | 2 +- src/parallel | 105 +++++++++++++++++++++++++++++++++++++++- src/parallel.pod | 4 ++ 3 files changed, 109 insertions(+), 2 deletions(-) diff --git a/doc/release_new_version b/doc/release_new_version index 4ee913b3..9791cefb 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -198,7 +198,7 @@ to:parallel@gnu.org, bug-parallel@gnu.org stable-bcc: Jesse Alama -Subject: GNU Parallel 20170922 ('') released <<[stable]>> +Subject: GNU Parallel 20170922 ('Harvey/Peter Madsen') released <<[stable]>> GNU Parallel 20170922 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/ diff --git a/src/parallel b/src/parallel index cb0b8e34..fda8d16b 100755 --- a/src/parallel +++ b/src/parallel @@ -1005,6 +1005,7 @@ sub options_hash { "wc|willcite|will-cite|nn|nonotice|no-notice" => \$opt::willcite, # Termination and retries "halt-on-error|halt=s" => \$opt::halt, + "limit=s" => \$opt::limit, "memfree=s" => \$opt::memfree, "retries=s" => \$opt::retries, "timeout=s" => \$opt::timeout, @@ -2500,6 +2501,10 @@ sub init_run_jobs { # The server is swapping next; } + if($opt::limit and $sshlogin->limit()) { + # Over limit + next; + } if($opt::memfree and $sshlogin->memfree() < $opt::memfree) { # The server has not enough mem free ::debug("mem", "Not starting job: not enough mem\n"); @@ -4671,6 +4676,9 @@ sub reap_usleep { if($opt::memfree) { kill_youngster_if_not_enough_mem(); } + if($opt::limit) { + kill_youngest_if_over_limit(); + } if($ms > 0.002) { # When a child dies, wake up from sleep (or select(,,,)) $SIG{CHLD} = sub { kill "ALRM", $$ }; @@ -4700,6 +4708,31 @@ sub reap_usleep { } } +sub kill_youngest_if_over_limit { + # Check each $sshlogin we are over limit + # If over limit: kill off the youngest child + # Put the child back in the queue. + # Uses: + # %Global::running + my %jobs_of; + my @sshlogins; + + for my $job (values %Global::running) { + if(not $jobs_of{$job->sshlogin()}) { + push @sshlogins, $job->sshlogin(); + } + push @{$jobs_of{$job->sshlogin()}}, $job; + } + for my $sshlogin (@sshlogins) { + for my $job (sort { $b->seq() <=> $a->seq() } @{$jobs_of{$sshlogin}}) { + if($sshlogin->limit() == 2) { + $job->kill(); + last; + } + } + } +} + sub kill_youngster_if_not_enough_mem { # Check each $sshlogin if there is enough mem. # If less than 50% enough free mem: kill off the youngest child @@ -5061,10 +5094,80 @@ sub memfree_recompute { $perlscript =~ s/[\t\n ]+/ /g; $script = "perl -e " . ::shell_quote_scalar($perlscript); } - return $script + return $script; } } +sub limit { + # Returns: + # 0 = Below limit. Start another job. + # 1 = Over limit. Start no jobs. + # 2 = Kill youngest job + my $self = shift; + + if(not defined $self->{'limitscript'}) { + my %limitscripts = + ("io" => q! + io() { + limit=$1; + io_file=$2; + # Do the measurement in the background + (tmp=$(tempfile); + LANG=C iostat -x 1 2 > $tmp; + mv $tmp $io_file) & + perl -e '-e $ARGV[0] or exit(1); + for(reverse <>) { + /Device:/ and last; + /(\S+)$/ and $max = $max > $1 ? $max : $1; } + exit ($max < '$limit')' $io_file; + }; + export -f io; + io %s %s + !, + "mem" => q! + mem() { + limit=$1; + awk '/^((Swap)?Cached|MemFree|Buffers):/{ sum += $2} + END { + if (sum*1024 < '$limit'/2) { exit 2; } + else { exit (sum*1024 < '$limit') } + }' /proc/meminfo; + }; + export -f mem; + mem %s; + !, + "load" => q! + load() { + limit=$1; + ps ax -o state,command | + grep -E '^[DOR].[^[]' | + wc -l | + perl -ne 'exit ('$limit' < $_)'; + }; + export -f load; + load %s; + !, + ); + my ($cmd,@args) = split /\s+/,$opt::limit; + if($limitscripts{$cmd}) { + my $tmpfile = ::tmpname("parlmt"); + $Global::unlink{$tmpfile}; + $self->{'limitscript'} = + ::spacefree(1, sprintf($limitscripts{$cmd},@args,$tmpfile)); + } else { + $self->{'limitscript'} = $opt::limit; + } + } + + my %env = %ENV; + local %ENV = %env; + $ENV{'SSHLOGIN'} = $self->string(); + system($Global::shell,"-c",$self->{'limitscript'}); + ::debug("limit","limit `".$self->{'limitscript'}."` result ".($?>>8)."\n"); + return $?>>8; +} + + sub swapping { my $self = shift; my $swapping = $self->swap_activity(); diff --git a/src/parallel.pod b/src/parallel.pod index 8db3e959..93edafc5 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -2590,6 +2590,10 @@ as many arguments that will fit on the line: ls | grep -E '\.log$' | parallel -m mv {} destdir +In many shells you can also use B: + + printf '%s\0' *.log | parallel -m mv {} destdir + =head1 EXAMPLE: Context replace