Semaphore supporting code. Passes unittest

This commit is contained in:
Ole Tange 2010-08-16 18:46:30 +02:00
parent ac16e65b8a
commit 94b1c3ec57

View file

@ -184,6 +184,17 @@ Multiple B<-B> can be specified to transfer more basefiles. The
I<file> will be transferred the same way as B<--transfer>.
=item B<--bg> (not implemented)
Run command in background thus GNU B<parallel> will not wait for
completion of the command before exiting. This is the default if
B<--semaphore> is set.
See also: B<--fg>
Implies B<--semaphore>.
=item B<--cleanup>
Remove transferred files. B<--cleanup> will remove the transferred files
@ -270,6 +281,16 @@ jobs. GNU B<parallel> normally only reads the next job to run.
Implies B<--progress>.
=item B<--fg> (not implemented)
Run command in foreground thus GNU B<parallel> will wait for
completion of the command before exiting.
See also: B<--bg>
Implies B<--semaphore>.
=item B<--file>
=item B<-f> (Use B<--file> as B<-f> may be removed in later versions)
@ -592,11 +613,33 @@ B<parallel> to start I<command> in the background. When the number of
simultaneous jobs is reached, GNU B<parallel> will wait for one of
these to complete before starting another command.
Used with B<--fg> and B<--semaphorename>.
B<--semaphore> implies B<--bg> unless B<--fg> is specified.
B<--semaphore> implies B<--semaphorename `tty`> unless
B<--semaphorename> is specified.
Used with B<--fg>, B<--wait>, and B<--semaphorename>.
The command B<sem> is an alias for B<parallel --semaphore>.
=item B<--semaphorename> I<name> (not implemented)
=item B<--id> I<name> (not implemented)
The name of the semaphore to use. The semaphore can be shared between
multiple processes.
Implies B<--semaphore>.
=item B<--semaphoretimeout> I<secs> (not implemented)
If the semaphore is not released within secs seconds, take it anyway.
Implies B<--semaphore>.
=item B<-S> I<[ncpu/]sshlogin[,[ncpu/]sshlogin[,...]]>
=item B<--sshlogin> I<[ncpu/]sshlogin[,[ncpu/]sshlogin[,...]]>
@ -774,6 +817,13 @@ B<--silent>. See also B<-t>.
Print the version GNU B<parallel> and exit.
=item B<--wait> (not implemented)
Wait for all commands to complete.
Implies B<--semaphore>.
=item B<-X>
xargs with context replace. This works like B<-m> except if B<{}> is part
@ -2039,6 +2089,13 @@ sub parse_options {
"version|V" => \$::opt_version,
"show-limits" => \$::opt_show_limits,
"exit|x" => \$::opt_x,
# Semaphore
"semaphore" => \$::opt_semaphore,
"semaphoretimeout=i" => \$::opt_semaphoretimeout,
"semaphorename|id=s" => \$::opt_semaphorename,
"fg" => \$::opt_fg,
"bg" => \$::opt_bg,
"wait" => \$::opt_wait,
) || die_usage();
$Global::debug = (defined $::opt_D);
if(defined $::opt_m) { $Global::xargs = 1; }
@ -2075,6 +2132,12 @@ sub parse_options {
if(defined @::opt_sshlogin) { @Global::sshlogin = @::opt_sshlogin; }
if(defined $::opt_sshloginfile) { read_sshloginfile($::opt_sshloginfile); }
if(defined @::opt_return) { push @Global::ret_files, @::opt_return; }
if(defined $::opt_semaphore) { $Global::semaphore = 1; }
if(defined $::opt_semaphoretimeout) { $Global::semaphore = 1; }
if(defined $::opt_semaphorename) { $Global::semaphore = 1; }
if(defined $::opt_fg) { $Global::semaphore = 1; }
if(defined $::opt_bg) { $Global::semaphore = 1; }
if(defined $::opt_wait) { $Global::semaphore = 1; }
if(defined @::opt_trc) {
push @Global::ret_files, @::opt_trc;
$::opt_transfer = 1;
@ -2159,6 +2222,14 @@ sub parse_options {
$Global::default_simultaneous_sshlogins;
}
}
# Semaphore defaults
if($Global::semaphore) {
$Semaphore::timeout = $::opt_semaphoretimeout || 0;
$Semaphore::name = $::opt_semaphorename || `tty`;
$Semaphore::fg = $::opt_fg;
$Semaphore::wait = $::opt_wait;
}
}
sub read_args_from_command_line {
@ -4148,5 +4219,127 @@ sub my_dump {
}
}
package Semaphore;
# This package provides a counting semaphore
#
# If a process dies without releasing the semaphore the next process
# that needs that entry will clean up dead semaphores
#
# The semaphores are stored in ~/.parallel/semaphores/id-<name> Each
# file in ~/.parallel/semaphores/id-<name>/ is the process ID of the
# process holding the entry. If the process dies, the entry can be
# taken by another process.
use Fcntl qw(:DEFAULT :flock);
sub new {
my $class = shift;
my $id = shift;
my $count = shift;
my $parallel_locks = $ENV{'HOME'}."/.parallel/semaphores";
-d $parallel_locks or mkdir $parallel_locks;
my $lockdir = "$parallel_locks/$id";
my $lockfile = $lockdir.".lock";
return bless {
'lockfile' => $lockfile,
'lockfh' => Symbol::gensym(),
'lockdir' => $lockdir,
'id' => $id,
'idfile' => $lockdir."/".$id,
'pid' => $$,
'pidfile' => $lockdir."/".$$,
'count' => $count
}, ref($class) || $class;
}
sub acquire {
my $self = shift;
while(1) {
$self->atomic_link_if_count_less_than() and last;
::debug("Remove dead locks");
my $lockdir = $self->{'lockdir'};
for my $d (<$lockdir/*>) {
$d =~ m:$lockdir/([0-9]+):o or next;
if(not kill 0, $1) {
::debug("Dead: $d");
unlink $d;
} else {
::debug("Alive: $d");
}
}
# try again
$self->atomic_link_if_count_less_than() and last;
sleep 1;
# TODO if timeout: last
}
::debug("got $self->{'pid'}");
}
sub release {
my ($self) = shift;
unlink $self->{'pidfile'};
if($self->nlinks() == 1) {
# This is the last link, so atomic cleanup
$self->lock();
if($self->nlinks() == 1) {
unlink $self->{'idfile'};
rmdir $self->{'lockdir'};
}
$self->unlock();
}
::debug("released $self->{'pid'}");
}
sub atomic_link_if_count_less_than {
# Link $file1 to $file2 if nlinks to $file1 < $count
my ($self) = shift;
my ($retval) = 0;
$self->lock();
if($self->nlinks() < $self->{'count'}) {
-d $self->{'lockdir'} || mkdir $self->{'lockdir'};
if(not -e $self->{'idfile'}) {
open (A, ">", $self->{'idfile'}) or die ">$self->{'idfile'}";
close A;
}
$retval = link $self->{'idfile'}, $self->{'pidfile'};
}
$self->unlock();
::debug("atomic $retval");
return $retval;
}
sub nlinks {
my $self = shift;
if(-e $self->{'idfile'}) {
return (stat(_))[3];
} else {
return 0;
}
}
sub lock {
my ($self) = shift;
open $self->{'lockfh'}, ">", $self->{'lockfile'}
or die "Can't open semaphore file $self->{'lockfile'}: $!";
chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) {
::debug("Cannot lock $self->{'lockfile'}");
# TODO if timeout: last
sleep 1;
}
::debug("locked $self->{'lockfile'}");
}
sub unlock {
my $self = shift;
unlink $self->{'lockfile'};
close $self->{'lockfh'};
::debug("unlocked");
}
# Keep perl -w happy
$Private::control_path = 0;
$Private::control_path = $Semaphore::timeout = $Semaphore::name =
$Semaphore::wait = $Semaphore::fg = 0;