diff --git a/packager/obs/home:tange/parallel/parallel.spec b/packager/obs/home:tange/parallel/parallel.spec index 846ccab7..b09a9527 100644 --- a/packager/obs/home:tange/parallel/parallel.spec +++ b/packager/obs/home:tange/parallel/parallel.spec @@ -1,6 +1,6 @@ Summary: Shell tool for executing jobs in parallel Name: parallel -Version: 20120422 +Version: 20120522 Release: 1 License: GPL Group: Productivity/File utilities diff --git a/src/parallel b/src/parallel index fd7a0f6f..bc5127fb 100755 --- a/src/parallel +++ b/src/parallel @@ -236,16 +236,12 @@ sub spreadstdin { if($Global::max_number_of_args) { # -N => (start..*?end){n} while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) { - $record = $header.$1; - ::debug("Read record -N: ".length($record)."\n"); - write_record_to_pipe(\$record,$recstart,$recend); + write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); } } else { # Find the last recend-recstart in $buf if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) { - $record = $header.$1; - ::debug("Matched record: ".length($record)."/".length($buf)."\n"); - write_record_to_pipe(\$record,$recstart,$recend); + write_record_to_pipe(\$header,\$1,$recstart,$recend,length $1); } } } else { @@ -254,20 +250,16 @@ sub spreadstdin { my $i = 0; while(($i = nindex(\$buf,$recendrecstart,$Global::max_number_of_args)) != -1) { $i += length $recend; # find the actual splitting location - my $record = $header.substr($buf,0,$i); + write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); substr($buf,0,$i) = ""; - ::debug("Read record: ".length($record)."\n"); - write_record_to_pipe(\$record,$recstart,$recend); } } else { # Find the last recend-recstart in $buf my $i = rindex($buf,$recendrecstart); if($i != -1) { $i += length $recend; # find the actual splitting location - my $record = $header.substr($buf,0,$i); + write_record_to_pipe(\$header,\$buf,$recstart,$recend,$i); substr($buf,0,$i) = ""; - # ::debug("Read record: ".length($record)."\n"); - write_record_to_pipe(\$record,$recstart,$recend); } } } @@ -275,12 +267,10 @@ sub spreadstdin { } # If there is anything left in the buffer write it - substr($buf,0,0) = $header; - write_record_to_pipe(\$buf,$recstart,$recend); + substr($buf,0,0) = ""; + write_record_to_pipe(\$header,\$buf,$recstart,$recend,length $buf); ::debug("Done reading input\n"); - flush_and_close_pipes(); - ::debug("Done flushing to children\n"); $Global::start_no_new_jobs = 1; } @@ -299,74 +289,45 @@ sub nindex { return $i; } -sub flush_and_close_pipes { - # Flush that that is cached to the open pipes - # and close them. - my $flush_done; - my $sleep = 0.05; - do { - $flush_done = 1; - # Make sure everything is written to the jobs - for my $job (values %Global::running) { - if($job->remaining()) { - if($job->complete_write()) { - # Some data was written - reset sleep timer - $sleep = 0.05; - } - $flush_done = 0; - } - } - $sleep = ::reap_usleep($sleep); - } while (not $flush_done); - for my $job (values %Global::running) { - my $fh = $job->stdin(); - close $fh; - } -} - sub write_record_to_pipe { + # Fork then + # Write record from pos 0 .. $endpos to pipe + my $header_ref = shift; my $record_ref = shift; my $recstart = shift; my $recend = shift; + my $endpos = shift; if(length $$record_ref == 0) { return; } - if($::opt_remove_rec_sep) { - # Remove record separator - $$record_ref =~ s/$recend$recstart//gos; - $$record_ref =~ s/^$recstart//os; - $$record_ref =~ s/$recend$//os; + # Find the minimal seq $job that has no data written == virgin + # If no virgin found, backoff + my $sleep = 0.0001; # 0.01 ms - better performance on highend + while(not @Global::virgin_jobs) { + ::debug("No virgin jobs"); + $sleep = ::reap_usleep($sleep); + start_more_jobs(); # These jobs may not be started because of loadavg } - # Keep the pipes hot, but if nothing happens sleep should back off - my $sleep = 0.00001; # 0.00001 ms - better performance on highend - write_record: while(1) { - # Sorting according to sequence is necessary for -k to work - for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) { - ::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n"); - if($job->remaining()) { - # Part of the job's last record has not finished being written - if($job->complete_write()) { - # Something got written - reset sleep timer - $sleep = 0.00001; - } - } else { - if($job->datawritten() > 0) { - # There is no data remaining and we have written data before: - # So this means we have completed writing a block. - # close stdin - # This will cause the job to finish and when it dies we will spawn another job - my $fh = $job->stdin(); - close $fh; - } else { - $job->write($record_ref); - # Something got written - reset sleep timer - $sleep = 0.00001; - last write_record; - } - } - } - # Maybe this should be in an if statement: if sleep > 0.001: start more - start_more_jobs(); # These jobs may not be started because of loadavg - $sleep = ::reap_usleep($sleep); - } + my $job = shift @Global::virgin_jobs; + if(fork()) { + # Skip + } else { + # Chop of at $endpos as we do not know how many rec_sep will + # be removed. + my $record = substr($$record_ref,0,$endpos); + # Remove rec_sep + if($::opt_remove_rec_sep) { + # Remove record separator + $record =~ s/$recend$recstart//gos; + $record =~ s/^$recstart//os; + $record =~ s/$recend$//os; + } + $job->write($header_ref); + $job->write(\$record); + my $fh = $job->stdin(); + close $fh; + exit; + } + my $fh = $job->stdin(); + close $fh; return; } @@ -538,7 +499,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A # Defaults: - $Global::version = 20120522; + $Global::version = 20120523; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -1055,6 +1016,7 @@ sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {} # Variable structure: # # $Global::running{$pid} = Pointer to Job-object +# @Global::virgin_jobs = Pointer to Job-object that have received no input # $Global::host{$sshlogin} = Pointer to SSHLogin-object # $Global::total_running = total number of running jobs # $Global::total_started = total jobs started @@ -1159,6 +1121,9 @@ sub start_another_job { debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); if($job->start()) { $Global::running{$job->pid()} = $job; + if($::opt_pipe) { + push(@Global::virgin_jobs,$job); + } debug("Started as seq ",$job->seq()," pid:",$job->pid(),"\n"); return 1; } else { @@ -2036,7 +2001,6 @@ sub my_dump { sub __OBJECT_ORIENTED_PARTS__ {} - package SSHLogin; sub new { @@ -3051,49 +3015,14 @@ sub stdin { sub set_stdin { my $self = shift; my $stdin = shift; - # set non-blocking - fcntl($stdin, ::F_SETFL, ::O_NONBLOCK) or - ::die_bug("Couldn't set flags for HANDLE: $!"); $self->{'stdin'} = $stdin; } sub write { my $self = shift; my $remaining_ref = shift; - if(length($$remaining_ref)) { - $self->{'remaining'} .= $$remaining_ref; - $self->complete_write(); - } -} - -sub complete_write { - # Returns: - # number of bytes written (see syswrite) - my $self = shift; my $in = $self->{'stdin'}; - my $len = syswrite($in,$self->{'remaining'}); - if (!defined($len) && $! == &::EAGAIN) { - # write would block; - } else { - # Remove the part that was written - substr($self->{'remaining'},0,$len) = ""; - $self->{'datawritten'} += $len; - } - return $len; -} - -sub remaining { - my $self = shift; - if(defined $self->{'remaining'}) { - return length $self->{'remaining'}; - } else { - return undef; - } -} - -sub datawritten { - my $self = shift; - return $self->{'datawritten'}; + syswrite($in,$$remaining_ref); } sub pid { diff --git a/src/parallel.pod b/src/parallel.pod index b8dd9545..85f2f27d 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -538,7 +538,7 @@ specified, and for B<-I>{} otherwise. This option is deprecated; use B<-I> instead. -=item B<--joblog> I +=item B<--joblog> I (alpha testing) Logfile for executed jobs. Save a list of the executed jobs to I in the following TAB separated format: sequence number, @@ -660,7 +660,7 @@ B<-l 0> is an alias for B<-l 1>. Implies B<-X> unless B<-m> is set. -=item B<--load> I +=item B<--load> I (alpha testing) Do not start new jobs on a given computer unless the load is less than I. I uses the same syntax as B<--jobs>, so I<100%> @@ -755,9 +755,9 @@ Instead of printing the output to stdout (standard output) the output of each job is saved in a file and the filename is then printed. -=item B<--pipe> +=item B<--pipe> (alpha testing) -=item B<--spreadstdin> +=item B<--spreadstdin> (alpha testing) Spread input to jobs on stdin (standard input). Read a block of data from stdin (standard input) and give one block of data as input to one diff --git a/src/parallel.texi b/src/parallel.texi index ddea6392..4adf90ea 100644 --- a/src/parallel.texi +++ b/src/parallel.texi @@ -570,8 +570,8 @@ This option is a synonym for @strong{-I}@emph{replace-str} if @emph{replace-str} specified, and for @strong{-I}@{@} otherwise. This option is deprecated; use @strong{-I} instead. -@item @strong{--joblog} @emph{logfile} -@anchor{@strong{--joblog} @emph{logfile}} +@item @strong{--joblog} @emph{logfile} (alpha testing) +@anchor{@strong{--joblog} @emph{logfile} (alpha testing)} Logfile for executed jobs. Save a list of the executed jobs to @emph{logfile} in the following TAB separated format: sequence number, @@ -712,8 +712,8 @@ The @strong{-l} option is deprecated since the POSIX standard specifies Implies @strong{-X} unless @strong{-m} is set. -@item @strong{--load} @emph{max-load} -@anchor{@strong{--load} @emph{max-load}} +@item @strong{--load} @emph{max-load} (alpha testing) +@anchor{@strong{--load} @emph{max-load} (alpha testing)} Do not start new jobs on a given computer unless the load is less than @emph{max-load}. @emph{max-load} uses the same syntax as @strong{--jobs}, so @emph{100%} @@ -810,11 +810,11 @@ all the output from one server will be grouped together. Instead of printing the output to stdout (standard output) the output of each job is saved in a file and the filename is then printed. -@item @strong{--pipe} -@anchor{@strong{--pipe}} +@item @strong{--pipe} (alpha testing) +@anchor{@strong{--pipe} (alpha testing)} -@item @strong{--spreadstdin} -@anchor{@strong{--spreadstdin}} +@item @strong{--spreadstdin} (alpha testing) +@anchor{@strong{--spreadstdin} (alpha testing)} Spread input to jobs on stdin (standard input). Read a block of data from stdin (standard input) and give one block of data as input to one diff --git a/testsuite/Start.sh b/testsuite/Start.sh index 7876ba2b..783c1013 100644 --- a/testsuite/Start.sh +++ b/testsuite/Start.sh @@ -3,8 +3,9 @@ export LANG=C SHFILE=/tmp/unittest-parallel.sh +# Try a failing test twice. ls -t tests-to-run/*${1}*.sh \ -| perl -pe 's:(.*/(.*)).sh:bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2:' \ +| perl -pe 's:(.*/(.*)).sh:bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2 >/dev/null || bash $1.sh > actual-results/$2; diff -Naur wanted-results/$2 actual-results/$2:' \ >$SHFILE mkdir -p actual-results diff --git a/testsuite/tests-to-run/parallel-local105.sh b/testsuite/tests-to-run/parallel-local105.sh index 1939b5c9..76ac7ada 100755 --- a/testsuite/tests-to-run/parallel-local105.sh +++ b/testsuite/tests-to-run/parallel-local105.sh @@ -1,11 +1,11 @@ #!/bin/bash -rm -rf tmp 2>/dev/null -cd input-files -tar xjf random_dirs_with_newline.tar.bz2 -cd .. -cp -a input-files/random_dirs_with_newline tmp -cd tmp +TMP=/tmp/parallel_local105 +rm -rf $TMP 2>/dev/null +mkdir -p $TMP +tar -C $TMP -xf input-files/random_dirs_with_newline.tar.bz2 + +cd $TMP/random_dirs_with_newline # tests if special dir names causes problems find . -type d -print0 | perl -0 -pe 's:^./::' | parallel -0 -v touch -- {}/abc-{}-{} 2>&1 \ @@ -33,4 +33,5 @@ find . -type f -print0 | perl -0 -ne '$a++;END{print $a}' echo ' files' cd .. -rm -rf tmp +rm -rf $TMP + diff --git a/testsuite/tests-to-run/parallel-local9.sh b/testsuite/tests-to-run/parallel-local9.sh index 978f1ad3..99f0c783 100644 --- a/testsuite/tests-to-run/parallel-local9.sh +++ b/testsuite/tests-to-run/parallel-local9.sh @@ -165,7 +165,7 @@ echo "echo a" | parallel parallel -j1 -I :: -X echo 'a::b::^c::[.}c' ::: 1 echo "### BUG: The length for -X is not close to max (131072)" -seq 1 4000 | parallel -X echo {.} aa {}{.} {}{}d{} {}dd{}d{.} |head -n 1 |wc +seq 1 4000 | parallel -k -X echo {.} aa {}{.} {}{}d{} {}dd{}d{.} |head -n 1 |wc echo "### BUG: empty lines with --show-limit" echo | parallel --show-limits diff --git a/testsuite/wanted-results/test65 b/testsuite/wanted-results/test65 index b2633de7..3bf27eb0 100644 --- a/testsuite/wanted-results/test65 +++ b/testsuite/wanted-results/test65 @@ -54,10 +54,6 @@ h2 21xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 22xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx Stop -Start -h1 -h2 -Stop ### Test --header with multiple ::: a2 b1 b1 a2 ### Test --shellquote