parallel: If --block is left out, --pipepart will use a block size that will result in 10 jobs per jobslot.

This commit is contained in:
Ole Tange 2016-05-26 00:13:48 +02:00
parent d570ec2d20
commit 51f212e548
7 changed files with 91 additions and 35 deletions

View file

@ -120,17 +120,30 @@ $Global::JobQueue = JobQueue->new(
$number_of_args,\@Global::transfer_files,\@Global::ret_files); $number_of_args,\@Global::transfer_files,\@Global::ret_files);
if($opt::pipepart) { if($opt::pipepart) {
if($opt::roundrobin) { if(not $opt::blocksize or $opt::roundrobin) {
# Compute size of -a # --block not set =>
# compute reasonable value giving 10 jobs per jobslot
# --roundrobin => divide equally between jobslots
my $size = 0; my $size = 0;
# Compute size of -a
$size += -s $_ for @opt::a; $size += -s $_ for @opt::a;
# Compute $Global::max_jobs_running # Compute $Global::max_jobs_running
$Global::dummy_jobs = 1;
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
$sshlogin->max_jobs_running(); $sshlogin->max_jobs_running();
} }
$Global::max_jobs_running or ::die_bug("Global::max_jobs_running not set"); $Global::max_jobs_running or
::die_bug("Global::max_jobs_running not set");
if($opt::roundrobin) {
# Run in total $job_slots jobs
# Set --blocksize = size / no of proc # Set --blocksize = size / no of proc
$opt::blocksize = 1 + $size / $Global::max_jobs_running; $Global::blocksize = 1 + int($size / $Global::max_jobs_running);
} else {
# Run in total $job_slots*10 jobs
# Set --blocksize = size / no of proc / 10
$Global::blocksize = 1 +
int($size / $Global::max_jobs_running / 10);
}
} }
@Global::cat_partials = map { pipe_part_files($_) } @opt::a; @Global::cat_partials = map { pipe_part_files($_) } @opt::a;
# Unget the empty arg as many times as there are parts # Unget the empty arg as many times as there are parts
@ -215,7 +228,7 @@ sub pipe_part_files {
} }
my $header = find_header(\$buf,open_or_exit($file)); my $header = find_header(\$buf,open_or_exit($file));
# find positions # find positions
my @pos = find_split_positions($file,$opt::blocksize,length $header); my @pos = find_split_positions($file,$Global::blocksize,length $header);
# Make @cat_partials # Make @cat_partials
my @cat_partials = (); my @cat_partials = ();
for(my $i=0; $i<$#pos; $i++) { for(my $i=0; $i<$#pos; $i++) {
@ -230,7 +243,7 @@ sub find_header {
# $fh = filehandle to read from # $fh = filehandle to read from
# Uses: # Uses:
# $opt::header # $opt::header
# $opt::blocksize # $Global::blocksize
# Returns: # Returns:
# $header string # $header string
my ($buf_ref, $fh) = @_; my ($buf_ref, $fh) = @_;
@ -239,7 +252,7 @@ sub find_header {
if($opt::header eq ":") { $opt::header = "(.*\n)"; } if($opt::header eq ":") { $opt::header = "(.*\n)"; }
# Number = number of lines # Number = number of lines
$opt::header =~ s/^(\d+)$/"(.*\n)"x$1/e; $opt::header =~ s/^(\d+)$/"(.*\n)"x$1/e;
while(read($fh,substr($$buf_ref,length $$buf_ref,0),$opt::blocksize)) { while(read($fh,substr($$buf_ref,length $$buf_ref,0),$Global::blocksize)) {
if($$buf_ref=~s/^($opt::header)//) { if($$buf_ref=~s/^($opt::header)//) {
$header = $1; $header = $1;
last; last;
@ -318,7 +331,7 @@ sub spreadstdin {
# read a record # read a record
# Spawn a job and print the record to it. # Spawn a job and print the record to it.
# Uses: # Uses:
# $opt::blocksize # $Global::blocksize
# STDIN # STDIN
# $opt::r # $opt::r
# $Global::max_lines # $Global::max_lines
@ -335,7 +348,7 @@ sub spreadstdin {
my $chunk_number = 1; my $chunk_number = 1;
my $one_time_through; my $one_time_through;
my $two_gb = 2**31-1; my $two_gb = 2**31-1;
my $blocksize = $opt::blocksize; my $blocksize = $Global::blocksize;
my $in = *STDIN; my $in = *STDIN;
my $header = find_header(\$buf,$in); my $header = find_header(\$buf,$in);
while(1) { while(1) {
@ -998,11 +1011,10 @@ sub parse_options {
if(@opt::transfer_files) { push @Global::transfer_files, @opt::transfer_files; } if(@opt::transfer_files) { push @Global::transfer_files, @opt::transfer_files; }
if(not defined $opt::recstart and if(not defined $opt::recstart and
not defined $opt::recend) { $opt::recend = "\n"; } not defined $opt::recend) { $opt::recend = "\n"; }
if(not defined $opt::blocksize) { $opt::blocksize = "1M"; } $Global::blocksize = multiply_binary_prefix($opt::blocksize || "1M");
$opt::blocksize = multiply_binary_prefix($opt::blocksize); if($Global::blocksize > 2**31-1) {
if($opt::blocksize > 2**31-1) {
warning("--blocksize >= 2G causes problems. Using 2G-1."); warning("--blocksize >= 2G causes problems. Using 2G-1.");
$opt::blocksize = 2**31-1; $Global::blocksize = 2**31-1;
} }
$opt::memfree = multiply_binary_prefix($opt::memfree); $opt::memfree = multiply_binary_prefix($opt::memfree);
check_invalid_option_combinations(); check_invalid_option_combinations();
@ -5130,7 +5142,7 @@ sub compute_number_of_processes {
$max_system_proc_reached and last; $max_system_proc_reached and last;
my $before_getting_arg = time; my $before_getting_arg = time;
if(!$opt::roundrobin) { if(!$Global::dummy_jobs) {
get_args_or_jobs() or last; get_args_or_jobs() or last;
} }
$wait_time_for_getting_args += time - $before_getting_arg; $wait_time_for_getting_args += time - $before_getting_arg;

View file

@ -1133,13 +1133,19 @@ B<--files> is often used with B<--pipe>.
B<--pipe> maxes out at around 1 GB/s input, and 100 MB/s output. If B<--pipe> maxes out at around 1 GB/s input, and 100 MB/s output. If
performance is important use B<--pipepart>. performance is important use B<--pipepart>.
See also: B<--recstart>, B<--recend>, B<--fifo>, B<--cat>, B<--pipepart>. See also: B<--recstart>, B<--recend>, B<--fifo>, B<--cat>,
B<--pipepart>, B<--files>.
=item B<--pipepart> =item B<--pipepart>
Pipe parts of a physical file. B<--pipepart> works similarly to Pipe parts of a physical file. B<--pipepart> works similarly to
B<--pipe>, but is much faster. It has a few limitations: B<--pipe>, but is much faster.
If B<--block> is left out, B<--pipepart> will use a block size that
will result in 10 jobs per jobslot.
B<--pipepart> has a few limitations:
=over 3 =over 3
@ -2866,7 +2872,7 @@ Or if the regexps are fixed strings:
grep -F -f regexps.txt bigfile grep -F -f regexps.txt bigfile
There are 2 limiting factors: CPU and disk I/O. CPU is easy to There are 2 limiting factors: CPU and disk I/O. CPU is easy to
measure: If the grep takes >90% CPU (e.g. when running top), then the measure: If the B<grep> takes >90% CPU (e.g. when running top), then the
CPU is a limiting factor, and parallelization will speed this up. If CPU is a limiting factor, and parallelization will speed this up. If
not, then disk I/O is the limiting factor, and depending on the disk not, then disk I/O is the limiting factor, and depending on the disk
system it may be faster or slower to parallelize. The only way to know system it may be faster or slower to parallelize. The only way to know
@ -2876,23 +2882,23 @@ If the CPU is the limiting factor parallelization should be done on the regexps:
cat regexp.txt | parallel --pipe -L1000 --round-robin grep -f - bigfile cat regexp.txt | parallel --pipe -L1000 --round-robin grep -f - bigfile
If a line matches multiple regexps, the line may be duplicated. The command If a line matches multiple regexps, the line may be duplicated. The
will start one grep per CPU and read bigfile one time per CPU, command will start one B<grep> per CPU and read I<bigfile> one time
but as that is done in parallel, all reads except the first will be per CPU, but as that is done in parallel, all reads except the first
cached in RAM. Depending on the size of regexp.txt it may be faster to will be cached in RAM. Depending on the size of I<regexp.txt> it may
use --block 10m instead of -L1000. If regexp.txt is too big to fit in be faster to use B<--block 10m> instead of B<-L1000>. If I<regexp.txt>
RAM, remove --round-robin and adjust -L1000. This will cause bigfile is too big to fit in RAM, remove B<--round-robin> and adjust
to be read more times. B<-L1000>. This will cause I<bigfile> to be read more times.
Some storage systems perform better when reading multiple chunks in Some storage systems perform better when reading multiple chunks in
parallel. This is true for some RAID systems and for some network file parallel. This is true for some RAID systems and for some network file
systems. To parallelize the reading of bigfile: systems. To parallelize the reading of I<bigfile>:
parallel --pipepart --block 100M -a bigfile -k grep -f regexp.txt parallel --pipepart --block 100M -a bigfile -k grep -f regexp.txt
This will split bigfile into 100MB chunks and run grep on each of This will split I<bigfile> into 100MB chunks and run B<grep> on each of
these chunks. To parallelize both reading of bigfile and regexp.txt these chunks. To parallelize both reading of I<bigfile> and I<regexp.txt>
combine the two using --fifo: combine the two using B<--fifo>:
parallel --pipepart --block 100M -a bigfile --fifo cat regexp.txt \ parallel --pipepart --block 100M -a bigfile --fifo cat regexp.txt \
\| parallel --pipe -L1000 --round-robin grep -f - {} \| parallel --pipe -L1000 --round-robin grep -f - {}
@ -4860,8 +4866,9 @@ it also uses rsync with ssh.
=head1 SEE ALSO =head1 SEE ALSO
B<ssh>(1), B<rsync>(1), B<find>(1), B<xargs>(1), B<dirname>(1), B<ssh>(1), B<ssh-agent>(1), B<sshpass>(1), B<ssh-copy-id>(1),
B<make>(1), B<pexec>(1), B<ppss>(1), B<xjobs>(1), B<prll>(1), B<rsync>(1), B<find>(1), B<xargs>(1), B<dirname>(1), B<make>(1),
B<dxargs>(1), B<mdm>(1) B<pexec>(1), B<ppss>(1), B<xjobs>(1), B<prll>(1), B<dxargs>(1),
B<mdm>(1)
=cut =cut

View file

@ -1819,7 +1819,7 @@
<p>When using <b>--cat</b>, <b>--pipepart</b>, or when a job is run on a remote machine, the command is wrapped with helper scripts. <b>-vv</b> shows all of this.</p> <p>When using <b>--cat</b>, <b>--pipepart</b>, or when a job is run on a remote machine, the command is wrapped with helper scripts. <b>-vv</b> shows all of this.</p>
<pre><code> parallel -vv --pipepart wc :::: num30000</code></pre> <pre><code> parallel -vv --pipepart --block 1M wc :::: num30000</code></pre>
<p>Output:</p> <p>Output:</p>

View file

@ -1816,7 +1816,7 @@ When using B<--cat>, B<--pipepart>, or when a job is run on a remote
machine, the command is wrapped with helper scripts. B<-vv> shows all machine, the command is wrapped with helper scripts. B<-vv> shows all
of this. of this.
parallel -vv --pipepart wc :::: num30000 parallel -vv --pipepart --block 1M wc :::: num30000
Output: Output:

View file

@ -596,6 +596,16 @@ echo '### bug #34422: parallel -X --eta crashes with div by zero'
# We do not care how long it took # We do not care how long it took
seq 2 | stdout parallel -X --eta echo | grep -E -v 'ETA:.*AVG' seq 2 | stdout parallel -X --eta echo | grep -E -v 'ETA:.*AVG'
echo '**'
echo '### --pipepart autoset --block => 10*joblots'
seq 1000 > /run/shm/parallel$$;
parallel -j2 -k --pipepart echo {#} :::: /run/shm/parallel$$;
rm /run/shm/parallel$$
echo '**'
EOF EOF
echo '### 1 .par file from --files expected' echo '### 1 .par file from --files expected'

View file

@ -1638,5 +1638,32 @@ Computers / CPU cores / Max jobs to run
1:local / 8 / 2 1:local / 8 / 2
Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete
echo '**'
**
echo '### --pipepart autoset --block => 10*joblots'
### --pipepart autoset --block => 10*joblots
seq 1000 > /run/shm/parallel$$; parallel -j2 -k --pipepart echo {#} :::: /run/shm/parallel$$; rm /run/shm/parallel$$
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
echo '**'
**
### 1 .par file from --files expected ### 1 .par file from --files expected
0 0

View file

@ -846,7 +846,7 @@ _
parallel --env _ -S $SERVER1 'echo $VAR; my_func2' ::: bar parallel --env _ -S $SERVER1 'echo $VAR; my_func2' ::: bar
/bin/bash: my_func2: command not found /bin/bash: my_func2: command not found
parallel -vv --pipepart wc :::: num30000 parallel -vv --pipepart --block 1M wc :::: num30000
<num30000 perl -e 'while(@ARGV) { sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 131072 ? 131072 : $left))){ $left -= $read; syswrite(STDOUT,$buf); } }' 0 0 0 168894 | (wc) <num30000 perl -e 'while(@ARGV) { sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 131072 ? 131072 : $left))){ $left -= $read; syswrite(STDOUT,$buf); } }' 0 0 0 168894 | (wc)
30000 30000 168894 30000 30000 168894
my_func3() { my_func3() {