diff --git a/src/parallel b/src/parallel index 5c691ae5..0d3cca24 100755 --- a/src/parallel +++ b/src/parallel @@ -9,7 +9,6 @@ use File::Temp qw(tempfile tempdir); use Getopt::Long; use strict; use Carp; -use Time::HiRes qw(usleep); $::oodebug=0; $Global::original_sigterm = $SIG{TERM}; @@ -80,20 +79,27 @@ sub spreadstdin { # read a record # print it to the first jobs that is ready my @jobs = values %Global::running; - my $first; - my $second; - my $sleep = 1; - while() { - my @rec; - for(my $t = 0; $t < 1000 and not eof(STDIN); $t++) { - push @rec, scalar(); + my $first = 0; + my $second = 0; + my $sleep = 0.1; + my $max_sleep = 0.1; + my $record; + my $rest = ""; + my $buf = ""; + my $block_max = 1000_000; + my $rec_start = undef; + my $rec_end = undef; + my $err; + while(read(STDIN,$buf,$block_max)) { + $record = $rest.$buf; + if(($::opt_recstart and $record =~ s/($::opt_recstart.*?)$//smo) + or + ($::opt_recend and $record =~ s/$::opt_recend(.*?)$//smo)) { + $rest = $1; } - my $record = join("",@rec); - # Rotate jobs to spread the input - #@jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); + ::debug("Read record: ".length($record)."\n"); write_record: while(defined $record) { for my $job (@jobs) { -# ::debug("Looking at ",$job); if($job->remaining()) { $job->complete_write(); } else { @@ -104,18 +110,33 @@ sub spreadstdin { last write_record; } } + # Rotate jobs to spread the input @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); usleep($sleep); - $sleep *=1.1; + $sleep *= 1.1; + $max_sleep = max($max_sleep, $sleep); $second++; } } - for my $job (values %Global::running) { + my $flush_done; + do { + $flush_done = 1; + for my $job (@jobs) { + if($job->remaining()) { + $job->complete_write(); + $flush_done = 0; + } + } + usleep($sleep); + $sleep *= 1.1; + } while (not $flush_done); + + for my $job (@jobs) { my $fh = $job->stdin(); close $fh; } $Global::start_no_new_jobs = 1; - print STDERR $first," ",$second,"\n"; + ::debug("Blocks send directly: $first Delays: $second Max sleep ms: $max_sleep\n"); } sub acquire_semaphore { @@ -205,6 +226,8 @@ sub get_options_from_array { "trim=s" => \$::opt_trim, "profile|J=s" => \$::opt_profile, "spreadstdin" => \$::opt_spreadstdin, + "recstart=s" => \$::opt_recstart, + "recend=s" => \$::opt_recend, # xargs-compatibility - implemented, man, testsuite "max-procs|P=s" => \$::opt_P, "delimiter|d=s" => \$::opt_d, @@ -684,6 +707,12 @@ sub hostname { return $Private::hostname; } +sub usleep { + my $ms = shift; + select(undef, undef, undef, $ms/1000); +} + + sub __RUNNING_AND_PRINTING_THE_JOBS__ {} # Variable structure: @@ -2275,7 +2304,6 @@ sub write { sub complete_write { my $self = shift; my $in = $self->{'stdin'}; - ::debug("complete_write\n"); my $len = syswrite($in,$self->{'remaining'}); if (!defined($len) && $! == ::EAGAIN) { diff --git a/testsuite/tests-to-run/test49.sh b/testsuite/tests-to-run/test49.sh new file mode 100644 index 00000000..eac51daa --- /dev/null +++ b/testsuite/tests-to-run/test49.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +echo '### Test --spreadstdin - more procs than args' +rm /tmp/parallel.ss.* +seq 1 5 | stdout parallel -j 10 --spreadstdin 'cat >/tmp/parallel.ss.$PARALLEL_SEQ' >/dev/null +cat /tmp/parallel.ss.* + +echo '### Test --spreadstdin - more args than procs' +rm /tmp/parallel.ss.* +seq 1 10 | stdout parallel -j 5 --spreadstdin 'cat >/tmp/parallel.ss.$PARALLEL_SEQ' >/dev/null +cat /tmp/parallel.ss.* + +seq 1 1000| parallel -j1 --spreadstdin cat "|cat "|wc -c +seq 1 10000| parallel -j10 --spreadstdin cat "|cat "|wc -c +seq 1 100000| parallel -j1 --spreadstdin cat "|cat "|wc -c +seq 1 1000000| parallel -j10 --spreadstdin cat "|cat "|wc -c diff --git a/testsuite/wanted-results/test49 b/testsuite/wanted-results/test49 new file mode 100644 index 00000000..25603fbb --- /dev/null +++ b/testsuite/wanted-results/test49 @@ -0,0 +1,21 @@ +### Test --spreadstdin - more procs than args +1 +2 +3 +4 +5 +### Test --spreadstdin - more args than procs +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +3893 +48894 +588895 +6888896