#!/usr/bin/perl -w

=head1 NAME

sem - semaphore for executing shell command lines in parallel

=head1 SYNOPSIS

B<sem> [--fg] [--id <id>] [--timeout <secs>] [-j <num>] [--wait] command

=head1 DESCRIPTION

GNU B<sem> is an alias for GNU B<parallel --semaphore>.

It works as a tool for executing shell commands in parallel. GNU
B<sem> acts as a counting semaphore. When GNU B<sem> is called with
command it will start the command in the background. When I<num>
number of commands are running in the background, GNU B<sem> will wait
for one of these to complete before starting another command.

Before looking at the options you may want to check out the examples
after the list of options. That will give you an idea of what GNU
B<sem> is capable of.

=head1 OPTIONS

=over 9

=item I<command>

Command to execute. The command may be followed by arguments for the
command.

=item B<--count> I<N>

=item B<-j> I<N>

Run up to N commands in parallel. Default is 1 thus acting like a
mutex.

=item B<--id> I<id>

=item B<-i> I<id>

Use B<id> as the name of the semaphore. Default is the name of the
controlling tty (output from B<tty>).

The default normally works as expected when used interactively, but
when used in a script I<id> should be set. $$ is often a good value.

=item B<--fg>

Do not put command in background.

=item B<--timeout> I<secs> (not implemented)

=item B<-t> I<secs> (not implemented)

If the semaphore is not released within I<secs> seconds, take it anyway.

=item B<--wait>

=item B<-w>

Wait for all commands to complete.

=back

=head1 EXAMPLE: Gzipping *.log

  for i in `ls *.log` ; do
    echo $i
    sem gzip $i ";" echo done
  done
  sem --wait

=head1 BUGS

Quoting and composed commands are not working.

=head1 REPORTING BUGS

Report bugs to <bug-parallel@gnu.org>.

=head1 AUTHOR

Copyright (C) 2010 Ole Tange, http://ole.tange.dk and Free Software
Foundation, Inc.

=head1 LICENSE

Copyright (C) 2010 Free Software Foundation, Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
at your option any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

=head2 Documentation license I

Permission is granted to copy, distribute and/or modify this documentation
under the terms of the GNU Free Documentation License, Version 1.3 or
any later version published by the Free Software Foundation; with no
Invariant Sections, with no Front-Cover Texts, and with no Back-Cover
Texts.  A copy of the license is included in the file fdl.txt.

=head2 Documentation license II

You are free:

=over 9

=item B<to Share>

to copy, distribute and transmit the work

=item B<to Remix>

to adapt the work

=back

Under the following conditions:

=over 9

=item B<Attribution>

You must attribute the work in the manner specified by the author or
licensor (but not in any way that suggests that they endorse you or
your use of the work).

=item B<Share Alike>

If you alter, transform, or build upon this work, you may distribute
the resulting work only under the same, similar or a compatible
license.

=back

With the understanding that:

=over 9

=item B<Waiver>

Any of the above conditions can be waived if you get permission from
the copyright holder.

=item B<Public Domain>

Where the work or any of its elements is in the public domain under
applicable law, that status is in no way affected by the license.

=item B<Other Rights>

In no way are any of the following rights affected by the license:

=over 2

=item *

Your fair dealing or fair use rights, or other applicable
copyright exceptions and limitations;

=item *

The author's moral rights;

=item *

Rights other persons may have either in the work itself or in how
the work is used, such as publicity or privacy rights.

=back

=back

=over 9

=item B<Notice>

For any reuse or distribution, you must make clear to others the
license terms of this work.

=back

A copy of the full license is included in the file as cc-by-sa.txt.

=head1 DEPENDENCIES

GNU B<sem> uses Perl, and the Perl modules Getopt::Long, Symbol,
Fcntl.

=head1 SEE ALSO

B<parallel>(1)

=cut

use strict;
use Symbol qw(gensym);
use Getopt::Long;

# "require_order" stops option parsing at the first non-option, so the
# options of the command to run are left untouched in @ARGV.
Getopt::Long::Configure("bundling", "require_order");
GetOptions("debug|D"     => \$::opt_D,
           "id|i=s"      => \$::opt_id,
           "count|j=i"   => \$::opt_count,
           "fg"          => \$::opt_fg,
           "timeout|t=i" => \$::opt_timeout,
           "version"     => \$::opt_version,
           "wait|w"      => \$::opt_wait,
    ) || die_usage();

$Global::debug = $::opt_D;
$Global::version = 20100814;
$Global::progname = 'sem';

