parallel: --joblog records input and output data as Send/Receive.

This commit is contained in:
Ole Tange 2015-03-07 03:45:13 +01:00
parent d4ab51beb6
commit 9c73947d9f
4 changed files with 232 additions and 9 deletions

View file

@ -632,6 +632,8 @@ sub write_record_to_pipe {
my $job = shift @Global::virgin_jobs; my $job = shift @Global::virgin_jobs;
# Job is no longer virgin # Job is no longer virgin
$job->set_virgin(0); $job->set_virgin(0);
# We ignore the removed rec_sep which is technically wrong.
$job->add_transfersize($endpos + length $$header_ref);
if(fork()) { if(fork()) {
# Skip # Skip
} else { } else {
@ -1050,7 +1052,7 @@ sub parse_options {
sub init_globals { sub init_globals {
# Defaults: # Defaults:
$Global::version = 20150302; $Global::version = 20150305;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -5646,6 +5648,7 @@ sub set_stdin_buffer {
} }
$self->{'stdin_buffer_length'} = length $self->{'stdin_buffer'}; $self->{'stdin_buffer_length'} = length $self->{'stdin_buffer'};
$self->{'stdin_buffer_pos'} = 0; $self->{'stdin_buffer_pos'} = 0;
$self->add_transfersize($self->{'stdin_buffer_length'});
} }
sub stdin_buffer_length { sub stdin_buffer_length {
@ -6299,6 +6302,12 @@ sub transfersize {
return $self->{'transfersize'}; return $self->{'transfersize'};
} }
sub add_transfersize {
my $self = shift;
my $transfersize = shift;
$self->{'transfersize'} += $transfersize;
}
sub sshtransfer { sub sshtransfer {
# Returns for each transfer file: # Returns for each transfer file:
# rsync $file remote:$workdir # rsync $file remote:$workdir
@ -6335,6 +6344,12 @@ sub returnsize {
return $self->{'returnsize'}; return $self->{'returnsize'};
} }
sub add_returnsize {
my $self = shift;
my $returnsize = shift;
$self->{'returnsize'} += $returnsize;
}
sub sshreturn { sub sshreturn {
# Returns for each return-file: # Returns for each return-file:
# rsync remote:$workdir/$file . # rsync remote:$workdir/$file .
@ -6770,13 +6785,14 @@ sub print {
if($opt::pipe and $self->virgin()) { if($opt::pipe and $self->virgin()) {
# Skip --joblog, --dryrun, --verbose # Skip --joblog, --dryrun, --verbose
} else { } else {
if($Global::joblog and defined $self->{'exitstatus'}) { if($opt::ungroup and $Global::joblog and defined $self->{'exitstatus'}) {
# if($Global::joblog and defined $self->{'exitstatus'}) {
# Add to joblog when finished # Add to joblog when finished
$self->print_joblog(); $self->print_joblog();
}
# Printing is only relevant for grouped/--line-buffer output. # Printing is only relevant for grouped/--line-buffer output.
$opt::ungroup and return; $opt::ungroup and return;
}
# Check for disk full # Check for disk full
exit_if_disk_full(); exit_if_disk_full();
@ -6821,6 +6837,11 @@ sub print {
flush $out_fd; flush $out_fd;
} }
::debug("print", "<<joboutput @command\n"); ::debug("print", "<<joboutput @command\n");
if($Global::joblog and defined $self->{'exitstatus'}
and not ($self->virgin() and $opt::pipe)) {
# Add to joblog when finished
$self->print_joblog();
}
} }
sub files_print { sub files_print {
@ -6839,6 +6860,7 @@ sub files_print {
} }
} elsif($fdno == 1 and $self->fh($fdno,"name")) { } elsif($fdno == 1 and $self->fh($fdno,"name")) {
print $out_fd $self->fh($fdno,"name"),"\n"; print $out_fd $self->fh($fdno,"name"),"\n";
$self->add_returnsize(-s $self->fh($fdno,"name"));
} }
} }
@ -6871,6 +6893,7 @@ sub linebuffer_print {
# 3276800 --tag = 4.3s # 3276800 --tag = 4.3s
# 10240000 --tag = 4.3s # 10240000 --tag = 4.3s
# 32768000 --tag = 4.7s # 32768000 --tag = 4.7s
my $outputlength = 0;
while(read($in_fh,substr($$partial,length $$partial),3276800)) { while(read($in_fh,substr($$partial,length $$partial),3276800)) {
# Append to $$partial # Append to $$partial
# Find the last \n # Find the last \n
@ -6884,6 +6907,7 @@ sub linebuffer_print {
# Length of partial line has changed: Find the last \n again # Length of partial line has changed: Find the last \n again
$i = ::rindex64($partial,"\n"); $i = ::rindex64($partial,"\n");
} }
$outputlength += $i+1;
if($opt::tag or defined $opt::tagstring) { if($opt::tag or defined $opt::tagstring) {
# Replace ^ with $tag within the full line # Replace ^ with $tag within the full line
my $tag = $self->tag(); my $tag = $self->tag();
@ -6897,9 +6921,11 @@ sub linebuffer_print {
substr($$partial,0,$i+1) = ""; substr($$partial,0,$i+1) = "";
} }
} }
$self->add_returnsize($outputlength);
if(defined $self->{'exitstatus'}) { if(defined $self->{'exitstatus'}) {
# If the job is dead: print the remaining partial line # If the job is dead: print the remaining partial line
# read remaining # read remaining
$self->add_returnsize(length $$partial);
if($$partial and ($opt::tag or defined $opt::tagstring)) { if($$partial and ($opt::tag or defined $opt::tagstring)) {
my $tag = $self->tag(); my $tag = $self->tag();
$$partial =~ s/^/$tag/gm; $$partial =~ s/^/$tag/gm;
@ -6931,14 +6957,20 @@ sub tag_print {
if(/^(client_process_control: )?tcgetattr: Invalid argument\n/) { if(/^(client_process_control: )?tcgetattr: Invalid argument\n/) {
# Skip # Skip
} else { } else {
$self->add_returnsize(length $_);
print $out_fd $tag,$_; print $out_fd $tag,$_;
} }
# At most run the loop once # At most run the loop once
last; last;
} }
} }
my $outputlength = 0;
while(<$in_fh>) { while(<$in_fh>) {
print $out_fd $tag,$_; print $out_fd $tag,$_;
$outputlength += length $_;
}
if($fdno == 1) {
$self->add_returnsize($outputlength);
} }
close $in_fh; close $in_fh;
} }
@ -6956,9 +6988,15 @@ sub normal_print {
sysread($in_fh,$buf,1_000); sysread($in_fh,$buf,1_000);
$buf =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//; $buf =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
print $out_fd $buf; print $out_fd $buf;
$self->add_returnsize(length $buf);
} }
my $outputlength = 0;
while(sysread($in_fh,$buf,32768)) { while(sysread($in_fh,$buf,32768)) {
print $out_fd $buf; print $out_fd $buf;
$outputlength += length $buf;
}
if($fdno == 1) {
$self->add_returnsize($outputlength);
} }
close $in_fh; close $in_fh;
} }

View file

@ -89,9 +89,38 @@ echo '### bug #42913: Dont use $SHELL but the shell currently running'
perl -e 'system(qw(parallel -Dinit echo ::: 1))' | grep which; perl -e 'system(qw(parallel -Dinit echo ::: 1))' | grep which;
rm -f /tmp/par*.par rm -f /tmp/par*.par
echo '### added transfersize/returnsize to local jobs'
echo '### normal'
seq 100 111 | parallel --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --line-buffer'
seq 100 111 | parallel --joblog /dev/stderr --line-buffer seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --tag'
seq 100 111 | parallel --tag --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --tag --line-buffer'
seq 100 111 | parallel --tag --line-buffer --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --files'
seq 100 111 | parallel --files --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --files --tag'
seq 100 111 | parallel --files --tag --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --pipe'
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --pipe --line-buffer'
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe --line-buffer pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --pipe --tag'
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe --tag pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --pipe --tag --line-buffer'
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe --tag --line-buffer pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --files --pipe'
seq 1000 | parallel --joblog /dev/stderr --block 1111 --files --pipe pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --files --pipe --tag'
seq 1000 | parallel --joblog /dev/stderr --block 1111 --files --pipe --tag pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --pipe --round-robin'
seq 1000 | parallel --joblog /dev/stderr --block 1111 -j2 --pipe --round-robin pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
echo '### --tmux test - check termination'
perl -e 'map {printf "$_%o%c\n",$_,$_}1..255' |
stdout parallel --tmux echo {} :::: - ::: a b |
perl -pe 's:tmp.par.*tms:tmp/parXXXXX.tms:; s/\d/0/g'
EOF EOF
# TODO This is too unstable
# echo '### --tmux test - check termination'
# perl -e 'map {printf "$_%o%c\n",$_,$_}1..255' | stdout parallel --tmux echo {} :::: - ::: a b | perl -pe 's/\d/0/g'

View file

@ -7,7 +7,7 @@ export XAP
NICEPAR="nice nice parallel" NICEPAR="nice nice parallel"
export NICEPAR export NICEPAR
cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | stdout parallel -vj6 -k --joblog /tmp/jl-`basename $0` -L1 cat <<'EOF' | sed -e s/\$SERVER1/$SERVER1/\;s/\$SERVER2/$SERVER2/ | stdout parallel -vj4 -k --joblog /tmp/jl-`basename $0` -L1
echo 'bug #44250: pxz complains File format not recognized but decompresses anyway' echo 'bug #44250: pxz complains File format not recognized but decompresses anyway'
# The first line dumps core if run from make file. Why?! # The first line dumps core if run from make file. Why?!
stdout parallel --compress --compress-program pxz ls /{} ::: OK-if-missing-file stdout parallel --compress --compress-program pxz ls /{} ::: OK-if-missing-file

View file

@ -199,3 +199,159 @@ Contact the systems administrator for further assistance.
perl -e 'system(qw(parallel -Dinit echo ::: 1))' | grep which; rm -f /tmp/par*.par perl -e 'system(qw(parallel -Dinit echo ::: 1))' | grep which; rm -f /tmp/par*.par
shell? /bin/bash -c perl -e 'system(qw(parallel -Dinit echo ::: 1))' | grep which; rm -f /tmp/par*.par shell? /bin/bash -c perl -e 'system(qw(parallel -Dinit echo ::: 1))' | grep which; rm -f /tmp/par*.par
which bash => shell path /bin/bash which bash => shell path /bin/bash
echo '### added transfersize/returnsize to local jobs'
### added transfersize/returnsize to local jobs
echo '### normal'
### normal
seq 100 111 | parallel --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
0 292 0
0 296 0
0 300 0
0 304 0
0 308 0
0 312 0
0 316 0
0 320 0
0 324 0
0 328 0
0 332 0
0 336 0
Send Receive Exitval
echo '### --line-buffer'
### --line-buffer
seq 100 111 | parallel --joblog /dev/stderr --line-buffer seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
0 292 0
0 296 0
0 300 0
0 304 0
0 308 0
0 312 0
0 316 0
0 320 0
0 324 0
0 328 0
0 332 0
0 336 0
Send Receive Exitval
echo '### --tag'
### --tag
seq 100 111 | parallel --tag --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
0 292 0
0 296 0
0 300 0
0 304 0
0 308 0
0 312 0
0 316 0
0 320 0
0 324 0
0 328 0
0 332 0
0 336 0
Send Receive Exitval
echo '### --tag --line-buffer'
### --tag --line-buffer
seq 100 111 | parallel --tag --line-buffer --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
0 292 0
0 296 0
0 300 0
0 304 0
0 308 0
0 312 0
0 316 0
0 320 0
0 324 0
0 328 0
0 332 0
0 336 0
Send Receive Exitval
echo '### --files'
### --files
seq 100 111 | parallel --files --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
0 292 0
0 296 0
0 300 0
0 304 0
0 308 0
0 312 0
0 316 0
0 320 0
0 324 0
0 328 0
0 332 0
0 336 0
Send Receive Exitval
echo '### --files --tag'
### --files --tag
seq 100 111 | parallel --files --tag --joblog /dev/stderr seq {} '|' pv -qL100 2>&1 >/dev/null | cut -f 5-7 | sort
0 292 0
0 296 0
0 300 0
0 304 0
0 308 0
0 312 0
0 316 0
0 320 0
0 324 0
0 328 0
0 332 0
0 336 0
Send Receive Exitval
echo '### --pipe'
### --pipe
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
1108 1108 0
1112 1112 0
1112 1112 0
561 561 0
Send Receive Exitval
echo '### --pipe --line-buffer'
### --pipe --line-buffer
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe --line-buffer pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
1108 1108 0
1112 1112 0
1112 1112 0
561 561 0
Send Receive Exitval
echo '### --pipe --tag'
### --pipe --tag
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe --tag pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
1108 1108 0
1112 1112 0
1112 1112 0
561 561 0
Send Receive Exitval
echo '### --pipe --tag --line-buffer'
### --pipe --tag --line-buffer
seq 1000 | parallel --joblog /dev/stderr --block 1111 --pipe --tag --line-buffer pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
1108 1108 0
1112 1112 0
1112 1112 0
561 561 0
Send Receive Exitval
echo '### --files --pipe'
### --files --pipe
seq 1000 | parallel --joblog /dev/stderr --block 1111 --files --pipe pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
1108 1108 0
1112 1112 0
1112 1112 0
561 561 0
Send Receive Exitval
echo '### --files --pipe --tag'
### --files --pipe --tag
seq 1000 | parallel --joblog /dev/stderr --block 1111 --files --pipe --tag pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
1108 1108 0
1112 1112 0
1112 1112 0
561 561 0
Send Receive Exitval
echo '### --pipe --round-robin'
### --pipe --round-robin
seq 1000 | parallel --joblog /dev/stderr --block 1111 -j2 --pipe --round-robin pv -qL300 2>&1 >/dev/null | cut -f 5-7 | sort
1673 1673 0
2220 2220 0
Send Receive Exitval
echo '### --tmux test - check termination'
### --tmux test - check termination
perl -e 'map {printf "$_%o%c\n",$_,$_}1..255' | stdout parallel --tmux echo {} :::: - ::: a b | perl -pe 's:tmp.par.*tms:tmp/parXXXXX.tms:; s/\d/0/g'
See output with: tmux -S /tmp/parXXXXX.tms attach -t p000000