diff --git a/src/parallel b/src/parallel index 3d85f5a9..4e6a0239 100755 --- a/src/parallel +++ b/src/parallel @@ -574,17 +574,11 @@ sub acquire_semaphore { if($Semaphore::fg) { # skip } else { - # If run in the background, the PID will change - # therefore release and re-acquire the semaphore - $sem->release(); if(fork()) { exit(0); } else { - # child - # Get a semaphore for this pid - ::die_bug("Can't start a new session: $!") if setsid() == -1; - $sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running()); - $sem->acquire(); + # If run in the background, the PID will change + $sem->pid_change(); } } return $sem; @@ -715,7 +709,7 @@ sub options_hash { "exit|x" => \$opt::x, # Semaphore "semaphore" => \$opt::semaphore, - "semaphoretimeout=i" => \$opt::semaphoretimeout, + "semaphoretimeout|st=i" => \$opt::semaphoretimeout, "semaphorename|id=s" => \$opt::semaphorename, "fg" => \$opt::fg, "bg" => \$opt::bg, @@ -8352,43 +8346,58 @@ sub new { }, ref($class) || $class; } +sub remove_dead_locks { + my $self = shift; + my $lockdir = $self->{'lockdir'}; + + for my $d (glob "$lockdir/*") { + $d =~ m:$lockdir/([0-9]+)\@([-\._a-z0-9]+)$:o or next; + my ($pid, $host) = ($1, $2); + if($host eq ::hostname()) { + if(not kill 0, $pid) { + ::debug("sem", "Dead: $d\n"); + unlink $d; + } else { + ::debug("sem", "Alive: $d\n"); + } + } + } +} + sub acquire { my $self = shift; my $sleep = 1; # 1 ms my $start_time = time; while(1) { + # Can we get a lock? $self->atomic_link_if_count_less_than() and last; - ::debug("sem", "Remove dead locks"); - my $lockdir = $self->{'lockdir'}; - for my $d (glob "$lockdir/*") { - ::debug("sem", "Lock $d $lockdir\n"); - $d =~ m:$lockdir/([0-9]+)\@([-\._a-z0-9]+)$:o or next; - my ($pid, $host) = ($1, $2); - if($host eq ::hostname()) { - if(not kill 0, $1) { - ::debug("sem", "Dead: $d"); - unlink $d; - } else { - ::debug("sem", "Alive: $d"); - } - } - } - # try again - $self->atomic_link_if_count_less_than() and last; + $self->remove_dead_locks(); # Retry slower and slower up to 1 second $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); # Random to avoid every sleeping job waking up at the same time ::usleep(rand()*$sleep); - if(defined($opt::timeout) and - $start_time + $opt::timeout > time) { - # Acquire the lock anyway - if(not -e $self->{'idfile'}) { - open (my $fh, ">", $self->{'idfile'}) or - ::die_bug("timeout_write_idfile: $self->{'idfile'}"); - close $fh; + if($opt::semaphoretimeout) { + if($opt::semaphoretimeout > 0 + and + time - $start_time > $opt::semaphoretimeout) { + # Timeout: Take the semaphore anyway + ::warning("Semaphore timed out. Stealing the semaphore.\n"); + if(not -e $self->{'idfile'}) { + open (my $fh, ">", $self->{'idfile'}) or + ::die_bug("timeout_write_idfile: $self->{'idfile'}"); + close $fh; + } + link $self->{'idfile'}, $self->{'pidfile'}; + last; + } + if($opt::semaphoretimeout < 0 + and + time - $start_time > -$opt::semaphoretimeout) { + # Timeout: Exit + ::warning("Semaphore timed out. Exiting.\n"); + exit(1); + last; } - link $self->{'idfile'}, $self->{'pidfile'}; - last; } } ::debug("sem", "acquired $self->{'pid'}\n"); @@ -8409,66 +8418,35 @@ sub release { ::debug("run", "released $self->{'pid'}\n"); } -sub _release { +sub pid_change { + # This should do what release()+acquire() would do without having + # to re-acquire the semaphore my $self = shift; - unlink $self->{'pidfile'}; - $self->lock(); - my $nlinks = $self->nlinks(); - ::debug("sem", $nlinks, "<", $self->{'count'}); - if($nlinks-- > 1) { - unlink $self->{'idfile'}; - open (my $fh, ">", $self->{'idfile'}) or - ::die_bug("write_idfile: $self->{'idfile'}"); - print $fh "#"x$nlinks; - close $fh; - } else { - unlink $self->{'idfile'}; - rmdir $self->{'lockdir'}; - } - $self->unlock(); - ::debug("sem", "released $self->{'pid'}\n"); + my $old_pidfile = $self->{'pidfile'}; + $self->{'pid'} = $$; + $self->{'pidfile'} = $self->{'lockdir'}."/".$$.'@'.::hostname(); + my $retval = link $self->{'idfile'}, $self->{'pidfile'}; + ::debug("sem","link($self->{'idfile'},$self->{'pidfile'})=$retval\n"); + unlink $old_pidfile; } sub atomic_link_if_count_less_than { - # Link $file1 to $file2 if nlinks to $file1 < $count - my $self = shift; - my $retval = 0; - $self->lock(); - ::debug($self->nlinks(), "<", $self->{'count'}); - if($self->nlinks() < $self->{'count'}) { - -d $self->{'lockdir'} or mkdir_or_die($self->{'lockdir'}); - if(not -e $self->{'idfile'}) { - open (my $fh, ">", $self->{'idfile'}) or - ::die_bug("write_idfile: $self->{'idfile'}"); - close $fh; - } - $retval = link $self->{'idfile'}, $self->{'pidfile'}; - } - $self->unlock(); - ::debug("run", "atomic $retval"); - return $retval; -} - -sub _atomic_link_if_count_less_than { # Link $file1 to $file2 if nlinks to $file1 < $count my $self = shift; my $retval = 0; $self->lock(); my $nlinks = $self->nlinks(); - ::debug("sem", $nlinks, "<", $self->{'count'}); - if($nlinks++ < $self->{'count'}) { + ::debug("sem","$nlinks<$self->{'count'} "); + if($nlinks < $self->{'count'}) { -d $self->{'lockdir'} or mkdir_or_die($self->{'lockdir'}); if(not -e $self->{'idfile'}) { open (my $fh, ">", $self->{'idfile'}) or ::die_bug("write_idfile: $self->{'idfile'}"); close $fh; } - open (my $fh, ">", $self->{'idfile'}) or - ::die_bug("write_idfile: $self->{'idfile'}"); - print $fh "#"x$nlinks; - close $fh; $retval = link $self->{'idfile'}, $self->{'pidfile'}; + ::debug("sem","link($self->{'idfile'},$self->{'pidfile'})=$retval\n"); } $self->unlock(); ::debug("sem", "atomic $retval"); @@ -8478,7 +8456,6 @@ sub _atomic_link_if_count_less_than { sub nlinks { my $self = shift; if(-e $self->{'idfile'}) { - ::debug("sem", "nlinks", (stat(_))[3], "size", (stat(_))[7], "\n"); return (stat(_))[3]; } else { return 0; @@ -8522,12 +8499,22 @@ sub lock { ::usleep(rand()*$sleep); $total_sleep += $sleep; if($opt::semaphoretimeout) { - if($total_sleep/1000 > $opt::semaphoretimeout) { - # Timeout: bail out - ::warning("Semaphore timed out. Ignoring timeout."); + if($opt::semaphoretimeout > 0 + and + $total_sleep/1000 > $opt::semaphoretimeout) { + # Timeout: Take the semaphore anyway + ::warning("Semaphore timed out. Taking the semaphore."); $locked = 3; last; } + if($opt::semaphoretimeout < 0 + and + $total_sleep/1000 > -$opt::semaphoretimeout) { + # Timeout: Exit + ::warning("Semaphore timed out. Exiting."); + $locked = 4; + last; + } } else { if($total_sleep/1000 > 30) { ::warning("Semaphore stuck for 30 seconds. Consider using --semaphoretimeout."); diff --git a/src/parallel.pod b/src/parallel.pod index eb965478..972df70a 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -1467,7 +1467,7 @@ operating system and the B<-s> option. Pipe the input from /dev/null to do anything. -=item B<--semaphore> +=item B<--semaphore> (alpha testing) Work as a counting semaphore. B<--semaphore> will cause GNU B to start I in the background. When the number of @@ -1504,9 +1504,13 @@ Implies B<--semaphore>. See also B. -=item B<--semaphoretimeout> I +=item B<--semaphoretimeout> I (alpha testing) -If the semaphore is not released within secs seconds, take it anyway. +=item B<--st> I (alpha testing) + +If I > 0: If the semaphore is not released within I seconds, take it anyway. + +If I < 0: If the semaphore is not released within I seconds, exit. Implies B<--semaphore>. diff --git a/src/sem.pod b/src/sem.pod index af9980fd..389a858a 100755 --- a/src/sem.pod +++ b/src/sem.pod @@ -6,7 +6,7 @@ sem - semaphore for executing shell command lines in parallel =head1 SYNOPSIS -B [--fg] [--id ] [--timeout ] [-j ] [--wait] command +B [--fg] [--id ] [--semaphoretimeout ] [-j ] [--wait] command =head1 DESCRIPTION @@ -128,11 +128,13 @@ The semaphore is stored in ~/.parallel/semaphores/ Do not put command in background. -=item B<--timeout> I (not implemented) +=item B<--semaphoretimeout> I (alpha testing) -=item B<-t> I (not implemented) +=item B<--st> I (alpha testing) -If the semaphore is not released within I seconds, take it anyway. +If I > 0: If the semaphore is not released within I seconds, take it anyway. + +If I < 0: If the semaphore is not released within I seconds, exit. =item B<--wait> diff --git a/testsuite/tests-to-run/sem01.sh b/testsuite/tests-to-run/sem01.sh index 4819b365..952a1d7c 100755 --- a/testsuite/tests-to-run/sem01.sh +++ b/testsuite/tests-to-run/sem01.sh @@ -1,47 +1,57 @@ #!/bin/bash +cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | parallel -v -k --joblog /tmp/jl-`basename $0` -L1 echo '### Test mutex. This should not mix output'; -parallel -u --semaphore seq 1 10 '|' pv -qL 20; -parallel -u --semaphore seq 11 20 '|' pv -qL 100; -parallel --semaphore --wait; -echo done + parallel --semaphore --id mutex -u seq 1 10 '|' pv -qL 20; + parallel --semaphore --id mutex -u seq 11 20 '|' pv -qL 100; + parallel --semaphore --id mutex --wait; + echo done echo '### Test semaphore 2 jobs running simultaneously' -parallel -u -j2 --semaphore 'echo job1a 1; sleep 1; echo job1b 3' -sleep 0.2 -parallel -u -j2 --semaphore 'echo job2a 2; sleep 1; echo job2b 5' -sleep 0.2 -parallel -u -j2 --semaphore 'echo job3a 4; sleep 1; echo job3b 6' -parallel --semaphore --wait -echo done + parallel --semaphore --id 2jobs -u -j2 'echo job1a 1; sleep 1; echo job1b 3'; + sleep 0.2; + parallel --semaphore --id 2jobs -u -j2 'echo job2a 2; sleep 1; echo job2b 5'; + sleep 0.2; + parallel --semaphore --id 2jobs -u -j2 'echo job3a 4; sleep 1; echo job3b 6'; + parallel --semaphore --id 2jobs --wait; + echo done echo '### Test if parallel invoked as sem will run parallel --semaphore' -sem -u -j2 'echo job1a 1; sleep 1; echo job1b 3' -sleep 0.2 -sem -u -j2 'echo job2a 2; sleep 1; echo job2b 5' -sleep 0.2 -sem -u -j2 'echo job3a 4; sleep 1; echo job3b 6' -sem --wait -echo done + sem --id as_sem -u -j2 'echo job1a 1; sleep 1; echo job1b 3'; + sleep 0.2; + sem --id as_sem -u -j2 'echo job2a 2; sleep 1; echo job2b 5'; + sleep 0.2; + sem --id as_sem -u -j2 'echo job3a 4; sleep 1; echo job3b 6'; + sem --id as_sem --wait; + echo done echo '### Test similar example as from man page - run 2 jobs simultaneously' echo 'Expect done: 1 2 5 3 4' for i in 5 1 2 3 4 ; do sleep 0.2; echo Scheduling $i; - sem -j2 -u echo starting $i ";" sleep $i ";" echo done $i; + sem -j2 --id ex2jobs -u echo starting $i ";" sleep $i ";" echo done $i; done; -sem --wait +sem --id ex2jobs --wait echo '### Test --fg followed by --bg' -parallel -u --fg --semaphore seq 1 10 '|' pv -qL 30; -parallel -u --bg --semaphore seq 11 20 '|' pv -qL 30; -parallel -u --fg --semaphore seq 21 30 '|' pv -qL 30; -parallel -u --bg --semaphore seq 31 40 '|' pv -qL 30; -sem --wait + parallel -u --id fgbg --fg --semaphore seq 1 10 '|' pv -qL 30; + parallel -u --id fgbg --bg --semaphore seq 11 20 '|' pv -qL 30; + parallel -u --id fgbg --fg --semaphore seq 21 30 '|' pv -qL 30; + parallel -u --id fgbg --bg --semaphore seq 31 40 '|' pv -qL 30; + sem --id fgbg --wait echo '### Test bug #33621: --bg -p should give an error message' -stdout parallel -p --bg echo x{} + stdout parallel -p --bg echo x{} echo '### Failed on 20141226' -sem --fg --line-buffer --id lock_id echo OK + sem --fg --line-buffer --id bugin20141226 echo OK + +echo '### Test --st +1/-1' + stdout sem --id st --line-buffer "echo A normal-start;sleep 3;echo C normal-end"; + stdout sem --id st --line-buffer --st 1 "echo B st1-start;sleep 3;echo D st1-end"; + stdout sem --id st --line-buffer --st -1 "echo ERROR-st-1-start;sleep 3;echo ERROR-st-1-end"; + stdout sem --id st --wait + + +EOF \ No newline at end of file diff --git a/testsuite/wanted-results/niceload03 b/testsuite/wanted-results/niceload03 index cf16c710..3ae7b2d2 100644 --- a/testsuite/wanted-results/niceload03 +++ b/testsuite/wanted-results/niceload03 @@ -11,8 +11,8 @@ echo '### niceload with no arguments should give no output' niceload echo '### Test -t and -s' ### Test -t and -s - # This should sleep 2*1s and run 2*2s - niceload -v -t 1 -s 2 sleep 4.5 + # This should sleep at least 2*1s and run 2*2s + stdout niceload -v -t 1 -s 2 sleep 4.5 | head -n 4 Sleeping 1s Running 2s Sleeping 1s diff --git a/testsuite/wanted-results/sem01 b/testsuite/wanted-results/sem01 index b8c18b52..22303a22 100644 --- a/testsuite/wanted-results/sem01 +++ b/testsuite/wanted-results/sem01 @@ -1,3 +1,4 @@ +echo '### Test mutex. This should not mix output'; parallel --semaphore --id mutex -u seq 1 10 '|' pv -qL 20; parallel --semaphore --id mutex -u seq 11 20 '|' pv -qL 100; parallel --semaphore --id mutex --wait; echo done ### Test mutex. This should not mix output 1 2 @@ -20,7 +21,9 @@ 19 20 done +echo '### Test semaphore 2 jobs running simultaneously' ### Test semaphore 2 jobs running simultaneously + parallel --semaphore --id 2jobs -u -j2 'echo job1a 1; sleep 1; echo job1b 3'; sleep 0.2; parallel --semaphore --id 2jobs -u -j2 'echo job2a 2; sleep 1; echo job2b 5'; sleep 0.2; parallel --semaphore --id 2jobs -u -j2 'echo job3a 4; sleep 1; echo job3b 6'; parallel --semaphore --id 2jobs --wait; echo done job1a 1 job2a 2 job1b 3 @@ -28,7 +31,9 @@ job3a 4 job2b 5 job3b 6 done +echo '### Test if parallel invoked as sem will run parallel --semaphore' ### Test if parallel invoked as sem will run parallel --semaphore + sem --id as_sem -u -j2 'echo job1a 1; sleep 1; echo job1b 3'; sleep 0.2; sem --id as_sem -u -j2 'echo job2a 2; sleep 1; echo job2b 5'; sleep 0.2; sem --id as_sem -u -j2 'echo job3a 4; sleep 1; echo job3b 6'; sem --id as_sem --wait; echo done job1a 1 job2a 2 job1b 3 @@ -36,8 +41,11 @@ job3a 4 job2b 5 job3b 6 done +echo '### Test similar example as from man page - run 2 jobs simultaneously' ### Test similar example as from man page - run 2 jobs simultaneously +echo 'Expect done: 1 2 5 3 4' Expect done: 1 2 5 3 4 +for i in 5 1 2 3 4 ; do sleep 0.2; echo Scheduling $i; sem -j2 --id ex2jobs -u echo starting $i ";" sleep $i ";" echo done $i; done; sem --id ex2jobs --wait Scheduling 5 starting 5 Scheduling 1 @@ -53,7 +61,9 @@ done 5 starting 4 done 3 done 4 +echo '### Test --fg followed by --bg' ### Test --fg followed by --bg + parallel -u --id fgbg --fg --semaphore seq 1 10 '|' pv -qL 30; parallel -u --id fgbg --bg --semaphore seq 11 20 '|' pv -qL 30; parallel -u --id fgbg --fg --semaphore seq 21 30 '|' pv -qL 30; parallel -u --id fgbg --bg --semaphore seq 31 40 '|' pv -qL 30; sem --id fgbg --wait 1 2 3 @@ -94,7 +104,20 @@ done 4 38 39 40 +echo '### Test bug #33621: --bg -p should give an error message' ### Test bug #33621: --bg -p should give an error message + stdout parallel -p --bg echo x{} parallel: Error: Jobs running in the background cannot be interactive. +echo '### Failed on 20141226' ### Failed on 20141226 + sem --fg --line-buffer --id bugin20141226 echo OK OK +echo '### Test --st +1/-1' +### Test --st +1/-1 + stdout sem --id st --line-buffer "echo A normal-start;sleep 3;echo C normal-end"; stdout sem --id st --line-buffer --st 1 "echo B st1-start;sleep 3;echo D st1-end"; stdout sem --id st --line-buffer --st -1 "echo ERROR-st-1-start;sleep 3;echo ERROR-st-1-end"; stdout sem --id st --wait +A normal-start +parallel: Warning: Semaphore timed out. Stealing the semaphore. +B st1-start +C normal-end +parallel: Warning: Semaphore timed out. Exiting. +D st1-end