parallel: --header --pipepart works.

This commit is contained in:
Ole Tange 2014-05-29 03:20:07 +02:00
parent 35939753d6
commit 20d8d4554a

View file

@ -158,11 +158,8 @@ if($opt::eta or $opt::bar) {
$Global::JobQueue->total_jobs(); $Global::JobQueue->total_jobs();
} }
if($opt::pipepart) { if($opt::pipepart) {
my @cmdlines; $Global::JobQueue->{'commandlinequeue'}->unget(
for(@opt::a) { map { pipe_part_files($_) } @opt::a);
push(@cmdlines, pipe_part_files($_));
}
$Global::JobQueue->{'commandlinequeue'}->unget(@cmdlines);
} }
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
$sshlogin->max_jobs_running(); $sshlogin->max_jobs_running();
@ -217,14 +214,17 @@ sub pipe_part_files {
# Returns: # Returns:
# @commands to run to pipe the blocks of the file to the command given # @commands to run to pipe the blocks of the file to the command given
my ($file) = @_; my ($file) = @_;
my $buf = "";
open(my $fh, "<", $file) || die;
my $header = find_header(\$buf,$fh);
# find positions # find positions
my @pos = find_split_positions($file,$opt::blocksize); my @pos = find_split_positions($file,$opt::blocksize,length $header);
# unshift job with cat_partial # unshift job with cat_partial
my @cmdlines; my @cmdlines;
for(my $i=0; $i<$#pos; $i++) { for(my $i=0; $i<$#pos; $i++) {
my $cmd = $Global::JobQueue->{'commandlinequeue'}->get(); my $cmd = $Global::JobQueue->{'commandlinequeue'}->get();
# TODO prepend --header (how?) $cmd->{'replaced'} =
$cmd->{'replaced'} = cat_partial($file, $pos[$i],$pos[$i+1])."|" . cat_partial($file, 0, length($header), $pos[$i], $pos[$i+1]) . "|" .
"(".$cmd->{'replaced'}.")"; "(".$cmd->{'replaced'}.")";
::debug("Unget ".$cmd->{'replaced'}."\n"); ::debug("Unget ".$cmd->{'replaced'}."\n");
push(@cmdlines, $cmd); push(@cmdlines, $cmd);
@ -232,16 +232,34 @@ sub pipe_part_files {
return @cmdlines; return @cmdlines;
} }
sub find_header {
my ($buf_ref, $fh) = @_;
my $header = "";
if($opt::header) {
if($opt::header eq ":") { $opt::header = "(.*\n)"; }
# Number = number of lines
$opt::header =~ s/^(\d+)$/"(.*\n)"x$1/e;
while(read($fh,substr($$buf_ref,length $$buf_ref,0),$opt::blocksize)) {
if($$buf_ref=~s/^($opt::header)//) {
$header = $1;
last;
}
}
}
return $header;
}
sub find_split_positions { sub find_split_positions {
# Input: # Input:
# $file = the file to read # $file = the file to read
# $block = (minimal) --block-size of each chunk # $block = (minimal) --block-size of each chunk
# $headerlen = length of header to be skipped
# Uses: # Uses:
# $opt::recstart # $opt::recstart
# $opt::recend # $opt::recend
# Returns: # Returns:
# @positions of block start/end # @positions of block start/end
my($file, $block) = @_; my($file, $block, $headerlen) = @_;
my $size = -s $file; my $size = -s $file;
# The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20 # The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
# The optimal dd blocksize for freebsd = 2^15..2^17 # The optimal dd blocksize for freebsd = 2^15..2^17
@ -250,8 +268,8 @@ sub find_split_positions {
my ($recstart,$recend) = recstartrecend(); my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart; my $recendrecstart = $recend.$recstart;
open (my $fh, "<", $file) || die; open (my $fh, "<", $file) || die;
push(@pos,0); push(@pos,$headerlen);
for(my $pos = $block; $pos < $size; $pos += $block) { for(my $pos = $block+$headerlen; $pos < $size; $pos += $block) {
my $buf; my $buf;
seek($fh, $pos, 0) || die; seek($fh, $pos, 0) || die;
while(read($fh,substr($buf,length $buf,0),$dd_block_size)) { while(read($fh,substr($buf,length $buf,0),$dd_block_size)) {
@ -300,24 +318,13 @@ sub spreadstdin {
# read a record # read a record
# Spawn a job and print the record to it. # Spawn a job and print the record to it.
my $buf = ""; my $buf = "";
my $header = "";
if($opt::header) {
if($opt::header eq ":") { $opt::header = "(.*\n)"; }
# Number = number of lines
$opt::header =~ s/^(\d+)$/"(.*\n)"x$1/e;
while(read(STDIN,substr($buf,length $buf,0),$opt::blocksize)) {
if($buf=~s/^($opt::header)//) {
$header = $1;
last;
}
}
}
my ($recstart,$recend) = recstartrecend(); my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart; my $recendrecstart = $recend.$recstart;
my $chunk_number = 1; my $chunk_number = 1;
my $one_time_through; my $one_time_through;
my $blocksize = $opt::blocksize; my $blocksize = $opt::blocksize;
my $in = *STDIN; my $in = *STDIN;
my $header = find_header(\$buf,$in);
while(1) { while(1) {
my $anything_written = 0; my $anything_written = 0;
if(not read($in,substr($buf,length $buf,0),$blocksize)) { if(not read($in,substr($buf,length $buf,0),$blocksize)) {