parallel/src/sem.pod
Ole Tange 847841aa11 BSD xargs -o (open /dev/tty) is now default for the job running in foreground.
Useful for: ls | parallel -Xuj1 vi.
Unittest for tty commands using the command 'script'.
2010-09-05 12:22:08 +02:00

434 lines
9.8 KiB
Plaintext
Executable file

#!/usr/bin/perl -w
=head1 NAME
sem - semaphore for executing shell command lines in parallel
=head1 SYNOPSIS
B<sem> [--fg] [--id <id>] [--timeout <secs>] [-j <num>] [--wait] command
=head1 DESCRIPTION
GNU B<sem> is an alias for GNU B<parallel --semaphore>.
It works as a tool for executing shell commands in parallel. GNU
B<sem> acts as a counting semaphore. When GNU B<sem> is called with a
command it will start the command in the background. When I<num>
number of commands are running in the background, GNU B<sem> will wait
for one of these to complete before starting another command.
Before looking at the options you may want to check out the examples
after the list of options. That will give you an idea of what GNU
B<sem> is capable of.
=head1 OPTIONS
=over 9
=item I<command>
Command to execute. The command may be followed by arguments for the command.
=item B<--count> I<N>
=item B<-j> I<N>
Run up to N commands in parallel. Default is 1 thus acting like a
mutex.
=item B<--id> I<id>
=item B<-i> I<id>
Use I<id> as the name of the semaphore. Default is the name of the
controlling tty (output from B<tty>).
The default normally works as expected when used interactively, but
when used in a script I<id> should be set. $$ is often a good value.
=item B<--fg>
Do not put command in background.
=item B<--timeout> I<secs> (not implemented)
=item B<-t> I<secs> (not implemented)
If the semaphore is not released within I<secs> seconds, take it anyway.
=item B<--wait>
=item B<-w>
Wait for all commands to complete.
=back
=head1 EXAMPLE: Gzipping *.log
Run one gzip process per CPU core. Block until a CPU core becomes
available.
for i in `ls *.log` ; do
echo $i
sem -j+0 gzip $i ";" echo done
done
sem --wait
=head1 BUGS
Quoting and composed commands are not working.
=head1 REPORTING BUGS
Report bugs to <bug-parallel@gnu.org>.
=head1 AUTHOR
Copyright (C) 2010 Ole Tange, http://ole.tange.dk and Free Software
Foundation, Inc.
=head1 LICENSE
Copyright (C) 2010 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=head2 Documentation license I
Permission is granted to copy, distribute and/or modify this documentation
under the terms of the GNU Free Documentation License, Version 1.3 or
any later version published by the Free Software Foundation; with no
Invariant Sections, with no Front-Cover Texts, and with no Back-Cover
Texts. A copy of the license is included in the file fdl.txt.
=head2 Documentation license II
You are free:
=over 9
=item B<to Share>
to copy, distribute and transmit the work
=item B<to Remix>
to adapt the work
=back
Under the following conditions:
=over 9
=item B<Attribution>
You must attribute the work in the manner specified by the author or
licensor (but not in any way that suggests that they endorse you or
your use of the work).
=item B<Share Alike>
If you alter, transform, or build upon this work, you may distribute
the resulting work only under the same, similar or a compatible
license.
=back
With the understanding that:
=over 9
=item B<Waiver>
Any of the above conditions can be waived if you get permission from
the copyright holder.
=item B<Public Domain>
Where the work or any of its elements is in the public domain under
applicable law, that status is in no way affected by the license.
=item B<Other Rights>
In no way are any of the following rights affected by the license:
=over 2
=item *
Your fair dealing or fair use rights, or other applicable
copyright exceptions and limitations;
=item *
The author's moral rights;
=item *
Rights other persons may have either in the work itself or in
how the work is used, such as publicity or privacy rights.
=back
=back
=over 9
=item B<Notice>
For any reuse or distribution, you must make clear to others the
license terms of this work.
=back
A copy of the full license is included in the file cc-by-sa.txt.
=head1 DEPENDENCIES
GNU B<sem> uses Perl, and the Perl modules Getopt::Long,
Symbol, Fcntl.
=head1 SEE ALSO
B<parallel>(1)
=cut
use strict;
use Symbol qw(gensym);
use Getopt::Long;
# Main program: parse the options, take one entry of the semaphore, run
# the command given in @ARGV and release the entry again.
#
# "require_order" stops the option parsing at the first non-option, so
# options belonging to the command are left untouched in @ARGV.
Getopt::Long::Configure ("bundling","require_order");
GetOptions("debug|D" => \$::opt_D,
           "id|i=s" => \$::opt_id,
           "count|j=i" => \$::opt_count,
           "fg" => \$::opt_fg,
           "timeout|t=i" => \$::opt_timeout,
           "version" => \$::opt_version,
           "wait|w" => \$::opt_wait,
    ) || die_usage();
$Global::debug = $::opt_D;
$Global::version = 20100814;
$Global::progname = 'sem';

