diff --git a/configure b/configure index fc8b46d9..a68799fc 100755 --- a/configure +++ b/configure @@ -1,6 +1,6 @@ #! /bin/sh # Guess values for system-dependent variables and create Makefiles. -# Generated by GNU Autoconf 2.67 for parallel 20110101. +# Generated by GNU Autoconf 2.67 for parallel 20110119. # # Report bugs to . # @@ -551,8 +551,8 @@ MAKEFLAGS= # Identity of this package. PACKAGE_NAME='parallel' PACKAGE_TARNAME='parallel' -PACKAGE_VERSION='20110101' -PACKAGE_STRING='parallel 20110101' +PACKAGE_VERSION='20110119' +PACKAGE_STRING='parallel 20110119' PACKAGE_BUGREPORT='bug-parallel@gnu.org' PACKAGE_URL='' @@ -1168,7 +1168,7 @@ if test "$ac_init_help" = "long"; then # Omit some internal or obsolete options to make the list less imposing. # This message is too long to be a string in the A/UX 3.1 sh. cat <<_ACEOF -\`configure' configures parallel 20110101 to adapt to many kinds of systems. +\`configure' configures parallel 20110119 to adapt to many kinds of systems. Usage: $0 [OPTION]... [VAR=VALUE]... @@ -1234,7 +1234,7 @@ fi if test -n "$ac_init_help"; then case $ac_init_help in - short | recursive ) echo "Configuration of parallel 20110101:";; + short | recursive ) echo "Configuration of parallel 20110119:";; esac cat <<\_ACEOF @@ -1301,7 +1301,7 @@ fi test -n "$ac_init_help" && exit $ac_status if $ac_init_version; then cat <<\_ACEOF -parallel configure 20110101 +parallel configure 20110119 generated by GNU Autoconf 2.67 Copyright (C) 2010 Free Software Foundation, Inc. @@ -1318,7 +1318,7 @@ cat >config.log <<_ACEOF This file contains any messages produced by compilers while running configure, to aid debugging if configure makes a mistake. -It was created by parallel $as_me 20110101, which was +It was created by parallel $as_me 20110119, which was generated by GNU Autoconf 2.67. Invocation command line was $ $0 $@ @@ -2133,7 +2133,7 @@ fi # Define the identity of the package. 
PACKAGE='parallel' - VERSION='20110101' + VERSION='20110119' cat >>confdefs.h <<_ACEOF @@ -2684,7 +2684,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1 # report actual input values of CONFIG_FILES etc. instead of their # values after options handling. ac_log=" -This file was extended by parallel $as_me 20110101, which was +This file was extended by parallel $as_me 20110119, which was generated by GNU Autoconf 2.67. Invocation command line was CONFIG_FILES = $CONFIG_FILES @@ -2746,7 +2746,7 @@ _ACEOF cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1 ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`" ac_cs_version="\\ -parallel config.status 20110101 +parallel config.status 20110119 configured by $0, generated by GNU Autoconf 2.67, with options \\"\$ac_cs_config\\" diff --git a/configure.ac b/configure.ac index 299e6931..9b91bffe 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT([parallel], [20110101], [bug-parallel@gnu.org]) +AC_INIT([parallel], [20110119], [bug-parallel@gnu.org]) AM_INIT_AUTOMAKE([-Wall -Werror foreign]) AC_CONFIG_HEADERS([config.h]) AC_CONFIG_FILES([ diff --git a/doc/release_new_version b/doc/release_new_version index 8ebd4ddf..44a5b343 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -43,7 +43,7 @@ make dist-bzip2 YYYYMMDD=`yyyymmdd` cp parallel-$YYYYMMDD.tar.bz2 /tmp -cd /tmp +pushd /tmp tar xjvf parallel-$YYYYMMDD.tar.bz2 cd parallel-$YYYYMMDD ./configure && make -j && sudo make -j install @@ -64,8 +64,9 @@ echo put parallel-$YYYYMMDD.tar.bz2{,.sig,*asc} | ncftp ftp://ftp-upload.gnu.org == Download and test == -cd /tmp +pushd /tmp wget http://ftp.gnu.org/gnu/parallel/parallel-$YYYYMMDD.tar.bz2 +#wget http://alpha.gnu.org/gnu/parallel/parallel-$YYYYMMDD.tar.bz2 tar xjvf parallel-$YYYYMMDD.tar.bz2 cd parallel-$YYYYMMDD ./configure diff --git a/src/niceload b/src/niceload index a9e28052..d09817d5 100755 --- a/src/niceload +++ b/src/niceload @@ -236,7 +236,7 @@ B(1), 
B(1) use strict; use Getopt::Long; $Global::progname="niceload"; -$Global::version = 20110101; +$Global::version = 20110119; Getopt::Long::Configure("bundling","require_order"); get_options_from_array(\@ARGV) || die_usage(); if($::opt_version) { diff --git a/src/parallel b/src/parallel index 0d3cca24..02198141 100755 --- a/src/parallel +++ b/src/parallel @@ -78,42 +78,56 @@ if($::opt_halt_on_error) { sub spreadstdin { # read a record # print it to the first jobs that is ready - my @jobs = values %Global::running; my $first = 0; my $second = 0; my $sleep = 0.1; my $max_sleep = 0.1; my $record; - my $rest = ""; + my $partial = ""; my $buf = ""; my $block_max = 1000_000; my $rec_start = undef; my $rec_end = undef; my $err; while(read(STDIN,$buf,$block_max)) { - $record = $rest.$buf; - if(($::opt_recstart and $record =~ s/($::opt_recstart.*?)$//smo) - or - ($::opt_recend and $record =~ s/$::opt_recend(.*?)$//smo)) { - $rest = $1; + $record = $partial.$buf; + if(eof STDIN) { + # There is no partial record at the end of file + $partial = ""; + } else { + if(($::opt_recstart and $record =~ s/(.*)($::opt_recstart.*?)$/$1/os) + or + ($::opt_recend and $record =~ s/(.*$::opt_recend)(.*?)$/$1/os)) { + $partial = $2; + } } ::debug("Read record: ".length($record)."\n"); write_record: while(defined $record) { - for my $job (@jobs) { + # Sorting according to sequence is necessary for -k to work + for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) { if($job->remaining()) { + # Part of the job's last record has not finished being written $job->complete_write(); } else { - $job->write($record); - $record = undef; - $first++; - $sleep=1; - last write_record; + if($job->datawritten() > 0) { + # if opt -k: + # close stdin + # start another (maybe done by start more?) 
+ my $fh = $job->stdin(); + close $fh; + } else { + $job->write($record); + $record = undef; + $first++; + $sleep=1; + last write_record; + } } } # Rotate jobs to spread the input - @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); +# @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); usleep($sleep); - $sleep *= 1.1; + $sleep *= 1.1; # exponential back off $max_sleep = max($max_sleep, $sleep); $second++; } @@ -121,17 +135,18 @@ sub spreadstdin { my $flush_done; do { $flush_done = 1; - for my $job (@jobs) { + # Make sure everything is written to the jobs + for my $job (values %Global::running) { if($job->remaining()) { $job->complete_write(); $flush_done = 0; } } usleep($sleep); - $sleep *= 1.1; + $sleep *= 1.1; # exponential back off } while (not $flush_done); - for my $job (@jobs) { + for my $job (values %Global::running) { my $fh = $job->stdin(); close $fh; } @@ -228,6 +243,7 @@ sub get_options_from_array { "spreadstdin" => \$::opt_spreadstdin, "recstart=s" => \$::opt_recstart, "recend=s" => \$::opt_recend, + "files" => \$::opt_files, # xargs-compatibility - implemented, man, testsuite "max-procs|P=s" => \$::opt_P, "delimiter|d=s" => \$::opt_d, @@ -269,7 +285,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A # Defaults: - $Global::version = 20110118; + $Global::version = 20110119; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -420,7 +436,7 @@ sub parse_options { } else { print $Global::joblog join("\t", "Seq", "Host", "Starttime", "Runtime", - "Trans", "Return", "Command" + "Send", "Receive", "Command" ). 
"\n"; } } @@ -2227,8 +2243,10 @@ sub new { 'seq' => undef, # $PARALLEL_SEQ 'stdin' => undef, # filehandle for stdin (used for --spreadstdin) 'stdout' => undef, # filehandle for stdout (used for --group) + 'stdoutfilename' => undef, # filename for writing stdout to (used for --files) 'stderr' => undef, # filehandle for stderr (used for --group) 'remaining' => "", # remaining data not sent to stdin (used for --spreadstdin) + 'datawritten' => 0, # amount of data sent via stdin (used for --spreadstdin) 'transfersize' => 0, # size of files using --transfer 'returnsize' => 0, # size of files using --return 'pid' => undef, @@ -2268,6 +2286,17 @@ sub stdout { return $self->{'stdout'}; } +sub set_stdoutfilename { + my $self = shift; + my $stdoutfilename = shift; + $self->{'stdoutfilename'} = $stdoutfilename; +} + +sub stdoutfilename { + my $self = shift; + return $self->{'stdoutfilename'}; +} + sub stderr { my $self = shift; return $self->{'stderr'}; @@ -2311,6 +2340,7 @@ sub complete_write { } else { # Remove the part that was written substr($self->{'remaining'},0,$len) = ""; + $self->{'datawritten'} += $len; } } @@ -2323,6 +2353,10 @@ sub remaining { } } +sub datawritten { + my $self = shift; + return $self->{'datawritten'}; +} sub pid { my $self = shift; @@ -2489,7 +2523,9 @@ sub transfer { for my $arg (@$record) { CORE::push @transfer, $arg->orig(); # filesize - $self->{'transfersize'} += (stat($arg->orig()))[7]; + if(-e $arg->orig()) { + $self->{'transfersize'} += (stat($arg->orig()))[7]; + } } } } @@ -2691,7 +2727,8 @@ sub start { # To group we create temporary files for STDOUT and STDERR # To avoid the cleanup unlink the files immediately (but keep them open) ($outfh,$name) = ::tempfile(SUFFIX => ".par"); - unlink $name; + $job->set_stdoutfilename($name); + $::opt_files or unlink $name; ($errfh,$name) = ::tempfile(SUFFIX => ".par"); unlink $name; @@ -2815,7 +2852,7 @@ sub print { # so flush to avoid STDOUT being buffered flush STDOUT; } - seek $_, 0, 0 for $out, 
$err; + seek $err, 0, 0; if($Global::debug) { print STDERR "ERR:\n"; } @@ -2824,14 +2861,20 @@ sub print { print STDERR $buf; } flush STDERR; - if($Global::debug) { - print STDOUT "OUT:\n"; + + if($::opt_files) { + print STDOUT $self->{'stdoutfilename'},"\n"; + } else { + seek $out, 0, 0; + if($Global::debug) { + print STDOUT "OUT:\n"; + } + while(sysread($out,$buf,1000_000)) { + print STDOUT $buf; + } + flush STDOUT; + ::debug("< instead. =item B<--joblog> I (beta testing) Logfile for executed jobs. Saved a list of the executed jobs to -I in the following format: sequence number, sshlogin, start -time as seconds since epoch, run time in seconds, bytes in files -transfered, bytes in files returned, command run. +I in the following TAB separated format: sequence number, +sshlogin, start time as seconds since epoch, run time in seconds, +bytes in files transfered, bytes in files returned, command run. To convert the times into ISO-8601 strict do: + B @@ -410,7 +411,7 @@ B I Number of jobslots. Run up to N jobs in parallel. 0 means as many as -possible. Default is 9. +possible. Default is +0 which will run one job per CPU core. If B<--semaphore> is set default is 1 thus making a mutex. @@ -2477,7 +2478,7 @@ you file a bug-report. =head1 REPORTING BUGS -Report bugs to . 
+Report bugs to <bug-parallel@gnu.org> or https://savannah.gnu.org/bugs/?func=additem&group=parallel =head1 AUTHOR diff --git a/src/sql b/src/sql index 18997abe..176dee22 100755 --- a/src/sql +++ b/src/sql @@ -528,7 +528,7 @@ $Global::Initfile && unlink $Global::Initfile; exit ($err); sub parse_options { - $Global::version = 20110101; + $Global::version = 20110119; $Global::progname = 'sql'; # This must be done first as this may exec myself diff --git a/testsuite/tests-to-run/test49.sh b/testsuite/tests-to-run/test49.sh index eac51daa..884e6d00 100644 --- a/testsuite/tests-to-run/test49.sh +++ b/testsuite/tests-to-run/test49.sh @@ -14,3 +14,21 @@ seq 1 1000| parallel -j1 --spreadstdin cat "|cat "|wc -c seq 1 10000| parallel -j10 --spreadstdin cat "|cat "|wc -c seq 1 100000| parallel -j1 --spreadstdin cat "|cat "|wc -c seq 1 1000000| parallel -j10 --spreadstdin cat "|cat "|wc -c + +seq 1 10 | src/parallel --recend "\n" -j1 --spreadstdin gzip -9 >/tmp/foo.gz + +echo '### Test --spreadstdin - similar to the failing below' +seq 1 100000 | parallel --recend "\n" -j10 --spreadstdin gzip -9 >/tmp/foo.gz +diff <(seq 1 100000) <(zcat /tmp/foo.gz |sort -n) +diff <(seq 1 100000|wc -c) <(zcat /tmp/foo.gz |wc -c) + +echo '### Test --spreadstdin - this failed during devel' +seq 1 1000000 | md5sum +# Should give same result when sorted +seq 1 1000000 | parallel --recend "\n" -j10 --spreadstdin gzip -9 | zcat | sort -n | md5sum + +echo '### Test --spreadstdin -k' +seq 1 1000000 | parallel -k --recend "\n" -j10 --spreadstdin gzip -9 | zcat | md5sum + +echo '### Test --spreadstdin --files' +seq 1 1000000 | shuf | parallel --files --recend "\n" -j10 --spreadstdin sort | parallel -X sort -m | md5sum