Better support for limited number of processes in process table.

This commit is contained in:
Ole Tange 2012-08-07 00:52:08 +02:00
parent 92b9592f1d
commit ed6903976b
7 changed files with 115 additions and 79 deletions

View file

@ -408,7 +408,7 @@ sub write_record_to_pipe {
$job->write(\$record); $job->write(\$record);
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
exit; exit(0);
} }
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
@ -584,7 +584,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20120722; $Global::version = 20120806;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -1220,8 +1220,20 @@ sub start_another_job {
debug("Started as seq ",$job->seq()," pid:",$job->pid(),"\n"); debug("Started as seq ",$job->seq()," pid:",$job->pid(),"\n");
return 1; return 1;
} else { } else {
# If interactive says: Dont run the job, then skip it and run the next # Not enough processes to run the job.
return start_another_job($sshlogin); # Put it back on the queue.
$Global::JobQueue->unget($job);
# Count down the number of jobs to run for this SSHLogin.
my $max = $sshlogin->max_jobs_running();
if($max > 1) { $max--; }
$sshlogin->set_max_jobs_running($max);
# Sleep up to 300 ms to give other processes time to die
::usleep(rand()*300);
print $Global::original_stderr
("parallel: Warning: No more processes: ".
"Decreasing number of running jobs to $max. ".
"Raising ulimit -u may help.\n");
return 0;
} }
} }
} else { } else {
@ -1548,7 +1560,7 @@ sub read_sshloginfile {
} else { } else {
if(not open(IN, $file)) { if(not open(IN, $file)) {
print $Global::original_stderr "Cannot open $file\n"; print $Global::original_stderr "Cannot open $file\n";
exit(255); ::wait_and_exit(255);
} }
} }
while(<IN>) { while(<IN>) {
@ -2362,7 +2374,7 @@ sub compute_max_loadavg {
$load = $self->compute_max_loadavg($opt_load_file); $load = $self->compute_max_loadavg($opt_load_file);
} else { } else {
print $Global::original_stderr "Cannot open $loadspec\n"; print $Global::original_stderr "Cannot open $loadspec\n";
exit(255); ::wait_and_exit(255);
} }
} else { } else {
print $Global::original_stderr "Parsing of --load failed\n"; print $Global::original_stderr "Parsing of --load failed\n";
@ -2407,11 +2419,6 @@ sub compute_number_of_processes {
::debug("Wanted procs: $wanted_processes\n"); ::debug("Wanted procs: $wanted_processes\n");
my $system_limit = my $system_limit =
$self->processes_available_by_system_limit($wanted_processes); $self->processes_available_by_system_limit($wanted_processes);
if($system_limit < 1) {
print STDERR "$Global::progname: Cannot spawn more jobs. ",
"Raising ulimit -u may help.\n";
exit(255);
}
::debug("Limited to procs: $system_limit\n"); ::debug("Limited to procs: $system_limit\n");
return $system_limit; return $system_limit;
} }
@ -2537,20 +2544,6 @@ sub processes_available_by_system_limit {
$slow_spawining_warning_printed = 1; $slow_spawining_warning_printed = 1;
} }
} }
if($system_limit < $wanted_processes and not $more_filehandles) {
print $Global::original_stderr
("parallel: Warning: Only enough filehandles to run ",
$system_limit, " jobs in parallel. ",
"Raising ulimit -n may help.\n");
}
if($system_limit < $wanted_processes and $max_system_proc_reached) {
print $Global::original_stderr
("parallel: Warning: Only enough available processes to run ",
$system_limit, " jobs in parallel.\n");
}
if($Global::JobQueue->empty()) {
$system_limit ||= 1;
}
# Cleanup: Close the files # Cleanup: Close the files
for (values %fh) { close $_ } for (values %fh) { close $_ }
# Cleanup: Kill the children # Cleanup: Kill the children
@ -2559,10 +2552,31 @@ sub processes_available_by_system_limit {
waitpid($pid,0); waitpid($pid,0);
delete $Global::unkilled_children{$pid}; delete $Global::unkilled_children{$pid};
} }
#wait();
# Cleanup: Unget the command_lines or the @args # Cleanup: Unget the command_lines or the @args
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args); $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args);
$Global::JobQueue->unget(@jobs); $Global::JobQueue->unget(@jobs);
if($system_limit < $wanted_processes) {
# The system_limit is less than the wanted_processes
if($system_limit < 1 and not $Global::JobQueue->empty()) {
print $Global::original_stderr "$Global::progname: Cannot spawn any jobs. ",
"Raising ulimit -u may help.\n";
::wait_and_exit(255);
}
if(not $more_filehandles) {
print $Global::original_stderr
("parallel: Warning: Only enough filehandles to run ",
$system_limit, " jobs in parallel. ",
"Raising ulimit -n may help.\n");
}
if($max_system_proc_reached) {
print $Global::original_stderr
("parallel: Warning: Only enough available processes to run ",
$system_limit, " jobs in parallel. Raising ulimit -u may help.\n");
}
}
if($Global::JobQueue->empty()) {
$system_limit ||= 1;
}
if($self->string() ne ":" and if($self->string() ne ":" and
$system_limit > $Global::default_simultaneous_sshlogins) { $system_limit > $Global::default_simultaneous_sshlogins) {
$system_limit = $system_limit =
@ -2659,7 +2673,7 @@ sub user_requested_processes {
$processes = $self->user_requested_processes($opt_P_file); $processes = $self->user_requested_processes($opt_P_file);
} else { } else {
print $Global::original_stderr "Cannot open $opt_P\n"; print $Global::original_stderr "Cannot open $opt_P\n";
exit(255); ::wait_and_exit(255);
} }
} else { } else {
print $Global::original_stderr "Parsing of --jobs/-j/--max-procs/-P failed\n"; print $Global::original_stderr "Parsing of --jobs/-j/--max-procs/-P failed\n";
@ -3663,24 +3677,30 @@ sub start {
if($::opt_dryrun) { if($::opt_dryrun) {
$command = "true"; $command = "true";
} }
$Global::total_running++;
$Global::total_started++;
$ENV{'PARALLEL_SEQ'} = $job->seq(); $ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$; $ENV{'PARALLEL_PID'} = $$;
::debug("$Global::total_running processes. Starting (" ::debug("$Global::total_running processes. Starting ("
. $job->seq() . "): $command\n"); . $job->seq() . "): $command\n");
if($::opt_pipe) { if($::opt_pipe) {
my ($in); my ($in);
# The eval is needed to catch exception from open3
eval {
$pid = ::open3($in, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || $pid = ::open3($in, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) ||
::die_bug("open3-pipe"); ::die_bug("open3-pipe");
1;
};
$job->set_stdin($in); $job->set_stdin($in);
} elsif(@::opt_a and not $Global::stdin_in_opt_a and $job->seq() == 1 } elsif(@::opt_a and not $Global::stdin_in_opt_a and $job->seq() == 1
and $job->sshlogin()->string() eq ":") { and $job->sshlogin()->string() eq ":") {
# Give STDIN to the first job if using -a (but only if running # Give STDIN to the first job if using -a (but only if running
# locally - otherwise CTRL-C does not work for other jobs Bug#36585) # locally - otherwise CTRL-C does not work for other jobs Bug#36585)
*IN = *STDIN; *IN = *STDIN;
# The eval is needed to catch exception from open3
eval {
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || $pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) ||
::die_bug("open3-a"); ::die_bug("open3-a");
1;
};
# Re-open to avoid complaining # Re-open to avoid complaining
open STDIN, "<&", $Global::original_stdin open STDIN, "<&", $Global::original_stdin
or ::die_bug("dup-\$Global::original_stdin: $!"); or ::die_bug("dup-\$Global::original_stdin: $!");
@ -3688,14 +3708,25 @@ sub start {
open(DEVTTY, "/dev/tty")) { open(DEVTTY, "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it # Give /dev/tty to the command if no one else is using it
*IN = *DEVTTY; *IN = *DEVTTY;
# The eval is needed to catch exception from open3
eval {
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || $pid = ::open3("<&IN", ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) ||
::die_bug("open3-/dev/tty"); ::die_bug("open3-/dev/tty");
$Global::tty_taken = $pid; $Global::tty_taken = $pid;
close DEVTTY; close DEVTTY;
1;
};
} else { } else {
eval {
$pid = ::open3(::gensym, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) || $pid = ::open3(::gensym, ">&OUT", ">&ERR", $ENV{SHELL}, "-c", $command) ||
::die_bug("open3-gensym"); ::die_bug("open3-gensym");
1;
};
} }
if($pid) {
# A job was started
$Global::total_running++;
$Global::total_started++;
$job->set_pid($pid); $job->set_pid($pid);
$job->set_starttime(); $job->set_starttime();
if($::opt_timeout) { if($::opt_timeout) {
@ -3704,6 +3735,11 @@ sub start {
$Global::timeoutq->insert($job); $Global::timeoutq->insert($job);
} }
return $job; return $job;
} else {
# No more processes
::debug("Cannot spawn more jobs.\n");
return undef;
}
} }
sub is_already_in_joblog { sub is_already_in_joblog {
@ -5251,7 +5287,7 @@ sub mkdir_or_die {
} }
if(not -w $dir) { if(not -w $dir) {
print STDERR "$Global::progname: Cannot write to $dir: $!\n"; print STDERR "$Global::progname: Cannot write to $dir: $!\n";
exit(255); ::wait_and_exit(255);
} }
} }

View file

@ -97,13 +97,13 @@ nice nice perl -e '@x=1 .. 17000; for(1..100) { print "@x\n"}' | pv -qL 1000000
nice nice perl -e '@x=1 .. 17000; for(1..100) { print "@x\n"}' | pv -qL 1000000 | nice nice perl -e '@x=1 .. 17000; for(1..100) { print "@x\n"}' | pv -qL 1000000 |
$PAR --recend '' --files --halt-on-error 2 cat | parallel -Xj1 cat {} ';' rm {} | md5sum $PAR --recend '' --files --halt-on-error 2 cat | parallel -Xj1 cat {} ';' rm {} | md5sum
echo '### Test of -j filename'; echo '### Test of -j filename - non-existent file';
nice stdout parallel -j no_such_file echo ::: 1 nice stdout parallel -j no_such_file echo ::: 1
echo '### Test of -j filename'; echo '### Test of -j filename';
echo 3 >/tmp/jobs_to_run1; echo 3 >/tmp/jobs_to_run1;
parallel -j /tmp/jobs_to_run1 -v sleep 0.{} ::: 9 8 7 6 4; parallel -j /tmp/jobs_to_run1 -v sleep 0.{} ::: 9 7 5 3 1;
# Should give 7 8 9 4 6 # Should give 0.5 0.7 0.9 0.1 0.3
echo '### Test ::::' echo '### Test ::::'
echo '### Change --arg-file-sep' echo '### Change --arg-file-sep'

View file

@ -1,6 +1,5 @@
#!/bin/bash #!/bin/bash
PAR=parallel
SERVER1=parallel-server1 SERVER1=parallel-server1
SERVER2=parallel-server2 SERVER2=parallel-server2
SSHLOGIN1=parallel@$SERVER1 SSHLOGIN1=parallel@$SERVER1
@ -35,7 +34,7 @@ echo "### Test combined -X --return {/}_{/.}_{#/.}_{#/}_{#.} with files containi
stdout parallel -j1 -k -Xv --cleanup --return tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file -S $SSHLOGIN2 \ stdout parallel -j1 -k -Xv --cleanup --return tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file -S $SSHLOGIN2 \
mkdir -p tmp/{/}_{/.}_{2/.}_{2/}_{2.} \;touch tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file \ mkdir -p tmp/{/}_{/.}_{2/.}_{2/}_{2.} \;touch tmp/{/}_{/.}_{2/.}_{2/}_{2.}/file \
::: /a/number1.c a/number2.c number3.c /a/number4 a/number5 number6 'number 7' 'number <8|8>' ::: /a/number1.c a/number2.c number3.c /a/number4 a/number5 number6 'number 7' 'number <8|8>'
find tmp find tmp | sort
rm -rf tmp rm -rf tmp
echo "### Here we ought to test -m --return {/}_{/.}_{#/.}_{#/}_{#.} with files containing space" echo "### Here we ought to test -m --return {/}_{/.}_{#/.}_{#/}_{#.} with files containing space"

View file

@ -17,7 +17,7 @@ stdout parallel --progress "sleep 1; echo {}" < /dev/null
echo '### bug #34422: parallel -X --eta crashes with div by zero' echo '### bug #34422: parallel -X --eta crashes with div by zero'
seq 2 | stdout parallel -X --eta echo seq 2 | stdout parallel -X --eta echo
echo '### --timeout on remote machines' echo '### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed'
parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $? parallel -j0 --timeout 6 --onall -S localhost,parallel@parallel-server1 'sleep {}; echo slept {}' ::: 1 8 9 ; echo jobs failed: $?
echo '### --pipe without command' echo '### --pipe without command'

View file

@ -57,7 +57,7 @@ b
350eda13a37912d755c9d733d149bdaf - 350eda13a37912d755c9d733d149bdaf -
350eda13a37912d755c9d733d149bdaf - 350eda13a37912d755c9d733d149bdaf -
350eda13a37912d755c9d733d149bdaf - 350eda13a37912d755c9d733d149bdaf -
### Test of -j filename ### Test of -j filename - non-existent file
Usage: Usage:
parallel [options] [command [arguments]] < list_of_arguments parallel [options] [command [arguments]] < list_of_arguments
parallel [options] [command [arguments]] (::: arguments|:::: argfile(s))... parallel [options] [command [arguments]] (::: arguments|:::: argfile(s))...
@ -88,11 +88,11 @@ O. Tange (2011): GNU Parallel - The Command-Line Power Tool,
;login: The USENIX Magazine, February 2011:42-47. ;login: The USENIX Magazine, February 2011:42-47.
Parsing of --jobs/-j/--max-procs/-P failed Parsing of --jobs/-j/--max-procs/-P failed
### Test of -j filename ### Test of -j filename
sleep 0.5
sleep 0.7 sleep 0.7
sleep 0.8
sleep 0.9 sleep 0.9
sleep 0.4 sleep 0.1
sleep 0.6 sleep 0.3
### Test :::: ### Test ::::
### Change --arg-file-sep ### Change --arg-file-sep
1 5 1 5

View file

@ -48,29 +48,29 @@ OK
parallel: Warning: using -X or -m with --sshlogin may fail parallel: Warning: using -X or -m with --sshlogin may fail
mkdir -p tmp/number1.c_number1_number2_number2.c_a/number2 tmp/number2.c_number2_number2_number2.c_a/number2 tmp/number3.c_number3_number2_number2.c_a/number2 tmp/number4_number4_number2_number2.c_a/number2 tmp/number5_number5_number2_number2.c_a/number2 tmp/number6_number6_number2_number2.c_a/number2 tmp/number\ 7_number\ 7_number2_number2.c_a/number2 tmp/number\ \<8\|8\>_number\ \<8\|8\>_number2_number2.c_a/number2 ;touch tmp/number1.c_number1_number2_number2.c_a/number2/file tmp/number2.c_number2_number2_number2.c_a/number2/file tmp/number3.c_number3_number2_number2.c_a/number2/file tmp/number4_number4_number2_number2.c_a/number2/file tmp/number5_number5_number2_number2.c_a/number2/file tmp/number6_number6_number2_number2.c_a/number2/file tmp/number\ 7_number\ 7_number2_number2.c_a/number2/file tmp/number\ \<8\|8\>_number\ \<8\|8\>_number2_number2.c_a/number2/file mkdir -p tmp/number1.c_number1_number2_number2.c_a/number2 tmp/number2.c_number2_number2_number2.c_a/number2 tmp/number3.c_number3_number2_number2.c_a/number2 tmp/number4_number4_number2_number2.c_a/number2 tmp/number5_number5_number2_number2.c_a/number2 tmp/number6_number6_number2_number2.c_a/number2 tmp/number\ 7_number\ 7_number2_number2.c_a/number2 tmp/number\ \<8\|8\>_number\ \<8\|8\>_number2_number2.c_a/number2 ;touch tmp/number1.c_number1_number2_number2.c_a/number2/file tmp/number2.c_number2_number2_number2.c_a/number2/file tmp/number3.c_number3_number2_number2.c_a/number2/file tmp/number4_number4_number2_number2.c_a/number2/file tmp/number5_number5_number2_number2.c_a/number2/file tmp/number6_number6_number2_number2.c_a/number2/file tmp/number\ 7_number\ 7_number2_number2.c_a/number2/file tmp/number\ \<8\|8\>_number\ \<8\|8\>_number2_number2.c_a/number2/file
tmp tmp
tmp/number6_number6_number2_number2.c_a
tmp/number6_number6_number2_number2.c_a/number2
tmp/number6_number6_number2_number2.c_a/number2/file
tmp/number5_number5_number2_number2.c_a
tmp/number5_number5_number2_number2.c_a/number2
tmp/number5_number5_number2_number2.c_a/number2/file
tmp/number4_number4_number2_number2.c_a
tmp/number4_number4_number2_number2.c_a/number2
tmp/number4_number4_number2_number2.c_a/number2/file
tmp/number3.c_number3_number2_number2.c_a
tmp/number3.c_number3_number2_number2.c_a/number2
tmp/number3.c_number3_number2_number2.c_a/number2/file
tmp/number2.c_number2_number2_number2.c_a
tmp/number2.c_number2_number2_number2.c_a/number2
tmp/number2.c_number2_number2_number2.c_a/number2/file
tmp/number1.c_number1_number2_number2.c_a
tmp/number1.c_number1_number2_number2.c_a/number2
tmp/number1.c_number1_number2_number2.c_a/number2/file
tmp/number <8|8>_number <8|8>_number2_number2.c_a
tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2
tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2/file
tmp/number 7_number 7_number2_number2.c_a tmp/number 7_number 7_number2_number2.c_a
tmp/number 7_number 7_number2_number2.c_a/number2 tmp/number 7_number 7_number2_number2.c_a/number2
tmp/number 7_number 7_number2_number2.c_a/number2/file tmp/number 7_number 7_number2_number2.c_a/number2/file
tmp/number <8|8>_number <8|8>_number2_number2.c_a
tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2
tmp/number <8|8>_number <8|8>_number2_number2.c_a/number2/file
tmp/number1.c_number1_number2_number2.c_a
tmp/number1.c_number1_number2_number2.c_a/number2
tmp/number1.c_number1_number2_number2.c_a/number2/file
tmp/number2.c_number2_number2_number2.c_a
tmp/number2.c_number2_number2_number2.c_a/number2
tmp/number2.c_number2_number2_number2.c_a/number2/file
tmp/number3.c_number3_number2_number2.c_a
tmp/number3.c_number3_number2_number2.c_a/number2
tmp/number3.c_number3_number2_number2.c_a/number2/file
tmp/number4_number4_number2_number2.c_a
tmp/number4_number4_number2_number2.c_a/number2
tmp/number4_number4_number2_number2.c_a/number2/file
tmp/number5_number5_number2_number2.c_a
tmp/number5_number5_number2_number2.c_a/number2
tmp/number5_number5_number2_number2.c_a/number2/file
tmp/number6_number6_number2_number2.c_a
tmp/number6_number6_number2_number2.c_a/number2
tmp/number6_number6_number2_number2.c_a/number2/file
### Here we ought to test -m --return {/}_{/.}_{#/.}_{#/}_{#.} with files containing space ### Here we ought to test -m --return {/}_{/.}_{#/.}_{#/}_{#.} with files containing space
### But we will wait for a real world scenario ### But we will wait for a real world scenario

View file

@ -20,7 +20,8 @@ Computers / CPU cores / Max jobs to run
Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete
local:1/0/100%/0.0s local:1/0/100%/0.0s
### --timeout on remote machines ### --timeout --onall on remote machines: 2*slept 1, 2 jobs failed
slept 1
slept 1 slept 1
jobs failed: 2 jobs failed: 2
### --pipe without command ### --pipe without command