# The limit is compared with the hard link count of the semaphore's id
# file. The id file itself is one of those links, hence N+1 for -j N.
my $count = 1; # Default 1 = mutex
if($::opt_count) {
    $count = $::opt_count + 1;
}
if($::opt_wait) {
    # With a limit of 1 the semaphore can only be taken when no id file
    # exists, i.e. when all other holders have released their entry.
    $count = 1;
}
my $id = $::opt_id;
# --wait implies foreground: we must not return before the wait is over
my $fg = $::opt_fg || $::opt_wait;
# No-op; presumably only here to silence the "used only once" warning
# of -w for an option that is not implemented yet.
$::opt_timeout = $::opt_timeout;
if(defined $::opt_version) {
    version();
    # Only print the version: do not go on to take the semaphore and
    # run an empty command.
    exit(0);
}
if(not defined $id) {
    # $id = getppid();
    # does not work with:
    # find . -name '*linux*' -exec sem -j1000 "sleep 3; echo `tty` '{}'" \; ; sem --wait echo done
    $id = `tty`;
}
$id = "id-$id";
# The id is used as a file name, so make it safe for that
$id=~s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex

my $sem = Semaphore->new($id,$count);
$sem->acquire();
debug("run");
if($fg) {
    system @ARGV;
    $sem->release();
} else {
    # If run in the background, the PID will change
    # therefore release and re-acquire the semaphore
    $sem->release();
    # fork() is true only in the parent, which is then done and returns
    # control to the caller. Note that a failed fork() (undef) also
    # enters this branch, so the command is then run by this process.
    if(not fork()) {
        # child
        # Get a semaphore for this pid
        my $child_sem = Semaphore->new($id,$count);
        $child_sem->acquire();
        system @ARGV;
        $child_sem->release();
    }
}
sub version {
    # Print program name, version, copyright and license summary to the
    # currently selected output handle, one item per line.
    # Uses: $Global::progname, $Global::version
    # Returns: N/A
    my @banner = (
        "GNU $Global::progname $Global::version",
        "Copyright (C) 2010 Ole Tange and Free Software Foundation, Inc.",
        "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>",
        "This is free software: you are free to change and redistribute it.",
        "GNU $Global::progname comes with no warranty.",
        "",
        # The final element carries the newline ending the output
        "Web site: http://www.gnu.org/software/parallel\n",
    );
    print join("\n", @banner);
}
sub usage {
    # Print a short usage summary to the currently selected output
    # handle. The summary mirrors the SYNOPSIS of the manual: sem takes
    # its command from the command line only (no argument list on
    # stdin, no ':::' or '::::' argument sources).
    # Uses: $Global::progname
    # Returns: N/A
    print "Usage:\n";
    print "$Global::progname [--fg] [--id <id>] [--timeout <secs>] [-j <num>] [--wait] command\n";
    print "\n";
    print "See 'man $Global::progname' for the options\n";
}
sub die_usage {
    # Show the usage summary and terminate the program with exit
    # status 255. Called when the command line cannot be parsed.
    # Returns: N/A (does not return)
    usage();
    exit 255;
}
sub debug {
    # Print every argument on a line of its own if debugging is enabled
    # ($Global::debug is true); otherwise do nothing.
    # Undefined arguments are printed as empty lines. map (not grep) is
    # needed for that: grep would use the value as a filter condition
    # and thereby drop arguments like 0 and "".
    # Returns: N/A
    $Global::debug or return;
    my @lines = map { defined $_ ? "$_\n" : "\n" } @_;
    print @lines;
}
package Semaphore;
# This package provides a counting semaphore
#
# If a process dies without releasing the semaphore the next process
# that needs that entry will clean up dead semaphores
#
# The semaphores are stored in ~/.parallel/semaphores/id-<name>. Each
# file in ~/.parallel/semaphores/id-<name>/ (apart from the id file
# itself) is named after the process ID of the process holding the
# entry. If the process dies, the entry can be taken by another process.
use Fcntl qw(:DEFAULT :flock);
sub new {
    # Create an object for the counting semaphore named $id. No entry is
    # taken yet; only the directory holding all semaphores is created
    # if it is missing.
    # Input:
    #   $class = class name (or an object, whose class is then used)
    #   $id    = name of the semaphore; used as a file name
    #   $count = limit for the semaphore, stored in the object
    # Dies if the semaphore directory cannot be created.
    # Returns: Semaphore object
    my ($class, $id, $count) = @_;
    my $parallel_dir = $ENV{'HOME'}."/.parallel";
    my $parallel_locks = $parallel_dir."/semaphores";
    # mkdir does not create missing parents, so create each level
    for my $dir ($parallel_dir, $parallel_locks) {
        next if -d $dir;
        next if mkdir $dir;
        my $mkdir_error = $!;
        # Another process may have created the dir between our test and
        # our mkdir; that is fine.
        -d $dir or die "Cannot create directory $dir: $mkdir_error\n";
    }
    my $lockdir = "$parallel_locks/$id";
    my $lockfile = $lockdir.".lock";
    return bless {
        'lockfile' => $lockfile,
        'lockfh' => Symbol::gensym(),
        'lockdir' => $lockdir,
        'id' => $id,
        'idfile' => $lockdir."/".$id,
        # The entry of a process is a file named after its PID
        'pid' => $$,
        'pidfile' => $lockdir."/".$$,
        'count' => $count
    }, ref($class) || $class;
}
sub acquire {
    # Block until this process holds one entry of the semaphore.
    # Each round tries to take an entry; if that fails, the entries of
    # processes that no longer exist are removed and a second attempt is
    # made before sleeping one second.
    # Returns: N/A
    my $self = shift;
    while(1) {
        $self->atomic_link_if_count_less_than() and last;
        ::debug("Remove dead locks");
        my $lockdir = $self->{'lockdir'};
        # Read the directory instead of using glob() and a regexp built
        # from the path: those break if the path contains whitespace or
        # regexp metacharacters (e.g. from $HOME).
        if(opendir(my $dirhandle, $lockdir)) {
            for my $entry (readdir $dirhandle) {
                # Entries held by a process start with its PID; this
                # skips the id file, '.' and '..'
                $entry =~ /^([0-9]+)/ or next;
                my $pid = $1;
                my $d = "$lockdir/$entry";
                # Signal 0 sends nothing; it only tests whether the
                # process can be signalled
                if(not kill 0, $pid) {
                    ::debug("Dead: $d");
                    unlink $d;
                } else {
                    ::debug("Alive: $d");
                }
            }
            closedir $dirhandle;
        }
        # try again
        $self->atomic_link_if_count_less_than() and last;
        sleep 1;
        # TODO if timeout: last
    }
    ::debug("got $self->{'pid'}");
}
sub release {
    # Give back the entry held by this process by removing its pid
    # file. If only the id file is left afterwards (link count 1), the
    # id file and the semaphore directory are removed as well.
    # Returns: N/A
    my $self = shift;
    my ($pidfile, $idfile, $lockdir) = @{$self}{qw(pidfile idfile lockdir)};
    unlink $pidfile;
    my $only_idfile_left = ($self->nlinks() == 1);
    if($only_idfile_left) {
        # This is the last link, so atomic cleanup.
        # The link count is checked again while holding the lock, as
        # it may have changed since the unlocked check above.
        $self->lock();
        if($self->nlinks() == 1) {
            unlink $idfile;
            rmdir $lockdir;
        }
        $self->unlock();
    }
    ::debug("released $self->{'pid'}");
}
sub atomic_link_if_count_less_than {
    # Take one entry of the semaphore if the limit is not reached:
    # hard link the id file to the pid file of this process if the link
    # count of the id file is less than $self->{'count'}.
    # The semaphore directory and the id file are created if missing.
    # The test and the link are done while holding the lock file.
    # Dies if the id file cannot be created.
    # Returns: true if the link was made, false otherwise
    my $self = shift;
    my $retval = 0;
    $self->lock();
    # Use the limit of this object, not a variable of the calling script
    if($self->nlinks() < $self->{'count'}) {
        -d $self->{'lockdir'} || mkdir $self->{'lockdir'};
        if(not -e $self->{'idfile'}) {
            # Create an empty id file to link to
            open (my $idfh, ">", $self->{'idfile'}) or die ">$self->{'idfile'}";
            close $idfh;
        }
        $retval = link $self->{'idfile'}, $self->{'pidfile'};
    }
    $self->unlock();
    ::debug("atomic $retval");
    return $retval;
}
sub nlinks {
    # Returns: number of hard links to the id file of the semaphore;
    #   0 if the id file does not exist
    my ($self) = @_;
    my @fileinfo = stat($self->{'idfile'});
    # stat gives an empty list if the file cannot be stat'ed;
    # field 3 is the link count
    return @fileinfo ? $fileinfo[3] : 0;
}
sub lock {
    # Take the exclusive lock on the lock file of the semaphore.
    # The lock file is (re)created and the lock is retried once per
    # second until it is obtained.
    # Dies if the lock file cannot be opened for writing.
    # Returns: N/A
    my ($self) = @_;
    my $fh = $self->{'lockfh'};
    open($fh, ">", $self->{'lockfile'})
        or die "Can't open semaphore file $self->{'lockfile'}: $!";
    chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
    # Non-blocking attempts, so each failure can be reported
    until(flock($fh, LOCK_EX()|LOCK_NB())) {
        ::debug("Cannot lock $self->{'lockfile'}");
        # TODO if timeout: last
        sleep 1;
    }
    ::debug("locked $self->{'lockfile'}");
}
sub unlock {
    # Give up the lock taken by lock(): remove the lock file and then
    # close its file handle.
    # Returns: N/A
    my ($self) = @_;
    my ($lockfile, $lockfh) = @{$self}{qw(lockfile lockfh)};
    unlink $lockfile;
    close $lockfh;
    ::debug("unlocked");
}