parallel/parallel
2007-09-14 00:51:23 +02:00


#!/usr/bin/perl -w
=head1 NAME
parallel - build and execute command lines from standard input in parallel
=head1 SYNOPSIS
B<parallel> [options] [command [arguments]] < list_of_arguments
=head1 DESCRIPTION
For each line of input, B<parallel> executes B<command> with the
line as its arguments. If no B<command> is given, the line of input
itself is executed. B<parallel> can often be used as a substitute for
B<xargs> or B<cat | sh>.
Several lines are run in parallel; the default is 10 jobs (see B<-j>).
=over 9
=item I<command>
Command to execute. If B<command> or the following arguments contain {},
every instance will be substituted with the input line. Setting a
command also invokes B<-f>.
If B<command> is given, B<parallel> will behave similarly to B<xargs>. If
B<command> is not given, B<parallel> will behave similarly to B<cat | sh>.
=item B<-0>
Use NUL as delimiter. Normally input lines will end in \n
(newline). If they end in \0 (NUL), then use this option. It is useful
for processing filenames that may contain \n (newline).
=item B<-c>
Line is a command. The input line contains more than one argument or
the input line needs to be evaluated by the shell. This is the default
if B<command> is not set. Can be reversed with B<-f>.
=item B<-f>
Line is a filename. The input line contains a filename that will be
quoted so it is not evaluated by the shell. This is the default if
B<command> is set. Can be reversed with B<-c>.
=item B<-g>
Group output. Output from each job is grouped together and is only
printed when the command is finished. STDERR is printed first, followed by STDOUT.
B<-g> is the default. Can be reversed with B<-u>.
=item B<-j> I<N>
Run N jobs in parallel. 0 means as many as possible. Default is 10.
=item B<-j> +I<N>
Add N to the number of CPUs. Run this many jobs in parallel. For
compute-intensive jobs B<-j +0> is useful as it will run
number-of-cpus jobs in parallel.
=item B<-j> -I<N>
Subtract N from the number of CPUs. Run this many jobs in parallel.
If the evaluated number is less than 1 then 1 will be used.
=item B<-j> I<N>%
Multiply the number of CPUs by N%. Run this many jobs in parallel.
If the evaluated number is less than 1 then 1 will be used.
=item B<-q>
Quote B<command>. This will quote the command line so special
characters are not interpreted by the shell. See the section
QUOTING. Most people will never need this. Quoting is disabled by
default.
=item B<-s>
Silent. The job to be run will not be printed. This is the default.
Can be reversed with B<-v>.
=item B<-u>
Ungroup output. Output is printed as soon as possible. This may cause
output from different commands to be mixed. Can be reversed with B<-g>.
=item B<-v>
Verbose. Print the job to be run. Can be reversed with B<-s>.
=back
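The four forms of B<-j> described above can be sketched as follows. This is a minimal illustration, not the program's own routine: the helper name jobs_from_spec is hypothetical, the CPU count is passed in as a parameter instead of being detected, and the additional limit imposed by the number of free filehandles is left out.

```perl
use strict;
use warnings;

# Hypothetical helper: translate a -j value into a number of jobs.
# $ncpu is passed in so the sketch does not depend on CPU detection.
sub jobs_from_spec {
    my ($spec, $ncpu) = @_;
    my $jobs;
    if    ($spec =~ /^\+(\d+)$/) { $jobs = $ncpu + $1; }           # -j +N
    elsif ($spec =~ /^-(\d+)$/)  { $jobs = $ncpu - $1; }           # -j -N
    elsif ($spec =~ /^(\d+)%$/)  { $jobs = $ncpu * $1 / 100; }     # -j N%
    elsif ($spec =~ /^(\d+)$/)   { $jobs = $1 == 0 ? 2**31 : $1; } # -j N, 0 = as many as possible
    else                         { die "Bad -j value: $spec\n"; }
    $jobs = 1 if $jobs < 1;      # never fewer than one job
    return int $jobs;
}

# Show how each form evaluates on an assumed 4-CPU machine
for my $spec ("4", "+0", "+2", "-1", "-9", "50%", "10%") {
    printf "-j %-4s on 4 CPUs: %d jobs\n", $spec, jobs_from_spec($spec, 4);
}
```

Passing the CPU count as an argument keeps the arithmetic testable on any machine.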
=head1 EXAMPLE 1: Working as cat | sh. Resource inexpensive jobs and evaluation
B<parallel> can work similarly to B<cat | sh>.
A resource inexpensive job is a job that takes very little CPU, disk
I/O and network I/O. Ping is an example of a resource inexpensive
job. wget is too, if the webpages are small.
The content of the file jobs_to_run:
ping -c 1 10.0.0.1
wget http://status-server/status.cgi?ip=10.0.0.1
ping -c 1 10.0.0.2
wget http://status-server/status.cgi?ip=10.0.0.2
...
ping -c 1 10.0.0.255
wget http://status-server/status.cgi?ip=10.0.0.255
To run 100 processes simultaneously do:
B<parallel -j 100 < jobs_to_run>
As no B<command> is given, the option B<-c> is the default because the
jobs need to be evaluated by the shell.
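A file like jobs_to_run does not have to be written by hand. The following is a small sketch of one way to generate it, assuming the hosts are 10.0.0.1 through 10.0.0.255 as in the listing above; the helper name job_lines_for is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: the two job lines for one host.
sub job_lines_for {
    my ($ip) = @_;
    return ("ping -c 1 $ip",
            "wget http://status-server/status.cgi?ip=$ip");
}

# One ping line and one wget line per host, in the order shown above
print "$_\n" for map { job_lines_for("10.0.0.$_") } 1 .. 255;
```

The output can be redirected to jobs_to_run or piped straight into B<parallel -j 100>.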
=head1 EXAMPLE 2: Working as xargs -n1. Argument appending
B<parallel> can work similarly to B<xargs -n1>.
To output all html files run:
B<find . -name '*.html' | parallel cat>
As a B<command> is given, the option B<-f> is the default because the
filenames need to be protected from the shell in case a filename
contains special characters.
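To illustrate what protecting a filename from the shell means, here is a minimal sketch of building one job line. It is not the program's own code: the helper names quote_for_shell and build_command_line are hypothetical, and the quoting rule (backslash-escape everything outside a conservative safe set) is an assumption chosen for brevity.

```perl
use strict;
use warnings;

# Hypothetical helper: make a string safe to pass through the shell.
sub quote_for_shell {
    my ($s) = @_;
    return "''" if $s eq "";                  # keep an empty argument visible
    $s =~ s/([^A-Za-z0-9_.\/\n-])/\\$1/g;     # escape anything outside a safe set
    $s =~ s/\n/'\n'/g;                        # a newline cannot be backslash-escaped
    return $s;
}

# Hypothetical helper: substitute {} if present, otherwise append the argument.
sub build_command_line {
    my ($command, $line, $is_filename) = @_;
    chomp $line;
    my $arg = $is_filename ? quote_for_shell($line) : $line;
    my $job = $command;
    # s///g returns false when nothing was replaced, so fall back to appending
    $job .= " $arg" unless $job =~ s/\{\}/$arg/g;
    return $job;
}

print build_command_line("cat", "./my file.html\n", 1), "\n";
print build_command_line("convert -geometry 120 {} {}_thumb.jpg", "./a b.jpg\n", 1), "\n";
```

With the third argument set to 0 the line is passed through unchanged, which corresponds to B<-c>.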
=head1 EXAMPLE 3: Compute intensive jobs and substitution
If ImageMagick is installed this will generate a thumbnail of a jpg
file:
B<convert -geometry 120 foo.jpg thumb_foo.jpg>
If the system has more than 1 CPU it can be run with number-of-cpus
jobs in parallel (-j +0). This will do that for all jpg files in a
directory:
B<ls *.jpg | parallel -j +0 convert -geometry 120 {} thumb_{}>
To do it recursively use B<find>:
B<find . -name '*.jpg' | parallel -j +0 convert -geometry 120 {} {}_thumb.jpg>
Notice how the argument has to start with {}, as {} includes the path
(e.g. running B<convert -geometry 120 ./foo/bar.jpg
thumb_./foo/bar.jpg> would clearly be wrong). It will result in files
like ./foo/bar.jpg_thumb.jpg. If that is not wanted this can fix it:
find . -name '*.jpg' | \
perl -pe 'chomp; $a=$_; s:/([^/]+)$:/thumb_$1:; $_="convert -geometry 120 $a $_\n"' | \
parallel -c -j +0
Unfortunately this will not work if the filenames contain special
characters (such as space or quotes). If you have B<ren> installed this
is a better solution:
find . -name '*.jpg' | parallel -j +0 convert -geometry 120 {} {}_thumb.jpg
find . -name '*_thumb.jpg' | ren 's:/([^/]+)_thumb.jpg$:/thumb_$1:'
=head1 EXAMPLE 4: Substitution and redirection
This will compare all files in the dir to the file foo and save the
diffs in corresponding .diff files:
B<ls | parallel diff {} foo "E<gt>"{}.diff>
Quoting of > is necessary to postpone the redirection. Another
solution is to quote the whole command:
B<ls | parallel "diff {} foo E<gt>{}.diff">
=head1 EXAMPLE 5: Composed commands
A job can consist of several commands. This will print the number of
files in each directory:
B<ls | parallel 'echo -n {}" "; ls {}|wc -l'>
=head1 QUOTING
For more advanced use quoting may be an issue. The following will
print the filename for each line that has exactly 2 columns:
B<perl -ne '/^\S+\s+\S+$/ and print $ARGV,"\n"' file>
This can be done by B<parallel> using:
B<ls | parallel "perl -ne '/^\\S+\\s+\\S+$/ and print \$ARGV,\"\\n\"'">
Notice how \'s, "'s, and $'s need to be quoted. B<parallel> can do
the quoting by using option B<-q>:
B<ls | parallel -q perl -ne '/^\S+\s+\S+$/ and print $ARGV,"\n"'>
However, this means you cannot make the shell interpret special
characters. For example this B<will not work>:
B<ls | parallel -q "diff {} foo E<gt>{}.diff">
because > needs to be interpreted by the shell.
B<Conclusion>: To avoid dealing with the quoting problems it may be
easier just to write a small script and have B<parallel> call that
script.
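As a sketch of that approach, the two-column check above could live in a small script of its own. The script name two_columns and the helper two_column_lines are hypothetical.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper: count the lines in $file that have exactly 2 columns.
sub two_column_lines {
    my ($file) = @_;
    open(my $fh, "<", $file) or do { warn "Cannot open $file: $!\n"; return 0 };
    my $count = grep { /^\S+\s+\S+$/ } <$fh>;   # grep in scalar context counts matches
    close $fh;
    return $count;
}

# Print the filename once per matching line, like the one-liner above
for my $file (@ARGV) {
    print "$file\n" for 1 .. two_column_lines($file);
}
```

Saved as an executable file named two_columns, it could be run as B<ls | parallel ./two_columns>. Because a B<command> is given, B<-f> is the default, so no manual quoting is needed.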
=head1 DIFFERENCES BETWEEN xargs/find -exec AND parallel
B<xargs> and B<find -exec> offer some of the same possibilities as
B<parallel>.
B<find -exec> only works on files. So processing other input (such as
hosts or URLs) will require creating these inputs as files. B<find
-exec> has no support for running commands in parallel.
B<xargs> deals badly with special characters (such as space, ' and ") unless
B<-0> is specified. Many input generators use B<newline> rather than
B<NUL> as separator, or emit B<NUL> only on request. E.g.
B<head>, B<tail>, B<awk>, B<ls>, B<echo>, B<sed>, B<tar -v>, B<perl>
(-0 and \0 instead of \n), B<locate> (requires using -0), B<find>
(requires using -print0), B<grep> (requires user to use -z or -Z).
The input I<can> be fixed for B<xargs> with:
tr '\n' '\0'
So B<parallel>'s newline separation can be emulated with:
cat | tr '\n' '\0' | xargs -0 -n1 I<command>
B<xargs> can run a given number of jobs in parallel, but has no
support for running no_of_cpus jobs in parallel.
B<xargs> has no support for grouping the output, so output from
different processes may run together: the first half of a line may come
from one process and the last half from another.
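Grouping can be illustrated with a short sketch: each command writes to its own temporary file, and the file is read as a whole only after the command has finished, so lines from different commands cannot be mixed. This is a simplified illustration under stated assumptions, not the program's own routine: the helper name run_grouped is hypothetical, only STDOUT is captured, and all commands are started at once.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

$| = 1;   # flush STDOUT so forked children do not inherit buffered text

# Hypothetical helper: run the commands concurrently and return
# [command, complete output] pairs in the order the commands finish.
sub run_grouped {
    my @commands = @_;
    my %job_of;
    for my $cmd (@commands) {
        my ($fh, $name) = tempfile(SUFFIX => ".out");
        close $fh;
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            # Child: send STDOUT to the private file, then run the command
            open(STDOUT, ">", $name) or die "Cannot redirect STDOUT: $!";
            exec("/bin/sh", "-c", $cmd) or die "exec failed: $!";
        }
        $job_of{$pid} = [$cmd, $name];
    }
    my @results;
    while (%job_of) {
        my $pid = wait();
        last if $pid < 0;                          # no children left
        my $job = delete $job_of{$pid} or next;    # not one of ours
        my ($cmd, $name) = @$job;
        open(my $in, "<", $name) or die "Cannot read $name: $!";
        local $/;                                  # read the whole file at once
        my $output = <$in>;
        close $in;
        unlink $name;
        push @results, [$cmd, defined $output ? $output : ""];
    }
    return @results;
}

# Each command's output is printed as one block
for my $r (run_grouped("echo a1; sleep 1; echo a2", "echo b1; echo b2")) {
    my ($cmd, $output) = @$r;
    print "--- $cmd\n$output";
}
```

The price of grouping is that no output appears until a command has finished, which is why B<-u> exists.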
If no command is given to B<xargs> it defaults to /bin/echo. So the
B<cat | sh> functionality is missing.
=head1 BUGS
Filenames beginning with '-' can cause some commands to give
unexpected results, as it will often be interpreted as an option.
=head1 REPORTING BUGS
Report bugs to <bug-parallel@tange.dk>.
=head1 AUTHOR
Copyright (C) 2007 Ole Tange, http://ole.tange.dk
=head1 LICENSE
Copyright (C) 2007 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=head1 DEPENDENCIES
B<parallel> uses Perl, and the Perl modules Getopt::Std, IPC::Open3,
Symbol, IO::File, POSIX, and File::Temp.
=head1 SEE ALSO
B<find>(1), B<xargs>(1)
=cut
use IPC::Open3;
use Symbol qw(gensym);
use IO::File;
use POSIX ":sys_wait_h";
use File::Temp qw/ tempfile tempdir /;
use Getopt::Std;
my ($processes,$command);
getopts("0cdefgj:qsuv") || die_usage();
# Defaults:
$Global::debug = 0;
$processes = 10;
$command = undef;
$Global::verbose = 0;
$Global::grouped = 1;
$Global::quoting = 0;
$Global::input_is_filename = (@ARGV);
$/="\n";
$Global::debug = (defined $::opt_d);
if(defined $::opt_j) { $processes = compute_number_of_processes($::opt_j); }
if(defined $::opt_v) { $Global::verbose = 1; }
if(defined $::opt_s) { $Global::verbose = 0; }
if(defined $::opt_g) { $Global::grouped = 1; }
if(defined $::opt_u) { $Global::grouped = 0; }
if(defined $::opt_c) { $Global::input_is_filename = 0; }
if(defined $::opt_f) { $Global::input_is_filename = 1; }
if(defined $::opt_0) { $/ = "\0"; }
if(defined $::opt_q) { $Global::quoting = 1; }
if(@ARGV) {
if($Global::quoting) {
$command = join(" ", shell_quote(@ARGV));
} else {
$command = join(" ", @ARGV);
}
}
init_run_jobs();
while($args = <STDIN>) {
my $cmd_line = generate_command_line($command, $args);
queue_job($processes, $cmd_line);
}
drain_job_queue();
#
# Generating the command line
#
sub generate_command_line {
my $command = shift;
my $line = shift;
chomp($line);
my ($job_line,$arg);
if($Global::input_is_filename) {
($line) = (shell_quote($line));
}
if($command) {
$job_line = $command;
$arg = $line;
if($job_line =~ s/\{\}/$arg/g) {
# substituted {} with args
} else {
# append args
$job_line .= " $arg";
}
} else {
$job_line = $line;
}
return $job_line;
}
sub shell_quote {
# Quote the string so shell will not expand any special chars
my (@strings) = (@_);
my $arg;
for $arg (@strings) {
# what is the right thing to do about '-' at start of line?
# maybe substitute with './'
# so it is not regarded as -option.
$arg =~ s/\\/\\\\/g;
$arg =~ s/([\#\?\`\(\)\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;
$arg =~ s/([\002-\011\013-\032])/\\$1/g;
$arg =~ s/([\n])/'\n'/g; # a newline in a filename is quoted by wrapping it in ''
}
return (@strings);
}
#
# Number of processes
#
sub compute_number_of_processes {
my $opt_j = shift;
my $processes = 0;
if(defined $opt_j) {
if($opt_j =~ /^\+(\d+)$/) {
# E.g. -j +2
my $j = $1;
$processes = $j + no_of_cpus();
} elsif ($opt_j =~ /^-(\d+)$/) {
# E.g. -j -2
my $j = $1;
$processes = no_of_cpus() - $j;
} elsif ($opt_j =~ /^(\d+)\%$/) {
my $j = $1;
$processes = no_of_cpus() * $j / 100;
} elsif ($opt_j =~ /^(\d+)$/) {
$processes = $1;
if($processes == 0) {
# -j 0 = infinity (or at least close)
$processes = 2**31;
}
} else {
die_usage();
}
if($processes < 1) {
$processes = 1;
}
}
my $free_handles = compute_no_of_free_filehandles();
if($processes > $free_handles / 2) {
# Every simultaneous process uses 2 filehandles when grouping
# perl uses 7 for something?
$processes = int (($free_handles -7) / 2);
print STDERR ("Warning: Only enough filehandles to run ",
$processes, " jobs in parallel\n");
}
debug("Computed processes: ".(int $processes)."\n");
return int $processes;
}
sub compute_no_of_free_filehandles {
my $i=1;
my %fh;
while(open($fh{$i},"</dev/null")) { $i++ }
for (keys %fh) { close $fh{$_} }
debug("Number of free handles: ".$i."\n");
return $i;
}
sub no_of_cpus {
my $no_of_cpus =
(no_of_cpus_gnu_linux() ||
no_of_cpus_solaris());
if($no_of_cpus) {
return $no_of_cpus;
} else {
warn("Cannot figure out no of cpus. Using 1");
return 1;
}
}
sub no_of_cpus_gnu_linux {
my $no_of_cpus;
if(-e "/proc/cpuinfo") {
$no_of_cpus = 0;
open(IN,"cat /proc/cpuinfo|") || return undef;
while(<IN>) {
/^processor.*[:]/ and $no_of_cpus++;
}
close IN;
}
return $no_of_cpus;
}
sub no_of_cpus_solaris {
if(-x "/usr/sbin/psrinfo") {
my @psrinfo = `/usr/sbin/psrinfo`;
if($#psrinfo >= 0) {
return $#psrinfo +1;
}
}
if(-x "/usr/sbin/prtconf") {
my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`;
if($#prtconf >= 0) {
return $#prtconf +1;
}
}
return undef;
}
#
# Running and printing the jobs
#
sub init_run_jobs {
# Remember the original STDOUT and STDERR
open $Global::original_stdout, ">&STDOUT" or die "Can't dup STDOUT: $!";
open $Global::original_stderr, ">&STDERR" or die "Can't dup STDERR: $!";
$Global::running_jobs=0;
}
sub queue_job {
my $processes = shift;
my $command = shift;
if($command =~ /\S/) {
# Only run commands - not empty lines
push @Global::command, $command;
}
if($Global::running_jobs < $processes) {
debug("queuing $command\n");
start_another_job();
debug("queued $command\n");
}
}
sub drain_job_queue {
while(1) {
if($Global::running_jobs == 0) { last }
debug("jobs running: $Global::running_jobs\n");
sleep 1;
}
}
sub run_jobs {
# Run the jobs in @Global::command with $processes simultaneously
my $processes = shift;
for my $i (1..$processes) {
# Start $processes jobs.
# When a job dies, the handler will take care of starting another
start_another_job();
}
while(1) {
if($Global::running_jobs == 0) { last }
debug("jobs running: $Global::running_jobs\n");
sleep 1;
}
}
sub start_another_job {
# Grab a job from @Global::command, start it
# and remember the pid, the STDOUT and the STDERR handles
# If no more jobs: do nothing
my $command = shift @Global::command;
if(defined $command) {
DoNotReap();
my %jobinfo = start_job($command);
$Global::running{$jobinfo{"pid"}} = \%jobinfo;
ReapIfNeeded();
}
}
sub start_job {
# Setup STDOUT and STDERR for a job and start it.
my $command = shift;
my ($pid,$out,$err,%out,%err,$outname,$errname,$name);
if($Global::grouped) {
# To group we create temporary files for STDOUT and STDERR
# Filehandles are global, so to not overwrite the filehandles use a hash with new keys
# To avoid the cleanup unlink the files immediately (but keep them open)
$outname = ++$Global::TmpFilename;
($out{$outname},$name) = tempfile(SUFFIX => ".par");
unlink $name;
$errname = ++$Global::TmpFilename;
($err{$errname},$name) = tempfile(SUFFIX => ".par");
unlink $name;
open STDOUT, '>&', $out{$outname} or die "Can't redirect STDOUT: $!";
open STDERR, '>&', $err{$errname} or die "Can't redirect STDERR: $!";
}
if($Global::verbose and not $Global::grouped) {
print STDOUT $command,"\n";
}
$Global::running_jobs++;
debug("starting: $command\n");
$pid = open3(gensym, ">&STDOUT", ">&STDERR", $command) ||
die("open3 failed. Report a bug\n");
debug("started: $command\n");
open STDOUT, ">&", $Global::original_stdout or die "Can't restore STDOUT: $!";
open STDERR, ">&", $Global::original_stderr or die "Can't restore STDERR: $!";
if($Global::grouped) {
return ("pid" => $pid,
"out" => $out{$outname},
"err" => $err{$errname},
"command" => $command);
} else {
return ("pid" => $pid, "command" => $command);
}
}
sub print_job {
# Print the output of the jobs
# Only relevant for grouping
$Global::grouped or return;
my $fhs = shift;
my $out = $fhs->{out};
my $err = $fhs->{err};
my $command = $fhs->{command};
debug(">>job\n");
if($Global::verbose and $Global::grouped) {
print STDOUT $command,"\n";
# If STDOUT and STDERR are merged, we want the command to be printed first
# so flush to avoid STDOUT being buffered
flush STDOUT;
}
seek $_, 0, 0 for $out, $err;
if($Global::debug) {
while( <$err> ) { print STDERR "ERR: $_" }
while( <$out> ) { print STDOUT "OUT: $_" }
} else {
while( <$err> ) { print STDERR }
while( <$out> ) { print STDOUT }
}
debug("<<job\n");
close $out;
close $err;
}
#
# Signal handling stuff
#
sub CountSigChild {
$Global::SigChildCaught++;
}
sub DoNotReap {
# This will postpone SIGCHLD for sections that cannot be distracted by a dying child
# (Race condition)
$SIG{CHLD} = \&CountSigChild;
}
sub ReapIfNeeded {
# Handle the postponed SIGCHLDs, if any, and re-install the normal reaper for SIGCHLD
# (Race condition)
if($Global::SigChildCaught) {
$Global::SigChildCaught = 0;
Reaper();
}
$SIG{CHLD} = \&Reaper;
}
sub Reaper {
# A job finished.
# Print the output.
# Start another job
DoNotReap();
my $stiff;
debug("Reaper called\n");
while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
print_job($Global::running{$stiff});
delete $Global::running{$stiff};
$Global::running_jobs--;
start_another_job();
}
ReapIfNeeded();
}
#
# Usage
#
sub die_usage {
usage();
exit(1);
}
sub usage {
print "Usage:\n";
print "parallel [options] [command [arguments]] < list_of_arguments\n";
}
#
# Debugging
#
sub debug {
$Global::debug or return;
if($Global::original_stdout) {
print $Global::original_stdout @_;
} else {
print @_;
}
}
sub my_dump {
my @dump_this = (@_);
eval "use Data::Dump qw(dump);";
if ($@) {
# Data::Dump not installed
eval "use Data::Dumper;";
if ($@) {
my $err = "Neither Data::Dump nor Data::Dumper is installed\n".
"Not dumping output\n";
print STDERR $err;
return $err;
} else {
return Dumper(@dump_this);
}
} else {
# Data::Dump was already loaded by the eval above
return (Data::Dump::dump(@dump_this));
}
}
# Keep perl -w happy
$main::opt_u = $main::opt_c = $main::opt_f = $main::opt_q =
$main::opt_0 = $main::opt_s = $main::opt_v = $main::opt_g =
$main::opt_j = $main::opt_d=1;