sem: --semaphoretimeout <negative value> means: Exit if no semaphore gotten.

This commit is contained in:
Ole Tange 2015-01-03 14:48:01 +01:00
parent a83c02c95d
commit 40668af537
6 changed files with 144 additions and 118 deletions

View file

@ -574,17 +574,11 @@ sub acquire_semaphore {
if($Semaphore::fg) { if($Semaphore::fg) {
# skip # skip
} else { } else {
# If run in the background, the PID will change
# therefore release and re-acquire the semaphore
$sem->release();
if(fork()) { if(fork()) {
exit(0); exit(0);
} else { } else {
# child # If run in the background, the PID will change
# Get a semaphore for this pid $sem->pid_change();
::die_bug("Can't start a new session: $!") if setsid() == -1;
$sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running());
$sem->acquire();
} }
} }
return $sem; return $sem;
@ -715,7 +709,7 @@ sub options_hash {
"exit|x" => \$opt::x, "exit|x" => \$opt::x,
# Semaphore # Semaphore
"semaphore" => \$opt::semaphore, "semaphore" => \$opt::semaphore,
"semaphoretimeout=i" => \$opt::semaphoretimeout, "semaphoretimeout|st=i" => \$opt::semaphoretimeout,
"semaphorename|id=s" => \$opt::semaphorename, "semaphorename|id=s" => \$opt::semaphorename,
"fg" => \$opt::fg, "fg" => \$opt::fg,
"bg" => \$opt::bg, "bg" => \$opt::bg,
@ -8352,43 +8346,58 @@ sub new {
}, ref($class) || $class; }, ref($class) || $class;
} }
sub remove_dead_locks {
my $self = shift;
my $lockdir = $self->{'lockdir'};
for my $d (glob "$lockdir/*") {
$d =~ m:$lockdir/([0-9]+)\@([-\._a-z0-9]+)$:o or next;
my ($pid, $host) = ($1, $2);
if($host eq ::hostname()) {
if(not kill 0, $pid) {
::debug("sem", "Dead: $d\n");
unlink $d;
} else {
::debug("sem", "Alive: $d\n");
}
}
}
}
sub acquire { sub acquire {
my $self = shift; my $self = shift;
my $sleep = 1; # 1 ms my $sleep = 1; # 1 ms
my $start_time = time; my $start_time = time;
while(1) { while(1) {
# Can we get a lock?
$self->atomic_link_if_count_less_than() and last; $self->atomic_link_if_count_less_than() and last;
::debug("sem", "Remove dead locks"); $self->remove_dead_locks();
my $lockdir = $self->{'lockdir'};
for my $d (glob "$lockdir/*") {
::debug("sem", "Lock $d $lockdir\n");
$d =~ m:$lockdir/([0-9]+)\@([-\._a-z0-9]+)$:o or next;
my ($pid, $host) = ($1, $2);
if($host eq ::hostname()) {
if(not kill 0, $1) {
::debug("sem", "Dead: $d");
unlink $d;
} else {
::debug("sem", "Alive: $d");
}
}
}
# try again
$self->atomic_link_if_count_less_than() and last;
# Retry slower and slower up to 1 second # Retry slower and slower up to 1 second
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
# Random to avoid every sleeping job waking up at the same time # Random to avoid every sleeping job waking up at the same time
::usleep(rand()*$sleep); ::usleep(rand()*$sleep);
if(defined($opt::timeout) and if($opt::semaphoretimeout) {
$start_time + $opt::timeout > time) { if($opt::semaphoretimeout > 0
# Acquire the lock anyway and
if(not -e $self->{'idfile'}) { time - $start_time > $opt::semaphoretimeout) {
open (my $fh, ">", $self->{'idfile'}) or # Timeout: Take the semaphore anyway
::die_bug("timeout_write_idfile: $self->{'idfile'}"); ::warning("Semaphore timed out. Stealing the semaphore.\n");
close $fh; if(not -e $self->{'idfile'}) {
open (my $fh, ">", $self->{'idfile'}) or
::die_bug("timeout_write_idfile: $self->{'idfile'}");
close $fh;
}
link $self->{'idfile'}, $self->{'pidfile'};
last;
}
if($opt::semaphoretimeout < 0
and
time - $start_time > -$opt::semaphoretimeout) {
# Timeout: Exit
::warning("Semaphore timed out. Exiting.\n");
exit(1);
last;
} }
link $self->{'idfile'}, $self->{'pidfile'};
last;
} }
} }
::debug("sem", "acquired $self->{'pid'}\n"); ::debug("sem", "acquired $self->{'pid'}\n");
@ -8409,66 +8418,35 @@ sub release {
::debug("run", "released $self->{'pid'}\n"); ::debug("run", "released $self->{'pid'}\n");
} }
sub _release { sub pid_change {
# This should do what release()+acquire() would do without having
# to re-acquire the semaphore
my $self = shift; my $self = shift;
unlink $self->{'pidfile'}; my $old_pidfile = $self->{'pidfile'};
$self->lock(); $self->{'pid'} = $$;
my $nlinks = $self->nlinks(); $self->{'pidfile'} = $self->{'lockdir'}."/".$$.'@'.::hostname();
::debug("sem", $nlinks, "<", $self->{'count'}); my $retval = link $self->{'idfile'}, $self->{'pidfile'};
if($nlinks-- > 1) { ::debug("sem","link($self->{'idfile'},$self->{'pidfile'})=$retval\n");
unlink $self->{'idfile'}; unlink $old_pidfile;
open (my $fh, ">", $self->{'idfile'}) or
::die_bug("write_idfile: $self->{'idfile'}");
print $fh "#"x$nlinks;
close $fh;
} else {
unlink $self->{'idfile'};
rmdir $self->{'lockdir'};
}
$self->unlock();
::debug("sem", "released $self->{'pid'}\n");
} }
sub atomic_link_if_count_less_than { sub atomic_link_if_count_less_than {
# Link $file1 to $file2 if nlinks to $file1 < $count
my $self = shift;
my $retval = 0;
$self->lock();
::debug($self->nlinks(), "<", $self->{'count'});
if($self->nlinks() < $self->{'count'}) {
-d $self->{'lockdir'} or mkdir_or_die($self->{'lockdir'});
if(not -e $self->{'idfile'}) {
open (my $fh, ">", $self->{'idfile'}) or
::die_bug("write_idfile: $self->{'idfile'}");
close $fh;
}
$retval = link $self->{'idfile'}, $self->{'pidfile'};
}
$self->unlock();
::debug("run", "atomic $retval");
return $retval;
}
sub _atomic_link_if_count_less_than {
# Link $file1 to $file2 if nlinks to $file1 < $count # Link $file1 to $file2 if nlinks to $file1 < $count
my $self = shift; my $self = shift;
my $retval = 0; my $retval = 0;
$self->lock(); $self->lock();
my $nlinks = $self->nlinks(); my $nlinks = $self->nlinks();
::debug("sem", $nlinks, "<", $self->{'count'}); ::debug("sem","$nlinks<$self->{'count'} ");
if($nlinks++ < $self->{'count'}) { if($nlinks < $self->{'count'}) {
-d $self->{'lockdir'} or mkdir_or_die($self->{'lockdir'}); -d $self->{'lockdir'} or mkdir_or_die($self->{'lockdir'});
if(not -e $self->{'idfile'}) { if(not -e $self->{'idfile'}) {
open (my $fh, ">", $self->{'idfile'}) or open (my $fh, ">", $self->{'idfile'}) or
::die_bug("write_idfile: $self->{'idfile'}"); ::die_bug("write_idfile: $self->{'idfile'}");
close $fh; close $fh;
} }
open (my $fh, ">", $self->{'idfile'}) or
::die_bug("write_idfile: $self->{'idfile'}");
print $fh "#"x$nlinks;
close $fh;
$retval = link $self->{'idfile'}, $self->{'pidfile'}; $retval = link $self->{'idfile'}, $self->{'pidfile'};
::debug("sem","link($self->{'idfile'},$self->{'pidfile'})=$retval\n");
} }
$self->unlock(); $self->unlock();
::debug("sem", "atomic $retval"); ::debug("sem", "atomic $retval");
@ -8478,7 +8456,6 @@ sub _atomic_link_if_count_less_than {
sub nlinks { sub nlinks {
my $self = shift; my $self = shift;
if(-e $self->{'idfile'}) { if(-e $self->{'idfile'}) {
::debug("sem", "nlinks", (stat(_))[3], "size", (stat(_))[7], "\n");
return (stat(_))[3]; return (stat(_))[3];
} else { } else {
return 0; return 0;
@ -8522,12 +8499,22 @@ sub lock {
::usleep(rand()*$sleep); ::usleep(rand()*$sleep);
$total_sleep += $sleep; $total_sleep += $sleep;
if($opt::semaphoretimeout) { if($opt::semaphoretimeout) {
if($total_sleep/1000 > $opt::semaphoretimeout) { if($opt::semaphoretimeout > 0
# Timeout: bail out and
::warning("Semaphore timed out. Ignoring timeout."); $total_sleep/1000 > $opt::semaphoretimeout) {
# Timeout: Take the semaphore anyway
::warning("Semaphore timed out. Taking the semaphore.");
$locked = 3; $locked = 3;
last; last;
} }
if($opt::semaphoretimeout < 0
and
$total_sleep/1000 > -$opt::semaphoretimeout) {
# Timeout: Exit
::warning("Semaphore timed out. Exiting.");
$locked = 4;
last;
}
} else { } else {
if($total_sleep/1000 > 30) { if($total_sleep/1000 > 30) {
::warning("Semaphore stuck for 30 seconds. Consider using --semaphoretimeout."); ::warning("Semaphore stuck for 30 seconds. Consider using --semaphoretimeout.");

View file

@ -1467,7 +1467,7 @@ operating system and the B<-s> option. Pipe the input from /dev/null
to do anything. to do anything.
=item B<--semaphore> =item B<--semaphore> (alpha testing)
Work as a counting semaphore. B<--semaphore> will cause GNU Work as a counting semaphore. B<--semaphore> will cause GNU
B<parallel> to start I<command> in the background. When the number of B<parallel> to start I<command> in the background. When the number of
@ -1504,9 +1504,13 @@ Implies B<--semaphore>.
See also B<man sem>. See also B<man sem>.
=item B<--semaphoretimeout> I<secs> =item B<--semaphoretimeout> I<secs> (alpha testing)
If the semaphore is not released within secs seconds, take it anyway. =item B<--st> I<secs> (alpha testing)
If I<secs> > 0: If the semaphore is not released within I<secs> seconds, take it anyway.
If I<secs> < 0: If the semaphore is not released within I<secs> seconds, exit.
Implies B<--semaphore>. Implies B<--semaphore>.

View file

@ -6,7 +6,7 @@ sem - semaphore for executing shell command lines in parallel
=head1 SYNOPSIS =head1 SYNOPSIS
B<sem> [--fg] [--id <id>] [--timeout <secs>] [-j <num>] [--wait] command B<sem> [--fg] [--id <id>] [--semaphoretimeout <secs>] [-j <num>] [--wait] command
=head1 DESCRIPTION =head1 DESCRIPTION
@ -128,11 +128,13 @@ The semaphore is stored in ~/.parallel/semaphores/
Do not put command in background. Do not put command in background.
=item B<--timeout> I<secs> (not implemented) =item B<--semaphoretimeout> I<secs> (alpha testing)
=item B<-t> I<secs> (not implemented) =item B<--st> I<secs> (alpha testing)
If the semaphore is not released within I<secs> seconds, take it anyway. If I<secs> > 0: If the semaphore is not released within I<secs> seconds, take it anyway.
If I<secs> < 0: If the semaphore is not released within I<secs> seconds, exit.
=item B<--wait> =item B<--wait>

View file

@ -1,47 +1,57 @@
#!/bin/bash #!/bin/bash
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | parallel -v -k --joblog /tmp/jl-`basename $0` -L1
echo '### Test mutex. This should not mix output'; echo '### Test mutex. This should not mix output';
parallel -u --semaphore seq 1 10 '|' pv -qL 20; parallel --semaphore --id mutex -u seq 1 10 '|' pv -qL 20;
parallel -u --semaphore seq 11 20 '|' pv -qL 100; parallel --semaphore --id mutex -u seq 11 20 '|' pv -qL 100;
parallel --semaphore --wait; parallel --semaphore --id mutex --wait;
echo done echo done
echo '### Test semaphore 2 jobs running simultaneously' echo '### Test semaphore 2 jobs running simultaneously'
parallel -u -j2 --semaphore 'echo job1a 1; sleep 1; echo job1b 3' parallel --semaphore --id 2jobs -u -j2 'echo job1a 1; sleep 1; echo job1b 3';
sleep 0.2 sleep 0.2;
parallel -u -j2 --semaphore 'echo job2a 2; sleep 1; echo job2b 5' parallel --semaphore --id 2jobs -u -j2 'echo job2a 2; sleep 1; echo job2b 5';
sleep 0.2 sleep 0.2;
parallel -u -j2 --semaphore 'echo job3a 4; sleep 1; echo job3b 6' parallel --semaphore --id 2jobs -u -j2 'echo job3a 4; sleep 1; echo job3b 6';
parallel --semaphore --wait parallel --semaphore --id 2jobs --wait;
echo done echo done
echo '### Test if parallel invoked as sem will run parallel --semaphore' echo '### Test if parallel invoked as sem will run parallel --semaphore'
sem -u -j2 'echo job1a 1; sleep 1; echo job1b 3' sem --id as_sem -u -j2 'echo job1a 1; sleep 1; echo job1b 3';
sleep 0.2 sleep 0.2;
sem -u -j2 'echo job2a 2; sleep 1; echo job2b 5' sem --id as_sem -u -j2 'echo job2a 2; sleep 1; echo job2b 5';
sleep 0.2 sleep 0.2;
sem -u -j2 'echo job3a 4; sleep 1; echo job3b 6' sem --id as_sem -u -j2 'echo job3a 4; sleep 1; echo job3b 6';
sem --wait sem --id as_sem --wait;
echo done echo done
echo '### Test similar example as from man page - run 2 jobs simultaneously' echo '### Test similar example as from man page - run 2 jobs simultaneously'
echo 'Expect done: 1 2 5 3 4' echo 'Expect done: 1 2 5 3 4'
for i in 5 1 2 3 4 ; do for i in 5 1 2 3 4 ; do
sleep 0.2; sleep 0.2;
echo Scheduling $i; echo Scheduling $i;
sem -j2 -u echo starting $i ";" sleep $i ";" echo done $i; sem -j2 --id ex2jobs -u echo starting $i ";" sleep $i ";" echo done $i;
done; done;
sem --wait sem --id ex2jobs --wait
echo '### Test --fg followed by --bg' echo '### Test --fg followed by --bg'
parallel -u --fg --semaphore seq 1 10 '|' pv -qL 30; parallel -u --id fgbg --fg --semaphore seq 1 10 '|' pv -qL 30;
parallel -u --bg --semaphore seq 11 20 '|' pv -qL 30; parallel -u --id fgbg --bg --semaphore seq 11 20 '|' pv -qL 30;
parallel -u --fg --semaphore seq 21 30 '|' pv -qL 30; parallel -u --id fgbg --fg --semaphore seq 21 30 '|' pv -qL 30;
parallel -u --bg --semaphore seq 31 40 '|' pv -qL 30; parallel -u --id fgbg --bg --semaphore seq 31 40 '|' pv -qL 30;
sem --wait sem --id fgbg --wait
echo '### Test bug #33621: --bg -p should give an error message' echo '### Test bug #33621: --bg -p should give an error message'
stdout parallel -p --bg echo x{} stdout parallel -p --bg echo x{}
echo '### Failed on 20141226' echo '### Failed on 20141226'
sem --fg --line-buffer --id lock_id echo OK sem --fg --line-buffer --id bugin20141226 echo OK
echo '### Test --st +1/-1'
stdout sem --id st --line-buffer "echo A normal-start;sleep 3;echo C normal-end";
stdout sem --id st --line-buffer --st 1 "echo B st1-start;sleep 3;echo D st1-end";
stdout sem --id st --line-buffer --st -1 "echo ERROR-st-1-start;sleep 3;echo ERROR-st-1-end";
stdout sem --id st --wait
EOF

View file

@ -11,8 +11,8 @@ echo '### niceload with no arguments should give no output'
niceload niceload
echo '### Test -t and -s' echo '### Test -t and -s'
### Test -t and -s ### Test -t and -s
# This should sleep 2*1s and run 2*2s # This should sleep at least 2*1s and run 2*2s
niceload -v -t 1 -s 2 sleep 4.5 stdout niceload -v -t 1 -s 2 sleep 4.5 | head -n 4
Sleeping 1s Sleeping 1s
Running 2s Running 2s
Sleeping 1s Sleeping 1s

View file

@ -1,3 +1,4 @@
echo '### Test mutex. This should not mix output'; parallel --semaphore --id mutex -u seq 1 10 '|' pv -qL 20; parallel --semaphore --id mutex -u seq 11 20 '|' pv -qL 100; parallel --semaphore --id mutex --wait; echo done
### Test mutex. This should not mix output ### Test mutex. This should not mix output
1 1
2 2
@ -20,7 +21,9 @@
19 19
20 20
done done
echo '### Test semaphore 2 jobs running simultaneously'
### Test semaphore 2 jobs running simultaneously ### Test semaphore 2 jobs running simultaneously
parallel --semaphore --id 2jobs -u -j2 'echo job1a 1; sleep 1; echo job1b 3'; sleep 0.2; parallel --semaphore --id 2jobs -u -j2 'echo job2a 2; sleep 1; echo job2b 5'; sleep 0.2; parallel --semaphore --id 2jobs -u -j2 'echo job3a 4; sleep 1; echo job3b 6'; parallel --semaphore --id 2jobs --wait; echo done
job1a 1 job1a 1
job2a 2 job2a 2
job1b 3 job1b 3
@ -28,7 +31,9 @@ job3a 4
job2b 5 job2b 5
job3b 6 job3b 6
done done
echo '### Test if parallel invoked as sem will run parallel --semaphore'
### Test if parallel invoked as sem will run parallel --semaphore ### Test if parallel invoked as sem will run parallel --semaphore
sem --id as_sem -u -j2 'echo job1a 1; sleep 1; echo job1b 3'; sleep 0.2; sem --id as_sem -u -j2 'echo job2a 2; sleep 1; echo job2b 5'; sleep 0.2; sem --id as_sem -u -j2 'echo job3a 4; sleep 1; echo job3b 6'; sem --id as_sem --wait; echo done
job1a 1 job1a 1
job2a 2 job2a 2
job1b 3 job1b 3
@ -36,8 +41,11 @@ job3a 4
job2b 5 job2b 5
job3b 6 job3b 6
done done
echo '### Test similar example as from man page - run 2 jobs simultaneously'
### Test similar example as from man page - run 2 jobs simultaneously ### Test similar example as from man page - run 2 jobs simultaneously
echo 'Expect done: 1 2 5 3 4'
Expect done: 1 2 5 3 4 Expect done: 1 2 5 3 4
for i in 5 1 2 3 4 ; do sleep 0.2; echo Scheduling $i; sem -j2 --id ex2jobs -u echo starting $i ";" sleep $i ";" echo done $i; done; sem --id ex2jobs --wait
Scheduling 5 Scheduling 5
starting 5 starting 5
Scheduling 1 Scheduling 1
@ -53,7 +61,9 @@ done 5
starting 4 starting 4
done 3 done 3
done 4 done 4
echo '### Test --fg followed by --bg'
### Test --fg followed by --bg ### Test --fg followed by --bg
parallel -u --id fgbg --fg --semaphore seq 1 10 '|' pv -qL 30; parallel -u --id fgbg --bg --semaphore seq 11 20 '|' pv -qL 30; parallel -u --id fgbg --fg --semaphore seq 21 30 '|' pv -qL 30; parallel -u --id fgbg --bg --semaphore seq 31 40 '|' pv -qL 30; sem --id fgbg --wait
1 1
2 2
3 3
@ -94,7 +104,20 @@ done 4
38 38
39 39
40 40
echo '### Test bug #33621: --bg -p should give an error message'
### Test bug #33621: --bg -p should give an error message ### Test bug #33621: --bg -p should give an error message
stdout parallel -p --bg echo x{}
parallel: Error: Jobs running in the background cannot be interactive. parallel: Error: Jobs running in the background cannot be interactive.
echo '### Failed on 20141226'
### Failed on 20141226 ### Failed on 20141226
sem --fg --line-buffer --id bugin20141226 echo OK
OK OK
echo '### Test --st +1/-1'
### Test --st +1/-1
stdout sem --id st --line-buffer "echo A normal-start;sleep 3;echo C normal-end"; stdout sem --id st --line-buffer --st 1 "echo B st1-start;sleep 3;echo D st1-end"; stdout sem --id st --line-buffer --st -1 "echo ERROR-st-1-start;sleep 3;echo ERROR-st-1-end"; stdout sem --id st --wait
A normal-start
parallel: Warning: Semaphore timed out. Stealing the semaphore.
B st1-start
C normal-end
parallel: Warning: Semaphore timed out. Exiting.
D st1-end