parallel: Stop looping in --pipe if this reads 0 bytes.

This commit is contained in:
Ole Tange 2013-02-10 13:32:50 +01:00
parent 0be46ce826
commit 55ad133f46
3 changed files with 19 additions and 19 deletions

View file

@ -270,7 +270,6 @@ sub spreadstdin {
# read a record # read a record
# Spawn a job and print the record to it. # Spawn a job and print the record to it.
my @fhlist = @_; # Filehandles to read from (Defaults to STDIN) my @fhlist = @_; # Filehandles to read from (Defaults to STDIN)
my $record;
my $buf = ""; my $buf = "";
my $header = ""; my $header = "";
if($opt::header) { if($opt::header) {
@ -314,8 +313,11 @@ sub spreadstdin {
for my $in (@fhlist) { for my $in (@fhlist) {
piperead: while(1) { piperead: while(1) {
my $anything_written = 0; my $anything_written = 0;
eof($in) and $force_one_time_through++ and last piperead; if(not read($in,substr($buf,length $buf,0),$blocksize)
read($in,substr($buf,length $buf,0),$blocksize); and $force_one_time_through) {
last;
}
$force_one_time_through ||= 1;
if($opt::r) { if($opt::r) {
# Remove empty lines # Remove empty lines
@ -383,7 +385,7 @@ sub spreadstdin {
write_record_to_pipe(\$header,\$buf,$recstart,$recend,length $buf); write_record_to_pipe(\$header,\$buf,$recstart,$recend,length $buf);
::debug("Done reading input\n"); ::debug("Done reading input\n");
$Global::start_no_new_jobs = 1; $Global::start_no_new_jobs ||= 1;
} }
sub nindex { sub nindex {
@ -1799,7 +1801,7 @@ sub start_no_new_jobs {
"$Global::progname: Waiting for these ", scalar(keys %Global::running), "$Global::progname: Waiting for these ", scalar(keys %Global::running),
" jobs to finish. Send SIGTERM again to stop now.\n"); " jobs to finish. Send SIGTERM again to stop now.\n");
list_running_jobs(); list_running_jobs();
$Global::start_no_new_jobs++; $Global::start_no_new_jobs ||= 1;
} }
sub reaper { sub reaper {
@ -1857,7 +1859,7 @@ sub reaper {
"Waiting for ", scalar(keys %Global::running), "Waiting for ", scalar(keys %Global::running),
" jobs to finish. This job failed:\n", " jobs to finish. This job failed:\n",
$job->replaced(),"\n"); $job->replaced(),"\n");
$Global::start_no_new_jobs++; $Global::start_no_new_jobs ||= 1;
$Global::halt_on_error_exitstatus = $job->exitstatus(); $Global::halt_on_error_exitstatus = $job->exitstatus();
} elsif($opt::halt_on_error == 2) { } elsif($opt::halt_on_error == 2) {
# If halt on error == 2 we should exit immediately # If halt on error == 2 we should exit immediately
@ -3169,7 +3171,7 @@ sub sshcommand_of_sshlogin {
# Start it # Start it
my $pid = fork(); my $pid = fork();
if($pid) { if($pid) {
$Global::sshmaster{$pid}++; $Global::sshmaster{$pid} ||= 1;
} else { } else {
::debug($master,"\n"); ::debug($master,"\n");
`$master </dev/null`; `$master </dev/null`;
@ -4269,7 +4271,7 @@ sub populate {
and $self->{'arg_queue'}->empty() and $Global::max_jobs_running) { and $self->{'arg_queue'}->empty() and $Global::max_jobs_running) {
# -m or -X and EOF => Spread the arguments over all jobslots # -m or -X and EOF => Spread the arguments over all jobslots
# (unless they are already spread) # (unless they are already spread)
$CommandLine::already_spread++; $CommandLine::already_spread ||= 1;
if($self->number_of_args() > 1) { if($self->number_of_args() > 1) {
$self->{'max_number_of_args'} = $self->{'max_number_of_args'} =
::ceil($self->number_of_args()/$Global::max_jobs_running); ::ceil($self->number_of_args()/$Global::max_jobs_running);
@ -4621,7 +4623,7 @@ sub context_replace_placeholders {
my $tt = $target; my $tt = $target;
my %word; my %word;
while($tt =~ s/(\S*(?:$rep_str_regexp|$rep_str_pos_regexp|$rep_seq_regexp)\S*)/\0/o) { while($tt =~ s/(\S*(?:$rep_str_regexp|$rep_str_pos_regexp|$rep_seq_regexp)\S*)/\0/o) {
$word{$1}++; $word{$1} ||= 1;
} }
# For each word: Generate the replacement string for that word. # For each word: Generate the replacement string for that word.
for my $origword (keys %word) { for my $origword (keys %word) {
@ -4921,8 +4923,6 @@ sub is_acceptable_command_line_length {
# 1 otherwise # 1 otherwise
my $len = shift; my $len = shift;
$CommandMaxLength::is_acceptable_command_line_length++;
::debug("$CommandMaxLength::is_acceptable_command_line_length $len\n");
local *STDERR; local *STDERR;
open (STDERR, ">", "/dev/null"); open (STDERR, ">", "/dev/null");
system "true "."x"x$len; system "true "."x"x$len;

View file

@ -484,7 +484,7 @@ Implies B<--semaphore>.
Remove down hosts. For each remote host: check that login through ssh Remove down hosts. For each remote host: check that login through ssh
works. If not: do not use this host. works. If not: do not use this host.
Currently you can <i>not</i> put B<--filter-hosts> in a profile, Currently you can I<not> put B<--filter-hosts> in a profile,
$PARALLEL, /etc/parallel/config or similar. This is because GNU $PARALLEL, /etc/parallel/config or similar. This is because GNU
B<parallel> uses GNU B<parallel> to compute this, so you will get an B<parallel> uses GNU B<parallel> to compute this, so you will get an
infinite loop. This will likely be fixed in a later release. infinite loop. This will likely be fixed in a later release.
@ -3469,8 +3469,8 @@ Copyright (C) 2007-10-18 Ole Tange, http://ole.tange.dk
Copyright (C) 2008,2009,2010 Ole Tange, http://ole.tange.dk Copyright (C) 2008,2009,2010 Ole Tange, http://ole.tange.dk
Copyright (C) 2010,2011,2012 Ole Tange, http://ole.tange.dk and Free Copyright (C) 2010,2011,2012,2013 Ole Tange, http://ole.tange.dk and
Software Foundation, Inc. Free Software Foundation, Inc.
Parts of the manual concerning B<xargs> compatibility is inspired by Parts of the manual concerning B<xargs> compatibility is inspired by
the manual of B<xargs> from GNU findutils 4.4.2. the manual of B<xargs> from GNU findutils 4.4.2.
@ -3478,7 +3478,7 @@ the manual of B<xargs> from GNU findutils 4.4.2.
=head1 LICENSE =head1 LICENSE
Copyright (C) 2007,2008,2009,2010,2011,2012 Free Software Foundation, Copyright (C) 2007,2008,2009,2010,2011,2012,2013 Free Software Foundation,
Inc. Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify

View file

@ -512,7 +512,7 @@ Implies @strong{--semaphore}.
Remove down hosts. For each remote host: check that login through ssh Remove down hosts. For each remote host: check that login through ssh
works. If not: do not use this host. works. If not: do not use this host.
Currently you can <i>not</i> put @strong{--filter-hosts} in a profile, Currently you can @emph{not} put @strong{--filter-hosts} in a profile,
$PARALLEL, /etc/parallel/config or similar. This is because GNU $PARALLEL, /etc/parallel/config or similar. This is because GNU
@strong{parallel} uses GNU @strong{parallel} to compute this, so you will get an @strong{parallel} uses GNU @strong{parallel} to compute this, so you will get an
infinite loop. This will likely be fixed in a later release. infinite loop. This will likely be fixed in a later release.
@ -3707,8 +3707,8 @@ Copyright (C) 2007-10-18 Ole Tange, http://ole.tange.dk
Copyright (C) 2008,2009,2010 Ole Tange, http://ole.tange.dk Copyright (C) 2008,2009,2010 Ole Tange, http://ole.tange.dk
Copyright (C) 2010,2011,2012 Ole Tange, http://ole.tange.dk and Free Copyright (C) 2010,2011,2012,2013 Ole Tange, http://ole.tange.dk and
Software Foundation, Inc. Free Software Foundation, Inc.
Parts of the manual concerning @strong{xargs} compatibility is inspired by Parts of the manual concerning @strong{xargs} compatibility is inspired by
the manual of @strong{xargs} from GNU findutils 4.4.2. the manual of @strong{xargs} from GNU findutils 4.4.2.
@ -3716,7 +3716,7 @@ the manual of @strong{xargs} from GNU findutils 4.4.2.
@chapter LICENSE @chapter LICENSE
@anchor{LICENSE} @anchor{LICENSE}
Copyright (C) 2007,2008,2009,2010,2011,2012 Free Software Foundation, Copyright (C) 2007,2008,2009,2010,2011,2012,2013 Free Software Foundation,
Inc. Inc.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify