parallel: Implemented --pipepart

This commit is contained in:
Ole Tange 2014-04-09 05:11:51 +02:00
parent f2c1f65a5a
commit 34e131b894
2 changed files with 231 additions and 36 deletions

View file

@ -158,6 +158,11 @@ git commit -a
Released as 20140X22 ('').
TAG=MyTag
YYYYMMDD=`yyyymmdd`
git tag -s -m "Released as $YYYYMMDD ('$TAG')" $TAG
git tag -s -m "Released as $YYYYMMDD ('$TAG')" $YYYYMMDD
== Update Savannah ==

View file

@ -68,9 +68,13 @@ if(@ARGV) {
}
my @fhlist;
@fhlist = map { open_or_exit($_) } @opt::a;
if(not @fhlist and not $opt::pipe) {
if($opt::pipepart) {
@fhlist = map { open_or_exit($_) } "/dev/null";
} else {
@fhlist = map { open_or_exit($_) } @opt::a;
if(not @fhlist and not $opt::pipe) {
@fhlist = (*STDIN);
}
}
if($opt::skip_first_line) {
# Skip the first line for the first file handle
@ -121,7 +125,7 @@ if($opt::nonall or $opt::onall) {
$Global::JobQueue = JobQueue->new(
$command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files);
if($opt::pipe and @opt::a) {
if(0 and $opt::pipe and @opt::a) {
# ... | parallel --pipe cmd ::: arg1 arg2
# The command to run is:
# tee >((cmd arg1) >/tmp/tmp1 2>/tmp/err1) >((cmd arg2) >/tmp/tmp2 2>/tmp/err2) >/dev/null
@ -153,6 +157,13 @@ if($opt::eta or $opt::bar) {
# Count the number of jobs before starting any
$Global::JobQueue->total_jobs();
}
if($opt::pipepart) {
my @cmdlines;
for(@opt::a) {
push(@cmdlines, pipe_part_files($_));
}
$Global::JobQueue->{'commandlinequeue'}->unget(@cmdlines);
}
for my $sshlogin (values %Global::host) {
$sshlogin->max_jobs_running();
}
@ -163,9 +174,12 @@ if($Global::semaphore) {
$sem = acquire_semaphore();
}
$SIG{TERM} = \&start_no_new_jobs;
start_more_jobs();
if($opt::pipe) {
if(not $opt::pipepart) {
if($opt::pipe) {
spreadstdin();
}
}
::debug("Start draining\n");
drain_job_queue();
@ -197,6 +211,170 @@ if($opt::halt_on_error) {
sub __PIPE_MODE__ {}
# parallel --part-pipe -a bigfile cat
# =>
# (dd 1) | cat
# (dd 2) | cat
# (dd 3) | cat
sub pipe_part_files {
my ($file) = @_;
# find positions
my @pos = find_split_positions($file,$opt::blocksize);
# unshift job with dd_prefix
my @cmdlines;
for(my $i=0; $i<$#pos; $i++) {
my $cmd = $Global::JobQueue->{'commandlinequeue'}->get();
# TODO prepend --header (how?)
$cmd->{'replaced'} = cat_partial($file, $pos[$i],$pos[$i+1])."|" .
$cmd->{'replaced'};
::debug("Unget ".$cmd->{'replaced'}."\n");
push(@cmdlines, $cmd);
}
return @cmdlines;
}
sub find_split_positions {
# Input:
# $file = the file to read
# $block = (minimal) --block-size of each chunk
# Uses:
# $opt::recstart
# $opt::recend
# Returns:
# @positions of block start/end
my($file, $block) = @_;
my $size = -s $file;
# The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
# The optimal dd blocksize for freebsd = 2^15..2^17
my $dd_block_size = 131072; # 2^17
my @pos;
my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart;
open (my $fh, "<", $file) || die;
push(@pos,0);
for(my $pos = $block; $pos < $size; $pos += $block) {
my $buf;
seek($fh, $pos, 0) || die;
while(read($fh,substr($buf,length $buf,0),$dd_block_size)) {
# If match $recend$recstart => Record position
my $i = index($buf,$recendrecstart);
if($i != -1) {
push(@pos,$pos+$i);
# Start looking for next record _after_ this match
$pos += $i;
last;
}
}
}
push(@pos,$size);
close $fh;
return @pos;
}
sub cat_partial {
# Input:
# $file = the file to read
# $start = start byte
# $end = end byte
# Returns:
# Efficent perl command to copy $start..$end to stdout
my($file, $start, $end) = @_;
my $len = $end - $start;
return "<". shell_quote_scalar($file) .
q{ perl -e 'sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 32768 ? 32768 : $left))){ $left -= $read; syswrite(STDOUT,$buf); }' } .
" ".$start." ".$len;
}
sub _dd_prefix_part_job {
# Input:
# $file = the file to read
# $start = start byte
# $end = end byte
# Returns:
# Efficent dd command to copy $start..$end to stdout
my($file, $start, $end) = @_;
# The optimal blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
# The optimal blocksize for freebsd = 2^15..2^17
my $big_block = 131072;
my $small_block = 512;
# Copy:
# start .. 512*n: 1 byte at a time (1 MB/s)
# 512*n .. 131072*n: 512 bytes at a time (300 MB/s)
# 131072*n1 .. 131072*n2: 131072 bytes at a time (1 GB/s)
# 131072*n .. 512*n: 512 bytes at a time (medium speed)
# 512*n .. end: 1 byte at a time
# start = 1234;
# end = 4321;
# len = end - start = 3087;
my $len = $end - $start;
# copy1_start = start;
my $copy1_start = $start;
# copy1_len = (10 - 1234) % 10 = (small_block - copy1_start) % small_block = 6;
my $copy1_len = ($small_block - $copy1_start) % $small_block;
# copy1_bs = 1;
my $copy1_bs = 1;
# copy1_count = 6 / 1 = copy1_len / copy1_bs = 6;
my $copy1_count = $copy1_len / $copy1_bs;
# copy1_skip = 1234 = start / copy1_bs;
my $copy1_skip = $start / $copy1_bs;
# copy2_start = start + copy1_len = 1240;
my $copy2_start = $start + $copy1_len;
# copy2_len = (100 - 1240) % 100 = (big_block - copy2_start) % big_block = 60;
my $copy2_len = ($big_block - $copy2_start) % $big_block;
# copy2_bs = small_block = 10;
my $copy2_bs = $small_block;
# copy2_count = 60 / 10 = copy2_len / copy2_bs = 6;
my $copy2_count = $copy2_len / $copy2_bs;
# copy2_skip = 1240 / 10 = copy2_start / copy2_bs = 124
my $copy2_skip = $copy2_start / $copy2_bs;
# copy5_len = 4321 % 10 = end % small_block = 1;
my $copy5_len = $end % $small_block;
# copy5_start = 4321 - 1 = end - copy5_len = 4320;
my $copy5_start = $end - $copy5_len;
# copy5_bs = 1;
my $copy5_bs = 1;
# copy5_count = 1 / 1 = copy5_len / copy5_bs = 1;
my $copy5_count = $copy5_len / $copy5_bs;
# copy5_skip = 4320 / 1 = copy5_start / copy5_bs = 4320
my $copy5_skip = $copy5_start / $copy5_bs;
# copy4_len = 4320 % 100 = copy5_start % big_block = 20;
my $copy4_len = $copy5_start % $big_block;
# copy4_start = end - copy5_len - copy4_len = 4300;
my $copy4_start = $end - $copy5_len - $copy4_len;
# copy4_bs = small_block = 10;
my $copy4_bs = $small_block;
# copy4_count = 20 / 10 = copy4_len / copy4_bs = 2;
my $copy4_count = $copy4_len / $copy4_bs;
# copy4_skip = 4300 / 10 = copy4_start / copy4_bs = 430
my $copy4_skip = $copy4_start / $copy4_bs;
# copy3_start = start + copy1_len + copy2_len = 1300;
my $copy3_start = $start + $copy1_len + $copy2_len;
# copy3_len = 4300 - 1300 = copy4_start - copy3_start = 3000;
my $copy3_len = $copy4_start - $copy3_start;
# copy3_bs = big_block = 100;
my $copy3_bs = $big_block;
# copy3_count = 3000 / 100 = copy3_len / copy3_bs = 3000;
my $copy3_count = $copy3_len / $copy3_bs;
# copy3_skip = 1300 / 100 = copy3_start / copy3_bs = 13
my $copy3_skip = $copy3_start / $copy3_bs;
return
"dd if=$file bs=$copy1_bs skip=$copy1_skip count=$copy1_count iflag=fullblock;" .
"dd if=$file bs=$copy2_bs skip=$copy2_skip count=$copy2_count iflag=fullblock;" .
"dd if=$file bs=$copy3_bs skip=$copy3_skip count=$copy3_count iflag=fullblock;" .
"dd if=$file bs=$copy4_bs skip=$copy4_skip count=$copy4_count iflag=fullblock;" .
"dd if=$file bs=$copy5_bs skip=$copy5_skip count=$copy5_count iflag=fullblock;"
;
}
sub spreadstdin {
# read a record
# Spawn a job and print the record to it.
@ -213,30 +391,7 @@ sub spreadstdin {
}
}
}
my ($recstart,$recend);
if(defined($opt::recstart) and defined($opt::recend)) {
# If both --recstart and --recend is given then both must match
$recstart = $opt::recstart;
$recend = $opt::recend;
} elsif(defined($opt::recstart)) {
# If --recstart is given it must match start of record
$recstart = $opt::recstart;
$recend = "";
} elsif(defined($opt::recend)) {
# If --recend is given then it must match end of record
$recstart = "";
$recend = $opt::recend;
}
if($opt::regexp) {
# If $recstart/$recend contains '|' this should only apply to the regexp
$recstart = "(?:".$recstart.")";
$recend = "(?:".$recend.")";
} else {
# $recstart/$recend = printf strings (\n)
$recstart =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
$recend =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
}
my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart;
my $chunk_number = 1;
my $one_time_through;
@ -355,6 +510,39 @@ sub spreadstdin {
}
}
sub recstartrecend {
# Uses:
# $opt::recstart
# $opt::recend
# Returns:
# $recstart,$recend with default values and regexp conversion
my($recstart,$recend);
if(defined($opt::recstart) and defined($opt::recend)) {
# If both --recstart and --recend is given then both must match
$recstart = $opt::recstart;
$recend = $opt::recend;
} elsif(defined($opt::recstart)) {
# If --recstart is given it must match start of record
$recstart = $opt::recstart;
$recend = "";
} elsif(defined($opt::recend)) {
# If --recend is given then it must match end of record
$recstart = "";
$recend = $opt::recend;
}
if($opt::regexp) {
# If $recstart/$recend contains '|' this should only apply to the regexp
$recstart = "(?:".$recstart.")";
$recend = "(?:".$recend.")";
} else {
# $recstart/$recend = printf strings (\n)
$recstart =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
$recend =~ s/\\([rnt\'\"\\])/"qq|\\$1|"/gee;
}
return ($recstart,$recend);
}
sub nindex {
# See if string is in buffer N times
# Returns:
@ -591,6 +779,7 @@ sub options_hash {
"header=s" => \$opt::header,
"cat" => \$opt::cat,
"fifo" => \$opt::fifo,
"pipepart" => \$opt::pipepart,
);
}
@ -630,7 +819,7 @@ sub get_options_from_array {
sub parse_options {
# Returns: N/A
# Defaults:
$Global::version = 20140401;
$Global::version = 20140409;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
@ -3842,7 +4031,6 @@ sub sshcommand_of_sshlogin {
# Run a sleep that outputs data, so it will discover if the ssh connection closes.
my $sleep = ::shell_quote_scalar('$|=1;while(1){sleep 1;print "foo\n"}');
my @master = ("ssh", "-tt", "-MTS", $control_path, $serverlogin, "perl", "-e", $sleep);
::debug("@master\n");
exec(@master);
}
}
@ -5206,9 +5394,11 @@ sub new {
$command = $Global::replace{'{}'};
} elsif($opt::pipe) {
# With --pipe you can have ::: or not
if(@opt::a) {
$command .=" ".$Global::replace{'{}'};
}
# if(@opt::a) {
# $command .=" ".$Global::replace{'{}'};
# }
} elsif($opt::pipepart) {
# With --pipe-part you can have nothing
} else {
# Add {} to the command if there are no {...}'s
$command .=" ".$Global::replace{'{}'};
@ -5871,7 +6061,7 @@ sub get {
);
$cmd_line->populate();
::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n");
if($opt::pipe) {
if($opt::pipe or $opt::pipepart) {
if($cmd_line->replaced() eq "") {
# Empty command - pipe requires a command
::error("--pipe must have a command to pipe into (e.g. 'cat').\n");