From 94b1c3ec5727c34bfaa23b20da094da75ad1b349 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Mon, 16 Aug 2010 18:46:30 +0200 Subject: [PATCH] Semaphore supporting code. Passes unittest --- src/parallel | 197 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 195 insertions(+), 2 deletions(-) diff --git a/src/parallel b/src/parallel index fc340124..da99b328 100755 --- a/src/parallel +++ b/src/parallel @@ -184,6 +184,17 @@ Multiple B<-B> can be specified to transfer more basefiles. The I will be transferred the same way as B<--transfer>. +=item B<--bg> (not implemented) + +Run command in background thus GNU B will not wait for +completion of the command before exiting. This is the default if +B<--semaphore> is set. + +See also: B<--fg> + +Implies B<--semaphore>. + + =item B<--cleanup> Remove transferred files. B<--cleanup> will remove the transferred files @@ -270,6 +281,16 @@ jobs. GNU B normally only reads the next job to run. Implies B<--progress>. +=item B<--fg> (not implemented) + +Run command in foreground thus GNU B will wait for +completion of the command before exiting. + +See also: B<--bg> + +Implies B<--semaphore>. + + =item B<--file> =item B<-f> (Use B<--file> as B<-f> may be removed in later versions) @@ -592,11 +613,33 @@ B to start I in the background. When the number of simultaneous jobs is reached, GNU B will wait for one of these to complete before starting another command. -Used with B<--fg> and B<--semaphorename>. +B<--semaphore> implies B<--bg> unless B<--fg> is specified. + +B<--semaphore> implies B<--semaphorename `tty`> unless +B<--semaphorename> is specified. + +Used with B<--fg>, B<--wait>, and B<--semaphorename>. The command B is an alias for B. +=item B<--semaphorename> I (not implemented) + +=item B<--id> I (not implemented) + +The name of the semaphore to use. The semaphore can be shared between +multiple processes. + +Implies B<--semaphore>. + + +=item B<--semaphoretimeout> I (not implemented) + +If the semaphore is not released within secs seconds, take it anyway. + +Implies B<--semaphore>. + + =item B<-S> I<[ncpu/]sshlogin[,[ncpu/]sshlogin[,...]]> =item B<--sshlogin> I<[ncpu/]sshlogin[,[ncpu/]sshlogin[,...]]> @@ -774,6 +817,13 @@ B<--silent>. See also B<-t>. Print the version GNU B and exit. +=item B<--wait> (not implemented) + +Wait for all commands to complete. + +Implies B<--semaphore>. + + =item B<-X> xargs with context replace. This works like B<-m> except if B<{}> is part @@ -2039,6 +2089,13 @@ sub parse_options { "version|V" => \$::opt_version, "show-limits" => \$::opt_show_limits, "exit|x" => \$::opt_x, + # Semaphore + "semaphore" => \$::opt_semaphore, + "semaphoretimeout=i" => \$::opt_semaphoretimeout, + "semaphorename|id=s" => \$::opt_semaphorename, + "fg" => \$::opt_fg, + "bg" => \$::opt_bg, + "wait" => \$::opt_wait, ) || die_usage(); $Global::debug = (defined $::opt_D); if(defined $::opt_m) { $Global::xargs = 1; } @@ -2075,6 +2132,12 @@ sub parse_options { if(defined @::opt_sshlogin) { @Global::sshlogin = @::opt_sshlogin; } if(defined $::opt_sshloginfile) { read_sshloginfile($::opt_sshloginfile); } if(defined @::opt_return) { push @Global::ret_files, @::opt_return; } + if(defined $::opt_semaphore) { $Global::semaphore = 1; } + if(defined $::opt_semaphoretimeout) { $Global::semaphore = 1; } + if(defined $::opt_semaphorename) { $Global::semaphore = 1; } + if(defined $::opt_fg) { $Global::semaphore = 1; } + if(defined $::opt_bg) { $Global::semaphore = 1; } + if(defined $::opt_wait) { $Global::semaphore = 1; } if(defined @::opt_trc) { push @Global::ret_files, @::opt_trc; $::opt_transfer = 1; @@ -2159,6 +2222,14 @@ sub parse_options { $Global::default_simultaneous_sshlogins; } } + + # Semaphore defaults + if($Global::semaphore) { + $Semaphore::timeout = $::opt_semaphoretimeout || 0; + $Semaphore::name = $::opt_semaphorename || `tty`; + $Semaphore::fg = $::opt_fg; + $Semaphore::wait = $::opt_wait; + } } sub read_args_from_command_line { @@ -4148,5 +4219,127 @@ sub my_dump { } } +package Semaphore; + +# This package provides a counting semaphore +# +# If a process dies without releasing the semaphore the next process +# that needs that entry will clean up dead semaphores +# +# The semaphores are stored in ~/.parallel/semaphores/id- Each +# file in ~/.parallel/semaphores/id-/ is the process ID of the +# process holding the entry. If the process dies, the entry can be +# taken by another process. + +use Fcntl qw(:DEFAULT :flock); + +sub new { + my $class = shift; + my $id = shift; + my $count = shift; + my $parallel_locks = $ENV{'HOME'}."/.parallel/semaphores"; + -d $parallel_locks or mkdir $parallel_locks; + my $lockdir = "$parallel_locks/$id"; + my $lockfile = $lockdir.".lock"; + return bless { + 'lockfile' => $lockfile, + 'lockfh' => Symbol::gensym(), + 'lockdir' => $lockdir, + 'id' => $id, + 'idfile' => $lockdir."/".$id, + 'pid' => $$, + 'pidfile' => $lockdir."/".$$, + 'count' => $count + }, ref($class) || $class; +} + +sub acquire { + my $self = shift; + while(1) { + $self->atomic_link_if_count_less_than() and last; + ::debug("Remove dead locks"); + my $lockdir = $self->{'lockdir'}; + for my $d (<$lockdir/*>) { + $d =~ m:$lockdir/([0-9]+):o or next; + if(not kill 0, $1) { + ::debug("Dead: $d"); + unlink $d; + } else { + ::debug("Alive: $d"); + } + } + # try again + $self->atomic_link_if_count_less_than() and last; + sleep 1; + # TODO if timeout: last + } + ::debug("got $self->{'pid'}"); +} + +sub release { + my ($self) = shift; + unlink $self->{'pidfile'}; + if($self->nlinks() == 1) { + # This is the last link, so atomic cleanup + $self->lock(); + if($self->nlinks() == 1) { + unlink $self->{'idfile'}; + rmdir $self->{'lockdir'}; + } + $self->unlock(); + } + ::debug("released $self->{'pid'}"); +} + + +sub atomic_link_if_count_less_than { + # Link $file1 to $file2 if nlinks to $file1 < $count + my ($self) = shift; + my ($retval) = 0; + $self->lock(); + if($self->nlinks() < $self->{'count'}) { + -d $self->{'lockdir'} || mkdir $self->{'lockdir'}; + if(not -e $self->{'idfile'}) { + open (A, ">", $self->{'idfile'}) or die ">$self->{'idfile'}"; + close A; + } + $retval = link $self->{'idfile'}, $self->{'pidfile'}; + } + $self->unlock(); + ::debug("atomic $retval"); + return $retval; +} + +sub nlinks { + my $self = shift; + if(-e $self->{'idfile'}) { + return (stat(_))[3]; + } else { + return 0; + } +} + +sub lock { + my ($self) = shift; + open $self->{'lockfh'}, ">", $self->{'lockfile'} + or die "Can't open semaphore file $self->{'lockfile'}: $!"; + chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw + while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) { + ::debug("Cannot lock $self->{'lockfile'}"); + # TODO if timeout: last + sleep 1; + } + ::debug("locked $self->{'lockfile'}"); +} + +sub unlock { + my $self = shift; + unlink $self->{'lockfile'}; + close $self->{'lockfh'}; + ::debug("unlocked"); +} + # Keep perl -w happy -$Private::control_path = 0; +$Private::control_path = $Semaphore::timeout = $Semaphore::name = +$Semaphore::wait = $Semaphore::fg = 0; +