diff --git a/src/parallel b/src/parallel index 06a7d6dd..9851b9d3 100755 --- a/src/parallel +++ b/src/parallel @@ -471,39 +471,46 @@ sub nindex { return $i; } -sub round_robin_write { - # Input: - # $header_ref = ref to $header string - # $block_ref = ref to $block to be written - # $recstart = record start string - # $recend = record end string - # $endpos = end position of $block - # Uses: - # %Global::running - my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_; - my $something_written = 0; - my $block_passed = 0; - my $sleep = 1; - while(not $block_passed) { - # Continue flushing existing buffers - # until one is empty and a new block is passed - while(my ($pid,$job) = each %Global::running) { - if($job->stdin_buffer_length() > 0) { - $something_written += $job->non_block_write(); - } else { - $job->set_stdin_buffer($header_ref,$block_ref,$endpos,$recstart,$recend); - $block_passed = 1; - $job->set_virgin(0); - $something_written += $job->non_block_write(); - last; - } - } - $sleep = ::reap_usleep($sleep); - } - start_more_jobs(); - return $something_written; -} +{ + my @robin_queue; + sub round_robin_write { + # Input: + # $header_ref = ref to $header string + # $block_ref = ref to $block to be written + # $recstart = record start string + # $recend = record end string + # $endpos = end position of $block + # Uses: + # %Global::running + my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_; + my $something_written = 0; + my $block_passed = 0; + my $sleep = 1; + while(not $block_passed) { + # Continue flushing existing buffers + # until one is empty and a new block is passed + # Make a queue to spread the blocks evenly + if(not @robin_queue) { + push @robin_queue, values %Global::running; + } + while(my $job = shift @robin_queue) { + if($job->stdin_buffer_length() > 0) { + $something_written += $job->non_block_write(); + } else { + $job->set_stdin_buffer($header_ref,$block_ref,$endpos,$recstart,$recend); + $block_passed = 1; + $job->set_virgin(0); 
+ $something_written += $job->non_block_write(); + last; + } + } + $sleep = ::reap_usleep($sleep); + } +# start_more_jobs(); + return $something_written; + } +} sub write_record_to_pipe { # Fork then @@ -608,7 +615,7 @@ sub options_hash { "keep-order|keeporder|k" => \$opt::keeporder, "group" => \$opt::group, "g" => \$opt::retired, - "ungroup|u" => \$opt::u, + "ungroup|u" => \$opt::ungroup, "linebuffer|linebuffered|line-buffer|line-buffered" => \$opt::linebuffer, "tmux" => \$opt::tmux, "null|0" => \$opt::0, @@ -780,7 +787,6 @@ sub parse_options { $Global::infinity = 2**31; $Global::debug = 0; $Global::verbose = 0; - $Global::grouped = 1; $Global::quoting = 0; # Read only table with default --rpl values %Global::replace = @@ -832,8 +838,6 @@ sub parse_options { $Global::shell = $ENV{'PARALLEL_SHELL'} || parent_shell($$) || $ENV{'SHELL'} || "/bin/sh"; if(defined $opt::X) { $Global::ContextReplace = 1; } if(defined $opt::silent) { $Global::verbose = 0; } - if(defined $opt::group) { $Global::grouped = 1; } - if(defined $opt::u) { $Global::grouped = 0; } if(defined $opt::0) { $/ = "\0"; } if(defined $opt::d) { my $e="sprintf \"$opt::d\""; $/ = eval $e; } if(defined $opt::p) { $Global::interactive = $opt::p; } @@ -944,7 +948,7 @@ sub parse_options { $opt::jobs = 1; } if(not defined $opt::group) { - $Global::grouped = 0; + $opt::ungroup = 0; } } if(@opt::trc) { @@ -1564,13 +1568,13 @@ sub enough_file_handles { # Check that we have enough filehandles available for starting # another job # Uses: - # $Global::grouped + # $opt::ungroup # %Global::fd # Returns: # 1 if ungrouped (thus not needing extra filehandles) # 0 if too few filehandles # 1 if enough filehandles - if($Global::grouped) { + if(not $opt::ungroup) { my %fh; my $enough_filehandles = 1; # perl uses 7 filehandles for something? @@ -2576,7 +2580,7 @@ sub onall { join(" ", ((defined $opt::jobs) ? "-P $opt::jobs" : ""), ((defined $opt::linebuffer) ? "--linebuffer" : ""), - ((defined $opt::u) ? 
"-u" : ""), + ((defined $opt::ungroup) ? "-u" : ""), ((defined $opt::group) ? "-g" : ""), ((defined $opt::keeporder) ? "--keeporder" : ""), ((defined $opt::D) ? "-D $opt::D" : ""), @@ -2585,7 +2589,7 @@ sub onall { ); my $suboptions = join(" ", - ((defined $opt::u) ? "-u" : ""), + ((defined $opt::ungroup) ? "-u" : ""), ((defined $opt::linebuffer) ? "--linebuffer" : ""), ((defined $opt::group) ? "-g" : ""), ((defined $opt::files) ? "--files" : ""), @@ -5027,7 +5031,7 @@ sub openoutputfiles { } $self->set_fh(1,"unlink",""); $self->set_fh(2,"unlink",""); - } elsif($Global::grouped) { + } elsif(not $opt::ungroup) { # To group we create temporary files for STDOUT and STDERR # To avoid the cleanup unlink the files immediately (but keep them open) if(@Global::tee_jobs) { @@ -5075,7 +5079,7 @@ sub openoutputfiles { $self->set_fh($fdno,'r',$fdr); $self->set_fh($fdno,'rpid',$rpid); } - } elsif($Global::grouped) { + } elsif(not $opt::ungroup) { # Set reading FD if using --group (--ungroup does not need) for my $fdno (1,2) { # Re-open the file for reading @@ -5838,7 +5842,7 @@ sub start { open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!"); open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!"); - if(($opt::dryrun or $Global::verbose) and not $Global::grouped) { + if(($opt::dryrun or $Global::verbose) and $opt::ungroup) { if($Global::verbose <= 1) { print $stdout_fh $job->replaced(),"\n"; } else { @@ -6017,15 +6021,17 @@ sub print { if($opt::pipe and $self->virgin()) { # Skip --joblog, --dryrun, --verbose } else { - if($Global::joblog) { $self->print_joblog() } + if($Global::joblog and defined $self->{'exitstatus'}) { + # Add to joblog when finished + $self->print_joblog(); + } - # Printing is only relevant for grouped output. - $Global::grouped or return; + # Printing is only relevant for grouped/--line-buffer output. 
+ $opt::ungroup and return; # Check for disk full exit_if_disk_full(); - my $command = $self->wrapped(); - if(($opt::dryrun or $Global::verbose) and $Global::grouped + if(($opt::dryrun or $Global::verbose) and not $self->{'verbose_printed'}) { $self->{'verbose_printed'}++; @@ -6033,7 +6039,7 @@ sub print { print STDOUT $self->replaced(),"\n"; } else { # Verbose level > 1: Print the rsync and stuff - print STDOUT $command,"\n"; + print STDOUT $self->wrapped(),"\n"; } # If STDOUT and STDERR are merged, # we want the command to be printed first @@ -6058,7 +6064,8 @@ sub print { close $self->fh($fdno,"w"); close $in_fh; if($opt::pipe and $self->virgin()) { - # Nothing was printed to this job: # cleanup unused tmp files if --files was set + # Nothing was printed to this job: + # cleanup unused tmp files if --files was set for my $fdno (1,2) { unlink $self->fh($fdno,"name"); unlink $self->fh($fdno,"unlink"); @@ -6068,74 +6075,7 @@ sub print { } } elsif($opt::linebuffer) { # Line buffered print out - my $partial = \$self->{'partial_line',$fdno}; - if(defined $self->{'exitstatus'}) { - # If the job is dead: close printing fh. Needed for --compress - close $self->fh($fdno,"w"); - if($opt::compress && $opt::linebuffer) { - # Blocked reading in final round - $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; - for my $fdno (1,2) { - my $fdr = $self->fh($fdno,'r'); - my $flags; - fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle - $flags &= ~&O_NONBLOCK; # Remove non-blocking to the flags - fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle - } - } - } - # This seek will clear EOF - seek $in_fh, tell($in_fh), 0; - # The read is non-blocking: The $in_fh is set to non-blocking. 
- # 32768 --tag = 5.1s - # 327680 --tag = 4.4s - # 1024000 --tag = 4.4s - # 3276800 --tag = 4.3s - # 32768000 --tag = 4.7s - # 10240000 --tag = 4.3s - while(read($in_fh,substr($$partial,length $$partial),3276800)) { - # Append to $$partial - # Find the last \n - my $i = rindex($$partial,"\n"); - if($i != -1) { - # One or more complete lines were found - if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) { - # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt - # This is a crappy way of ignoring it. - $$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//; - # Length of partial line has changed: Find the last \n again - $i = rindex($$partial,"\n"); - } - if($opt::tag or defined $opt::tagstring) { - # Replace ^ with $tag within the full line - my $tag = $self->tag(); - substr($$partial,0,$i+1) =~ s/^/$tag/gm; - # Length of partial line has changed: Find the last \n again - $i = rindex($$partial,"\n"); - } - # Print up to and including the last \n - print $out_fd substr($$partial,0,$i+1); - # Remove the printed part - substr($$partial,0,$i+1)=""; - } - } - if(defined $self->{'exitstatus'}) { - # If the job is dead: print the remaining partial line - # read remaining - if($$partial and ($opt::tag or defined $opt::tagstring)) { - my $tag = $self->tag(); - $$partial =~ s/^/$tag/gm; - } - print $out_fd $$partial; - # Release the memory - $$partial = undef; - if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) { - # decompress still running - } else { - # decompress done: close fh - close $in_fh; - } - } + $self->linebuffer_print($fdno,$in_fh,$out_fd); } else { my $buf; close $self->fh($fdno,"w"); @@ -6179,6 +6119,80 @@ sub print { ::debug("print", "<<joboutput @command\n"); } +sub linebuffer_print { + my $self = shift; + my ($fdno,$in_fh,$out_fd) = @_; + my $partial = \$self->{'partial_line',$fdno}; + + if(defined $self->{'exitstatus'}) { + # If the job is dead: close printing fh. 
Needed for --compress + close $self->fh($fdno,"w"); + if($opt::compress) { + # Blocked reading in final round + $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; + for my $fdno (1,2) { + my $fdr = $self->fh($fdno,'r'); + my $flags; + fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle + $flags &= ~&O_NONBLOCK; # Remove non-blocking to the flags + fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle + } + } + } + # This seek will clear EOF + seek $in_fh, tell($in_fh), 0; + # The read is non-blocking: The $in_fh is set to non-blocking. + # 32768 --tag = 5.1s + # 327680 --tag = 4.4s + # 1024000 --tag = 4.4s + # 3276800 --tag = 4.3s + # 32768000 --tag = 4.7s + # 10240000 --tag = 4.3s + while(read($in_fh,substr($$partial,length $$partial),3276800)) { + # Append to $$partial + # Find the last \n + my $i = rindex($$partial,"\n"); + if($i != -1) { + # One or more complete lines were found + if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) { + # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt + # This is a crappy way of ignoring it. 
+ $$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//; + # Length of partial line has changed: Find the last \n again + $i = rindex($$partial,"\n"); + } + if($opt::tag or defined $opt::tagstring) { + # Replace ^ with $tag within the full line + my $tag = $self->tag(); + substr($$partial,0,$i+1) =~ s/^/$tag/gm; + # Length of partial line has changed: Find the last \n again + $i = rindex($$partial,"\n"); + } + # Print up to and including the last \n + print $out_fd substr($$partial,0,$i+1); + # Remove the printed part + substr($$partial,0,$i+1)=""; + } + } + if(defined $self->{'exitstatus'}) { + # If the job is dead: print the remaining partial line + # read remaining + if($$partial and ($opt::tag or defined $opt::tagstring)) { + my $tag = $self->tag(); + $$partial =~ s/^/$tag/gm; + } + print $out_fd $$partial; + # Release the memory + $$partial = undef; + if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) { + # decompress still running + } else { + # decompress done: close fh + close $in_fh; + } + } +} + sub print_joblog { my $self = shift; my $cmd; diff --git a/testsuite/tests-to-run/parallel-local23.sh b/testsuite/tests-to-run/parallel-local23.sh index 38fd054b..9a1b9ddd 100755 --- a/testsuite/tests-to-run/parallel-local23.sh +++ b/testsuite/tests-to-run/parallel-local23.sh @@ -34,6 +34,9 @@ echo '### test round-robin'; echo '### bug #43600: --pipe --linebuffer --round does not work' seq 10000000000 | parallel --pipe --linebuffer --round cat | head +echo '### Check that 4 processes are really used' + seq 1000000 | parallel -j4 --pipe --round --line-buf wc |sort + echo '### --version must have higher priority than retired options' $NICEPAR --version -g -Y -U -W -T | tail diff --git a/testsuite/tests-to-run/parallel-remote1.sh b/testsuite/tests-to-run/parallel-remote1.sh index d8d4c3f9..1d1f0a73 100644 --- a/testsuite/tests-to-run/parallel-remote1.sh +++ b/testsuite/tests-to-run/parallel-remote1.sh @@ -13,7 +13,12 @@ echo 
'ssh "$@"; echo "$@" >>/tmp/myssh2-run' >/tmp/myssh2 chmod 755 /tmp/myssh1 /tmp/myssh2 seq 1 100 | parallel --sshdelay 0.05 --sshlogin "/tmp/myssh1 $SSHLOGIN1,/tmp/myssh2 $SSHLOGIN2" -k echo -cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/\;s/\$SSHLOGIN1/$SSHLOGIN1/\;s/\$SSHLOGIN2/$SSHLOGIN2/\;s/\$SSHLOGIN3/$SSHLOGIN3/ | parallel -vj2 -k -L1 +cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/\;s/\$SSHLOGIN1/$SSHLOGIN1/\;s/\$SSHLOGIN2/$SSHLOGIN2/\;s/\$SSHLOGIN3/$SSHLOGIN3/ | parallel -vj1 -k -L1 +echo '### bug #41964: --controlmaster not seems to reuse OpenSSH connections to the same host' + (parallel -S redhat9.tange.dk true ::: {1..20}; echo No --controlmaster - finish last) & + (parallel -M -S redhat9.tange.dk true ::: {1..20}; echo With --controlmaster - finish first) & + wait + echo '### --filter-hosts - OK, non-such-user, connection refused, wrong host' parallel --nonall --filter-hosts -S localhost,NoUser@localhost,154.54.72.206,"ssh 5.5.5.5" hostname @@ -29,9 +34,4 @@ echo '### test --filter-hosts with server w/o ssh, non-existing server' echo '### Missing: test --filter-hosts proxied through the one host' -echo '### bug #41964: --controlmaster not seems to reuse OpenSSH connections to the same host' - (parallel -S redhat9.tange.dk true ::: {1..20}; echo No --controlmaster - finish last) & - (parallel -M -S redhat9.tange.dk true ::: {1..20}; echo With --controlmaster - finish first) & - wait - EOF diff --git a/testsuite/wanted-results/parallel-local23 b/testsuite/wanted-results/parallel-local23 index 830a7893..6a3f31d3 100644 --- a/testsuite/wanted-results/parallel-local23 +++ b/testsuite/wanted-results/parallel-local23 @@ -63,6 +63,13 @@ echo '### bug #43600: --pipe --linebuffer --round does not work' 8 9 10 +echo '### Check that 4 processes are really used' +### Check that 4 processes are really used + seq 1000000 | parallel -j4 --pipe --round --line-buf wc |sort + 149797 149797 1048579 + 235145 235145 1646016 + 299593 
299593 2097151 + 315465 315465 2097150 echo '### --version must have higher priority than retired options' ### --version must have higher priority than retired options $NICEPAR --version -g -Y -U -W -T | tail