parallel: --limit scripts.

This commit is contained in:
Ole Tange 2017-08-28 23:18:43 +02:00
parent 897d9f5db9
commit 1e62be2464
3 changed files with 109 additions and 2 deletions

View file

@ -198,7 +198,7 @@ to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm> stable-bcc: Jesse Alama <jessealama@fastmail.fm>
Subject: GNU Parallel 20170922 ('') released <<[stable]>> Subject: GNU Parallel 20170922 ('Harvey/Peter Madsen') released <<[stable]>>
GNU Parallel 20170922 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/ GNU Parallel 20170922 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/

View file

@ -1005,6 +1005,7 @@ sub options_hash {
"wc|willcite|will-cite|nn|nonotice|no-notice" => \$opt::willcite, "wc|willcite|will-cite|nn|nonotice|no-notice" => \$opt::willcite,
# Termination and retries # Termination and retries
"halt-on-error|halt=s" => \$opt::halt, "halt-on-error|halt=s" => \$opt::halt,
"limit=s" => \$opt::limit,
"memfree=s" => \$opt::memfree, "memfree=s" => \$opt::memfree,
"retries=s" => \$opt::retries, "retries=s" => \$opt::retries,
"timeout=s" => \$opt::timeout, "timeout=s" => \$opt::timeout,
@ -2500,6 +2501,10 @@ sub init_run_jobs {
# The server is swapping # The server is swapping
next; next;
} }
if($opt::limit and $sshlogin->limit()) {
# Over limit
next;
}
if($opt::memfree and $sshlogin->memfree() < $opt::memfree) { if($opt::memfree and $sshlogin->memfree() < $opt::memfree) {
# The server has not enough mem free # The server has not enough mem free
::debug("mem", "Not starting job: not enough mem\n"); ::debug("mem", "Not starting job: not enough mem\n");
@ -4671,6 +4676,9 @@ sub reap_usleep {
if($opt::memfree) { if($opt::memfree) {
kill_youngster_if_not_enough_mem(); kill_youngster_if_not_enough_mem();
} }
if($opt::limit) {
kill_youngest_if_over_limit();
}
if($ms > 0.002) { if($ms > 0.002) {
# When a child dies, wake up from sleep (or select(,,,)) # When a child dies, wake up from sleep (or select(,,,))
$SIG{CHLD} = sub { kill "ALRM", $$ }; $SIG{CHLD} = sub { kill "ALRM", $$ };
@ -4700,6 +4708,31 @@ sub reap_usleep {
} }
} }
sub kill_youngest_if_over_limit {
# Check each $sshlogin we are over limit
# If over limit: kill off the youngest child
# Put the child back in the queue.
# Uses:
# %Global::running
my %jobs_of;
my @sshlogins;
for my $job (values %Global::running) {
if(not $jobs_of{$job->sshlogin()}) {
push @sshlogins, $job->sshlogin();
}
push @{$jobs_of{$job->sshlogin()}}, $job;
}
for my $sshlogin (@sshlogins) {
for my $job (sort { $b->seq() <=> $a->seq() } @{$jobs_of{$sshlogin}}) {
if($sshlogin->limit() == 2) {
$job->kill();
last;
}
}
}
}
sub kill_youngster_if_not_enough_mem { sub kill_youngster_if_not_enough_mem {
# Check each $sshlogin if there is enough mem. # Check each $sshlogin if there is enough mem.
# If less than 50% enough free mem: kill off the youngest child # If less than 50% enough free mem: kill off the youngest child
@ -5061,10 +5094,80 @@ sub memfree_recompute {
$perlscript =~ s/[\t\n ]+/ /g; $perlscript =~ s/[\t\n ]+/ /g;
$script = "perl -e " . ::shell_quote_scalar($perlscript); $script = "perl -e " . ::shell_quote_scalar($perlscript);
} }
return $script return $script;
} }
} }
sub limit {
# Returns:
# 0 = Below limit. Start another job.
# 1 = Over limit. Start no jobs.
# 2 = Kill youngest job
my $self = shift;
if(not defined $self->{'limitscript'}) {
my %limitscripts =
("io" => q!
io() {
limit=$1;
io_file=$2;
# Do the measurement in the background
(tmp=$(tempfile);
LANG=C iostat -x 1 2 > $tmp;
mv $tmp $io_file) &
perl -e '-e $ARGV[0] or exit(1);
for(reverse <>) {
/Device:/ and last;
/(\S+)$/ and $max = $max > $1 ? $max : $1; }
exit ($max < '$limit')' $io_file;
};
export -f io;
io %s %s
!,
"mem" => q!
mem() {
limit=$1;
awk '/^((Swap)?Cached|MemFree|Buffers):/{ sum += $2}
END {
if (sum*1024 < '$limit'/2) { exit 2; }
else { exit (sum*1024 < '$limit') }
}' /proc/meminfo;
};
export -f mem;
mem %s;
!,
"load" => q!
load() {
limit=$1;
ps ax -o state,command |
grep -E '^[DOR].[^[]' |
wc -l |
perl -ne 'exit ('$limit' < $_)';
};
export -f load;
load %s;
!,
);
my ($cmd,@args) = split /\s+/,$opt::limit;
if($limitscripts{$cmd}) {
my $tmpfile = ::tmpname("parlmt");
$Global::unlink{$tmpfile};
$self->{'limitscript'} =
::spacefree(1, sprintf($limitscripts{$cmd},@args,$tmpfile));
} else {
$self->{'limitscript'} = $opt::limit;
}
}
my %env = %ENV;
local %ENV = %env;
$ENV{'SSHLOGIN'} = $self->string();
system($Global::shell,"-c",$self->{'limitscript'});
::debug("limit","limit `".$self->{'limitscript'}."` result ".($?>>8)."\n");
return $?>>8;
}
sub swapping { sub swapping {
my $self = shift; my $self = shift;
my $swapping = $self->swap_activity(); my $swapping = $self->swap_activity();

View file

@ -2590,6 +2590,10 @@ as many arguments that will fit on the line:
ls | grep -E '\.log$' | parallel -m mv {} destdir ls | grep -E '\.log$' | parallel -m mv {} destdir
In many shells you can also use B<printf>:
printf '%s\0' *.log | parallel -m mv {} destdir
=head1 EXAMPLE: Context replace =head1 EXAMPLE: Context replace