parallel/parallel
2007-09-10 22:28:03 +02:00


#!/usr/bin/perl -w
=head1 NAME
parallel - build and execute command lines from standard input in parallel
=head1 SYNOPSIS
B<parallel> [options] [command [arguments]] < list_of_arguments
=head1 DESCRIPTION
For each line of input B<parallel> will execute B<command> with the
line as arguments. If no B<command> is given the line of input is
executed. B<parallel> can often be used as a substitute for B<xargs>
or B<cat | sh>.
Several lines will be run in parallel.
=over 9
=item I<command>
Command to execute. If B<command> or the following arguments contain {}
every instance will be substituted with the input line. Setting a
command also invokes B<-f>.
If B<command> is given, B<parallel> will behave similarly to B<xargs>. If
B<command> is not given, B<parallel> will behave similarly to B<cat | sh>.
=item B<-0>
Use NUL as delimiter. Normally input lines will end in \n
(newline). If they end in \0 (NUL), then use this option. It is useful
for processing filenames that may contain \n (newline).
=item B<-c>
Line is a command. The input line contains more than one argument or
the input line needs to be evaluated by the shell. This is the default
if B<command> is not set. Can be reversed with B<-f>.
=item B<-f>
Line is a filename. The input line contains a filename that will be
quoted so it is not evaluated by the shell. This is the default if
B<command> is set. Can be reversed with B<-c>.
=item B<-g>
Group output. Output from each job is grouped together and is only
printed when the command is finished. STDERR is printed first, followed by STDOUT.
B<-g> is the default. Can be reversed with B<-u>.
=item B<-j> I<N>
Run N jobs in parallel. Default is 10.
=item B<-j> +I<N>
Add N to the number of CPUs. Run this many jobs in parallel. For
compute intensive jobs I<-j +0> is useful as it will run
number-of-cpus jobs in parallel.
=item B<-j> -I<N>
Subtract N from the number of CPUs. Run this many jobs in parallel.
If the evaluated number is less than 1 then 1 will be used.
=item B<-j> I<N>%
Multiply the number of CPUs by N%. Run this many jobs in parallel.
If the evaluated number is less than 1 then 1 will be used.
=item B<-q>
Quote B<command>. This will quote the command line so special
characters are not interpreted by the shell. See the section
QUOTING. Most people will never need this. Quoting is disabled by
default.
=item B<-s>
Silent. The job to be run will not be printed. This is the default.
Can be reversed with B<-v>.
=item B<-u>
Ungroup output. Output is printed as soon as possible. This may cause
output from different commands to be mixed. Can be reversed with B<-g>.
=item B<-v>
Verbose. Print the job to be run. Can be reversed with B<-s>.
=back
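The four B<-j> forms above reduce to simple arithmetic on the number of CPUs. The following is a minimal sketch of that arithmetic, not necessarily the code B<parallel> runs. C<job_slots> is a hypothetical helper, and the CPU count is passed in as a parameter (assumed to be 4 in the demonstration) so the results do not depend on the machine.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of parallel): turn a -j value into a job count.
# $ncpu is a parameter so the arithmetic can be checked without probing the host.
sub job_slots {
    my ($spec, $ncpu) = @_;
    my $n;
    if    ($spec =~ /^\+(\d+)$/) { $n = $ncpu + $1 }         # -j +N
    elsif ($spec =~ /^-(\d+)$/)  { $n = $ncpu - $1 }         # -j -N
    elsif ($spec =~ /^(\d+)%$/)  { $n = $ncpu * $1 / 100 }   # -j N%
    elsif ($spec =~ /^(\d+)$/)   { $n = $1 }                 # -j N
    else  { die "Unrecognised -j value: $spec\n" }
    $n = 1 if $n < 1;    # never run fewer than one job
    return int $n;       # fractions are truncated
}

# Assumed 4 CPUs for the demonstration.
foreach my $spec ('10', '+0', '+2', '-8', '50%', '10%') {
    printf "%-4s gives %d jobs\n", $spec, job_slots($spec, 4);
}
```

Passing the CPU count explicitly is only a convenience for experimenting; the real program has to detect it.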
=head1 EXAMPLE 1: Working as cat | sh. Resource-inexpensive jobs and evaluation
B<parallel> can work similarly to B<cat | sh>.
A resource-inexpensive job is a job that takes very little CPU, disk
I/O and network I/O. Ping is an example of a resource-inexpensive
job. wget is too, if the webpages are small.
The content of the file jobs_to_run:
ping -c 1 10.0.0.1
wget http://status-server/status.cgi?ip=10.0.0.1
ping -c 1 10.0.0.2
wget http://status-server/status.cgi?ip=10.0.0.2
...
ping -c 1 10.0.0.255
wget http://status-server/status.cgi?ip=10.0.0.255
To run 100 processes simultaneously do:
B<parallel -j 100 < jobs_to_run>
As no B<command> is given, the option B<-c> is the default because the
jobs need to be evaluated by the shell.
=head1 EXAMPLE 2: Working as xargs -n1. Argument appending
B<parallel> can work similarly to B<xargs -n1>.
To output all html files run:
B<find . -name '*.html' | parallel cat>
As a B<command> is given, the option B<-f> is the default because the
filenames need to be protected from the shell in case a filename
contains special characters.
=head1 EXAMPLE 3: Compute intensive jobs and substitution
If ImageMagick is installed this will generate a thumbnail of a jpg
file:
B<convert -geometry 120 foo.jpg thumb_foo.jpg>
If the system has more than 1 CPU it can be run with number-of-cpus
jobs in parallel (-j +0). This will do that for all jpg files in a
directory:
B<ls *.jpg | parallel -j +0 convert -geometry 120 {} thumb_{}>
To do it recursively use B<find>:
B<find . -name '*.jpg' | parallel -j +0 convert -geometry 120 {} {}_thumb.jpg>
Notice how the argument has to start with {} as {} will include the path
(e.g. running B<convert -geometry 120 ./foo/bar.jpg
thumb_./foo/bar.jpg> would clearly be wrong). It will result in files
like ./foo/bar.jpg_thumb.jpg. If that is not wanted this can fix it:
find . -name '*.jpg' | \
perl -pe 'chomp; $a=$_; s:/([^/]+)$:/thumb_$1:; $_="convert -geometry 120 $a $_\n"' | \
parallel -c -j +0
Unfortunately this will not work if the filenames contain special
characters (such as space or quotes). If you have B<ren> installed this
is a better solution:
find . -name '*.jpg' | parallel -j +0 convert -geometry 120 {} {}_thumb.jpg
find . -name '*_thumb.jpg' | ren 's:/([^/]+)_thumb.jpg$:/thumb_$1:'
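The difference between substituting {} (as in this example) and appending the argument (as in EXAMPLE 2) can be sketched in a few lines of Perl. This is an illustration only: C<build_job> is a hypothetical helper, and it deliberately leaves out the filename quoting that B<-f> adds.

```perl
use strict;
use warnings;

# Hypothetical helper: build one job from a command template and an input line.
# No shell quoting is applied here, unlike what -f does.
sub build_job {
    my ($template, $arg) = @_;
    my $job = $template;
    # Replace every {}; if the template had none, append the argument instead.
    $job =~ s/\{\}/$arg/g or $job .= " $arg";
    return $job;
}

print build_job('convert -geometry 120 {} {}_thumb.jpg', './foo/bar.jpg'), "\n";
print build_job('cat', './index.html'), "\n";
```

The first call substitutes both instances of {}, which is why the thumbnail name ends up as ./foo/bar.jpg_thumb.jpg; the second call has no {} and so the filename is appended.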
=head1 EXAMPLE 4: Substitution and redirection
This will compare all files in the dir to the file foo and save the
diffs in corresponding .diff files:
B<ls | parallel diff {} foo ">>B<"{}.diff>
Quoting of > is necessary to postpone the redirection. Another
solution is to quote the whole command:
B<ls | parallel "diff {} foo >>B<{}.diff">
=head1 EXAMPLE 5: Composed commands
A job can consist of several commands. This will print the number of
files in each directory:
B<ls | parallel 'echo -n {}" "; ls {}|wc -l'>
=head1 QUOTING
For more advanced use quoting may be an issue. The following will
print the filename for each line that has exactly 2 columns:
B<perl -ne '/^\S+\s+\S+$/ and print $ARGV,"\n"' file>
This can be done by B<parallel> using:
B<ls | parallel "perl -ne '/^\\S+\\s+\\S+$/ and print \$ARGV,\"\\n\"'">
Notice how \'s, "'s, and $'s need to be quoted. B<parallel> can do
the quoting by using option B<-q>:
B<ls | parallel -q perl -ne '/^\S+\s+\S+$/ and print $ARGV,"\n"'>
However, this means you cannot make the shell interpret special
characters. For example this B<will not work>:
B<ls | parallel -q "diff {} foo >>B<{}.diff">
because > needs to be interpreted by the shell.
B<Conclusion>: To avoid dealing with the quoting problems it may be
easier just to write a small script and have B<parallel> call that
script.
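To get a feel for what quoting a string for the shell involves, here is a simplified sketch that backslash-escapes backslashes first and then a set of characters the shell treats specially. C<quote_for_shell> is a hypothetical name, and the sketch ignores control characters and newlines, so it should not be taken as a complete or safe quoting routine.

```perl
use strict;
use warnings;

# Hypothetical, simplified helper: backslash-escape characters that the
# shell would otherwise interpret. Control characters are not handled.
sub quote_for_shell {
    my ($s) = @_;
    $s =~ s/\\/\\\\/g;                                          # backslashes first
    $s =~ s/([\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;       # then the rest
    return $s;
}

foreach my $raw ('my file.txt', '$ARGV', 'a>b') {
    print quote_for_shell($raw), "\n";
}
```

Escaping backslashes before anything else matters: done the other way round, the backslashes added for the other characters would themselves be doubled.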
=head1 WHY NOT xargs/find -exec
B<xargs> and B<find -exec> offer some of the same possibilities as
B<parallel>. However, the support for running jobs in parallel is
limited.
B<find -exec> only works on files. So processing other input (such as
hosts or URLs) will require creating these inputs as files.
B<xargs> deals badly with special characters (such as space, ' and ") unless
B<-0> is specified. Many input generators are not optimized for using
B<NUL> as separator but are optimized for B<newline> as separator. E.g.
B<head>, B<tail>, B<awk>, B<ls>, B<echo>, B<sed>, B<tar -v>, B<perl>
(-0 and \0 instead of \n), B<locate> (requires using -0), B<find>
(requires using -print0), B<grep> (requires user to use -z or -Z).
An example would be to list the number of lines in files containing GNU/Linux:
find . -type f -print0 | xargs -0 grep -Zl GNU/Linux | xargs -0n1 wc -l
find . -type f | parallel grep -l GNU/Linux | parallel wc -l
In the B<xargs> version the user will have to remember B<-Z> for B<grep>.
B<parallel> offers the option B<-0> for B<NUL> separated strings in the
rare event that the input strings contain \n (newline).
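Reading B<NUL> separated input in Perl comes down to changing the input record separator. The sketch below shows the idea on made-up sample data held in a string; the variable names and the sample filenames are assumptions for illustration only.

```perl
use strict;
use warnings;

# Made-up sample input: two NUL-terminated names, the first containing a newline.
my $data = "odd\nname.txt\0plain.txt\0";

open my $fh, '<', \$data or die "Cannot open in-memory handle: $!";
my @names;
{
    local $/ = "\0";              # records end in NUL instead of newline
    while (my $name = <$fh>) {
        chomp $name;              # chomp strips the current $/, here the NUL
        push @names, $name;
    }
}
close $fh;

printf "%d names read\n", scalar @names;
```

With the default separator the first name would have been split in two at the embedded newline.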
=head1 BUGS
Filenames beginning with '-' can cause some commands to give
unexpected results, as they will often be interpreted as options.
=head1 REPORTING BUGS
Report bugs to <bug-parallel@tange.dk>.
=head1 AUTHOR
Copyright (C) 2007 Ole Tange, http://ole.tange.dk
=head1 LICENSE
Copyright (C) 2007 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=head1 DEPENDENCIES
B<parallel> uses Perl, and the Perl modules Getopt::Std, IPC::Open3,
Symbol, IO::File, POSIX, and File::Temp.
=head1 SEE ALSO
B<find>(1), B<xargs>(1)
=cut
use IPC::Open3;
use Symbol qw(gensym);
use IO::File;
use POSIX ":sys_wait_h";
use File::Temp qw/ tempfile tempdir /;
use Getopt::Std;
my ($processes,$command);
getopts("0cdefgj:qsuv") || die $::Usage;
# Defaults:
$Global::debug = 0;
$processes = 10;
$command = undef;
$Global::verbose = 0;
$Global::grouped = 1;
$Global::quoting = 0;
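# defined(@array) is a fatal error in modern Perl; test for a non-empty @ARGV instead.
$Global::input_is_filename = (@ARGV > 0); # -f is the default when a command is given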
$/="\n";
$Global::debug = (defined $::opt_d);
if(defined $::opt_j) { $processes = compute_number_of_processes($::opt_j); }
if(defined $::opt_v) { $Global::verbose = 1; }
if(defined $::opt_s) { $Global::verbose = 0; }
if(defined $::opt_g) { $Global::grouped = 1; }
if(defined $::opt_u) { $Global::grouped = 0; }
if(defined $::opt_c) { $Global::input_is_filename = 0; }
if(defined $::opt_f) { $Global::input_is_filename = 1; }
if(defined $::opt_0) { $/ = "\0"; }
if(defined $::opt_q) { $Global::quoting = 1; }
if(@ARGV) {
if($Global::quoting) {
$command = join(" ", shell_quote(@ARGV));
} else {
$command = join(" ", @ARGV);
}
}
init_run_jobs();
while($args = <STDIN>) {
my $cmd_line = generate_command_line($command, $args);
queue_job($processes, $cmd_line);
}
drain_job_queue();
#
# Generating the command line
#
sub generate_command_line {
my $command = shift;
my $line = shift;
chomp($line);
my ($job_line,$arg);
if($Global::input_is_filename) {
($line) = (shell_quote($line));
}
if($command) {
$job_line = $command;
$arg = $line;
if($job_line =~ s/{}/$arg/g) {
# substituted {} with args
} else {
# append args
$job_line .= " $arg";
}
} else {
$job_line = $line;
}
return $job_line;
}
sub shell_quote {
# Quote the string so shell will not expand any special chars
my (@strings) = (@_);
my $arg;
for $arg (@strings) {
# what is the right thing to do about '-' at start of line?
# maybe substitute with './'
# so it is not regarded as -option.
$arg =~ s/\\/\\\\/g;
$arg =~ s/([\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;
$arg =~ s/([\002-\011\013-\032])/\\$1/g;
$arg =~ s/([\n])/'\n'/g; # a newline in a filename is quoted by wrapping it in single quotes
}
return (@strings);
}
#
# Number of processes
#
sub compute_number_of_processes {
my $opt_j = shift;
my $processes = 0;
if(defined $opt_j) {
if($opt_j =~ /^\+(\d+)$/) {
# E.g. -j +2
my $j = $1;
$processes = $j + no_of_cpus();
} elsif ($opt_j =~ /^-(\d+)$/) {
# E.g. -j -2
my $j = $1;
$processes = no_of_cpus() - $j;
} elsif ($opt_j =~ /^(\d+)\%$/) {
my $j = $1;
$processes = no_of_cpus() * $j / 100;
} elsif ($opt_j =~ /^(\d+)$/) {
$processes = $1;
} else {
die $::Usage;
}
if($processes < 1) {
$processes = 1;
}
}
$Global::debug and print "Computed processes: ".(int $processes)."\n";
return int $processes;
}
sub no_of_cpus {
my $no_of_cpus =
(no_of_cpus_gnu_linux() ||
no_of_cpus_solaris());
if($no_of_cpus) {
return $no_of_cpus;
} else {
warn("Cannot figure out no of cpus. Using 1");
return 1;
}
}
sub no_of_cpus_gnu_linux {
my $no_of_cpus;
if(-e "/proc/cpuinfo") {
$no_of_cpus = 0;
open(IN, "<", "/proc/cpuinfo") || return undef;
while(<IN>) {
/^processor.*[:]/ and $no_of_cpus++;
}
close IN;
}
return $no_of_cpus;
}
sub no_of_cpus_solaris {
if(-x "/usr/sbin/psrinfo") {
my @psrinfo = `/usr/sbin/psrinfo`;
if($#psrinfo >= 0) {
return $#psrinfo +1;
}
}
if(-x "/usr/sbin/prtconf") {
my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`;
if($#prtconf >= 0) {
return $#prtconf +1;
}
}
return undef;
}
#
# Running and printing the jobs
#
sub init_run_jobs {
# Remember the original STDOUT and STDERR
open $Global::original_stdout, ">&STDOUT" or die "Can't dup STDOUT: $!";
open $Global::original_stderr, ">&STDERR" or die "Can't dup STDERR: $!";
$Global::running_jobs=0;
}
sub queue_job {
my $processes = shift;
my $command = shift;
push @Global::command, $command;
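# Start the job right away only if a job slot is free
# (using <= here would allow one job more than requested)
if($Global::running_jobs < $processes) {
$Global::debug and print "queuing $command\n";
start_another_job();
$Global::debug and print "queued $command\n";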
}
}
sub drain_job_queue {
while(1) {
if($Global::running_jobs == 0) { last }
$Global::debug and print "jobs running: $Global::running_jobs\n";
sleep 1;
}
}
sub run_jobs {
# Run the jobs in @Global::command with $processes simultaneously
my $processes = shift;
for my $i (1..$processes) {
# Start $processes jobs.
# When a job dies, the handler will take care of starting another
start_another_job();
}
while(1) {
if($Global::running_jobs == 0) { last }
$Global::debug and print "jobs running: $Global::running_jobs\n";
sleep 1;
}
}
sub start_another_job {
# Grab a job from @Global::command, start it
# and remember the pid, the STDOUT and the STDERR handles
# If no more jobs: do nothing
my $command = shift @Global::command;
if(defined $command) {
DoNotReap();
my %jobinfo = start_job($command);
$Global::running{$jobinfo{"pid"}} = \%jobinfo;
ReapIfNeeded();
}
}
sub start_job {
# Setup STDOUT and STDERR for a job and start it.
my $command = shift;
my ($pid,$out,$err,%out,%err,$outname,$errname,$name);
if($Global::grouped) {
# To group we create temporary files for STDOUT and STDERR
# Filehandles are global, so to not overwrite the filehandles use a hash with new keys
# To avoid the cleanup unlink the files immediately (but keep them open)
$outname = ++$Global::TmpFilename;
($out{$outname},$name) = tempfile(SUFFIX => ".par");
unlink $name;
$errname = ++$Global::TmpFilename;
($err{$errname},$name) = tempfile(SUFFIX => ".par");
unlink $name;
open STDOUT, '>&', $out{$outname} or die "Can't redirect STDOUT: $!";
open STDERR, '>&', $err{$errname} or die "Can't redirect STDERR: $!";
}
$Global::debug and print "starting: $command\n";
if($Global::verbose and not $Global::grouped) {
print STDOUT $command,"\n";
}
$Global::running_jobs++;
$pid = open3(gensym, ">&STDOUT", ">&STDERR", $command);
open STDOUT, ">&", $Global::original_stdout or die "Can't restore STDOUT: $!";
open STDERR, ">&", $Global::original_stderr or die "Can't restore STDERR: $!";
if($Global::grouped) {
return ("pid" => $pid,
"out" => $out{$outname},
"err" => $err{$errname},
"command" => $command);
} else {
return ("pid" => $pid, "command" => $command);
}
}
sub print_job {
# Print the output of the jobs
# Only relevant for grouping
$Global::grouped or return;
my $fhs = shift;
my $out = $fhs->{out};
my $err = $fhs->{err};
my $command = $fhs->{command};
$Global::debug and print ">>job\n";
if($Global::verbose and $Global::grouped) {
print STDOUT $command,"\n";
# If STDOUT and STDERR are merged, we want the command to be printed first
# so flush to avoid STDOUT being buffered
flush STDOUT;
}
seek $_, 0, 0 for $out, $err;
if($Global::debug) {
while( <$err> ) { print STDERR "ERR: $_" }
while( <$out> ) { print STDOUT "OUT: $_" }
} else {
while( <$err> ) { print STDERR }
while( <$out> ) { print STDOUT }
}
$Global::debug and print "<<job\n";
close $out;
close $err;
}
#
# Signal handling stuff
#
sub CountSigChild {
$Global::SigChildCaught++;
}
sub DoNotReap {
# This will postpone SIGCHLD for sections that cannot be interrupted by a dying child
# (race condition)
$SIG{CHLD} = \&CountSigChild;
}
sub ReapIfNeeded {
# Handle the postponed SIGCHLDs, if any, and re-install the normal reaper for SIGCHLD
# (race condition)
if($Global::SigChildCaught) {
$Global::SigChildCaught = 0;
Reaper();
}
$SIG{CHLD} = \&Reaper;
}
sub Reaper {
# A job finished.
# Print the output.
# Start another job
DoNotReap();
my $stiff;
$Global::debug and print "Reaper called\n";
while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
print_job($Global::running{$stiff});
delete $Global::running{$stiff};
$Global::running_jobs--;
start_another_job();
}
ReapIfNeeded();
}
#
# Debugging
#
sub my_dump {
my @dump_this = (@_);
eval "use Data::Dump qw(dump);";
if ($@) {
# Data::Dump not installed
eval "use Data::Dumper;";
if ($@) {
my $err = "Neither Data::Dump nor Data::Dumper is installed\n".
"Not dumping output\n";
print STDERR $err;
return $err;
} else {
return Dumper(@dump_this);
}
} else {
# Data::Dump was already loaded by the eval above
return (Data::Dump::dump(@dump_this));
}
}
# Keep perl -w happy
$main::opt_u = $main::opt_c = $main::opt_f = $main::opt_q =
$main::opt_0 = $main::opt_s = $main::opt_v = $main::opt_g =
$main::opt_j = $main::opt_d=1;