From 48b1743b84ead59d51ebde1eb06ebea1e34723b9 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Tue, 1 Jan 2019 14:34:04 +0100 Subject: [PATCH] parallel: Prototypes on most functions added. parallel: Control flow for --roundrobin simplyfied. --- src/parallel | 1074 +++++++++-------- testsuite/tests-to-run/parallel-local-30s.sh | 2 +- testsuite/tests-to-run/parallel-local-3s.sh | 25 + testsuite/tests-to-run/parallel-local-ssh9.sh | 2 + testsuite/wanted-results/parallel-local-3s | 2 + testsuite/wanted-results/parallel-local4 | 10 + 6 files changed, 580 insertions(+), 535 deletions(-) diff --git a/src/parallel b/src/parallel index 1fc2ec9c..a4a1a1f4 100755 --- a/src/parallel +++ b/src/parallel @@ -33,123 +33,8 @@ use Getopt::Long; use strict; use File::Basename; -save_stdin_stdout_stderr(); -save_original_signal_handler(); -parse_options(); -::debug("init", "Open file descriptors: ", join(" ",keys %Global::fd), "\n"); -my $number_of_args; -if($Global::max_number_of_args) { - $number_of_args = $Global::max_number_of_args; -} elsif ($opt::X or $opt::m or $opt::xargs) { - $number_of_args = undef; -} else { - $number_of_args = 1; -} - -my @command = @ARGV; -my @input_source_fh; -if($opt::pipepart) { - if($opt::tee) { - @input_source_fh = map { open_or_exit($_) } @opt::a; - # Remove the first: It will be the file piped. - shift @input_source_fh; - if(not @input_source_fh and not $opt::pipe) { - @input_source_fh = (*STDIN); - } - } else { - # -a is used for data - not for command line args - @input_source_fh = map { open_or_exit($_) } "/dev/null"; - } -} else { - @input_source_fh = map { open_or_exit($_) } @opt::a; - if(not @input_source_fh and not $opt::pipe) { - @input_source_fh = (*STDIN); - } -} -if($opt::sqlmaster) { - # Create SQL table to hold joblog + output - $Global::sql->create_table($#input_source_fh+1); - if($opt::sqlworker) { - # Start a real --sqlworker in the background later - $Global::start_sqlworker = 1; - $opt::sqlworker = undef; - } -} - -if($opt::skip_first_line) { - # Skip the first line for the first file handle - my $fh = $input_source_fh[0]; - <$fh>; -} - -set_input_source_header(); - -if($opt::filter_hosts and (@opt::sshlogin or @opt::sshloginfile)) { - # Parallel check all hosts are up. Remove hosts that are down - filter_hosts(); -} - -if($opt::nonall or $opt::onall) { - onall(\@input_source_fh,@command); - wait_and_exit(min(undef_as_zero($Global::exitstatus),254)); -} - -$Global::JobQueue = JobQueue->new( - \@command,\@input_source_fh,$Global::ContextReplace, - $number_of_args,\@Global::transfer_files,\@Global::ret_files); - -if($opt::pipepart) { - pipepart_setup(); -} elsif($opt::pipe and $opt::tee) { - pipe_tee_setup(); -} - -if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) { - # Count the number of jobs or shuffle all jobs - # before starting any. - # Must be done after ungetting any --pipepart jobs. - $Global::JobQueue->total_jobs(); -} -# Compute $Global::max_jobs_running -# Must be done after ungetting any --pipepart jobs. -max_jobs_running(); - -init_run_jobs(); -my $sem; -if($Global::semaphore) { - $sem = acquire_semaphore(); -} -$SIG{TERM} = \&start_no_new_jobs; - -if($opt::tee) { - # All jobs must be running in parallel for --tee - while(start_more_jobs()) {} - $Global::start_no_new_jobs = 1; - if(not $Global::JobQueue->empty()) { - ::error("--tee requres --jobs to be higher. Try --jobs 0."); - ::wait_and_exit(255); - } -} elsif($opt::pipe and not $opt::pipepart) { - # Fill all jobslots - while(start_more_jobs()) {} - spreadstdin(); -} else { - # Reap one - start one - while(reaper() + start_more_jobs()) {} -} -::debug("init", "Start draining\n"); -drain_job_queue(); -::debug("init", "Done draining\n"); -reapers(); -::debug("init", "Done reaping\n"); -if($Global::semaphore) { - $sem->release(); -} -cleanup(); -::debug("init", "Halt\n"); -halt(); - -sub set_input_source_header { +sub set_input_source_header($$) { + my ($command_ref,$input_source_fh_ref) = @_; if($opt::header and not $opt::pipe) { # split with colsep or \t # $header force $colsep = \t if undef? @@ -161,14 +46,14 @@ sub set_input_source_header { my $right = "\Q$Global::parensright\E"; my $r = $Global::parensright; my $id = 1; - for my $fh (@input_source_fh) { + for my $fh (@$input_source_fh_ref) { my $line = <$fh>; chomp($line); ::debug("init", "Delimiter: '$delimiter'"); for my $s (split /$delimiter/o, $line) { ::debug("init", "Colname: '$s'"); # Replace {colname} with {2} - for(@command,@Global::ret_files,@Global::transfer_files, + for(@$command_ref,@Global::ret_files,@Global::transfer_files, $opt::tagstring, $opt::workdir, $opt::results, $opt::retries) { # Skip if undefined @@ -183,14 +68,14 @@ sub set_input_source_header { } } else { my $id = 1; - for my $fh (@input_source_fh) { + for my $fh (@$input_source_fh_ref) { $Global::input_source_header{$id} = $id; $id++; } } } -sub max_jobs_running { +sub max_jobs_running() { # Compute $Global::max_jobs_running as the max number of jobs # running on each sshlogin. # Returns: @@ -207,7 +92,7 @@ sub max_jobs_running { return $Global::max_jobs_running; } -sub halt { +sub halt() { # Compute exit value, # wait for children to complete # and exit @@ -228,9 +113,11 @@ sub halt { } } -sub __PIPE_MODE__ {} -sub pipepart_setup { +sub __PIPE_MODE__() {} + + +sub pipepart_setup() { # Compute the blocksize # Generate the commands to extract the blocks # Push the commands on queue @@ -281,7 +168,7 @@ sub pipepart_setup { } } -sub pipe_tee_setup { +sub pipe_tee_setup() { # Create temporary fifos # Run 'tee fifo1 fifo2 fifo3 ... fifoN' in the background # This will spread the input to fifos @@ -309,7 +196,7 @@ sub pipe_tee_setup { @Global::cat_appends = map { ") < $_" } @fifos; } -sub pipe_part_files { +sub pipe_part_files(@) { # Given the bigfile # find header and split positions # make commands that 'cat's the partial file @@ -335,7 +222,7 @@ sub pipe_part_files { return @cat_prepends; } -sub find_header { +sub find_header($$) { # Compute the header based on $opt::header # Input: # $buf_ref = reference to read-in buffer @@ -362,7 +249,7 @@ sub find_header { return $header; } -sub find_split_positions { +sub find_split_positions($$$) { # Find positions in bigfile where recend is followed by recstart # Input: # $file = the file to read @@ -429,7 +316,7 @@ sub find_split_positions { return @pos; } -sub cat_partial { +sub cat_partial($@) { # Efficient command to copy from byte X to byte Y # Input: # $file = the file to read @@ -460,7 +347,7 @@ sub cat_partial { " perl -e '$script' @start_len |"; } -sub spreadstdin { +sub spreadstdin() { # read a record # Spawn a job and print the record to it. # Uses: @@ -654,7 +541,7 @@ sub spreadstdin { } } -sub recstartrecend { +sub recstartrecend() { # Uses: # $opt::recstart # $opt::recend @@ -692,7 +579,7 @@ sub recstartrecend { return ($recstart,$recend); } -sub nindex { +sub nindex($$) { # See if string is in buffer N times # Returns: # the position where the Nth copy is found @@ -709,7 +596,7 @@ sub nindex { my @robin_queue; my $sleep = 1; - sub round_robin_write { + sub round_robin_write($$$$$) { # Input: # $header_ref = ref to $header string # $block_ref = ref to $block to be written @@ -735,7 +622,8 @@ sub nindex { push @robin_queue, (sort { $a->seq() <=> $b->seq() } values %Global::running); } - if($opt::keeporder) { + do { + $written = 0; for my $job (@robin_queue) { if($job->block_length() > 0) { $written += $job->non_blocking_write(); @@ -748,33 +636,18 @@ sub nindex { last; } } - } else { - do { - $written = 0; - for my $job (@robin_queue) { - if($job->block_length() > 0) { - $written += $job->non_blocking_write(); - } else { - $job->set_block($header_ref, $buffer_ref, - $endpos, $recstart, $recend); - $block_passed = 1; - $job->set_virgin(0); - $written += $job->non_blocking_write(); - last; - } - } - if($written) { - $sleep = $sleep/1.5+0.001; - } - } while($written and not $block_passed); - } + if($written) { + $sleep = $sleep/1.5+0.001; + } + # Don't sleep if something is written + } while($written and not $block_passed); $sleep = ::reap_usleep($sleep); } return $written; } } -sub index64 { +sub index64($$$) { # Do index on strings > 2GB. # index in Perl < v5.22 does not work for > 2GB # Input: @@ -806,7 +679,7 @@ sub index64 { return -1; } -sub rindex64 { +sub rindex64($@) { # Do rindex on strings > 2GB. # rindex in Perl < v5.22 does not work for > 2GB # Input: @@ -847,7 +720,7 @@ sub rindex64 { return -1; } -sub shorten { +sub shorten($$) { # Do: substr($buf,0,$i) = ""; # Some Perl versions do not support $i > 2GB, so do this in 2GB chunks # Input: @@ -863,7 +736,7 @@ sub shorten { substr($$buf_ref,0,$i) = ""; } -sub write_record_to_pipe { +sub write_record_to_pipe($$$$$$) { # Fork then # Write record from pos 0 .. $endpos to pipe # Input: @@ -939,10 +812,10 @@ sub write_record_to_pipe { } -sub __SEM_MODE__ {} +sub __SEM_MODE__() {} -sub acquire_semaphore { +sub acquire_semaphore() { # Acquires semaphore. If needed: spawns to the background # Uses: # @Global::host @@ -966,10 +839,10 @@ sub acquire_semaphore { } -sub __PARSE_OPTIONS__ {} +sub __PARSE_OPTIONS__() {} -sub options_hash { +sub options_hash() { # Returns: # %hash = the GetOptions config return @@ -1140,7 +1013,7 @@ sub options_hash { ); } -sub get_options_from_array { +sub get_options_from_array($@) { # Run GetOptions on @array # Input: # $array_ref = ref to @ARGV to parse @@ -1182,7 +1055,7 @@ sub get_options_from_array { return $retval; } -sub parse_options { +sub parse_options(@) { # Returns: N/A init_globals(); my @argv_before = @ARGV; @@ -1516,7 +1389,7 @@ sub parse_options { if($opt::sqlworker) { $Global::membuffer ||= 1; } } -sub check_invalid_option_combinations { +sub check_invalid_option_combinations() { if(defined $opt::timeout and $opt::timeout !~ /^\d+(\.\d+)?%?$|^(\d+(\.\d+)?[dhms])+$/i) { ::error("--timeout must be seconds or percentage."); @@ -1569,7 +1442,7 @@ sub check_invalid_option_combinations { } } -sub init_globals { +sub init_globals() { # Defaults: $Global::version = 20181223; $Global::progname = 'parallel'; @@ -1674,7 +1547,7 @@ sub init_globals { $ENV{'HOME'} . "/.parallel"; } -sub parse_halt { +sub parse_halt() { # $opt::halt flavours # Uses: # $opt::halt @@ -1726,7 +1599,7 @@ sub parse_halt { } } -sub parse_replacement_string_options { +sub parse_replacement_string_options() { # Deal with --rpl # Uses: # %Global::rpl @@ -1746,7 +1619,7 @@ sub parse_replacement_string_options { # $opt::slotreplace # $opt::basenameextensionreplace - sub rpl { + sub rpl($$) { # Modify %Global::rpl # Replace $old with $new my ($old,$new) = @_; @@ -1779,7 +1652,7 @@ sub parse_replacement_string_options { } } -sub parse_semaphore { +sub parse_semaphore() { # Semaphore defaults # Must be done before computing number of processes and max_line_length # because when running as a semaphore GNU Parallel does not read args @@ -1843,7 +1716,7 @@ sub parse_semaphore { } } -sub record_env { +sub record_env() { # Record current %ENV-keys in $PARALLEL_HOME/ignored_vars # Returns: N/A my $ignore_filename = $Global::config_dir . "/ignored_vars"; @@ -1855,7 +1728,7 @@ sub record_env { } } -sub open_joblog { +sub open_joblog() { # Open joblog as specified by --joblog # Uses: # $opt::resume @@ -1982,7 +1855,7 @@ sub open_joblog { } } -sub open_csv { +sub open_csv() { if($opt::results) { # Output as CSV/TSV if($opt::results eq "-.csv" @@ -2007,7 +1880,7 @@ sub open_csv { } } -sub find_compression_program { +sub find_compression_program() { # Find a fast compression program # Returns: # $compress_program = compress program with options @@ -2051,7 +1924,7 @@ sub find_compression_program { return ("cat","cat"); } -sub read_options { +sub read_options() { # Read options from command line, profile and $PARALLEL # Uses: # $opt::shebang_wrap @@ -2170,7 +2043,7 @@ sub read_options { return @ARGV; } -sub arrayindex { +sub arrayindex() { # Similar to Perl's index function, but for arrays # Input: # $arr_ref1 = ref to @array1 to search in @@ -2186,7 +2059,7 @@ sub arrayindex { return $#before; } -sub read_args_from_command_line { +sub read_args_from_command_line() { # Arguments given on the command line after: # ::: ($Global::arg_sep) # :::: ($Global::arg_file_sep) @@ -2278,7 +2151,7 @@ sub read_args_from_command_line { return @new_argv; } -sub cleanup { +sub cleanup() { # Returns: N/A unlink keys %Global::unlink; map { rmdir $_ } keys %Global::unlink; @@ -2290,10 +2163,10 @@ sub cleanup { } -sub __QUOTING_ARGUMENTS_FOR_SHELL__ {} +sub __QUOTING_ARGUMENTS_FOR_SHELL__() {} -sub shell_quote { +sub shell_quote(@) { # Input: # @strings = strings to be quoted # Returns: @@ -2303,7 +2176,7 @@ sub shell_quote { : (join" ",map { Q($_) } @_); } -sub shell_quote_scalar_rc { +sub shell_quote_scalar_rc($) { # Quote for the rc-shell my $a = $_[0]; if(defined $a) { @@ -2321,7 +2194,7 @@ sub shell_quote_scalar_rc { return $a; } -sub shell_quote_scalar_csh { +sub shell_quote_scalar_csh($) { # Quote for (t)csh my $a = $_[0]; if(defined $a) { @@ -2342,7 +2215,7 @@ sub shell_quote_scalar_csh { return $a; } -sub shell_quote_scalar_default { +sub shell_quote_scalar_default($) { # Quote for other shells (Bourne compatibles) # Inputs: # $string = string to be quoted @@ -2362,7 +2235,7 @@ sub shell_quote_scalar_default { } } -sub shell_quote_scalar { +sub shell_quote_scalar($) { # Quote the string so the shell will not expand any special chars # Inputs: # $string = string to be quoted @@ -2386,14 +2259,14 @@ sub shell_quote_scalar { return shell_quote_scalar(@_); } -sub Q { +sub Q($) { # Q alias for ::shell_quote_scalar no warnings 'redefine'; *Q = \&::shell_quote_scalar; return Q(@_); } -sub shell_quote_file { +sub shell_quote_file($) { # Quote the string so shell will not expand any special chars # and prepend ./ if needed # Input: @@ -2412,7 +2285,7 @@ sub shell_quote_file { return Q($a); } -sub shell_words { +sub shell_words() { # Input: # $string = shell line # Returns: @@ -2421,7 +2294,7 @@ sub shell_words { return Text::ParseWords::shellwords(@_); } -sub perl_quote_scalar { +sub perl_quote_scalar($) { # Quote the string so perl's eval will not expand any special chars # Inputs: # $string = string to be quoted @@ -2434,13 +2307,13 @@ sub perl_quote_scalar { return $a; } -sub pQ { +sub pQ($) { # pQ alias for ::perl_quote_scalar *pQ = \&::perl_quote_scalar; return pQ(@_); } -sub unquote_printf { +sub unquote_printf() { # Convert \t \n \r \000 \0 # Inputs: # $string = string with \t \n \r \num \0 @@ -2456,10 +2329,10 @@ sub unquote_printf { } -sub __FILEHANDLES__ {} +sub __FILEHANDLES__() {} -sub save_stdin_stdout_stderr { +sub save_stdin_stdout_stderr() { # Remember the original STDIN, STDOUT and STDERR # and file descriptors opened by the shell (e.g. 3>/tmp/foo) # Uses: @@ -2489,7 +2362,7 @@ sub save_stdin_stdout_stderr { ::die_bug("Can't dup STDIN: $!"); } -sub enough_file_handles { +sub enough_file_handles() { # Check that we have enough filehandles available for starting # another job # Uses: @@ -2517,7 +2390,7 @@ sub enough_file_handles { } } -sub open_or_exit { +sub open_or_exit($) { # Open a file name or exit if the file cannot be opened # Inputs: # $file = filehandle or filename to open @@ -2541,7 +2414,7 @@ sub open_or_exit { return $fh; } -sub set_fh_blocking { +sub set_fh_blocking($) { # Set filehandle as blocking # Inputs: # $fh = filehandle to be blocking @@ -2558,7 +2431,7 @@ sub set_fh_blocking { fcntl($fh, &F_SETFL, $flags) || die $!; } -sub set_fh_non_blocking { +sub set_fh_non_blocking($) { # Set filehandle as non-blocking # Inputs: # $fh = filehandle to be blocking @@ -2576,7 +2449,7 @@ sub set_fh_non_blocking { } -sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {} +sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__() {} # Variable structure: @@ -2601,7 +2474,7 @@ sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {} # $Global::exitstatus = status code of GNU Parallel # $Global::quoting = quote the command to run -sub init_run_jobs { +sub init_run_jobs() { # Set Global variables and progress signal handlers # Do the copying of basefiles # Returns: N/A @@ -2767,7 +2640,7 @@ sub init_run_jobs { { my $no_more_file_handles_warned; - sub start_another_job { + sub start_another_job() { # If there are enough filehandles # and JobQueue not empty # and not $job is in joblog @@ -2866,7 +2739,7 @@ sub init_run_jobs { } } -sub init_progress { +sub init_progress() { # Uses: # $opt::bar # Returns: @@ -2880,7 +2753,7 @@ sub init_progress { $progress{'workerlist'}); } -sub drain_job_queue { +sub drain_job_queue(@) { # Uses: # $opt::progress # $Global::total_running @@ -2890,6 +2763,7 @@ sub drain_job_queue { # %Global::host # $Global::start_no_new_jobs # Returns: N/A + my @command = @_; if($opt::progress) { ::status_no_nl(init_progress()); } @@ -2968,7 +2842,7 @@ sub drain_job_queue { } } -sub toggle_progress { +sub toggle_progress() { # Turn on/off progress view # Uses: # $opt::progress @@ -2979,7 +2853,7 @@ sub toggle_progress { } } -sub progress { +sub progress() { # Uses: # $opt::bar # $opt::eta @@ -3178,7 +3052,7 @@ sub progress { { my ($rev,$reset); - sub bar { + sub bar() { # Return: # $status = bar with eta, completed jobs, arg and pct $rev ||= "\033[7m"; @@ -3212,7 +3086,7 @@ sub progress { { my ($columns,$last_column_time); - sub terminal_columns { + sub terminal_columns() { # Get the number of columns of the terminal. # Only update once per second. # Returns: @@ -3242,7 +3116,9 @@ sub progress { } } -sub get_job_with_sshlogin { +# Prototype forwarding +sub get_job_with_sshlogin($); +sub get_job_with_sshlogin($) { # Input: # $sshlogin = which host should the job be run on? # Uses: @@ -3307,10 +3183,10 @@ sub get_job_with_sshlogin { } -sub __REMOTE_SSH__ {} +sub __REMOTE_SSH__() {} -sub read_sshloginfiles { +sub read_sshloginfiles(@) { # Read a list of --slf's # Input: # @files = files or symbolic file names to read @@ -3320,7 +3196,7 @@ sub read_sshloginfiles { } } -sub expand_slf_shorthand { +sub expand_slf_shorthand($) { # Expand --slf shorthand into a read file name # Input: # $file = file or symbolic file name to read @@ -3348,7 +3224,7 @@ sub expand_slf_shorthand { return $file; } -sub read_sshloginfile { +sub read_sshloginfile($) { # Read sshloginfile into @Global::sshlogin # Input: # $file = file to read @@ -3381,7 +3257,7 @@ sub read_sshloginfile { } } -sub parse_sshlogin { +sub parse_sshlogin() { # Parse @Global::sshlogin into %Global::host. # Keep only hosts that are in one of the given ssh hostgroups. # Uses: @@ -3466,7 +3342,7 @@ sub parse_sshlogin { } } -sub remote_hosts { +sub remote_hosts() { # Return sshlogins that are not ':' # Uses: # %Global::host @@ -3475,7 +3351,7 @@ sub remote_hosts { return grep !/^:$/, keys %Global::host; } -sub setup_basefile { +sub setup_basefile() { # Transfer basefiles to each $sshlogin # This needs to be done before first jobs on $sshlogin is run # Uses: @@ -3511,7 +3387,7 @@ sub setup_basefile { } } -sub cleanup_basefile { +sub cleanup_basefile() { # Remove the basefiles transferred # Uses: # %Global::host @@ -3541,14 +3417,14 @@ sub cleanup_basefile { } } -sub run_parallel { +sub run_parallel() { my ($stdin,@args) = @_; my $cmd = join "",map { " $_ & " } split /\n/, $stdin; print $Global::original_stderr ` $cmd wait` ; return 0 } -sub _run_parallel { +sub _run_parallel() { # Run GNU Parallel # This should ideally just fork an internal copy # and not start it through a shell @@ -3592,7 +3468,7 @@ sub _run_parallel { return ($exitstatus,\@stdout,\@stderr); } -sub filter_hosts { +sub filter_hosts() { # Remove down --sshlogins from active duty. # Find ncpus, ncores, maxlen, time-to-login for each host. # Uses: @@ -3646,7 +3522,7 @@ sub filter_hosts { } } -sub parse_host_filtering { +sub parse_host_filtering() { # Input: # @lines = output from parallelized_host_filtering() # Returns: @@ -3744,7 +3620,7 @@ sub parse_host_filtering { \%maxlen, \%echo, \@down_hosts); } -sub parallelized_host_filtering { +sub parallelized_host_filtering() { # Uses: # %Global::host # Returns: @@ -3823,7 +3699,7 @@ sub parallelized_host_filtering { return @out; } -sub onall { +sub onall($@) { # Runs @command on all hosts. # Uses parallel to run @command on each host. # --jobs = number of hosts to run on simultaneously. @@ -3974,10 +3850,10 @@ sub onall { } -sub __SIGNAL_HANDLING__ {} +sub __SIGNAL_HANDLING__() {} -sub sigtstp { +sub sigtstp() { # Send TSTP signal (Ctrl-Z) to all children process groups # Uses: # %SIG @@ -3985,7 +3861,7 @@ sub sigtstp { signal_children("TSTP"); } -sub sigpipe { +sub sigpipe() { # Send SIGPIPE signal to all children process groups # Uses: # %SIG @@ -3993,7 +3869,7 @@ sub sigpipe { signal_children("PIPE"); } -sub signal_children { +sub signal_children() { # Send signal to all children process groups # and GNU Parallel itself # Uses: @@ -4007,7 +3883,7 @@ sub signal_children { kill $signal, $$; } -sub save_original_signal_handler { +sub save_original_signal_handler() { # Remember the original signal handler # Uses: # %Global::original_sig @@ -4034,7 +3910,7 @@ sub save_original_signal_handler { }; } -sub list_running_jobs { +sub list_running_jobs() { # Print running jobs on tty # Uses: # %Global::running @@ -4044,7 +3920,7 @@ sub list_running_jobs { } } -sub start_no_new_jobs { +sub start_no_new_jobs() { # Start no more jobs # Uses: # %Global::original_sig @@ -4061,7 +3937,7 @@ sub start_no_new_jobs { $Global::start_no_new_jobs ||= 1; } -sub reapers { +sub reapers() { # Run reaper until there are no more left # Returns: # @pids_reaped = pids of reaped processes @@ -4073,7 +3949,7 @@ sub reapers { return @pids_reaped; } -sub reaper { +sub reaper() { # A job finished: # * Set exitstatus, exitsignal, endtime. # * Free ressources for new job @@ -4153,10 +4029,10 @@ sub reaper { } -sub __USAGE__ {} +sub __USAGE__() {} -sub killall { +sub killall() { # Kill all jobs by killing their process groups # Uses: # $Global::start_no_new_jobs = we are stopping @@ -4167,7 +4043,7 @@ sub killall { kill_sleep_seq(keys %Global::running); } -sub kill_sleep_seq { +sub kill_sleep_seq(@) { # Send jobs TERM,TERM,KILL to processgroups # Input: # @pids = list of pids that are also processgroups @@ -4182,7 +4058,7 @@ sub kill_sleep_seq { } } -sub kill_sleep { +sub kill_sleep() { # Kill pids with a signal and wait a while for them to die # Input: # $signal = signal to send to @pids @@ -4216,7 +4092,7 @@ sub kill_sleep { return @pids; } -sub wait_and_exit { +sub wait_and_exit($) { # If we do not wait, we sometimes get segfault # Returns: N/A my $error = shift; @@ -4237,13 +4113,13 @@ sub wait_and_exit { exit($error); } -sub die_usage { +sub die_usage() { # Returns: N/A usage(); wait_and_exit(255); } -sub usage { +sub usage() { # Returns: N/A print join ("\n", @@ -4289,7 +4165,7 @@ sub usage { "",); } -sub citation_notice { +sub citation_notice() { # if --will-cite or --plain: do nothing # if stderr redirected: do nothing # if $PARALLEL_HOME/will-cite: do nothing @@ -4345,33 +4221,33 @@ sub citation_notice { } } -sub status { +sub status(@) { my @w = @_; my $fh = $Global::status_fd || *STDERR; print $fh map { ($_, "\n") } @w; flush $fh; } -sub status_no_nl { +sub status_no_nl(@) { my @w = @_; my $fh = $Global::status_fd || *STDERR; print $fh @w; flush $fh; } -sub warning { +sub warning(@) { my @w = @_; my $prog = $Global::progname || "parallel"; status_no_nl(map { ($prog, ": Warning: ", $_, "\n"); } @w); } -sub error { +sub error(@) { my @w = @_; my $prog = $Global::progname || "parallel"; status(map { ($prog.": Error: ". $_); } @w); } -sub die_bug { +sub die_bug($) { my $bugid = shift; print STDERR ("$Global::progname: This should not happen. You have found a bug.\n", @@ -4388,7 +4264,7 @@ sub die_bug { ::wait_and_exit(255); } -sub version { +sub version() { # Returns: N/A print join ("\n", @@ -4404,7 +4280,7 @@ sub version { ); } -sub citation { +sub citation() { # Returns: N/A my ($all_argv_ref,$argv_options_removed_ref) = @_; my $all_argv = "@$all_argv_ref"; @@ -4479,7 +4355,7 @@ sub citation { } } -sub show_limits { +sub show_limits() { # Returns: N/A print("Maximal size of command: ",Limits::Command::real_max_length(),"\n", "Maximal used size of command: ",Limits::Command::max_length(),"\n", @@ -4489,7 +4365,7 @@ sub show_limits { "press CTRL-D or CTRL-C\n"); } -sub embed { +sub embed() { # Give an embeddable version of GNU Parallel # Tested with: bash, zsh, ksh, ash, dash, sh my $randomstring = "cut-here-".join"", @@ -4591,10 +4467,11 @@ echo $p $y $c $h " $0 --embed > new_script"); } -sub __GENERIC_COMMON_FUNCTION__ {} + +sub __GENERIC_COMMON_FUNCTION__() {} -sub mkdir_or_die { +sub mkdir_or_die($) { # If dir is not executable: die my $dir = shift; # The eval is needed to catch exception from mkdir @@ -4605,7 +4482,7 @@ sub mkdir_or_die { } } -sub tmpfile { +sub tmpfile(@) { # Create tempfile as $TMPDIR/parXXXXX # Returns: # $filehandle = opened file handle @@ -4621,7 +4498,7 @@ sub tmpfile { } } -sub tmpname { +sub tmpname($) { # Select a name that does not exist # Do not create the file as it may be used for creating a socket (by tmux) # Remember the name in $Global::unlink to avoid hitting the same name twice @@ -4642,15 +4519,15 @@ sub tmpname { return $tmpname; } -sub tmpfifo { +sub tmpfifo() { # Find an unused name and mkfifo on it use POSIX qw(mkfifo); - my $tmpfifo = tmpname("fif",@_); + my $tmpfifo = tmpname("fif"); mkfifo($tmpfifo,0600); return $tmpfifo; } -sub rm { +sub rm(@) { # Remove file and remove it from %Global::unlink # Uses: # %Global::unlink @@ -4658,7 +4535,7 @@ sub rm { unlink @_; } -sub size_of_block_dev { +sub size_of_block_dev() { # Like -s but for block devices # Input: # $blockdev = file name of block device @@ -4676,7 +4553,7 @@ sub size_of_block_dev { } } -sub qqx { +sub qqx(@) { # Like qx but with clean environment (except for @keep) # and STDERR ignored # This is needed if the environment contains functions @@ -4697,12 +4574,12 @@ sub qqx { } } -sub uniq { +sub uniq(@) { # Remove duplicates and return unique values return keys %{{ map { $_ => 1 } @_ }}; } -sub min { +sub min(@) { # Returns: # Minimum value of array my $min; @@ -4715,7 +4592,7 @@ sub min { return $min; } -sub max { +sub max(@) { # Returns: # Maximum value of array my $max; @@ -4728,7 +4605,7 @@ sub max { return $max; } -sub sum { +sub sum() { # Returns: # Sum of values of array my @args = @_; @@ -4740,24 +4617,24 @@ sub sum { return $sum; } -sub undef_as_zero { +sub undef_as_zero($) { my $a = shift; return $a ? $a : 0; } -sub undef_as_empty { +sub undef_as_empty($) { my $a = shift; return $a ? $a : ""; } -sub undef_if_empty { +sub undef_if_empty($) { if(defined($_[0]) and $_[0] eq "") { return undef; } return $_[0]; } -sub multiply_binary_prefix { +sub multiply_binary_prefix(@) { # Evalualte numbers with binary prefix # Ki=2^10, Mi=2^20, Gi=2^30, Ti=2^40, Pi=2^50, Ei=2^70, Zi=2^80, Yi=2^80 # ki=2^10, mi=2^20, gi=2^30, ti=2^40, pi=2^50, ei=2^70, zi=2^80, yi=2^80 @@ -4806,7 +4683,7 @@ sub multiply_binary_prefix { return wantarray ? @v : $v[0]; } -sub multiply_time_units { +sub multiply_time_units($) { # Evalualte numbers with time units # s=1, m=60, h=3600, d=86400 # Input: @@ -4827,7 +4704,7 @@ sub multiply_time_units { return wantarray ? @v : $v[0]; } -sub seconds_to_time_units { +sub seconds_to_time_units() { # Convert seconds into ??d??h??m??s # s=1, m=60, h=3600, d=86400 # Input: @@ -4856,7 +4733,7 @@ sub seconds_to_time_units { { my ($disk_full_fh, $b8193, $error_printed); - sub exit_if_disk_full { + sub exit_if_disk_full() { # Checks if $TMPDIR is full by writing 8kb to a tmpfile # If the disk is full: Exit immediately. # Returns: @@ -4900,7 +4777,7 @@ sub seconds_to_time_units { } } -sub spacefree { +sub spacefree($$) { # Remove comments and spaces # Inputs: # $spaces = keep 1 space? @@ -4924,7 +4801,7 @@ sub spacefree { { my $hostname; - sub hostname { + sub hostname() { local $/ = "\n"; if(not $hostname) { $hostname = `hostname`; @@ -4935,7 +4812,7 @@ sub spacefree { } } -sub which { +sub which(@) { # Input: # @programs = programs to find the path to # Returns: @@ -5038,7 +4915,7 @@ sub which { { my %pid_parentpid_cmd; - sub pid_table { + sub pid_table() { # Returns: # %children_of = { pid -> children of pid } # %parent_of = { pid -> pid of parent } @@ -5103,7 +4980,7 @@ sub which { } } -sub now { +sub now() { # Returns time since epoch as in seconds with 3 decimals # Uses: # @Global::use @@ -5121,7 +4998,7 @@ sub now { return (int(TimeHiRestime()*1000))/1000; } -sub usleep { +sub usleep($) { # Sleep this many milliseconds. # Input: # $ms = milliseconds to sleep @@ -5130,7 +5007,7 @@ sub usleep { select(undef, undef, undef, $ms/1000); } -sub reap_usleep { +sub reap_usleep() { # Reap dead children. # If no dead children: Sleep specified amount with exponential backoff # Input: @@ -5188,7 +5065,7 @@ sub reap_usleep { } } -sub kill_youngest_if_over_limit { +sub kill_youngest_if_over_limit() { # Check each $sshlogin we are over limit # If over limit: kill off the youngest child # Put the child back in the queue. @@ -5213,7 +5090,7 @@ sub kill_youngest_if_over_limit { } } -sub kill_youngster_if_not_enough_mem { +sub kill_youngster_if_not_enough_mem() { # Check each $sshlogin if there is enough mem. # If less than 50% enough free mem: kill off the youngest child # Put the child back in the queue. @@ -5248,10 +5125,10 @@ sub kill_youngster_if_not_enough_mem { } -sub __DEBUGGING__ {} +sub __DEBUGGING__() {} -sub debug { +sub debug(@) { # Uses: # $Global::debug # %Global::fd @@ -5269,7 +5146,7 @@ sub debug { } } -sub my_memory_usage { +sub my_memory_usage() { # Returns: # memory usage if found # 0 otherwise @@ -5293,7 +5170,7 @@ sub my_memory_usage { } } -sub my_size { +sub my_size() { # Returns: # $size = size of object if Devel::Size is installed # -1 otherwise @@ -5306,7 +5183,7 @@ sub my_size { } } -sub my_dump { +sub my_dump(@) { # Returns: # ascii expression of object if Data::Dump(er) is installed # error code otherwise @@ -5332,25 +5209,25 @@ sub my_dump { } } -sub my_croak { +sub my_croak(@) { eval "use Carp; 1"; $Carp::Verbose = 1; croak(@_); } -sub my_carp { +sub my_carp() { eval "use Carp; 1"; $Carp::Verbose = 1; carp(@_); } -sub __OBJECT_ORIENTED_PARTS__ {} +sub __OBJECT_ORIENTED_PARTS__() {} package SSHLogin; -sub new { +sub new($$) { my $class = shift; my $sshlogin_string = shift; my $ncpus; @@ -5404,49 +5281,49 @@ sub new { }, ref($class) || $class; } -sub DESTROY { +sub DESTROY($) { my $self = shift; # Remove temporary files if they are created. ::rm($self->{'loadavg_file'}); ::rm($self->{'swap_activity_file'}); } -sub string { +sub string($) { my $self = shift; return $self->{'string'}; } -sub jobs_running { +sub jobs_running($) { my $self = shift; return ($self->{'jobs_running'} || "0"); } -sub inc_jobs_running { +sub inc_jobs_running($) { my $self = shift; $self->{'jobs_running'}++; } -sub dec_jobs_running { +sub dec_jobs_running($) { my $self = shift; $self->{'jobs_running'}--; } -sub set_maxlength { +sub set_maxlength($$) { my $self = shift; $self->{'maxlength'} = shift; } -sub maxlength { +sub maxlength($) { my $self = shift; return $self->{'maxlength'}; } -sub jobs_completed { +sub jobs_completed() { my $self = shift; return $self->{'jobs_completed'}; } -sub in_hostgroups { +sub in_hostgroups() { # Input: # @hostgroups = the hostgroups to look for # Returns: @@ -5456,18 +5333,18 @@ sub in_hostgroups { return grep { defined $self->{'hostgroups'}{$_} } @_; } -sub hostgroups { +sub hostgroups() { my $self = shift; return keys %{$self->{'hostgroups'}}; } -sub inc_jobs_completed { +sub inc_jobs_completed($) { my $self = shift; $self->{'jobs_completed'}++; $Global::total_completed++; } -sub set_max_jobs_running { +sub set_max_jobs_running($$) { my $self = shift; if(defined $self->{'max_jobs_running'}) { $Global::max_jobs_running -= $self->{'max_jobs_running'}; @@ -5481,15 +5358,16 @@ sub set_max_jobs_running { $self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'}; } -sub memfree { +sub memfree() { # Returns: # $memfree in bytes my $self = shift; $self->memfree_recompute(); + # Return 1 if not defined. return (not defined $self->{'memfree'} or $self->{'memfree'}) } -sub memfree_recompute { +sub memfree_recompute() { my $self = shift; my $script = memfreescript(); @@ -5505,7 +5383,7 @@ sub memfree_recompute { { my $script; - sub memfreescript { + sub memfreescript() { # Returns: # shellscript for giving available memory in bytes if(not $script) { @@ -5581,7 +5459,7 @@ sub memfree_recompute { } } -sub limit { +sub limit($) { # Returns: # 0 = Below limit. Start another job. # 1 = Over limit. Start no jobs. @@ -5652,13 +5530,13 @@ sub limit { } -sub swapping { +sub swapping($) { my $self = shift; my $swapping = $self->swap_activity(); return (not defined $swapping or $swapping) } -sub swap_activity { +sub swap_activity($) { # If the currently known swap activity is too old: # Recompute a new one in the background # Returns: @@ -5711,7 +5589,7 @@ sub swap_activity { { my $script; - sub swapactivityscript { + sub swapactivityscript() { # Returns: # shellscript for detecting swap activity # @@ -5838,7 +5716,7 @@ sub swap_activity { } } -sub too_fast_remote_login { +sub too_fast_remote_login($) { my $self = shift; if($self->{'last_login_at'} and $self->{'time_to_login'}) { # sshd normally allows 10 simultaneous logins @@ -5855,17 +5733,17 @@ sub too_fast_remote_login { } } -sub last_login_at { +sub last_login_at($) { my $self = shift; return $self->{'last_login_at'}; } -sub set_last_login_at { +sub set_last_login_at($$) { my $self = shift; $self->{'last_login_at'} = shift; } -sub loadavg_too_high { +sub loadavg_too_high($) { my $self = shift; my $loadavg = $self->loadavg(); return (not defined $loadavg or @@ -5874,7 +5752,7 @@ sub loadavg_too_high { { my $cmd; - sub loadavg_cmd { + sub loadavg_cmd() { if(not $cmd) { # aix => "ps -ae -o state,command" # state wrong # bsd => "ps ax -o state,command" @@ -5941,7 +5819,7 @@ sub loadavg_too_high { } -sub loadavg { +sub loadavg($) { # If the currently know loadavg is too old: # Recompute a new one in the background # The load average is computed as the number of processes waiting for disk @@ -6007,7 +5885,7 @@ sub loadavg { return $self->{'loadavg'}; } -sub max_loadavg { +sub max_loadavg($) { my $self = shift; # If --load is a file it might be changed if($Global::max_load_file) { @@ -6027,12 +5905,12 @@ sub max_loadavg { return $self->{'max_loadavg'}; } -sub set_max_loadavg { +sub set_max_loadavg($$) { my $self = shift; $self->{'max_loadavg'} = shift; } -sub compute_max_loadavg { +sub compute_max_loadavg($) { # Parse the max loadaverage that the user asked for using --load # Returns: # max loadaverage @@ -6078,17 +5956,17 @@ sub compute_max_loadavg { return $load; } -sub time_to_login { +sub time_to_login($) { my $self = shift; return $self->{'time_to_login'}; } -sub set_time_to_login { +sub set_time_to_login($$) { my $self = shift; $self->{'time_to_login'} = shift; } -sub max_jobs_running { +sub max_jobs_running($) { my $self = shift; if(not defined $self->{'max_jobs_running'}) { my $nproc = $self->compute_number_of_processes($opt::jobs); @@ -6097,12 +5975,12 @@ sub max_jobs_running { return $self->{'max_jobs_running'}; } -sub orig_max_jobs_running { +sub orig_max_jobs_running($) { my $self = shift; return $self->{'orig_max_jobs_running'}; } -sub compute_number_of_processes { +sub compute_number_of_processes($) { # Number of processes wanted and limited by system resources # Returns: # Number of processes @@ -6131,7 +6009,7 @@ sub compute_number_of_processes { my @args; my $arg; - sub reserve_filehandles { + sub reserve_filehandles($) { # Reserves filehandle my $n = shift; for (1..$n) { @@ -6139,7 +6017,7 @@ sub compute_number_of_processes { } } - sub reserve_process { + sub reserve_process() { # Spawn a dummy process my $child; if($child = fork()) { @@ -6164,7 +6042,7 @@ sub compute_number_of_processes { } } - sub get_args_or_jobs { + sub get_args_or_jobs() { # Get an arg or a job (depending on mode) if($Global::semaphore or ($opt::pipe and not $opt::tee)) { # Skip: No need to get args @@ -6208,7 +6086,7 @@ sub compute_number_of_processes { } } - sub cleanup { + sub cleanup() { # Cleanup: Close the files for (values %fh) { close $_ } # Cleanup: Kill the children @@ -6224,7 +6102,7 @@ sub compute_number_of_processes { @jobs = (); } - sub processes_available_by_system_limit { + sub processes_available_by_system_limit($) { # If the wanted number of processes is bigger than the system limits: # Limit them to the system limits # Limits are: File handles, number of input lines, processes, @@ -6326,7 +6204,7 @@ sub compute_number_of_processes { } } -sub simultaneous_sshlogin_limit { +sub simultaneous_sshlogin_limit($) { # Test by logging in wanted number of times simultaneously # Returns: # min($wanted_processes,$working_simultaneous_ssh_logins-1) @@ -6356,7 +6234,7 @@ sub simultaneous_sshlogin_limit { return $ssh_limit; } -sub simultaneous_sshlogin { +sub simultaneous_sshlogin($) { # Using $sshlogin try to see if we can do $wanted_processes # simultaneous logins # (ssh host echo simul-login & ssh host echo simul-login & ...) | @@ -6383,12 +6261,12 @@ sub simultaneous_sshlogin { return $ssh_limit; } -sub set_ncpus { +sub set_ncpus($$) { my $self = shift; $self->{'ncpus'} = shift; } -sub user_requested_processes { +sub user_requested_processes($) { # Parse the number of processes that the user asked for using -j # Input: # $opt_P = string formatted as for -P @@ -6438,7 +6316,7 @@ sub user_requested_processes { return $processes; } -sub ncpus { +sub ncpus($) { # Number of CPU threads # --use_sockets_instead_of_threads = count socket instead # --use_cores_instead_of_threads = count physical cores instead @@ -6485,7 +6363,7 @@ sub ncpus { } -sub nproc { +sub nproc() { # Returns: # Number of threads using `nproc` my $no_of_threads = ::qqx("nproc"); @@ -6949,7 +6827,7 @@ sub sct_tru64() { } } -sub sshcommand { +sub sshcommand($) { # Returns: # $sshcommand = the command (incl options) to run when using ssh my $self = shift; @@ -6959,7 +6837,7 @@ sub sshcommand { return $self->{'sshcommand'}; } -sub serverlogin { +sub serverlogin($) { # Returns: # $sshcommand = the command (incl options) to run when using ssh my $self = shift; @@ -6969,7 +6847,7 @@ sub serverlogin { return $self->{'serverlogin'}; } -sub sshcommand_of_sshlogin { +sub sshcommand_of_sshlogin($) { # Compute ssh command and serverlogin from sshlogin # 'server' -> ('ssh -S /tmp/parallel-ssh-RANDOM/host-','server') # 'user@server' -> ('ssh','user@server') @@ -7030,7 +6908,7 @@ sub sshcommand_of_sshlogin { $self->{'serverlogin'} = $serverlogin; } -sub control_path_dir { +sub control_path_dir($) { # Returns: # $control_path_dir = dir of control path (for -M) my $self = shift; @@ -7045,7 +6923,7 @@ sub control_path_dir { return $self->{'control_path_dir'}; } -sub rsync_transfer_cmd { +sub rsync_transfer_cmd($) { # Command to run to transfer a file # Input: # $file = filename of file to transfer @@ -7077,7 +6955,7 @@ sub rsync_transfer_cmd { rsync()." $rsync_opts $file $serverlogin:$rsync_destdir"; } -sub cleanup_cmd { +sub cleanup_cmd($) { # Command to run to remove the remote file # Input: # $file = filename to remove @@ -7143,7 +7021,7 @@ sub cleanup_cmd { package JobQueue; -sub new { +sub new($) { my $class = shift; my $commandref = shift; my $read_from = shift; @@ -7163,7 +7041,7 @@ sub new { }, ref($class) || $class; } -sub get { +sub get($) { my $self = shift; $self->{'this_job_no'}++; @@ -7180,13 +7058,13 @@ sub get { } } -sub unget { +sub unget($) { my $self = shift; unshift @{$self->{'unget'}}, @_; $self->{'this_job_no'} -= @_; } -sub empty { +sub empty($) { my $self = shift; my $empty = (not @{$self->{'unget'}}) && $self->{'commandlinequeue'}->empty(); @@ -7194,7 +7072,7 @@ sub empty { return $empty; } -sub total_jobs { +sub total_jobs($) { my $self = shift; if(not defined $self->{'total_jobs'}) { if($opt::pipe and not $opt::tee) { @@ -7240,19 +7118,19 @@ sub total_jobs { return $self->{'total_jobs'}; } -sub flush_total_jobs { +sub flush_total_jobs($) { # Unset total_jobs to force recomputing my $self = shift; $self->{'total_jobs'} = undef; } -sub next_seq { +sub next_seq($) { my $self = shift; return $self->{'commandlinequeue'}->seq(); } -sub quote_args { +sub quote_args($) { my $self = shift; return $self->{'commandlinequeue'}->quote_args(); } @@ -7260,7 +7138,7 @@ sub quote_args { package Job; -sub new { +sub new($) { my $class = shift; my $commandlineref = shift; return bless { @@ -7291,28 +7169,28 @@ sub new { }, ref($class) || $class; } -sub replaced { +sub replaced($) { my $self = shift; $self->{'commandline'} or ::die_bug("commandline empty"); return $self->{'commandline'}->replaced(); } -sub seq { +sub seq($) { my $self = shift; return $self->{'commandline'}->seq(); } -sub set_seq { +sub set_seq($$) { my $self = shift; return $self->{'commandline'}->set_seq(shift); } -sub slot { +sub slot($) { my $self = shift; return $self->{'commandline'}->slot(); } -sub free_slot { +sub free_slot($) { my $self = shift; push @Global::slots, $self->slot(); } @@ -7320,7 +7198,7 @@ sub free_slot { { my($cattail); - sub cattail { + sub cattail() { # Returns: # $cattail = perl program for: # cattail "decompress program" writerpid [file_to_decompress or stdin] [file_to_unlink] @@ -7399,7 +7277,7 @@ sub free_slot { } } -sub openoutputfiles { +sub openoutputfiles($) { # Open files for STDOUT and STDERR # Set file handles in $self->fh my $self = shift; @@ -7510,7 +7388,7 @@ sub openoutputfiles { } } -sub print_verbose_dryrun { +sub print_verbose_dryrun($) { # If -v set: print command to stdout (possibly buffered) # This must be done before starting the command my $self = shift; @@ -7529,26 +7407,26 @@ sub print_verbose_dryrun { } } -sub add_rm { +sub add_rm($) { # Files to remove when job is done my $self = shift; push @{$self->{'unlink'}}, @_; } -sub get_rm { +sub get_rm($) { # Files to remove when job is done my $self = shift; return @{$self->{'unlink'}}; } -sub cleanup { +sub cleanup($) { # Remove files when job is done my $self = shift; unlink $self->get_rm(); delete @Global::unlink{$self->get_rm()}; } -sub grouped { +sub grouped($) { my $self = shift; # Set reading FD if using --group (--ungroup does not need) for my $fdno (1,2) { @@ -7563,7 +7441,7 @@ sub grouped { } } -sub empty_input_wrapper { +sub empty_input_wrapper($) { # If no input: exit(0) # If some input: Pass input as input to command on STDIN # This avoids starting the command if there is no input. @@ -7605,7 +7483,7 @@ sub empty_input_wrapper { } } -sub filter_through_compress { +sub filter_through_compress($) { my $self = shift; # Send stdout to stdin for $opt::compress_program(1) # Send stderr to stdin for $opt::compress_program(2) @@ -7638,19 +7516,19 @@ sub filter_through_compress { -sub set_fh { +sub set_fh($$$$) { # Set file handle my ($self, $fd_no, $key, $fh) = @_; $self->{'fd'}{$fd_no,$key} = $fh; } -sub fh { +sub fh($) { # Get file handle my ($self, $fd_no, $key) = @_; return $self->{'fd'}{$fd_no,$key}; } -sub write { +sub write($) { my $self = shift; my $remaining_ref = shift; my $stdin_fh = $self->fh(0,"w"); @@ -7668,7 +7546,7 @@ sub write { } } -sub set_block { +sub set_block($$$$$$) { # Copy stdin buffer from $block_ref up to $endpos # Prepend with $header_ref if virgin (i.e. not --roundrobin) # Remove $recstart and $recend if needed @@ -7692,18 +7570,18 @@ sub set_block { $self->add_transfersize($self->{'block_length'}); } -sub block_ref { +sub block_ref($) { my $self = shift; return \$self->{'block'}; } -sub block_length { +sub block_length($) { my $self = shift; return $self->{'block_length'}; } -sub remove_rec_sep { +sub remove_rec_sep($) { my ($block_ref,$recstart,$recend) = @_; # Remove record separator $$block_ref =~ s/$recend$recstart//gos; @@ -7711,7 +7589,7 @@ sub remove_rec_sep { $$block_ref =~ s/$recend$//os; } -sub non_blocking_write { +sub non_blocking_write($) { my $self = shift; my $something_written = 0; use POSIX qw(:errno_h); @@ -7741,34 +7619,34 @@ sub non_blocking_write { } -sub virgin { +sub virgin($) { my $self = shift; return $self->{'virgin'}; } -sub set_virgin { +sub set_virgin($$) { my $self = shift; $self->{'virgin'} = shift; } -sub pid { +sub pid($) { my $self = shift; return $self->{'pid'}; } -sub set_pid { +sub set_pid($$) { my $self = shift; $self->{'pid'} = shift; } -sub starttime { +sub starttime($) { # Returns: # UNIX-timestamp this job started my $self = shift; return sprintf("%.3f",$self->{'starttime'}); } -sub set_starttime { +sub set_starttime($@) { my $self = shift; my $starttime = shift || ::now(); $self->{'starttime'} = $starttime; @@ -7777,7 +7655,7 @@ sub set_starttime { $starttime); } -sub runtime { +sub runtime($) { # Returns: # Run time in seconds with 3 decimals my $self = shift; @@ -7785,7 +7663,7 @@ sub runtime { int(($self->endtime() - $self->starttime())*1000)/1000); } -sub endtime { +sub endtime($) { # Returns: # UNIX-timestamp this job ended # 0 if not ended yet @@ -7793,7 +7671,7 @@ sub endtime { return ($self->{'endtime'} || 0); } -sub set_endtime { +sub set_endtime($$) { my $self = shift; my $endtime = shift; $self->{'endtime'} = $endtime; @@ -7802,7 +7680,7 @@ sub set_endtime { $self->runtime()); } -sub is_timedout { +sub is_timedout($) { # Is the job timedout? # Input: # $delta_time = time that the job may run @@ -7813,13 +7691,13 @@ sub is_timedout { return time > $self->{'starttime'} + $delta_time; } -sub kill { +sub kill($) { my $self = shift; $self->set_exitstatus(-1); ::kill_sleep_seq($self->pid()); } -sub failed { +sub failed($) { # return number of times failed for this $sshlogin # Input: # $sshlogin @@ -7830,7 +7708,7 @@ sub failed { return $self->{'failed'}{$sshlogin}; } -sub failed_here { +sub failed_here($) { # return number of times failed for the current $sshlogin # Returns: # Number of times failed for this sshlogin @@ -7838,33 +7716,33 @@ sub failed_here { return $self->{'failed'}{$self->sshlogin()}; } -sub add_failed { +sub add_failed($) { # increase the number of times failed for this $sshlogin my $self = shift; my $sshlogin = shift; $self->{'failed'}{$sshlogin}++; } -sub add_failed_here { +sub add_failed_here($) { # increase the number of times failed for the current $sshlogin my $self = shift; $self->{'failed'}{$self->sshlogin()}++; } -sub reset_failed { +sub reset_failed($) { # increase the number of times failed for this $sshlogin my $self = shift; my $sshlogin = shift; delete $self->{'failed'}{$sshlogin}; } -sub reset_failed_here { +sub reset_failed_here($) { # increase the number of times failed for this $sshlogin my $self = shift; delete $self->{'failed'}{$self->sshlogin()}; } -sub min_failed { +sub min_failed($) { # Returns: # the number of sshlogins this command has failed on # the minimal number of times this command has failed @@ -7875,7 +7753,7 @@ sub min_failed { return ($number_of_sshlogins_failed_on,$min_failures); } -sub total_failed { +sub total_failed($) { # Returns: # $total_failures = the number of times this command has failed my $self = shift; @@ -7915,7 +7793,7 @@ sub total_failed { { my $script; - sub fifo_wrap { + sub fifo_wrap() { # Script to create a fifo, run a command on the fifo # while copying STDIN to the fifo, and finally # remove the fifo and return the exit code of the command. @@ -7952,7 +7830,7 @@ sub total_failed { } } -sub wrapped { +sub wrapped($) { # Wrap command with: # * --shellquote # * --nice @@ -8066,7 +7944,7 @@ sub wrapped { return $self->{'wrapped'}; } -sub set_sshlogin { +sub set_sshlogin($$) { my $self = shift; my $sshlogin = shift; $self->{'sshlogin'} = $sshlogin; @@ -8083,12 +7961,12 @@ sub set_sshlogin { } } -sub sshlogin { +sub sshlogin($) { my $self = shift; return $self->{'sshlogin'}; } -sub string_base64 { +sub string_base64($) { # Base64 encode strings into 1000 byte blocks. # 1000 bytes is the largest word size csh supports # Input: @@ -8100,7 +7978,7 @@ sub string_base64 { return @base64; } -sub string_zip_base64 { +sub string_zip_base64($) { # Pipe string through 'bzip2 -9' and base64 encode it into 1000 # byte blocks. # 1000 bytes is the largest word size csh supports @@ -8127,7 +8005,7 @@ sub string_zip_base64 { return @base64; } -sub base64_zip_eval { +sub base64_zip_eval() { # Script that: # * reads base64 strings from @ARGV # * decodes them @@ -8167,7 +8045,7 @@ sub base64_zip_eval { return $script; } -sub base64_wrap { +sub base64_wrap($) { # base64 encode Perl code # Split it into chunks of < 1000 bytes # Prepend it with a decoder that eval's it @@ -8182,7 +8060,7 @@ sub base64_wrap { join" ",::shell_quote(string_zip_base64($eval_string)); } -sub base64_eval { +sub base64_eval($) { # Script that: # * reads base64 strings from @ARGV # * decodes them @@ -8205,7 +8083,7 @@ sub base64_eval { return $script; } -sub sshlogin_wrap { +sub sshlogin_wrap($) { # Wrap the command with the commands needed to run remotely # Input: # $command = command to run @@ -8434,7 +8312,7 @@ sub sshlogin_wrap { return $self->{'sshlogin_wrap'}{$command}; } -sub transfer { +sub transfer($) { # Files to transfer # Non-quoted and with {...} substituted # Returns: @@ -8454,12 +8332,12 @@ sub transfer { return @transfer; } -sub transfersize { +sub transfersize($) { my $self = shift; return $self->{'transfersize'}; } -sub add_transfersize { +sub add_transfersize($) { my $self = shift; my $transfersize = shift; $self->{'transfersize'} += $transfersize; @@ -8468,7 +8346,7 @@ sub add_transfersize { $self->{'transfersize'}); } -sub sshtransfer { +sub sshtransfer($) { # Returns for each transfer file: # rsync $file remote:$workdir my $self = shift; @@ -8481,7 +8359,7 @@ sub sshtransfer { return join("",@pre); } -sub return { +sub return($) { # Files to return # Non-quoted and with {...} substituted # Returns: @@ -8491,7 +8369,7 @@ sub return { replace_placeholders($self->{'commandline'}{'return_files'},0,0); } -sub returnsize { +sub returnsize($) { # This is called after the job has finished # Returns: # $number_of_bytes transferred in return @@ -8504,7 +8382,7 @@ sub returnsize { return $self->{'returnsize'}; } -sub add_returnsize { +sub add_returnsize($) { my $self = shift; my $returnsize = shift; $self->{'returnsize'} += $returnsize; @@ -8513,7 +8391,7 @@ sub add_returnsize { $self->{'returnsize'}); } -sub sshreturn { +sub sshreturn($) { # Returns for each return-file: # rsync remote:$workdir/$file . my $self = shift; @@ -8555,7 +8433,7 @@ sub sshreturn { return $pre; } -sub sshcleanup { +sub sshcleanup($) { # Return the sshcommand needed to remove the file # Returns: # ssh command needed to remove files from sshlogin @@ -8576,7 +8454,7 @@ sub sshcleanup { return $cleancmd; } -sub remote_cleanup { +sub remote_cleanup($) { # Returns: # Files to remove at cleanup my $self = shift; @@ -8589,7 +8467,7 @@ sub remote_cleanup { } } -sub workdir { +sub workdir($) { # Returns: # the workdir on a remote machine my $self = shift; @@ -8645,7 +8523,7 @@ sub workdir { return $self->{'workdir'}; } -sub parentdirs_of { +sub parentdirs_of($) { # Return: # all parentdirs except . of this dir or file - sorted desc by length my $d = shift; @@ -8658,7 +8536,7 @@ sub parentdirs_of { return @parents; } -sub start { +sub start($) { # Setup STDOUT and STDERR for a job and start it. # Returns: # job-object or undef if job not to run @@ -8837,7 +8715,7 @@ sub start { } } -sub interactive_start { +sub interactive_start($) { my $self = shift; my $command = $self->wrapped(); if($Global::interactive) { @@ -8857,7 +8735,7 @@ sub interactive_start { { my $tmuxsocket; - sub tmux_wrap { + sub tmux_wrap($) { # Wrap command with tmux for session pPID # Input: # $actual_command = the actual command being run (incl ssh wrap) @@ -8953,7 +8831,7 @@ sub interactive_start { } } -sub is_already_in_results { +sub is_already_in_results($) { # Do we already have results for this job? # Returns: # $job_already_run = bool whether there is output for this or not @@ -8963,17 +8841,17 @@ sub is_already_in_results { return(-e $out."stdout" or -f $out); } -sub is_already_in_joblog { +sub is_already_in_joblog($) { my $job = shift; return vec($Global::job_already_run,$job->seq(),1); } -sub set_job_in_joblog { +sub set_job_in_joblog($) { my $job = shift; vec($Global::job_already_run,$job->seq(),1) = 1; } -sub should_be_retried { +sub should_be_retried($) { # Should this job be retried? # Returns # 0 - do not retry @@ -9008,7 +8886,7 @@ sub should_be_retried { { my (%print_later,$job_seq_to_print); - sub print_earlier_jobs { + sub print_earlier_jobs($) { # Print jobs whose output is postponed due to --keep-order # Returns: N/A my $job = shift; @@ -9035,7 +8913,7 @@ sub should_be_retried { } } -sub print { +sub print($) { # Print the output of the jobs # Returns: N/A @@ -9089,7 +8967,7 @@ sub print { } flush $out_fd; } - ::debug("print", "<{'exitstatus'} and not ($self->virgin() and $opt::pipe)) { if($Global::joblog and not $opt::sqlworker) { @@ -9110,14 +8988,14 @@ sub print { { my $header_printed; - sub print_csv { + sub print_csv($) { my $self = shift; my $cmd; if($Global::verbose <= 1) { $cmd = $self->replaced(); } else { # Verbose level > 1: Print the rsync and stuff - $cmd = "@command"; + $cmd = join " ", @{$self->{'commandline'}}; } my $record_ref = $self->{'commandline'}{'arg_list_flat_orig'}; @@ -9160,7 +9038,7 @@ sub print { } } -sub combine_ref { +sub combine_ref($) { # Inspired by Text::CSV_PP::_combine (by Makamaka Hannyaharamitu) my @part = @_; my $sep = $Global::csvsep; @@ -9207,7 +9085,7 @@ sub combine_ref { return @out; } -sub print_files { +sub print_files($) { # Print the name of the file containing stdout on stdout # Uses: # $opt::pipe @@ -9248,7 +9126,7 @@ sub print_files { } } -sub print_linebuffer { +sub print_linebuffer($) { my $self = shift; my ($fdno,$in_fh,$out_fd) = @_; if(defined $self->{'exitstatus'}) { @@ -9351,7 +9229,7 @@ sub print_linebuffer { } } -sub print_tag { +sub print_tag(@) { return print_normal(@_); } @@ -9365,7 +9243,7 @@ sub free_ressources() { } } -sub print_normal { +sub print_normal($) { my $self = shift; my ($fdno,$in_fh,$out_fd) = @_; my $buf; @@ -9411,14 +9289,14 @@ sub print_normal { } } -sub print_joblog { +sub print_joblog($) { my $self = shift; my $cmd; if($Global::verbose <= 1) { $cmd = $self->replaced(); } else { # Verbose level > 1: Print the rsync and stuff - $cmd = "@command"; + $cmd = join " ", @{$self->{'commandline'}}; } # Newlines make it hard to parse the joblog $cmd =~ s/\n/\0/g; @@ -9432,7 +9310,7 @@ sub print_joblog { $self->set_job_in_joblog(); } -sub tag { +sub tag($) { my $self = shift; if(not defined $self->{'tag'}) { if($opt::tag or defined $opt::tagstring) { @@ -9445,7 +9323,7 @@ sub tag { return $self->{'tag'}; } -sub hostgroups { +sub hostgroups($) { my $self = shift; if(not defined $self->{'hostgroups'}) { $self->{'hostgroups'} = @@ -9454,12 +9332,12 @@ sub hostgroups { return @{$self->{'hostgroups'}}; } -sub exitstatus { +sub exitstatus($) { my $self = shift; return $self->{'exitstatus'}; } -sub set_exitstatus { +sub set_exitstatus($$) { my $self = shift; my $exitstatus = shift; if($exitstatus) { @@ -9475,17 +9353,17 @@ sub set_exitstatus { $exitstatus); } -sub reset_exitstatus { +sub reset_exitstatus($) { my $self = shift; undef $self->{'exitstatus'}; } -sub exitsignal { +sub exitsignal($) { my $self = shift; return $self->{'exitsignal'}; } -sub set_exitsignal { +sub set_exitsignal($$) { my $self = shift; my $exitsignal = shift; $self->{'exitsignal'} = $exitsignal; @@ -9586,7 +9464,7 @@ sub set_exitsignal { package CommandLine; -sub new { +sub new($) { my $class = shift; my $seq = shift; my $commandref = shift; @@ -9621,17 +9499,17 @@ sub new { }, ref($class) || $class; } -sub seq { +sub seq($) { my $self = shift; return $self->{'seq'}; } -sub set_seq { +sub set_seq($$) { my $self = shift; $self->{'seq'} = shift; } -sub slot { +sub slot($) { # Find the number of a free job slot and return it # Uses: # @Global::slots - list with free jobslots @@ -9651,7 +9529,7 @@ sub slot { { my $already_spread; - sub populate { + sub populate($) { # Add arguments from arg_queue until the number of arguments or # max line length is reached # Uses: @@ -9738,7 +9616,7 @@ sub slot { } } -sub push { +sub push($) { # Add one or more records as arguments # Returns: N/A my $self = shift; @@ -9767,7 +9645,7 @@ sub push { } } -sub pop { +sub pop($) { # Remove last argument # Returns: # the last record @@ -9795,7 +9673,7 @@ sub pop { return $record; } -sub pop_all { +sub pop_all($) { # Remove all arguments and zeros the length of replacement perlexpr # Returns: # all records @@ -9810,7 +9688,7 @@ sub pop_all { return @popped; } -sub number_of_args { +sub number_of_args($) { # The number of records # Returns: # number of records @@ -9819,7 +9697,7 @@ sub number_of_args { return $#{$self->{'arg_list'}}+1; } -sub number_of_recargs { +sub number_of_recargs($) { # The number of args in records # Returns: # number of args records @@ -9832,7 +9710,7 @@ sub number_of_recargs { return $sum; } -sub args_as_string { +sub args_as_string($) { # Returns: # all unmodified arguments joined with ' ' (similar to {}) my $self = shift; @@ -9840,7 +9718,7 @@ sub args_as_string { map { @$_ } @{$self->{'arg_list'}}); } -sub results_out { +sub results_out($) { sub max_file_name_length { # Figure out the max length of a subdir # TODO and the max total length @@ -9916,7 +9794,7 @@ sub results_out { return $out; } -sub args_as_dirname { +sub args_as_dirname($) { # Returns: # all unmodified arguments joined with '/' (similar to {}) # \t \0 \\ and / are quoted as: \t \0 \\ \_ @@ -9949,7 +9827,7 @@ sub args_as_dirname { return join "/", @res; } -sub header_indexes_sorted { +sub header_indexes_sorted($) { # Sort headers first by number then by name. # E.g.: 1a 1b 11a 11b # Returns: @@ -9972,7 +9850,7 @@ sub header_indexes_sorted { return @header_indexes_sorted; } -sub len { +sub len($) { # Uses: # $opt::shellquote # The length of the command line with args substituted @@ -10035,7 +9913,7 @@ sub len { return $len; } -sub replaced { +sub replaced($) { # Uses: # $Global::noquote # $Global::quoting @@ -10062,7 +9940,7 @@ sub replaced { return $self->{'replaced'}; } -sub replace_placeholders { +sub replace_placeholders($$$$) { # Replace foo{}bar with fooargbar # Input: # $targetref = command as shell words @@ -10235,7 +10113,7 @@ sub replace_placeholders { # \0spc splits quotable groups if($quote) { if(@quotegroup) { - CORE::push @quoted, ::Q(join"",@quotegroup); + CORE::push @quoted, ::Q(join"",@quotegroup);; } } else { CORE::push @quoted, join"",@quotegroup; @@ -10249,7 +10127,7 @@ sub replace_placeholders { return wantarray ? @quoted : "@quoted"; } -sub skip { +sub skip($) { # Skip this job my $self = shift; $self->{'skip'} = 1; @@ -10258,7 +10136,7 @@ sub skip { package CommandLineQueue; -sub new { +sub new($) { my $class = shift; my $commandref = shift; my $read_from = shift; @@ -10414,7 +10292,7 @@ sub new { }, ref($class) || $class; } -sub merge_rpl_parts { +sub merge_rpl_parts($) { # '{=' 'perlexpr' '=}' => '{= perlexpr =}' # Input: # @in = the @command as given by the user @@ -10449,7 +10327,7 @@ sub merge_rpl_parts { return @out; } -sub replacement_counts_and_lengths { +sub replacement_counts_and_lengths($$@) { # Count the number of different replacement strings. # Find the lengths of context for context groups and non-context # groups. @@ -10533,7 +10411,7 @@ sub replacement_counts_and_lengths { return(\%replacecount,\%len,@command); } -sub get { +sub get($) { my $self = shift; if(@{$self->{'unget'}}) { my $cmd_line = shift @{$self->{'unget'}}; @@ -10589,12 +10467,12 @@ sub get { } } -sub unget { +sub unget($) { my $self = shift; unshift @{$self->{'unget'}}, @_; } -sub empty { +sub empty($) { my $self = shift; my $empty = (not @{$self->{'unget'}}) && $self->{'arg_queue'}->empty(); @@ -10602,17 +10480,17 @@ sub empty { return $empty; } -sub seq { +sub seq($) { my $self = shift; return $self->{'seq'}; } -sub set_seq { +sub set_seq($$) { my $self = shift; $self->{'seq'} = shift; } -sub quote_args { +sub quote_args($) { my $self = shift; # If there is not command emulate |bash return $self->{'command'}; @@ -10622,7 +10500,7 @@ sub quote_args { package Limits::Command; # Maximal command line length (for -m and -X) -sub max_length { +sub max_length($) { # Find the max_length of a command line and cache it # Returns: # number of chars on the longest command line allowed @@ -10656,7 +10534,7 @@ sub max_length { return int($Limits::Command::line_max_len); } -sub real_max_length { +sub real_max_length($) { # Find the max_length of a command line # Returns: # The maximal command line length @@ -10671,7 +10549,9 @@ sub real_max_length { return binary_find_max_length(int($len/16),$len); } -sub binary_find_max_length { +# Prototype forwarding +sub binary_find_max_length($$); +sub binary_find_max_length($$) { # Given a lower and upper bound find the max_length of a command line # Returns: # number of chars on the longest command line allowed @@ -10686,7 +10566,7 @@ sub binary_find_max_length { } } -sub is_acceptable_command_line_length { +sub is_acceptable_command_line_length($) { # Test if a command line of this length can run # in the current environment # Returns: @@ -10701,7 +10581,7 @@ sub is_acceptable_command_line_length { return not $?; } -sub tmux_length { +sub tmux_length($) { # If $opt::tmux set, find the limit for tmux # tmux 1.8 has a 2kB limit # tmux 1.9 has a 16kB limit @@ -10744,7 +10624,7 @@ sub tmux_length { package RecordQueue; -sub new { +sub new($) { my $class = shift; my $fhs = shift; my $colsep = shift; @@ -10767,7 +10647,7 @@ sub new { }, ref($class) || $class; } -sub get { +sub get($) { # Returns: # reference to array of Arg-objects my $self = shift; @@ -10809,14 +10689,14 @@ sub get { return $ret; } -sub unget { +sub unget($) { my $self = shift; ::debug("run", "RecordQueue-unget '@_'\n"); $self->{'arg_number'} -= @_; unshift @{$self->{'unget'}}, @_; } -sub empty { +sub empty($) { my $self = shift; my $empty = (not @{$self->{'unget'}}) && $self->{'arg_sub_queue'}->empty(); @@ -10824,7 +10704,7 @@ sub empty { return $empty; } -sub arg_number { +sub arg_number($) { my $self = shift; return $self->{'arg_number'}; } @@ -10832,7 +10712,7 @@ sub arg_number { package RecordColQueue; -sub new { +sub new($) { my $class = shift; my $fhs = shift; my @unget = (); @@ -10843,7 +10723,7 @@ sub new { }, ref($class) || $class; } -sub get { +sub get($) { # Returns: # reference to array of Arg-objects my $self = shift; @@ -10886,13 +10766,13 @@ sub get { } } -sub unget { +sub unget($) { my $self = shift; ::debug("run", "RecordColQueue-unget '@_'\n"); unshift @{$self->{'unget'}}, @_; } -sub empty { +sub empty($) { my $self = shift; my $empty = (not @{$self->{'unget'}}) && $self->{'arg_sub_queue'}->empty(); @@ -10903,7 +10783,7 @@ sub empty { package SQLRecordQueue; -sub new { +sub new($) { my $class = shift; my @unget = (); return bless { @@ -10911,7 +10791,7 @@ sub new { }, ref($class) || $class; } -sub get { +sub get($) { # Returns: # reference to array of Arg-objects my $self = shift; @@ -10921,13 +10801,13 @@ sub get { return $Global::sql->get_record(); } -sub unget { +sub unget($) { my $self = shift; ::debug("run", "SQLRecordQueue-unget '@_'\n"); unshift @{$self->{'unget'}}, @_; } -sub empty { +sub empty($) { my $self = shift; if(@{$self->{'unget'}}) { return 0; } my $get = $self->get(); @@ -10944,7 +10824,7 @@ package MultifileQueue; @Global::unget_argv=(); -sub new { +sub new($) { my $class = shift; my $fhs = shift; for my $fh (@$fhs) { @@ -10963,7 +10843,7 @@ sub new { }, ref($class) || $class; } -sub get { +sub get($) { my $self = shift; if($opt::link) { return $self->link_get(); @@ -10972,13 +10852,13 @@ sub get { } } -sub unget { +sub unget($) { my $self = shift; ::debug("run", "MultifileQueue-unget '@_'\n"); unshift @{$self->{'unget'}}, @_; } -sub empty { +sub empty($) { my $self = shift; my $empty = (not @Global::unget_argv) && not @{$self->{'unget'}}; @@ -10989,7 +10869,7 @@ sub empty { return $empty; } -sub link_get { +sub link_get($) { my $self = shift; if(@{$self->{'unget'}}) { return shift @{$self->{'unget'}}; @@ -11019,7 +10899,7 @@ sub link_get { } } -sub nest_get { +sub nest_get($) { my $self = shift; if(@{$self->{'unget'}}) { return shift @{$self->{'unget'}}; @@ -11101,7 +10981,7 @@ sub nest_get { return shift @{$self->{'unget'}}; } -sub read_arg_from_fh { +sub read_arg_from_fh($) { # Read one Arg from filehandle # Returns: # Arg-object with one read line @@ -11174,7 +11054,9 @@ sub read_arg_from_fh { } } -sub expand_combinations { +# Prototype forwarding +sub expand_combinations(@); +sub expand_combinations(@) { # Input: # ([xmin,xmax], [ymin,ymax], ...) # Returns: ([x,y,...],[x,y,...]) @@ -11208,7 +11090,7 @@ sub expand_combinations { package Arg; -sub new { +sub new($) { my $class = shift; my $orig = shift; my @hostgroups; @@ -11239,40 +11121,40 @@ sub new { }, ref($class) || $class; } -sub Q { +sub Q($) { # Q alias for ::shell_quote_scalar no warnings 'redefine'; *Q = \&::shell_quote_scalar; return Q(@_); } -sub pQ { +sub pQ($) { # pQ alias for ::perl_quote_scalar *pQ = \&::perl_quote_scalar; return pQ(@_); } -sub total_jobs { +sub total_jobs() { return $Global::JobQueue->total_jobs(); } { my %perleval; my $job; - sub skip { + sub skip() { # shorthand for $job->skip(); $job->skip(); } - sub slot { + sub slot() { # shorthand for $job->slot(); $job->slot(); } - sub seq { + sub seq() { # shorthand for $job->seq(); $job->seq(); } - sub replace { + sub replace($$$$) { # Calculates the corresponding value for a given perl expression # Returns: # The calculated string (quoted if asked for) @@ -11317,23 +11199,23 @@ sub total_jobs { } } -sub flush_cache { +sub flush_cache($) { # Flush cache of computed values my $self = shift; $self->{'cache'} = undef; } -sub orig { +sub orig($) { my $self = shift; return $self->{'orig'}; } -sub set_orig { +sub set_orig($$) { my $self = shift; $self->{'orig'} = shift; } -sub trim_of { +sub trim_of($) { # Removes white space as specifed by --trim: # n = nothing # l = start @@ -11361,7 +11243,7 @@ sub trim_of { package TimeoutQueue; -sub new { +sub new($) { my $class = shift; my $delta_time = shift; my ($pct); @@ -11382,22 +11264,22 @@ sub new { }, ref($class) || $class; } -sub delta_time { +sub delta_time($) { my $self = shift; return $self->{'delta_time'}; } -sub set_delta_time { +sub set_delta_time($$) { my $self = shift; $self->{'delta_time'} = shift; } -sub remedian { +sub remedian($) { my $self = shift; return $self->{'remedian'}; } -sub set_remedian { +sub set_remedian($$) { # Set median of the last 999^3 (=997002999) values using Remedian # # Rousseeuw, Peter J., and Gilbert W. Bassett Jr. "The remedian: A @@ -11413,7 +11295,7 @@ sub set_remedian { $self->{'remedian'} = (sort @{$rref->[2]})[$#{$rref->[2]}/2]; } -sub update_median_runtime { +sub update_median_runtime($) { # Update delta_time based on runtime of finished job if timeout is # a percentage my $self = shift; @@ -11425,7 +11307,7 @@ sub update_median_runtime { } } -sub process_timeouts { +sub process_timeouts($) { # Check if there was a timeout my $self = shift; # $self->{'queue'} is sorted by start time @@ -11449,7 +11331,7 @@ sub process_timeouts { } } -sub insert { +sub insert($) { my $self = shift; my $in = shift; push @{$self->{'queue'}}, $in; @@ -11458,7 +11340,7 @@ sub insert { package SQL; -sub new { +sub new($) { my $class = shift; my $dburl = shift; $Global::use{"DBI"} ||= eval "use DBI; 1;"; @@ -11504,7 +11386,9 @@ sub new { }, ref($class) || $class; } -sub get_alias { +# Prototype forwarding +sub get_alias($); +sub get_alias($) { my $alias = shift; $alias =~ s/^(sql:)*//; # Accept aliases prepended with sql: if ($alias !~ /^:/) { @@ -11564,7 +11448,7 @@ sub get_alias { } } -sub check_permissions { +sub check_permissions($) { my $file = shift; if(-e $file) { @@ -11583,7 +11467,7 @@ sub check_permissions { } } -sub parse_dburl { +sub parse_dburl($) { my $url = shift; my %options = (); # sql:mysql://[[user][:password]@][host][:port]/[database[/table][?query]] @@ -11636,7 +11520,7 @@ sub parse_dburl { return %options; } -sub uri_unescape { +sub uri_unescape($) { # Copied from http://cpansearch.perl.org/src/GAAS/URI-1.55/URI/Escape.pm # to avoid depending on URI::Escape # This section is (C) Gisle Aas. @@ -11656,7 +11540,7 @@ sub uri_unescape { $str; } -sub run { +sub run($) { my $self = shift; my $stmt = shift; if($self->{'driver'} eq "CSV") { @@ -11713,7 +11597,7 @@ sub run { return $sth; } -sub get { +sub get($) { my $self = shift; my $sth = $self->run(@_); my @retval; @@ -11726,24 +11610,24 @@ sub get { return \@retval; } -sub table { +sub table($) { my $self = shift; return $self->{'table'}; } -sub append { +sub append($) { my $self = shift; return $self->{'append'}; } -sub update { +sub update($) { my $self = shift; my $stmt = shift; my $table = $self->table(); $self->run("UPDATE $table $stmt",@_); } -sub output { +sub output($) { my $self = shift; my $commandline = shift; @@ -11753,7 +11637,7 @@ sub output { join("",@{$commandline->{'output'}{2}})); } -sub max_number_of_args { +sub max_number_of_args($) { # Maximal number of args for this table my $self = shift; if(not $self->{'max_number_of_args'}) { @@ -11771,12 +11655,12 @@ sub max_number_of_args { return $self->{'max_number_of_args'}; } -sub set_max_number_of_args { +sub set_max_number_of_args($$) { my $self = shift; $self->{'max_number_of_args'} = shift; } -sub create_table { +sub create_table($) { my $self = shift; if($self->append()) { return; } my $max_number_of_args = shift; @@ -11810,7 +11694,7 @@ sub create_table { Stderr $TEXT);}); } -sub insert_records { +sub insert_records($) { my $self = shift; my $seq = shift; my $command_ref = shift; @@ -11826,7 +11710,7 @@ sub insert_records { 0, @$record_ref[1..$#$record_ref]); } -sub get_record { +sub get_record($) { my $self = shift; my @retval; my $table = $self->table(); @@ -11853,7 +11737,7 @@ sub get_record { } } -sub total_jobs { +sub total_jobs($) { my $self = shift; my $table = $self->table(); my $v = $self->get("SELECT count(*) FROM $table;"); @@ -11864,7 +11748,7 @@ sub total_jobs { } } -sub max_seq { +sub max_seq($) { my $self = shift; my $table = $self->table(); my $v = $self->get("SELECT max(Seq) FROM $table;"); @@ -11875,7 +11759,7 @@ sub max_seq { } } -sub finished { +sub finished($) { # Check if there are any jobs left in the SQL table that do not # have a "real" exitval my $self = shift; @@ -11901,7 +11785,7 @@ package Semaphore; # process holding the entry. If the process dies, the entry can be # taken by another process. -sub new { +sub new($) { my $class = shift; my $id = shift; my $count = shift; @@ -11925,7 +11809,7 @@ sub new { }, ref($class) || $class; } -sub remove_dead_locks { +sub remove_dead_locks($) { my $self = shift; my $lockdir = $self->{'lockdir'}; @@ -11943,7 +11827,7 @@ sub remove_dead_locks { } } -sub acquire { +sub acquire($) { my $self = shift; my $sleep = 1; # 1 ms my $start_time = time; @@ -11982,7 +11866,7 @@ sub acquire { ::debug("sem", "acquired $self->{'pid'}\n"); } -sub release { +sub release($) { my $self = shift; ::rm($self->{'pidfile'}); if($self->nlinks() == 1) { @@ -11997,7 +11881,7 @@ sub release { ::debug("run", "released $self->{'pid'}\n"); } -sub pid_change { +sub pid_change($) { # This should do what release()+acquire() would do without having # to re-acquire the semaphore my $self = shift; @@ -12010,7 +11894,7 @@ sub pid_change { ::rm($old_pidfile); } -sub atomic_link_if_count_less_than { +sub atomic_link_if_count_less_than($) { # Link $file1 to $file2 if nlinks to $file1 < $count my $self = shift; my $retval = 0; @@ -12032,7 +11916,7 @@ sub atomic_link_if_count_less_than { return $retval; } -sub nlinks { +sub nlinks($) { my $self = shift; if(-e $self->{'idfile'}) { return (stat(_))[3]; @@ -12041,7 +11925,7 @@ sub nlinks { } } -sub lock { +sub lock($) { my $self = shift; my $sleep = 100; # 100 ms my $total_sleep = 0; @@ -12104,7 +11988,7 @@ sub lock { ::debug("run", "locked $self->{'lockfile'}"); } -sub unlock { +sub unlock($) { my $self = shift; ::rm($self->{'lockfile'}); close $self->{'lockfh'}; @@ -12116,3 +12000,125 @@ sub unlock { $opt::x = $Semaphore::timeout = $Semaphore::wait = $Job::file_descriptor_warning_printed = $Global::envdef = @Arg::arg = $Global::max_slot_number = $opt::session; + +package main; + +sub main() { + save_stdin_stdout_stderr(); + save_original_signal_handler(); + parse_options(); + ::debug("init", "Open file descriptors: ", join(" ",keys %Global::fd), "\n"); + my $number_of_args; + if($Global::max_number_of_args) { + $number_of_args = $Global::max_number_of_args; + } elsif ($opt::X or $opt::m or $opt::xargs) { + $number_of_args = undef; + } else { + $number_of_args = 1; + } + + my @command = @ARGV; + my @input_source_fh; + if($opt::pipepart) { + if($opt::tee) { + @input_source_fh = map { open_or_exit($_) } @opt::a; + # Remove the first: It will be the file piped. + shift @input_source_fh; + if(not @input_source_fh and not $opt::pipe) { + @input_source_fh = (*STDIN); + } + } else { + # -a is used for data - not for command line args + @input_source_fh = map { open_or_exit($_) } "/dev/null"; + } + } else { + @input_source_fh = map { open_or_exit($_) } @opt::a; + if(not @input_source_fh and not $opt::pipe) { + @input_source_fh = (*STDIN); + } + } + if($opt::sqlmaster) { + # Create SQL table to hold joblog + output + $Global::sql->create_table($#input_source_fh+1); + if($opt::sqlworker) { + # Start a real --sqlworker in the background later + $Global::start_sqlworker = 1; + $opt::sqlworker = undef; + } + } + + if($opt::skip_first_line) { + # Skip the first line for the first file handle + my $fh = $input_source_fh[0]; + <$fh>; + } + + set_input_source_header(\@command,\@input_source_fh); + + if($opt::filter_hosts and (@opt::sshlogin or @opt::sshloginfile)) { + # Parallel check all hosts are up. Remove hosts that are down + filter_hosts(); + } + + if($opt::nonall or $opt::onall) { + onall(\@input_source_fh,@command); + wait_and_exit(min(undef_as_zero($Global::exitstatus),254)); + } + + $Global::JobQueue = JobQueue->new( + \@command,\@input_source_fh,$Global::ContextReplace, + $number_of_args,\@Global::transfer_files,\@Global::ret_files); + + if($opt::pipepart) { + pipepart_setup(); + } elsif($opt::pipe and $opt::tee) { + pipe_tee_setup(); + } + + if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) { + # Count the number of jobs or shuffle all jobs + # before starting any. + # Must be done after ungetting any --pipepart jobs. + $Global::JobQueue->total_jobs(); + } + # Compute $Global::max_jobs_running + # Must be done after ungetting any --pipepart jobs. + max_jobs_running(); + + init_run_jobs(); + my $sem; + if($Global::semaphore) { + $sem = acquire_semaphore(); + } + $SIG{TERM} = \&start_no_new_jobs; + + if($opt::tee) { + # All jobs must be running in parallel for --tee + while(start_more_jobs()) {} + $Global::start_no_new_jobs = 1; + if(not $Global::JobQueue->empty()) { + ::error("--tee requres --jobs to be higher. Try --jobs 0."); + ::wait_and_exit(255); + } + } elsif($opt::pipe and not $opt::pipepart) { + # Fill all jobslots + while(start_more_jobs()) {} + spreadstdin(); + } else { + # Reap one - start one + while(reaper() + start_more_jobs()) {} + } + ::debug("init", "Start draining\n"); + drain_job_queue(@command); + ::debug("init", "Done draining\n"); + reapers(); + ::debug("init", "Done reaping\n"); + if($Global::semaphore) { + $sem->release(); + } + cleanup(); + ::debug("init", "Halt\n"); + halt(); +} + +main(); diff --git a/testsuite/tests-to-run/parallel-local-30s.sh b/testsuite/tests-to-run/parallel-local-30s.sh index 269f531d..39112032 100755 --- a/testsuite/tests-to-run/parallel-local-30s.sh +++ b/testsuite/tests-to-run/parallel-local-30s.sh @@ -251,4 +251,4 @@ par_macron() { export -f $(compgen -A function | grep par_) compgen -A function | grep par_ | sort | - parallel -j0 --tag -k --joblog /tmp/jl-`basename $0` '{} 2>&1' + parallel --delay 0.3 -j0 --tag -k --joblog /tmp/jl-`basename $0` '{} 2>&1' diff --git a/testsuite/tests-to-run/parallel-local-3s.sh b/testsuite/tests-to-run/parallel-local-3s.sh index 8aa4bc51..c2446e7c 100644 --- a/testsuite/tests-to-run/parallel-local-3s.sh +++ b/testsuite/tests-to-run/parallel-local-3s.sh @@ -245,6 +245,31 @@ par_nice() { parallel --retries 10 '! kill -TERM' ::: $pid 2>/dev/null } +par_test_diff_roundrobin_k() { + echo '### test there is difference on -k' + . $(which env_parallel.bash) + mytest() { + K=$1 + doit() { + perl -ne 'select(undef, undef, undef, rand()/10000);print' | + md5sum + } + export -f doit + seq 100000 | parallel --block 2k --pipe $K --roundrobin doit | sort + } + export -f mytest + parset a,b,c mytest ::: -k -k '' + # a == b and a != c or error + if [ "$a" == "$b" ]; then + if [ "$a" != "$c" ]; then + echo OK + else + echo error + fi + else + echo error + fi +} export -f $(compgen -A function | grep par_) compgen -A function | grep par_ | LC_ALL=C sort | parallel -j6 --tag -k '{} 2>&1' diff --git a/testsuite/tests-to-run/parallel-local-ssh9.sh b/testsuite/tests-to-run/parallel-local-ssh9.sh index 269efe78..21392b84 100644 --- a/testsuite/tests-to-run/parallel-local-ssh9.sh +++ b/testsuite/tests-to-run/parallel-local-ssh9.sh @@ -155,6 +155,8 @@ par_env_parallel_big_env() { par_no_route_to_host() { echo '### no route to host with | and -j0 causes inf loop' + # Broken in parallel-20121122 .. parallel-20181022 + # parallel-20181022 -j0 -S 185.75.195.218 echo ::: {1..11} via_parallel() { seq 11 | stdout parallel -j0 -S $1 echo } diff --git a/testsuite/wanted-results/parallel-local-3s b/testsuite/wanted-results/parallel-local-3s index bfda3815..7c4d7c7c 100644 --- a/testsuite/wanted-results/parallel-local-3s +++ b/testsuite/wanted-results/parallel-local-3s @@ -177,6 +177,8 @@ par_sqlworker_hostname host par_sqlworker_hostname par_sqlworker_hostname par_sqlworker_hostname +par_test_diff_roundrobin_k ### test there is difference on -k +par_test_diff_roundrobin_k OK par_wrong_slot_rpl_resume ### bug #47644: Wrong slot number replacement when resuming par_wrong_slot_rpl_resume 1 0 par_wrong_slot_rpl_resume 2 1 diff --git a/testsuite/wanted-results/parallel-local4 b/testsuite/wanted-results/parallel-local4 index 49539996..0485ae1d 100644 --- a/testsuite/wanted-results/parallel-local4 +++ b/testsuite/wanted-results/parallel-local4 @@ -92,6 +92,11 @@ X : XXXXXXXXXX.XXX X.XXX X X X X true X echo '### How do we deal with missing $HOME' ### How do we deal with missing $HOME unset HOME; stdout perl -w $(which parallel) -k echo ::: 1 2 3 +main::shell_quote_scalar() called too early to check prototype at /usr/local/bin/parallel line 2299. +main::Q() called too early to check prototype at /usr/local/bin/parallel line 2306. +main::pQ() called too early to check prototype at /usr/local/bin/parallel line 2353. +Arg::Q() called too early to check prototype at /usr/local/bin/parallel line 11163. +Arg::pQ() called too early to check prototype at /usr/local/bin/parallel line 11169. parallel: Warning: $HOME not set. Using /tmp. 1 2 @@ -99,6 +104,11 @@ parallel: Warning: $HOME not set. Using /tmp. echo '### How do we deal with missing $SHELL' ### How do we deal with missing $SHELL unset SHELL; stdout perl -w $(which parallel) -k echo ::: 1 2 3 +main::shell_quote_scalar() called too early to check prototype at /usr/local/bin/parallel line 2299. +main::Q() called too early to check prototype at /usr/local/bin/parallel line 2306. +main::pQ() called too early to check prototype at /usr/local/bin/parallel line 2353. +Arg::Q() called too early to check prototype at /usr/local/bin/parallel line 11163. +Arg::pQ() called too early to check prototype at /usr/local/bin/parallel line 11169. 1 2 3