parallel: Fixed bug #48449: memory leak.

This commit is contained in:
Ole Tange 2016-07-10 16:02:26 +02:00
parent 8a5729cebd
commit 402d3960d4
3 changed files with 117 additions and 71 deletions

View file

@ -191,10 +191,10 @@ reaper();
::debug("init", "Done reaping\n"); ::debug("init", "Done reaping\n");
if($opt::pipe and @opt::a) { if($opt::pipe and @opt::a) {
for my $job (@Global::tee_jobs) { for my $job (@Global::tee_jobs) {
unlink $job->fh(2,"name"); ::rm($job->fh(2,"name"));
$job->set_fh(2,"name",""); $job->set_fh(2,"name","");
$job->print(); $job->print();
unlink $job->fh(1,"name"); ::rm($job->fh(1,"name"));
} }
} }
::debug("init", "Cleaning\n"); ::debug("init", "Cleaning\n");
@ -3488,6 +3488,7 @@ sub reaper {
::wait_and_exit($Global::halt_exitstatus); ::wait_and_exit($Global::halt_exitstatus);
} }
} }
$job->cleanup();
start_more_jobs(); start_more_jobs();
if($opt::progress) { if($opt::progress) {
my %progress = progress(); my %progress = progress();
@ -3814,6 +3815,7 @@ sub tmpfile {
sub tmpname { sub tmpname {
# Select a name that does not exist # Select a name that does not exist
# Do not create the file as it may be used for creating a socket (by tmux) # Do not create the file as it may be used for creating a socket (by tmux)
# Remember the name in $Global::unlink to avoid hitting the same name twice
my $name = shift; my $name = shift;
my($tmpname); my($tmpname);
if(not -w $ENV{'TMPDIR'}) { if(not -w $ENV{'TMPDIR'}) {
@ -3832,13 +3834,21 @@ sub tmpname {
} }
sub tmpfifo { sub tmpfifo {
# Securely make a fifo by securely making a dir with a fifo in it # Find an unused name and mkfifo on it
use POSIX qw(mkfifo); use POSIX qw(mkfifo);
my $tmpfifo = tmpname("fif",@_); my $tmpfifo = tmpname("fif",@_);
mkfifo($tmpfifo,0600); mkfifo($tmpfifo,0600);
return $tmpfifo; return $tmpfifo;
} }
sub rm {
# Remove file and remove it from %Global::unlink
# Uses:
# %Global::unlink
delete @Global::unlink{@_};
unlink @_;
}
sub size_of_block_dev { sub size_of_block_dev {
# Like -s but for block devices # Like -s but for block devices
# Input: # Input:
@ -4452,8 +4462,8 @@ sub new {
sub DESTROY { sub DESTROY {
my $self = shift; my $self = shift;
# Remove temporary files if they are created. # Remove temporary files if they are created.
unlink $self->{'loadavg_file'}; ::rm($self->{'loadavg_file'});
unlink $self->{'swap_activity_file'}; ::rm($self->{'swap_activity_file'});
} }
sub string { sub string {
@ -6158,6 +6168,8 @@ sub new {
# filehandle for stdin (used for --pipe) # filehandle for stdin (used for --pipe)
# filename for writing stdout to (used for --files) # filename for writing stdout to (used for --files)
# remaining data not sent to stdin (used for --pipe) # remaining data not sent to stdin (used for --pipe)
# tmpfiles to cleanup when job is done
'unlink' => [],
# amount of data sent via stdin (used for --pipe) # amount of data sent via stdin (used for --pipe)
'transfersize' => 0, # size of files using --transfer 'transfersize' => 0, # size of files using --transfer
'returnsize' => 0, # size of files using --return 'returnsize' => 0, # size of files using --return
@ -6381,6 +6393,25 @@ sub openoutputfiles {
} }
} }
sub add_rm {
# Files to remove when job is done
my $self = shift;
push $self->{'unlink'}, @_;
}
sub get_rm {
# Files to remove when job is done
my $self = shift;
return @{$self->{'unlink'}};
}
sub cleanup {
# Remove files when job is done
my $self = shift;
unlink $self->get_rm();
delete @Global::unlink{$self->get_rm()};
}
sub grouped { sub grouped {
my $self = shift; my $self = shift;
# Set reading FD if using --group (--ungroup does not need) # Set reading FD if using --group (--ungroup does not need)
@ -6392,7 +6423,7 @@ sub grouped {
::die_bug("fdr: Cannot open ".$self->fh($fdno,'name')); ::die_bug("fdr: Cannot open ".$self->fh($fdno,'name'));
$self->set_fh($fdno,'r',$fdr); $self->set_fh($fdno,'r',$fdr);
# Unlink if required # Unlink if required
$Global::debug or unlink $self->fh($fdno,"unlink"); $Global::debug or ::rm($self->fh($fdno,"unlink"));
} }
} }
@ -7402,7 +7433,7 @@ sub sshcleanup {
my $workdir = $self->workdir(); my $workdir = $self->workdir();
my $cleancmd = ""; my $cleancmd = "";
for my $file ($self->cleanup()) { for my $file ($self->remote_cleanup()) {
my @subworkdirs = parentdirs_of($file); my @subworkdirs = parentdirs_of($file);
$cleancmd .= $sshlogin->cleanup_cmd($file,$workdir).";"; $cleancmd .= $sshlogin->cleanup_cmd($file,$workdir).";";
} }
@ -7412,7 +7443,7 @@ sub sshcleanup {
return $cleancmd; return $cleancmd;
} }
sub cleanup { sub remote_cleanup {
# Returns: # Returns:
# Files to remove at cleanup # Files to remove at cleanup
my $self = shift; my $self = shift;
@ -7602,6 +7633,7 @@ sub start {
$ENV{'PARALLEL_SEQ'} = $job->seq(); $ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$; $ENV{'PARALLEL_PID'} = $$;
$ENV{'PARALLEL_TMP'} = ::tmpname("par"); $ENV{'PARALLEL_TMP'} = ::tmpname("par");
$job->add_rm($ENV{'PARALLEL_TMP'});
::debug("run", $Global::total_running, " processes . Starting (", ::debug("run", $Global::total_running, " processes . Starting (",
$job->seq(), "): $command\n"); $job->seq(), "): $command\n");
if($opt::pipe) { if($opt::pipe) {
@ -7721,6 +7753,7 @@ sub print_dryrun_and_verbose {
my $actual_command = shift; my $actual_command = shift;
# Temporary file name. Used for fifo to communicate exit val # Temporary file name. Used for fifo to communicate exit val
my $tmpfifo=::tmpname("tmx"); my $tmpfifo=::tmpname("tmx");
$self->add_rm($tmpfifo);
if(length($tmpfifo) >=100) { if(length($tmpfifo) >=100) {
::error("tmux does not support sockets with path > 100."); ::error("tmux does not support sockets with path > 100.");
@ -7882,7 +7915,7 @@ sub print {
if($opt::dryrun) { if($opt::dryrun) {
# Nothing was printed to this job: # Nothing was printed to this job:
# cleanup tmp files if --files was set # cleanup tmp files if --files was set
unlink $self->fh(1,"name"); ::rm($self->fh(1,"name"));
} }
if($opt::pipe and $self->virgin()) { if($opt::pipe and $self->virgin()) {
# Skip --joblog, --dryrun, --verbose # Skip --joblog, --dryrun, --verbose
@ -7973,8 +8006,8 @@ sub files_print {
# Nothing was printed to this job: # Nothing was printed to this job:
# cleanup unused tmp files if --files was set # cleanup unused tmp files if --files was set
for my $fdno (1,2) { for my $fdno (1,2) {
unlink $self->fh($fdno,"name"); ::rm($self->fh($fdno,"name"));
unlink $self->fh($fdno,"unlink"); ::rm($self->fh($fdno,"unlink"));
} }
} elsif($fdno == 1 and $self->fh($fdno,"name")) { } elsif($fdno == 1 and $self->fh($fdno,"name")) {
print $out_fd $self->tag(),$self->fh($fdno,"name"),"\n"; print $out_fd $self->tag(),$self->fh($fdno,"name"),"\n";
@ -9256,7 +9289,7 @@ sub tmux_length {
my $tmuxcmd = $ENV{'TMUX'}." -S $tmpfile new-session -d -n echo $l". my $tmuxcmd = $ENV{'TMUX'}." -S $tmpfile new-session -d -n echo $l".
("x"x$l). " && echo $l; rm -f $tmpfile"; ("x"x$l). " && echo $l; rm -f $tmpfile";
push @out, ::qqx($tmuxcmd); push @out, ::qqx($tmuxcmd);
unlink $tmpfile; ::rm($tmpfile);
} }
::debug("tmux","tmux-out ",@out); ::debug("tmux","tmux-out ",@out);
chomp @out; chomp @out;
@ -10124,9 +10157,9 @@ sub run {
eval { $rv = $sth->execute(@_) }) { eval { $rv = $sth->execute(@_) }) {
last; last;
} else { } else {
if($@ =~ /no such table/ if($@ =~ /no such table|Table .* doesn.t exist|relation ".*" does not exist/
or or
$DBI::errstr =~ /no such table/) { $DBI::errstr =~ /no such table|Table .* doesn.t exist|relation ".*" does not exist/) {
# This is fine: # This is fine:
# It is just a worker that reported back too late - # It is just a worker that reported back too late -
# another worker had finished the job first # another worker had finished the job first
@ -10330,7 +10363,7 @@ sub remove_dead_locks {
if($host eq ::hostname()) { if($host eq ::hostname()) {
if(not kill 0, $pid) { if(not kill 0, $pid) {
::debug("sem", "Dead: $d\n"); ::debug("sem", "Dead: $d\n");
unlink $d; ::rm($d);
} else { } else {
::debug("sem", "Alive: $d\n"); ::debug("sem", "Alive: $d\n");
} }
@ -10379,12 +10412,12 @@ sub acquire {
sub release { sub release {
my $self = shift; my $self = shift;
unlink $self->{'pidfile'}; ::rm($self->{'pidfile'});
if($self->nlinks() == 1) { if($self->nlinks() == 1) {
# This is the last link, so atomic cleanup # This is the last link, so atomic cleanup
$self->lock(); $self->lock();
if($self->nlinks() == 1) { if($self->nlinks() == 1) {
unlink $self->{'idfile'}; ::rm($self->{'idfile'});
rmdir $self->{'lockdir'}; rmdir $self->{'lockdir'};
} }
$self->unlock(); $self->unlock();
@ -10402,7 +10435,7 @@ sub pid_change {
$self->{'pidfile'} = $self->{'lockdir'}."/".$$.'@'.::hostname(); $self->{'pidfile'} = $self->{'lockdir'}."/".$$.'@'.::hostname();
my $retval = link $self->{'idfile'}, $self->{'pidfile'}; my $retval = link $self->{'idfile'}, $self->{'pidfile'};
::debug("sem","link($self->{'idfile'},$self->{'pidfile'})=$retval\n"); ::debug("sem","link($self->{'idfile'},$self->{'pidfile'})=$retval\n");
unlink $old_pidfile; ::rm($old_pidfile);
} }
sub atomic_link_if_count_less_than { sub atomic_link_if_count_less_than {
@ -10500,7 +10533,7 @@ sub lock {
sub unlock { sub unlock {
my $self = shift; my $self = shift;
unlink $self->{'lockfile'}; ::rm($self->{'lockfile'});
close $self->{'lockfh'}; close $self->{'lockfh'};
::debug("run", "unlocked\n"); ::debug("run", "unlocked\n");
} }

View file

@ -4,27 +4,51 @@
# Each should be taking 30-100s and be possible to run in parallel # Each should be taking 30-100s and be possible to run in parallel
# I.e.: No race conditions, no logins # I.e.: No race conditions, no logins
par_race_condition1() {
echo '### Test race condition on 8 CPU (my laptop)'
seq 1 5000000 > /tmp/parallel_race_cond
seq 1 10 | parallel -k "cat /tmp/parallel_race_cond | parallel --pipe --recend '' -k gzip >/dev/null; echo {}"
rm /tmp/parallel_race_cond
}
par_tmp_full() {
# Assume /tmp/shm is easy to fill up # Assume /tmp/shm is easy to fill up
export SHM=/tmp/shm/parallel export SHM=/tmp/shm/parallel
mkdir -p $SHM mkdir -p $SHM
sudo umount -l $SHM sudo umount -l $SHM
sudo mount -t tmpfs -o size=10% none $SHM sudo mount -t tmpfs -o size=10% none $SHM
cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj4 -k --joblog /tmp/jl-`basename $0` -L1
echo '### Test race condition on 8 CPU (my laptop)';
seq 1 5000000 > /tmp/parallel_test;
seq 1 10 | parallel -k "cat /tmp/parallel_test | parallel --pipe --recend '' -k gzip >/dev/null; echo {}";
rm /tmp/parallel_test
echo '**'
echo "### Test --tmpdir running full. bug #40733 was caused by this" echo "### Test --tmpdir running full. bug #40733 was caused by this"
stdout parallel -j1 --tmpdir $SHM cat /dev/zero ::: dummy stdout parallel -j1 --tmpdir $SHM cat /dev/zero ::: dummy
}
echo '**' par_bug_48290() {
echo "### bug #48290: round-robin does not distribute data based on business" echo "### bug #48290: round-robin does not distribute data based on business"
echo "Jobslot 1 is 8 times slower than jobslot 8 and should get much less data" echo "Jobslot 1 is 256 times slower than jobslot 4 and should get much less data"
seq 10000000 | parallel --tagstring {%} --linebuffer --compress -j8 --roundrobin --pipe --block 300k 'pv -qL {%}00000'| perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$a} <=> $s{$b} } keys %s}' yes "$(seq 1000|xargs)" | head -c 30M |
parallel --tagstring {%} --linebuffer --compress -j4 --roundrobin --pipe --block 10k \
pv -qL '{= $_=int( $job->slot()**4/2+1) =}'0000 |
perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$b} <=> $s{$a} } keys %s}'
}
EOF par_memory_leak() {
a_run() {
seq $1 |time -v parallel true 2>&1 |
grep 'Maximum resident' |
field 6;
}
export -f a_run
echo "### Test for memory leaks"
echo "Of 10 runs of 1 job at least one should be bigger than a 3000 job run"
small_max=$(seq 10 | parallel a_run 1 | jq -s max)
big=$(a_run 3000)
if [ $small_max -lt $big ] ; then
echo "Bad: Memleak likely."
else
echo "Good: No memleak detected."
fi
}
export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | sort | parallel -j6 --tag -k '{} 2>&1'

View file

@ -1,34 +1,23 @@
echo '### Test race condition on 8 CPU (my laptop)'; seq 1 5000000 > /tmp/parallel_test; seq 1 10 | parallel -k "cat /tmp/parallel_test | parallel --pipe --recend '' -k gzip >/dev/null; echo {}"; rm /tmp/parallel_test par_bug_48290 ### bug #48290: round-robin does not distribute data based on business
### Test race condition on 8 CPU (my laptop) par_bug_48290 Jobslot 1 is 256 times slower than jobslot 4 and should get much less data
1 par_bug_48290 4
2 par_bug_48290 3
3 par_bug_48290 2
4 par_bug_48290 1
5 par_memory_leak ### Test for memory leaks
6 par_memory_leak Of 10 runs of 1 job at least one should be bigger than a 3000 job run
7 par_memory_leak Good: No memleak detected.
8 par_race_condition1 ### Test race condition on 8 CPU (my laptop)
9 par_race_condition1 1
10 par_race_condition1 2
echo '**' par_race_condition1 3
** par_race_condition1 4
echo "### Test --tmpdir running full. bug #40733 was caused by this" par_race_condition1 5
### Test --tmpdir running full. bug #40733 was caused by this par_race_condition1 6
stdout parallel -j1 --tmpdir $SHM cat /dev/zero ::: dummy par_race_condition1 7
parallel: Error: Output is incomplete. Cannot append to buffer file in /tmp/shm/parallel. Is the disk full? par_race_condition1 8
parallel: Error: Change $TMPDIR with --tmpdir or use --compress. par_race_condition1 9
echo '**' par_race_condition1 10
** par_tmp_full ### Test --tmpdir running full. bug #40733 was caused by this
echo "### bug #48290: round-robin does not distribute data based on business" par_tmp_full parallel: Error: Output is incomplete. Cannot append to buffer file in /tmp/shm/parallel. Is the disk full?
### bug #48290: round-robin does not distribute data based on business par_tmp_full parallel: Error: Change $TMPDIR with --tmpdir or use --compress.
echo "Jobslot 1 is 8 times slower than jobslot 8 and should get much less data"
Jobslot 1 is 8 times slower than jobslot 8 and should get much less data
seq 10000000 | parallel --tagstring {%} --linebuffer --compress -j8 --roundrobin --pipe --block 300k 'pv -qL {%}00000'| perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$a} <=> $s{$b} } keys %s}'
1
2
3
5
4
7
6
8