From ed6903976bf4bad67a688d85b1e5ebe1678e3e39 Mon Sep 17 00:00:00 2001 From: Ole Tange Date: Tue, 7 Aug 2012 00:52:08 +0200 Subject: [PATCH] Better support for limited number of processes in process table. --- src/parallel | 130 ++++++++++++++-------- testsuite/tests-to-run/parallel-local9.sh | 6 +- testsuite/tests-to-run/test23.sh | 3 +- testsuite/tests-to-run/test30.sh | 2 +- testsuite/wanted-results/parallel-local9 | 8 +- testsuite/wanted-results/test23 | 42 +++---- testsuite/wanted-results/test30 | 3 +- 7 files changed, 115 insertions(+), 79 deletions(-) diff --git a/src/parallel b/src/parallel index cd369438..f25a0d2f 100755 --- a/src/parallel +++ b/src/parallel @@ -408,7 +408,7 @@ sub write_record_to_pipe { $job->write(\$record); my $fh = $job->stdin(); close $fh; - exit; + exit(0); } my $fh = $job->stdin(); close $fh; @@ -584,7 +584,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A # Defaults: - $Global::version = 20120722; + $Global::version = 20120806; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -1220,8 +1220,20 @@ sub start_another_job { debug("Started as seq ",$job->seq()," pid:",$job->pid(),"\n"); return 1; } else { - # If interactive says: Dont run the job, then skip it and run the next - return start_another_job($sshlogin); + # Not enough processes to run the job. + # Put it back on the queue. + $Global::JobQueue->unget($job); + # Count down the number of jobs to run for this SSHLogin. + my $max = $sshlogin->max_jobs_running(); + if($max > 1) { $max--; } + $sshlogin->set_max_jobs_running($max); + # Sleep up to 300 ms to give other processes time to die + ::usleep(rand()*300); + print $Global::original_stderr + ("parallel: Warning: No more processes: ". + "Decreasing number of running jobs to $max. ". + "Raising ulimit -u may help.\n"); + return 0; } } } else { @@ -1548,7 +1560,7 @@ sub read_sshloginfile { } else { if(not open(IN, $file)) { print $Global::original_stderr "Cannot open $file\n"; - exit(255); + ::wait_and_exit(255); } } while() { @@ -2362,7 +2374,7 @@ sub compute_max_loadavg { $load = $self->compute_max_loadavg($opt_load_file); } else { print $Global::original_stderr "Cannot open $loadspec\n"; - exit(255); + ::wait_and_exit(255); } } else { print $Global::original_stderr "Parsing of --load failed\n"; @@ -2407,11 +2419,6 @@ sub compute_number_of_processes { ::debug("Wanted procs: $wanted_processes\n"); my $system_limit = $self->processes_available_by_system_limit($wanted_processes); - if($system_limit < 1) { - print STDERR "$Global::progname: Cannot spawn more jobs. ", - "Raising ulimit -u may help.\n"; - exit(255); - } ::debug("Limited to procs: $system_limit\n"); return $system_limit; } @@ -2537,20 +2544,6 @@ sub processes_available_by_system_limit { $slow_spawining_warning_printed = 1; } } - if($system_limit < $wanted_processes and not $more_filehandles) { - print $Global::original_stderr - ("parallel: Warning: Only enough filehandles to run ", - $system_limit, " jobs in parallel. ", - "Raising ulimit -n may help.\n"); - } - if($system_limit < $wanted_processes and $max_system_proc_reached) { - print $Global::original_stderr - ("parallel: Warning: Only enough available processes to run ", - $system_limit, " jobs in parallel.\n"); - } - if($Global::JobQueue->empty()) { - $system_limit ||= 1; - } # Cleanup: Close the files for (values %fh) { close $_ } # Cleanup: Kill the children @@ -2559,10 +2552,31 @@ sub processes_available_by_system_limit { waitpid($pid,0); delete $Global::unkilled_children{$pid}; } - #wait(); # Cleanup: Unget the command_lines or the @args $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args); $Global::JobQueue->unget(@jobs); + if($system_limit < $wanted_processes) { + # The system_limit is less than the wanted_processes + if($system_limit < 1 and not $Global::JobQueue->empty()) { + print $Global::original_stderr "$Global::progname: Cannot spawn any jobs. ", + "Raising ulimit -u may help.\n"; + ::wait_and_exit(255); + } + if(not $more_filehandles) { + print $Global::original_stderr + ("parallel: Warning: Only enough filehandles to run ", + $system_limit, " jobs in parallel. ", + "Raising ulimit -n may help.\n"); + } + if($max_system_proc_reached) { + print $Global::original_stderr + ("parallel: Warning: Only enough available processes to run ", + $system_limit, " jobs in parallel. Raising ulimit -u may help.\n"); + } + } + if($Global::JobQueue->empty()) { + $system_limit ||= 1; + } if($self->string() ne ":" and $system_limit > $Global::default_simultaneous_sshlogins) { $system_limit = @@ -2659,7 +2673,7 @@ sub user_requested_processes { $processes = $self->user_requested_processes($opt_P_file); } else { print $Global::original_stderr "Cannot open $opt_P\n"; - exit(255); + ::wait_and_exit(255); } } else { print $Global::original_stderr "Parsing of --jobs/-j/--max-procs/-P failed\n"; @@ -3663,24 +3677,30 @@ sub start { if($::opt_dryrun) { $command = "true"; } - $Global::total_running++; - $Global::total_started++; $ENV{'PARALLEL_SEQ'} = $job->seq(); $ENV{'PARALLEL_PID'} = $$; ::debug("$Global::total_running processes. Starting (" . $job->seq() . "): $command\n"); if($::opt_pipe) { my ($in); - $pid = ::open3($in, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || - ::die_bug("open3-pipe"); + # The eval is needed to catch exception from open3 + eval { + $pid = ::open3($in, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || + ::die_bug("open3-pipe"); + 1; + }; $job->set_stdin($in); } elsif(@::opt_a and not $Global::stdin_in_opt_a and $job->seq() == 1 and $job->sshlogin()->string() eq ":") { # Give STDIN to the first job if using -a (but only if running # locally - otherwise CTRL-C does not work for other jobs Bug#36585) *IN = *STDIN; - $pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || - ::die_bug("open3-a"); + # The eval is needed to catch exception from open3 + eval { + $pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || + ::die_bug("open3-a"); + 1; + }; # Re-open to avoid complaining open STDIN, "<&", $Global::original_stdin or ::die_bug("dup-\$Global::original_stdin: $!"); @@ -3688,22 +3708,38 @@ sub start { open(DEVTTY, "/dev/tty")) { # Give /dev/tty to the command if no one else is using it *IN = *DEVTTY; - $pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || - ::die_bug("open3-/dev/tty"); - $Global::tty_taken = $pid; - close DEVTTY; + # The eval is needed to catch exception from open3 + eval { + $pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || + ::die_bug("open3-/dev/tty"); + $Global::tty_taken = $pid; + close DEVTTY; + 1; + }; } else { - $pid = ::open3(::gensym, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || - ::die_bug("open3-gensym"); + eval { + $pid = ::open3(::gensym, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || + ::die_bug("open3-gensym"); + 1; + }; } - $job->set_pid($pid); - $job->set_starttime(); - if($::opt_timeout) { - # Timeout must be set before inserting into queue - $job->set_timeout($::opt_timeout); - $Global::timeoutq->insert($job); + if($pid) { + # A job was started + $Global::total_running++; + $Global::total_started++; + $job->set_pid($pid); + $job->set_starttime(); + if($::opt_timeout) { + # Timeout must be set before inserting into queue + $job->set_timeout($::opt_timeout); + $Global::timeoutq->insert($job); + } + return $job; + } else { + # No more processes + ::debug("Cannot spawn more jobs.\n"); + return undef; } - return $job; } sub is_already_in_joblog { @@ -5251,7 +5287,7 @@ sub mkdir_or_die { } if(not -w $dir) { print STDERR "$Global::progname: Cannot write to $dir: $!\n"; - exit(255); + ::wait_and_exit(255); } } diff --git a/testsuite/tests-to-run/parallel-local9.sh b/testsuite/tests-to-run/parallel-local9.sh index 99f0c783..50933d1d 100644 --- a/testsuite/tests-to-run/parallel-local9.sh +++ b/testsuite/tests-to-run/parallel-local9.sh @@ -97,13 +97,13 @@ nice nice perl -e '@x=1 .. 17000; for(1..100) { print "@x\n"}' | pv -qL 1000000 nice nice perl -e '@x=1 .. 17000; for(1..100) { print "@x\n"}' | pv -qL 1000000 | $PAR --recend '' --files --halt-on-error 2 cat | parallel -Xj1 cat {} ';' rm {} | md5sum -echo '### Test of -j filename'; +echo '### Test of -j filename - non-existent file'; nice stdout parallel -j no_such_file echo ::: 1 echo '### Test of -j filename'; echo 3 >/tmp/jobs_to_run1; - parallel -j /tmp/jobs_to_run1 -v sleep 0.{} ::: 9 8 7 6 4; - # Should give 7 8 9 4 6 + parallel -j /tmp/jobs_to_run1 -v sleep 0.{} ::: 9 7 5 3 1; + # Should give 0.5 0.7 0.9 0.1 0.3 echo '### Test ::::' echo '### Change --arg-file-sep' diff --git a/testsuite/tests-to-run/test23.sh b/testsuite/tests-to-run/test23.sh index 656896cb..924f61a0 100644 --- a/testsuite/tests-to-run/test23.sh +++ b/testsuite/tests-to-run/test23.sh @@ -1,6 +1,5 @@ #!/bin/bash -PAR=parallel SERVER1=parallel-server1 SERVER2=parallel-server2 SSHLOGIN1=parallel@$SERVER1 @@ -35,7 +34,7 @@ echo "### Test combined -X --return {/}_{/.}_{#/.}_{#/}_{#.} with files containi stdout parallel -j1 -k -Xv --cleanup --return tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file -S $SSHLOGIN2 \ mkdir -p tmp/{/}_{/.}_{2/.}_{2/}_{2.} \;touch tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file \ ::: /a/number1.c a/number2.c number3.c /a/number4 a/number5 number6 'number 7' 'number <8|8>' -find tmp +find tmp | sort rm -rf tmp echo "### Here we ought to test -m --return {/}_{/.}_{#/.}_{#/}_{#.} with files containing space" diff --git a/testsuite/tests-to-run/test30.sh b/testsuite/tests-to-run/test30.sh index 89eb05f5..cd71cb31 100644 --- a/testsuite/tests-to-run/test30.sh +++ b/testsuite/tests-to-run/test30.sh @@ -17,7 +17,7 @@ stdout parallel --progress "sleep 1; echo {}" < /dev/null echo '### bug #34422: parallel -X --eta crashes with div by zero' seq 2 | stdout parallel -X --eta echo -echo '### --timeout on remote machines' +echo '### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed' parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $? echo '### --pipe without command' diff --git a/testsuite/wanted-results/parallel-local9 b/testsuite/wanted-results/parallel-local9 index 71d41549..a8b97c1e 100644 --- a/testsuite/wanted-results/parallel-local9 +++ b/testsuite/wanted-results/parallel-local9 @@ -57,7 +57,7 @@ b 350eda13a37912d755c9d733d149bdaf - 350eda13a37912d755c9d733d149bdaf - 350eda13a37912d755c9d733d149bdaf - -### Test of -j filename +### Test of -j filename - non-existent file Usage: parallel [options] [command [arguments]] < list_of_arguments parallel [options] [command [arguments]] (::: arguments|:::: argfile(s))... @@ -88,11 +88,11 @@ O. Tange (2011): GNU Parallel - The Command-Line Power Tool, ;login: The USENIX Magazine, February 2011:42-47. Parsing of --jobs/-j/--max-procs/-P failed ### Test of -j filename +sleep 0.5 sleep 0.7 -sleep 0.8 sleep 0.9 -sleep 0.4 -sleep 0.6 +sleep 0.1 +sleep 0.3 ### Test :::: ### Change --arg-file-sep 1 5 diff --git a/testsuite/wanted-results/test23 b/testsuite/wanted-results/test23 index 3a2aadd6..c6494130 100644 --- a/testsuite/wanted-results/test23 +++ b/testsuite/wanted-results/test23 @@ -48,29 +48,29 @@ OK parallel: Warning: using -X or -m with --sshlogin may fail mkdir -p tmp/number1.c_number1_number2_number2.c_a/number2 tmp/number2.c_number2_number2_number2.c_a/number2 tmp/number3.c_number3_number2_number2.c_a/number2 tmp/number4_number4_number2_number2.c_a/number2 tmp/number5_number5_number2_number2.c_a/number2 tmp/number6_number6_number2_number2.c_a/number2 tmp/number\ 7_number\ 7_number2_number2.c_a/number2 tmp/number\ \<8\|8\>_number\ \<8\|8\>_number2_number2.c_a/number2 ;touch tmp/number1.c_number1_number2_number2.c_a/number2/file tmp/number2.c_number2_number2_number2.c_a/number2/file tmp/number3.c_number3_number2_number2.c_a/number2/file tmp/number4_number4_number2_number2.c_a/number2/file tmp/number5_number5_number2_number2.c_a/number2/file tmp/number6_number6_number2_number2.c_a/number2/file tmp/number\ 7_number\ 7_number2_number2.c_a/number2/file tmp/number\ \<8\|8\>_number\ \<8\|8\>_number2_number2.c_a/number2/file tmp -tmp/number6_number6_number2_number2.c_a -tmp/number6_number6_number2_number2.c_a/number2 -tmp/number6_number6_number2_number2.c_a/number2/file -tmp/number5_number5_number2_number2.c_a -tmp/number5_number5_number2_number2.c_a/number2 -tmp/number5_number5_number2_number2.c_a/number2/file -tmp/number4_number4_number2_number2.c_a -tmp/number4_number4_number2_number2.c_a/number2 -tmp/number4_number4_number2_number2.c_a/number2/file -tmp/number3.c_number3_number2_number2.c_a -tmp/number3.c_number3_number2_number2.c_a/number2 -tmp/number3.c_number3_number2_number2.c_a/number2/file -tmp/number2.c_number2_number2_number2.c_a -tmp/number2.c_number2_number2_number2.c_a/number2 -tmp/number2.c_number2_number2_number2.c_a/number2/file -tmp/number1.c_number1_number2_number2.c_a -tmp/number1.c_number1_number2_number2.c_a/number2 -tmp/number1.c_number1_number2_number2.c_a/number2/file -tmp/number <8|8>_number <8|8>_number2_number2.c_a -tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2 -tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2/file tmp/number 7_number 7_number2_number2.c_a tmp/number 7_number 7_number2_number2.c_a/number2 tmp/number 7_number 7_number2_number2.c_a/number2/file +tmp/number <8|8>_number <8|8>_number2_number2.c_a +tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2 +tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2/file +tmp/number1.c_number1_number2_number2.c_a +tmp/number1.c_number1_number2_number2.c_a/number2 +tmp/number1.c_number1_number2_number2.c_a/number2/file +tmp/number2.c_number2_number2_number2.c_a +tmp/number2.c_number2_number2_number2.c_a/number2 +tmp/number2.c_number2_number2_number2.c_a/number2/file +tmp/number3.c_number3_number2_number2.c_a +tmp/number3.c_number3_number2_number2.c_a/number2 +tmp/number3.c_number3_number2_number2.c_a/number2/file +tmp/number4_number4_number2_number2.c_a +tmp/number4_number4_number2_number2.c_a/number2 +tmp/number4_number4_number2_number2.c_a/number2/file +tmp/number5_number5_number2_number2.c_a +tmp/number5_number5_number2_number2.c_a/number2 +tmp/number5_number5_number2_number2.c_a/number2/file +tmp/number6_number6_number2_number2.c_a +tmp/number6_number6_number2_number2.c_a/number2 +tmp/number6_number6_number2_number2.c_a/number2/file ### Here we ought to test -m --return {/}_{/.}_{#/.}_{#/}_{#.} with files containing space ### But we will wait for a real world scenario diff --git a/testsuite/wanted-results/test30 b/testsuite/wanted-results/test30 index 59fc6f25..e8d97ea6 100644 --- a/testsuite/wanted-results/test30 +++ b/testsuite/wanted-results/test30 @@ -20,7 +20,8 @@ Computers / CPU cores / Max jobs to run Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete local:1/0/100%/0.0s -### --timeout on remote machines +### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed +slept 1 slept 1 jobs failed: 2 ### --pipe without command