--spreadstdin now forks a new child for each block.

This commit is contained in:
Ole Tange 2011-01-19 16:25:25 +01:00
parent 1d45092522
commit 0247e4beee
8 changed files with 113 additions and 50 deletions

20
configure vendored
View file

@ -1,6 +1,6 @@
#! /bin/sh #! /bin/sh
# Guess values for system-dependent variables and create Makefiles. # Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.67 for parallel 20110101. # Generated by GNU Autoconf 2.67 for parallel 20110119.
# #
# Report bugs to <bug-parallel@gnu.org>. # Report bugs to <bug-parallel@gnu.org>.
# #
@ -551,8 +551,8 @@ MAKEFLAGS=
# Identity of this package. # Identity of this package.
PACKAGE_NAME='parallel' PACKAGE_NAME='parallel'
PACKAGE_TARNAME='parallel' PACKAGE_TARNAME='parallel'
PACKAGE_VERSION='20110101' PACKAGE_VERSION='20110119'
PACKAGE_STRING='parallel 20110101' PACKAGE_STRING='parallel 20110119'
PACKAGE_BUGREPORT='bug-parallel@gnu.org' PACKAGE_BUGREPORT='bug-parallel@gnu.org'
PACKAGE_URL='' PACKAGE_URL=''
@ -1168,7 +1168,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing. # Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh. # This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF cat <<_ACEOF
\`configure' configures parallel 20110101 to adapt to many kinds of systems. \`configure' configures parallel 20110119 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]... Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1234,7 +1234,7 @@ fi
if test -n "$ac_init_help"; then if test -n "$ac_init_help"; then
case $ac_init_help in case $ac_init_help in
short | recursive ) echo "Configuration of parallel 20110101:";; short | recursive ) echo "Configuration of parallel 20110119:";;
esac esac
cat <<\_ACEOF cat <<\_ACEOF
@ -1301,7 +1301,7 @@ fi
test -n "$ac_init_help" && exit $ac_status test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then if $ac_init_version; then
cat <<\_ACEOF cat <<\_ACEOF
parallel configure 20110101 parallel configure 20110119
generated by GNU Autoconf 2.67 generated by GNU Autoconf 2.67
Copyright (C) 2010 Free Software Foundation, Inc. Copyright (C) 2010 Free Software Foundation, Inc.
@ -1318,7 +1318,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake. running configure, to aid debugging if configure makes a mistake.
It was created by parallel $as_me 20110101, which was It was created by parallel $as_me 20110119, which was
generated by GNU Autoconf 2.67. Invocation command line was generated by GNU Autoconf 2.67. Invocation command line was
$ $0 $@ $ $0 $@
@ -2133,7 +2133,7 @@ fi
# Define the identity of the package. # Define the identity of the package.
PACKAGE='parallel' PACKAGE='parallel'
VERSION='20110101' VERSION='20110119'
cat >>confdefs.h <<_ACEOF cat >>confdefs.h <<_ACEOF
@ -2684,7 +2684,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their # report actual input values of CONFIG_FILES etc. instead of their
# values after options handling. # values after options handling.
ac_log=" ac_log="
This file was extended by parallel $as_me 20110101, which was This file was extended by parallel $as_me 20110119, which was
generated by GNU Autoconf 2.67. Invocation command line was generated by GNU Autoconf 2.67. Invocation command line was
CONFIG_FILES = $CONFIG_FILES CONFIG_FILES = $CONFIG_FILES
@ -2746,7 +2746,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1 cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`" ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\ ac_cs_version="\\
parallel config.status 20110101 parallel config.status 20110119
configured by $0, generated by GNU Autoconf 2.67, configured by $0, generated by GNU Autoconf 2.67,
with options \\"\$ac_cs_config\\" with options \\"\$ac_cs_config\\"

View file

