--semaphore implemented. Unittest passes.

This commit is contained in:
Ole Tange 2010-08-17 00:25:11 +02:00
parent 94b1c3ec57
commit c7eeac015b
9 changed files with 198 additions and 485 deletions

View file

@ -64,8 +64,8 @@
'AM_SET_LEADING_DOT' => 1, 'AM_SET_LEADING_DOT' => 1,
'AM_SET_DEPDIR' => 1, 'AM_SET_DEPDIR' => 1,
'_AM_DEPENDENCIES' => 1, '_AM_DEPENDENCIES' => 1,
'AM_PROG_INSTALL_SH' => 1,
'm4_include' => 1, 'm4_include' => 1,
'AM_PROG_INSTALL_SH' => 1,
'_AC_AM_CONFIG_HEADER_HOOK' => 1, '_AC_AM_CONFIG_HEADER_HOOK' => 1,
'AU_DEFUN' => 1, 'AU_DEFUN' => 1,
'AM_MAKE_INCLUDE' => 1 'AM_MAKE_INCLUDE' => 1
@ -83,25 +83,25 @@
'configure.ac' 'configure.ac'
], ],
{ {
'_LT_AC_TAGCONFIG' => 1,
'AM_PROG_F77_C_O' => 1, 'AM_PROG_F77_C_O' => 1,
'm4_pattern_forbid' => 1, '_LT_AC_TAGCONFIG' => 1,
'AC_INIT' => 1, 'AC_INIT' => 1,
'_AM_COND_IF' => 1, 'm4_pattern_forbid' => 1,
'AC_CANONICAL_TARGET' => 1, 'AC_CANONICAL_TARGET' => 1,
'AC_SUBST' => 1, '_AM_COND_IF' => 1,
'AC_CONFIG_LIBOBJ_DIR' => 1, 'AC_CONFIG_LIBOBJ_DIR' => 1,
'AC_FC_SRCEXT' => 1, 'AC_SUBST' => 1,
'AC_CANONICAL_HOST' => 1, 'AC_CANONICAL_HOST' => 1,
'AC_FC_SRCEXT' => 1,
'AC_PROG_LIBTOOL' => 1, 'AC_PROG_LIBTOOL' => 1,
'AM_INIT_AUTOMAKE' => 1, 'AM_INIT_AUTOMAKE' => 1,
'AC_CONFIG_SUBDIRS' => 1, 'AC_CONFIG_SUBDIRS' => 1,
'AM_AUTOMAKE_VERSION' => 1, 'AM_AUTOMAKE_VERSION' => 1,
'LT_CONFIG_LTDL_DIR' => 1, 'LT_CONFIG_LTDL_DIR' => 1,
'AC_REQUIRE_AUX_FILE' => 1,
'AC_CONFIG_LINKS' => 1, 'AC_CONFIG_LINKS' => 1,
'm4_sinclude' => 1, 'AC_REQUIRE_AUX_FILE' => 1,
'LT_SUPPORTED_TAG' => 1, 'LT_SUPPORTED_TAG' => 1,
'm4_sinclude' => 1,
'AM_MAINTAINER_MODE' => 1, 'AM_MAINTAINER_MODE' => 1,
'AM_GNU_GETTEXT_INTL_SUBDIR' => 1, 'AM_GNU_GETTEXT_INTL_SUBDIR' => 1,
'_m4_warn' => 1, '_m4_warn' => 1,
@ -114,17 +114,17 @@
'include' => 1, 'include' => 1,
'AM_GNU_GETTEXT' => 1, 'AM_GNU_GETTEXT' => 1,
'AC_LIBSOURCE' => 1, 'AC_LIBSOURCE' => 1,
'AM_PROG_FC_C_O' => 1,
'AC_CANONICAL_BUILD' => 1, 'AC_CANONICAL_BUILD' => 1,
'AM_PROG_FC_C_O' => 1,
'AC_FC_FREEFORM' => 1, 'AC_FC_FREEFORM' => 1,
'AH_OUTPUT' => 1, 'AH_OUTPUT' => 1,
'_AM_SUBST_NOTMAKE' => 1,
'AC_CONFIG_AUX_DIR' => 1, 'AC_CONFIG_AUX_DIR' => 1,
'sinclude' => 1, '_AM_SUBST_NOTMAKE' => 1,
'm4_pattern_allow' => 1,
'AM_PROG_CC_C_O' => 1, 'AM_PROG_CC_C_O' => 1,
'AC_CANONICAL_SYSTEM' => 1, 'm4_pattern_allow' => 1,
'sinclude' => 1,
'AM_CONDITIONAL' => 1, 'AM_CONDITIONAL' => 1,
'AC_CANONICAL_SYSTEM' => 1,
'AC_CONFIG_HEADERS' => 1, 'AC_CONFIG_HEADERS' => 1,
'AC_DEFINE_TRACE_LITERAL' => 1, 'AC_DEFINE_TRACE_LITERAL' => 1,
'm4_include' => 1, 'm4_include' => 1,

View file

@ -6,17 +6,20 @@ parallel.1: parallel Makefile
pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \ pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \
--section=1 $(srcdir)/parallel > $(srcdir)/parallel.1 --section=1 $(srcdir)/parallel > $(srcdir)/parallel.1
sem.1: sem Makefile sem.1: sem.pod Makefile
pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \ pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \
--section=1 $(srcdir)/sem > $(srcdir)/sem.1 --section=1 $(srcdir)/sem.pod > $(srcdir)/sem.1
parallel.html: parallel Makefile parallel.html: parallel Makefile
pod2html $(srcdir)/parallel > $(srcdir)/parallel.html pod2html $(srcdir)/parallel > $(srcdir)/parallel.html
rm $(srcdir)/pod2htm* rm $(srcdir)/pod2htm*
sem.html: sem Makefile sem.html: sem.pod Makefile
pod2html $(srcdir)/sem > $(srcdir)/sem.html pod2html $(srcdir)/sem.pod > $(srcdir)/sem.html
rm $(srcdir)/pod2htm* rm $(srcdir)/pod2htm*
sem: parallel
ln -s parallel sem
DISTCLEANFILES = parallel.1 sem.1 parallel.html sem.html DISTCLEANFILES = parallel.1 sem.1 parallel.html sem.html
EXTRA_DIST = parallel sem parallel.1 sem.1 parallel.html sem.html EXTRA_DIST = parallel sem parallel.1 sem.1 parallel.html sem.html

View file

@ -447,18 +447,21 @@ parallel.1: parallel Makefile
pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \ pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \
--section=1 $(srcdir)/parallel > $(srcdir)/parallel.1 --section=1 $(srcdir)/parallel > $(srcdir)/parallel.1
sem.1: sem Makefile sem.1: sem.pod Makefile
pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \ pod2man --release='$(PACKAGE_VERSION)' --center='$(PACKAGE_NAME)' \
--section=1 $(srcdir)/sem > $(srcdir)/sem.1 --section=1 $(srcdir)/sem.pod > $(srcdir)/sem.1
parallel.html: parallel Makefile parallel.html: parallel Makefile
pod2html $(srcdir)/parallel > $(srcdir)/parallel.html pod2html $(srcdir)/parallel > $(srcdir)/parallel.html
rm $(srcdir)/pod2htm* rm $(srcdir)/pod2htm*
sem.html: sem Makefile sem.html: sem.pod Makefile
pod2html $(srcdir)/sem > $(srcdir)/sem.html pod2html $(srcdir)/sem.pod > $(srcdir)/sem.html
rm $(srcdir)/pod2htm* rm $(srcdir)/pod2htm*
sem: parallel
ln -s parallel sem
# Tell versions [3.59,3.63) of GNU make to not export all variables. # Tell versions [3.59,3.63) of GNU make to not export all variables.
# Otherwise a system limit (for SysV at least) may be exceeded. # Otherwise a system limit (for SysV at least) may be exceeded.
.NOEXPORT: .NOEXPORT:

View file

@ -1987,16 +1987,52 @@ use strict;
do_not_reap(); do_not_reap();
parse_options(); parse_options();
init_run_jobs(); init_run_jobs();
if($Global::semaphore) {
run_as_semaphore();
} else {
start_more_jobs(); start_more_jobs();
}
reap_if_needed(); reap_if_needed();
drain_job_queue(); drain_job_queue();
cleanup(); cleanup();
if($Global::semaphore) {
exit $Global::exitstatus;
}
if($::opt_halt_on_error) { if($::opt_halt_on_error) {
wait_and_exit($Global::halt_on_error_exitstatus); wait_and_exit($Global::halt_on_error_exitstatus);
} else { } else {
wait_and_exit(min($Global::exitstatus,254)); wait_and_exit(min($Global::exitstatus,254));
} }
sub run_as_semaphore {
my $sem = Semaphore->new($Semaphore::name,$Global::host{':'}{'max_no_of_running'});
$sem->acquire();
debug("run");
$Global::argfile = open_or_exit("/dev/null");
unget_arg("");
if($Semaphore::fg) {
start_more_jobs();
$sem->release();
} else {
# If run in the background, the PID will change
# therefore release and re-acquire the semaphore
$sem->release();
if(not fork()) {
# child
# Get a semaphore for this pid
my $child_sem = Semaphore->new($Semaphore::name,$Global::host{':'}{'max_no_of_running'});
$child_sem->acquire();
start_more_jobs();
reap_if_needed();
drain_job_queue();
cleanup();
$child_sem->release();
} else {
exit(0);
}
}
}
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
@ -2209,6 +2245,22 @@ sub parse_options {
print STDERR ("Warning: using -X or -m with --sshlogin may fail\n"); print STDERR ("Warning: using -X or -m with --sshlogin may fail\n");
} }
# Semaphore defaults
# Must be done before computing number of processes
$Global::semaphore ||= ($0 =~ m:(^|/)sem$:); # called as 'sem'
if($Global::semaphore) {
$Semaphore::timeout = $::opt_semaphoretimeout || 0;
if(defined $::opt_semaphorename) {
$Semaphore::name = $::opt_semaphorename;
} else {
$Semaphore::name = `tty`;
chomp $Semaphore::name;
}
$Semaphore::fg = $::opt_fg;
$Semaphore::wait = $::opt_wait;
$Global::default_simultaneous_sshlogins = 1;
}
# Needs to be done after setting $Global::command and $Global::command_line_max_len # Needs to be done after setting $Global::command and $Global::command_line_max_len
# as '-m' influences the number of commands that needs to be run # as '-m' influences the number of commands that needs to be run
if(defined $::opt_P) { if(defined $::opt_P) {
@ -2222,14 +2274,6 @@ sub parse_options {
$Global::default_simultaneous_sshlogins; $Global::default_simultaneous_sshlogins;
} }
} }
# Semaphore defaults
if($Global::semaphore) {
$Semaphore::timeout = $::opt_semaphoretimeout || 0;
$Semaphore::name = $::opt_semaphorename || `tty`;
$Semaphore::fg = $::opt_fg;
$Semaphore::wait = $::opt_wait;
}
} }
sub read_args_from_command_line { sub read_args_from_command_line {
@ -2768,12 +2812,14 @@ sub processes_available_by_system_limit {
do { do {
$system_limit++; $system_limit++;
if(not $Global::semaphore) {
# If there are no more command lines, then we have a process # If there are no more command lines, then we have a process
# per command line, so no need to go further # per command line, so no need to go further
($next_command_line, $args_ref) = get_command_line(); ($next_command_line, $args_ref) = get_command_line();
if(defined $next_command_line) { if(defined $next_command_line) {
push(@command_lines, $next_command_line, $args_ref); push(@command_lines, $next_command_line, $args_ref);
} }
}
# Every simultaneous process uses 2 filehandles when grouping # Every simultaneous process uses 2 filehandles when grouping
$more_filehandles = open($fh{$system_limit*2},"</dev/null") $more_filehandles = open($fh{$system_limit*2},"</dev/null")
@ -2792,7 +2838,7 @@ sub processes_available_by_system_limit {
} else { } else {
$max_system_proc_reached = 1; $max_system_proc_reached = 1;
} }
debug("Time to fork ten procs ", time-$time, " process ", $system_limit); debug("Time to fork ten procs: ", time-$time, " (processes so far: ", $system_limit,")\n");
if(time-$time > 2) { if(time-$time > 2) {
# It took more than 2 second to fork ten processes. We should stop forking. # It took more than 2 second to fork ten processes. We should stop forking.
# Let us give the system a little slack # Let us give the system a little slack
@ -2802,7 +2848,7 @@ sub processes_available_by_system_limit {
$spawning_too_slow = 1; $spawning_too_slow = 1;
} }
} while($system_limit < $wanted_processes } while($system_limit < $wanted_processes
and defined $next_command_line and (defined $next_command_line or $Global::semaphore)
and $more_filehandles and $more_filehandles
and not $max_system_proc_reached and not $max_system_proc_reached
and not $spawning_too_slow); and not $spawning_too_slow);
@ -4237,6 +4283,8 @@ sub new {
my $class = shift; my $class = shift;
my $id = shift; my $id = shift;
my $count = shift; my $count = shift;
$id=~s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex
$id="id-".$id; # To distinguish it from a process id
my $parallel_locks = $ENV{'HOME'}."/.parallel/semaphores"; my $parallel_locks = $ENV{'HOME'}."/.parallel/semaphores";
-d $parallel_locks or mkdir $parallel_locks; -d $parallel_locks or mkdir $parallel_locks;
my $lockdir = "$parallel_locks/$id"; my $lockdir = "$parallel_locks/$id";
@ -4249,7 +4297,7 @@ sub new {
'idfile' => $lockdir."/".$id, 'idfile' => $lockdir."/".$id,
'pid' => $$, 'pid' => $$,
'pidfile' => $lockdir."/".$$, 'pidfile' => $lockdir."/".$$,
'count' => $count 'count' => $count + 1 # nlinks returns a link for the 'id-' as well
}, ref($class) || $class; }, ref($class) || $class;
} }
@ -4260,7 +4308,7 @@ sub acquire {
::debug("Remove dead locks"); ::debug("Remove dead locks");
my $lockdir = $self->{'lockdir'}; my $lockdir = $self->{'lockdir'};
for my $d (<$lockdir/*>) { for my $d (<$lockdir/*>) {
$d =~ m:$lockdir/([0-9]+):o or next; $d =~ m:$lockdir/([0-9]+)$:o or next;
if(not kill 0, $1) { if(not kill 0, $1) {
::debug("Dead: $d"); ::debug("Dead: $d");
unlink $d; unlink $d;
@ -4273,7 +4321,7 @@ sub acquire {
sleep 1; sleep 1;
# TODO if timeout: last # TODO if timeout: last
} }
::debug("got $self->{'pid'}"); ::debug("acquired $self->{'pid'}\n");
} }
sub release { sub release {
@ -4288,7 +4336,7 @@ sub release {
} }
$self->unlock(); $self->unlock();
} }
::debug("released $self->{'pid'}"); ::debug("released $self->{'pid'}\n");
} }
@ -4297,6 +4345,7 @@ sub atomic_link_if_count_less_than {
my ($self) = shift; my ($self) = shift;
my ($retval) = 0; my ($retval) = 0;
$self->lock(); $self->lock();
::debug($self->nlinks()."<".$self->{'count'});
if($self->nlinks() < $self->{'count'}) { if($self->nlinks() < $self->{'count'}) {
-d $self->{'lockdir'} || mkdir $self->{'lockdir'}; -d $self->{'lockdir'} || mkdir $self->{'lockdir'};
if(not -e $self->{'idfile'}) { if(not -e $self->{'idfile'}) {
@ -4313,6 +4362,7 @@ sub atomic_link_if_count_less_than {
sub nlinks { sub nlinks {
my $self = shift; my $self = shift;
if(-e $self->{'idfile'}) { if(-e $self->{'idfile'}) {
::debug("nlinks".((stat(_))[3])."\n");
return (stat(_))[3]; return (stat(_))[3];
} else { } else {
return 0; return 0;
@ -4336,10 +4386,10 @@ sub unlock {
my $self = shift; my $self = shift;
unlink $self->{'lockfile'}; unlink $self->{'lockfile'};
close $self->{'lockfh'}; close $self->{'lockfh'};
::debug("unlocked"); ::debug("unlocked\n");
} }
# Keep perl -w happy # Keep perl -w happy
$Private::control_path = $Semaphore::timeout = $Semaphore::name = $Private::control_path = $Semaphore::timeout =
$Semaphore::wait = $Semaphore::fg = 0; $Semaphore::wait = 0;

431
src/sem
View file

@ -1,431 +0,0 @@
#!/usr/bin/perl -w
=head1 NAME
sem - semaphore for executing shell command lines in parallel
=head1 SYNOPSIS
B<sem> [--fg] [--id <id>] [--timeout <secs>] [--count <num>] [--wait] command
=head1 DESCRIPTION
GNU B<sem> is a tool for executing shell commands in parallel. GNU
B<sem> acts as a counting semaphore. When GNU B<sem> is called with
command it will start the command in the background. When I<num>
number of commands are running in the background, GNU B<sem> will wait
for one of these to complete before starting another command.
GNU B<sem> is a supplement to GNU B<parallel> and can be used if the
loop is too complex to put into a GNU B<parallel> command.
Before looking at the options you may want to check out the examples
after the list of options. That will give you an idea of what GNU
B<sem> is capable of.
=head1 OPTIONS
=over 9
=item I<command>
Command to execute. The command may be followed by arguments for the command.
=item B<--count> I<N>
=item B<-j> I<N>
Run up to N commands in parallel. Default is 1 thus acting like a
mutex.
=item B<--id> I<id>
=item B<-i> I<id>
Use B<id> as the name of the semaphore. Default is the name of the
controlling tty (output from B<tty>).
The default normally works as expected when used interactively, but
when used in a script I<id> should be set. $$ is often a good value.
=item B<--fg>
Do not put command in background.
=item B<--timeout> I<secs> (not implemented)
=item B<-t> I<secs> (not implemented)
If the semaphore is not released within I<secs> seconds, take it anyway.
=item B<--wait>
=item B<-w>
Wait for all commands to complete.
=back
=head1 EXAMPLE: Gzipping *.log
for i in `ls *.log` ; do
echo $i
sem gzip $i ";" echo done
done
sem -w
=head1 BUGS
Quoting and composed commands are not working.
=head1 REPORTING BUGS
Report bugs to <bug-parallel@gnu.org>.
=head1 AUTHOR
Copyright (C) 2010 Ole Tange, http://ole.tange.dk and Free Software
Foundation, Inc.
=head1 LICENSE
Copyright (C) 2010 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
at your option any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=head2 Documentation license I
Permission is granted to copy, distribute and/or modify this documentation
under the terms of the GNU Free Documentation License, Version 1.3 or
any later version published by the Free Software Foundation; with no
Invariant Sections, with no Front-Cover Texts, and with no Back-Cover
Texts. A copy of the license is included in the file fdl.txt.
=head2 Documentation license II
You are free:
=over 9
=item B<to Share>
to copy, distribute and transmit the work
=item B<to Remix>
to adapt the work
=back
Under the following conditions:
=over 9
=item B<Attribution>
You must attribute the work in the manner specified by the author or
licensor (but not in any way that suggests that they endorse you or
your use of the work).
=item B<Share Alike>
If you alter, transform, or build upon this work, you may distribute
the resulting work only under the same, similar or a compatible
license.
=back
With the understanding that:
=over 9
=item B<Waiver>
Any of the above conditions can be waived if you get permission from
the copyright holder.
=item B<Public Domain>
Where the work or any of its elements is in the public domain under
applicable law, that status is in no way affected by the license.
=item B<Other Rights>
In no way are any of the following rights affected by the license:
=over 2
=item *
Your fair dealing or fair use rights, or other applicable
copyright exceptions and limitations;
=item *
The author's moral rights;
=item *
Rights other persons may have either in the work itself or in
how the work is used, such as publicity or privacy rights.
=back
=back
=over 9
=item B<Notice>
For any reuse or distribution, you must make clear to others the
license terms of this work.
=back
A copy of the full license is included in the file as cc-by-sa.txt.
=head1 DEPENDENCIES
GNU B<sem> uses Perl, and the Perl modules Getopt::Long,
Symbol, Fcntl.
=head1 SEE ALSO
B<parallel>(1)
=cut
use strict;
use Symbol qw(gensym);
use Getopt::Long;
Getopt::Long::Configure ("bundling","require_order");
GetOptions("debug|D" => \$::opt_D,
"id|i=s" => \$::opt_id,
"count|j=i" => \$::opt_count,
"fg" => \$::opt_fg,
"timeout|t=i" => \$::opt_timeout,
"version" => \$::opt_version,
"wait|w" => \$::opt_wait,
) || die_usage();
$Global::debug = $::opt_D;
$Global::version = 20100814;
$Global::progname = 'sem';
my $count = 1; # Default 1 = mutex
if($::opt_count) {
$count = $::opt_count + 1;
}
if($::opt_wait) {
$count = 1;
}
my $id = $::opt_id;
my $fg = $::opt_fg || $::opt_wait;
$::opt_timeout = $::opt_timeout;
if(defined $::opt_version) {
version();
}
if(not defined $id) {
# $id = getppid();
# does not work with:
# find . -name '*linux*' -exec sem -j1000 "sleep 3; echo `tty` '{}'" \; ; sem --wait echo done
$id = `tty`;
}
$id = "id-$id";
$id=~s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex
my $sem = Semaphore->new($id,$count);
$sem->acquire();
debug("run");
if($fg) {
system @ARGV;
$sem->release();
} else {
# If run in the background, the PID will change
# therefore release and re-acquire the semaphore
$sem->release();
if(not fork()) {
# child
# Get a semaphore for this pid
my $child_sem = Semaphore->new($id,$count);
$child_sem->acquire();
system @ARGV;
$child_sem->release();
}
}
sub version {
# Returns: N/A
print join("\n",
"GNU $Global::progname $Global::version",
"Copyright (C) 2010 Ole Tange and Free Software Foundation, Inc.",
"License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>",
"This is free software: you are free to change and redistribute it.",
"GNU $Global::progname comes with no warranty.",
"",
"Web site: http://www.gnu.org/software/parallel\n"
);
}
sub usage {
# Returns: N/A
print "Usage:\n";
print "$Global::progname [options] [command [arguments]] < list_of_arguments)\n";
print "$Global::progname [options] [command [arguments]] ::: arguments\n";
print "$Global::progname [options] [command [arguments]] :::: argfile(s)\n";
print "\n";
print "See 'man $Global::progname' for the options\n";
}
sub die_usage {
usage();
exit(255);
}
sub debug {
# Returns: N/A
$Global::debug or return;
@_ = grep { defined $_ ? $_ : "" } @_;
print map {$_,"\n" } @_;
}
package Semaphore;
# This package provides a counting semaphore
#
# If a process dies without releasing the semaphore the next process
# that needs that entry will clean up dead semaphores
#
# The semaphores are stored in ~/.parallel/semaphores/id-<name> Each
# file in ~/.parallel/semaphores/id-<name>/ is the process ID of the
# process holding the entry. If the process dies, the entry can be
# taken by another process.
use Fcntl qw(:DEFAULT :flock);
sub new {
my $class = shift;
my $id = shift;
my $count = shift;
my $parallel_locks = $ENV{'HOME'}."/.parallel/semaphores";
-d $parallel_locks or mkdir $parallel_locks;
my $lockdir = "$parallel_locks/$id";
my $lockfile = $lockdir.".lock";
return bless {
'lockfile' => $lockfile,
'lockfh' => Symbol::gensym(),
'lockdir' => $lockdir,
'id' => $id,
'idfile' => $lockdir."/".$id,
'pid' => $$,
'pidfile' => $lockdir."/".$$,
'count' => $count
}, ref($class) || $class;
}
sub acquire {
my $self = shift;
while(1) {
$self->atomic_link_if_count_less_than() and last;
::debug("Remove dead locks");
my $lockdir = $self->{'lockdir'};
for my $d (<$lockdir/*>) {
$d =~ m:$lockdir/([0-9]+):o or next;
if(not kill 0, $1) {
::debug("Dead: $d");
unlink $d;
} else {
::debug("Alive: $d");
}
}
# try again
$self->atomic_link_if_count_less_than() and last;
sleep 1;
# TODO if timeout: last
}
::debug("got $self->{'pid'}");
}
sub release {
my ($self) = shift;
unlink $self->{'pidfile'};
if($self->nlinks() == 1) {
# This is the last link, so atomic cleanup
$self->lock();
if($self->nlinks() == 1) {
unlink $self->{'idfile'};
rmdir $self->{'lockdir'};
}
$self->unlock();
}
::debug("released $self->{'pid'}");
}
sub atomic_link_if_count_less_than {
# Link $file1 to $file2 if nlinks to $file1 < $count
my ($self) = shift;
my ($retval) = 0;
$self->lock();
if($self->nlinks() < $count) {
-d $self->{'lockdir'} || mkdir $self->{'lockdir'};
if(not -e $self->{'idfile'}) {
open (A, ">", $self->{'idfile'}) or die ">$self->{'idfile'}";
close A;
}
$retval = link $self->{'idfile'}, $self->{'pidfile'};
}
$self->unlock();
::debug("atomic $retval");
return $retval;
}
sub nlinks {
my $self = shift;
if(-e $self->{'idfile'}) {
return (stat(_))[3];
} else {
return 0;
}
}
sub lock {
my ($self) = shift;
open $self->{'lockfh'}, ">", $self->{'lockfile'}
or die "Can't open semaphore file $self->{'lockfile'}: $!";
chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) {
::debug("Cannot lock $self->{'lockfile'}");
# TODO if timeout: last
sleep 1;
}
::debug("locked $self->{'lockfile'}");
}
sub unlock {
my $self = shift;
unlink $self->{'lockfile'};
close $self->{'lockfh'};
::debug("unlocked");
}

1
src/sem Symbolic link
View file

@ -0,0 +1 @@
parallel

View file

@ -8,6 +8,7 @@ unittest: ../src/parallel tests-to-run/* wanted-results/*
echo 1+2 | bc | mop || (echo bc is required for unittest; /bin/false) echo 1+2 | bc | mop || (echo bc is required for unittest; /bin/false)
stdout gawk | mop || (echo gawk is required for unittest; /bin/false) stdout gawk | mop || (echo gawk is required for unittest; /bin/false)
expect -c 'spawn cat; puts "expect is installed"' || (echo expect is required for unittest; /bin/false) expect -c 'spawn cat; puts "expect is installed"' || (echo expect is required for unittest; /bin/false)
echo | pv -qL 10 || (echo pv is required for unittest; /bin/false)
time sh Start.sh time sh Start.sh
date date

View file

@ -1,9 +1,32 @@
#!/bin/bash #!/bin/bash
echo '### Test mutex. This should take 3 seconds' echo '### Test mutex. This should not mix output'
sem 'sleep 1; echo foo' parallel -u --semaphore seq 1 10 '|' pv -qL 20
sem 'sleep 1; echo foo' parallel -u --semaphore seq 11 20 '|' pv -qL 100
sem 'sleep 1; echo foo' parallel --semaphore --wait
echo done
echo '### Test default id = --id `tty`'
parallel --id `tty` -u --semaphore seq 1 10 '|' pv -qL 20
parallel -u --semaphore seq 11 20 '|' pv -qL 100
parallel --id `tty` --semaphore --wait
echo done
echo '### Test semaphore 2 jobs running simultaneously'
parallel -u -j2 --semaphore 'echo job1; sleep 0.5; echo job1'
parallel -u -j2 --semaphore 'echo job2; sleep 0.5; echo job2'
parallel --semaphore --wait
echo done
echo '### Test if parallel invoked as sem will run parallel --semaphore'
sem -u -j2 'echo job1; sleep 0.5; echo job1'
sem -u -j2 'echo job2; sleep 0.5; echo job2'
sem --wait sem --wait
echo done
echo '### Test similar example as from man page'
for i in 0.5 0.1 0.2 0.3 0.4 ; do
echo $i
sem -j+0 sleep $i ";" echo done $i
done
sem --wait

View file

@ -10,7 +10,7 @@ echo '### Test of --trim illegal'
stdout parallel --trim fj ::: echo stdout parallel --trim fj ::: echo
echo '### Test of eof string on :::' echo '### Test of eof string on :::'
parallel -E ole echo ::: foo ole bar parallel -k -E ole echo ::: foo ole bar
echo '### Test of ignore-empty string on :::' echo '### Test of ignore-empty string on :::'
parallel -k -r echo ::: foo '' ole bar parallel -k -r echo ::: foo '' ole bar

View file

@ -1,4 +1,67 @@
### Test mutex. This should take 3 seconds ### Test mutex. This should not mix output
foo 1
foo 2
foo 3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
done
### Test default id = --id `tty`
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
done
### Test semaphore 2 jobs running simultaneously
job1
job2
job1
job2
done
### Test if parallel invoked as sem will run parallel --semaphore
job1
job2
job1
job2
done
### Test similar example as from man page
0.5
0.1
0.2
done 0.1
0.3
done 0.5
done 0.2
0.4
done 0.3
done 0.4