parallel: --pipe now forks instead of busy looping.

This commit is contained in:
Ole Tange 2012-05-24 02:03:57 +02:00
parent 91899593fc
commit 22b295cef1
8 changed files with 69 additions and 142 deletions

View file

@ -1,6 +1,6 @@
Summary: Shell tool for executing jobs in parallel
Name: parallel
Version: 20120422
Version: 20120522
Release: 1
License: GPL
Group: Productivity/File utilities

View file

@ -236,16 +236,12 @@ sub spreadstdin {
if($Global::max_number_of_args) {
# -N => (start..*?end){n}
while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) {
$record = $header.$1;
::debug("Read record -N: ".length($record)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1);
}
} else {
# Find the last recend-recstart in $buf
if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
$record = $header.$1;
::debug("Matched record: ".length($record)."/".length($buf)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1);
}
}
} else {
@ -254,20 +250,16 @@ sub spreadstdin {
my $i = 0;
while(($i = nindex(\$buf,$recendrecstart,$Global::max_number_of_args)) != -1) {
$i += length $recend; # find the actual splitting location
my $record = $header.substr($buf,0,$i);
write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i);
substr($buf,0,$i) = "";
::debug("Read record: ".length($record)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
}
} else {
# Find the last recend-recstart in $buf
my $i = rindex($buf,$recendrecstart);
if($i != -1) {
$i += length $recend; # find the actual splitting location
my $record = $header.substr($buf,0,$i);
write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i);
substr($buf,0,$i) = "";
# ::debug("Read record: ".length($record)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
}
}
}
@ -275,12 +267,10 @@ sub spreadstdin {
}
# If there is anything left in the buffer write it
substr($buf,0,0) = $header;
write_record_to_pipe(\$buf,$recstart,$recend);
substr($buf,0,0) = "";
write_record_to_pipe(\$header,\$buf,$recstart,$recend,length $buf);
::debug("Done reading input\n");
flush_and_close_pipes();
::debug("Done flushing to children\n");
$Global::start_no_new_jobs = 1;
}
@ -299,74 +289,45 @@ sub nindex {
return $i;
}
sub flush_and_close_pipes {
# Flush whatever is cached to the open pipes
# and close them.
my $flush_done;
my $sleep = 0.05;
do {
$flush_done = 1;
# Make sure everything is written to the jobs
for my $job (values %Global::running) {
if($job->remaining()) {
if($job->complete_write()) {
# Some data was written - reset sleep timer
$sleep = 0.05;
}
$flush_done = 0;
}
}
$sleep = ::reap_usleep($sleep);
} while (not $flush_done);
for my $job (values %Global::running) {
my $fh = $job->stdin();
close $fh;
}
}
sub write_record_to_pipe {
# Fork then
# Write record from pos 0 .. $endpos to pipe
my $header_ref = shift;
my $record_ref = shift;
my $recstart = shift;
my $recend = shift;
my $endpos = shift;
if(length $$record_ref == 0) { return; }
# Find the minimal seq $job that has no data written == virgin
# If no virgin found, backoff
my $sleep = 0.0001; # 0.01 ms - better performance on highend
while(not @Global::virgin_jobs) {
::debug("No virgin jobs");
$sleep = ::reap_usleep($sleep);
start_more_jobs(); # These jobs may not be started because of loadavg
}
my $job = shift @Global::virgin_jobs;
if(fork()) {
# Skip
} else {
# Chop off at $endpos as we do not know how many rec_sep will
# be removed.
my $record = substr($$record_ref,0,$endpos);
# Remove rec_sep
if($::opt_remove_rec_sep) {
# Remove record separator
$$record_ref =~ s/$recend$recstart//gos;
$$record_ref =~ s/^$recstart//os;
$$record_ref =~ s/$recend$//os;
$record =~ s/$recend$recstart//gos;
$record =~ s/^$recstart//os;
$record =~ s/$recend$//os;
}
# Keep the pipes hot, but if nothing happens sleep should back off
my $sleep = 0.00001; # 0.00001 ms - better performance on highend
write_record: while(1) {
# Sorting according to sequence is necessary for -k to work
for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) {
::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n");
if($job->remaining()) {
# Part of the job's last record has not finished being written
if($job->complete_write()) {
# Something got written - reset sleep timer
$sleep = 0.00001;
}
} else {
if($job->datawritten() > 0) {
# There is no data remaining and we have written data before:
# So this means we have completed writing a block.
# close stdin
# This will cause the job to finish and when it dies we will spawn another job
$job->write($header_ref);
$job->write(\$record);
my $fh = $job->stdin();
close $fh;
} else {
$job->write($record_ref);
# Something got written - reset sleep timer
$sleep = 0.00001;
last write_record;
}
}
}
# Maybe this should be in an if statement: if sleep > 0.001: start more
start_more_jobs(); # These jobs may not be started because of loadavg
$sleep = ::reap_usleep($sleep);
exit;
}
my $fh = $job->stdin();
close $fh;
return;
}
@ -538,7 +499,7 @@ sub get_options_from_array {
sub parse_options {
# Returns: N/A
# Defaults:
$Global::version = 20120522;
$Global::version = 20120523;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
@ -1055,6 +1016,7 @@ sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {}
# Variable structure:
#
# $Global::running{$pid} = Pointer to Job-object
# @Global::virgin_jobs = Pointers to Job-objects that have received no input
# $Global::host{$sshlogin} = Pointer to SSHLogin-object
# $Global::total_running = total number of running jobs
# $Global::total_started = total jobs started
@ -1159,6 +1121,9 @@ sub start_another_job {
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n");
if($job->start()) {
$Global::running{$job->pid()} = $job;
if($::opt_pipe) {
push(@Global::virgin_jobs,$job);
}
debug("Started as seq ",$job->seq()," pid:",$job->pid(),"\n");
return 1;
} else {
@ -2036,7 +2001,6 @@ sub my_dump {
sub __OBJECT_ORIENTED_PARTS__ {}
package SSHLogin;
sub new {
@ -3051,49 +3015,14 @@ sub stdin {
sub set_stdin {
my $self = shift;
my $stdin = shift;
# set non-blocking
fcntl($stdin, ::F_SETFL, ::O_NONBLOCK) or
::die_bug("Couldn't set flags for HANDLE: $!");
$self->{'stdin'} = $stdin;
}
sub write {
my $self = shift;
my $remaining_ref = shift;
if(length($$remaining_ref)) {
$self->{'remaining'} .= $$remaining_ref;
$self->complete_write();
}
}
sub complete_write {
# Returns:
# number of bytes written (see syswrite)
my $self = shift;
my $in = $self->{'stdin'};
my $len = syswrite($in,$self->{'remaining'});
if (!defined($len) && $! == &::EAGAIN) {
# write would block;
} else {
# Remove the part that was written
substr($self->{'remaining'},0,$len) = "";
$self->{'datawritten'} += $len;
}
return $len;
}
sub remaining {
my $self = shift;
if(defined $self->{'remaining'}) {
return length $self->{'remaining'};
} else {
return undef;
}
}
sub datawritten {
my $self = shift;
return $self->{'datawritten'};
syswrite($in,$$remaining_ref);
}
sub pid {

View file

@ -538,7 +538,7 @@ specified, and for B<-I>{} otherwise. This option is deprecated;
use B<-I> instead.
=item B<--joblog> I<logfile>
=item B<--joblog> I<logfile> (alpha testing)
Logfile for executed jobs. Save a list of the executed jobs to
I<logfile> in the following TAB separated format: sequence number,
@ -660,7 +660,7 @@ B<-l 0> is an alias for B<-l 1>.
Implies B<-X> unless B<-m> is set.
=item B<--load> I<max-load>
=item B<--load> I<max-load> (alpha testing)
Do not start new jobs on a given computer unless the load is less than
I<max-load>. I<max-load> uses the same syntax as B<--jobs>, so I<100%>
@ -755,9 +755,9 @@ Instead of printing the output to stdout (standard output) the output
of each job is saved in a file and the filename is then printed.
=item B<--pipe>
=item B<--pipe> (alpha testing)
=item B<--spreadstdin>
=item B<--spreadstdin> (alpha testing)
Spread input to jobs on stdin (standard input). Read a block of data
from stdin (standard input) and give one block of data as input to one

View file

@ -570,8 +570,8 @@ This option is a synonym for @strong{-I}@emph{replace-str} if @emph{replace-str}
specified, and for @strong{-I}@{@} otherwise. This option is deprecated;
use @strong{-I} instead.
@item @strong{--joblog} @emph{logfile}
@anchor{@strong{--joblog} @emph{logfile}}
@item @strong{--joblog} @emph{logfile} (alpha testing)
@anchor{@strong{--joblog} @emph{logfile} (alpha testing)}
Logfile for executed jobs. Save a list of the executed jobs to
@emph{logfile} in the following TAB separated format: sequence number,
@ -712,8 +712,8 @@ The @strong{-l} option is deprecated since the POSIX standard specifies
Implies @strong{-X} unless @strong{-m} is set.
@item @strong{--load} @emph{max-load}
@anchor{@strong{--load} @emph{max-load}}
@item @strong{--load} @emph{max-load} (alpha testing)
@anchor{@strong{--load} @emph{max-load} (alpha testing)}
Do not start new jobs on a given computer unless the load is less than
@emph{max-load}. @emph{max-load} uses the same syntax as @strong{--jobs}, so @emph{100%}
@ -810,11 +810,11 @@ all the output from one server will be grouped together.
Instead of printing the output to stdout (standard output) the output
of each job is saved in a file and the filename is then printed.
@item @strong{--pipe}
@anchor{@strong{--pipe}}
@item @strong{--pipe} (alpha testing)
@anchor{@strong{--pipe} (alpha testing)}
@item @strong{--spreadstdin}
@anchor{@strong{--spreadstdin}}
@item @strong{--spreadstdin} (alpha testing)
@anchor{@strong{--spreadstdin} (alpha testing)}
Spread input to jobs on stdin (standard input). Read a block of data
from stdin (standard input) and give one block of data as input to one

View file

@ -3,8 +3,9 @@
export LANG=C
SHFILE=/tmp/unittest-parallel.sh
# Try a failing test twice.
ls -t tests-to-run/*${1}*.sh \
| perl -pe 's:(.*/(.*)).sh:bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2:' \
| perl -pe 's:(.*/(.*)).sh:bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2 >/dev/null || bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2:' \
>$SHFILE
mkdir -p actual-results

View file

@ -1,11 +1,11 @@
#!/bin/bash
rm -rf tmp 2>/dev/null
cd input-files
tar xjf random_dirs_with_newline.tar.bz2
cd ..
cp -a input-files/random_dirs_with_newline tmp
cd tmp
TMP=/tmp/parallel_local105
rm -rf $TMP 2>/dev/null
mkdir -p $TMP
tar -C $TMP -xf input-files/random_dirs_with_newline.tar.bz2
cd $TMP/random_dirs_with_newline
# tests if special dir names causes problems
find . -type d -print0 | perl -0 -pe 's:^./::' | parallel -0 -v touch -- {}/abc-{}-{} 2>&1 \
@ -33,4 +33,5 @@ find . -type f -print0 | perl -0 -ne '$a++;END{print $a}'
echo ' files'
cd ..
rm -rf tmp
rm -rf $TMP

View file

@ -165,7 +165,7 @@ echo "echo a" | parallel
parallel -j1 -I :: -X echo 'a::b::^c::[.}c' ::: 1
echo "### BUG: The length for -X is not close to max (131072)"
seq 1 4000 | parallel -X echo {.} aa {}{.} {}{}d{} {}dd{}d{.} |head -n 1 |wc
seq 1 4000 | parallel -k -X echo {.} aa {}{.} {}{}d{} {}dd{}d{.} |head -n 1 |wc
echo "### BUG: empty lines with --show-limit"
echo | parallel --show-limits

View file

@ -54,10 +54,6 @@ h2
21xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
22xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Stop
Start
h1
h2
Stop
### Test --header with multiple :::
a2 b1 b1 a2
### Test --shellquote