diff --git a/doc/release_new_version b/doc/release_new_version index f5e2d74f..7a2242f9 100644 --- a/doc/release_new_version +++ b/doc/release_new_version @@ -226,9 +226,9 @@ cc:Tim Cuthbertson , Ryoichiro Suzuki , Jesse Alama -Subject: GNU Parallel 20141222 ('') released +Subject: GNU Parallel 20141222 ('Manila') released -GNU Parallel 20141222 ('') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/ +GNU Parallel 20141222 ('Manila') has been released. It is available for download at: http://ftp.gnu.org/gnu/parallel/ Haiku of the month: @@ -238,6 +238,18 @@ New in this release: * GNU Parallel was cited in: Parallel post-processing with MPI-Bash http://dl.acm.org/citation.cfm?id=2691137 +* A semibig refactoring of big functions. All non-trivial functions are now less than 100 lines. The refactoring makes this release beta quality. + +* GNU Parallel was cited in: Parallel post-processing with MPI-Bash http://dl.acm.org/citation.cfm?id=2691137 + +* GNU Parallel was cited in: Distinguishing cause from effect using observational data: methods and benchmarks http://arxiv-web3.library.cornell.edu/pdf/1412.3773.pdf + +* GNU Parallel was cited in: Bayesian Inference of Protein Structure from Chemical Shift Data https://peerj.com/preprints/692.pdf + +* GNU Parallel: Open Source For You (OSFY) magazine, October 2013 edition http://www.shakthimaan.com/posts/2014/11/27/gnu-parallel/news.html + +* コマンドを並列に実行するGNU parallelがとても便利 http://bicycle1885.hatenablog.com/entry/2014/08/10/143612 + * Bug fixes and man page updates. GNU Parallel - For people who live life in the parallel lane. diff --git a/src/Makefile.in b/src/Makefile.in index ba2715f3..16f9df9d 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -78,7 +78,7 @@ NORMAL_UNINSTALL = : PRE_UNINSTALL = : POST_UNINSTALL = : subdir = src -DIST_COMMON = $(srcdir)/Makefile.in $(srcdir)/Makefile.am README +DIST_COMMON = $(srcdir)/Makefile.in $(srcdir)/Makefile.am ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 am__aclocal_m4_deps = $(top_srcdir)/configure.ac am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \ diff --git a/src/parallel b/src/parallel index 25b62f81..2359c7e8 100755 --- a/src/parallel +++ b/src/parallel @@ -34,12 +34,6 @@ use Getopt::Long; use strict; use File::Basename; -if(not $ENV{HOME}) { - # $ENV{HOME} is sometimes not set if called from PHP - ::warning("\$HOME not set. Using /tmp\n"); - $ENV{HOME} = "/tmp"; -} - save_stdin_stdout_stderr(); save_original_signal_handler(); parse_options(); @@ -53,8 +47,7 @@ if($Global::max_number_of_args) { $number_of_args = 1; } -my @command; -@command = @ARGV; +my @command = @ARGV; my @fhlist; if($opt::pipepart) { @@ -694,6 +687,7 @@ sub options_hash { "xapply" => \$opt::xapply, "bibtex" => \$opt::bibtex, "nn|nonotice|no-notice" => \$opt::no_notice, + "memfree=s" => \$opt::memfree, # xargs-compatibility - implemented, man, testsuite "max-procs|P=s" => \$opt::jobs, "delimiter|d=s" => \$opt::d, @@ -779,56 +773,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A - # Defaults: - $Global::version = 20141123; - $Global::progname = 'parallel'; - $Global::infinity = 2**31; - $Global::debug = 0; - $Global::verbose = 0; - $Global::quoting = 0; - # Read only table with default --rpl values - %Global::replace = - ( - '{}' => '', - '{#}' => '1 $_=$job->seq()', - '{%}' => '1 $_=$job->slot()', - '{/}' => 's:.*/::', - '{//}' => '$Global::use{"File::Basename"} ||= eval "use File::Basename; 1;"; $_ = dirname($_);', - '{/.}' => 's:.*/::; s:\.[^/.]+$::;', - '{.}' => 's:\.[^/.]+$::', - ); - %Global::plus = - ( - # {} = {+/}/{/} - # = {.}.{+.} = {+/}/{/.}.{+.} - # = {..}.{+..} = {+/}/{/..}.{+..} - # = {...}.{+...} = {+/}/{/...}.{+...} - '{+/}' => 's:/[^/]*$::', - '{+.}' => 's:.*\.::', - '{+..}' => 's:.*\.([^.]*\.):$1:', - '{+...}' => 's:.*\.([^.]*\.[^.]*\.):$1:', - '{..}' => 's:\.[^/.]+$::; s:\.[^/.]+$::', - '{...}' => 's:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::', - '{/..}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::', - '{/...}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::', - ); - # Modifiable copy of %Global::replace - %Global::rpl = %Global::replace; - $Global::parens = "{==}"; - $/="\n"; - $Global::ignore_empty = 0; - $Global::interactive = 0; - $Global::stderr_verbose = 0; - $Global::default_simultaneous_sshlogins = 9; - $Global::exitstatus = 0; - $Global::halt_on_error_exitstatus = 0; - $Global::arg_sep = ":::"; - $Global::arg_file_sep = "::::"; - $Global::trim = 'n'; - $Global::max_jobs_running = 0; - $Global::job_already_run = ''; - $ENV{'TMPDIR'} ||= "/tmp"; - + init_globals(); @ARGV=read_options(); if(@opt::v) { $Global::verbose = $#opt::v+1; } # Convert -v -v to v=2 @@ -842,37 +787,7 @@ sub parse_options { if(defined $opt::q) { $Global::quoting = 1; } if(defined $opt::r) { $Global::ignore_empty = 1; } if(defined $opt::verbose) { $Global::stderr_verbose = 1; } - # Deal with --rpl - sub rpl { - # Modify %Global::rpl - # Replace $old with $new - my ($old,$new) = @_; - if($old ne $new) { - $Global::rpl{$new} = $Global::rpl{$old}; - delete $Global::rpl{$old}; - } - } - if(defined $opt::parens) { $Global::parens = $opt::parens; } - my $parenslen = 0.5*length $Global::parens; - $Global::parensleft = substr($Global::parens,0,$parenslen); - $Global::parensright = substr($Global::parens,$parenslen); - if(defined $opt::plus) { %Global::rpl = (%Global::plus,%Global::rpl); } - if(defined $opt::I) { rpl('{}',$opt::I); } - if(defined $opt::U) { rpl('{.}',$opt::U); } - if(defined $opt::i and $opt::i) { rpl('{}',$opt::i); } - if(defined $opt::basenamereplace) { rpl('{/}',$opt::basenamereplace); } - if(defined $opt::dirnamereplace) { rpl('{//}',$opt::dirnamereplace); } - if(defined $opt::seqreplace) { rpl('{#}',$opt::seqreplace); } - if(defined $opt::slotreplace) { rpl('{%}',$opt::slotreplace); } - if(defined $opt::basenameextensionreplace) { - rpl('{/.}',$opt::basenameextensionreplace); - } - for(@opt::rpl) { - # Create $Global::rpl entries for --rpl options - # E.g: "{..} s:\.[^.]+$:;s:\.[^.]+$:;" - my ($shorthand,$long) = split/ /,$_,2; - $Global::rpl{$shorthand} = $long; - } + parse_replacement_string_options(); if(defined $opt::eof) { $Global::end_of_file_string = $opt::eof; } if(defined $opt::max_args) { $Global::max_number_of_args = $opt::max_args; } if(defined $opt::timeout) { $Global::timeoutq = TimeoutQueue->new($opt::timeout); } @@ -901,13 +816,8 @@ sub parse_options { not defined $opt::recend) { $opt::recend = "\n"; } if(not defined $opt::blocksize) { $opt::blocksize = "1M"; } $opt::blocksize = multiply_binary_prefix($opt::blocksize); + $opt::memfree = multiply_binary_prefix($opt::memfree); if(defined $opt::controlmaster) { $opt::noctrlc = 1; } - if(defined $opt::semaphore) { $Global::semaphore = 1; } - if(defined $opt::semaphoretimeout) { $Global::semaphore = 1; } - if(defined $opt::semaphorename) { $Global::semaphore = 1; } - if(defined $opt::fg) { $Global::semaphore = 1; } - if(defined $opt::bg) { $Global::semaphore = 1; } - if(defined $opt::wait) { $Global::semaphore = 1; } if(defined $opt::halt_on_error and $opt::halt_on_error=~/%/) { $opt::halt_on_error /= 100; } if(defined $opt::timeout and $opt::timeout !~ /^\d+(\.\d+)?%?$/) { @@ -1002,34 +912,8 @@ sub parse_options { # Deal with ::: and :::: @ARGV=read_args_from_command_line(); } + parse_semaphore(); - # Semaphore defaults - # Must be done before computing number of processes and max_line_length - # because when running as a semaphore GNU Parallel does not read args - $Global::semaphore ||= ($0 =~ m:(^|/)sem$:); # called as 'sem' - if($Global::semaphore) { - # A semaphore does not take input from neither stdin nor file - @opt::a = ("/dev/null"); - push(@Global::unget_argv, [Arg->new("")]); - $Semaphore::timeout = $opt::semaphoretimeout || 0; - if(defined $opt::semaphorename) { - $Semaphore::name = $opt::semaphorename; - } else { - $Semaphore::name = `tty`; - chomp $Semaphore::name; - } - $Semaphore::fg = $opt::fg; - $Semaphore::wait = $opt::wait; - $Global::default_simultaneous_sshlogins = 1; - if(not defined $opt::jobs) { - $opt::jobs = 1; - } - if($Global::interactive and $opt::bg) { - ::error("Jobs running in the ". - "background cannot be interactive.\n"); - ::wait_and_exit(255); - } - } if(defined $opt::eta) { $opt::progress = $opt::eta; } @@ -1065,6 +949,133 @@ sub parse_options { open_joblog(); } +sub init_globals { + # Defaults: + $Global::version = 20141212; + $Global::progname = 'parallel'; + $Global::infinity = 2**31; + $Global::debug = 0; + $Global::verbose = 0; + $Global::quoting = 0; + # Read only table with default --rpl values + %Global::replace = + ( + '{}' => '', + '{#}' => '1 $_=$job->seq()', + '{%}' => '1 $_=$job->slot()', + '{/}' => 's:.*/::', + '{//}' => '$Global::use{"File::Basename"} ||= eval "use File::Basename; 1;"; $_ = dirname($_);', + '{/.}' => 's:.*/::; s:\.[^/.]+$::;', + '{.}' => 's:\.[^/.]+$::', + ); + %Global::plus = + ( + # {} = {+/}/{/} + # = {.}.{+.} = {+/}/{/.}.{+.} + # = {..}.{+..} = {+/}/{/..}.{+..} + # = {...}.{+...} = {+/}/{/...}.{+...} + '{+/}' => 's:/[^/]*$::', + '{+.}' => 's:.*\.::', + '{+..}' => 's:.*\.([^.]*\.):$1:', + '{+...}' => 's:.*\.([^.]*\.[^.]*\.):$1:', + '{..}' => 's:\.[^/.]+$::; s:\.[^/.]+$::', + '{...}' => 's:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::', + '{/..}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::', + '{/...}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::', + ); + # Modifiable copy of %Global::replace + %Global::rpl = %Global::replace; + $Global::parens = "{==}"; + $/="\n"; + $Global::ignore_empty = 0; + $Global::interactive = 0; + $Global::stderr_verbose = 0; + $Global::default_simultaneous_sshlogins = 9; + $Global::exitstatus = 0; + $Global::halt_on_error_exitstatus = 0; + $Global::arg_sep = ":::"; + $Global::arg_file_sep = "::::"; + $Global::trim = 'n'; + $Global::max_jobs_running = 0; + $Global::job_already_run = ''; + $ENV{'TMPDIR'} ||= "/tmp"; + if(not $ENV{HOME}) { + # $ENV{HOME} is sometimes not set if called from PHP + ::warning("\$HOME not set. Using /tmp\n"); + $ENV{HOME} = "/tmp"; + } +} + +sub parse_replacement_string_options { + # Deal with --rpl + sub rpl { + # Modify %Global::rpl + # Replace $old with $new + my ($old,$new) = @_; + if($old ne $new) { + $Global::rpl{$new} = $Global::rpl{$old}; + delete $Global::rpl{$old}; + } + } + if(defined $opt::parens) { $Global::parens = $opt::parens; } + my $parenslen = 0.5*length $Global::parens; + $Global::parensleft = substr($Global::parens,0,$parenslen); + $Global::parensright = substr($Global::parens,$parenslen); + if(defined $opt::plus) { %Global::rpl = (%Global::plus,%Global::rpl); } + if(defined $opt::I) { rpl('{}',$opt::I); } + if(defined $opt::U) { rpl('{.}',$opt::U); } + if(defined $opt::i and $opt::i) { rpl('{}',$opt::i); } + if(defined $opt::basenamereplace) { rpl('{/}',$opt::basenamereplace); } + if(defined $opt::dirnamereplace) { rpl('{//}',$opt::dirnamereplace); } + if(defined $opt::seqreplace) { rpl('{#}',$opt::seqreplace); } + if(defined $opt::slotreplace) { rpl('{%}',$opt::slotreplace); } + if(defined $opt::basenameextensionreplace) { + rpl('{/.}',$opt::basenameextensionreplace); + } + for(@opt::rpl) { + # Create $Global::rpl entries for --rpl options + # E.g: "{..} s:\.[^.]+$:;s:\.[^.]+$:;" + my ($shorthand,$long) = split/ /,$_,2; + $Global::rpl{$shorthand} = $long; + } +} + +sub parse_semaphore { + # Semaphore defaults + # Must be done before computing number of processes and max_line_length + # because when running as a semaphore GNU Parallel does not read args + $Global::semaphore ||= ($0 =~ m:(^|/)sem$:); # called as 'sem' + if(defined $opt::semaphore) { $Global::semaphore = 1; } + if(defined $opt::semaphoretimeout) { $Global::semaphore = 1; } + if(defined $opt::semaphorename) { $Global::semaphore = 1; } + if(defined $opt::fg) { $Global::semaphore = 1; } + if(defined $opt::bg) { $Global::semaphore = 1; } + if(defined $opt::wait) { $Global::semaphore = 1; } + if($Global::semaphore) { + # A semaphore does not take input from neither stdin nor file + @opt::a = ("/dev/null"); + push(@Global::unget_argv, [Arg->new("")]); + $Semaphore::timeout = $opt::semaphoretimeout || 0; + if(defined $opt::semaphorename) { + $Semaphore::name = $opt::semaphorename; + } else { + $Semaphore::name = `tty`; + chomp $Semaphore::name; + } + $Semaphore::fg = $opt::fg; + $Semaphore::wait = $opt::wait; + $Global::default_simultaneous_sshlogins = 1; + if(not defined $opt::jobs) { + $opt::jobs = 1; + } + if($Global::interactive and $opt::bg) { + ::error("Jobs running in the ". + "background cannot be interactive.\n"); + ::wait_and_exit(255); + } + } +} + sub env_quote { # Input: # $v = value to quote @@ -1498,9 +1509,10 @@ sub shell_quote_scalar { # $shell_quoted = string quoted with \ as needed by the shell my $a = $_[0]; if(defined $a) { - # $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g; + # Solaris sh wants ^ quoted. + # $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g; # This is 1% faster than the above - $a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go; + $a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go; $a =~ s/[\n]/'\n'/go; # filenames with '\n' is quoted using \' } return $a; @@ -1558,6 +1570,8 @@ sub save_stdin_stdout_stderr { } open $Global::original_stderr, ">&", "STDERR" or ::die_bug("Can't dup STDERR: $!"); + open $Global::status_fd, ">&", "STDERR" or + ::die_bug("Can't dup STDERR: $!"); open $Global::original_stdin, "<&", "STDIN" or ::die_bug("Can't dup STDIN: $!"); } @@ -1639,36 +1653,7 @@ sub init_run_jobs { my $last_time; my %last_mtime; -sub start_more_jobs { - # Run start_another_job() but only if: - # * not $Global::start_no_new_jobs set - # * not JobQueue is empty - # * not load on server is too high - # * not server swapping - # * not too short time since last remote login - # Uses: - # $Global::max_procs_file - # $Global::max_procs_file_last_mod - # %Global::host - # @opt::sshloginfile - # $Global::start_no_new_jobs - # $opt::filter_hosts - # $Global::JobQueue - # $opt::pipe - # $opt::load - # $opt::noswap - # $opt::delay - # $Global::newest_starttime - # Returns: - # $jobs_started = number of jobs started - my $jobs_started = 0; - my $jobs_started_this_round = 0; - if($Global::start_no_new_jobs) { - return $jobs_started; - } - if(time - ($last_time||0) > 1) { - # At most do this every second - $last_time = time; + sub changed_procs_file { if($Global::max_procs_file) { # --jobs filename my $mtime = (stat($Global::max_procs_file))[9]; @@ -1680,6 +1665,8 @@ sub start_more_jobs { } } } + } + sub changed_sshloginfile { if(@opt::sshloginfile) { # Is --sshloginfile changed? for my $slf (@opt::sshloginfile) { @@ -1706,138 +1693,176 @@ sub start_more_jobs { } } } - do { - $jobs_started_this_round = 0; - # This will start 1 job on each --sshlogin (if possible) - # thus distribute the jobs on the --sshlogins round robin - for my $sshlogin (values %Global::host) { - if($Global::JobQueue->empty() and not $opt::pipe) { - # No more jobs in the queue - last; - } - debug("run", "Running jobs before on ", $sshlogin->string(), ": ", - $sshlogin->jobs_running(), "\n"); - if ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { - if($opt::load and $sshlogin->loadavg_too_high()) { - # The load is too high or unknown - next; - } - if($opt::noswap and $sshlogin->swapping()) { - # The server is swapping - next; - } - if($sshlogin->too_fast_remote_login()) { - # It has been too short since - next; - } - if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) { - # It has been too short since last start - next; - } - debug("run", $sshlogin->string(), " has ", $sshlogin->jobs_running(), - " out of ", $sshlogin->max_jobs_running(), - " jobs running. Start another.\n"); - if(start_another_job($sshlogin) == 0) { - # No more jobs to start on this $sshlogin - debug("run","No jobs started on ", $sshlogin->string(), "\n"); - next; - } - $sshlogin->inc_jobs_running(); - $sshlogin->set_last_login_at(::now()); - $jobs_started++; - $jobs_started_this_round++; - } - debug("run","Running jobs after on ", $sshlogin->string(), ": ", - $sshlogin->jobs_running(), " of ", - $sshlogin->max_jobs_running(), "\n"); + sub start_more_jobs { + # Run start_another_job() but only if: + # * not $Global::start_no_new_jobs set + # * not JobQueue is empty + # * not load on server is too high + # * not server swapping + # * not too short time since last remote login + # Uses: + # $Global::max_procs_file + # $Global::max_procs_file_last_mod + # %Global::host + # @opt::sshloginfile + # $Global::start_no_new_jobs + # $opt::filter_hosts + # $Global::JobQueue + # $opt::pipe + # $opt::load + # $opt::noswap + # $opt::delay + # $Global::newest_starttime + # Returns: + # $jobs_started = number of jobs started + my $jobs_started = 0; + my $jobs_started_this_round = 0; + if($Global::start_no_new_jobs) { + return $jobs_started; } - } while($jobs_started_this_round); + if(time - ($last_time||0) > 1) { + # At most do this every second + $last_time = time; + changed_procs_file(); + changed_sshloginfile(); + } + do { + $jobs_started_this_round = 0; + # This will start 1 job on each --sshlogin (if possible) + # thus distribute the jobs on the --sshlogins round robin + for my $sshlogin (values %Global::host) { + if($Global::JobQueue->empty() and not $opt::pipe) { + # No more jobs in the queue + last; + } + debug("run", "Running jobs before on ", $sshlogin->string(), ": ", + $sshlogin->jobs_running(), "\n"); + if ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { + if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) { + # It has been too short since last start + next; + } + if($opt::load and $sshlogin->loadavg_too_high()) { + # The load is too high or unknown + next; + } + if($opt::noswap and $sshlogin->swapping()) { + # The server is swapping + next; + } + if($opt::memfree and $sshlogin->memfree() < $opt::memfree) { + # The server has not enough mem free + ::debug("mem", "Not starting job: not enough mem\n"); + next; + } + if($sshlogin->too_fast_remote_login()) { + # It has been too short since + next; + } + debug("run", $sshlogin->string(), " has ", $sshlogin->jobs_running(), + " out of ", $sshlogin->max_jobs_running(), + " jobs running. Start another.\n"); + if(start_another_job($sshlogin) == 0) { + # No more jobs to start on this $sshlogin + debug("run","No jobs started on ", $sshlogin->string(), "\n"); + next; + } + $sshlogin->inc_jobs_running(); + $sshlogin->set_last_login_at(::now()); + $jobs_started++; + $jobs_started_this_round++; + } + debug("run","Running jobs after on ", $sshlogin->string(), ": ", + $sshlogin->jobs_running(), " of ", + $sshlogin->max_jobs_running(), "\n"); + } + } while($jobs_started_this_round); - return $jobs_started; -} + return $jobs_started; + } } { my $no_more_file_handles_warned; -sub start_another_job { - # If there are enough filehandles - # and JobQueue not empty - # and not $job is in joblog - # Then grab a job from Global::JobQueue, - # start it at sshlogin - # mark it as virgin_job - # Inputs: - # $sshlogin = the SSHLogin to start the job on - # Uses: - # $Global::JobQueue - # $opt::pipe - # $opt::results - # $opt::resume - # @Global::virgin_jobs - # Returns: - # 1 if another jobs was started - # 0 otherwise - my $sshlogin = shift; - # Do we have enough file handles to start another job? - if(enough_file_handles()) { - if($Global::JobQueue->empty() and not $opt::pipe) { - # No more commands to run - debug("start", "Not starting: JobQueue empty\n"); - return 0; - } else { - my $job; - # Skip jobs already in job log - # Skip jobs already in results - do { - $job = get_job_with_sshlogin($sshlogin); - if(not defined $job) { - # No command available for that sshlogin - debug("start", "Not starting: no jobs available for ", - $sshlogin->string(), "\n"); + sub start_another_job { + # If there are enough filehandles + # and JobQueue not empty + # and not $job is in joblog + # Then grab a job from Global::JobQueue, + # start it at sshlogin + # mark it as virgin_job + # Inputs: + # $sshlogin = the SSHLogin to start the job on + # Uses: + # $Global::JobQueue + # $opt::pipe + # $opt::results + # $opt::resume + # @Global::virgin_jobs + # Returns: + # 1 if another jobs was started + # 0 otherwise + my $sshlogin = shift; + # Do we have enough file handles to start another job? + if(enough_file_handles()) { + if($Global::JobQueue->empty() and not $opt::pipe) { + # No more commands to run + debug("start", "Not starting: JobQueue empty\n"); + return 0; + } else { + my $job; + # Skip jobs already in job log + # Skip jobs already in results + do { + $job = get_job_with_sshlogin($sshlogin); + if(not defined $job) { + # No command available for that sshlogin + debug("start", "Not starting: no jobs available for ", + $sshlogin->string(), "\n"); + return 0; + } + } while ($job->is_already_in_joblog() + or + ($opt::results and $opt::resume and $job->is_already_in_results())); + debug("start", "Command to run on '", $job->sshlogin()->string(), "': '", + $job->replaced(),"'\n"); + if($job->start()) { + if($opt::pipe) { + push(@Global::virgin_jobs,$job); + } + debug("start", "Started as seq ", $job->seq(), + " pid:", $job->pid(), "\n"); + return 1; + } else { + # Not enough processes to run the job. + # Put it back on the queue. + $Global::JobQueue->unget($job); + # Count down the number of jobs to run for this SSHLogin. + my $max = $sshlogin->max_jobs_running(); + if($max > 1) { $max--; } else { + ::error("No more processes: cannot run a single job. Something is wrong.\n"); + ::wait_and_exit(255); + } + $sshlogin->set_max_jobs_running($max); + # Sleep up to 300 ms to give other processes time to die + ::usleep(rand()*300); + ::warning("No more processes: ", + "Decreasing number of running jobs to $max. ", + "Raising ulimit -u or /etc/security/limits.conf may help.\n"); return 0; } - } while ($job->is_already_in_joblog() - or - ($opt::results and $opt::resume and $job->is_already_in_results())); - debug("start", "Command to run on '", $job->sshlogin()->string(), "': '", - $job->replaced(),"'\n"); - if($job->start()) { - if($opt::pipe) { - push(@Global::virgin_jobs,$job); - } - debug("start", "Started as seq ", $job->seq(), - " pid:", $job->pid(), "\n"); - return 1; - } else { - # Not enough processes to run the job. - # Put it back on the queue. - $Global::JobQueue->unget($job); - # Count down the number of jobs to run for this SSHLogin. - my $max = $sshlogin->max_jobs_running(); - if($max > 1) { $max--; } else { - ::error("No more processes: cannot run a single job. Something is wrong.\n"); - ::wait_and_exit(255); - } - $sshlogin->set_max_jobs_running($max); - # Sleep up to 300 ms to give other processes time to die - ::usleep(rand()*300); - ::warning("No more processes: ", - "Decreasing number of running jobs to $max. ", - "Raising ulimit -u or /etc/security/limits.conf may help.\n"); - return 0; - } - } - } else { - # No more file handles - $no_more_file_handles_warned++ or - ::warning("No more file handles. ", - "Raising ulimit -n or /etc/security/limits.conf may help.\n"); - return 0; + } + } else { + # No more file handles + $no_more_file_handles_warned++ or + ::warning("No more file handles. ", + "Raising ulimit -n or /etc/security/limits.conf may help.\n"); + return 0; + } } } -} sub init_progress { # Uses: @@ -1865,7 +1890,7 @@ sub drain_job_queue { # $Global::start_no_new_jobs # Returns: N/A if($opt::progress) { - print $Global::original_stderr init_progress(); + ::status(init_progress()); } my $last_header=""; my $sleep = 0.2; @@ -1882,11 +1907,10 @@ sub drain_job_queue { if($opt::progress) { my %progress = progress(); if($last_header ne $progress{'header'}) { - print $Global::original_stderr "\n", $progress{'header'}, "\n"; + ::status("\n", $progress{'header'}, "\n"); $last_header = $progress{'header'}; } - print $Global::original_stderr "\r",$progress{'status'}; - flush $Global::original_stderr; + ::status("\r",$progress{'status'}); } if($Global::total_running < $Global::max_jobs_running and not $Global::JobQueue->empty()) { @@ -1920,8 +1944,7 @@ sub drain_job_queue { not $Global::start_no_new_jobs and not $Global::JobQueue->empty()); if($opt::progress) { my %progress = progress(); - print $Global::original_stderr "\r", $progress{'status'}, "\n"; - flush $Global::original_stderr; + ::status("\r", $progress{'status'}, "\n"); } } @@ -1929,11 +1952,10 @@ sub toggle_progress { # Turn on/off progress view # Uses: # $opt::progress - # $Global::original_stderr # Returns: N/A $opt::progress = not $opt::progress; if($opt::progress) { - print $Global::original_stderr init_progress(); + ::status(init_progress()); } } @@ -1971,6 +1993,7 @@ sub progress { $Global::host{$w}->max_jobs_running()."\n"; } $status = "x"x($termcols+1); + # Select an output format that will fit on a single line if(length $status > $termcols) { # sshlogin1:XX/XX/XX%/XX.Xs sshlogin2:XX/XX/XX%/XX.Xs sshlogin3:XX/XX/XX%/XX.Xs $header = "Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete"; @@ -2169,17 +2192,23 @@ sub progress { } sub get_job_with_sshlogin { + # Input: + # $sshlogin = which host should the job be run on? + # Uses: + # $opt::hostgroups + # $Global::JobQueue # Returns: - # next job object for $sshlogin if any available + # $job = next job object for $sshlogin if any available my $sshlogin = shift; - my $job = undef; + my $job; if ($opt::hostgroups) { my @other_hostgroup_jobs = (); while($job = $Global::JobQueue->get()) { if($sshlogin->in_hostgroups($job->hostgroups())) { - # Found a job for this hostgroup + # Found a job to be run on a hostgroup of this + # $sshlogin last; } else { # This job was not in the hostgroups of $sshlogin @@ -2239,6 +2268,9 @@ sub get_job_with_sshlogin { sub __REMOTE_SSH__ {} sub read_sshloginfiles { + # Read a list of --slf's + # Input: + # @files = files or symbolic file names to read # Returns: N/A for my $s (@_) { read_sshloginfile(expand_slf_shorthand($s)); @@ -2246,6 +2278,11 @@ sub read_sshloginfiles { } sub expand_slf_shorthand { + # Expand --slf shorthand into a read file name + # Input: + # $file = file or symbolic file name to read + # Returns: + # $file = actual file name to read my $file = shift; if($file eq "-") { # skip: It is stdin @@ -2266,6 +2303,11 @@ sub expand_slf_shorthand { } sub read_sshloginfile { + # Read sshloginfile into @Global::sshlogin + # Input: + # $file = file to read + # Uses: + # @Global::sshlogin # Returns: N/A my $file = shift; my $close = 1; @@ -2293,6 +2335,17 @@ sub read_sshloginfile { } sub parse_sshlogin { + # Parse @Global::sshlogin into %Global::host. + # Keep only hosts that are in one of the given ssh hostgroups. + # Uses: + # @Global::sshlogin + # $Global::minimal_command_line_length + # %Global::host + # $opt::transfer + # @opt::return + # $opt::cleanup + # @opt::basefile + # @opt::trc # Returns: N/A my @login; if(not @Global::sshlogin) { @Global::sshlogin = (":"); } @@ -2369,6 +2422,8 @@ sub parse_sshlogin { sub remote_hosts { # Return sshlogins that are not ':' + # Uses: + # %Global::host # Returns: # list of sshlogins with ':' removed return grep !/^:$/, keys %Global::host; @@ -2377,6 +2432,9 @@ sub remote_hosts { sub setup_basefile { # Transfer basefiles to each $sshlogin # This needs to be done before first jobs on $sshlogin is run + # Uses: + # %Global::host + # @opt::basefile # Returns: N/A my $cmd = ""; my $rsync_destdir; @@ -2399,6 +2457,9 @@ sub setup_basefile { sub cleanup_basefile { # Remove the basefiles transferred + # Uses: + # %Global::host + # @opt::basefile # Returns: N/A my $cmd=""; my $workdir = Job->new("")->workdir(); @@ -2414,41 +2475,58 @@ sub cleanup_basefile { } sub filter_hosts { - my(@cores, @cpus, @maxline, @echo); - my $envvar = ::shell_quote_scalar($Global::envvar); - while (my ($host, $sshlogin) = each %Global::host) { - if($host eq ":") { next } - # The 'true' is used to get the $host out later - my $sshcmd = "true $host;" . $sshlogin->sshcommand()." ".$sshlogin->serverlogin(); - push(@cores, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cores\n\0"); - push(@cpus, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cpus\n\0"); - push(@maxline, $host."\t".$sshcmd." ".$envvar." parallel --max-line-length-allowed\n\0"); - # 'echo' is used to get the best possible value for an ssh login time - push(@echo, $host."\t".$sshcmd." echo\n\0"); - } - my ($fh, $tmpfile) = ::tmpfile(SUFFIX => ".ssh"); - print $fh @cores, @cpus, @maxline, @echo; - close $fh; - # --timeout 5: Setting up an SSH connection and running a simple - # command should never take > 5 sec. - # --delay 0.1: If multiple sshlogins use the same proxy the delay - # will make it less likely to overload the ssh daemon. - # --retries 3: If the ssh daemon it overloaded, try 3 times - # -s 16000: Half of the max line on UnixWare - my $cmd = "cat $tmpfile | $0 -j0 --timeout 5 -s 16000 --joblog - --plain --delay 0.1 --retries 3 --tag --tagstring {1} -0 --colsep '\t' -k eval {2} 2>/dev/null"; - ::debug("init", $cmd, "\n"); - open(my $host_fh, "-|", $cmd) || ::die_bug("parallel host check: $cmd"); + # Remove down --sshlogins from active duty. + # Find ncpus, ncores, maxlen, time-to-login for each host. + # Uses: + # %Global::host + # $Global::minimal_command_line_length + # $opt::use_cpus_instead_of_cores + # Returns: + # N/A my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts); - my $prepend = ""; - while(<$host_fh>) { - if(/\'$/) { - # if last char = ' then append next line - # This may be due to quoting of $Global::envvar - $prepend .= $_; - next; + + my ($ncores_ref, $ncpus_ref, $time_to_login_ref, $maxlen_ref, + $echo_ref, $down_hosts_ref) = + parse_host_filtering(parallelized_host_filtering()); + %ncores = %$ncores_ref; + %ncpus = %$ncpus_ref; + %time_to_login = %$time_to_login_ref; + %maxlen = %$maxlen_ref; + %echo = %$echo_ref; + @down_hosts = @$down_hosts_ref; + + delete @Global::host{@down_hosts}; + @down_hosts and ::warning("Removed @down_hosts\n"); + + $Global::minimal_command_line_length = 8_000_000; + while (my ($sshlogin, $obj) = each %Global::host) { + if($sshlogin eq ":") { next } + $ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin()); + $ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin()); + $time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin()); + $maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin()); + if($opt::use_cpus_instead_of_cores) { + $obj->set_ncpus($ncpus{$sshlogin}); + } else { + $obj->set_ncpus($ncores{$sshlogin}); } - $_ = $prepend . $_; - $prepend = ""; + $obj->set_time_to_login($time_to_login{$sshlogin}); + $obj->set_maxlength($maxlen{$sshlogin}); + $Global::minimal_command_line_length = + ::min($Global::minimal_command_line_length, + int($maxlen{$sshlogin}/2)); + ::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin}, + " ncores:", $ncores{$sshlogin}, + " time_to_login:", $time_to_login{$sshlogin}, + " maxlen:", $maxlen{$sshlogin}, + " min_max_len:", $Global::minimal_command_line_length,"\n"); + } +} + +sub parse_host_filtering { + my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts); + + for (@_) { chomp; my @col = split /\t/, $_; if(defined $col[6]) { @@ -2470,7 +2548,6 @@ sub filter_hosts { # Remove sshlogin ::debug("init", "--filtered $host\n"); push(@down_hosts, $host); - @down_hosts = uniq(@down_hosts); } elsif($col[6] eq "127") { # signal == 127: parallel not installed remote # Set ncpus and ncores = 1 @@ -2516,33 +2593,61 @@ sub filter_hosts { ::die_bug("host check unmatched short jobline ($col[0]): $_"); } } + @down_hosts = uniq(@down_hosts); + return(\%ncores, \%ncpus, \%time_to_login, \%maxlen, \%echo, \@down_hosts); +} + +sub parallelized_host_filtering { + # Uses: + # $Global::envvar + # %Global::host + # Returns: + # text entries with: + # * joblog line + # * hostname \t number of cores + # * hostname \t number of cpus + # * hostname \t max-line-length-allowed + # * hostname \t empty + my(@cores, @cpus, @maxline, @echo); + my $envvar = ::shell_quote_scalar($Global::envvar); + while (my ($host, $sshlogin) = each %Global::host) { + if($host eq ":") { next } + # The 'true' is used to get the $host out later + my $sshcmd = "true $host;" . $sshlogin->sshcommand()." ".$sshlogin->serverlogin(); + push(@cores, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cores\n\0"); + push(@cpus, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cpus\n\0"); + push(@maxline, $host."\t".$sshcmd." ".$envvar." parallel --max-line-length-allowed\n\0"); + # 'echo' is used to get the best possible value for an ssh login time + push(@echo, $host."\t".$sshcmd." echo\n\0"); + } + my ($fh, $tmpfile) = ::tmpfile(SUFFIX => ".ssh"); + print $fh @cores, @cpus, @maxline, @echo; + close $fh; + # --timeout 5: Setting up an SSH connection and running a simple + # command should never take > 5 sec. + # --delay 0.1: If multiple sshlogins use the same proxy the delay + # will make it less likely to overload the ssh daemon. + # --retries 3: If the ssh daemon it overloaded, try 3 times + # -s 16000: Half of the max line on UnixWare + my $cmd = "cat $tmpfile | $0 -j0 --timeout 5 -s 16000 --joblog - --plain --delay 0.1 --retries 3 --tag --tagstring {1} -0 --colsep '\t' -k eval {2} 2>/dev/null"; + ::debug("init", $cmd, "\n"); + my @out; + my $prepend = ""; + open(my $host_fh, "-|", $cmd) || ::die_bug("parallel host check: $cmd"); + for(<$host_fh>) { + if(/\'$/) { + # if last char = ' then append next line + # This may be due to quoting of $Global::envvar + $prepend .= $_; + next; + } + $_ = $prepend . $_; + $prepend = ""; + push @out, $_; + } close $host_fh; $Global::debug or unlink $tmpfile; - delete @Global::host{@down_hosts}; - @down_hosts and ::warning("Removed @down_hosts\n"); - $Global::minimal_command_line_length = 8_000_000; - while (my ($sshlogin, $obj) = each %Global::host) { - if($sshlogin eq ":") { next } - $ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin()); - $ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin()); - $time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin()); - $maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin()); - if($opt::use_cpus_instead_of_cores) { - $obj->set_ncpus($ncpus{$sshlogin}); - } else { - $obj->set_ncpus($ncores{$sshlogin}); - } - $obj->set_time_to_login($time_to_login{$sshlogin}); - $obj->set_maxlength($maxlen{$sshlogin}); - $Global::minimal_command_line_length = - ::min($Global::minimal_command_line_length, - int($maxlen{$sshlogin}/2)); - ::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin}, - " ncores:", $ncores{$sshlogin}, - " time_to_login:", $time_to_login{$sshlogin}, - " maxlen:", $maxlen{$sshlogin}, - " min_max_len:", $Global::minimal_command_line_length,"\n"); - } + return @out; } sub onall { @@ -2647,6 +2752,8 @@ sub __SIGNAL_HANDLING__ {} sub save_original_signal_handler { # Remember the original signal handler + # Uses: + # %Global::original_sig # Returns: N/A $SIG{TERM} ||= sub { exit 0; }; # $SIG{TERM} is not set on Mac OS X $SIG{INT} = sub { if($opt::tmux) { qx { tmux kill-session -t p$$ }; } @@ -2660,14 +2767,14 @@ sub save_original_signal_handler { sub list_running_jobs { # Returns: N/A for my $v (values %Global::running) { - print $Global::original_stderr "$Global::progname: ",$v->replaced(),"\n"; + ::status("$Global::progname: ",$v->replaced(),"\n"); } } sub start_no_new_jobs { # Returns: N/A $SIG{TERM} = $Global::original_sig{TERM}; - print $Global::original_stderr + ::status ("$Global::progname: SIGTERM received. No new jobs will be started.\n", "$Global::progname: Waiting for these ", scalar(keys %Global::running), " jobs to finish. Send SIGTERM again to stop now.\n"); @@ -2747,7 +2854,7 @@ sub process_failed_job { $Global::total_failed / $Global::total_started > $opt::halt_on_error)) { # If halt on error == 1 or --halt 10% # we should gracefully exit - print $Global::original_stderr + ::status ("$Global::progname: Starting no more jobs. ", "Waiting for ", scalar(keys %Global::running), " jobs to finish. This job failed:\n", @@ -2756,7 +2863,7 @@ sub process_failed_job { $Global::halt_on_error_exitstatus = $job->exitstatus(); } elsif($opt::halt_on_error == 2) { # If halt on error == 2 we should exit immediately - print $Global::original_stderr + ::status ("$Global::progname: This job failed:\n", $job->replaced(),"\n"); exit ($job->exitstatus()); @@ -2847,12 +2954,13 @@ sub usage { "", "See 'man $Global::progname' for details", "", + "Academic tradition requires you to cite works you base your article on.", "When using programs that use GNU Parallel to process data for publication please cite:", "", "O. Tange (2011): GNU Parallel - The Command-Line Power Tool,", ";login: The USENIX Magazine, February 2011:42-47.", "", - "Or you can get GNU Parallel without this requirement by paying 10000 EUR.", + "If you pay 10000 EUR you should feel free to use GNU Parallel without citing.\n", ""); } @@ -2871,38 +2979,42 @@ sub citation_notice { -e $ENV{'HOME'}."/.parallel/will-cite") { # skip } else { - print $Global::original_stderr - ("When using programs that use GNU Parallel to process data for publication please cite:\n", + ::status + ("Academic tradition requires you to cite works you base your article on.\n", + "When using programs that use GNU Parallel to process data for publication please cite:\n", "\n", " O. Tange (2011): GNU Parallel - The Command-Line Power Tool,\n", " ;login: The USENIX Magazine, February 2011:42-47.\n", "\n", "This helps funding further development; and it won't cost you a cent.\n", - "Or you can get GNU Parallel without this requirement by paying 10000 EUR.\n", + "If you pay 10000 EUR you should feel free to use GNU Parallel without citing.\n", "\n", "To silence this citation notice run 'parallel --bibtex' once or use '--no-notice'.\n\n", ); - flush $Global::original_stderr; } } +sub status { + my @w = @_; + my $fh = $Global::status_fd || *STDERR; + print $fh @w; + flush $fh; +} sub warning { my @w = @_; - my $fh = $Global::original_stderr || *STDERR; + my $fh = $Global::status_fd || *STDERR; my $prog = $Global::progname || "parallel"; print $fh $prog, ": Warning: ", @w; } - sub error { my @w = @_; - my $fh = $Global::original_stderr || *STDERR; + my $fh = $Global::status_fd || *STDERR; my $prog = $Global::progname || "parallel"; print $fh $prog, ": Error: ", @w; } - sub die_bug { my $bugid = shift; print STDERR @@ -2930,10 +3042,8 @@ sub version { "GNU $Global::progname comes with no warranty.", "", "Web site: http://www.gnu.org/software/${Global::progname}\n", - "When using programs that use GNU Parallel to process data for publication please cite:\n", - "O. Tange (2011): GNU Parallel - The Command-Line Power Tool, ", - ";login: The USENIX Magazine, February 2011:42-47.\n", - "Or you can get GNU Parallel without this requirement by paying 10000 EUR.\n", + "When using programs that use GNU Parallel to process data for publication", + "please cite as described in 'parallel --bibtex'.\n", ); } @@ -2943,6 +3053,7 @@ sub bibtex { print "WARNING: YOU ARE USING --tollef. IF THINGS ARE ACTING WEIRD USE --gnu.\n"; } print join("\n", + "Academic tradition requires you to cite works you base your article on.", "When using programs that use GNU Parallel to process data for publication please cite:", "", "\@article{Tange2011a,", @@ -2962,7 +3073,7 @@ sub bibtex { "", "This helps funding further development.", "", - "Or you can get GNU Parallel without this requirement by paying 10000 EUR.", + "If you pay 10000 EUR you should feel free to use GNU Parallel without citing.\n", "" ); while(not -e $ENV{'HOME'}."/.parallel/will-cite") { @@ -3182,6 +3293,9 @@ sub reap_usleep { if($opt::timeout) { $Global::timeoutq->process_timeouts(); } + if($opt::memfree) { + kill_youngster_if_not_enough_mem(); + } usleep($ms); Job::exit_if_disk_full(); if($opt::linebuffer) { @@ -3222,6 +3336,33 @@ sub now { return (int(TimeHiRestime()*1000))/1000; } +sub kill_youngster_if_not_enough_mem { + # Check each $sshlogin if there is enough mem. + # If less than 50% enough free mem: kill off the youngest child + # Put the child back in the queue. + my %jobs_of; + use Tie::RefHash; + tie %jobs_of, 'Tie::RefHash'; + + for my $job (values %Global::running) { + push @{$jobs_of{$job->sshlogin()}}, $job; + } + for my $sshlogin (keys %jobs_of) { + for my $job (sort { $b->seq() <=> $a->seq() } @{$jobs_of{$sshlogin}}) { + if($sshlogin->memfree() < $opt::memfree * 0.5) { + ::debug("mem","\n",map { $_->seq()." " } (sort { $b->seq() <=> $a->seq() } @{$jobs_of{$sshlogin}})); + ::debug("mem","\n", $job->seq(), "killed ", + $sshlogin->memfree()," < ",$opt::memfree * 0.5); + $job->kill(); + $sshlogin->memfree_recompute(); + } else { + last; + } + } + ::debug("mem","Free mem OK ", $sshlogin->memfree()," > ",$opt::memfree * 0.5); + } +} + sub multiply_binary_prefix { # Evalualte numbers with binary prefix # Ki=2^10, Mi=2^20, Gi=2^30, Ti=2^40, Pi=2^50, Ei=2^70, Zi=2^80, Yi=2^80 @@ -3344,7 +3485,7 @@ sub my_dump { if ($@) { my $err = "Neither Data::Dump nor Data::Dumper is installed\n". "Not dumping output\n"; - print $Global::original_stderr $err; + ::status($err); return $err; } else { return Dumper(@dump_this); @@ -3503,6 +3644,83 @@ sub set_max_jobs_running { $self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'}; } +sub memfree { + # Returns: + # $memfree in bytes + my $self = shift; + if(not $self->{'memfree'}) { + $self->memfree_recompute(); + } +# if(time - $self->{'last_memfree'} >= 1) { + # More than 10 seconds old: Recompute + $self->{'last_memfree'} = time; + $self->memfree_recompute(); +# } + return (not defined $self->{'memfree'} or $self->{'memfree'}) +} + +sub memfree_recompute { + my $self = shift; + my $script = memfreescript(); + + # TODO add sshlogin and backgrounding + $self->{'memfree'} = `$script`; + #::debug("mem","New free:",$self->{'memfree'}," "); +} + +{ + my $script; + + sub memfreescript { + # Returns: + # shellscript for giving available memory in bytes + if(not $script) { + my %script_of = ( + # $ free + # total used free shared buffers cached + # Mem: 8075152 4922780 3152372 338856 233356 1658604 + # -/+ buffers/cache: 3030820 5044332 + # Swap: 8286204 116924 8169280 + "linux" => q{ print (1024*((grep /buffers.cache/, `free`)[0] =~ /buffers.cache:\s+\S+\s+(\S+)/)[0]) }, + # $ vmstat 1 1 + # procs memory page faults cpu + # r b w avm free re at pi po fr de sr in sy cs us sy id + # 1 0 0 242793 389737 5 1 0 0 0 0 0 107 978 60 1 1 99 + "hpux" => q{ print (((reverse `vmstat 1 1`)[0] =~ /(?:\d+\D+){4}(\d+)/)[0]*1024) }, + # $ vmstat 1 2 + # kthr memory page disk faults cpu + # r b w swap free re mf pi po fr de sr s3 s4 -- -- in sy cs us sy id + # 0 0 0 6496720 5170320 68 260 8 2 1 0 0 -0 3 0 0 309 1371 255 1 2 97 + # 0 0 0 6434088 5072656 7 15 8 0 0 0 0 0 261 0 0 1889 1899 3222 0 8 92 + # + # The last free is really free + "solaris" => q{ print (((reverse `vmstat 1 2`)[0] =~ /(?:\d+\D+){4}(\d+)/)[0]*1024) }, + "freebsd" => q{ + for(qx{/sbin/sysctl -a}) { + if (/^([^:]+):\s+(.+)\s*$/s) { + $sysctl->{$1} = $2; + } + } + print $sysctl->{"hw.pagesize"} * + ($sysctl->{"vm.stats.vm.v_cache_count"} + + $sysctl->{"vm.stats.vm.v_inactive_count"} + + $sysctl->{"vm.stats.vm.v_free_count"}); + }, + ); + my $perlscript = ""; + # Make a perl script that detects the OS ($^O) and runs + # the appropriate command + for my $os (keys %script_of) { + $perlscript .= 'if($^O eq "'.$os.'") { '.$script_of{$os}.'}'; + } + $perlscript =~ s/[\t\n]/ /g; + $perlscript = "perl -e " . ::shell_quote_scalar($perlscript); + $script = $Global::envvar. " " .$perlscript; + } + return $script + } +} + sub swapping { my $self = shift; my $swapping = $self->swap_activity(); @@ -3675,9 +3893,9 @@ sub swap_activity { #-svr5 (scosysv) ); my $perlscript = ""; + # Make a perl script that detects the OS ($^O) and runs + # the appropriate vmstat command for my $os (keys %vmstat) { - #q[ { vmstat 1 2 2> /dev/null || vmstat -c 1 2; } | ]. - # q[ awk 'NR!=4{next} NF==17||NF==16{print $7*$8} NF==22{print $21*$22} {exit}' ]; $vmstat{$os}[1] =~ s/\$/\\\\\\\$/g; # $ => \\\$ $perlscript .= 'if($^O eq "'.$os.'") { print `'.$vmstat{$os}[0].' | awk "{print ' . $vmstat{$os}[1] . '}"` }'; @@ -3907,11 +4125,11 @@ sub compute_max_loadavg { close $in_fh; $load = $self->compute_max_loadavg($opt_load_file); } else { - print $Global::original_stderr "Cannot open $loadspec\n"; + ::error("Cannot open $loadspec\n"); ::wait_and_exit(255); } } else { - print $Global::original_stderr "Parsing of --load failed\n"; + ::error("Parsing of --load failed\n"); ::die_usage(); } if($load < 0.01) { @@ -3962,173 +4180,195 @@ sub compute_number_of_processes { return $system_limit; } -sub processes_available_by_system_limit { - # If the wanted number of processes is bigger than the system limits: - # Limit them to the system limits - # Limits are: File handles, number of input lines, processes, - # and taking > 1 second to spawn 10 extra processes - # Returns: - # Number of processes - my $self = shift; - my $wanted_processes = shift; - - my $system_limit = 0; - my @jobs = (); - my $job; - my @args = (); - my $arg; - my $more_filehandles = 1; - my $max_system_proc_reached = 0; - my $slow_spawining_warning_printed = 0; - my $time = time; - my %fh; +{ my @children; + my $max_system_proc_reached; + my $more_filehandles; + my %fh; + my $tmpfhname; + my $count_jobs_already_read; + my @jobs; + my $job; + my @args; + my $arg; - # Reserve filehandles - # perl uses 7 filehandles for something? - # parallel uses 1 for memory_usage - # parallel uses 4 for ? - for my $i (1..12) { - open($fh{"init-$i"}, "<", "/dev/null"); + sub reserve_filehandles { + # Reserves filehandle + my $n = shift; + for (1..$n) { + $more_filehandles &&= open($fh{$tmpfhname++}, "<", "/dev/null"); + } } - for(1..2) { - # System process limit - my $child; - if($child = fork()) { - push (@children,$child); - $Global::unkilled_children{$child} = 1; - } elsif(defined $child) { - # The child takes one process slot - # It will be killed later - $SIG{TERM} = $Global::original_sig{TERM}; - sleep 10000000; - exit(0); - } else { + sub reserve_process { + # Spawn a dummy process + my $child; + if($child = fork()) { + push @children, $child; + $Global::unkilled_children{$child} = 1; + } elsif(defined $child) { + # This is the child + # The child takes one process slot + # It will be killed later + $SIG{TERM} = $Global::original_sig{TERM}; + sleep 10000000; + exit(0); + } else { + # Failed to spawn $max_system_proc_reached = 1; - } + } } - my $count_jobs_already_read = $Global::JobQueue->next_seq(); - my $wait_time_for_getting_args = 0; - my $start_time = time; - while(1) { - $system_limit >= $wanted_processes and last; - not $more_filehandles and last; - $max_system_proc_reached and last; - my $before_getting_arg = time; - if($Global::semaphore or $opt::pipe) { + + sub get_args_or_jobs { + # Get an arg or a job (depending on mode) + if($Global::semaphore or $opt::pipe) { # Skip: No need to get args - } elsif(defined $opt::retries and $count_jobs_already_read) { - # For retries we may need to run all jobs on this sshlogin - # so include the already read jobs for this sshlogin - $count_jobs_already_read--; - } else { - if($opt::X or $opt::m) { - # The arguments may have to be re-spread over several jobslots - # So pessimistically only read one arg per jobslot - # instead of a full commandline - if($Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->empty()) { + return 1; + } elsif(defined $opt::retries and $count_jobs_already_read) { + # For retries we may need to run all jobs on this sshlogin + # so include the already read jobs for this sshlogin + $count_jobs_already_read--; + return 1; + } else { + if($opt::X or $opt::m) { + # The arguments may have to be re-spread over several jobslots + # So pessimistically only read one arg per jobslot + # instead of a full commandline + if($Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->empty()) { if($Global::JobQueue->empty()) { - last; + return 0; } else { - ($job) = $Global::JobQueue->get(); + $job = $Global::JobQueue->get(); push(@jobs, $job); + return 1; } } else { - ($arg) = $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->get(); + $arg = $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->get(); push(@args, $arg); + return 1; + } + } else { + # If there are no more command lines, then we have a process + # per command line, so no need to go further + if($Global::JobQueue->empty()) { + return 0; + } else { + $job = $Global::JobQueue->get(); + push(@jobs, $job); + return 1; } - } else { - # If there are no more command lines, then we have a process - # per command line, so no need to go further - $Global::JobQueue->empty() and last; - ($job) = $Global::JobQueue->get(); - push(@jobs, $job); } - } - $wait_time_for_getting_args += time - $before_getting_arg; - $system_limit++; + } + } - # Every simultaneous process uses 2 filehandles when grouping - # Every simultaneous process uses 2 filehandles when compressing - $more_filehandles = open($fh{$system_limit*10}, "<", "/dev/null") - && open($fh{$system_limit*10+2}, "<", "/dev/null") - && open($fh{$system_limit*10+3}, "<", "/dev/null") - && open($fh{$system_limit*10+4}, "<", "/dev/null"); + sub cleanup { + # Cleanup: Close the files + for (values %fh) { close $_ } + # Cleanup: Kill the children + for my $pid (@children) { + kill 9, $pid; + waitpid($pid,0); + delete $Global::unkilled_children{$pid}; + } + # Cleanup: Unget the command_lines or the @args + $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args); + $Global::JobQueue->unget(@jobs); + } - # System process limit - my $child; - if($child = fork()) { - push (@children,$child); - $Global::unkilled_children{$child} = 1; - } elsif(defined $child) { - # The child takes one process slot - # It will be killed later - $SIG{TERM} = $Global::original_sig{TERM}; - sleep 10000000; - exit(0); - } else { - $max_system_proc_reached = 1; - } - my $forktime = time - $time - $wait_time_for_getting_args; - ::debug("run", "Time to fork $system_limit procs: $wait_time_for_getting_args ", - $forktime, - " (processes so far: ", $system_limit,")\n"); - if($system_limit > 10 and - $forktime > 1 and - $forktime > $system_limit * 0.01 - and not $slow_spawining_warning_printed) { - # It took more than 0.01 second to fork a processes on avg. - # Give the user a warning. He can press Ctrl-C if this - # sucks. - print $Global::original_stderr - ("parallel: Warning: Starting $system_limit processes took > $forktime sec.\n", - "Consider adjusting -j. Press CTRL-C to stop.\n"); - $slow_spawining_warning_printed = 1; - } - } - # Cleanup: Close the files - for (values %fh) { close $_ } - # Cleanup: Kill the children - for my $pid (@children) { - kill 9, $pid; - waitpid($pid,0); - delete $Global::unkilled_children{$pid}; - } - # Cleanup: Unget the command_lines or the @args - $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args); - $Global::JobQueue->unget(@jobs); - if($system_limit < $wanted_processes) { - # The system_limit is less than the wanted_processes - if($system_limit < 1 and not $Global::JobQueue->empty()) { - ::warning("Cannot spawn any jobs. Raising ulimit -u or /etc/security/limits.conf\n", - "or /proc/sys/kernel/pid_max may help.\n"); - ::wait_and_exit(255); + sub processes_available_by_system_limit { + # If the wanted number of processes is bigger than the system limits: + # Limit them to the system limits + # Limits are: File handles, number of input lines, processes, + # and taking > 1 second to spawn 10 extra processes + # Returns: + # Number of processes + my $self = shift; + my $wanted_processes = shift; + my $system_limit = 0; + my $slow_spawining_warning_printed = 0; + my $time = time; + $more_filehandles = 1; + $tmpfhname = "TmpFhNamE"; + + # perl uses 7 filehandles for something? + # parallel uses 1 for memory_usage + # parallel uses 4 for ? + reserve_filehandles(12); + # Two processes for load avg and ? + reserve_process(); + reserve_process(); + + # For --retries count also jobs already run + $count_jobs_already_read = $Global::JobQueue->next_seq(); + my $wait_time_for_getting_args = 0; + my $start_time = time; + while(1) { + $system_limit >= $wanted_processes and last; + not $more_filehandles and last; + $max_system_proc_reached and last; + + my $before_getting_arg = time; + get_args_or_jobs() or last; + $wait_time_for_getting_args += time - $before_getting_arg; + $system_limit++; + + # Every simultaneous process uses 2 filehandles to write to + # and 2 filehandles to read from + reserve_filehandles(4); + + # System process limit + reserve_process(); + + my $forktime = time - $time - $wait_time_for_getting_args; + ::debug("run", "Time to fork $system_limit procs: $wait_time_for_getting_args ", + $forktime, + " (processes so far: ", $system_limit,")\n"); + if($system_limit > 10 and + $forktime > 1 and + $forktime > $system_limit * 0.01 + and not $slow_spawining_warning_printed) { + # It took more than 0.01 second to fork a processes on avg. + # Give the user a warning. He can press Ctrl-C if this + # sucks. + ::warning("Starting $system_limit processes took > $forktime sec.\n", + "Consider adjusting -j. Press CTRL-C to stop.\n"); + $slow_spawining_warning_printed = 1; + } } - if(not $more_filehandles) { - ::warning("Only enough file handles to run ", $system_limit, " jobs in parallel.\n", - "Running 'parallel -j0 -N", $system_limit, " --pipe parallel -j0' or ", - "raising ulimit -n or /etc/security/limits.conf may help.\n"); + cleanup(); + + if($system_limit < $wanted_processes) { + # The system_limit is less than the wanted_processes + if($system_limit < 1 and not $Global::JobQueue->empty()) { + ::warning("Cannot spawn any jobs. Raising ulimit -u or /etc/security/limits.conf\n", + "or /proc/sys/kernel/pid_max may help.\n"); + ::wait_and_exit(255); + } + if(not $more_filehandles) { + ::warning("Only enough file handles to run ", $system_limit, " jobs in parallel.\n", + "Running 'parallel -j0 -N", $system_limit, " --pipe parallel -j0' or ", + "raising ulimit -n or /etc/security/limits.conf may help.\n"); + } + if($max_system_proc_reached) { + ::warning("Only enough available processes to run ", $system_limit, + " jobs in parallel. Raising ulimit -u or /etc/security/limits.conf\n", + "or /proc/sys/kernel/pid_max may help.\n"); + } } - if($max_system_proc_reached) { - ::warning("Only enough available processes to run ", $system_limit, - " jobs in parallel. Raising ulimit -u or /etc/security/limits.conf\n", - "or /proc/sys/kernel/pid_max may help.\n"); + if($] == 5.008008 and $system_limit > 1000) { + # https://savannah.gnu.org/bugs/?36942 + $system_limit = 1000; } + if($Global::JobQueue->empty()) { + $system_limit ||= 1; + } + if($self->string() ne ":" and + $system_limit > $Global::default_simultaneous_sshlogins) { + $system_limit = + $self->simultaneous_sshlogin_limit($system_limit); + } + return $system_limit; } - if($] == 5.008008 and $system_limit > 1000) { - # https://savannah.gnu.org/bugs/?36942 - $system_limit = 1000; - } - if($Global::JobQueue->empty()) { - $system_limit ||= 1; - } - if($self->string() ne ":" and - $system_limit > $Global::default_simultaneous_sshlogins) { - $system_limit = - $self->simultaneous_sshlogin_limit($system_limit); - } - return $system_limit; } sub simultaneous_sshlogin_limit { @@ -5132,44 +5372,58 @@ sub openoutputfiles { $self->set_fh(1,'name',$outname); $self->set_fh(2,'name',$errname); if($opt::compress) { - # Send stdout to stdin for $opt::compress_program(1) - # Send stderr to stdin for $opt::compress_program(2) - # cattail get pid: $pid = $self->fh($fdno,'rpid'); - my $cattail = cattail(); - for my $fdno (1,2) { - my $wpid = open(my $fdw,"|-","$opt::compress_program >>". - $self->fh($fdno,'name')) || die $?; - $self->set_fh($fdno,'w',$fdw); - $self->set_fh($fdno,'wpid',$wpid); - my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail, - $opt::decompress_program, $wpid, - $self->fh($fdno,'name'),$self->fh($fdno,'unlink')) || die $?; - $self->set_fh($fdno,'r',$fdr); - $self->set_fh($fdno,'rpid',$rpid); - } + $self->filter_through_compress(); } elsif(not $opt::ungroup) { - # Set reading FD if using --group (--ungroup does not need) - for my $fdno (1,2) { - # Re-open the file for reading - # so fdw can be closed seperately - # and fdr can be seeked seperately (for --line-buffer) - open(my $fdr,"<", $self->fh($fdno,'name')) || - ::die_bug("fdr: Cannot open ".$self->fh($fdno,'name')); - $self->set_fh($fdno,'r',$fdr); - # Unlink if required - $Global::debug or unlink $self->fh($fdno,"unlink"); - } + $self->grouped(); } if($opt::linebuffer) { - # Set non-blocking when using --linebuffer - $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; - for my $fdno (1,2) { - my $fdr = $self->fh($fdno,'r'); - my $flags; - fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle - $flags |= &O_NONBLOCK; # Add non-blocking to the flags - fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle - } + $self->set_non_blocking(); + } +} + +sub grouped { + my $self = shift; + # Set reading FD if using --group (--ungroup does not need) + for my $fdno (1,2) { + # Re-open the file for reading + # so fdw can be closed seperately + # and fdr can be seeked seperately (for --line-buffer) + open(my $fdr,"<", $self->fh($fdno,'name')) || + ::die_bug("fdr: Cannot open ".$self->fh($fdno,'name')); + $self->set_fh($fdno,'r',$fdr); + # Unlink if required + $Global::debug or unlink $self->fh($fdno,"unlink"); + } +} + +sub filter_through_compress { + my $self = shift; + # Send stdout to stdin for $opt::compress_program(1) + # Send stderr to stdin for $opt::compress_program(2) + # cattail get pid: $pid = $self->fh($fdno,'rpid'); + my $cattail = cattail(); + for my $fdno (1,2) { + my $wpid = open(my $fdw,"|-","$opt::compress_program >>". + $self->fh($fdno,'name')) || die $?; + $self->set_fh($fdno,'w',$fdw); + $self->set_fh($fdno,'wpid',$wpid); + my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail, + $opt::decompress_program, $wpid, + $self->fh($fdno,'name'),$self->fh($fdno,'unlink')) || die $?; + $self->set_fh($fdno,'r',$fdr); + $self->set_fh($fdno,'rpid',$rpid); + } +} + +sub set_non_blocking { + my $self = shift; + $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; + for my $fdno (1,2) { + my $fdr = $self->fh($fdno,'r'); + my $flags; + fcntl($fdr, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle + $flags |= &O_NONBLOCK; # Add non-blocking to the flags + fcntl($fdr, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle } } @@ -5905,20 +6159,8 @@ sub start { my $command = $job->wrapped(); if($Global::interactive or $Global::stderr_verbose) { - if($Global::interactive) { - print $Global::original_stderr "$command ?..."; - open(my $tty_fh, "<", "/dev/tty") || ::die_bug("interactive-tty"); - my $answer = <$tty_fh>; - close $tty_fh; - my $run_yes = ($answer =~ /^\s*y/i); - if (not $run_yes) { - $command = "true"; # Run the command 'true' - } - } else { - print $Global::original_stderr "$command\n"; - } + $command = interactive_start($command); } - my $pid; $job->openoutputfiles(); my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w")); @@ -5926,13 +6168,8 @@ sub start { open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!"); open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!"); - if(($opt::dryrun or $Global::verbose) and $opt::ungroup) { - if($Global::verbose <= 1) { - print $stdout_fh $job->replaced(),"\n"; - } else { - # Verbose level > 1: Print the rsync and stuff - print $stdout_fh $command,"\n"; - } + if($opt::ungroup) { + print_dryrun_and_verbose($stdout_fh,$job,$command); } if($opt::dryrun) { $command = "true"; @@ -6004,6 +6241,39 @@ sub start { } } +sub interactive_start { + my $command = shift; + if($Global::interactive) { + ::status("$command ?..."); + open(my $tty_fh, "<", "/dev/tty") || ::die_bug("interactive-tty"); + my $answer = <$tty_fh>; + close $tty_fh; + my $run_yes = ($answer =~ /^\s*y/i); + if (not $run_yes) { + $command = "true"; # Run the command 'true' + } + } else { + print $Global::original_stderr "$command\n"; + } + return $command; +} + +sub print_dryrun_and_verbose { + # For $opt::ungroup we print these ASAP + # For $opt::group they are part of print() + my $stdout_fh = shift; + my $job = shift; + my $command = shift; + if($opt::dryrun or $Global::verbose) { + if($Global::verbose <= 1) { + print $stdout_fh $job->replaced(),"\n"; + } else { + # Verbose level > 1: Print the rsync and stuff + print $stdout_fh $command,"\n"; + } + } +} + sub tmux_wrap { # Wrap command with tmux for session pPID # Input: @@ -6025,7 +6295,7 @@ sub tmux_wrap { if($Global::total_running == 0) { $tmux = "tmux new-session -s p$$ -d -n ". ::shell_quote_scalar($title); - print $Global::original_stderr "See output with: tmux attach -t p$$\n"; + ::status("See output with: tmux attach -t p$$\n"); } else { $tmux = "tmux new-window -t p$$ -n ".::shell_quote_scalar($title); } @@ -6084,6 +6354,7 @@ sub should_be_retried { } else { # This command should be retried $self->set_endtime(undef); + $self->reset_exitstatus(); $Global::JobQueue->unget($self); ::debug("run", "Retry ", $self->seq(), "\n"); return 1; @@ -6144,65 +6415,39 @@ sub print { } ::debug("print", "File descriptor $fdno (", $self->fh($fdno,"name"), "):"); if($opt::files) { - # If --compress: $in_fh must be closed first. - close $self->fh($fdno,"w"); - close $in_fh; - if($opt::pipe and $self->virgin()) { - # Nothing was printed to this job: - # cleanup unused tmp files if --files was set - for my $fdno (1,2) { - unlink $self->fh($fdno,"name"); - unlink $self->fh($fdno,"unlink"); - } - } elsif($fdno == 1 and $self->fh($fdno,"name")) { - print $out_fd $self->fh($fdno,"name"),"\n"; - } + $self->files_print($fdno,$in_fh,$out_fd); } elsif($opt::linebuffer) { # Line buffered print out $self->linebuffer_print($fdno,$in_fh,$out_fd); + } elsif($opt::tag or defined $opt::tagstring) { + $self->tag_print($fdno,$in_fh,$out_fd); } else { - my $buf; - close $self->fh($fdno,"w"); - seek $in_fh, 0, 0; - # $in_fh is now ready for reading at position 0 - if($opt::tag or defined $opt::tagstring) { - my $tag = $self->tag(); - if($fdno == 2) { - # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt - # This is a crappy way of ignoring it. - while(<$in_fh>) { - if(/^(client_process_control: )?tcgetattr: Invalid argument\n/) { - # Skip - } else { - print $out_fd $tag,$_; - } - # At most run the loop once - last; - } - } - while(<$in_fh>) { - print $out_fd $tag,$_; - } - } else { - my $buf; - if($fdno == 2) { - # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt - # This is a crappy way of ignoring it. - sysread($in_fh,$buf,1_000); - $buf =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//; - print $out_fd $buf; - } - while(sysread($in_fh,$buf,32768)) { - print $out_fd $buf; - } - } - close $in_fh; + $self->normal_print($fdno,$in_fh,$out_fd); } flush $out_fd; } ::debug("print", "<fh($fdno,"w"); + close $in_fh; + if($opt::pipe and $self->virgin()) { + # Nothing was printed to this job: + # cleanup unused tmp files if --files was set + for my $fdno (1,2) { + unlink $self->fh($fdno,"name"); + unlink $self->fh($fdno,"unlink"); + } + } elsif($fdno == 1 and $self->fh($fdno,"name")) { + print $out_fd $self->fh($fdno,"name"),"\n"; + } +} + sub linebuffer_print { my $self = shift; my ($fdno,$in_fh,$out_fd) = @_; @@ -6277,6 +6522,53 @@ sub linebuffer_print { } } +sub tag_print { + my $self = shift; + my ($fdno,$in_fh,$out_fd) = @_; + my $buf; + close $self->fh($fdno,"w"); + seek $in_fh, 0, 0; + # $in_fh is now ready for reading at position 0 + my $tag = $self->tag(); + if($fdno == 2) { + # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt + # This is a crappy way of ignoring it. + while(<$in_fh>) { + if(/^(client_process_control: )?tcgetattr: Invalid argument\n/) { + # Skip + } else { + print $out_fd $tag,$_; + } + # At most run the loop once + last; + } + } + while(<$in_fh>) { + print $out_fd $tag,$_; + } + close $in_fh; +} + +sub normal_print { + my $self = shift; + my ($fdno,$in_fh,$out_fd) = @_; + my $buf; + close $self->fh($fdno,"w"); + seek $in_fh, 0, 0; + # $in_fh is now ready for reading at position 0 + if($fdno == 2) { + # OpenSSH_3.6.1p2 gives 'tcgetattr: Invalid argument' with -tt + # This is a crappy way of ignoring it. + sysread($in_fh,$buf,1_000); + $buf =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//; + print $out_fd $buf; + } + while(sysread($in_fh,$buf,32768)) { + print $out_fd $buf; + } + close $in_fh; +} + sub print_joblog { my $self = shift; my $cmd; @@ -6331,6 +6623,11 @@ sub set_exitstatus { } } +sub reset_exitstatus { + my $self = shift; + $self->{'exitstatus'} = undef; +} + sub exitsignal { my $self = shift; return $self->{'exitsignal'}; @@ -6759,155 +7056,164 @@ sub replaced { if(not defined $self->{'replaced'}) { # Don't quote arguments if the input is the full command line my $quote_arg = $Global::noquote ? 0 : not $Global::quoting; - $self->{'replaced'} = $self->replace_placeholders($self->{'command'},$Global::quoting,$quote_arg); + $self->{'replaced'} = $self-> + replace_placeholders($self->{'command'},$Global::quoting, + $quote_arg); my $len = length $self->{'replaced'}; if ($len != $self->len()) { - ::debug("length", $len, " != ", $self->len(), " ", $self->{'replaced'}, "\n"); + ::debug("length", $len, " != ", $self->len(), + " ", $self->{'replaced'}, "\n"); } else { - ::debug("length", $len, " == ", $self->len(), " ", $self->{'replaced'}, "\n"); + ::debug("length", $len, " == ", $self->len(), + " ", $self->{'replaced'}, "\n"); } } return $self->{'replaced'}; } -sub replace_placeholders { - # Replace foo{}bar with fooargbar - # Input: - # $targetref = command as shell words - # $quote = should everything be quoted? - # $quote_arg = should replaced arguments be quoted? - # Returns: - # @target with placeholders replaced - my $self = shift; - my $targetref = shift; - my $quote = shift; - my $quote_arg = shift; - my $context_replace = $self->{'context_replace'}; - my @target = @$targetref; - ::debug("replace", "Replace @target\n"); - # -X = context replace - # maybe multiple input sources - # maybe --xapply - if(not @target) { - # @target is empty: Return empty array - return @target; - } - # Fish out the words that have replacement strings in them - my %word; - for (@target) { - my $tt = $_; - ::debug("replace", "Target: $tt"); - # a{1}b{}c{}d - # a{=1 $_=$_ =}b{= $_=$_ =}c{= $_=$_ =}d - # a\257<1 $_=$_ \257>b\257< $_=$_ \257>c\257< $_=$_ \257>d - # A B C => aAbA B CcA B Cd - # -X A B C => aAbAcAd aAbBcBd aAbCcCd +{ + my @target; + my $context_replace; + my @arg; + my $perl_expressions_as_re; - if($context_replace) { - while($tt =~ s/([^\s\257]* # before {= + sub fish_out_words_containing_replacement_strings { + my %word; + for (@target) { + my $tt = $_; + ::debug("replace", "Target: $tt"); + # Command line template: + # a{1}b{}c{}d + # becomes: + # a{=1 $_=$_ =}b{= $_=$_ =}c{= $_=$_ =}d + # becomes: + # a\257<1 $_=$_ \257>b\257< $_=$_ \257>c\257< $_=$_ \257>d + # Input A B C (no context) becomes: + # A B C => aAbA B CcA B Cd + # Input A B C (context -X) becomes: + # A B C => aAbAcAd aAbBcBd aAbCcCd + if($context_replace) { + while($tt =~ s/([^\s\257]* # before {= (?: \257< # {= [^\257]*? # The perl expression \257> # =} [^\s\257]* # after =} )+)/ /x) { - # $1 = pre \257 perlexpr \257 post - $word{"$1"} ||= 1; - } - } else { - while($tt =~ s/( (?: \257<([^\257]*?)\257>) )//x) { - # $f = \257 perlexpr \257 - $word{$1} ||= 1; - } - } - } - my @word = keys %word; - - my %replace; - my @arg; - for my $record (@{$self->{'arg_list'}}) { - # $self->{'arg_list'} = [ [Arg11, Arg12], [Arg21, Arg22], [Arg31, Arg32] ] - # Merge arg-objects from records into @arg for easy access - CORE::push @arg, @$record; - } - # Add one arg if empty to allow {#} and {%} to be computed only once - if(not @arg) { @arg = (Arg->new("")); } - # Number of arguments - used for positional arguments - my $n = $#_+1; - - # This is actually a CommandLine-object, - # but it looks nice to be able to say {= $job->slot() =} - my $job = $self; - for my $word (@word) { - # word = AB \257< perlexpr \257> CD \257< perlexpr \257> EF - my $w = $word; - ::debug("replace", "Replacing in $w\n"); - - # Replace positional arguments - $w =~ s< ([^\s\257]*) # before {= - \257< # {= - (-?\d+) # Position (eg. -2 or 3) - ([^\257]*?) # The perl expression - \257> # =} - ([^\s\257]*) # after =} - > - { $1. # Context (pre) - ( - $arg[$2 > 0 ? $2-1 : $n+$2] ? # If defined: replace - $arg[$2 > 0 ? $2-1 : $n+$2]->replace($3,$quote_arg,$self) - : "") - .$4 }egx;# Context (post) - ::debug("replace", "Positional replaced $word with: $w\n"); - - if($w !~ /\257/) { - # No more replacement strings in $w: No need to do more - if($quote) { - CORE::push(@{$replace{::shell_quote($word)}}, $w); + # $1 = pre \257 perlexpr \257 post + $word{"$1"} ||= 1; + } } else { - CORE::push(@{$replace{$word}}, $w); - } - next; - } - # for each arg: - # compute replacement for each string - # replace replacement strings with replacement in the word value - # push to replace word value - ::debug("replace", "Positional done: $w\n"); - for my $arg (@arg) { - my $val = $w; - my $number_of_replacements = 0; - for my $perlexpr (keys %{$self->{'replacecount'}}) { - # Replace {= perl expr =} with value for each arg - $number_of_replacements += - $val =~ s{\257<\Q$perlexpr\E\257>} - {$arg ? $arg->replace($perlexpr,$quote_arg,$self) : ""}eg; - } - my $ww = $word; - if($quote) { - $ww = ::shell_quote_scalar($word); - $val = ::shell_quote_scalar($val); - } - if($number_of_replacements) { - CORE::push(@{$replace{$ww}}, $val); + while($tt =~ s/( (?: \257<([^\257]*?)\257>) )//x) { + # $f = \257 perlexpr \257 + $word{$1} ||= 1; + } } } + return keys %word; } - if($quote) { - @target = ::shell_quote(@target); - } - # ::debug("replace", "%replace=",::my_dump(%replace),"\n"); - if(%replace) { - # Substitute the replace strings with the replacement values - # Must be sorted by length if a short word is a substring of a long word - my $regexp = join('|', map { my $s = $_; $s =~ s/(\W)/\\$1/g; $s } - sort { length $b <=> length $a } keys %replace); - for(@target) { - s/($regexp)/join(" ",@{$replace{$1}})/ge; + sub flatten_arg_list { + my $arglist_ref = shift; + @arg = (); + for my $record (@$arglist_ref) { + # $self->{'arg_list'} = [ [Arg11, Arg12], [Arg21, Arg22], [Arg31, Arg32] ] + # Merge arg-objects from records into @arg for easy access + CORE::push @arg, @$record; } + # Add one arg if empty to allow {#} and {%} to be computed only once + if(not @arg) { @arg = (Arg->new("")); } + } + + sub replace_placeholders { + # Replace foo{}bar with fooargbar + # Input: + # $targetref = command as shell words + # $quote = should everything be quoted? + # $quote_arg = should replaced arguments be quoted? + # Returns: + # @target with placeholders replaced + my $self = shift; + my $targetref = shift; + my $quote = shift; + my $quote_arg = shift; + my %replace; + $context_replace = $self->{'context_replace'}; + @target = @$targetref; + ::debug("replace", "Replace @target\n"); + # -X = context replace + # maybe multiple input sources + # maybe --xapply + if(not @target) { + # @target is empty: Return empty array + return @target; + } + # Fish out the words that have replacement strings in them + my @word = fish_out_words_containing_replacement_strings(); + flatten_arg_list($self->{'arg_list'}); + + # Number of arguments - used for positional arguments + my $n = $#arg+1; + + # This is actually a CommandLine-object, + # but it looks nice to be able to say {= $job->slot() =} + my $job = $self; + + for my $word (@word) { + # word = AB \257< perlexpr \257> CD \257< perlexpr \257> EF + ::debug("replace", "Replacing in $word\n"); + my $normal_replace; + + # for each arg: + # replace replacement strings with replacement in the word value + # push to replace word value + $perl_expressions_as_re ||= + join("|", map {s/^-?\d+//; "\Q$_\E"} keys %{$self->{'replacecount'}}); + for my $arg (@arg) { + my $val = $word; + # Replace {= perl expr =} with value for each arg + $val =~ s{\257<(-?\d+)?($perl_expressions_as_re)\257>} + { + if($1) { + # Positional replace + # Find the relevant arg and replace it + ($arg[$1 > 0 ? $1-1 : $n+$1] ? # If defined: replace + $arg[$1 > 0 ? $1-1 : $n+$1]-> + replace($2,$quote_arg,$self) + : ""); + } else { + # Normal replace + $normal_replace ||= 1; + ($arg ? $arg->replace($2,$quote_arg,$self) : ""); + } + }goxe; + if($quote) { + CORE::push(@{$replace{::shell_quote_scalar($word)}}, + ::shell_quote_scalar($val)); + } else { + CORE::push(@{$replace{$word}}, $val); + } + # No normal replacements => only run once + $normal_replace or last; + } + } + + if($quote) { + @target = ::shell_quote(@target); + } + # ::debug("replace", "%replace=",::my_dump(%replace),"\n"); + if(%replace) { + # Substitute the replace strings with the replacement values + # Must be sorted by length if a short word is a substring of a long word + my $regexp = join('|', map { my $s = $_; $s =~ s/(\W)/\\$1/g; $s } + sort { length $b <=> length $a } keys %replace); + for(@target) { + s/($regexp)/join(" ",@{$replace{$1}})/ge; + } + } + ::debug("replace", "Return @target\n"); + return wantarray ? @target : "@target"; } - ::debug("replace", "Return @target\n"); - return wantarray ? @target : "@target"; } @@ -6921,7 +7227,7 @@ sub new { my $max_number_of_args = shift; my $return_files = shift; my @unget = (); - my ($count,%replacecount,$posrpl,$perlexpr,%len); + my ($count,$posrpl,$perlexpr); my @command = @$commandref; # If the first command start with '-' it is probably an option if($command[0] =~ /^\s*(-\S+)/) { @@ -6933,6 +7239,8 @@ sub new { } } # Replace replacement strings with {= perl expr =} + @command = merge_rpl_parts(@command); + # Protect matching inside {= perl expr =} # by replacing {= and =} with \257< and \257> for(@command) { @@ -6940,10 +7248,14 @@ sub new { ::error("Command cannot contain the character \257. Use a function for that.\n"); ::wait_and_exit(255); } - s/\Q$Global::parensleft\E(.*?)\Q$Global::parensright\E/\257<$1\257>/gx; + # Needs to match rightmost left parens (Perl defaults to leftmost) + # to deal with: {={==} + # Disallow \257 to avoid nested {= {= =} =} + while(s/([^\257]*) \Q$Global::parensleft\E ([^\257]*?) \Q$Global::parensright\E /$1\257<$2\257>/gx) {} } for my $rpl (keys %Global::rpl) { - # Replace the short hand string with the {= perl expr =} in $command and $opt::tagstring + # Replace the short hand string (--rpl) + # with the {= perl expr =} in $command and $opt::tagstring # Avoid replacing inside existing {= perl expr =} for(@command,@Global::ret_files) { while(s/((^|\257>)[^\257]*?) # Don't replace after \257 unless \257> @@ -6969,6 +7281,77 @@ sub new { } } } + # Add {} if no replacement strings in @command + my($replacecount_ref, $len_ref, @command) = + replacement_counts_and_lengths(@command); + if("@command" =~ /^\S*\257 \@unget, + 'command' => \@command, + 'replacecount' => $replacecount_ref, + 'arg_queue' => RecordQueue->new($read_from,$opt::colsep), + 'context_replace' => $context_replace, + 'len' => $len_ref, + 'max_number_of_args' => $max_number_of_args, + 'size' => undef, + 'return_files' => $return_files, + 'seq' => 1, + }, ref($class) || $class; +} + +sub merge_rpl_parts { + # '{=' 'perlexpr' '=}' => '{= perlexpr =}' + # Input: + # @in = the @command as given by the user + # Uses: + # $Global::parensleft + # $Global::parensright + # Returns: + # @command with parts merged to keep {= and =} as one + my @in = @_; + my @out; + my $l = quotemeta($Global::parensleft); + my $r = quotemeta($Global::parensright); + + while(@in) { + my $s = shift @in; + $_ = $s; + # Remove matching (right most) parens + while(s/(.*)$l.*?$r/$1/o) {} + if(/$l/o) { + # Missing right parens + while(@in) { + $s .= " ".shift @in; + $_ = $s; + while(s/(.*)$l.*?$r/$1/o) {} + if(not /$l/o) { + last; + } + } + } + push @out, $s; + } + return @out; +} + +sub replacement_counts_and_lengths { + # Count the number of different replacement strings. + # Find the lengths of context for context groups and non-context + # groups. + # If no {} found in @command: add it to @command + # + # Input: + # @command = command template + # Output: + # \%replacecount, \%len, @command + my @command = @_; + my (%replacecount,%len); my $sum = 0; while($sum == 0) { # Count how many times each replacement string is used @@ -6979,8 +7362,8 @@ sub new { for my $c (@cmd) { while($c =~ s/ \257<([^\257]*?)\257> /\000/x) { # %replacecount = { "perlexpr" => number of times seen } - # e.g { "$_++" => 2 } - $replacecount{$1} ++; + # e.g { "s/a/b/" => 2 } + $replacecount{$1}++; $sum++; } # Measure the length of the context around the {= perl expr =} @@ -7001,12 +7384,13 @@ sub new { # %replacecount = { "perlexpr" => number of times seen } # e.g { "$_++" => 2 } # But for tagstring we just need to mark it as seen - $replacecount{$1}||=1; + $replacecount{$1} ||= 1; } } if($opt::bar) { # If the command does not contain {} force it to be computed - $replacecount{""}||=1; + # as it is being used by --bar + $replacecount{""} ||= 1; } $len{'context'} = 0+$contextlen; @@ -7017,11 +7401,9 @@ sub new { " Non: ", $len{'noncontext'}, " Ctxgrp: ", $len{'contextgroups'}, " NonCtxGrp: ", $len{'noncontextgroups'}, "\n"); if($sum == 0) { - # Default command = {} - # If not replacement string: append {} if(not @command) { + # Default command = {} @command = ("\257<\257>"); - $Global::noquote = 1; } elsif(($opt::pipe or $opt::pipepart) and not $opt::fifo and not $opt::cat) { # With --pipe / --pipe-part you can have no replacement @@ -7032,19 +7414,7 @@ sub new { } } } - - return bless { - 'unget' => \@unget, - 'command' => \@command, - 'replacecount' => \%replacecount, - 'arg_queue' => RecordQueue->new($read_from,$opt::colsep), - 'context_replace' => $context_replace, - 'len' => \%len, - 'max_number_of_args' => $max_number_of_args, - 'size' => undef, - 'return_files' => $return_files, - 'seq' => 1, - }, ref($class) || $class; + return(\%replacecount,\%len,@command); } sub get { diff --git a/src/parallel.pod b/src/parallel.pod index 601d9eca..201762ed 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -3058,7 +3058,7 @@ To submit your jobs to the queue: You can of course use B<-S> to distribute the jobs to remote computers: - true >jobqueue; tail -f jobqueue | parallel -S .. + true >jobqueue; tail -n+0 -f jobqueue | parallel -S .. There is a a small issue when using GNU B as queue system/batch manager: You have to submit JobSlot number of jobs before diff --git a/src/parallel_tutorial.1 b/src/parallel_tutorial.1 index 219a35bb..788dbcae 100644 --- a/src/parallel_tutorial.1 +++ b/src/parallel_tutorial.1 @@ -133,7 +133,7 @@ .\" ======================================================================== .\" .IX Title "PARALLEL_TUTORIAL 1" -.TH PARALLEL_TUTORIAL 1 "2014-11-10" "20141022" "parallel" +.TH PARALLEL_TUTORIAL 1 "2014-11-26" "20141122" "parallel" .\" For nroff, turn off justification. Always turn off hyphenation; it makes .\" way too many mistakes in technical documents. .if n .ad l diff --git a/testsuite/tests-to-run/parallel-local-0.3s.sh b/testsuite/tests-to-run/parallel-local-0.3s.sh index 613efe9d..4a1334d1 100644 --- a/testsuite/tests-to-run/parallel-local-0.3s.sh +++ b/testsuite/tests-to-run/parallel-local-0.3s.sh @@ -23,11 +23,42 @@ echo '### Test bug #43284: {%} and {#} with --xapply'; echo '**' echo '### Test bug #43376: {%} and {#} with --pipe' -echo foo | parallel -q --pipe -k echo {#} -echo foo | parallel --pipe -k echo {%} -echo foo | parallel -q --pipe -k echo {%} -echo foo | parallel --pipe -k echo {#} + echo foo | parallel -q --pipe -k echo {#} + echo foo | parallel --pipe -k echo {%} + echo foo | parallel -q --pipe -k echo {%} + echo foo | parallel --pipe -k echo {#} echo '**' +echo '### {= and =} in different groups separated by space' + parallel echo {= s/a/b/ =} ::: a + parallel echo {= s/a/b/=} ::: a + parallel echo {= s/a/b/=}{= s/a/b/=} ::: a + parallel echo {= s/a/b/=}{=s/a/b/=} ::: a + parallel echo {= s/a/b/=}{= {= s/a/b/=} ::: a + parallel echo {= s/a/b/=}{={=s/a/b/=} ::: a + parallel echo {= s/a/b/ =} {={==} ::: a + parallel echo {={= =} ::: a + parallel echo {= {= =} ::: a + parallel echo {= {= =} =} ::: a + +echo '**' + +echo '### {} as part of the command' + echo p /bin/ls | parallel l{= s/p/s/ =} + echo /bin/ls-p | parallel --colsep '-' l{=2 s/p/s/ =} {1} + echo s /bin/ls | parallel l{} + echo /bin/ls | parallel ls {} + echo ls /bin/ls | parallel {} + echo ls /bin/ls | parallel + +echo '**' + +echo '### bug #43817: Some JP char cause problems in positional replacement strings' + parallel -k echo ::: '�<�>' '�<1 $_=2�>' 'ワ' + parallel -k echo {1} ::: '�<�>' '�<1 $_=2�>' 'ワ' + parallel -Xj1 echo ::: '�<�>' '�<1 $_=2�>' 'ワ' + parallel -Xj1 echo {1} ::: '�<�>' '�<1 $_=2�>' 'ワ' + + EOF diff --git a/testsuite/wanted-results/parallel-local-0.3s b/testsuite/wanted-results/parallel-local-0.3s index beaa6450..0dd3d6ba 100644 --- a/testsuite/wanted-results/parallel-local-0.3s +++ b/testsuite/wanted-results/parallel-local-0.3s @@ -16,13 +16,67 @@ echo '**' ** echo '### Test bug #43376: {%} and {#} with --pipe' ### Test bug #43376: {%} and {#} with --pipe -echo foo | parallel -q --pipe -k echo {#} + echo foo | parallel -q --pipe -k echo {#} 1 -echo foo | parallel --pipe -k echo {%} + echo foo | parallel --pipe -k echo {%} 1 -echo foo | parallel -q --pipe -k echo {%} + echo foo | parallel -q --pipe -k echo {%} 1 -echo foo | parallel --pipe -k echo {#} + echo foo | parallel --pipe -k echo {#} 1 echo '**' ** +echo '### {= and =} in different groups separated by space' +### {= and =} in different groups separated by space + parallel echo {= s/a/b/ =} ::: a +b + parallel echo {= s/a/b/=} ::: a +b + parallel echo {= s/a/b/=}{= s/a/b/=} ::: a +bb + parallel echo {= s/a/b/=}{=s/a/b/=} ::: a +bb + parallel echo {= s/a/b/=}{= {= s/a/b/=} ::: a +b{= b + parallel echo {= s/a/b/=}{={=s/a/b/=} ::: a +b{=b + parallel echo {= s/a/b/ =} {={==} ::: a +b {=a + parallel echo {={= =} ::: a +{=a + parallel echo {= {= =} ::: a +{= a + parallel echo {= {= =} =} ::: a +{= a =} +echo '**' +** +echo '### {} as part of the command' +### {} as part of the command + echo p /bin/ls | parallel l{= s/p/s/ =} +/bin/ls + echo /bin/ls-p | parallel --colsep '-' l{=2 s/p/s/ =} {1} +/bin/ls + echo s /bin/ls | parallel l{} +/bin/ls + echo /bin/ls | parallel ls {} +/bin/ls + echo ls /bin/ls | parallel {} +/bin/ls + echo ls /bin/ls | parallel +/bin/ls +echo '**' +** +echo '### bug #43817: Some JP char cause problems in positional replacement strings' +### bug #43817: Some JP char cause problems in positional replacement strings + parallel -k echo ::: '�<�>' '�<1 $_=2�>' 'ワ' +�<�> +�<1 $_=2�> +ワ + parallel -k echo {1} ::: '�<�>' '�<1 $_=2�>' 'ワ' +�<�> +�<1 $_=2�> +ワ + parallel -Xj1 echo ::: '�<�>' '�<1 $_=2�>' 'ワ' +�<�> �<1 $_=2�> ワ + parallel -Xj1 echo {1} ::: '�<�>' '�<1 $_=2�>' 'ワ' +�<�> diff --git a/testsuite/wanted-results/parallel-local1 b/testsuite/wanted-results/parallel-local1 index d4742ec8..268964f4 100644 --- a/testsuite/wanted-results/parallel-local1 +++ b/testsuite/wanted-results/parallel-local1 @@ -1,7 +1,7 @@ echo "bug #43654: --bar with command not using {}" bug #43654: --bar with command not using {} COLUMNS=80 stdout parallel --bar true {.} ::: 1 - # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 100 100% 1:0=0s 1  + # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 0 0% 0:1=0s 1  # 0 sec 1 100 100% 1:0=0s 1  echo "### Test --basenamereplace" ### Test --basenamereplace parallel -j1 -k -X --basenamereplace FOO echo FOO ::: /a/b.c a/b.c b.c /a/b a/b b