# $count is the limit compared against the link count of the semaphore's
# id file. That file is itself one link, hence the "+ 1" for -j N.
# It is deliberately a file-scoped lexical: code further down this file
# can see it.
my $count = 1; # Default 1 = mutex
if($::opt_count) {
    $count = $::opt_count + 1;
}
if($::opt_wait) {
    # A limit of 1 can only be satisfied when nobody holds the semaphore,
    # i.e. when all commands have completed.
    $count = 1;
}
my $id = $::opt_id;
# --wait implies foreground: we must not return before we got the semaphore
my $fg = $::opt_fg || $::opt_wait;
# --timeout is parsed but not implemented; the self-assignment only
# silences the "used only once" warning under -w.
$::opt_timeout = $::opt_timeout;

if(defined $::opt_version) {
    version();
    # Only print the banner: do not go on to take the semaphore
    # and run a command.
    exit(0);
}

if(not defined $id) {
    # $id = getppid();
    # does not work with:
    # find . -name '*linux*' -exec sem -j1000 "sleep 3; echo `tty` '{}'" \; ; sem --wait echo done
    #
    # The output of tty keeps its trailing newline; it is hex-encoded
    # below like every other character outside [-_a-z0-9].
    $id = `tty`;
}
$id = "id-$id";
$id =~ s/([^-_a-z0-9])/unpack("H*",$1)/ige; # Convert non-word chars to hex

my $sem = Semaphore->new($id,$count);
$sem->acquire();
debug("run");
if($fg) {
    # Nothing to run for a bare "sem --wait"
    system @ARGV if @ARGV;
    $sem->release();
} else {
    # If run in the background, the PID will change
    # therefore release and re-acquire the semaphore
    $sem->release();
    my $pid = fork();
    if(not defined $pid) {
        # fork failed: keep going in this process (the command is then
        # effectively run in the foreground), but say so.
        warn "$Global::progname: fork failed ($!); running command in the foreground\n";
    }
    if(not $pid) {
        # child (or the original process if fork failed)
        # Get a semaphore for this pid
        my $child_sem = Semaphore->new($id,$count);
        $child_sem->acquire();
        system @ARGV if @ARGV;
        $child_sem->release();
    }
}

sub version {
    # Print the version and license banner on STDOUT.
    # Returns: N/A
    print join("\n",
               "GNU $Global::progname $Global::version",
               "Copyright (C) 2010 Ole Tange and Free Software Foundation, Inc.",
               "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>",
               "This is free software: you are free to change and redistribute it.",
               "GNU $Global::progname comes with no warranty.",
               "",
               "Web site: http://www.gnu.org/software/parallel\n"
        );
}

sub usage {
    # Print a short synopsis on STDOUT.
    # Returns: N/A
    print "Usage:\n";
    print "$Global::progname [--fg] [--id <id>] [--timeout <secs>] [-j <num>] [--wait] command\n";
    print "\n";
    print "See 'man $Global::progname' for the options\n";
}

sub die_usage {
    # Print the synopsis and terminate with exit status 255.
    # Used when GetOptions rejects the command line.
    usage();
    exit(255);
}

sub debug {
    # Print every argument on its own line on STDOUT, but only if
    # --debug/-D was given. Undefined arguments are printed as empty lines.
    # Returns: N/A
    $Global::debug or return;
    for my $msg (@_) {
        my $text = defined $msg ? $msg : "";
        print $text, "\n";
    }
}

package Semaphore;

# This package provides a counting semaphore
#
# If a process dies without releasing the semaphore the next process
# that needs that entry will clean up dead semaphores
#
# The semaphores are stored in ~/.parallel/semaphores/id-<name> Each
# file in ~/.parallel/semaphores/id-<name>/<pid> is the process ID of the
# process holding the entry. If the process dies, the entry can be
# taken by another process.
package Semaphore;

# Counting semaphore implemented with hard links.
#
# Each semaphore has a directory (lockdir) holding an "id file" plus one
# hard link to it per holder, named after the holder's PID. The link
# count of the id file is therefore 1 + number of holders. Changes are
# serialized with flock() on lockdir.".lock".

use Fcntl qw(:DEFAULT :flock);

sub new {
    # Create a handle for the semaphore $id allowing link counts below $count.
    #
    # Input:
    #   $id    = name of the semaphore (used as directory and file name)
    #   $count = limit the id file's link count must stay below
    #            (1 + number of simultaneous holders)
    # Returns: blessed hash; nothing is acquired yet
    my $class = shift;
    my $id = shift;
    my $count = shift;
    my $parallel_dir = $ENV{'HOME'}."/.parallel";
    my $parallel_locks = $parallel_dir."/semaphores";
    # mkdir does not create missing parents, so create both levels.
    # Best effort: lock() will die if the directory is unusable.
    for my $dir ($parallel_dir, $parallel_locks) {
        -d $dir or mkdir $dir;
    }
    my $lockdir = "$parallel_locks/$id";
    my $lockfile = $lockdir.".lock";
    return bless {
        'lockfile' => $lockfile,
        'lockfh' => Symbol::gensym(),
        'lockdir' => $lockdir,
        'id' => $id,
        'idfile' => $lockdir."/".$id,
        'pid' => $$,
        'pidfile' => $lockdir."/".$$,
        'count' => $count
    }, ref($class) || $class;
}

