From 7df7758404bea42fc00da5df17ee961d38c82fad Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Sun, 16 Feb 2014 16:34:47 +0100 Subject: [PATCH] Code cleanup. --- src/parallel | 160 ++++++++------------ testsuite/Makefile | 1 + testsuite/tests-to-run/parallel-local150.sh | 2 +- testsuite/tests-to-run/parallel-local19.sh | 12 +- testsuite/tests-to-run/test30.sh | 17 +-- testsuite/wanted-results/parallel-local150 | 28 ++-- testsuite/wanted-results/parallel-local19 | 36 ++++- testsuite/wanted-results/test30 | 1 - 8 files changed, 134 insertions(+), 123 deletions(-) diff --git a/src/parallel b/src/parallel index 0e293c6a..33148272 100755 --- a/src/parallel +++ b/src/parallel @@ -137,8 +137,8 @@ if($opt::pipe and @opt::a) { } @Global::tee_jobs = @jobs; $command = "tee".join("",map {" >((".$_->replaced().")". - " >".$_->fd(1,"name"). - " 2>".$_->fd(2,"name").")" } @jobs) + " >".$_->fh(1,"name"). + " 2>".$_->fh(2,"name").")" } @jobs) ." >/dev/null"; $Global::JobQueue = JobQueue->new( $command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files); @@ -173,10 +173,10 @@ drain_job_queue(); reaper(); if($opt::pipe and @opt::a) { for my $job (@Global::tee_jobs) { - unlink $job->fd(2,"name"); - $job->set_fd(2,"name",""); + unlink $job->fh(2,"name"); + $job->set_fh(2,"name",""); $job->print(); - unlink $job->fd(1,"name"); + unlink $job->fh(1,"name"); } } cleanup(); @@ -325,7 +325,7 @@ sub spreadstdin { $Global::start_no_new_jobs ||= 1; if($opt::roundrobin) { for my $job (values %Global::running) { - close $job->fd(0,"w"); + close $job->fh(0,"w"); } my %incomplete_jobs = %Global::running; my $sleep = 1; @@ -419,10 +419,10 @@ sub write_record_to_pipe { } $job->write($header_ref); $job->write($record_ref); - close $job->fd(0,"w"); + close $job->fh(0,"w"); exit(0); } - close $job->fd(0,"w"); + close $job->fh(0,"w"); return 1; } @@ -1529,7 +1529,7 @@ sub drain_job_queue { if($opt::pipe) { # When using --pipe sometimes file handles are not closed properly for my $job (values %Global::running) { - close $job->fd(0,"w"); + close $job->fh(0,"w"); } } if($opt::progress) { @@ -4090,10 +4090,10 @@ sub cattail { sub openoutputfiles { # Open files for STDOUT and STDERR - # Set file handles in $self->fd + # Set file handles in $self->fh # TODO non-blocking $opt::linebuffer my $self = shift; - my ($outfhw, $errfhw, $outrfhr, $errfhr, $outname, $errname, $unlink_out, $unlink_err); + my ($outfhw, $errfhw, $outname, $errname); if($opt::results) { my $args_as_dirname = $self->{'commandline'}->args_as_dirname(); # prefix/name1/val1/name2/val2/ @@ -4111,8 +4111,8 @@ sub openoutputfiles { ::error("Cannot write to `$errname'.\n"); ::wait_and_exit(255); } - $self->set_fd(1,"unlink",""); - $self->set_fd(2,"unlink",""); + $self->set_fh(1,"unlink",""); + $self->set_fh(2,"unlink",""); } elsif($Global::grouped) { # To group we create temporary files for STDOUT and STDERR # To avoid the cleanup unlink the files immediately (but keep them open) @@ -4122,13 +4122,13 @@ sub openoutputfiles { ($outfhw, $outname) = ::tempfile(SUFFIX => ".par"); ($errfhw, $errname) = ::tempfile(SUFFIX => ".par"); # --files => only remove stderr - $self->set_fd(1,"unlink",""); - $self->set_fd(2,"unlink",$errname); + $self->set_fh(1,"unlink",""); + $self->set_fh(2,"unlink",$errname); } else { ($outfhw, $outname) = ::tempfile(SUFFIX => ".par"); ($errfhw, $errname) = ::tempfile(SUFFIX => ".par"); - $self->set_fd(1,"unlink",$outname); - $self->set_fd(2,"unlink",$errname); + $self->set_fh(1,"unlink",$outname); + $self->set_fh(2,"unlink",$errname); } } else { # --ungroup @@ -4137,87 +4137,61 @@ sub openoutputfiles { # File name must be empty as it will otherwise be printed $outname = ""; $errname = ""; - $self->set_fd(1,"unlink",$outname); - $self->set_fd(2,"unlink",$errname); + $self->set_fh(1,"unlink",$outname); + $self->set_fh(2,"unlink",$errname); } # Set writing FD - $self->set_fd(1,'w',$outfhw); - $self->set_fd(2,'w',$errfhw); - $self->set_fd(1,'name',$outname); - $self->set_fd(2,'name',$errname); + $self->set_fh(1,'w',$outfhw); + $self->set_fh(2,'w',$errfhw); + $self->set_fh(1,'name',$outname); + $self->set_fh(2,'name',$errname); if($opt::compress) { # Send stdout to stdin for $opt::compress_program(1) # Send stderr to stdin for $opt::compress_program(2) - # cattail get pid i $pid = $self->fd($fdno,'rpid'); + # cattail get pid i $pid = $self->fh($fdno,'rpid'); my $cattail = cattail(); for my $fdno (1,2) { my $wpid = open(my $fdw,"|-","$opt::compress_program >". - $self->fd($fdno,'name')) || die $?; - $self->set_fd($fdno,'w',$fdw); - $self->set_fd($fdno,'wpid',$wpid); + $self->fh($fdno,'name')) || die $?; + $self->set_fh($fdno,'w',$fdw); + $self->set_fh($fdno,'wpid',$wpid); my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail, - $opt::decompress_program, $self->fd($fdno,'name')) || die $?; - $self->set_fd($fdno,'r',$fdr); - $self->set_fd($fdno,'rpid',$rpid); - # Unlink if required + $opt::decompress_program, $self->fh($fdno,'name')) || die $?; + $self->set_fh($fdno,'r',$fdr); + $self->set_fh($fdno,'rpid',$rpid); + # Unlink if required but only when cattail and compress_program has started. # TODO this is disabled for now - # unlink $self->fd($fdno,"unlink"); - + # unlink $self->fh($fdno,"unlink"); } } else { # Set reading FD for my $fdno (1,2) { - my $fdw = $self->fd($fdno,'w'); + my $fdw = $self->fh($fdno,'w'); # Duplicate filehandle, so fdw can be closed seperately open(my $fdr,"<&",$fdw) || die; - $self->set_fd($fdno,'r',$fdr); + $self->set_fh($fdno,'r',$fdr); # Unlink if required - unlink $self->fd($fdno,"unlink"); + unlink $self->fh($fdno,"unlink"); } } } -sub set_fd { - # Set file descriptor - my ($self, $fd_no, $key, $fd) = @_; - $self->{'fd'}{$fd_no,$key} = $fd; +sub set_fh { + # Set file handle + my ($self, $fd_no, $key, $fh) = @_; + $self->{'fd'}{$fd_no,$key} = $fh; } -sub fd { - # Get file descriptor +sub fh { + # Get file handle my ($self, $fd_no, $key) = @_; return $self->{'fd'}{$fd_no,$key}; } -sub _set_fd_input { - my ($self, $fd_no, $fd_input) = @_; - $self->{'fd_input'}{$fd_no} = $fd_input; -} - -sub _fd_input { - # Get input file descriptor - my ($self, $fd_no) = @_; - return $self->{'fd_input'}{$fd_no}; -} - -sub _set_fd_file_name { - # Set file name for a file descriptor - my $self = shift; - my $fd_no = shift; - $self->{'fd_file_name',$fd_no} = shift; -} - -sub _fd_file_name { - # Get file name for a file descriptor - my $self = shift; - my $fd_no = shift; - return $self->{'fd_file_name',$fd_no}; -} - sub write { my $self = shift; my $remaining_ref = shift; - my $stdin_fh = $self->fd(0,"w"); + my $stdin_fh = $self->fh(0,"w"); syswrite($stdin_fh,$$remaining_ref); } @@ -4252,7 +4226,7 @@ sub non_block_write { # use Fcntl; # my $flags = ''; for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) { - my $in = $self->fd(0,"w"); + my $in = $self->fh(0,"w"); # fcntl($in, F_GETFL, $flags) # or die "Couldn't get flags for HANDLE : $!\n"; # $flags |= O_NONBLOCK; @@ -4772,7 +4746,7 @@ sub start { my $pid; $job->openoutputfiles(); - my($stdout_fh,$stderr_fh) = ($job->fd(1,"w"),$job->fd(2,"w")); + my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w")); local (*IN,*OUT,*ERR); open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!"); open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!"); @@ -4814,7 +4788,7 @@ sub start { ::die_bug("open3-pipe"); 1; }; - $job->set_fd(0,"w",$stdin_fh); + $job->set_fh(0,"w",$stdin_fh); } elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1 and $job->sshlogin()->string() eq ":") { # Give STDIN to the first job if using -a (but only if running @@ -4928,13 +4902,13 @@ sub print { if($opt::pipe and $self->virgin()) { # Nothing was printed to this job: # cleanup tmp files if --files was set - unlink $self->fd(1,"name"); + unlink $self->fh(1,"name"); return; } if($opt::dryrun) { # Nothing was printed to this job: # cleanup tmp files if --files was set - unlink $self->fd(1,"name"); + unlink $self->fh(1,"name"); } if($Global::joblog) { $self->print_joblog() } @@ -4964,8 +4938,8 @@ sub print { # Sort by file descriptor numerically: 1,2,3,..,9,10,11 $fdno == 0 and next; my $out_fd = $Global::fd{$fdno}; - my $in_fd = $self->fd($fdno,"r"); - if(not $in_fd) { + my $in_fh = $self->fh($fdno,"r"); + if(not $in_fh) { if(not $Job::file_descriptor_warning_printed{$fdno}++) { # ::warning("File descriptor $fdno not defined\n"); } @@ -4973,17 +4947,17 @@ sub print { } ::debug("File descriptor $fdno:"); if($opt::files) { - # If --compress: $in_fd must be closed first. - close $self->fd($fdno,"w"); - if($fdno == 1 and $self->fd($fdno,"name")) { - print $out_fd $self->fd($fdno,"name"),"\n"; + # If --compress: $in_fh must be closed first. + close $self->fh($fdno,"w"); + if($fdno == 1 and $self->fh($fdno,"name")) { + print $out_fd $self->fh($fdno,"name"),"\n"; } } elsif($opt::linebuffer) { # Line buffered print out my $partial = \$self->{'partial_line',$fdno}; # This seek will clear EOF - seek $in_fd, tell($in_fd), 0; - while(read($in_fd,substr($$partial,length $$partial),1_000_000)) { + seek $in_fh, tell($in_fh), 0; + while(read($in_fh,substr($$partial,length $$partial),1_000_000)) { # Append to $$partial # Find the last \n my $i = rindex($$partial,"\n"); @@ -5019,28 +4993,28 @@ sub print { print $out_fd $$partial; $$partial = undef; # then close fd - close $in_fd; + close $in_fh; } } else { my $buf; - close $self->fd($fdno,"w"); - if($self->fd($fdno,"wpid")) { + close $self->fh($fdno,"w"); + if($self->fh($fdno,"wpid")) { # This is --compress # Wait for $compress_program to finish - waitpid($self->fd($fdno,"wpid"),0); + waitpid($self->fh($fdno,"wpid"),0); # Then tell cattail this the last reading - CORE::kill "HUP", $self->fd($fdno,"rpid"); + CORE::kill "HUP", $self->fh($fdno,"rpid"); } else { # Seek to start - seek $in_fd, 0, 0; + seek $in_fh, 0, 0; } - # $in_fd is now ready for reading at position 0 + # $in_fh is now ready for reading at position 0 if($opt::tag or defined $opt::tagstring) { my $tag = $self->tag(); if($fdno == 2) { # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt # This is a crappy way of ignoring it. - while(<$in_fd>) { + while(<$in_fh>) { if($_ ne "tcgetattr: Invalid argument\n") { print $out_fd $tag,$_; } @@ -5048,7 +5022,7 @@ sub print { last; } } - while(<$in_fd>) { + while(<$in_fh>) { print $out_fd $tag,$_; } } else { @@ -5056,15 +5030,15 @@ sub print { if($fdno == 2) { # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt # This is a crappy way of ignoring it. - sysread($in_fd,$buf,1_000); + sysread($in_fh,$buf,1_000); $buf =~ s/^tcgetattr: Invalid argument\n//; print $out_fd $buf; } - while(sysread($in_fd,$buf,1_000_000)) { + while(sysread($in_fh,$buf,1_000_000)) { print $out_fd $buf; } } - close $in_fd; + close $in_fh; } flush $out_fd; ::debug("<&1 # -L1 will join lines ending in ' ' cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | nice parallel -j0 -k -L1 +echo '### Test compress' + seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10' + +echo '### Test compress - stderr' + seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10 >&2' 2>&1 >/dev/null + +echo '### Test weird regexp chars' + seq 1 6 | parallel -j1 -I :: -X echo a::b::^c::[.}c + echo '### Test -m' (echo foo;echo bar;echo joe.gif) | parallel -j1 -km echo 1{}2{.}3 A{.}B{.}C (echo foo;echo bar;echo joe.gif) | parallel -j1 -kX echo 1{}2{.}3 A{.}B{.}C diff --git a/testsuite/tests-to-run/test30.sh b/testsuite/tests-to-run/test30.sh index 572c224b..71d430e6 100644 --- a/testsuite/tests-to-run/test30.sh +++ b/testsuite/tests-to-run/test30.sh @@ -1,29 +1,28 @@ #!/bin/bash - cat <<'EOF' | parallel -j0 -k echo '### Test of --eta' -seq 1 10 | stdout parallel --eta "sleep 1; echo {}" | wc -l + seq 1 10 | stdout parallel --eta "sleep 1; echo {}" | wc -l echo '### Test of --eta with no jobs' -stdout parallel --eta "sleep 1; echo {}" < /dev/null + stdout parallel --eta "sleep 1; echo {}" < /dev/null echo '### Test of --progress' -seq 1 10 | stdout parallel --progress "sleep 1; echo {}" | wc -l + seq 1 10 | stdout parallel --progress "sleep 1; echo {}" | wc -l echo '### Test of --progress with no jobs' -stdout parallel --progress "sleep 1; echo {}" < /dev/null + stdout parallel --progress "sleep 1; echo {}" < /dev/null echo '### bug #34422: parallel -X --eta crashes with div by zero' # We do not care how long it took -seq 2 | stdout parallel -X --eta echo | grep -E -v 'ETA:.*avg' + seq 2 | stdout parallel -X --eta echo | grep -E -v 'ETA:.*AVG' echo '### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed' -parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $? + parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $? echo '### --pipe without command' -seq -w 10 | stdout parallel --pipe + seq -w 10 | stdout parallel --pipe echo '### bug #36260: {n} expansion in --colsep files fails for empty fields if all following fields are also empty' -echo A,B,, | parallel --colsep , echo {1}{3}{2} + echo A,B,, | parallel --colsep , echo {1}{3}{2} EOF diff --git a/testsuite/wanted-results/parallel-local150 b/testsuite/wanted-results/parallel-local150 index 18a726de..349e76f9 100644 --- a/testsuite/wanted-results/parallel-local150 +++ b/testsuite/wanted-results/parallel-local150 @@ -13,20 +13,20 @@ The timing here is important: 2 3 4 5 6 0 0 300 ms jobs: -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 -0.3 +3 +3 +3 +3 +3 +3 +3 +3 +3 +3 +3 +3 +3 +3 ### Test --tagstring echo 3 4 a3b a4b 3 4 diff --git a/testsuite/wanted-results/parallel-local19 b/testsuite/wanted-results/parallel-local19 index 0498d2d4..ee9417d4 100644 --- a/testsuite/wanted-results/parallel-local19 +++ b/testsuite/wanted-results/parallel-local19 @@ -1,5 +1,3 @@ -### Test weird regexp chars -a1b1^c1[.}c a2b2^c2[.}c a3b3^c3[.}c a4b4^c4[.}c a5b5^c5[.}c a6b6^c6[.}c ### Test {.} and {} ### Test {.} with files that have no . but dir does /tmp/test-of-{.}-parallel/subdir/file @@ -57,6 +55,40 @@ rm -- 2-col/abc-2-col-2-col.txt rm -- a/abc-a-a rm -- b/abc-b-b rm -- \ä\¸\­\å\›\½\ \(Zh\Å\nggu\Ã\³\)/abc-\ä\¸\­\å\›\½\ \(Zh\Å\nggu\Ã\³\)-\ä\¸\­\å\›\½\ \(Zh\Å\nggu\Ã\³\) +### Test compress +1 1 +2 1 +2 2 +3 1 +3 2 +3 3 +4 1 +4 2 +4 3 +4 4 +5 1 +5 2 +5 3 +5 4 +5 5 +### Test compress - stderr +1 1 +2 1 +2 2 +3 1 +3 2 +3 3 +4 1 +4 2 +4 3 +4 4 +5 1 +5 2 +5 3 +5 4 +5 5 +### Test weird regexp chars +a1b1^c1[.}c a2b2^c2[.}c a3b3^c3[.}c a4b4^c4[.}c a5b5^c5[.}c a6b6^c6[.}c ### Test -m 1foo bar joe.gif2foo bar joe3 Afoo bar joeBfoo bar joeC 1foo2foo3 1bar2bar3 1joe.gif2joe3 AfooBfooC AbarBbarC AjoeBjoeC diff --git a/testsuite/wanted-results/test30 b/testsuite/wanted-results/test30 index 67316aa7..d8500251 100644 --- a/testsuite/wanted-results/test30 +++ b/testsuite/wanted-results/test30 @@ -19,7 +19,6 @@ Computers / CPU cores / Max jobs to run Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete local:1/0/100%/0.0s 1 2 - ETA: 0s Left: 0 AVG: 0.00s local:0/1/100%/1.0s ### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed slept 1 slept 1