Code cleanup.

This commit is contained in:
Ole Tange 2014-02-16 16:34:47 +01:00
parent 846085cfd3
commit 7df7758404
8 changed files with 134 additions and 123 deletions

View file

@ -137,8 +137,8 @@ if($opt::pipe and @opt::a) {
}
@Global::tee_jobs = @jobs;
$command = "tee".join("",map {" >((".$_->replaced().")".
" >".$_->fd(1,"name").
" 2>".$_->fd(2,"name").")" } @jobs)
" >".$_->fh(1,"name").
" 2>".$_->fh(2,"name").")" } @jobs)
." >/dev/null";
$Global::JobQueue = JobQueue->new(
$command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files);
@ -173,10 +173,10 @@ drain_job_queue();
reaper();
if($opt::pipe and @opt::a) {
for my $job (@Global::tee_jobs) {
unlink $job->fd(2,"name");
$job->set_fd(2,"name","");
unlink $job->fh(2,"name");
$job->set_fh(2,"name","");
$job->print();
unlink $job->fd(1,"name");
unlink $job->fh(1,"name");
}
}
cleanup();
@ -325,7 +325,7 @@ sub spreadstdin {
$Global::start_no_new_jobs ||= 1;
if($opt::roundrobin) {
for my $job (values %Global::running) {
close $job->fd(0,"w");
close $job->fh(0,"w");
}
my %incomplete_jobs = %Global::running;
my $sleep = 1;
@ -419,10 +419,10 @@ sub write_record_to_pipe {
}
$job->write($header_ref);
$job->write($record_ref);
close $job->fd(0,"w");
close $job->fh(0,"w");
exit(0);
}
close $job->fd(0,"w");
close $job->fh(0,"w");
return 1;
}
@ -1529,7 +1529,7 @@ sub drain_job_queue {
if($opt::pipe) {
# When using --pipe sometimes file handles are not closed properly
for my $job (values %Global::running) {
close $job->fd(0,"w");
close $job->fh(0,"w");
}
}
if($opt::progress) {
@ -4090,10 +4090,10 @@ sub cattail {
sub openoutputfiles {
# Open files for STDOUT and STDERR
# Set file handles in $self->fd
# Set file handles in $self->fh
# TODO non-blocking $opt::linebuffer
my $self = shift;
my ($outfhw, $errfhw, $outrfhr, $errfhr, $outname, $errname, $unlink_out, $unlink_err);
my ($outfhw, $errfhw, $outname, $errname);
if($opt::results) {
my $args_as_dirname = $self->{'commandline'}->args_as_dirname();
# prefix/name1/val1/name2/val2/
@ -4111,8 +4111,8 @@ sub openoutputfiles {
::error("Cannot write to `$errname'.\n");
::wait_and_exit(255);
}
$self->set_fd(1,"unlink","");
$self->set_fd(2,"unlink","");
$self->set_fh(1,"unlink","");
$self->set_fh(2,"unlink","");
} elsif($Global::grouped) {
# To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open)
@ -4122,13 +4122,13 @@ sub openoutputfiles {
($outfhw, $outname) = ::tempfile(SUFFIX => ".par");
($errfhw, $errname) = ::tempfile(SUFFIX => ".par");
# --files => only remove stderr
$self->set_fd(1,"unlink","");
$self->set_fd(2,"unlink",$errname);
$self->set_fh(1,"unlink","");
$self->set_fh(2,"unlink",$errname);
} else {
($outfhw, $outname) = ::tempfile(SUFFIX => ".par");
($errfhw, $errname) = ::tempfile(SUFFIX => ".par");
$self->set_fd(1,"unlink",$outname);
$self->set_fd(2,"unlink",$errname);
$self->set_fh(1,"unlink",$outname);
$self->set_fh(2,"unlink",$errname);
}
} else {
# --ungroup
@ -4137,87 +4137,61 @@ sub openoutputfiles {
# File name must be empty as it will otherwise be printed
$outname = "";
$errname = "";
$self->set_fd(1,"unlink",$outname);
$self->set_fd(2,"unlink",$errname);
$self->set_fh(1,"unlink",$outname);
$self->set_fh(2,"unlink",$errname);
}
# Set writing FD
$self->set_fd(1,'w',$outfhw);
$self->set_fd(2,'w',$errfhw);
$self->set_fd(1,'name',$outname);
$self->set_fd(2,'name',$errname);
$self->set_fh(1,'w',$outfhw);
$self->set_fh(2,'w',$errfhw);
$self->set_fh(1,'name',$outname);
$self->set_fh(2,'name',$errname);
if($opt::compress) {
# Send stdout to stdin for $opt::compress_program(1)
# Send stderr to stdin for $opt::compress_program(2)
# cattail get pid i $pid = $self->fd($fdno,'rpid');
# cattail get pid i $pid = $self->fh($fdno,'rpid');
my $cattail = cattail();
for my $fdno (1,2) {
my $wpid = open(my $fdw,"|-","$opt::compress_program >".
$self->fd($fdno,'name')) || die $?;
$self->set_fd($fdno,'w',$fdw);
$self->set_fd($fdno,'wpid',$wpid);
$self->fh($fdno,'name')) || die $?;
$self->set_fh($fdno,'w',$fdw);
$self->set_fh($fdno,'wpid',$wpid);
my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail,
$opt::decompress_program, $self->fd($fdno,'name')) || die $?;
$self->set_fd($fdno,'r',$fdr);
$self->set_fd($fdno,'rpid',$rpid);
# Unlink if required
$opt::decompress_program, $self->fh($fdno,'name')) || die $?;
$self->set_fh($fdno,'r',$fdr);
$self->set_fh($fdno,'rpid',$rpid);
# Unlink if required, but only when cattail and compress_program have started.
# TODO this is disabled for now
# unlink $self->fd($fdno,"unlink");
# unlink $self->fh($fdno,"unlink");
}
} else {
# Set reading FD
for my $fdno (1,2) {
my $fdw = $self->fd($fdno,'w');
my $fdw = $self->fh($fdno,'w');
# Duplicate filehandle, so fdw can be closed separately
open(my $fdr,"<&",$fdw) || die;
$self->set_fd($fdno,'r',$fdr);
$self->set_fh($fdno,'r',$fdr);
# Unlink if required
unlink $self->fd($fdno,"unlink");
unlink $self->fh($fdno,"unlink");
}
}
}
sub set_fd {
# Set file descriptor
my ($self, $fd_no, $key, $fd) = @_;
$self->{'fd'}{$fd_no,$key} = $fd;
sub set_fh {
# Set file handle
my ($self, $fd_no, $key, $fh) = @_;
$self->{'fd'}{$fd_no,$key} = $fh;
}
sub fd {
# Get file descriptor
sub fh {
# Get file handle
my ($self, $fd_no, $key) = @_;
return $self->{'fd'}{$fd_no,$key};
}
sub _set_fd_input {
my ($self, $fd_no, $fd_input) = @_;
$self->{'fd_input'}{$fd_no} = $fd_input;
}
sub _fd_input {
# Get input file descriptor
my ($self, $fd_no) = @_;
return $self->{'fd_input'}{$fd_no};
}
sub _set_fd_file_name {
# Set file name for a file descriptor
my $self = shift;
my $fd_no = shift;
$self->{'fd_file_name',$fd_no} = shift;
}
sub _fd_file_name {
# Get file name for a file descriptor
my $self = shift;
my $fd_no = shift;
return $self->{'fd_file_name',$fd_no};
}
sub write {
my $self = shift;
my $remaining_ref = shift;
my $stdin_fh = $self->fd(0,"w");
my $stdin_fh = $self->fh(0,"w");
syswrite($stdin_fh,$$remaining_ref);
}
@ -4252,7 +4226,7 @@ sub non_block_write {
# use Fcntl;
# my $flags = '';
for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) {
my $in = $self->fd(0,"w");
my $in = $self->fh(0,"w");
# fcntl($in, F_GETFL, $flags)
# or die "Couldn't get flags for HANDLE : $!\n";
# $flags |= O_NONBLOCK;
@ -4772,7 +4746,7 @@ sub start {
my $pid;
$job->openoutputfiles();
my($stdout_fh,$stderr_fh) = ($job->fd(1,"w"),$job->fd(2,"w"));
my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w"));
local (*IN,*OUT,*ERR);
open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!");
@ -4814,7 +4788,7 @@ sub start {
::die_bug("open3-pipe");
1;
};
$job->set_fd(0,"w",$stdin_fh);
$job->set_fh(0,"w",$stdin_fh);
} elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1
and $job->sshlogin()->string() eq ":") {
# Give STDIN to the first job if using -a (but only if running
@ -4928,13 +4902,13 @@ sub print {
if($opt::pipe and $self->virgin()) {
# Nothing was printed to this job:
# cleanup tmp files if --files was set
unlink $self->fd(1,"name");
unlink $self->fh(1,"name");
return;
}
if($opt::dryrun) {
# Nothing was printed to this job:
# cleanup tmp files if --files was set
unlink $self->fd(1,"name");
unlink $self->fh(1,"name");
}
if($Global::joblog) { $self->print_joblog() }
@ -4964,8 +4938,8 @@ sub print {
# Sort by file descriptor numerically: 1,2,3,..,9,10,11
$fdno == 0 and next;
my $out_fd = $Global::fd{$fdno};
my $in_fd = $self->fd($fdno,"r");
if(not $in_fd) {
my $in_fh = $self->fh($fdno,"r");
if(not $in_fh) {
if(not $Job::file_descriptor_warning_printed{$fdno}++) {
# ::warning("File descriptor $fdno not defined\n");
}
@ -4973,17 +4947,17 @@ sub print {
}
::debug("File descriptor $fdno:");
if($opt::files) {
# If --compress: $in_fd must be closed first.
close $self->fd($fdno,"w");
if($fdno == 1 and $self->fd($fdno,"name")) {
print $out_fd $self->fd($fdno,"name"),"\n";
# If --compress: $in_fh must be closed first.
close $self->fh($fdno,"w");
if($fdno == 1 and $self->fh($fdno,"name")) {
print $out_fd $self->fh($fdno,"name"),"\n";
}
} elsif($opt::linebuffer) {
# Line buffered print out
my $partial = \$self->{'partial_line',$fdno};
# This seek will clear EOF
seek $in_fd, tell($in_fd), 0;
while(read($in_fd,substr($$partial,length $$partial),1_000_000)) {
seek $in_fh, tell($in_fh), 0;
while(read($in_fh,substr($$partial,length $$partial),1_000_000)) {
# Append to $$partial
# Find the last \n
my $i = rindex($$partial,"\n");
@ -5019,28 +4993,28 @@ sub print {
print $out_fd $$partial;
$$partial = undef;
# then close fd
close $in_fd;
close $in_fh;
}
} else {
my $buf;
close $self->fd($fdno,"w");
if($self->fd($fdno,"wpid")) {
close $self->fh($fdno,"w");
if($self->fh($fdno,"wpid")) {
# This is --compress
# Wait for $compress_program to finish
waitpid($self->fd($fdno,"wpid"),0);
waitpid($self->fh($fdno,"wpid"),0);
# Then tell cattail this is the last reading
CORE::kill "HUP", $self->fd($fdno,"rpid");
CORE::kill "HUP", $self->fh($fdno,"rpid");
} else {
# Seek to start
seek $in_fd, 0, 0;
seek $in_fh, 0, 0;
}
# $in_fd is now ready for reading at position 0
# $in_fh is now ready for reading at position 0
if($opt::tag or defined $opt::tagstring) {
my $tag = $self->tag();
if($fdno == 2) {
# OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
# This is a crappy way of ignoring it.
while(<$in_fd>) {
while(<$in_fh>) {
if($_ ne "tcgetattr: Invalid argument\n") {
print $out_fd $tag,$_;
}
@ -5048,7 +5022,7 @@ sub print {
last;
}
}
while(<$in_fd>) {
while(<$in_fh>) {
print $out_fd $tag,$_;
}
} else {
@ -5056,15 +5030,15 @@ sub print {
if($fdno == 2) {
# OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
# This is a crappy way of ignoring it.
sysread($in_fd,$buf,1_000);
sysread($in_fh,$buf,1_000);
$buf =~ s/^tcgetattr: Invalid argument\n//;
print $out_fd $buf;
}
while(sysread($in_fd,$buf,1_000_000)) {
while(sysread($in_fh,$buf,1_000_000)) {
print $out_fd $buf;
}
}
close $in_fd;
close $in_fh;
}
flush $out_fd;
::debug("<<joboutput $command\n");

View file

@ -54,6 +54,7 @@ installparallel: ../src/parallel
cd .. && make -j && sudo make -j install
startdb:
echo shutdown abort | sudo su - oracle -c "sqlplus / as sysdba"
sudo parallel /etc/init.d/{} restart ::: postgresql mysql oracle-xe
clean:

View file

@ -9,7 +9,7 @@ echo '### bug #41565: Print happens in blocks - not after each job complete'
echo 'The timing here is important: 2 3 4 5 6'
ping -c 7 lo | parallel -j3 'echo {#}' | timestamp -dd | perl -pe '$_=int($_+0.2)."\n"'
echo '300 ms jobs:'
ping -i .3 -c 10 lo | parallel -j3 --delay 0.3 echo | timestamp -d -d | perl -pe 's/(...).*/$1/' | tail -n +2
ping -i .3 -c 10 lo | parallel -j3 --delay 0.3 echo | timestamp -d -d | perl -pe 's/(...).*/int($1*10+0.2)/e' | tail -n +2
echo '### Test --tagstring'
nice parallel -j1 -X -v --tagstring a{}b echo ::: 3 4

View file

@ -2,9 +2,6 @@
# Test {.}
echo '### Test weird regexp chars'
seq 1 6 | parallel -j1 -I :: -X echo a::b::^c::[.}c
rsync -Ha --delete input-files/testdir2/ tmp/
cd tmp
@ -32,6 +29,15 @@ ls | parallel -kv rm -- {.}/abc-{.}-{} 2>&1
# -L1 will join lines ending in ' '
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | nice parallel -j0 -k -L1
echo '### Test compress'
seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10'
echo '### Test compress - stderr'
seq 5 | parallel -j2 --tag --compress 'seq {} | pv -q -L 10 >&2' 2>&1 >/dev/null
echo '### Test weird regexp chars'
seq 1 6 | parallel -j1 -I :: -X echo a::b::^c::[.}c
echo '### Test -m'
(echo foo;echo bar;echo joe.gif) | parallel -j1 -km echo 1{}2{.}3 A{.}B{.}C
(echo foo;echo bar;echo joe.gif) | parallel -j1 -kX echo 1{}2{.}3 A{.}B{.}C

View file

@ -1,29 +1,28 @@
#!/bin/bash
cat <<'EOF' | parallel -j0 -k
echo '### Test of --eta'
seq 1 10 | stdout parallel --eta "sleep 1; echo {}" | wc -l
seq 1 10 | stdout parallel --eta "sleep 1; echo {}" | wc -l
echo '### Test of --eta with no jobs'
stdout parallel --eta "sleep 1; echo {}" < /dev/null
stdout parallel --eta "sleep 1; echo {}" < /dev/null
echo '### Test of --progress'
seq 1 10 | stdout parallel --progress "sleep 1; echo {}" | wc -l
seq 1 10 | stdout parallel --progress "sleep 1; echo {}" | wc -l
echo '### Test of --progress with no jobs'
stdout parallel --progress "sleep 1; echo {}" < /dev/null
stdout parallel --progress "sleep 1; echo {}" < /dev/null
echo '### bug #34422: parallel -X --eta crashes with div by zero'
# We do not care how long it took
seq 2 | stdout parallel -X --eta echo | grep -E -v 'ETA:.*avg'
seq 2 | stdout parallel -X --eta echo | grep -E -v 'ETA:.*AVG'
echo '### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed'
parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $?
parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $?
echo '### --pipe without command'
seq -w 10 | stdout parallel --pipe
seq -w 10 | stdout parallel --pipe
echo '### bug #36260: {n} expansion in --colsep files fails for empty fields if all following fields are also empty'
echo A,B,, | parallel --colsep , echo {1}{3}{2}
echo A,B,, | parallel --colsep , echo {1}{3}{2}
EOF

View file

@ -13,20 +13,20 @@ The timing here is important: 2 3 4 5 6
0
0
300 ms jobs:
0.3
0.3
0.3
0.3
0.3
0.3
0.3
0.3
0.3
0.3
0.3
0.3
0.3
0.3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
### Test --tagstring
echo 3 4
a3b a4b 3 4

View file

@ -1,5 +1,3 @@
### Test weird regexp chars
a1b1^c1[.}c a2b2^c2[.}c a3b3^c3[.}c a4b4^c4[.}c a5b5^c5[.}c a6b6^c6[.}c
### Test {.} and {}
### Test {.} with files that have no . but dir does
/tmp/test-of-{.}-parallel/subdir/file
@ -57,6 +55,40 @@ rm -- 2-col/abc-2-col-2-col.txt
rm -- a/abc-a-a
rm -- b/abc-b-b
rm -- \ä\¸\­\å\\½\ \(Zh\Å\<5C>nggu\Ã\³\)/abc-\ä\¸\­\å\\½\ \(Zh\Å\<5C>nggu\Ã\³\)-\ä\¸\­\å\\½\ \(Zh\Å\<5C>nggu\Ã\³\)
### Test compress
1 1
2 1
2 2
3 1
3 2
3 3
4 1
4 2
4 3
4 4
5 1
5 2
5 3
5 4
5 5
### Test compress - stderr
1 1
2 1
2 2
3 1
3 2
3 3
4 1
4 2
4 3
4 4
5 1
5 2
5 3
5 4
5 5
### Test weird regexp chars
a1b1^c1[.}c a2b2^c2[.}c a3b3^c3[.}c a4b4^c4[.}c a5b5^c5[.}c a6b6^c6[.}c
### Test -m
1foo bar joe.gif2foo bar joe3 Afoo bar joeBfoo bar joeC
1foo2foo3 1bar2bar3 1joe.gif2joe3 AfooBfooC AbarBbarC AjoeBjoeC

View file

@ -19,7 +19,6 @@ Computers / CPU cores / Max jobs to run
Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete
local:1/0/100%/0.0s 1 2
ETA: 0s Left: 0 AVG: 0.00s local:0/1/100%/1.0s
### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed
slept 1
slept 1