sub acquire {
    # Block until this process holds an entry in the semaphore.
    # Between attempts, entries of dead processes are removed and
    # the process sleeps one second.
    # Returns: N/A
    my $self = shift;
    while(1) {
        $self->atomic_link_if_count_less_than() and last;
        ::debug("Remove dead locks");
        $self->_remove_dead_locks();
        # try again
        $self->atomic_link_if_count_less_than() and last;
        sleep 1;
        # TODO if timeout: last
    }
    ::debug("got $self->{'pid'}");
}

sub _remove_dead_locks {
    # Unlink the PID files in lockdir whose process no longer exists.
    # Returns: N/A
    my $self = shift;
    my $lockdir = $self->{'lockdir'};
    # readdir instead of glob: immune to spaces and glob characters in the path
    opendir(my $dh, $lockdir) or return;
    my @entries = readdir $dh;
    closedir $dh;
    for my $entry (@entries) {
        # Only files named exactly like a PID are holder entries
        $entry =~ /\A([0-9]+)\z/ or next;
        my $pid = $1;
        my $pidfile = "$lockdir/$entry";
        # kill 0 only probes. A failure other than ESRCH (e.g. EPERM)
        # means the process exists but cannot be signalled by us.
        my $alive = kill(0, $pid) || !$!{ESRCH};
        if($alive) {
            ::debug("Alive: $pidfile");
        } else {
            ::debug("Dead: $pidfile");
            unlink $pidfile;
        }
    }
}

sub release {
    # Give up this process' entry. If it was the last one, remove the
    # id file and the semaphore directory as well.
    # Returns: N/A
    my ($self) = shift;
    unlink $self->{'pidfile'};
    if($self->nlinks() == 1) {
        # This is the last link, so atomic cleanup
        $self->lock();
        # Re-check under the lock: somebody may have acquired meanwhile
        if($self->nlinks() == 1) {
            unlink $self->{'idfile'};
            rmdir $self->{'lockdir'};
        }
        $self->unlock();
    }
    ::debug("released $self->{'pid'}");
}

sub atomic_link_if_count_less_than {
    # Link the id file to this process' PID file if the id file's
    # link count is below the semaphore's own limit ($self->{'count'}).
    # The check and the link are done while holding the lock file.
    # Returns: true if the entry was taken, false otherwise
    my ($self) = shift;
    my ($retval) = 0;
    $self->lock();
    if($self->nlinks() < $self->{'count'}) {
        -d $self->{'lockdir'} || mkdir $self->{'lockdir'};
        if(not -e $self->{'idfile'}) {
            open(my $idfh, ">", $self->{'idfile'})
                or die "Cannot create $self->{'idfile'}: $!";
            close $idfh;
        }
        $retval = link $self->{'idfile'}, $self->{'pidfile'};
    }
    $self->unlock();
    ::debug("atomic $retval");
    return $retval;
}

sub nlinks {
    # Returns: number of hard links to the id file; 0 if it does not exist
    my $self = shift;
    my @id_stat = stat $self->{'idfile'};
    return @id_stat ? $id_stat[3] : 0;
}

sub lock {
    # Take the exclusive lock on the lock file, polling once per second.
    # Dies if the lock file cannot be opened.
    # Returns: N/A
    my ($self) = shift;
    my $fh = $self->{'lockfh'};
    while(1) {
        open $fh, ">", $self->{'lockfile'}
            or die "Can't open semaphore file $self->{'lockfile'}: $!";
        chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
        while(not flock $fh, LOCK_EX()|LOCK_NB()) {
            ::debug("Cannot lock $self->{'lockfile'}");
            # TODO if timeout: last
            sleep 1;
        }
        # unlock() removes the lock file, so the file we just locked may
        # already be unlinked (and replaced by a new one that somebody
        # else locks). Only accept the lock if the path still names the
        # inode we hold; otherwise start over on the current file.
        my @fh_stat = stat $fh;
        my @path_stat = stat $self->{'lockfile'};
        if(@fh_stat and @path_stat
           and $fh_stat[0] == $path_stat[0]
           and $fh_stat[1] == $path_stat[1]) {
            last;
        }
        close $fh;
    }
    ::debug("locked $self->{'lockfile'}");
}

sub unlock {
    # Remove the lock file and drop the lock by closing the handle.
    # Returns: N/A
    my $self = shift;
    unlink $self->{'lockfile'};
    close $self->{'lockfh'};
    ::debug("unlocked");
}