mirror of
https://git.savannah.gnu.org/git/parallel.git
synced 2024-12-22 20:57:53 +00:00
Released as 20160101 ('20160101alpha')
This commit is contained in:
parent
fb03c5f05f
commit
f4c20c13b0
12
README
12
README
|
@ -40,9 +40,9 @@ document.
|
|||
|
||||
Full installation of GNU Parallel is as simple as:
|
||||
|
||||
wget http://ftpmirror.gnu.org/parallel/parallel-20151222.tar.bz2
|
||||
bzip2 -dc parallel-20151222.tar.bz2 | tar xvf -
|
||||
cd parallel-20151222
|
||||
wget http://ftpmirror.gnu.org/parallel/parallel-20160101.tar.bz2
|
||||
bzip2 -dc parallel-20160101.tar.bz2 | tar xvf -
|
||||
cd parallel-20160101
|
||||
./configure && make && sudo make install
|
||||
|
||||
|
||||
|
@ -51,9 +51,9 @@ Full installation of GNU Parallel is as simple as:
|
|||
If you are not root you can add ~/bin to your path and install in
|
||||
~/bin and ~/share:
|
||||
|
||||
wget http://ftpmirror.gnu.org/parallel/parallel-20151222.tar.bz2
|
||||
bzip2 -dc parallel-20151222.tar.bz2 | tar xvf -
|
||||
cd parallel-20151222
|
||||
wget http://ftpmirror.gnu.org/parallel/parallel-20160101.tar.bz2
|
||||
bzip2 -dc parallel-20160101.tar.bz2 | tar xvf -
|
||||
cd parallel-20160101
|
||||
./configure --prefix=$HOME && make && make install
|
||||
|
||||
Or if your system lacks 'make' you can simply copy src/parallel
|
||||
|
|
20
configure
vendored
20
configure
vendored
|
@ -1,6 +1,6 @@
|
|||
#! /bin/sh
|
||||
# Guess values for system-dependent variables and create Makefiles.
|
||||
# Generated by GNU Autoconf 2.69 for parallel 20151222.
|
||||
# Generated by GNU Autoconf 2.69 for parallel 20160101.
|
||||
#
|
||||
# Report bugs to <bug-parallel@gnu.org>.
|
||||
#
|
||||
|
@ -579,8 +579,8 @@ MAKEFLAGS=
|
|||
# Identity of this package.
|
||||
PACKAGE_NAME='parallel'
|
||||
PACKAGE_TARNAME='parallel'
|
||||
PACKAGE_VERSION='20151222'
|
||||
PACKAGE_STRING='parallel 20151222'
|
||||
PACKAGE_VERSION='20160101'
|
||||
PACKAGE_STRING='parallel 20160101'
|
||||
PACKAGE_BUGREPORT='bug-parallel@gnu.org'
|
||||
PACKAGE_URL=''
|
||||
|
||||
|
@ -1203,7 +1203,7 @@ if test "$ac_init_help" = "long"; then
|
|||
# Omit some internal or obsolete options to make the list less imposing.
|
||||
# This message is too long to be a string in the A/UX 3.1 sh.
|
||||
cat <<_ACEOF
|
||||
\`configure' configures parallel 20151222 to adapt to many kinds of systems.
|
||||
\`configure' configures parallel 20160101 to adapt to many kinds of systems.
|
||||
|
||||
Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||
|
||||
|
@ -1269,7 +1269,7 @@ fi
|
|||
|
||||
if test -n "$ac_init_help"; then
|
||||
case $ac_init_help in
|
||||
short | recursive ) echo "Configuration of parallel 20151222:";;
|
||||
short | recursive ) echo "Configuration of parallel 20160101:";;
|
||||
esac
|
||||
cat <<\_ACEOF
|
||||
|
||||
|
@ -1345,7 +1345,7 @@ fi
|
|||
test -n "$ac_init_help" && exit $ac_status
|
||||
if $ac_init_version; then
|
||||
cat <<\_ACEOF
|
||||
parallel configure 20151222
|
||||
parallel configure 20160101
|
||||
generated by GNU Autoconf 2.69
|
||||
|
||||
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||
|
@ -1362,7 +1362,7 @@ cat >config.log <<_ACEOF
|
|||
This file contains any messages produced by compilers while
|
||||
running configure, to aid debugging if configure makes a mistake.
|
||||
|
||||
It was created by parallel $as_me 20151222, which was
|
||||
It was created by parallel $as_me 20160101, which was
|
||||
generated by GNU Autoconf 2.69. Invocation command line was
|
||||
|
||||
$ $0 $@
|
||||
|
@ -2225,7 +2225,7 @@ fi
|
|||
|
||||
# Define the identity of the package.
|
||||
PACKAGE='parallel'
|
||||
VERSION='20151222'
|
||||
VERSION='20160101'
|
||||
|
||||
|
||||
cat >>confdefs.h <<_ACEOF
|
||||
|
@ -2867,7 +2867,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
|
|||
# report actual input values of CONFIG_FILES etc. instead of their
|
||||
# values after options handling.
|
||||
ac_log="
|
||||
This file was extended by parallel $as_me 20151222, which was
|
||||
This file was extended by parallel $as_me 20160101, which was
|
||||
generated by GNU Autoconf 2.69. Invocation command line was
|
||||
|
||||
CONFIG_FILES = $CONFIG_FILES
|
||||
|
@ -2929,7 +2929,7 @@ _ACEOF
|
|||
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
||||
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
||||
ac_cs_version="\\
|
||||
parallel config.status 20151222
|
||||
parallel config.status 20160101
|
||||
configured by $0, generated by GNU Autoconf 2.69,
|
||||
with options \\"\$ac_cs_config\\"
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
AC_INIT([parallel], [20151222], [bug-parallel@gnu.org])
|
||||
AC_INIT([parallel], [20160101], [bug-parallel@gnu.org])
|
||||
AM_INIT_AUTOMAKE([-Wall -Werror foreign])
|
||||
AC_CONFIG_HEADERS([config.h])
|
||||
AC_CONFIG_FILES([
|
||||
|
|
|
@ -212,9 +212,9 @@ cc:Tim Cuthbertson <tim3d.junk@gmail.com>,
|
|||
Ryoichiro Suzuki <ryoichiro.suzuki@gmail.com>,
|
||||
Jesse Alama <jesse.alama@gmail.com>
|
||||
|
||||
Subject: GNU Parallel 20151222 ('') released <<[stable]>>
|
||||
Subject: GNU Parallel 20160122 ('') released <<[stable]>>
|
||||
|
||||
GNU Parallel 20151222 ('') <<[stable]>> has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/
|
||||
GNU Parallel 20160122 ('') <<[stable]>> has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/
|
||||
|
||||
<<No new functionality was introduced so this is a good candidate for a stable release.>>
|
||||
|
||||
|
@ -227,11 +227,19 @@ Haiku of the month:
|
|||
|
||||
New in this release:
|
||||
|
||||
* --transfer is now an alias for --transferfile {}.
|
||||
* --sql DBURL uses DBURL as storage for jobs and output. It does not run any jobs so it requires at least one --sqlworker. DBURL must point to a table.
|
||||
|
||||
* --transferfile works like --transfer, but takes an argument like --return. This makes it possible to combine transferring files with multiple input sources: parallel -S server --tf {1} wc {2} {1} ::: * ::: -l -w -c
|
||||
* --sqlworker DBURL gets jobs from DBURL and stores the result back to DBURL.
|
||||
|
||||
* total_jobs() can now be used in {= =}: parallel echo job {#} of '{= $_=total_jobs() =}' ::: {1..50}
|
||||
* --sqlandworker is a shorthand for --sql and --sqlworker.
|
||||
|
||||
* --sqlworker requires the output of a single job to fit in memory.
|
||||
|
||||
* --results now also saves a file called 'seq' containing the sequence number.
|
||||
|
||||
* If $PARALLEL_ENV is a file, then that file will be read into $PARALLEL_ENV.
|
||||
|
||||
* man parallel_tutorial has been given an overhaul.
|
||||
|
||||
* << kontakt GNU Parallel was used (unfortunately without citation) in: Instrumentation and Trace Analysis for Ad-hoc Python Workflows in Cloud Environments http://ieeexplore.ieee.org/xpl/articleDetails.jsp?arnumber=7214035>>
|
||||
|
||||
|
@ -249,17 +257,13 @@ for Big Data Applications https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumb
|
|||
|
||||
* <<Citation needed: Introspecting for RSA Key Material to Assist Intrusion Detection http://ieeexplore.ieee.org/xpl/login.jsp?tp=&arnumber=7331177&url=http%3A%2F%2Fieeexplore.ieee.org%2Fxpls%2Fabs_all.jsp%3Farnumber%3D7331177>>
|
||||
|
||||
* GNU Parallel was cited in: Evolution and Learning in Heterogeneous Environments http://research.gold.ac.uk/15078/1/COM_thesis_JonesD_2015.pdf
|
||||
* GNU Parallel is used in LAST: http://last.cbrc.jp/
|
||||
|
||||
* GNU Parallel was cited in: Contrasting genetic architectures of schizophrenia and other complex diseases using fast variance-components analysis http://www.nature.com/ng/journal/v47/n12/full/ng.3431.html
|
||||
* GNU Parallel was cited in: Possum - A Framework for Three-Dimensional Reconstruction of Brain Images from Serial Sections http://link.springer.com/article/10.1007/s12021-015-9286-1
|
||||
|
||||
* GNU Parallel was cited in: Efficient Retrieval of Key Material for Inspecting Potentially Malicious Traffic in the Cloud http://www.cs.bham.ac.uk/~bxb/Papres/2015.1.pdf
|
||||
* GNU Parallel was used in: Mission Impossible: you have 1 minute to analyze the Ebola Genome https://www.biostars.org/p/119397
|
||||
|
||||
* GNU Parallel was cited in: Achieving Consistent Doppler Measurements from SDO/HMI Vector Field Inversions http://arxiv.org/pdf/1511.06500.pdf
|
||||
|
||||
* Flo uses GNU Parallel: https://github.com/wurmlab/flo
|
||||
|
||||
* 使用 GNU parallel 來平行運算 http://mutolisp.logdown.com/posts/316959-using-gnu-parallel-to-parallel-computing
|
||||
* Distributed Log Search Using GNU Parallel http://blog.codehate.com/post/134320079974/distributed-log-search-using-gnu-parallel
|
||||
|
||||
* Bug fixes and man page updates.
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ NORMAL_UNINSTALL = :
|
|||
PRE_UNINSTALL = :
|
||||
POST_UNINSTALL = :
|
||||
subdir = src
|
||||
DIST_COMMON = $(srcdir)/Makefile.in $(srcdir)/Makefile.am README
|
||||
DIST_COMMON = $(srcdir)/Makefile.in $(srcdir)/Makefile.am
|
||||
ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
|
||||
am__aclocal_m4_deps = $(top_srcdir)/configure.ac
|
||||
am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
use strict;
|
||||
use Getopt::Long;
|
||||
$Global::progname="niceload";
|
||||
$Global::version = 20151222;
|
||||
$Global::version = 20160101;
|
||||
Getopt::Long::Configure("bundling","require_order");
|
||||
get_options_from_array(\@ARGV) || die_usage();
|
||||
if($opt::version) {
|
||||
|
|
583
src/parallel
583
src/parallel
|
@ -40,7 +40,7 @@ parse_options();
|
|||
::debug("init", "Open file descriptors: ", join(" ",keys %Global::fd), "\n");
|
||||
my $number_of_args;
|
||||
if($Global::max_number_of_args) {
|
||||
$number_of_args=$Global::max_number_of_args;
|
||||
$number_of_args = $Global::max_number_of_args;
|
||||
} elsif ($opt::X or $opt::m or $opt::xargs) {
|
||||
$number_of_args = undef;
|
||||
} else {
|
||||
|
@ -59,6 +59,15 @@ if($opt::pipepart) {
|
|||
@input_source_fh = (*STDIN);
|
||||
}
|
||||
}
|
||||
if($opt::sql) {
|
||||
# Create SQL table to hold joblog + output
|
||||
$Global::sql->create_table($#input_source_fh+1);
|
||||
if($opt::sqlworker) {
|
||||
# Start a real --sqlworker in the background later
|
||||
$Global::sqlworker = 1;
|
||||
$opt::sqlworker = undef;
|
||||
}
|
||||
}
|
||||
|
||||
if($opt::skip_first_line) {
|
||||
# Skip the first line for the first file handle
|
||||
|
@ -751,6 +760,9 @@ sub options_hash {
|
|||
"m" => \$opt::m,
|
||||
"X" => \$opt::X,
|
||||
"v" => \@opt::v,
|
||||
"sql=s" => \$opt::sql,
|
||||
"sqlworker=s" => \$opt::sqlworker,
|
||||
"sqlandworker=s" => \$opt::sqlandworker,
|
||||
"joblog=s" => \$opt::joblog,
|
||||
"results|result|res=s" => \$opt::results,
|
||||
"resume" => \$opt::resume,
|
||||
|
@ -935,7 +947,7 @@ sub get_options_from_array {
|
|||
sub parse_options {
|
||||
# Returns: N/A
|
||||
init_globals();
|
||||
@ARGV=read_options();
|
||||
@ARGV = read_options();
|
||||
|
||||
# no-* overrides *
|
||||
if($opt::nokeeporder) { $opt::keeporder = undef; }
|
||||
|
@ -960,6 +972,7 @@ sub parse_options {
|
|||
if(defined $opt::tmpdir) { $ENV{'TMPDIR'} = $opt::tmpdir; }
|
||||
$opt::nice ||= 0;
|
||||
if(defined $opt::help) { die_usage(); }
|
||||
if(defined $opt::sqlandworker) { $opt::sql = $opt::sqlworker = $opt::sqlandworker; }
|
||||
if(defined $opt::colsep) { $Global::trim = 'lr'; }
|
||||
if(defined $opt::header) { $opt::colsep = defined $opt::colsep ? $opt::colsep : "\t"; }
|
||||
if(defined $opt::trim) { $Global::trim = $opt::trim; }
|
||||
|
@ -1108,6 +1121,7 @@ sub parse_options {
|
|||
$opt::jobs = "100%";
|
||||
}
|
||||
open_joblog();
|
||||
($opt::sql or $opt::sqlworker) and $Global::sql = SQL->new($opt::sql || $opt::sqlworker);
|
||||
}
|
||||
|
||||
sub check_invalid_option_combinations {
|
||||
|
@ -1162,7 +1176,7 @@ sub check_invalid_option_combinations {
|
|||
|
||||
sub init_globals {
|
||||
# Defaults:
|
||||
$Global::version = 20151222;
|
||||
$Global::version = 20160101;
|
||||
$Global::progname = 'parallel';
|
||||
$Global::infinity = 2**31;
|
||||
$Global::debug = 0;
|
||||
|
@ -1595,6 +1609,7 @@ sub read_options {
|
|||
|
||||
Getopt::Long::Configure("bundling","require_order");
|
||||
my @ARGV_copy = @ARGV;
|
||||
my @ARGV_orig = @ARGV;
|
||||
# Check if there is a --profile to set @opt::profile
|
||||
get_options_from_array(\@ARGV_copy,"profile|J=s","plain") || die_usage();
|
||||
my @ARGV_profile = ();
|
||||
|
@ -1644,12 +1659,29 @@ sub read_options {
|
|||
get_options_from_array(\@ARGV_profile) || die_usage();
|
||||
get_options_from_array(\@ARGV_env) || die_usage();
|
||||
get_options_from_array(\@ARGV) || die_usage();
|
||||
|
||||
# What were the options given on the command line?
|
||||
# Used to start --sqlworker
|
||||
my $ai = arrayindex(\@ARGV_orig, \@ARGV);
|
||||
@Global::options_in_argv = @ARGV_orig[0..$ai-1];
|
||||
# Prepend non-options to @ARGV (such as commands like 'nice')
|
||||
unshift @ARGV, @ARGV_profile, @ARGV_env;
|
||||
return @ARGV;
|
||||
}
|
||||
|
||||
sub arrayindex {
    # Similar to Perl's index function, but for arrays:
    # find the first position where @array2 occurs as a contiguous
    # run of whole elements in @array1.
    # Input:
    #   $arr_ref1 = ref to @array1 to search in
    #   $arr_ref2 = ref to @array2 to search for
    # Returns:
    #   $pos = index in @array1 of the first element of the match
    #          (0 if @array2 is empty - like index($str,""))
    #          -1 if @array2 is not found
    my ($arr_ref1,$arr_ref2) = @_;
    my @haystack = @$arr_ref1;
    my @needle = @$arr_ref2;
    # Last start position that leaves room for all of @needle
    my $last_start = $#haystack - $#needle;
  START:
    for my $start (0 .. $last_start) {
        # Compare element by element: joining the arrays into strings
        # would allow matches inside an element and lose empty elements
        for my $offset (0 .. $#needle) {
            $haystack[$start + $offset] eq $needle[$offset] or next START;
        }
        return $start;
    }
    return -1;
}
|
||||
|
||||
sub read_args_from_command_line {
|
||||
# Arguments given on the command line after:
|
||||
# ::: ($Global::arg_sep)
|
||||
|
@ -2322,9 +2354,27 @@ sub drain_job_queue {
|
|||
::warning("There are no job slots available. Increase --jobs.");
|
||||
}
|
||||
}
|
||||
while($opt::sql and not $Global::sql->finished()) {
|
||||
# SQL master
|
||||
$sleep = ::reap_usleep($sleep);
|
||||
if($Global::sqlworker) {
|
||||
# Start an SQL worker as we are now sure there is work to do
|
||||
$Global::sqlworker = 0;
|
||||
if(fork()) {
|
||||
# skip
|
||||
} else {
|
||||
# Replace --sql/--sqlandworker with --sqlworker
|
||||
my @ARGV = map { s/^--sql(andworker)?$/--sqlworker/; $_ } @Global::options_in_argv;
|
||||
# exec the --sqlworker
|
||||
exec($0,::shell_quote(@ARGV),@command);
|
||||
}
|
||||
}
|
||||
}
|
||||
} while ($Global::total_running > 0
|
||||
or
|
||||
not $Global::start_no_new_jobs and not $Global::JobQueue->empty());
|
||||
not $Global::start_no_new_jobs and not $Global::JobQueue->empty()
|
||||
or
|
||||
$opt::sql and not $Global::sql->finished());
|
||||
if($opt::progress) {
|
||||
my %progress = progress();
|
||||
::status("\r", $progress{'status'}, "\n");
|
||||
|
@ -3291,6 +3341,8 @@ sub reaper {
|
|||
my $stiff;
|
||||
my @pids_reaped;
|
||||
debug("run", "Reaper ");
|
||||
# For efficiency surround with BEGIN/COMMIT when using $opt::sql
|
||||
$opt::sql and $Global::sql->run("BEGIN;");
|
||||
while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
|
||||
# $stiff = pid of dead process
|
||||
push(@pids_reaped,$stiff);
|
||||
|
@ -3348,6 +3400,7 @@ sub reaper {
|
|||
::status("\r",$progress{'status'});
|
||||
}
|
||||
}
|
||||
$opt::sql and $Global::sql->run("COMMIT;");
|
||||
debug("run", "done ");
|
||||
return @pids_reaped;
|
||||
}
|
||||
|
@ -3741,6 +3794,13 @@ sub undef_as_empty {
|
|||
return $a ? $a : "";
|
||||
}
|
||||
|
||||
sub undef_if_empty {
    # Turn the empty string into undef
    # Input:
    #   $_[0] = value to check
    # Returns:
    #   undef if the value is the empty string
    #   the value unchanged otherwise (also if it is undef)
    my $value = $_[0];
    if(not defined $value or $value ne "") {
        return $value;
    }
    return undef;
}
|
||||
|
||||
sub multiply_binary_prefix {
|
||||
# Evaluate numbers with binary prefix
|
||||
# Ki=2^10, Mi=2^20, Gi=2^30, Ti=2^40, Pi=2^50, Ei=2^60, Zi=2^70, Yi=2^80
|
||||
|
@ -6092,6 +6152,15 @@ sub openoutputfiles {
|
|||
$dir = $opt::results."/".$args_as_dirname;
|
||||
File::Path::mkpath($dir);
|
||||
}
|
||||
# prefix/name1/val1/name2/val2/seq
|
||||
my $seqname = "$dir/seq";
|
||||
my $seqfhw;
|
||||
if(not open($seqfhw, "+>", $seqname)) {
|
||||
::error("Cannot write to `$seqname'.");
|
||||
::wait_and_exit(255);
|
||||
}
|
||||
print $seqfhw $self->seq();
|
||||
close $seqfhw;
|
||||
# prefix/name1/val1/name2/val2/stdout
|
||||
$outname = "$dir/stdout";
|
||||
if(not open($outfhw, "+>", $outname)) {
|
||||
|
@ -6106,6 +6175,13 @@ sub openoutputfiles {
|
|||
}
|
||||
$self->set_fh(1,"unlink","");
|
||||
$self->set_fh(2,"unlink","");
|
||||
if($opt::sqlworker) {
|
||||
# Save the filenames in SQL table
|
||||
$Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(),
|
||||
$outname);
|
||||
$Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(),
|
||||
$errname);
|
||||
}
|
||||
} elsif(not $opt::ungroup) {
|
||||
# To group we create temporary files for STDOUT and STDERR
|
||||
# To avoid the cleanup unlink the files immediately (but keep them open)
|
||||
|
@ -6404,6 +6480,9 @@ sub set_starttime {
|
|||
my $self = shift;
|
||||
my $starttime = shift || ::now();
|
||||
$self->{'starttime'} = $starttime;
|
||||
$opt::sqlworker and
|
||||
$Global::sql->update("SET Starttime = ? WHERE Seq = ".$self->seq(),
|
||||
$starttime);
|
||||
}
|
||||
|
||||
sub runtime {
|
||||
|
@ -6425,6 +6504,9 @@ sub set_endtime {
|
|||
my $self = shift;
|
||||
my $endtime = shift;
|
||||
$self->{'endtime'} = $endtime;
|
||||
$opt::sqlworker and
|
||||
$Global::sql->update("SET JobRuntime = ? WHERE Seq = ".$self->seq(),
|
||||
$self->runtime());
|
||||
}
|
||||
|
||||
sub is_timedout {
|
||||
|
@ -6635,6 +6717,14 @@ sub wrapped {
|
|||
';';
|
||||
}
|
||||
if($ENV{'PARALLEL_ENV'}) {
|
||||
if(-e $ENV{'PARALLEL_ENV'}) {
|
||||
# This is a file/fifo: Replace envvar with content of file
|
||||
open(my $parallel_env, "<", $ENV{'PARALLEL_ENV'}) ||
|
||||
::die_bug("Cannot read parallel_env from $ENV{'PARALLEL_ENV'}");
|
||||
local $/;
|
||||
$ENV{'PARALLEL_ENV'} = <$parallel_env>;
|
||||
close $parallel_env;
|
||||
}
|
||||
# If $PARALLEL_ENV set, put that in front of the command
|
||||
# Used for importing functions for fish
|
||||
# Map \001 to \n to make it easier to quote \n in $PARALLEL_ENV
|
||||
|
@ -6664,8 +6754,11 @@ sub wrapped {
|
|||
and
|
||||
length $command > 499) {
|
||||
# csh does not like words longer than 1000 (499 quoted)
|
||||
$command = "perl -e '".base64_zip_eval()."' ".
|
||||
join" ",string_zip_base64('exec "'.::perl_quote_scalar($command).'"');
|
||||
# bzip2 breaks --sql mysql://...
|
||||
# $command = "perl -e '".base64_zip_eval()."' ".
|
||||
# join" ",string_zip_base64('exec "'.::perl_quote_scalar($command).'"');
|
||||
$command = "perl -e '".base64_eval()."' ".
|
||||
join" ",string_base64('exec "'.::perl_quote_scalar($command).'"');
|
||||
}
|
||||
$self->{'wrapped'} = $command;
|
||||
}
|
||||
|
@ -6678,6 +6771,9 @@ sub set_sshlogin {
|
|||
$self->{'sshlogin'} = $sshlogin;
|
||||
delete $self->{'sshlogin_wrap'}; # If sshlogin is changed the wrap is wrong
|
||||
delete $self->{'wrapped'};
|
||||
$opt::sqlworker and
|
||||
$Global::sql->update("SET Host = ? WHERE Seq = ".$self->seq(),
|
||||
$sshlogin->string());
|
||||
}
|
||||
|
||||
sub sshlogin {
|
||||
|
@ -6685,6 +6781,18 @@ sub sshlogin {
|
|||
return $self->{'sshlogin'};
|
||||
}
|
||||
|
||||
sub string_base64 {
    # Base64 encode the concatenated input and cut the result into
    # blocks of at most 1000 bytes.
    # 1000 bytes is the largest word size csh supports
    # Input:
    #   @strings = to be encoded
    # Returns:
    #   @base64 = 1000 byte block
    $Global::use{"MIME::Base64"} ||= eval "use MIME::Base64; 1;";
    # Second argument "": do not insert line breaks in the encoded text
    my $encoded = encode_base64(join("",@_),"");
    my @base64;
    for(my $pos = 0; $pos < length $encoded; $pos += 1000) {
        push @base64, substr($encoded,$pos,1000);
    }
    return @base64;
}
|
||||
|
||||
sub string_zip_base64 {
|
||||
# Pipe string through 'bzip2 -9' and base64 encode it into 1000
|
||||
# byte blocks.
|
||||
|
@ -6707,7 +6815,7 @@ sub string_zip_base64 {
|
|||
close $zipin_fh;
|
||||
exit;
|
||||
}
|
||||
::debug("base64","Orig:@_\nAs base64:@base64\n");
|
||||
::debug("base64","Orig:@_\nAs bzip2 base64:@base64\n");
|
||||
return @base64;
|
||||
}
|
||||
|
||||
|
@ -6725,11 +6833,11 @@ sub base64_zip_eval {
|
|||
@GNU_Parallel=("use","IPC::Open3;","use","MIME::Base64");
|
||||
eval "@GNU_Parallel";
|
||||
|
||||
$SIG{CHLD}="IGNORE";
|
||||
# Search for bzip2. Not found => use default path
|
||||
$SIG{CHLD}="IGNORE";
|
||||
# Search for bzip2. Not found => use default path
|
||||
my $zip = (grep { -x $_ } "/usr/local/bin/bzip2")[0] || "bzip2";
|
||||
# $in = stdin on $zip, $out = stdout from $zip
|
||||
my($in, $out,$eval);
|
||||
# $in = stdin on $zip, $out = stdout from $zip
|
||||
my($in, $out,$eval);
|
||||
open3($in,$out,">&STDERR",$zip,"-dc");
|
||||
if(my $perlpid = fork) {
|
||||
close $in;
|
||||
|
@ -6742,7 +6850,27 @@ sub base64_zip_eval {
|
|||
close $in;
|
||||
exit;
|
||||
}
|
||||
wait;
|
||||
wait;
|
||||
eval $eval;
|
||||
});
|
||||
::debug("base64",$script,"\n");
|
||||
return $script;
|
||||
}
|
||||
|
||||
sub base64_eval {
|
||||
# Script that:
|
||||
# * reads base64 strings from @ARGV
|
||||
# * decodes them
|
||||
# * evals the result
|
||||
# Reverse of string_base64 + eval
|
||||
# Will be wrapped in ' so single quote is forbidden
|
||||
# Returns:
|
||||
# $script = 1-liner for perl -e
|
||||
my $script = ::spacefree(0,q{
|
||||
@GNU_Parallel=("use","IPC::Open3;","use","MIME::Base64");
|
||||
eval "@GNU_Parallel";
|
||||
my $eval;
|
||||
$eval = decode_base64(join"",@ARGV);
|
||||
eval $eval;
|
||||
});
|
||||
::debug("base64",$script,"\n");
|
||||
|
@ -6913,8 +7041,11 @@ sub sshlogin_wrap {
|
|||
$command =~ /\n/) {
|
||||
# csh does not deal well with > 1000 chars in one word
|
||||
# csh does not deal well with $ENV with \n
|
||||
$env_command = "perl -e '".base64_zip_eval()."' ".
|
||||
join" ",string_zip_base64($env_command);
|
||||
# bzip2 breaks --sql mysql://...
|
||||
# $env_command = "perl -e '".base64_zip_eval()."' ".
|
||||
# join" ",string_zip_base64($env_command);
|
||||
$env_command = "perl -e '".base64_eval()."' ".
|
||||
join" ",string_base64($env_command);
|
||||
$self->{'sshlogin_wrap'} = $env_command;
|
||||
} else {
|
||||
$self->{'sshlogin_wrap'} = "perl -e ".::shell_quote_scalar($env_command);
|
||||
|
@ -6942,8 +7073,11 @@ sub sshlogin_wrap {
|
|||
$command =~ /\n/) {
|
||||
# csh does not deal well with > 1000 chars in one word
|
||||
# csh does not deal well with $ENV with \n
|
||||
$quoted_remote_command = "perl -e \\''".base64_zip_eval()."'\\' ".
|
||||
join" ",string_zip_base64($remote_command);
|
||||
# bzip2 breaks --sql mysql://...
|
||||
# $quoted_remote_command = "perl -e \\''".base64_zip_eval()."'\\' "."".
|
||||
# join" ",string_zip_base64($remote_command);
|
||||
$quoted_remote_command = "perl -e \\''".base64_eval()."'\\' ".
|
||||
join" ",string_base64($remote_command);
|
||||
} else {
|
||||
$quoted_remote_command = $dq_remote_command;
|
||||
}
|
||||
|
@ -6999,6 +7133,9 @@ sub add_transfersize {
|
|||
my $self = shift;
|
||||
my $transfersize = shift;
|
||||
$self->{'transfersize'} += $transfersize;
|
||||
$opt::sqlworker and
|
||||
$Global::sql->update("SET Send = ? WHERE Seq = ".$self->seq(),
|
||||
$self->{'transfersize'});
|
||||
}
|
||||
|
||||
sub sshtransfer {
|
||||
|
@ -7041,6 +7178,9 @@ sub add_returnsize {
|
|||
my $self = shift;
|
||||
my $returnsize = shift;
|
||||
$self->{'returnsize'} += $returnsize;
|
||||
$opt::sqlworker and
|
||||
$Global::sql->update("SET Receive = ? WHERE Seq = ".$self->seq(),
|
||||
$self->{'returnsize'});
|
||||
}
|
||||
|
||||
sub sshreturn {
|
||||
|
@ -7275,10 +7415,10 @@ sub start {
|
|||
}
|
||||
$job->openoutputfiles();
|
||||
my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w"));
|
||||
if($opt::ungroup) {
|
||||
if($opt::ungroup or $opt::sqlworker) {
|
||||
print_dryrun_and_verbose($stdout_fh,$job,$command);
|
||||
}
|
||||
if($opt::dryrun) { $command = "true"; }
|
||||
if($opt::dryrun or $opt::sql) { $command = "true"; }
|
||||
$ENV{'PARALLEL_SEQ'} = $job->seq();
|
||||
$ENV{'PARALLEL_PID'} = $$;
|
||||
$ENV{'PARALLEL_TMP'} = ::tmpname("par");
|
||||
|
@ -7381,6 +7521,10 @@ sub print_dryrun_and_verbose {
|
|||
print $stdout_fh $command,"\n";
|
||||
}
|
||||
}
|
||||
if($opt::sqlworker) {
|
||||
$Global::sql->update("SET Command = ? WHERE Seq = ".$job->seq(),
|
||||
$job->replaced());
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -7567,7 +7711,11 @@ sub print {
|
|||
|
||||
if(($opt::dryrun or $Global::verbose)
|
||||
and
|
||||
not $self->{'verbose_printed'}) {
|
||||
not $self->{'verbose_printed'}
|
||||
and
|
||||
not $opt::sql
|
||||
and
|
||||
not $opt::sqlworker) {
|
||||
$self->{'verbose_printed'}++;
|
||||
if($Global::verbose <= 1) {
|
||||
print STDOUT $self->replaced(),"\n";
|
||||
|
@ -7638,6 +7786,10 @@ sub files_print {
|
|||
}
|
||||
} elsif($fdno == 1 and $self->fh($fdno,"name")) {
|
||||
print $out_fd $self->tag(),$self->fh($fdno,"name"),"\n";
|
||||
if($opt::sqlworker) {
|
||||
$Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(),
|
||||
$self->tag().$self->fh($fdno,"name"));
|
||||
}
|
||||
$self->add_returnsize(-s $self->fh($fdno,"name"));
|
||||
}
|
||||
}
|
||||
|
@ -7781,12 +7933,25 @@ sub normal_print {
|
|||
seek $in_fh, 0, 0;
|
||||
# $in_fh is now ready for reading at position 0
|
||||
my $outputlength = 0;
|
||||
my @output;
|
||||
while(sysread($in_fh,$buf,131072)) {
|
||||
print $out_fd $buf;
|
||||
$outputlength += length $buf;
|
||||
if($opt::sqlworker) {
|
||||
push @output, $buf;
|
||||
}
|
||||
}
|
||||
if($fdno == 1) {
|
||||
$self->add_returnsize($outputlength);
|
||||
if($opt::sqlworker and not $opt::results) {
|
||||
$Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(),
|
||||
join("",@output));
|
||||
}
|
||||
} else {
|
||||
if($opt::sqlworker and not $opt::results) {
|
||||
$Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(),
|
||||
join("",@output));
|
||||
}
|
||||
}
|
||||
close $in_fh;
|
||||
if($? and $opt::compress) {
|
||||
|
@ -7851,6 +8016,9 @@ sub set_exitstatus {
|
|||
# Status may have been set by --timeout
|
||||
$self->{'exitstatus'} ||= $exitstatus;
|
||||
}
|
||||
$opt::sqlworker and
|
||||
$Global::sql->update("SET Exitval = ? WHERE Seq = ".$self->seq(),
|
||||
$exitstatus);
|
||||
}
|
||||
|
||||
sub reset_exitstatus {
|
||||
|
@ -7867,9 +8035,11 @@ sub set_exitsignal {
|
|||
my $self = shift;
|
||||
my $exitsignal = shift;
|
||||
$self->{'exitsignal'} = $exitsignal;
|
||||
$opt::sqlworker and
|
||||
$Global::sql->update("SET _Signal = ? WHERE Seq = ".$self->seq(),
|
||||
$exitsignal);
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
my $status_printed;
|
||||
my $total_jobs;
|
||||
|
@ -8092,6 +8262,10 @@ sub populate {
|
|||
}
|
||||
}
|
||||
}
|
||||
if($opt::sql) {
|
||||
# Insert the V1..Vn for this $seq in SQL table instead of generating one
|
||||
$Global::sql->insert_records($self->seq(),$self->{'arg_list_flat_orig'});
|
||||
}
|
||||
}
|
||||
|
||||
sub push {
|
||||
|
@ -8715,6 +8889,10 @@ sub get {
|
|||
$self->{'len'},
|
||||
);
|
||||
$cmd_line->populate();
|
||||
if($opt::sqlworker) {
|
||||
# Get the sequence number from the SQL table
|
||||
$cmd_line->set_seq($SQL::next_seq);
|
||||
}
|
||||
::debug("init","cmd_line->number_of_args ",
|
||||
$cmd_line->number_of_args(), "\n");
|
||||
if(not $Global::no_more_input and ($opt::pipe or $opt::pipepart)) {
|
||||
|
@ -8893,7 +9071,10 @@ sub new {
|
|||
my $colsep = shift;
|
||||
my @unget = ();
|
||||
my $arg_sub_queue;
|
||||
if($colsep) {
|
||||
if($opt::sqlworker) {
|
||||
# Open SQL table
|
||||
$arg_sub_queue = SQLRecordQueue->new();
|
||||
} elsif($colsep) {
|
||||
# Open one file with colsep
|
||||
$arg_sub_queue = RecordColQueue->new($fhs);
|
||||
} else {
|
||||
|
@ -8978,7 +9159,7 @@ sub get {
|
|||
if(@{$self->{'unget'}}) {
|
||||
return shift @{$self->{'unget'}};
|
||||
}
|
||||
my $unget_ref=$self->{'unget'};
|
||||
my $unget_ref = $self->{'unget'};
|
||||
if($self->{'arg_sub_queue'}->empty()) {
|
||||
return undef;
|
||||
}
|
||||
|
@ -9017,6 +9198,45 @@ sub empty {
|
|||
}
|
||||
|
||||
|
||||
package SQLRecordQueue;
|
||||
|
||||
sub new {
    # Create a record queue that starts with an empty unget buffer
    # Returns:
    #   $self = blessed object
    my $class = shift;
    my $self = {
        'unget' => [],
    };
    return bless $self, ref($class) || $class;
}
|
||||
|
||||
sub get {
    # Get the next record: records pushed back with unget are
    # returned first, otherwise the record is fetched from $Global::sql
    # Returns:
    #   reference to array of Arg-objects
    my $self = shift;
    my $pending = $self->{'unget'};
    unless(@$pending) {
        return $Global::sql->get_record();
    }
    return shift @$pending;
}
|
||||
|
||||
sub unget {
    # Push records back in front of the queue, so the next get
    # returns them before fetching from $Global::sql
    # Input:
    #   @_ = records to push back
    my $self = shift;
    my $pending = $self->{'unget'};
    ::debug("run", "SQLRecordQueue-unget '@_'\n");
    unshift @$pending, @_;
}
|
||||
|
||||
sub empty {
    # Is the queue empty?
    # Probes by fetching one record with get; a defined record is
    # pushed back with unget so it is not lost.
    # Returns:
    #   0 if the unget buffer holds records
    #   otherwise true if get returned nothing, false if it returned a record
    my $self = shift;
    @{$self->{'unget'}} and return 0;
    my $record = $self->get();
    defined $record and $self->unget($record);
    my $empty = not $record;
    ::debug("run", "SQLRecordQueue->empty $empty");
    return $empty;
}
|
||||
|
||||
|
||||
package MultifileQueue;
|
||||
|
||||
@Global::unget_argv=();
|
||||
|
@ -9463,6 +9683,323 @@ sub insert {
|
|||
}
|
||||
|
||||
|
||||
package SQL;
|
||||
|
||||
sub new {
    # Connect to the database that a DBURL points to
    # Input:
    #   $dburl = DBURL or alias; resolved with get_alias and
    #            split into parts with parse_dburl
    # Returns:
    #   $self = object holding the DBI handle ('dbh') and the
    #           table name ('table') from the DBURL
    my ($class, $dburl) = @_;
    $Global::use{"DBI"} ||= eval "use DBI; 1;";
    my %options = parse_dburl(get_alias($dburl));
    # Driver names accepted in a DBURL => DBI driver name
    my %driveralias = ("sqlite" => "SQLite",
                       "sqlite3" => "SQLite",
                       "pg" => "Pg",
                       "postgres" => "Pg",
                       "postgresql" => "Pg");
    my $given_driver = $options{'databasedriver'};
    my $driver = $driveralias{$given_driver} || $given_driver;
    # Build the DSN: host and port are only included if given
    my $dsn = "DBI:$driver:dbname=".$options{'database'};
    if($options{'host'}) { $dsn .= ";host=".$options{'host'}; }
    if($options{'port'}) { $dsn .= ";port=".$options{'port'}; }
    my $dbh = DBI->connect($dsn, $options{'user'}, $options{'password'},
                           { RaiseError => 1 })
        or die $DBI::errstr;
    my $self = {
        'dbh' => $dbh,
        'max_number_of_args' => undef,
        'table' => $options{'table'},
    };
    return bless $self, ref($class) || $class;
}
|
||||
|
||||
sub get_alias {
    # Expand a DBURL alias (:name) into the DBURL it stands for
    # Input:
    #   $alias = DBURL or alias, optionally prefixed with 'sql:'
    # Returns:
    #   $dburl = $alias without the 'sql:' prefix if it is not an alias
    #            otherwise the alias expanded (recursively) using the
    #            first matching line in the alias files
    #   Exits if the alias is cyclic or not defined
    my $alias = shift;
    $alias =~ s/^(sql:)*//; # Accept aliases prepended with sql:
    if ($alias !~ /^:/) {
        return $alias;
    }

    # Find the alias
    # $path = dir of this program (following one level of symlink)
    my $path;
    if (-l $0) {
        ($path) = readlink($0) =~ m|^(.*)/|;
    } else {
        ($path) = $0 =~ m|^(.*)/|;
    }

    my @deprecated = ("$ENV{HOME}/.dburl.aliases",
                      "$path/dburl.aliases", "$path/dburl.aliases.dist");
    for (@deprecated) {
        if(-r $_) {
            print STDERR "$_ is deprecated. Use .sql/aliases instead (read man sql)\n";
        }
    }
    my @urlalias=();
    check_permissions("$ENV{HOME}/.sql/aliases");
    check_permissions("$ENV{HOME}/.dburl.aliases");
    my @search = ("$ENV{HOME}/.sql/aliases",
                  "$ENV{HOME}/.dburl.aliases", "/etc/sql/aliases",
                  "$path/dburl.aliases", "$path/dburl.aliases.dist");
    for my $alias_file (@search) {
        if(-r $alias_file) {
            # Read the file directly instead of running `cat "$alias_file"`:
            # that passes the file name through a shell, which breaks on
            # names containing ", $ or ` and costs a fork per file
            if(open(my $alias_fh, "<", $alias_file)) {
                push @urlalias, <$alias_fh>;
                close $alias_fh;
            }
        }
    }
    # $alias_part = the :name, $rest = what follows the name
    my ($alias_part,$rest) = $alias=~/(:\w*)(.*)/;
    # If we saw this before: we have an alias loop
    if(grep {$_ eq $alias_part } @Private::seen_aliases) {
        print STDERR "$alias_part is a cyclic alias\n";
        exit -1;
    } else {
        push @Private::seen_aliases, $alias_part;
    }

    # First matching line wins
    my $dburl;
    for (@urlalias) {
        /^$alias_part\s+(\S+.*)/ and do { $dburl = $1; last; }
    }

    if($dburl) {
        # The expansion can itself be an alias
        return get_alias($dburl.$rest);
    } else {
        Usage("$alias is not defined in @search");
        exit(-1);
    }
}
|
||||
|
||||
sub check_permissions {
    # Print a note on STDERR if $file exists and is not owned by the
    # current user or has any group/other permission bits set
    # Input:
    #   $file = file to check
    my $file = shift;

    if(-e $file) {
        unless(-o $file) {
            my $username = (getpwuid($<))[0];
            print STDERR "$file should be owned by $username: chown $username $file\n";
        }
        # Field 2 of stat is the mode; 077 = all group and other bits
        my $mode = (stat($file))[2];
        if($mode & 077) {
            my $username = (getpwuid($<))[0];
            print STDERR "$file should be only be readable by $username: chmod 600 $file\n";
        }
    }
}
|
||||
|
||||
sub parse_dburl {
    # Split a DBURL into its parts
    # Input:
    #   $url = DBURL of the form:
    #     sql:mysql://[[user][:password]@][host][:port]/[database[/table][?sql query]]
    # Returns:
    #   %options = databasedriver (lowercased), user, password, host,
    #     port, database, table, query.
    #     Each part is URI-unescaped; empty parts are undef.
    #   Exits with 255 if $url is not a valid DBURL
    my $url = shift;
    my %options = ();
    my @names = qw(databasedriver user password host port
                   database table query);

    # In list context the match returns the 8 captured groups
    if(my @parts =
       $url=~m!(?:sql:)? # You can prefix with 'sql:'
           ((?:oracle|ora|mysql|pg|postgres|postgresql)(?:s|ssl|)|
            (?:sqlite|sqlite2|sqlite3)):// # Databasedriver ($1)
           (?:
            ([^:@/][^:@]*|) # Username ($2)
            (?:
             :([^@]*) # Password ($3)
            )?
           @)?
           ([^:/]*)? # Hostname ($4)
           (?:
            :
            ([^/]*)? # Port ($5)
           )?
           (?:
            /
            ([^/?]*)? # Database ($6)
           )?
           (?:
            /
            ([^?]*)? # Table ($7)
           )?
           (?:
            \?
            (.*)? # Query ($8)
           )?
           !ix) {
        my @values = map { uri_unescape($_) } @parts;
        # Only the driver name is lowercased
        $values[0] = lc($values[0]);
        @options{@names} = map { ::undef_if_empty($_) } @values;
        ::debug("sql","dburl $url\n");
        ::debug("sql","databasedriver ",$options{databasedriver}, " user ", $options{user},
                " password ", $options{password}, " host ", $options{host},
                " port ", $options{port}, " database ", $options{database},
                " table ",$options{table}," query ",$options{query}, "\n");

    } else {
        ::error("$url is not a valid DBURL");
        exit 255;
    }
    return %options;
}
|
||||
|
||||
sub uri_unescape {
    # Copied from http://cpansearch.perl.org/src/GAAS/URI-1.55/URI/Escape.pm
    # to avoid depending on URI::Escape
    # This section is (C) Gisle Aas.
    # Note from RFC1630: "Sequences which start with a percent sign
    # but are not followed by two hexadecimal characters are reserved
    # for future extension"
    #
    # Replace every %XX (two hex digits) with the character it encodes.
    # Input:
    #   @_ = one or more strings (the arguments are not modified)
    # Returns:
    #   list context with several arguments: the unescaped copies of all
    #   otherwise: the unescaped copy of the first argument (undef stays undef)
    my ($first, @others) = @_;
    if (@others && wantarray) {
        # not executed for the common case of a single argument
        my @decoded = ($first, @others);  # work on copies
        s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg for @decoded;
        return @decoded;
    }
    if (defined $first) {
        $first =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg;
    }
    return $first;
}
|
||||
|
||||
sub run {
    # Prepare and execute an SQL statement on the object's database
    # handle. While the error message contains "locked" the statement is
    # retried (at most 10 attempts). Any other error is reported with
    # ::error() and ends the attempts.
    # Input:
    #   $stmt = SQL statement (may contain ? placeholders)
    #   @_ = values to bind to the placeholders - if any
    # Returns:
    #   $sth = statement handle from the last prepare()
    #          (undef if prepare() failed)
    my $self = shift;
    my $stmt = shift;
    my $dbh = $self->{'dbh'};
    ::debug("sql","$opt::sql$opt::sqlworker run $stmt\n");
    my $rv;
    my $sth;
    my $lockretry = 0;
    while($lockretry < 10) {
        $sth = $dbh->prepare($stmt);
        # prepare() gives undef on failure: never call execute() on that
        # Execute with the rest of the args - if any
        if($sth and $rv = $sth->execute(@_)) {
            last;
        }
        if(defined $DBI::errstr and $DBI::errstr =~ /locked/) {
            ::debug("sql","Lock retry: $lockretry");
            $lockretry++;
        } else {
            # Retrying cannot fix other errors, and as $lockretry is not
            # increased here, continuing would loop forever
            ::error($DBI::errstr);
            last;
        }
    }
    # $rv is undef if no execute() succeeded
    if(defined $rv and $rv < 0) {
        print $DBI::errstr;
    }
    return $sth;
}
|
||||
|
||||
sub get {
    # Run a statement and collect every row of the result.
    # Input:
    #   @_ = the same arguments as run(): SQL statement, bind values
    # Returns:
    #   \@rows = ref to array of refs to the rows (each row is an array)
    my ($self, @query) = @_;
    my $sth = $self->run(@query);
    my @rows;
    # fetchrow_array() returns an empty list when there are no more rows
    while(my @row = $sth->fetchrow_array()) {
        push @rows, [ @row ];
    }
    return \@rows;
}
|
||||
|
||||
sub table {
    # Returns:
    #   the value stored under the object's 'table' key
    my ($self) = @_;
    return $self->{'table'};
}
|
||||
|
||||
sub update {
    # Run an UPDATE statement on the object's table.
    # Input:
    #   $stmt = the part of the statement following "UPDATE <table> "
    #   @values = values to bind to the placeholders in $stmt
    # Returns:
    #   whatever run() returns
    my ($self, $stmt, @values) = @_;
    my $table = $self->table();
    $self->run("UPDATE $table $stmt", @values);
}
|
||||
|
||||
sub max_number_of_args {
    # Maximal number of args for this table
    # (= the number of columns that are not reserved columns).
    # The value is cached in the object; if no (true) value is cached it
    # is computed from the first record of the SQL table.
    # Returns:
    #   number of V-columns
    #   0 (not cached) if the table contains no records
    my $self = shift;
    if(not $self->{'max_number_of_args'}) {
        # Read the number of args from the SQL table
        my $table = $self->table();
        my $v = $self->get("SELECT * FROM $table LIMIT 1;");
        my @reserved_columns = qw(Seq Host Starttime JobRuntime Send
                                  Receive Exitval _Signal Command Stdout Stderr);
        # get() always returns an array ref, so an empty table can only
        # be detected by a missing first row
        my $first_row = $v->[0];
        if(not $first_row) {
            ::error("$table contains no records");
            # Nothing to count; do not cache so a later call can retry
            return 0;
        }
        # Count the number of Vx columns
        $self->{'max_number_of_args'} = @$first_row - @reserved_columns;
    }
    return $self->{'max_number_of_args'};
}
|
||||
|
||||
sub set_max_number_of_args {
    # Store the number of args (V-columns) in the object.
    # Input:
    #   $count = value to store under 'max_number_of_args'
    my ($self, $count) = @_;
    $self->{'max_number_of_args'} = $count;
}
|
||||
|
||||
sub create_table {
    # Drop the object's table if it exists and create it again with one
    # TEXT column (V1 .. Vn) per argument.
    # Input:
    #   $max_number_of_args = number of V-columns to create
    my ($self, $max_number_of_args) = @_;
    $self->set_max_number_of_args($max_number_of_args);
    my $table = $self->table();
    $self->run(qq(DROP TABLE IF EXISTS $table;));
    # "V1 TEXT,V2 TEXT,...": placed between Command and Stdout
    my $v_def = "";
    for my $n (1..$self->max_number_of_args()) {
        $v_def .= "V$n TEXT,";
    }
    # The SQL text is kept verbatim
    my $reserved_head = qq{CREATE TABLE $table
(Seq INT,
Host TEXT,
Starttime REAL,
JobRuntime REAL,
Send INT,
Receive INT,
Exitval INT,
_Signal INT,
Command TEXT,};
    my $reserved_tail = qq{Stdout TEXT,
Stderr TEXT);};
    $self->run($reserved_head.$v_def.$reserved_tail);
}
|
||||
|
||||
sub insert_records {
    # Insert one job into the object's table with Exitval = -1000.
    # Input:
    #   $seq = value for the Seq column
    #   $record_ref = ref to array; the elements from index 1 and up are
    #                 stored in the V-columns (element 0 is not stored)
    my ($self, $seq, $record_ref) = @_;
    my $table = $self->table();
    my @v_names = map { "V$_" } (1..$self->max_number_of_args());
    my $v_cols = join ",", @v_names;
    # Two extra value due to $seq, Exitval
    my $v_vals = join ",", ("?") x ($self->max_number_of_args()+2);
    # Everything but the first element of the record
    my @values = @$record_ref;
    shift @values;
    $self->run("INSERT INTO $table (Seq,Exitval,$v_cols) ".
               "VALUES ($v_vals);", $seq, -1000, @values);
}
|
||||
|
||||
sub get_record {
    # Fetch the record with the lowest Seq that has Exitval = -1000 and
    # mark it as taken by setting its Exitval to -1220.
    # Uses:
    #   $SQL::next_seq (set to the Seq of the fetched record)
    # Returns:
    #   \@args = ref to array of Arg objects made from the V-columns
    #   undef if no such record exists (or it has no V-values)
    # NOTE(review): the SELECT and the UPDATE are separate statements, so
    # two workers could presumably fetch the same record - confirm
    my $self = shift;
    my @retval;
    my $table = $self->table();
    my $v_cols = join ",", map { "V$_" } (1..$self->max_number_of_args());
    my $v = $self->get("SELECT Seq, $v_cols FROM $table ".
                       "WHERE Exitval = -1000 ORDER BY Seq LIMIT 1;");
    if(my $val_ref = $v->[0]) {
        # First column is Seq; the rest are the V-columns
        my $seq = shift @$val_ref;
        # Save the sequence number to use when running the job
        $SQL::next_seq = $seq;
        # Mark record as taken
        # Seq is bound as a placeholder instead of being pasted into the SQL
        $self->update("SET Exitval = ? WHERE Seq = ?", -1220, $seq);
        @retval = map { Arg->new($_) } @$val_ref;
    }
    return @retval ? \@retval : undef;
}
|
||||
|
||||
sub finished {
    # Check if there are any jobs left in the SQL table that do not
    # have a "real" exitval (i.e. Exitval <= -1000)
    # Returns:
    #   true if no such record exists, false otherwise
    my ($self) = @_;
    my $table = $self->table();
    my $pending = $self->get("select Seq,Exitval from $table where Exitval <= -1000 limit 1");
    return !$pending->[0];
}
|
||||
|
||||
package Semaphore;
|
||||
|
||||
# This package provides a counting semaphore
|
||||
|
|
110
src/parallel.pod
110
src/parallel.pod
|
@ -514,7 +514,7 @@ should feel free to use B<--will-cite>.
|
|||
Size of block in bytes to read at a time. The I<size> can be postfixed
|
||||
with K, M, G, T, P, k, m, g, t, or p which would multiply the size
|
||||
with 1024, 1048576, 1073741824, 1099511627776, 1125899906842624, 1000,
|
||||
1000000, 1000000000, 1000000000000, or 1000000000000000 respectively.
|
||||
1000000, 1000000000, 1000000000000, or 1000000000000000, respectively.
|
||||
|
||||
GNU B<parallel> tries to meet the block size but can be off by the
|
||||
length of one record. For performance reasons I<size> should be bigger
|
||||
|
@ -1088,7 +1088,7 @@ most likely do what is needed.
|
|||
|
||||
Multiple arguments. Insert as many arguments as the command line
|
||||
length permits. If multiple jobs are being run in parallel: distribute
|
||||
the arguments evenly among the jobs. Use B<-j1> to avoid this.
|
||||
the arguments evenly among the jobs. Use B<-j1> or B<--xargs> to avoid this.
|
||||
|
||||
If B<{}> is not used the arguments will be appended to the
|
||||
line. If B<{}> is used multiple times each B<{}> will be replaced
|
||||
|
@ -1106,7 +1106,7 @@ Minimum memory free when starting another job. The I<size> can be
|
|||
postfixed with K, M, G, T, P, k, m, g, t, or p which would multiply
|
||||
the size with 1024, 1048576, 1073741824, 1099511627776,
|
||||
1125899906842624, 1000, 1000000, 1000000000, 1000000000000, or
|
||||
1000000000000000 respectively.
|
||||
1000000000000000, respectively.
|
||||
|
||||
If the jobs take up very different amounts of RAM, GNU B<parallel> will
|
||||
only start as many as there is memory for. If less than I<size> bytes
|
||||
|
@ -1791,6 +1791,50 @@ Do not use the first line of input (used by GNU B<parallel> itself
|
|||
when called with B<--shebang>).
|
||||
|
||||
|
||||
=item B<--sql> I<DBURL> (alpha testing)
|
||||
|
||||
Submit jobs via SQL server. I<DBURL> must point to a table, which will
|
||||
contain the same information as B<--joblog>, the values from the input
|
||||
sources (stored in columns V1 .. Vn), and the output (stored in
|
||||
columns Stdout and Stderr).
|
||||
|
||||
The table will be dropped and created with the correct number of
|
||||
V-columns.
|
||||
|
||||
B<--sql> does not run any jobs, but it creates the values for the jobs
|
||||
to be run and waits for them to complete. One or more B<--sqlworker>
|
||||
must be run to actually execute the jobs.
|
||||
|
||||
The format of a DBURL is:
|
||||
|
||||
[sql:]vendor://[[user][:password]@][host][:port]/[database]/table
|
||||
|
||||
E.g.
|
||||
|
||||
sql:mysql://hr:hr@localhost:3306/hrdb/jobs
|
||||
mysql://scott:tiger@my.example.com/pardb/paralleljobs
|
||||
sql:oracle://scott:tiger@ora.example.com/xe/parjob
|
||||
postgresql://scott:tiger@pg.example.com/pgdb/parjob
|
||||
pg:///parjob
|
||||
sqlite3:///pardb/parjob
|
||||
|
||||
It can also be an alias from ~/.sql/aliases:
|
||||
|
||||
:myalias mysql:///mydb/paralleljobs
|
||||
|
||||
|
||||
=item B<--sqlandworker> I<DBURL> (alpha testing)
|
||||
|
||||
Shorthand for: B<--sql> I<DBURL> B<--sqlworker> I<DBURL>.
|
||||
|
||||
|
||||
=item B<--sqlworker> I<DBURL> (alpha testing)
|
||||
|
||||
Execute jobs via SQL server. Read the input source variables from the
|
||||
table pointed to by I<DBURL>. The I<command> on the command line
|
||||
should be the same as given by B<--sql>.
|
||||
|
||||
|
||||
=item B<--ssh> I<sshcommand>
|
||||
|
||||
GNU B<parallel> defaults to using B<ssh> for remote access. This can
|
||||
|
@ -2579,7 +2623,8 @@ files.
|
|||
tar xvf foo.tgz | perl -ne 'print $l;$l=$_;END{print $l}' | \
|
||||
parallel echo
|
||||
|
||||
The Perl one-liner is needed to avoid race condition.
|
||||
The Perl one-liner is needed to make sure the file is complete before
|
||||
handing it to GNU B<parallel>.
|
||||
|
||||
|
||||
=head1 EXAMPLE: Rewriting a for-loop and a while-read-loop
|
||||
|
@ -3410,6 +3455,35 @@ computers:
|
|||
|
||||
true >jobqueue; tail -n+0 -f jobqueue | parallel -S ..
|
||||
|
||||
If you keep this running for a long time, jobqueue will grow. A way of
|
||||
removing the jobs already run is by making GNU B<parallel> stop when
|
||||
it hits a special value and then restart. To use B<--eof> to make GNU
|
||||
B<parallel> exit, B<tail> also needs to be forced to exit:
|
||||
|
||||
true >jobqueue;
|
||||
while true; do
|
||||
tail -n+0 -f jobqueue |
|
||||
(parallel -E StOpHeRe -S ..; echo GNU Parallel is now done;
|
||||
perl -e 'while(<>){/StOpHeRe/ and last};print <>' jobqueue > j2;
|
||||
(seq 1000 >> jobqueue &);
|
||||
echo Done appending dummy data forcing tail to exit)
|
||||
echo tail exited;
|
||||
mv j2 jobqueue
|
||||
done
|
||||
|
||||
In some cases you can run on more CPUs and computers during the night:
|
||||
|
||||
# Day time
|
||||
echo 50% > jobfile
|
||||
cp day_server_list ~/.parallel/sshloginfile
|
||||
# Night time
|
||||
echo 100% > jobfile
|
||||
cp night_server_list ~/.parallel/sshloginfile
|
||||
tail -n+0 -f jobqueue | parallel --jobs jobfile -S ..
|
||||
|
||||
GNU Parallel discovers if B<jobfile> or B<~/.parallel/sshloginfile>
|
||||
changes.
|
||||
|
||||
There is a small issue when using GNU B<parallel> as queue
|
||||
system/batch manager: You have to submit JobSlot number of jobs before
|
||||
they will start, and after that you can submit one at a time, and job
|
||||
|
@ -3421,14 +3495,6 @@ E.g. if you have 10 jobslots then the output from the first completed
|
|||
job will only be printed when job 11 has started, and the output of
|
||||
second completed job will only be printed when job 12 has started.
|
||||
|
||||
To use B<--eof> to make GNU B<parallel> exit, B<tail> also needs to be
|
||||
forced to exit:
|
||||
|
||||
tail -n+0 -f command-list.txt |
|
||||
(parallel --eof=EXIT {}; echo Parallel is now done;
|
||||
(seq 1000 >> command-list.txt &);
|
||||
echo Done appending dummy data forcing tail to exit)
|
||||
|
||||
|
||||
=head1 EXAMPLE: GNU Parallel as dir processor
|
||||
|
||||
|
@ -3773,6 +3839,26 @@ More than 100 jobs failed.
|
|||
|
||||
Other error.
|
||||
|
||||
=item Z<>-1 (In joblog and SQL table)
|
||||
|
||||
Killed by Ctrl-C, timeout, not enough memory or similar.
|
||||
|
||||
=item Z<>-2 (In joblog and SQL table)
|
||||
|
||||
$job->skip() was called in {= =}.
|
||||
|
||||
=item Z<>-1000 (In SQL table)
|
||||
|
||||
Job is ready to run (used with --sql).
|
||||
|
||||
=item Z<>-1220 (In SQL table)
|
||||
|
||||
Job is taken by worker to be started (used with --sql).
|
||||
|
||||
=item Z<>-1250 (In SQL table)
|
||||
|
||||
Job is running (used with --sql).
|
||||
|
||||
=back
|
||||
|
||||
If fail=1 is used, the exit status will be the exit status of the
|
||||
|
|
|
@ -32,7 +32,7 @@ is CentOS 3.9 and Perl 5.8.0.
|
|||
GNU B<parallel> busy waits. This is because the reason why a job is
|
||||
not started may be due to load average, and thus it will not make
|
||||
sense to wait for a job to finish. Instead the load average must be
|
||||
checked again. Load average is not the only reason: --timeout has a
|
||||
checked again. Load average is not the only reason: B<--timeout> has a
|
||||
similar problem.
|
||||
|
||||
To not burn up too much CPU GNU B<parallel> sleeps exponentially
|
||||
|
@ -56,14 +56,13 @@ The easiest way to explain what GNU B<parallel> does is to assume that
|
|||
there are a number of job slots, and when a slot becomes available a
|
||||
job from the queue will be run in that slot. But originally GNU
|
||||
B<parallel> did not model job slots in the code. Job slots have been
|
||||
added to make it possible to use {%} as a replacement string.
|
||||
added to make it possible to use B<{%}> as a replacement string.
|
||||
|
||||
While the job
|
||||
sequence number can be computed in advance, the job slot can only be
|
||||
computed the moment a slot becomes available. So it has been
|
||||
implemented as a stack with lazy evaluation: Draw one from an empty
|
||||
stack and the stack is extended by one. When a job is done, push the
|
||||
available job slot back on the stack.
|
||||
While the job sequence number can be computed in advance, the job slot
|
||||
can only be computed the moment a slot becomes available. So it has
|
||||
been implemented as a stack with lazy evaluation: Draw one from an
|
||||
empty stack and the stack is extended by one. When a job is done, push
|
||||
the available job slot back on the stack.
|
||||
|
||||
This implementation also means that if you use remote executions, you
|
||||
cannot assume that a given job slot will remain on the same remote
|
||||
|
@ -89,7 +88,7 @@ B<--compress> compresses the data in the temporary files. This is a
|
|||
bit tricky because there should be no files to clean up if GNU
|
||||
B<parallel> is killed by a power outage.
|
||||
|
||||
GNU B<parallel> first selects a compress program. If the user has not
|
||||
GNU B<parallel> first selects a compression program. If the user has not
|
||||
selected one, the first of these that are in $PATH is used: B<lz4 pigz
|
||||
lzop plzip pbzip2 pxz gzip lzma xz bzip2 lzip>. They are sorted by
|
||||
speed on a 16 core machine.
|
||||
|
@ -102,16 +101,16 @@ Schematically the setup is as follows:
|
|||
The setup is duplicated for both standard output (stdout) and standard
|
||||
error (stderr).
|
||||
|
||||
GNU B<parallel> pipes output from the command run into the compress
|
||||
GNU B<parallel> pipes output from the command run into the compression
|
||||
program which saves to a tmpfile. GNU B<parallel> records the pid of
|
||||
the compression program. At the same time a small perl script (called
|
||||
B<cattail> above) is started: It basically does B<cat> followed by
|
||||
B<tail -f>, but it also removes the tmpfile as soon as the first byte
|
||||
is read, and it continously checks if the pid of the compress program
|
||||
is dead. If the compress program is dead, B<cattail> reads the rest of
|
||||
tmpfile and exits.
|
||||
is read, and it continuously checks if the pid of the compression
|
||||
program is dead. If the compression program is dead, B<cattail> reads the
|
||||
rest of tmpfile and exits.
|
||||
|
||||
As most compress programs write out a header when they start, the
|
||||
As most compression programs write out a header when they start, the
|
||||
tmpfile in practice is unlinked after around 40 ms.
|
||||
|
||||
|
||||
|
@ -134,16 +133,12 @@ Local: B<setpriority(0,0,$nice)>
|
|||
|
||||
=item --cat
|
||||
|
||||
cat > {}; I<input> {};
|
||||
perl -e '$bash = shift;
|
||||
$csh = shift;
|
||||
for(@ARGV) {
|
||||
unlink;rmdir;
|
||||
}
|
||||
if($bash =~ s/h//) {
|
||||
exit $bash;
|
||||
}
|
||||
exit $csh;' "$?h" "$status" {};
|
||||
cat > {}; I<input> {};
|
||||
perl -e '$bash = shift;
|
||||
$csh = shift;
|
||||
for(@ARGV) { unlink;rmdir; }
|
||||
if($bash =~ s/h//) { exit $bash; }
|
||||
exit $csh;' "$?h" "$status" {};
|
||||
|
||||
{} is set to $PARALLEL_TMP which is a tmpfile. The Perl script saves
|
||||
the exit value, unlinks the tmpfile, and returns the exit value - no
|
||||
|
@ -151,22 +146,22 @@ matter if the shell is B<bash> (using $?) or B<*csh> (using $status).
|
|||
|
||||
=item --fifo
|
||||
|
||||
perl -e '($s,$c,$f) = @ARGV;
|
||||
# mkfifo $PARALLEL_TMP
|
||||
system "mkfifo", $f;
|
||||
# spawn $shell -c $command &
|
||||
$pid = fork || exec $s, "-c", $c;
|
||||
open($o,">",$f) || die $!;
|
||||
# cat > $PARALLEL_TMP
|
||||
while(sysread(STDIN,$buf,131072)){
|
||||
syswrite $o, $buf;
|
||||
}
|
||||
close $o;
|
||||
# waitpid to get the exit code from $command
|
||||
waitpid $pid,0;
|
||||
# Cleanup
|
||||
unlink $f;
|
||||
exit $?/256;' I<shell> I<input> $PARALLEL_TMP
|
||||
perl -e '($s,$c,$f) = @ARGV;
|
||||
# mkfifo $PARALLEL_TMP
|
||||
system "mkfifo", $f;
|
||||
# spawn $shell -c $command &
|
||||
$pid = fork || exec $s, "-c", $c;
|
||||
open($o,">",$f) || die $!;
|
||||
# cat > $PARALLEL_TMP
|
||||
while(sysread(STDIN,$buf,131072)){
|
||||
syswrite $o, $buf;
|
||||
}
|
||||
close $o;
|
||||
# waitpid to get the exit code from $command
|
||||
waitpid $pid,0;
|
||||
# Cleanup
|
||||
unlink $f;
|
||||
exit $?/256;' I<shell> I<input> $PARALLEL_TMP
|
||||
|
||||
This is an elaborate way of: mkfifo {}; run I<input> in the
|
||||
background using I<shell>; copying STDIN to {}; waiting for background
|
||||
|
@ -214,19 +209,19 @@ B<$_EXIT_status>: see B<--return> above.
|
|||
|
||||
=item --pipe
|
||||
|
||||
perl -e 'if(sysread(STDIN, $buf, 1)) {
|
||||
open($fh, "|-", "@ARGV") || die;
|
||||
syswrite($fh, $buf);
|
||||
# Align up to 128k block
|
||||
if($read = sysread(STDIN, $buf, 131071)) {
|
||||
syswrite($fh, $buf);
|
||||
}
|
||||
while($read = sysread(STDIN, $buf, 131072)) {
|
||||
syswrite($fh, $buf);
|
||||
}
|
||||
close $fh;
|
||||
exit ($?&127 ? 128+($?&127) : 1+$?>>8)
|
||||
}' I<shell> -c I<input>
|
||||
perl -e 'if(sysread(STDIN, $buf, 1)) {
|
||||
open($fh, "|-", "@ARGV") || die;
|
||||
syswrite($fh, $buf);
|
||||
# Align up to 128k block
|
||||
if($read = sysread(STDIN, $buf, 131071)) {
|
||||
syswrite($fh, $buf);
|
||||
}
|
||||
while($read = sysread(STDIN, $buf, 131072)) {
|
||||
syswrite($fh, $buf);
|
||||
}
|
||||
close $fh;
|
||||
exit ($?&127 ? 128+($?&127) : 1+$?>>8)
|
||||
}' I<shell> -c I<input>
|
||||
|
||||
This small wrapper makes sure that I<input> will never be run if
|
||||
there is no data.
|
||||
|
@ -261,8 +256,6 @@ the FIFO, and this value is used as exit value.
|
|||
To make it compatible with B<csh> and B<bash> the exit value is
|
||||
printed as: $?h/$status and this is parsed by B<perl>.
|
||||
|
||||
Works in B<csh>.
|
||||
|
||||
There is a bug that makes it necessary to print the exit value 3
|
||||
times.
|
||||
|
||||
|
@ -272,17 +265,17 @@ are added to the title to force it to be outside the limits.
|
|||
|
||||
You can map the bad limits using:
|
||||
|
||||
perl -e 'sub r { int(rand(shift)).($_[0] && "\t".r(@_)) } print map { r(@ARGV)."\n" } 1..10000' 1600 1500 90 |
|
||||
perl -ane '$F[0]+$F[1]+$F[2] < 2037 and print ' |
|
||||
parallel --colsep '\t' --tagstring '{1}\t{2}\t{3}' tmux -S /tmp/p{%}-'{=3 $_="O"x$_ =}' \
|
||||
new-session -d -n '{=1 $_="O"x$_ =}' true'\ {=2 $_="O"x$_ =};echo $?;rm -f /tmp/p{%}-O*'
|
||||
perl -e 'sub r { int(rand(shift)).($_[0] && "\t".r(@_)) } print map { r(@ARGV)."\n" } 1..10000' 1600 1500 90 |
|
||||
perl -ane '$F[0]+$F[1]+$F[2] < 2037 and print ' |
|
||||
parallel --colsep '\t' --tagstring '{1}\t{2}\t{3}' tmux -S /tmp/p{%}-'{=3 $_="O"x$_ =}' \
|
||||
new-session -d -n '{=1 $_="O"x$_ =}' true'\ {=2 $_="O"x$_ =};echo $?;rm -f /tmp/p{%}-O*'
|
||||
|
||||
perl -e 'sub r { int(rand(shift)).($_[0] && "\t".r(@_)) } print map { r(@ARGV)."\n" } 1..10000' 17000 17000 90 |
|
||||
parallel --colsep '\t' --tagstring '{1}\t{2}\t{3}' \
|
||||
tmux -S /tmp/p{%}-'{=3 $_="O"x$_ =}' new-session -d -n '{=1 $_="O"x$_ =}' true'\ {=2 $_="O"x$_ =};echo $?;rm /tmp/p{%}-O*'
|
||||
> value.csv 2>/dev/null
|
||||
perl -e 'sub r { int(rand(shift)).($_[0] && "\t".r(@_)) } print map { r(@ARGV)."\n" } 1..10000' 17000 17000 90 |
|
||||
parallel --colsep '\t' --tagstring '{1}\t{2}\t{3}' \
|
||||
tmux -S /tmp/p{%}-'{=3 $_="O"x$_ =}' new-session -d -n '{=1 $_="O"x$_ =}' true'\ {=2 $_="O"x$_ =};echo $?;rm /tmp/p{%}-O*'
|
||||
> value.csv 2>/dev/null
|
||||
|
||||
R -e 'a<-read.table("value.csv");X11();plot(a[,1],a[,2],col=a[,3]+5,cex=0.1);Sys.sleep(1000)'
|
||||
R -e 'a<-read.table("value.csv");X11();plot(a[,1],a[,2],col=a[,3]+5,cex=0.1);Sys.sleep(1000)'
|
||||
|
||||
For B<tmux 1.8> 17000 can be lowered to 2100.
|
||||
|
||||
|
@ -306,23 +299,6 @@ B<--pipepart>/B<--pipe> should be done on the local machine inside B<--tmux>
|
|||
=back
|
||||
|
||||
|
||||
=head2 --block-size adjustment
|
||||
|
||||
Every time GNU B<parallel> detects a record bigger than
|
||||
B<--block-size> it increases the block size by 30%. A small
|
||||
B<--block-size> gives very poor performance; by exponentially
|
||||
increasing the block size performance will not suffer.
|
||||
|
||||
GNU B<parallel> will waste CPU power if B<--block-size> does not
|
||||
contain a full record, because it tries to find a full record and will
|
||||
fail to do so. The recommendation is therefore to use a
|
||||
B<--block-size> > 2 records, so you always get at least one full
|
||||
record when you read one block.
|
||||
|
||||
If you use B<-N> then B<--block-size> should be big enough to contain
|
||||
N+1 records.
|
||||
|
||||
|
||||
=head2 Convenience options --nice --basefile --transfer --return
|
||||
--cleanup --tmux --group --compress --cat --fifo --workdir
|
||||
|
||||
|
@ -360,6 +336,23 @@ that correctly for all corner cases is next to impossible to do by
|
|||
hand.
|
||||
|
||||
|
||||
=head2 --block-size adjustment
|
||||
|
||||
Every time GNU B<parallel> detects a record bigger than
|
||||
B<--block-size> it increases the block size by 30%. A small
|
||||
B<--block-size> gives very poor performance; by exponentially
|
||||
increasing the block size performance will not suffer.
|
||||
|
||||
GNU B<parallel> will waste CPU power if B<--block-size> does not
|
||||
contain a full record, because it tries to find a full record and will
|
||||
fail to do so. The recommendation is therefore to use a
|
||||
B<--block-size> > 2 records, so you always get at least one full
|
||||
record when you read one block.
|
||||
|
||||
If you use B<-N> then B<--block-size> should be big enough to contain
|
||||
N+1 records.
|
||||
|
||||
|
||||
=head2 Shell shock
|
||||
|
||||
The shell shock bug in B<bash> did not affect GNU B<parallel>, but the
|
||||
|
@ -402,10 +395,14 @@ exec'ing a Perl wrapper to monitor the parent pid and kill the child
|
|||
if the parent pid becomes 1, then Ctrl-C works and stderr is kept on
|
||||
stderr.
|
||||
|
||||
To be able to kill all (grand)*children a new process group is
|
||||
started.
|
||||
|
||||
=head3 --nice
|
||||
|
||||
B<nice>ing the remote process is done by B<setpriority(0,0,$nice)>. A
|
||||
few old systems do not implement this and is thus unsupported.
|
||||
few old systems do not implement this and B<--nice> is unsupported on
|
||||
those.
|
||||
|
||||
=head3 Setting $PARALLEL_TMP
|
||||
|
||||
|
@ -417,39 +414,39 @@ remote system.
|
|||
|
||||
The wrapper looks like this:
|
||||
|
||||
$shell = $PARALLEL_SHELL || $SHELL;
|
||||
$tmpdir = $TMPDIR;
|
||||
$nice = $opt::nice;
|
||||
# Set $PARALLEL_TMP to a non-existent file name in $TMPDIR
|
||||
do {
|
||||
$ENV{PARALLEL_TMP} = $tmpdir."/par".
|
||||
join"", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5);
|
||||
} while(-e $ENV{PARALLEL_TMP});
|
||||
$SIG{CHLD} = sub { $done = 1; };
|
||||
$pid = fork;
|
||||
unless($pid) {
|
||||
# Make own process group to be able to kill HUP it later
|
||||
setpgrp;
|
||||
eval { setpriority(0,0,$nice) };
|
||||
exec $shell, "-c", ($bashfunc."@ARGV");
|
||||
die "exec: $!\n";
|
||||
}
|
||||
do {
|
||||
# Parent is not init (ppid=1), so sshd is alive
|
||||
# Exponential sleep up to 1 sec
|
||||
$s = $s < 1 ? 0.001 + $s * 1.03 : $s;
|
||||
select(undef, undef, undef, $s);
|
||||
} until ($done || getppid == 1);
|
||||
# Kill HUP the process group if job not done
|
||||
kill(SIGHUP, -${pid}) unless $done;
|
||||
wait;
|
||||
exit ($?&127 ? 128+($?&127) : 1+$?>>8)
|
||||
$shell = $PARALLEL_SHELL || $SHELL;
|
||||
$tmpdir = $TMPDIR;
|
||||
$nice = $opt::nice;
|
||||
# Set $PARALLEL_TMP to a non-existent file name in $TMPDIR
|
||||
do {
|
||||
$ENV{PARALLEL_TMP} = $tmpdir."/par".
|
||||
join"", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5);
|
||||
} while(-e $ENV{PARALLEL_TMP});
|
||||
$SIG{CHLD} = sub { $done = 1; };
|
||||
$pid = fork;
|
||||
unless($pid) {
|
||||
# Make own process group to be able to kill HUP it later
|
||||
setpgrp;
|
||||
eval { setpriority(0,0,$nice) };
|
||||
exec $shell, "-c", ($bashfunc."@ARGV");
|
||||
die "exec: $!\n";
|
||||
}
|
||||
do {
|
||||
# Parent is not init (ppid=1), so sshd is alive
|
||||
# Exponential sleep up to 1 sec
|
||||
$s = $s < 1 ? 0.001 + $s * 1.03 : $s;
|
||||
select(undef, undef, undef, $s);
|
||||
} until ($done || getppid == 1);
|
||||
# Kill HUP the process group if job not done
|
||||
kill(SIGHUP, -${pid}) unless $done;
|
||||
wait;
|
||||
exit ($?&127 ? 128+($?&127) : 1+$?>>8)
|
||||
|
||||
=head2 Transferring of variables and functions
|
||||
|
||||
Transferring of variables and functions given by B<--env> is done by
|
||||
running a Perl script remotely that calls the actual command. The Perl
|
||||
script sets $ENV{variable} to the correct value before exec'ing the a
|
||||
script sets B<$ENV{>I<variable>B<}> to the correct value before exec'ing a
|
||||
shell that runs the function definition followed by the actual
|
||||
command.
|
||||
|
||||
|
@ -512,9 +509,9 @@ shell is used when GNU B<parallel> executes commands.
|
|||
GNU B<parallel> tries hard to use the right shell. If GNU B<parallel>
|
||||
is called from B<tcsh> it will use B<tcsh>. If it is called from
|
||||
B<bash> it will use B<bash>. It does this by looking at the
|
||||
(grand*)parent process: If the (grand*)parent process is a shell, use
|
||||
this shell; otherwise look at the parent of this (grand*)parent. If
|
||||
none of the (grand*)parents are shells, then $SHELL is used.
|
||||
(grand)*parent process: If the (grand)*parent process is a shell, use
|
||||
this shell; otherwise look at the parent of this (grand)*parent. If
|
||||
none of the (grand)*parents are shells, then $SHELL is used.
|
||||
|
||||
This will do the right thing if called from:
|
||||
|
||||
|
@ -570,7 +567,7 @@ not known to B<bash>.
|
|||
=back
|
||||
|
||||
If GNU B<parallel> guesses wrong in these situations, set the shell using
|
||||
$PARALLEL_SHELL.
|
||||
B<$PARALLEL_SHELL>.
|
||||
|
||||
=head2 Quoting
|
||||
|
||||
|
@ -596,7 +593,7 @@ implemented very differently.
|
|||
With B<--pipe> GNU B<parallel> reads the blocks from standard input
|
||||
(stdin), which is then given to the command on standard input (stdin);
|
||||
so every block is being processed by GNU B<parallel> itself. This is
|
||||
the reason why B<--pipe> maxes out at around 100 MB/sec.
|
||||
the reason why B<--pipe> maxes out at around 500 MB/sec.
|
||||
|
||||
B<--pipepart>, on the other hand, first identifies at which byte
|
||||
positions blocks start and how long they are. It does that by seeking
|
||||
|
@ -766,42 +763,32 @@ B<ps> and put the result in a file, which is then used next time.
|
|||
|
||||
=head2 Killing jobs
|
||||
|
||||
B<--memfree>, B<--halt> and when GNU B<parallel> meets a condition
|
||||
from which it cannot recover, jobs are killed. This is done by finding
|
||||
the (grand)*children of the jobs and killing those processes.
|
||||
GNU B<parallel> kills jobs. It can be due to B<--memfree>, B<--halt>,
|
||||
or when GNU B<parallel> meets a condition from which it cannot
|
||||
recover. Every job is started as its own process group. This way any
|
||||
(grand)*children will get killed, too. The process group is killed with
|
||||
the specification mentioned in B<--termseq>.
|
||||
|
||||
More specifically GNU B<parallel> maintains a list of processes to be
|
||||
killed, sends a signal to all processes (first round this is a TERM).
|
||||
It weeds out the processes that exited from the list then waits a
|
||||
while and weeds out again. It does that until all processes are dead
|
||||
or 200 ms passed. Then it does another round with TERM, and finally a
|
||||
round with KILL.
|
||||
|
||||
pids = family_pids(jobs)
|
||||
for signal in TERM, TERM, KILL:
|
||||
for pid in pids:
|
||||
kill signal, pid
|
||||
while kill 0, pids and slept < 200 ms:
|
||||
sleep sleeptime
|
||||
pids = kill 0, pids
|
||||
slept += sleeptime
|
||||
sleeptime = sleeptime * 1.1
|
||||
=head2 SQL interface
|
||||
|
||||
By doing so there is a tiny risk, that GNU B<parallel> will kill
|
||||
processes that are not started from GNU B<parallel>. It, however,
|
||||
requires all of these to be true:
|
||||
GNU B<parallel> uses the DBURL from GNU B<sql> to give database
|
||||
software, username, password, host, port, database, and table in a
|
||||
single string.
|
||||
|
||||
* Process A is sent a signal
|
||||
* It dies during a I<sleep sleeptime> cycle
|
||||
* A new process B is spawned (by an unrelated process)
|
||||
* This is done during the same I<sleep sleeptime> cycle
|
||||
* B is owned by the same user
|
||||
* B reuses the pid of the A
|
||||
The DBURL must point to a table name. The table will be dropped and
|
||||
created. The reason for not reusing an existing table is that the user
|
||||
may have added more input sources which would require more columns in
|
||||
the table.
|
||||
|
||||
It is considered unlikely to ever happen due to:
|
||||
The table columns are similar to joblog with the addition of B<V1>
|
||||
.. B<Vn> which are values from the input sources, and stdout and
|
||||
stderr which are the output from standard output and standard error,
|
||||
respectively.
|
||||
|
||||
The Signal column has been renamed to _Signal due to Signal being a
|
||||
reserved word in MySQL.
|
||||
|
||||
* The longest I<sleep sleeptime> sleeps is 10 ms
|
||||
* Re-use of a dead pid rarely happens within a few seconds
|
||||
|
||||
|
||||
=head1 Ideas for new design
|
||||
|
|
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
2
src/sql
2
src/sql
|
@ -566,7 +566,7 @@ $Global::Initfile && unlink $Global::Initfile;
|
|||
exit ($err);
|
||||
|
||||
sub parse_options {
|
||||
$Global::version = 20151222;
|
||||
$Global::version = 20160101;
|
||||
$Global::progname = 'sql';
|
||||
|
||||
# This must be done first as this may exec myself
|
||||
|
|
75
testsuite/tests-to-run/parallel-local-sql.sh
Normal file
75
testsuite/tests-to-run/parallel-local-sql.sh
Normal file
|
@ -0,0 +1,75 @@
|
|||
#!/bin/bash
|
||||
|
||||
# DBURLs for the three database backends exercised by this test
# (SQLite, PostgreSQL, MySQL). Each *TBL variable is the matching DBURL
# with the table name "parsql" appended.
# They are exported because the quoted here-document further down leaves
# $MYSQLTBL, $PGTBL, ... unexpanded; they are expanded later by the
# processes that actually run the test commands.
# %2F in the SQLite URL is a URL-encoded "/".
# NOTE(review): "lo" is presumably a host alias for the local machine and
# tange:tange a test-only account -- confirm against the test setup.
export SQLITE=sqlite3:///%2Frun%2Fshm%2Fparallel.db
|
||||
export SQLITETBL=$SQLITE/parsql
|
||||
export PG=pg://tange:tange@lo/tange
|
||||
export PGTBL=$PG/parsql
|
||||
export MYSQL=mysql://tange:tange@lo/tange
|
||||
export MYSQLTBL=$MYSQL/parsql
|
||||
# Extra PostgreSQL tables (parsql2 .. parsql5), one for each of the
# additional PostgreSQL test variants (-S lo, --results, --linebuffer, -u).
export PGTBL2=${PGTBL}2
|
||||
export PGTBL3=${PGTBL}3
|
||||
export PGTBL4=${PGTBL}4
|
||||
export PGTBL5=${PGTBL}5
|
||||
# Create 14 scratch files and export their paths as T1 .. T14.
# The test commands redirect the output of each parallel invocation into
# these files; the variables are exported because the commands are run by
# child processes, not by this shell.
# mktemp(1) is used instead of the deprecated, Debian-specific tempfile(1),
# and each creation is checked so a failure cannot silently leave a
# variable empty.
for _n in {1..14}; do
  _tmp=$(mktemp) || { echo "$0: mktemp failed" >&2; exit 1; }
  export "T${_n}=${_tmp}"
done
unset _n _tmp
|
||||
|
||||
#sql mysql://tange:tange@lo/ 'create database tange;';
|
||||
# Run the SQL test cases.
#
# Pipeline stages:
#  1. cat <<'EOF'  - the here-document is quoted, so $T1, $MYSQLTBL, ... are
#                    NOT expanded here; they are expanded when each job runs
#                    (which is why those variables are exported).
#  2. sed          - appends a space to every line ending in ';'. With
#                    parallel -L1 a trailing blank continues the logical
#                    line, so consecutive ';'-terminated lines are joined
#                    into a single job.
#  3. stdout parallel -vj0 -k ... -L1
#                  - runs every logical line as one job, all at once (-j0),
#                    echoing the command (-v) and keeping output in input
#                    order (-k). The joblog is named after this script.
#                    NOTE(review): `stdout` is not defined in this file; it
#                    is presumably a testsuite helper that merges stderr
#                    into stdout -- confirm.
#  4. perl         - rewrites numbers (two or more digits, optional '.')
#                    together with surrounding whitespace to 999 so that
#                    run-dependent values do not show up in the output.
#                    NOTE(review): in the second substitution `e+` is a
#                    regex quantifier, not a literal '+' -- confirm intent.
#
# The `true sort -u ...` steps are no-ops: true ignores its arguments.
#
# The joblog name uses a quoted $(basename "$0") so a script path that
# contains whitespace is handled correctly.
cat <<'EOF' | sed -e 's/;$/; /;' | stdout parallel -vj0 -k --joblog "/tmp/jl-$(basename "$0")" -L1 | perl -pe 's/\s*\d+\.?\d+\s*/999/g;s/999e+999.\s+.\s+/999e+999|999/g;'
echo '### --sqlandworker mysql'
(sleep 2; parallel --sqlworker $MYSQLTBL sleep .3\;echo >$T1) &
parallel --sqlandworker $MYSQLTBL sleep .3\;echo ::: {1..5} ::: {a..e} >$T2;
true sort -u $T1 $T2;
sql $MYSQL 'select * from parsql order by seq;'

echo '### --sqlandworker postgresql'
(sleep 2; parallel --sqlworker $PGTBL sleep .3\;echo >$T3) &
parallel --sqlandworker $PGTBL sleep .3\;echo ::: {1..5} ::: {a..e} >$T4;
true sort -u $T3 $T4;
sql $PG 'select * from parsql order by seq;'

echo '### --sqlandworker sqlite'
(sleep 2; parallel --sqlworker $SQLITETBL sleep .3\;echo >$T5) &
parallel --sqlandworker $SQLITETBL sleep .3\;echo ::: {1..5} ::: {a..e} >$T6;
true sort -u $T5 $T6;
sql $SQLITE 'select * from parsql order by seq;'

echo '### --sqlandworker postgresql -S lo'
(sleep 2; parallel -S lo --sqlworker $PGTBL2 sleep .3\;echo >$T7) &
parallel -S lo --sqlandworker $PGTBL2 sleep .3\;echo ::: {1..5} ::: {a..e} >$T8;
true sort -u $T7 $T8;
sql $PG 'select * from parsql2 order by seq;'

echo '### --sqlandworker postgresql --results'
mkdir -p /tmp/out--sql
(sleep 2; parallel --results /tmp/out--sql --sqlworker $PGTBL3 sleep .3\;echo >$T9) &
parallel --results /tmp/out--sql --sqlandworker $PGTBL3 sleep .3\;echo ::: {1..5} ::: {a..e} >$T10;
true sort -u $T9 $T10;
sql $PG 'select * from parsql3 order by seq;'

echo '### --sqlandworker postgresql --linebuffer'
(sleep 2; parallel --linebuffer --sqlworker $PGTBL4 sleep .3\;echo >$T11) &
parallel --linebuffer --sqlandworker $PGTBL4 sleep .3\;echo ::: {1..5} ::: {a..e} >$T12;
true sort -u $T11 $T12;
sql $PG 'select * from parsql4 order by seq;'

echo '### --sqlandworker postgresql -u'
(sleep 2; parallel -u --sqlworker $PGTBL5 sleep .3\;echo >$T13) &
parallel -u --sqlandworker $PGTBL5 sleep .3\;echo ::: {1..5} ::: {a..e} >$T14;
true sort -u $T13 $T14;
sql $PG 'select * from parsql5 order by seq;'

EOF
|
||||
|
||||
# Remove the scratch files whose paths are held in T1 .. T14.
# Uses indirect expansion instead of eval, so a path containing whitespace
# or glob characters is passed to rm as exactly one argument. Variables
# that are unset or empty are skipped, so rm is never called without an
# operand.
cleanup_tempfiles() {
  local idx var
  for idx in {1..14}; do
    var="T${idx}"
    if [[ -n "${!var:-}" ]]; then
      rm -- "${!var}"
    fi
  done
}
cleanup_tempfiles
|
Loading…
Reference in a new issue