src/parallel: Implemented --compress(-program). Test suite missing.

Ole Tange 2013-11-03 04:47:20 +06:30
parent 59fd5ed103
commit 56be1f38c3
5 changed files with 194 additions and 154 deletions


@ -201,71 +201,17 @@ cc:Sandro Cazzaniga <kharec@mandriva.org>,
Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>,
Jesse Alama <jesse.alama@gmail.com>
Subject: GNU Parallel 20131022 ('Westgate') released
Subject: GNU Parallel 20131122 ('') released
GNU Parallel 20131022 ('Westgate') has been released. It is
GNU Parallel 20131122 ('') has been released. It is
available for download at: http://ftp.gnu.org/gnu/parallel/
New in this release:
* --transfer files with /./ in the path will copy the files relative
to the --workdir.
*
* The maximal command length is now cached in a file halving the
startup time.
* GNU Parallel was cited in: Investigating speaker gender using rhythm
metrics in Arabic dialects
http://ieeexplore.ieee.org/xpl/login.jsp?tp=&arnumber=6602389&url=http%3A%2F%2Fieeexplore.ieee.org%2Fxpls%2Fabs_all.jsp%3Farnumber%3D6602389
* GNU Parallel was cited in: Tiedon eristäminen ja visualisointi
lastensuojelun asiakaskertomuksista
https://tampub.uta.fi/handle/10024/84978
* Some Additional Useful Bioinformatics Software
http://nix-bio.blogspot.dk/2013/10/additional-software-for-bioinformatics.html
* GNU Parallel Example Screen Cast (bil to Geotiff conversion)
http://www.youtube.com/watch?v=PpF27eC7Mys
* Using GNU Parallel to roll-your-own Map Reduce!
http://www.rankfocus.com/hello-world/
* Use multiple CPU Cores with your Linux commands
http://www.rankfocus.com/use-cpu-cores-linux-commands/
* Using GNU Parallel at HPC @ Uni.lu
https://hpc.uni.lu/users/use_cases/
* Scaling up with parallelization
https://www.msi.umn.edu/sites/default/files/AdvPython_1.pdf
* How to teach yourself to be a technical tester
http://infiniteundo.com/post/62421363463/qa-toc
* Faster Rasters For All
http://2013.foss4g.org/conf/programme/presentations/52/
* Optimizing translated file downloads
http://www.smartling.com/blog/2013/05/20/optimizing-translated-file-downloads/
* EMC NetWorker parallel saveset cloning with nsrclone and GNU parallel
http://www.beebotech.com.au/2013/10/emc-networker-parallel-saveset-cloning-with-nsrclone-and-gnu-parallel/
* Best Practices for Amazon EMR
http://media.amazonwebservices.com/AWS_Amazon_EMR_Best_Practices.pdf
* Computing checksums for backup
https://identi.ca/evan/note/6yf1GzAARtyBhj__xzMvAg
* Un exemplu de utilizare a programului GNU parallel
http://vundicind.blogspot.dk/2013/10/exemplu-utilizare-gnu-parallel.html
* 平行化你的工作
http://www.slideshare.net/drakeguan/part1-23705978
* なんか気持ちよかです Gnu Parallel
http://a3-lab.jpn.org/note/%E3%81%AA%E3%82%93%E3%81%8B%E6%B0%97%E6%8C%81%E3%81%A1%E3%82%88%E3%81%8B%E3%81%A7%E3%81%99-gnu-parallel
* 如何利用多核CPU來加速你的Linux命令 — awk, sed, bzip2, grep, wc等
http://www.hksilicon.com/kb/articles/290543/CPULinuxawk-sed-bzip2-grep-wc
* Bug fixes and man page updates.


@ -694,6 +694,8 @@ sub options_hash {
"W=s" => \$opt::retired,
"tmpdir=s" => \$opt::tmpdir,
"tempdir=s" => \$opt::tmpdir,
"use-compress-program|compress-program=s" => \$opt::compress_program,
"compress" => \$opt::compress,
"tty" => \$opt::tty,
"T" => \$opt::retired,
"halt-on-error|halt=i" => \$opt::halt_on_error,
@ -901,6 +903,15 @@ sub parse_options {
# Set --delay to --sshdelay if not set
$opt::delay = $opt::sshdelay;
}
if($opt::compress_program) {
$opt::compress = 1;
$opt::decompress_program = $opt::compress_program." -dc";
}
if($opt::compress) {
my ($compress, $decompress) = find_compression_program();
$opt::compress_program ||= $compress;
$opt::decompress_program ||= $decompress;
}
if($opt::tollef and not $opt::gnu and not $opt::plain) {
# Behave like tollef parallel (from moreutils)
if($Global::version > 2014_02_22) {
@ -1179,10 +1190,37 @@ sub open_joblog {
}
}
sub find_compression_program {
# Find a fast compression program
# Returns:
# $compress_program = compress program with options
# $decompress_program = decompress program with options
# Search for these. Sorted by speed
my @prg = qw(lzop pigz gzip pbzip2 plzip bzip2 lzma lzip xz);
for my $p (@prg) {
if(which($p)) {
return ("$p -c -1","$p -dc");
}
}
# Fall back to cat
return ("cat","cat");
}
sub which {
# Input:
# $program = program to find the path to
# Returns:
# $full_path = full path to $program. undef if not found
my $program = $_[0];
return (grep { -e $_ } map { $_."/".$program } split(":",$ENV{'PATH'}))[0];
}
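The lookup above can be sketched as a standalone script. This is a minimal illustration under stated assumptions, not the commit's code: `which_path` and `pick_compressor` are hypothetical names, and unlike the diff's `which` this version checks for an executable regular file and returns the full path.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the commit): return the full path of the
# first executable file named $program found in $PATH, or undef if none.
sub which_path {
    my ($program) = @_;
    for my $dir (split /:/, $ENV{'PATH'} // '') {
        next if $dir eq '';
        my $candidate = "$dir/$program";
        return $candidate if -f $candidate and -x $candidate;
    }
    return;
}

# Hypothetical helper: pick the first available program from a list that is
# assumed to be sorted by speed. Falls back to cat in both directions,
# mirroring the fallback in the diff.
sub pick_compressor {
    my @candidates = @_;
    for my $p (@candidates) {
        return ("$p -c -1", "$p -dc") if which_path($p);
    }
    return ("cat", "cat");
}

my ($compress, $decompress) =
    pick_compressor(qw(lzop pigz gzip pbzip2 plzip bzip2 lzma lzip xz));
print "compress:   $compress\n";
print "decompress: $decompress\n";
```

Which pair is printed depends on what is installed on the machine running the script.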
sub read_options {
# Read options from command line, profile and $PARALLEL
# Returns:
# @ARGV without --options
# @ARGV_no_opt = @ARGV without --options
# This must be done first as this may exec myself
if(defined $ARGV[0] and ($ARGV[0]=~/^--shebang/ or
$ARGV[0]=~/^--shebang-?wrap/ or
@ -1271,9 +1309,10 @@ sub read_args_from_command_line {
# Removes the arguments from @ARGV and:
# - puts filenames into -a
# - puts arguments into files and add the files to -a
# Input:
# @::ARGV = command option ::: arg arg arg :::: argfiles
# Returns:
# @ARGV without ::: and :::: and following args
# Input: @ARGV = command option ::: arg arg arg :::: argfiles
# @argv_no_argsep = @::ARGV without ::: and :::: and following args
my @new_argv = ();
for(my $arg = shift @ARGV; @ARGV; $arg = shift @ARGV) {
if($arg eq $Global::arg_sep
@ -1356,9 +1395,11 @@ sub shell_quote_empty {
sub shell_quote_scalar {
# Quote the string so shell will not expand any special chars
# Inputs:
# $string = string to be quoted
# Returns:
# string quoted with \ as needed by the shell
my $a = shift;
# $shell_quoted = string quoted with \ as needed by the shell
my $a = $_[0];
if(defined $a) {
$a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g;
$a =~ s/[\n]/'\n'/g; # filenames with '\n' is quoted using \'
@ -1368,8 +1409,10 @@ sub shell_quote_scalar {
sub shell_quote_file {
# Quote the string so shell will not expand any special chars and prepend ./ if needed
# Input:
# $filename = filename to be shell quoted
# Returns:
# string quoted with \ as needed by the shell and ./ if needed
# $quoted_filename = filename quoted with \ as needed by the shell and ./ if needed
my $a = shell_quote_scalar(shift);
if(defined $a) {
if($a =~ m:^/: or $a =~ m:^\./:) {
@ -1382,30 +1425,14 @@ sub shell_quote_file {
return $a;
}
sub shell_quote_files {
# Quote the strings so shell will not expand any special chars and prepend ./ if needed
# Returns:
# strings quoted with \ as needed by the shell and ./ if needed
my @strings = shell_quote(@_);
for my $a (@strings) {
if(defined $a) {
if($a =~ m:^/: or m:\./:) {
# /abs/path or ./rel/path => skip
} else {
# rel/path => ./rel/path
$a = "./".$a;
}
}
}
return @strings;;
}
sub maybe_quote {
# If $Global::quoting then quote the string so shell will not expand any special chars
# If $Global::quoting is set then quote the string so shell will not expand any special chars
# Else do not quote
# Inputs:
# $string = string to be quoted
# Returns:
# if $Global::quoting string quoted with \ as needed by the shell
# else string unaltered
# $maybe_quoted_string = $string quoted if needed
if($Global::quoting) {
return shell_quote_scalar(@_);
} else {
@ -1416,9 +1443,10 @@ sub maybe_quote {
sub maybe_unquote {
# If $Global::quoting then unquote the string as shell would
# Else do not unquote
# Inputs:
# $maybe_quoted_string = string to be maybe unquoted
# Returns:
# if $Global::quoting string unquoted as done by the shell
# else string unaltered
# $string = $maybe_quoted_string unquoted if needed
if($Global::quoting) {
return shell_unquote(@_);
} else {
@ -1428,8 +1456,10 @@ sub maybe_unquote {
sub shell_unquote {
# Unquote strings from shell_quote
# Inputs:
# @strings = strings to be unquoted
# Returns:
# string with shell quoting removed
# @unquoted_strings = @strings with shell quoting removed
my @strings = (@_);
my $arg;
for my $arg (@strings) {
@ -1492,9 +1522,11 @@ sub enough_file_handles {
}
sub open_or_exit {
# Open a file name or exit if the file cannot be opened
# Inputs:
# $file = filehandle or filename to open
# Returns:
# file handle to read-opened file
# exits if file cannot be opened
# $fh = file handle to read-opened file
my $file = shift;
if($file eq "-") {
$Global::stdin_in_opt_a = 1;
@ -1532,8 +1564,14 @@ sub init_run_jobs {
}
sub start_more_jobs {
# Run start_another_job() but only if:
# * not $Global::start_no_new_jobs set
# * not JobQueue is empty
# * not load on server is too high
# * not server swapping
# * not too short time since last remote login
# Returns:
# number of jobs started
# $jobs_started = number of jobs started
my $jobs_started = 0;
my $jobs_started_this_round = 0;
if($Global::start_no_new_jobs) {
@ -1592,8 +1630,14 @@ sub start_more_jobs {
}
sub start_another_job {
# Grab a job from Global::JobQueue, start it at sshlogin
# and remember the pid, the STDOUT and the STDERR handles
# If there are enough filehandles
# and JobQueue not empty
# and not $job is in joblog
# Then grab a job from Global::JobQueue,
# start it at sshlogin
# mark it as virgin_job
# Inputs:
# $sshlogin = the SSHLogin to start the job on
# Returns:
# 1 if another job was started
# 0 otherwise
@ -3872,8 +3916,10 @@ sub seq {
}
sub openoutputfiles {
# Open files for STDOUT and STDERR
# Set file handles in $self->fd and possibly $self->fd_input
my $self = shift;
my ($outfh,$errfh,$outname,$errname);
my ($outfh, $errfh, $outname, $errname);
if($opt::results) {
my $args_as_dirname = $self->{'commandline'}->args_as_dirname();
# prefix/name1/val1/name2/val2/
@ -3881,27 +3927,40 @@ sub openoutputfiles {
File::Path::mkpath($dir);
# prefix/name1/val1/name2/val2/stdout
$outname = "$dir/stdout";
if(not open($outfh,"+>",$outname)) {
if(not open($outfh, "+>", $outname)) {
::error("Cannot write to `$outname'.\n");
::wait_and_exit(255);
}
# prefix/name1/val1/name2/val2/stderr
$errname = "$dir/stderr";
if(not open($errfh,"+>",$errname)) {
if(not open($errfh, "+>", $errname)) {
::error("Cannot write to `$errname'.\n");
::wait_and_exit(255);
}
} elsif($Global::grouped) {
# To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open)
($outfh, $outname) = ::tempfile(SUFFIX => ".par");
($errfh, $errname) = ::tempfile(SUFFIX => ".par");
if(@Global::tee_jobs) {
if($opt::compress) {
my($i_outfh,$i_errfh);
($i_outfh, $outname) = ::tempfile(SUFFIX => ".par");
($i_errfh, $errname) = ::tempfile(SUFFIX => ".par");
$self->set_fd_input(1, $i_outfh);
$self->set_fd_input(2, $i_errfh);
my $rm = $Global::debug ? "true" : "rm";
::debug("| ($rm $outname; $opt::compress_program) > $outname\n");
open($outfh,"|-","($rm $outname; $opt::compress_program) > $outname") || die $?;
::debug("| ($rm $errname; $opt::compress_program) > $errname\n");
open($errfh,"|-","($rm $errname; $opt::compress_program) > $errname") || die $?;
} elsif(@Global::tee_jobs) {
# files must be removed when the tee is done
} elsif($opt::files) {
($outfh, $outname) = ::tempfile(SUFFIX => ".par");
($errfh, $errname) = ::tempfile(SUFFIX => ".par");
# --files => only remove stderr
unlink $errname;
} else {
($outfh, $outname) = ::tempfile(SUFFIX => ".par");
($errfh, $errname) = ::tempfile(SUFFIX => ".par");
unlink $outname;
unlink $errname;
}
@ -3921,18 +3980,27 @@ sub openoutputfiles {
sub set_fd {
# Set file descriptor
my $self = shift;
my $fd_no = shift;
$self->{'fd'}{$fd_no} = shift;
my ($self, $fd_no, $fd) = @_;
$self->{'fd'}{$fd_no} = $fd;
}
sub fd {
# Get file descriptor
my $self = shift;
my $fd_no = shift;
my ($self, $fd_no) = @_;
return $self->{'fd'}{$fd_no};
}
sub set_fd_input {
my ($self, $fd_no, $fd_input) = @_;
$self->{'fd_input'}{$fd_no} = $fd_input;
}
sub fd_input {
# Get input file descriptor
my ($self, $fd_no) = @_;
return $self->{'fd_input'}{$fd_no};
}
sub set_fd_file_name {
# Set file name for a file descriptor
my $self = shift;
@ -4734,7 +4802,24 @@ sub print {
}
} else {
my $buf;
seek $in_fd, 0, 0;
if($opt::compress) {
# Close for writing so the compressor can finish up
# (Are we really sure it finishes? Potential bug)
close($in_fd);
# Save STDIN file handle
open(my $stdin_copy, "<&", "STDIN") or ::die_bug("Can't dup STDIN: $!");
my $fd = $self->fd_input($fdno);
# Replace STDIN with the file on disk
open(STDIN, "<&", $fd);
::debug("$opt::decompress_program |");
# Start decompression with the fake STDIN
open($in_fd, "-|", $opt::decompress_program) ||
::die_bug("Decompressing failed ($opt::decompress_program): $!");
# Put STDIN file handle back
open(STDIN, "<&", $stdin_copy) or ::die_bug("Can't dup STDIN: $!");
} else {
seek $in_fd, 0, 0;
}
if($opt::tag or defined $opt::tagstring) {
my $tag = $self->tag();
if($fdno == 2) {
@ -4756,7 +4841,7 @@ sub print {
if($fdno == 2) {
# OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt
# This is a crappy way of ignoring it.
sysread($in_fd,$buf,1_000_000);
sysread($in_fd,$buf,1_000);
$buf =~ s/^tcgetattr: Invalid argument\n//;
print $out_fd $buf;
}
@ -5243,7 +5328,7 @@ sub replaced {
if($self->{'replaced'} =~ /^\s*(-\S+)/) {
# Is this really a command in $PATH starting with '-'?
my $cmd = $1;
if(not grep { -e $_."/".$cmd } split(":",$ENV{'PATH'})) {
if(not which($cmd)) {
::error("Command ($cmd) starts with '-'. Is this a wrong option?\n");
::wait_and_exit(255);
}
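The compress-on-write, decompress-on-read idea used by openoutputfiles and print can be sketched in isolation as follows. This is a simplified illustration, not the commit's mechanism: it does not unlink the file early or redirect STDIN, `roundtrip` is a hypothetical name, and the temporary file name is assumed to be safe to pass to the shell unquoted.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Hypothetical helper: write @data through "$compress" into a temporary
# file, then read it back through "$decompress". Returns the lines read.
sub roundtrip {
    my ($compress, $decompress, @data) = @_;
    # Assumption: $TMPDIR contains no characters that need shell quoting
    my ($tmp_fh, $tmp_name) =
        tempfile("parXXXXXX", SUFFIX => ".par", TMPDIR => 1);
    close $tmp_fh;

    # Writing side: our output is piped into the compressor
    open(my $out, "|-", "$compress > $tmp_name")
        or die "Cannot start '$compress': $!";
    print $out @data;
    # close() on a pipe waits for the child, so the file is complete afterwards
    close $out or die "'$compress' failed: status $?";

    # Reading side: the decompressor's output is piped back to us
    open(my $in, "-|", "$decompress < $tmp_name")
        or die "Cannot start '$decompress': $!";
    my @lines = <$in>;
    close $in or die "'$decompress' failed: status $?";

    unlink $tmp_name;
    return @lines;
}

# "cat" is the fallback pair from the diff; "gzip -c -1" and "gzip -dc"
# would be passed the same way if gzip is installed.
print roundtrip("cat", "cat", "line 1\n", "line 2\n");
```

Checking the return value of close on the writing pipe is one way to detect a compressor that did not finish cleanly before the file is read back.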

Binary file not shown.


@ -395,6 +395,24 @@ I<regexp> is a Perl Regular Expression:
http://perldoc.perl.org/perlre.html
=item B<--compress> (pre-alpha testing)
Compress temporary files. If the output is big and very compressible
this will take up less disk space in $TMPDIR and possibly be faster due to less
disk I/O.
GNU B<parallel> will try B<lzop>, B<pigz>, B<gzip>, B<pbzip2>,
B<plzip>, B<bzip2>, B<lzma>, B<lzip>, B<xz> in that order, and use the
first available.
=item B<--compress-program> I<prg> (pre-alpha testing)
Use I<prg> for compressing temporary files. It is assumed that I<prg
-dc> will decompress stdin (standard input) to stdout (standard
output).
=item B<--ctrlc>
Sends SIGINT to tasks running on remote computers thus killing them.
@ -2543,32 +2561,16 @@ computers:
B<echo >>B<jobqueue>; B<tail -f jobqueue | parallel -S ..>
There are two small issues when using GNU B<parallel> as queue
system/batch manager:
=over 2
=item *
You will get a warning if you do not submit JobSlots jobs within
the first second. E.g. if you have 8 cores and use -j+2 you have to
submit 10 jobs. These can be dummy jobs (e.g. echo foo). You can also
simply ignore the warning. For parallel versions 20110322 and higher,
the warnings will not appear.
=item *
You have to submit JobSlot number of jobs before they will start, and
after that you can submit one at a time, and job will start
immediately if free slots are available. Output from the running or
completed jobs are held back and will only be printed when JobSlots
more jobs has been started (unless you use --ungroup or -u, in which
case the output from the jobs are printed immediately). E.g. if you
have 10 jobslots then the output from the first completed job will
only be printed when job 11 has started, and the output of second
completed job will only be printed when job 12 has started.
=back
There is a small issue when using GNU B<parallel> as queue
system/batch manager: You have to submit JobSlot number of jobs before
they will start, and after that you can submit one at a time, and a job
will start immediately if free slots are available. Output from the
running or completed jobs is held back and will only be printed when
JobSlots more jobs have been started (unless you use --ungroup or -u,
in which case the output from the jobs is printed immediately).
E.g. if you have 10 jobslots then the output from the first completed
job will only be printed when job 11 has started, and the output of the
second completed job will only be printed when job 12 has started.
=head1 EXAMPLE: GNU Parallel as dir processor


@ -418,6 +418,24 @@ separating the columns. The n'th column can be access using
@emph{regexp} is a Perl Regular Expression:
http://perldoc.perl.org/perlre.html
@item @strong{--compress} (pre-alpha testing)
@anchor{@strong{--compress} (pre-alpha testing)}
Compress temporary files. If the output is big and very compressible
this will take up less disk space in $TMPDIR and possibly be faster due to less
disk I/O.
GNU @strong{parallel} will try @strong{lzop}, @strong{pigz}, @strong{gzip}, @strong{pbzip2},
@strong{plzip}, @strong{bzip2}, @strong{lzma}, @strong{lzip}, @strong{xz} in that order, and use the
first available.
@item @strong{--compress-program} @emph{prg} (pre-alpha testing)
@anchor{@strong{--compress-program} @emph{prg} (pre-alpha testing)}
Use @emph{prg} for compressing temporary files. It is assumed that @emph{prg
-dc} will decompress stdin (standard input) to stdout (standard
output).
@item @strong{--ctrlc}
@anchor{@strong{--ctrlc}}
@ -2751,27 +2769,16 @@ computers:
@strong{echo }>@strong{jobqueue}; @strong{tail -f jobqueue | parallel -S ..}
There are two small issues when using GNU @strong{parallel} as queue
system/batch manager:
@itemize
@item You will get a warning if you do not submit JobSlots jobs within
the first second. E.g. if you have 8 cores and use -j+2 you have to
submit 10 jobs. These can be dummy jobs (e.g. echo foo). You can also
simply ignore the warning. For parallel versions 20110322 and higher,
the warnings will not appear.
@item You have to submit JobSlot number of jobs before they will start, and
after that you can submit one at a time, and job will start
immediately if free slots are available. Output from the running or
completed jobs are held back and will only be printed when JobSlots
more jobs has been started (unless you use --ungroup or -u, in which
case the output from the jobs are printed immediately). E.g. if you
have 10 jobslots then the output from the first completed job will
only be printed when job 11 has started, and the output of second
completed job will only be printed when job 12 has started.
@end itemize
There is a small issue when using GNU @strong{parallel} as queue
system/batch manager: You have to submit JobSlot number of jobs before
they will start, and after that you can submit one at a time, and a job
will start immediately if free slots are available. Output from the
running or completed jobs is held back and will only be printed when
JobSlots more jobs have been started (unless you use --ungroup or -u,
in which case the output from the jobs is printed immediately).
E.g. if you have 10 jobslots then the output from the first completed
job will only be printed when job 11 has started, and the output of the
second completed job will only be printed when job 12 has started.
@chapter EXAMPLE: GNU Parallel as dir processor
@anchor{EXAMPLE: GNU Parallel as dir processor}