@ -1,4 +1,4 @@
AC_INIT([parallel], [20110101], [bug-parallel@gnu.org]) AC_INIT([parallel], [20110119], [bug-parallel@gnu.org])
AM_INIT_AUTOMAKE([-Wall -Werror foreign]) AM_INIT_AUTOMAKE([-Wall -Werror foreign])
AC_CONFIG_HEADERS([config.h]) AC_CONFIG_HEADERS([config.h])
AC_CONFIG_FILES([ AC_CONFIG_FILES([

View file

@ -43,7 +43,7 @@ make dist-bzip2
YYYYMMDD=`yyyymmdd` YYYYMMDD=`yyyymmdd`
cp parallel-$YYYYMMDD.tar.bz2 /tmp cp parallel-$YYYYMMDD.tar.bz2 /tmp
cd /tmp pushd /tmp
tar xjvf parallel-$YYYYMMDD.tar.bz2 tar xjvf parallel-$YYYYMMDD.tar.bz2
cd parallel-$YYYYMMDD cd parallel-$YYYYMMDD
./configure && make -j && sudo make -j install ./configure && make -j && sudo make -j install
@ -64,8 +64,9 @@ echo put parallel-$YYYYMMDD.tar.bz2{,.sig,*asc} | ncftp ftp://ftp-upload.gnu.org
== Download and test == == Download and test ==
cd /tmp pushd /tmp
wget http://ftp.gnu.org/gnu/parallel/parallel-$YYYYMMDD.tar.bz2 wget http://ftp.gnu.org/gnu/parallel/parallel-$YYYYMMDD.tar.bz2
#wget http://alpha.gnu.org/gnu/parallel/parallel-$YYYYMMDD.tar.bz2
tar xjvf parallel-$YYYYMMDD.tar.bz2 tar xjvf parallel-$YYYYMMDD.tar.bz2
cd parallel-$YYYYMMDD cd parallel-$YYYYMMDD
./configure ./configure

View file

@ -236,7 +236,7 @@ B<parallel>(1), B<nice>(1)
use strict; use strict;
use Getopt::Long; use Getopt::Long;
$Global::progname="niceload"; $Global::progname="niceload";
$Global::version = 20110101; $Global::version = 20110119;
Getopt::Long::Configure("bundling","require_order"); Getopt::Long::Configure("bundling","require_order");
get_options_from_array(\@ARGV) || die_usage(); get_options_from_array(\@ARGV) || die_usage();
if($::opt_version) { if($::opt_version) {

View file

@ -78,42 +78,56 @@ if($::opt_halt_on_error) {
sub spreadstdin { sub spreadstdin {
# read a record # read a record
# print it to the first jobs that is ready # print it to the first jobs that is ready
my @jobs = values %Global::running;
my $first = 0; my $first = 0;
my $second = 0; my $second = 0;
my $sleep = 0.1; my $sleep = 0.1;
my $max_sleep = 0.1; my $max_sleep = 0.1;
my $record; my $record;
my $rest = ""; my $partial = "";
my $buf = ""; my $buf = "";
my $block_max = 1000_000; my $block_max = 1000_000;
my $rec_start = undef; my $rec_start = undef;
my $rec_end = undef; my $rec_end = undef;
my $err; my $err;
while(read(STDIN,$buf,$block_max)) { while(read(STDIN,$buf,$block_max)) {
$record = $rest.$buf; $record = $partial.$buf;
if(($::opt_recstart and $record =~ s/($::opt_recstart.*?)$//smo) if(eof STDIN) {
or # There is no partial record at the end of file
($::opt_recend and $record =~ s/$::opt_recend(.*?)$//smo)) { $partial = "";
$rest = $1; } else {
if(($::opt_recstart and $record =~ s/(.*)($::opt_recstart.*?)$/$1/os)
or
($::opt_recend and $record =~ s/(.*$::opt_recend)(.*?)$/$1/os)) {
$partial = $2;
}
} }
::debug("Read record: ".length($record)."\n"); ::debug("Read record: ".length($record)."\n");
write_record: while(defined $record) { write_record: while(defined $record) {
for my $job (@jobs) { # Sorting according to sequence is necessary for -k to work
for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) {
if($job->remaining()) { if($job->remaining()) {
# Part of the job's last record has not finished being written
$job->complete_write(); $job->complete_write();
} else { } else {
$job->write($record); if($job->datawritten() > 0) {
$record = undef; # if opt -k:
$first++; # close stdin
$sleep=1; # start another (maybe done by start more?)
last write_record; my $fh = $job->stdin();
close $fh;
} else {
$job->write($record);
$record = undef;
$first++;
$sleep=1;
last write_record;
}
} }
} }
# Rotate jobs to spread the input # Rotate jobs to spread the input
@jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); # @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]);
usleep($sleep); usleep($sleep);
$sleep *= 1.1; $sleep *= 1.1; # exponential back off
$max_sleep = max($max_sleep, $sleep); $max_sleep = max($max_sleep, $sleep);
$second++; $second++;
} }
@ -121,17 +135,18 @@ sub spreadstdin {
my $flush_done; my $flush_done;
do { do {
$flush_done = 1; $flush_done = 1;
for my $job (@jobs) { # Make sure everything is written to the jobs
for my $job (values %Global::running) {
if($job->remaining()) { if($job->remaining()) {
$job->complete_write(); $job->complete_write();
$flush_done = 0; $flush_done = 0;
} }
} }
usleep($sleep); usleep($sleep);
$sleep *= 1.1; $sleep *= 1.1; # exponential back off
} while (not $flush_done); } while (not $flush_done);
for my $job (@jobs) { for my $job (values %Global::running) {
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
} }
@ -228,6 +243,7 @@ sub get_options_from_array {
"spreadstdin" => \$::opt_spreadstdin, "spreadstdin" => \$::opt_spreadstdin,
"recstart=s" => \$::opt_recstart, "recstart=s" => \$::opt_recstart,
"recend=s" => \$::opt_recend, "recend=s" => \$::opt_recend,
"files" => \$::opt_files,
# xargs-compatibility - implemented, man, testsuite # xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$::opt_P, "max-procs|P=s" => \$::opt_P,
"delimiter|d=s" => \$::opt_d, "delimiter|d=s" => \$::opt_d,
@ -269,7 +285,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20110118; $Global::version = 20110119;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -420,7 +436,7 @@ sub parse_options {
} else { } else {
print $Global::joblog print $Global::joblog
join("\t", "Seq", "Host", "Starttime", "Runtime", join("\t", "Seq", "Host", "Starttime", "Runtime",
"Trans", "Return", "Command" "Send", "Receive", "Command"
). "\n"; ). "\n";
} }
} }
@ -2227,8 +2243,10 @@ sub new {
'seq' => undef, # $PARALLEL_SEQ 'seq' => undef, # $PARALLEL_SEQ
'stdin' => undef, # filehandle for stdin (used for --spreadstdin) 'stdin' => undef, # filehandle for stdin (used for --spreadstdin)
'stdout' => undef, # filehandle for stdout (used for --group) 'stdout' => undef, # filehandle for stdout (used for --group)
'stdoutfilename' => undef, # filename for writing stdout to (used for --files)
'stderr' => undef, # filehandle for stderr (used for --group) 'stderr' => undef, # filehandle for stderr (used for --group)
'remaining' => "", # remaining data not sent to stdin (used for --spreadstdin) 'remaining' => "", # remaining data not sent to stdin (used for --spreadstdin)
'datawritten' => 0, # amount of data sent via stdin (used for --spreadstdin)
'transfersize' => 0, # size of files using --transfer 'transfersize' => 0, # size of files using --transfer
'returnsize' => 0, # size of files using --return 'returnsize' => 0, # size of files using --return
'pid' => undef, 'pid' => undef,
@ -2268,6 +2286,17 @@ sub stdout {
return $self->{'stdout'}; return $self->{'stdout'};
} }
sub set_stdoutfilename {
my $self = shift;
my $stdoutfilename = shift;
$self->{'stdoutfilename'} = $stdoutfilename;
}
sub stdoutfilename {
my $self = shift;
return $self->{'stdoutfilename'};
}
sub stderr { sub stderr {
my $self = shift; my $self = shift;
return $self->{'stderr'}; return $self->{'stderr'};
@ -2311,6 +2340,7 @@ sub complete_write {
} else { } else {
# Remove the part that was written # Remove the part that was written
substr($self->{'remaining'},0,$len) = ""; substr($self->{'remaining'},0,$len) = "";
$self->{'datawritten'} += $len;
} }
} }
@ -2323,6 +2353,10 @@ sub remaining {
} }
} }
sub datawritten {
my $self = shift;
return $self->{'datawritten'};
}
sub pid { sub pid {
my $self = shift; my $self = shift;
@ -2489,7 +2523,9 @@ sub transfer {
for my $arg (@$record) { for my $arg (@$record) {
CORE::push @transfer, $arg->orig(); CORE::push @transfer, $arg->orig();
# filesize # filesize
$self->{'transfersize'} += (stat($arg->orig()))[7]; if(-e $arg->orig()) {
$self->{'transfersize'} += (stat($arg->orig()))[7];
}
} }
} }
} }
@ -2691,7 +2727,8 @@ sub start {
# To group we create temporary files for STDOUT and STDERR # To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open) # To avoid the cleanup unlink the files immediately (but keep them open)
($outfh,$name) = ::tempfile(SUFFIX => ".par"); ($outfh,$name) = ::tempfile(SUFFIX => ".par");
unlink $name; $job->set_stdoutfilename($name);
$::opt_files or unlink $name;
($errfh,$name) = ::tempfile(SUFFIX => ".par"); ($errfh,$name) = ::tempfile(SUFFIX => ".par");
unlink $name; unlink $name;
@ -2815,7 +2852,7 @@ sub print {
# so flush to avoid STDOUT being buffered # so flush to avoid STDOUT being buffered
flush STDOUT; flush STDOUT;
} }
seek $_, 0, 0 for $out, $err; seek $err, 0, 0;
if($Global::debug) { if($Global::debug) {
print STDERR "ERR:\n"; print STDERR "ERR:\n";
} }
@ -2824,14 +2861,20 @@ sub print {
print STDERR $buf; print STDERR $buf;
} }
flush STDERR; flush STDERR;
if($Global::debug) {
print STDOUT "OUT:\n"; if($::opt_files) {
print STDOUT $self->{'stdoutfilename'},"\n";
} else {
seek $out, 0, 0;
if($Global::debug) {
print STDOUT "OUT:\n";
}
while(sysread($out,$buf,1000_000)) {
print STDOUT $buf;
}
flush STDOUT;
::debug("<<joboutput $command\n");
} }
while(sysread($out,$buf,1000_000)) {
print STDOUT $buf;
}
flush STDOUT;
::debug("<<joboutput $command\n");
close $out; close $out;
close $err; close $err;
} }

View file

@ -393,11 +393,12 @@ use B<-I> instead.
=item B<--joblog> I<logfile> (beta testing) =item B<--joblog> I<logfile> (beta testing)
Logfile for executed jobs. Saved a list of the executed jobs to Logfile for executed jobs. Saved a list of the executed jobs to
I<logfile> in the following format: sequence number, sshlogin, start I<logfile> in the following TAB separated format: sequence number,
time as seconds since epoch, run time in seconds, bytes in files sshlogin, start time as seconds since epoch, run time in seconds,
transfered, bytes in files returned, command run. bytes in files transfered, bytes in files returned, command run.
To convert the times into ISO-8601 strict do: To convert the times into ISO-8601 strict do:
B<perl -a -F"\t" -ne 'chomp($F[2]=`date -d \@$F[2] +%FT%T`); print join("\t",@F)'> B<perl -a -F"\t" -ne 'chomp($F[2]=`date -d \@$F[2] +%FT%T`); print join("\t",@F)'>
@ -410,7 +411,7 @@ B<perl -a -F"\t" -ne 'chomp($F[2]=`date -d \@$F[2] +%FT%T`); print join("\t",@F)
=item B<-P> I<N> =item B<-P> I<N>
Number of jobslots. Run up to N jobs in parallel. 0 means as many as Number of jobslots. Run up to N jobs in parallel. 0 means as many as
possible. Default is 9. possible. Default is +0 which will run one job per CPU core.
If B<--semaphore> is set default is 1 thus making a mutex. If B<--semaphore> is set default is 1 thus making a mutex.
@ -2477,7 +2478,7 @@ you file a bug-report.
=head1 REPORTING BUGS =head1 REPORTING BUGS
Report bugs to <bug-parallel@gnu.org>. Report bugs to <bug-parallel@gnu.org> or https://savannah.gnu.org/bugs/?func=additem&group=parallel
=head1 AUTHOR =head1 AUTHOR

View file

@ -528,7 +528,7 @@ $Global::Initfile && unlink $Global::Initfile;
exit ($err); exit ($err);
sub parse_options { sub parse_options {
$Global::version = 20110101; $Global::version = 20110119;
$Global::progname = 'sql'; $Global::progname = 'sql';
# This must be done first as this may exec myself # This must be done first as this may exec myself

View file

@ -14,3 +14,21 @@ seq 1 1000| parallel -j1 --spreadstdin cat "|cat "|wc -c
seq 1 10000| parallel -j10 --spreadstdin cat "|cat "|wc -c seq 1 10000| parallel -j10 --spreadstdin cat "|cat "|wc -c
seq 1 100000| parallel -j1 --spreadstdin cat "|cat "|wc -c seq 1 100000| parallel -j1 --spreadstdin cat "|cat "|wc -c
seq 1 1000000| parallel -j10 --spreadstdin cat "|cat "|wc -c seq 1 1000000| parallel -j10 --spreadstdin cat "|cat "|wc -c
seq 1 10 | src/parallel --recend "\n" -j1 --spreadstdin gzip -9 >/tmp/foo.gz
echo '### Test --spreadstdin - similar to the failing below'
seq 1 100000 | parallel --recend "\n" -j10 --spreadstdin gzip -9 >/tmp/foo.gz
diff <(seq 1 100000) <(zcat /tmp/foo.gz |sort -n)
diff <(seq 1 100000|wc -c) <(zcat /tmp/foo.gz |wc -c)
echo '### Test --spreadstdin - this failed during devel'
seq 1 1000000 | md5sum
# Should give same result when sorted
seq 1 1000000 | parallel --recend "\n" -j10 --spreadstdin gzip -9 | zcat | sort -n | md5sum
echo '### Test --spreadstdin -k'
seq 1 1000000 | parallel -k --recend "\n" -j10 --spreadstdin gzip -9 | zcat | md5sum
echo '### Test --spreadstdin --files'
seq 1 1000000 | shuf | parallel --files --recend "\n" -j10 --spreadstdin sort | parallel -X sort -m | md5sum