diff --git a/src/parallel b/src/parallel index c55da1b0..4c62232e 100755 --- a/src/parallel +++ b/src/parallel @@ -191,10 +191,10 @@ reaper(); ::debug("init", "Done reaping\n"); if($opt::pipe and @opt::a) { for my $job (@Global::tee_jobs) { - unlink $job->fh(2,"name"); + ::rm($job->fh(2,"name")); $job->set_fh(2,"name",""); $job->print(); - unlink $job->fh(1,"name"); + ::rm($job->fh(1,"name")); } } ::debug("init", "Cleaning\n"); @@ -3488,6 +3488,7 @@ sub reaper { ::wait_and_exit($Global::halt_exitstatus); } } + $job->cleanup(); start_more_jobs(); if($opt::progress) { my %progress = progress(); @@ -3814,6 +3815,7 @@ sub tmpfile { sub tmpname { # Select a name that does not exist # Do not create the file as it may be used for creating a socket (by tmux) + # Remember the name in $Global::unlink to avoid hitting the same name twice my $name = shift; my($tmpname); if(not -w $ENV{'TMPDIR'}) { @@ -3832,13 +3834,21 @@ sub tmpname { } sub tmpfifo { - # Securely make a fifo by securely making a dir with a fifo in it + # Find an unused name and mkfifo on it use POSIX qw(mkfifo); my $tmpfifo = tmpname("fif",@_); mkfifo($tmpfifo,0600); return $tmpfifo; } +sub rm { + # Remove file and remove it from %Global::unlink + # Uses: + # %Global::unlink + delete @Global::unlink{@_}; + unlink @_; +} + sub size_of_block_dev { # Like -s but for block devices # Input: @@ -4452,8 +4462,8 @@ sub new { sub DESTROY { my $self = shift; # Remove temporary files if they are created. - unlink $self->{'loadavg_file'}; - unlink $self->{'swap_activity_file'}; + ::rm($self->{'loadavg_file'}); + ::rm($self->{'swap_activity_file'}); } sub string { @@ -6158,6 +6168,8 @@ sub new { # filehandle for stdin (used for --pipe) # filename for writing stdout to (used for --files) # remaining data not sent to stdin (used for --pipe) + # tmpfiles to cleanup when job is done + 'unlink' => [], # amount of data sent via stdin (used for --pipe) 'transfersize' => 0, # size of files using --transfer 'returnsize' => 0, # size of files using --return @@ -6381,6 +6393,25 @@ sub openoutputfiles { } } +sub add_rm { + # Files to remove when job is done + my $self = shift; + push $self->{'unlink'}, @_; +} + +sub get_rm { + # Files to remove when job is done + my $self = shift; + return @{$self->{'unlink'}}; +} + +sub cleanup { + # Remove files when job is done + my $self = shift; + unlink $self->get_rm(); + delete @Global::unlink{$self->get_rm()}; +} + sub grouped { my $self = shift; # Set reading FD if using --group (--ungroup does not need) @@ -6392,7 +6423,7 @@ sub grouped { ::die_bug("fdr: Cannot open ".$self->fh($fdno,'name')); $self->set_fh($fdno,'r',$fdr); # Unlink if required - $Global::debug or unlink $self->fh($fdno,"unlink"); + $Global::debug or ::rm($self->fh($fdno,"unlink")); } } @@ -7402,7 +7433,7 @@ sub sshcleanup { my $workdir = $self->workdir(); my $cleancmd = ""; - for my $file ($self->cleanup()) { + for my $file ($self->remote_cleanup()) { my @subworkdirs = parentdirs_of($file); $cleancmd .= $sshlogin->cleanup_cmd($file,$workdir).";"; } @@ -7412,7 +7443,7 @@ sub sshcleanup { return $cleancmd; } -sub cleanup { +sub remote_cleanup { # Returns: # Files to remove at cleanup my $self = shift; @@ -7602,6 +7633,7 @@ sub start { $ENV{'PARALLEL_SEQ'} = $job->seq(); $ENV{'PARALLEL_PID'} = $$; $ENV{'PARALLEL_TMP'} = ::tmpname("par"); + $job->add_rm($ENV{'PARALLEL_TMP'}); ::debug("run", $Global::total_running, " processes . Starting (", $job->seq(), "): $command\n"); if($opt::pipe) { @@ -7721,6 +7753,7 @@ sub print_dryrun_and_verbose { my $actual_command = shift; # Temporary file name. Used for fifo to communicate exit val my $tmpfifo=::tmpname("tmx"); + $self->add_rm($tmpfifo); if(length($tmpfifo) >=100) { ::error("tmux does not support sockets with path > 100."); @@ -7882,7 +7915,7 @@ sub print { if($opt::dryrun) { # Nothing was printed to this job: # cleanup tmp files if --files was set - unlink $self->fh(1,"name"); + ::rm($self->fh(1,"name")); } if($opt::pipe and $self->virgin()) { # Skip --joblog, --dryrun, --verbose @@ -7973,8 +8006,8 @@ sub files_print { # Nothing was printed to this job: # cleanup unused tmp files if --files was set for my $fdno (1,2) { - unlink $self->fh($fdno,"name"); - unlink $self->fh($fdno,"unlink"); + ::rm($self->fh($fdno,"name")); + ::rm($self->fh($fdno,"unlink")); } } elsif($fdno == 1 and $self->fh($fdno,"name")) { print $out_fd $self->tag(),$self->fh($fdno,"name"),"\n"; @@ -9256,7 +9289,7 @@ sub tmux_length { my $tmuxcmd = $ENV{'TMUX'}." -S $tmpfile new-session -d -n echo $l". ("x"x$l). " && echo $l; rm -f $tmpfile"; push @out, ::qqx($tmuxcmd); - unlink $tmpfile; + ::rm($tmpfile); } ::debug("tmux","tmux-out ",@out); chomp @out; @@ -10124,9 +10157,9 @@ sub run { eval { $rv = $sth->execute(@_) }) { last; } else { - if($@ =~ /no such table/ + if($@ =~ /no such table|Table .* doesn.t exist|relation ".*" does not exist/ or - $DBI::errstr =~ /no such table/) { + $DBI::errstr =~ /no such table|Table .* doesn.t exist|relation ".*" does not exist/) { # This is fine: # It is just a worker that reported back too late - # another worker had finished the job first @@ -10330,7 +10363,7 @@ sub remove_dead_locks { if($host eq ::hostname()) { if(not kill 0, $pid) { ::debug("sem", "Dead: $d\n"); - unlink $d; + ::rm($d); } else { ::debug("sem", "Alive: $d\n"); } @@ -10379,12 +10412,12 @@ sub acquire { sub release { my $self = shift; - unlink $self->{'pidfile'}; + ::rm($self->{'pidfile'}); if($self->nlinks() == 1) { # This is the last link, so atomic cleanup $self->lock(); if($self->nlinks() == 1) { - unlink $self->{'idfile'}; + ::rm($self->{'idfile'}); rmdir $self->{'lockdir'}; } $self->unlock(); @@ -10402,7 +10435,7 @@ sub pid_change { $self->{'pidfile'} = $self->{'lockdir'}."/".$$.'@'.::hostname(); my $retval = link $self->{'idfile'}, $self->{'pidfile'}; ::debug("sem","link($self->{'idfile'},$self->{'pidfile'})=$retval\n"); - unlink $old_pidfile; + ::rm($old_pidfile); } sub atomic_link_if_count_less_than { @@ -10500,7 +10533,7 @@ sub lock { sub unlock { my $self = shift; - unlink $self->{'lockfile'}; + ::rm($self->{'lockfile'}); close $self->{'lockfh'}; ::debug("run", "unlocked\n"); } diff --git a/testsuite/tests-to-run/parallel-local-30s.sh b/testsuite/tests-to-run/parallel-local-30s.sh index 6504d5d5..88123115 100644 --- a/testsuite/tests-to-run/parallel-local-30s.sh +++ b/testsuite/tests-to-run/parallel-local-30s.sh @@ -4,27 +4,51 @@ # Each should be taking 30-100s and be possible to run in parallel # I.e.: No race conditions, no logins -# Assume /tmp/shm is easy to fill up -export SHM=/tmp/shm/parallel -mkdir -p $SHM -sudo umount -l $SHM -sudo mount -t tmpfs -o size=10% none $SHM +par_race_condition1() { + echo '### Test race condition on 8 CPU (my laptop)' + seq 1 5000000 > /tmp/parallel_race_cond + seq 1 10 | parallel -k "cat /tmp/parallel_race_cond | parallel --pipe --recend '' -k gzip >/dev/null; echo {}" + rm /tmp/parallel_race_cond +} -cat <<'EOF' | sed -e 's/;$/; /;s/$SERVER1/'$SERVER1'/;s/$SERVER2/'$SERVER2'/' | stdout parallel -vj4 -k --joblog /tmp/jl-`basename $0` -L1 -echo '### Test race condition on 8 CPU (my laptop)'; - seq 1 5000000 > /tmp/parallel_test; - seq 1 10 | parallel -k "cat /tmp/parallel_test | parallel --pipe --recend '' -k gzip >/dev/null; echo {}"; - rm /tmp/parallel_test +par_tmp_full() { + # Assume /tmp/shm is easy to fill up + export SHM=/tmp/shm/parallel + mkdir -p $SHM + sudo umount -l $SHM + sudo mount -t tmpfs -o size=10% none $SHM -echo '**' + echo "### Test --tmpdir running full. bug #40733 was caused by this" + stdout parallel -j1 --tmpdir $SHM cat /dev/zero ::: dummy +} -echo "### Test --tmpdir running full. bug #40733 was caused by this" - stdout parallel -j1 --tmpdir $SHM cat /dev/zero ::: dummy +par_bug_48290() { + echo "### bug #48290: round-robin does not distribute data based on business" + echo "Jobslot 1 is 256 times slower than jobslot 4 and should get much less data" + yes "$(seq 1000|xargs)" | head -c 30M | + parallel --tagstring {%} --linebuffer --compress -j4 --roundrobin --pipe --block 10k \ + pv -qL '{= $_=int( $job->slot()**4/2+1) =}'0000 | + perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$b} <=> $s{$a} } keys %s}' +} -echo '**' +par_memory_leak() { + a_run() { + seq $1 |time -v parallel true 2>&1 | + grep 'Maximum resident' | + field 6; + } + export -f a_run + echo "### Test for memory leaks" + echo "Of 10 runs of 1 job at least one should be bigger than a 3000 job run" + small_max=$(seq 10 | parallel a_run 1 | jq -s max) + big=$(a_run 3000) + if [ $small_max -lt $big ] ; then + echo "Bad: Memleak likely." + else + echo "Good: No memleak detected." + fi +} -echo "### bug #48290: round-robin does not distribute data based on business" - echo "Jobslot 1 is 8 times slower than jobslot 8 and should get much less data" - seq 10000000 | parallel --tagstring {%} --linebuffer --compress -j8 --roundrobin --pipe --block 300k 'pv -qL {%}00000'| perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$a} <=> $s{$b} } keys %s}' -EOF +export -f $(compgen -A function | grep par_) +compgen -A function | grep par_ | sort | parallel -j6 --tag -k '{} 2>&1' diff --git a/testsuite/wanted-results/parallel-local-30s b/testsuite/wanted-results/parallel-local-30s index 1b5d20b5..c92e9918 100644 --- a/testsuite/wanted-results/parallel-local-30s +++ b/testsuite/wanted-results/parallel-local-30s @@ -1,34 +1,23 @@ -echo '### Test race condition on 8 CPU (my laptop)'; seq 1 5000000 > /tmp/parallel_test; seq 1 10 | parallel -k "cat /tmp/parallel_test | parallel --pipe --recend '' -k gzip >/dev/null; echo {}"; rm /tmp/parallel_test -### Test race condition on 8 CPU (my laptop) -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 -echo '**' -** -echo "### Test --tmpdir running full. bug #40733 was caused by this" -### Test --tmpdir running full. bug #40733 was caused by this - stdout parallel -j1 --tmpdir $SHM cat /dev/zero ::: dummy -parallel: Error: Output is incomplete. Cannot append to buffer file in /tmp/shm/parallel. Is the disk full? -parallel: Error: Change $TMPDIR with --tmpdir or use --compress. -echo '**' -** -echo "### bug #48290: round-robin does not distribute data based on business" -### bug #48290: round-robin does not distribute data based on business - echo "Jobslot 1 is 8 times slower than jobslot 8 and should get much less data" -Jobslot 1 is 8 times slower than jobslot 8 and should get much less data - seq 10000000 | parallel --tagstring {%} --linebuffer --compress -j8 --roundrobin --pipe --block 300k 'pv -qL {%}00000'| perl -ne '/^\d+/ and $s{$&}++; END { print map { "$_\n" } sort { $s{$a} <=> $s{$b} } keys %s}' -1 -2 -3 -5 -4 -7 -6 -8 +par_bug_48290 ### bug #48290: round-robin does not distribute data based on business +par_bug_48290 Jobslot 1 is 256 times slower than jobslot 4 and should get much less data +par_bug_48290 4 +par_bug_48290 3 +par_bug_48290 2 +par_bug_48290 1 +par_memory_leak ### Test for memory leaks +par_memory_leak Of 10 runs of 1 job at least one should be bigger than a 3000 job run +par_memory_leak Good: No memleak detected. +par_race_condition1 ### Test race condition on 8 CPU (my laptop) +par_race_condition1 1 +par_race_condition1 2 +par_race_condition1 3 +par_race_condition1 4 +par_race_condition1 5 +par_race_condition1 6 +par_race_condition1 7 +par_race_condition1 8 +par_race_condition1 9 +par_race_condition1 10 +par_tmp_full ### Test --tmpdir running full. bug #40733 was caused by this +par_tmp_full parallel: Error: Output is incomplete. Cannot append to buffer file in /tmp/shm/parallel. Is the disk full? +par_tmp_full parallel: Error: Change $TMPDIR with --tmpdir or use --compress.