parallel: --pipepart now works with --regexp

This commit is contained in:
Ole Tange 2014-05-28 23:45:13 +02:00
parent 1f43b57e64
commit 35939753d6
3 changed files with 48 additions and 19 deletions

View file

@ -228,6 +228,10 @@ Haiku of the month:
New in this release: New in this release:
* GNU Parallel was cited in: Ferroelectric contributions to anomalous hysteresis in hybrid perovskite solar cells http://arxiv.org/pdf/1405.5810.pdf
* Processes Paralleling to Speed up Computing and Tasks Execution in Linux http://kukuruku.co/hub/nix/processes-paralleling-to-speed-up-computing-and-tasks-execution-in-linux
* Speeding up grep log queries with GNU Parallel http://www.tripwire.com/state-of-security/incident-detection/speeding-grep-queries-gnu-parallel/ * Speeding up grep log queries with GNU Parallel http://www.tripwire.com/state-of-security/incident-detection/speeding-grep-queries-gnu-parallel/
* Bug fixes and man page updates. * Bug fixes and man page updates.

View file

@ -211,14 +211,11 @@ if($opt::halt_on_error) {
sub __PIPE_MODE__ {} sub __PIPE_MODE__ {}
# parallel --part-pipe -a bigfile cat
# =>
# (dd 1) | cat
# (dd 2) | cat
# (dd 3) | cat
sub pipe_part_files { sub pipe_part_files {
# Input:
# $file = the file to read
# Returns:
# @commands to run to pipe the blocks of the file to the command given
my ($file) = @_; my ($file) = @_;
# find positions # find positions
my @pos = find_split_positions($file,$opt::blocksize); my @pos = find_split_positions($file,$opt::blocksize);
@ -258,6 +255,16 @@ sub find_split_positions {
my $buf; my $buf;
seek($fh, $pos, 0) || die; seek($fh, $pos, 0) || die;
while(read($fh,substr($buf,length $buf,0),$dd_block_size)) { while(read($fh,substr($buf,length $buf,0),$dd_block_size)) {
if($opt::regexp) {
# If match /$recend$recstart/ => Record position
if($buf =~ /(.*$recend)$recstart/os) {
my $i = length($1);
push(@pos,$pos+$i);
# Start looking for next record _after_ this match
$pos += $i;
last;
}
} else {
# If match $recend$recstart => Record position # If match $recend$recstart => Record position
my $i = index($buf,$recendrecstart); my $i = index($buf,$recendrecstart);
if($i != -1) { if($i != -1) {
@ -268,6 +275,7 @@ sub find_split_positions {
} }
} }
} }
}
push(@pos,$size); push(@pos,$size);
close $fh; close $fh;
return @pos; return @pos;
@ -1303,6 +1311,10 @@ sub shell_quote {
} }
sub shell_quote_empty { sub shell_quote_empty {
# Inputs:
# @strings = strings to be quoted
# Returns:
# @quoted_strings = empty strings quoted as ''.
my @strings = shell_quote(@_); my @strings = shell_quote(@_);
for my $a (@strings) { for my $a (@strings) {
if($a eq "") { if($a eq "") {
@ -1344,7 +1356,6 @@ sub shell_quote_file {
return $a; return $a;
} }
sub maybe_quote { sub maybe_quote {
# If $Global::quoting is set then quote the string so shell will not expand any special chars # If $Global::quoting is set then quote the string so shell will not expand any special chars
# Else do not quote # Else do not quote
@ -1385,9 +1396,13 @@ sub shell_unquote {
if(not defined $arg) { if(not defined $arg) {
$arg = ""; $arg = "";
} }
$arg =~ s/'\n'/\n/g; # filenames with '\n' is quoted using \' # filenames with '\n' is quoted using \'\n\'
$arg =~ s/'\n'/\n/g;
# Non-printables
$arg =~ s/\\([\002-\011\013-\032])/$1/g; $arg =~ s/\\([\002-\011\013-\032])/$1/g;
# Shell special chars
$arg =~ s/\\([\#\?\`\(\)\{\}\*\>\<\~\|\; \"\!\$\&\'])/$1/g; $arg =~ s/\\([\#\?\`\(\)\{\}\*\>\<\~\|\; \"\!\$\&\'])/$1/g;
# Backslash
$arg =~ s/\\\\/\\/g; $arg =~ s/\\\\/\\/g;
} }
return wantarray ? @strings : "@strings"; return wantarray ? @strings : "@strings";
@ -1416,7 +1431,7 @@ sub save_stdin_stdout_stderr {
} }
sub enough_file_handles { sub enough_file_handles {
# check that we have enough filehandles available for starting # Check that we have enough filehandles available for starting
# another job # another job
# Returns: # Returns:
# 1 if ungrouped (thus not needing extra filehandles) # 1 if ungrouped (thus not needing extra filehandles)
@ -1441,7 +1456,7 @@ sub enough_file_handles {
} }
sub open_or_exit { sub open_or_exit {
# Open a file name or exit if the fille cannot be opened # Open a file name or exit if the file cannot be opened
# Inputs: # Inputs:
# $file = filehandle or filename to open # $file = filehandle or filename to open
# Returns: # Returns:
@ -1497,8 +1512,10 @@ sub start_more_jobs {
return $jobs_started; return $jobs_started;
} }
if($Global::max_procs_file) { if($Global::max_procs_file) {
# --jobs filename
my $mtime = (stat($Global::max_procs_file))[9]; my $mtime = (stat($Global::max_procs_file))[9];
if($mtime > $Global::max_procs_file_last_mod) { if($mtime > $Global::max_procs_file_last_mod) {
# file changed: Force re-computing max_jobs_running
$Global::max_procs_file_last_mod = $mtime; $Global::max_procs_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
$sshlogin->set_max_jobs_running(undef); $sshlogin->set_max_jobs_running(undef);
@ -1511,6 +1528,7 @@ sub start_more_jobs {
# thus distribute the jobs on the --sshlogins round robin # thus distribute the jobs on the --sshlogins round robin
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
if($Global::JobQueue->empty() and not $opt::pipe) { if($Global::JobQueue->empty() and not $opt::pipe) {
# No more jobs in the queue
last; last;
} }
debug("Running jobs before on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n"); debug("Running jobs before on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n");
@ -1528,7 +1546,7 @@ sub start_more_jobs {
next; next;
} }
if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) { if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) {
# It has been too short since # It has been too short since last start
next; next;
} }
debug($sshlogin->string()." has ".$sshlogin->jobs_running() debug($sshlogin->string()." has ".$sshlogin->jobs_running()

View file

@ -2583,6 +2583,13 @@ files are passed to the second B<parallel> that runs B<sort -m> on the
files before it removes the files. The output is saved to files before it removes the files. The output is saved to
B<bigfile.sort>. B<bigfile.sort>.
GNU B<parallel>'s B<--pipe> maxes out at around 100 MB/s because every
byte has to be copied through GNU B<parallel>. But if B<bigfile> is a
real (seekable) file GNU B<parallel> can by-pass the copying and send
the parts directly to the program:
B<parallel --pipepart --block 100m -a bigfile --files sort | parallel -Xj1 sort -m {} ';' rm {} >>B<bigfile.sort>
=head1 EXAMPLE: Running more than 500 jobs workaround =head1 EXAMPLE: Running more than 500 jobs workaround