Added -k: Keep the sequence of output the same as the order of input.

If jobs 1 2 3 4 end in the sequence 3 1 4 2, the output will still be 1 2 3 4.
Ole Tange 2009-09-04 07:23:57 +02:00
parent 84118cb14b
commit 2d930320ad
5 changed files with 290 additions and 73 deletions

parallel

@@ -6,7 +6,7 @@ parallel - build and execute command lines from standard input in parallel
=head1 SYNOPSIS
B<parallel> [-0cfgqsuvxX] [-j num] [command [arguments]] < list_of_arguments
B<parallel> [-0cfgkqsuvxX] [-j num] [command [arguments]] < list_of_arguments
=head1 DESCRIPTION
@@ -72,6 +72,11 @@ If the evaluated number is less than 1 then 1 will be used.
Multiply N% with the number of CPUs. Run this many jobs in parallel.
If the evaluated number is less than 1 then 1 will be used.
=item B<-k>
Keep the sequence of output the same as the order of input. If jobs
1 2 3 4 end in the sequence 3 1 4 2, the output will still be 1 2 3 4.
=item B<-q>
Quote B<command>. This will quote the command line so special
@@ -223,6 +228,26 @@ B<seq -f %04g 0 9999 | parallel -X rm pict{}.jpg>
This will also only run B<rm> as many times as needed to keep the command
line length short enough.
=head1 EXAMPLE 7: Keep order of output same as order of input
Normally the output of a job will be printed as soon as it
completes. Sometimes you want the order of the output to remain the
same as the order of the input. B<-k> will make sure the output is
printed in the same order as the input even if later jobs end before
earlier jobs.
If you have a directory with subdirectories that contain different
numbers of files, then running:
B<ls | sort | parallel -v "ls {} | wc">
will give the output of each dir, but it will be sorted according to
which job completed first.
To keep the order the same as input run:
B<ls | sort | parallel -kv "ls {} | wc">
=head1 QUOTING
@@ -255,6 +280,15 @@ B<sh: -c: line 0: syntax error near unexpected token>
then you might try using B<-q>.
If you are using B<bash> process substitution like B<<(cat foo)> then
you may try B<-q> and prepending B<command> with B<bash -c>:
B<ls | parallel -q bash -c 'wc -c <(echo {})'>
Or for substituting output:
B<ls | parallel -q bash -c 'tar c {} | tee >>B<(gzip >>B<{}.tar.gz) | bzip2 >>B<{}.tar.bz2'>
B<Conclusion>: To avoid dealing with the quoting problems it may be
easier just to write a small script and have B<parallel> call that
script.
@@ -269,20 +303,17 @@ B<find -exec> only works on files. So processing other input (such as
hosts or URLs) will require creating these inputs as files. B<find
-exec> has no support for running commands in parallel.
B<xargs> deals badly with special characters (such as space, ' and ") unless
B<-0> is specified. Many input generators are not optimized for using
B<NUL> as separator but are optimized for B<newline> as separator. E.g
B<head>, B<tail>, B<awk>, B<ls>, B<echo>, B<sed>, B<tar -v>, B<perl>
(-0 and \0 instead of \n), B<locate> (requires using -0), B<find>
(requires using -print0), B<grep> (requires user to use -z or -Z).
The input I<can> be fixed for B<xargs> with:
tr '\n' '\0'
B<xargs> deals badly with special characters (such as space, ' and ")
unless B<-0> or B<-d "\n"> is specified. Many input generators are not
optimized for using B<NUL> as separator but are optimized for
B<newline> as separator. E.g B<head>, B<tail>, B<awk>, B<ls>, B<echo>,
B<sed>, B<tar -v>, B<perl> (-0 and \0 instead of \n), B<locate>
(requires using -0), B<find> (requires using -print0), B<grep>
(requires user to use -z or -Z).
So B<parallel>'s newline separation can be emulated with:
cat | tr '\n' '\0' | xargs -0 -n1 I<command>
B<cat | xargs -d "\n" -n1 I<command>>
B<xargs> can run a given number of jobs in parallel, but has no
support for running no_of_cpus jobs in parallel.
@@ -291,27 +322,45 @@ B<xargs> has no support for grouping the output, therefore output may
run together, e.g. the first half of a line is from one process and
the last half of the line is from another process.
If no command is given to B<xargs> it defaults to /bin/echo. So the
B<cat | sh> functionality is missing.
B<xargs> has no support for keeping the order of the output, therefore
output of the second job cannot be postponed till the first job is done.
B<xargs> has no support for context replace, so you will have to create the
arguments.
If you use a replace string in B<xargs> (B<-I>) you can not force
B<xargs> to use more than one argument.
Quoting in B<xargs> works like B<-q> in B<parallel>. This means
composed commands and redirection is impossible: B<ls | parallel "wc
{} >> B<{}.wc"> or B<ls | parallel "echo {}; ls {}|wc"> cannot be done using
B<xargs>.
composed commands and redirection require using B<bash -c>.
B<ls | parallel "wc {} >> B<{}.wc">
becomes
B<ls | xargs -d "\n" -P10 -I {} bash -c "wc {} >>B< {}.wc">
and
B<ls | parallel "echo {}; ls {}|wc">
becomes
B<ls | xargs -d "\n" -P10 -I {} bash -c "echo {}; ls {}|wc">
=head1 BUGS
Filenames beginning with '-' can cause some commands to give
unexpected results, as they will often be interpreted as options.
Some Bash-specific constructs like '<(cmd)' do not work. They can be
fixed by prepending '/bin/bash -c'. How much slower will that be?
=head1 REPORTING BUGS
Report bugs to <bug-parallel@tange.dk>.
=head1 IDEAS
xargs drop-in replacement.
Implement the missing --features
=head1 AUTHOR
@@ -360,14 +409,15 @@ use Getopt::Std;
use strict;
my ($processes,$command);
getopts("0cdefgj:qsuvxX") || die_usage();
getopts("0cdfgj:kqsuvxX") || die_usage();
# Defaults:
$Global::debug = 0;
$processes = 10;
$Global::processes_to_run = 10;
$command = undef;
$Global::verbose = 0;
$Global::grouped = 1;
$Global::keeporder = 0;
$Global::quoting = 0;
$Global::input_is_filename = (@ARGV);
$/="\n";
@@ -383,6 +433,7 @@ if(defined $::opt_X) {
}
if(defined $::opt_v) { $Global::verbose = 1; }
if(defined $::opt_s) { $Global::verbose = 0; }
if(defined $::opt_k) { $Global::keeporder = 1; }
if(defined $::opt_g) { $Global::grouped = 1; }
if(defined $::opt_u) { $Global::grouped = 0; }
if(defined $::opt_c) { $Global::input_is_filename = 0; }
@@ -398,17 +449,13 @@ if(@ARGV) {
}
# Needs to be done after setting $Global::command and $Global::command_line_max_len
# as '-x' influences the number of commands that needs to be run
if(defined $::opt_j) { $processes = compute_number_of_processes($::opt_j); }
if(defined $::opt_j) { $Global::processes_to_run = compute_number_of_processes($::opt_j); }
$Global::job_end_sequence=1;
init_run_jobs();
DoNotReap();
while($Global::running_jobs < $processes
and
start_another_job()) {
# skip
}
start_more_jobs();
ReapIfNeeded();
drain_job_queue();
@@ -646,7 +693,7 @@ sub processes_available_by_system_limit {
"Limiting to ", $system_limit, " jobs in parallel.\n");
}
# Cleanup: Close the files
for (keys %fh) { close $fh{$_} }
for (values %fh) { close $_ }
# Cleanup: Kill the children
for my $pid (@children) {
kill 15, $pid;
@@ -658,6 +705,24 @@ sub processes_available_by_system_limit {
return $system_limit;
}
sub enough_file_handles {
# check that we have enough filehandles available for starting
# another job
if($Global::grouped) {
my %fh;
my $enough_filehandles = 1;
# We need a filehandle for STDOUT and STDERR
# open3 uses 2 extra filehandles temporarily
for my $i (1..4) {
$enough_filehandles &&= open($fh{$i},"</dev/null");
}
for (values %fh) { close $_; }
return $enough_filehandles;
} else {
return 1;
}
}
sub user_requested_processes {
# Parse the number of processes that the user asked for
my $opt_j = shift;
@@ -787,15 +852,30 @@ sub drain_job_queue {
}
}
sub start_more_jobs {
my $jobs_started = 0;
while($Global::running_jobs < $Global::processes_to_run
and
start_another_job()) {
$jobs_started++;
}
return $jobs_started;
}
sub start_another_job {
# Grab a job from @Global::command, start it
# and remember the pid, the STDOUT and the STDERR handles
# If no more jobs: do nothing
my $command = next_command_line();
if(defined $command) {
my %jobinfo = start_job($command);
$Global::running{$jobinfo{"pid"}} = \%jobinfo;
return 1;
# Do we have enough file handles to start another job?
if(enough_file_handles()) {
my $command = next_command_line();
if(defined $command) {
my %jobinfo = start_job($command);
$Global::running{$jobinfo{"pid"}} = \%jobinfo;
return 1;
} else {
return 0;
}
} else {
return 0;
}
@@ -831,13 +911,17 @@ sub start_job {
open STDOUT, ">&", $Global::original_stdout or die "Can't dup \$oldout: $!";
open STDERR, ">&", $Global::original_stderr or die "Can't dup \$oldout: $!";
$Global::job_start_sequence++;
if($Global::grouped) {
return ("pid" => $pid,
return ("seq" => $Global::job_start_sequence,
"pid" => $pid,
"out" => $out{$outname},
"err" => $err{$errname},
"command" => $command);
} else {
return ("pid" => $pid, "command" => $command);
return ("seq" => $Global::job_start_sequence,
"pid" => $pid,
"command" => $command);
}
}
@@ -912,10 +996,24 @@ sub Reaper {
my $stiff;
debug("Reaper called $Global::reaperlevel\n");
while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
print_job($Global::running{$stiff});
delete $Global::running{$stiff};
$Global::running_jobs--;
start_another_job();
if($Global::keeporder) {
$Global::print_later{$Global::running{$stiff}{"seq"}} = $Global::running{$stiff};
debug("died: $Global::running{$stiff}{'seq'}");
while($Global::print_later{$Global::job_end_sequence}) {
debug("Found job end $Global::job_end_sequence");
print_job($Global::print_later{$Global::job_end_sequence});
delete $Global::print_later{$Global::job_end_sequence};
$Global::job_end_sequence++;
}
delete $Global::running{$stiff};
$Global::running_jobs--;
start_more_jobs();
} else {
print_job($Global::running{$stiff});
delete $Global::running{$stiff};
$Global::running_jobs--;
start_more_jobs();
}
}
ReapIfNeeded();
debug("Reaper exit $Global::reaperlevel\n");
@@ -933,7 +1031,7 @@ sub die_usage {
sub usage {
print "Usage:\n";
print "parallel [-0cfgqsuvxX] [-j num] [command [arguments]] < list_of_arguments\n";
print "parallel [-0cdfgkqsuvxX] [-j num] [command [arguments]] < list_of_arguments\n";
}
#
@@ -999,5 +1097,6 @@ sub my_dump {
# Keep perl -w happy
$main::opt_u = $main::opt_c = $main::opt_f = $main::opt_q =
$main::opt_0 = $main::opt_s = $main::opt_v = $main::opt_g =
$main::opt_j = $main::opt_d = $main::opt_x = $main::opt_X =1;
$Global::xargs = 1;
$main::opt_j = $main::opt_d = $main::opt_x = $main::opt_X =
$main::opt_k =
$Global::xargs = $Global::keeporder = 0;
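
The keep-order handling added to Reaper above is essentially a reorder
buffer keyed by the job's start sequence: a finished job is parked in
%Global::print_later until every lower-numbered job has been printed, and
$Global::job_end_sequence tracks the next job allowed to print. A minimal
standalone Perl sketch of that idea (hypothetical names, not parallel's
actual code):

#!/usr/bin/perl -w
# Sketch of the -k reorder buffer: results may arrive in any order,
# but each one is printed only after all lower-numbered results.
use strict;

my %print_later;        # seq => output, parked until its turn
my $next_to_print = 1;  # lowest sequence number not yet printed

sub job_finished {
    my ($seq, $output) = @_;
    $print_later{$seq} = $output;
    # Flush every result that is now next in line.
    while(exists $print_later{$next_to_print}) {
        print delete $print_later{$next_to_print};
        $next_to_print++;
    }
}

# Jobs 1 2 3 4 finishing in the order 3 1 4 2 still print 1 2 3 4:
job_finished(3, "3\n");   # parked (1 and 2 not done yet)
job_finished(1, "1\n");   # prints 1
job_finished(4, "4\n");   # parked
job_finished(2, "2\n");   # prints 2, 3 and 4

parallel itself does the same flushing inside the SIGCHLD reaper and parks
the whole job-info hash (including the captured STDOUT/STDERR handles)
rather than a plain string, but the loop on $Global::job_end_sequence is
the same idea.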


@@ -124,7 +124,7 @@
.\" ========================================================================
.\"
.IX Title "PARALLEL 1"
.TH PARALLEL 1 "2009-08-26" "perl v5.10.0" "User Contributed Perl Documentation"
.TH PARALLEL 1 "2009-09-02" "perl v5.10.0" "User Contributed Perl Documentation"
.\" For nroff, turn off justification. Always turn off hyphenation; it makes
.\" way too many mistakes in technical documents.
.if n .ad l
@@ -133,7 +133,7 @@
parallel \- build and execute command lines from standard input in parallel
.SH "SYNOPSIS"
.IX Header "SYNOPSIS"
\&\fBparallel\fR [\-0cfgqsuvxX] [\-j num] [command [arguments]] < list_of_arguments
\&\fBparallel\fR [\-0cfgkqsuvxX] [\-j num] [command [arguments]] < list_of_arguments
.SH "DESCRIPTION"
.IX Header "DESCRIPTION"
For each line of input \fBparallel\fR will execute \fBcommand\fR with the
@@ -186,6 +186,10 @@ If the evaluated number is less than 1 then 1 will be used.
.IX Item "-j N%"
Multiply N% with the number of CPUs. Run this many jobs in parallel.
If the evaluated number is less than 1 then 1 will be used.
.IP "\fB\-k\fR" 9
.IX Item "-k"
Keep the sequence of output the same as the order of input. If jobs
1 2 3 4 end in the sequence 3 1 4 2, the output will still be 1 2 3 4.
.IP "\fB\-q\fR" 9
.IX Item "-q"
Quote \fBcommand\fR. This will quote the command line so special
@@ -305,27 +309,46 @@ files in each directory:
.PP
To put the output in a file called <name>.dir:
.PP
\&\fBls | parallel '(echo \-n {}\*(L" \*(R"; ls {}|wc \-l) \fR>\fB {}.dir'\fR
\&\fBls | parallel '(echo \-n {}\*(L" \*(R"; ls {}|wc \-l) \fR> \fB{}.dir'\fR
.SH "EXAMPLE 6: Context replace"
.IX Header "EXAMPLE 6: Context replace"
To remove the files \fIpict1000.jpg\fR .. \fIpict9999.jpg\fR you could do:
To remove the files \fIpict0000.jpg\fR .. \fIpict9999.jpg\fR you could do:
.PP
\&\fBseq 1000 9999 | parallel rm pict{}.jpg\fR
\&\fBseq \-f \f(CB%04g\fB 0 9999 | parallel rm pict{}.jpg\fR
.PP
You could also do:
.PP
\&\fBseq 1000 9999 | perl \-pe 's/(.*)/pict$1.jpg/' | parallel \-x rm\fR
\&\fBseq \-f \f(CB%04g\fB 0 9999 | perl \-pe 's/(.*)/pict$1.jpg/' | parallel \-x rm\fR
.PP
The first will run \fBrm\fR 8999 times, while the last will only run
The first will run \fBrm\fR 10000 times, while the last will only run
\&\fBrm\fR as many times as needed to keep the command line length short
enough.
enough (typically 1\-2 times).
.PP
You could also run:
.PP
\&\fBseq 1000 9999 | parallel \-X rm pict{}.jpg\fR
\&\fBseq \-f \f(CB%04g\fB 0 9999 | parallel \-X rm pict{}.jpg\fR
.PP
This will also only run \fBrm\fR as many times as needed to keep the command
line length short enough.
.SH "EXAMPLE 7: Keep order of output same as order of input"
.IX Header "EXAMPLE 7: Keep order of output same as order of input"
Normally the output of a job will be printed as soon as it
completes. Sometimes you want the order of the output to remain the
same as the order of the input. \fB\-k\fR will make sure the output is
printed in the same order as the input even if later jobs end before
earlier jobs.
.PP
If you have a directory with subdirectories that contain different
numbers of files, then running:
.PP
\&\fBls | sort | parallel \-v \*(L"ls {} | wc\*(R"\fR
.PP
will give the output of each dir, but it will be sorted according to
which job completed first.
.PP
To keep the order the same as input run:
.PP
\&\fBls | sort | parallel \-kv \*(L"ls {} | wc\*(R"\fR
.SH "QUOTING"
.IX Header "QUOTING"
For more advanced use quoting may be an issue. The following will
@@ -357,6 +380,15 @@ If you get errors like:
.PP
then you might try using \fB\-q\fR.
.PP
If you are using \fBbash\fR process substitution like \fB<(cat foo)\fR then
you may try \fB\-q\fR and prepending \fBcommand\fR with \fBbash \-c\fR:
.PP
\&\fBls | parallel \-q bash \-c 'wc \-c <(echo {})'\fR
.PP
Or for substituting output:
.PP
\&\fBls | parallel \-q bash \-c 'tar c {} | tee \fR>\fB(gzip \fR>\fB{}.tar.gz) | bzip2 \fR>\fB{}.tar.bz2'\fR
.PP
\&\fBConclusion\fR: To avoid dealing with the quoting problems it may be
easier just to write a small script and have \fBparallel\fR call that
script.
@@ -369,20 +401,17 @@ script.
hosts or URLs) will require creating these inputs as files. \fBfind
\&\-exec\fR has no support for running commands in parallel.
.PP
\&\fBxargs\fR deals badly with special characters (such as space, ' and ") unless
\&\fB\-0\fR is specified. Many input generators are not optimized for using
\&\fB\s-1NUL\s0\fR as separator but are optimized for \fBnewline\fR as separator. E.g
\&\fBhead\fR, \fBtail\fR, \fBawk\fR, \fBls\fR, \fBecho\fR, \fBsed\fR, \fBtar \-v\fR, \fBperl\fR
(\-0 and \e0 instead of \en), \fBlocate\fR (requires using \-0), \fBfind\fR
(requires using \-print0), \fBgrep\fR (requires user to use \-z or \-Z).
.PP
The input \fIcan\fR be fixed for \fBxargs\fR with:
.PP
tr '\en' '\e0'
\&\fBxargs\fR deals badly with special characters (such as space, ' and ")
unless \fB\-0\fR or \fB\-d \*(L"\en\*(R"\fR is specified. Many input generators are not
optimized for using \fB\s-1NUL\s0\fR as separator but are optimized for
\&\fBnewline\fR as separator. E.g \fBhead\fR, \fBtail\fR, \fBawk\fR, \fBls\fR, \fBecho\fR,
\&\fBsed\fR, \fBtar \-v\fR, \fBperl\fR (\-0 and \e0 instead of \en), \fBlocate\fR
(requires using \-0), \fBfind\fR (requires using \-print0), \fBgrep\fR
(requires user to use \-z or \-Z).
.PP
So \fBparallel\fR's newline separation can be emulated with:
.PP
cat | tr '\en' '\e0' | xargs \-0 \-n1 \fIcommand\fR
\&\fBcat | xargs \-d \*(L"\en\*(R" \-n1 \f(BIcommand\fB\fR
.PP
\&\fBxargs\fR can run a given number of jobs in parallel, but has no
support for running no_of_cpus jobs in parallel.
@@ -391,23 +420,42 @@ support for running no_of_cpus jobs in parallel.
run together, e.g. the first half of a line is from one process and
the last half of the line is from another process.
.PP
If no command is given to \fBxargs\fR it defaults to /bin/echo. So the
\&\fBcat | sh\fR functionality is missing.
\&\fBxargs\fR has no support for keeping the order of the output, therefore
output of the second job cannot be postponed till the first job is done.
.PP
\&\fBxargs\fR has no support for context replace, so you will have to create the
arguments.
.PP
If you use a replace string in \fBxargs\fR (\fB\-I\fR) you can not force
\&\fBxargs\fR to use more than one argument.
.PP
Quoting in \fBxargs\fR works like \fB\-q\fR in \fBparallel\fR. This means
composed commands and redirection is impossible: \fBls | parallel "wc
{} \fR> \fB{}.wc"\fR or \fBls | parallel \*(L"echo {}; ls {}|wc\*(R"\fR cannot be done using
\&\fBxargs\fR.
composed commands and redirection require using \fBbash \-c\fR.
.PP
\&\fBls | parallel "wc {} \fR> \fB{}.wc"\fR
.PP
becomes
.PP
\&\fBls | xargs \-d \*(L"\en\*(R" \-P10 \-I {} bash \-c "wc {} \fR>\fB {}.wc"\fR
.PP
and
.PP
\&\fBls | parallel \*(L"echo {}; ls {}|wc\*(R"\fR
.PP
becomes
.PP
\&\fBls | xargs \-d \*(L"\en\*(R" \-P10 \-I {} bash \-c \*(L"echo {}; ls {}|wc\*(R"\fR
.SH "BUGS"
.IX Header "BUGS"
Filenames beginning with '\-' can cause some commands to give
unexpected results, as they will often be interpreted as options.
.PP
Some Bash\-specific constructs like '<(cmd)' do not work. They can be
fixed by prepending '/bin/bash \-c'. How much slower will that be?
.SH "REPORTING BUGS"
.IX Header "REPORTING BUGS"
Report bugs to <bug\-parallel@tange.dk>.
.SH "IDEAS"
.IX Header "IDEAS"
xargs drop-in replacement.
Implement the missing \-\-features
.SH "AUTHOR"
.IX Header "AUTHOR"
Copyright (C) 2007\-10\-18 Ole Tange, http://ole.tange.dk

View file

@@ -0,0 +1,32 @@
begin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
end


@@ -0,0 +1,6 @@
#!/bin/bash
# Test -k
ulimit -n 50   # low open-file limit (exercises the new enough_file_handles check)
(echo "sleep 3; echo begin"; seq 1 30 | parallel -kq echo "sleep 1; echo {}"; echo "echo end") \
| parallel -k -j0


@@ -0,0 +1,32 @@
begin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
end