parallel: simple test for --spreadstdin.

--recstart and --recend implemented.
Time::Hires no longer needed.
This commit is contained in:
Ole Tange 2011-01-18 23:56:28 +01:00
parent 48454a36ea
commit 1d45092522
3 changed files with 81 additions and 16 deletions

View file

@ -9,7 +9,6 @@ use File::Temp qw(tempfile tempdir);
use Getopt::Long; use Getopt::Long;
use strict; use strict;
use Carp; use Carp;
use Time::HiRes qw(usleep);
$::oodebug=0; $::oodebug=0;
$Global::original_sigterm = $SIG{TERM}; $Global::original_sigterm = $SIG{TERM};
@ -80,20 +79,27 @@ sub spreadstdin {
# read a record # read a record
# print it to the first jobs that is ready # print it to the first jobs that is ready
my @jobs = values %Global::running; my @jobs = values %Global::running;
my $first; my $first = 0;
my $second; my $second = 0;
my $sleep = 1; my $sleep = 0.1;
while(<STDIN>) { my $max_sleep = 0.1;
my @rec; my $record;
for(my $t = 0; $t < 1000 and not eof(STDIN); $t++) { my $rest = "";
push @rec, scalar(<STDIN>); my $buf = "";
my $block_max = 1000_000;
my $rec_start = undef;
my $rec_end = undef;
my $err;
while(read(STDIN,$buf,$block_max)) {
$record = $rest.$buf;
if(($::opt_recstart and $record =~ s/($::opt_recstart.*?)$//smo)
or
($::opt_recend and $record =~ s/$::opt_recend(.*?)$//smo)) {
$rest = $1;
} }
my $record = join("",@rec); ::debug("Read record: ".length($record)."\n");
# Rotate jobs to spread the input
#@jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]);
write_record: while(defined $record) { write_record: while(defined $record) {
for my $job (@jobs) { for my $job (@jobs) {
# ::debug("Looking at ",$job);
if($job->remaining()) { if($job->remaining()) {
$job->complete_write(); $job->complete_write();
} else { } else {
@ -104,18 +110,33 @@ sub spreadstdin {
last write_record; last write_record;
} }
} }
# Rotate jobs to spread the input
@jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]);
usleep($sleep); usleep($sleep);
$sleep *= 1.1; $sleep *= 1.1;
$max_sleep = max($max_sleep, $sleep);
$second++; $second++;
} }
} }
for my $job (values %Global::running) { my $flush_done;
do {
$flush_done = 1;
for my $job (@jobs) {
if($job->remaining()) {
$job->complete_write();
$flush_done = 0;
}
}
usleep($sleep);
$sleep *= 1.1;
} while (not $flush_done);
for my $job (@jobs) {
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
} }
$Global::start_no_new_jobs = 1; $Global::start_no_new_jobs = 1;
print STDERR $first," ",$second,"\n"; ::debug("Blocks send directly: $first Delays: $second Max sleep ms: $max_sleep\n");
} }
sub acquire_semaphore { sub acquire_semaphore {
@ -205,6 +226,8 @@ sub get_options_from_array {
"trim=s" => \$::opt_trim, "trim=s" => \$::opt_trim,
"profile|J=s" => \$::opt_profile, "profile|J=s" => \$::opt_profile,
"spreadstdin" => \$::opt_spreadstdin, "spreadstdin" => \$::opt_spreadstdin,
"recstart=s" => \$::opt_recstart,
"recend=s" => \$::opt_recend,
# xargs-compatibility - implemented, man, testsuite # xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$::opt_P, "max-procs|P=s" => \$::opt_P,
"delimiter|d=s" => \$::opt_d, "delimiter|d=s" => \$::opt_d,
@ -684,6 +707,12 @@ sub hostname {
return $Private::hostname; return $Private::hostname;
} }
sub usleep {
my $ms = shift;
select(undef, undef, undef, $ms/1000);
}
sub __RUNNING_AND_PRINTING_THE_JOBS__ {} sub __RUNNING_AND_PRINTING_THE_JOBS__ {}
# Variable structure: # Variable structure:
@ -2275,7 +2304,6 @@ sub write {
sub complete_write { sub complete_write {
my $self = shift; my $self = shift;
my $in = $self->{'stdin'}; my $in = $self->{'stdin'};
::debug("complete_write\n");
my $len = syswrite($in,$self->{'remaining'}); my $len = syswrite($in,$self->{'remaining'});
if (!defined($len) && $! == ::EAGAIN) { if (!defined($len) && $! == ::EAGAIN) {

View file

@ -0,0 +1,16 @@
#!/bin/bash
echo '### Test --spreadstdin - more procs than args'
rm /tmp/parallel.ss.*
seq 1 5 | stdout parallel -j 10 --spreadstdin 'cat >/tmp/parallel.ss.$PARALLEL_SEQ' >/dev/null
cat /tmp/parallel.ss.*
echo '### Test --spreadstdin - more args than procs'
rm /tmp/parallel.ss.*
seq 1 10 | stdout parallel -j 5 --spreadstdin 'cat >/tmp/parallel.ss.$PARALLEL_SEQ' >/dev/null
cat /tmp/parallel.ss.*
seq 1 1000| parallel -j1 --spreadstdin cat "|cat "|wc -c
seq 1 10000| parallel -j10 --spreadstdin cat "|cat "|wc -c
seq 1 100000| parallel -j1 --spreadstdin cat "|cat "|wc -c
seq 1 1000000| parallel -j10 --spreadstdin cat "|cat "|wc -c

View file

@ -0,0 +1,21 @@
### Test --spreadstdin - more procs than args
1
2
3
4
5
### Test --spreadstdin - more args than procs
1
2
3
4
5
6
7
8
9
10
3893
48894
588895
6888896