diff --git a/parallel b/parallel index 835d01bf..6d015d02 100755 --- a/parallel +++ b/parallel @@ -457,7 +457,7 @@ sub binary_find_max_length { } sub acceptable_command_line_length { - # Test if this length can run + # Test if a command line of this length can run # This is done using external perl script to avoid warning # (Can this be done prettier?) my $len = shift; @@ -468,6 +468,148 @@ sub acceptable_command_line_length { } sub compute_number_of_processes { + # Number of processes wanted and limited by system ressources + my $opt_j = shift; + my $wanted_processes = user_requested_processes($opt_j); + my $system_limit = processes_available_by_system_limit($wanted_processes); + return $system_limit; +} + +sub processes_available_by_system_limit { + # If the wanted number of processes is bigger than the system limits: + # Limit them to the system limits + # Limits are: File handles, number of input lines, processes, + # and taking > 1 second to spawn 10 extra procs + my $wanted_processes = shift; + my $system_limit=0; +#GONE my @args=(); + my @command_lines=(); +#GONE my $next_arg; + my $next_command_line; + my $more_filehandles; + my $max_system_proc_reached=0; + my $spawning_too_slow=0; + my $time = time; + my %fh; + DoNotReap(); + + # Reserve filehandles + # perl uses 7 filehandles for something? + # parallel uses 1 for memory_usage + for my $i (1..8) { + open($fh{"init-$i"}," 1) { + # It took more than 1 second to fork ten processes. We should stop forking. + # Let us give the system a little slack + debug("\nLimiting processes to: $system_limit-10%=". + (int ($system_limit * 0.9)+1)."\n"); + $system_limit = int ($system_limit * 0.9)+1; + $spawning_too_slow = 1; + } + } while($system_limit <= $wanted_processes + and defined $next_command_line + and $more_filehandles + and not $max_system_proc_reached + and not $spawning_too_slow); + if($system_limit <= $wanted_processes + and not $more_filehandles) { + print STDERR ("Warning: Only enough filehandles to run ", + $system_limit, " jobs in parallel. ", + "Raising ulimit -n may help\n"); + } + if($system_limit <= $wanted_processes + and $max_system_proc_reached) { + print STDERR ("Warning: Only enough available processes to run ", + $system_limit, " jobs in parallel.\n"); + } + if($system_limit <= $wanted_processes + and $spawning_too_slow) { + print STDERR ("Warning: Starting 10 extra processes takes > 1 sec.\n", + "Limiting to ", $system_limit, " jobs in parallel.\n"); + } + # Cleanup: Close the files + for (keys %fh) { close $fh{$_} } + # Cleanup: Kill the children + for $pid (@children) { + kill 15, $pid; + waitpid($pid,0); + } + wait(); +# # Cleanup: Unget the args +# unget_arg(@args); + # Cleanup: Unget the command_lines + unget_command_line(@command_lines); + return $system_limit; +} + +sub user_requested_processes { + # Parse the number of processes that the user asked for + my $opt_j = shift; + if(defined $opt_j) { + if($opt_j =~ /^\+(\d+)$/) { + # E.g. -j +2 + my $j = $1; + $processes = $j + no_of_cpus(); + } elsif ($opt_j =~ /^-(\d+)$/) { + # E.g. -j -2 + my $j = $1; + $processes = no_of_cpus() - $j; + } elsif ($opt_j =~ /^(\d+)\%$/) { + my $j = $1; + $processes = no_of_cpus() * $j / 100; + } elsif ($opt_j =~ /^(\d+)$/) { + $processes = $1; + if($processes == 0) { + # -j 0 = infinity (or at least close) + $processes = 2**31; + } + } else { + die_usage(); + } + if($processes < 1) { + $processes = 1; + } + } + return $processes; +} + + +sub GONE_compute_number_of_processes { my $opt_j = shift; my $processes = 0; if(defined $opt_j) { @@ -514,7 +656,7 @@ sub compute_number_of_processes { return int $processes; } -sub min_of_args_and_processes { +sub GONE_min_of_args_and_processes { my $processes = shift; my $min_of_args_and_processes=0; my @args=(); @@ -564,7 +706,7 @@ sub NullReaper { while (waitpid(-1, &WNOHANG) > 0) { } } -sub compute_no_of_free_filehandles { +sub GONE_compute_no_of_free_filehandles { my $needed = shift; my $i=1; my %fh; @@ -629,12 +771,21 @@ sub init_run_jobs { sub next_command_line { my $cmd_line; - do { - $cmd_line = generate_command_line($Global::command); - } while (defined $cmd_line and $cmd_line =~ /^\s*$/); # Skip empty lines + if(@Global::unget_next_command_line) { + $cmd_line = shift @Global::unget_next_command_line; + } else { + do { + $cmd_line = generate_command_line($Global::command); + } while (defined $cmd_line and $cmd_line =~ /^\s*$/); # Skip empty lines + } return $cmd_line; } +sub unget_command_line { + push @Global::unget_next_command_line, @_; +} + + sub get_next_arg { my $arg; if(@Global::unget_arg) {