parallel: implemented -j file. Passes testsuite.

This commit is contained in:
Ole Tange 2010-10-06 22:18:32 +02:00
parent 44bdda5387
commit cce81b5735
4 changed files with 151 additions and 39 deletions

View file

@ -1,13 +1,7 @@
== Change number of jobs while running ==
Read -j from file. If the file is changed when a job completes the
file is re-read and the new number of jobs computed. If the number is
lower than before currently running jobs will be allowed to finish but
new jobs will not be started.
== Bug == == Bug ==
(echo ; echo abc ; echo abc; echo ; echo bbc) | parallel --colsep b -v echo {1}{2} (echo ; echo abc ; echo abc; echo ; echo bbc) | parallel --colsep b -v echo {1}{2}
(echo ; echo abc ; echo abc; echo ; echo bbc) | parallel --colsep d -v echo {1}{2}
== SQL == == SQL ==

View file

@ -413,6 +413,24 @@ If the evaluated number is less than 1 then 1 will be used. See also
B<--use-cpus-instead-of-cores>. B<--use-cpus-instead-of-cores>.
=item B<--jobs> I<procfile> (beta test)
=item B<-j> I<procfile> (beta test)
=item B<--max-procs> I<procfile> (beta test)
=item B<-P> I<procfile> (beta test)
Read parameter from file. Use the content of I<procfile> as parameter
for I<-j>. E.g. I<procfile> could contain the string 100% or +2 or
10. If I<procfile> is changed when a job completes, I<procfile> is
read again and the new number of jobs is computed. If the number is
lower than before, running jobs will be allowed to finish but new jobs
will not be started until the wanted number of jobs has been reached.
This makes it possible to change the number of simultaneous running
jobs while GNU B<parallel> is running.
=item B<--keeporder> =item B<--keeporder>
=item B<-k> =item B<-k>
@ -1590,21 +1608,6 @@ functionality without sacrifying ease of use.
The following features are in some of the comparable tools: The following features are in some of the comparable tools:
Execution
E1. Running jobs in parallel
E2. List running jobs
E3. Finish running jobs, but do not start new jobs
E4. Number of running jobs can depend on number of cpus
E5. Finish running jobs, but do not start new jobs after first failure
Outputs
O1. Grouping output so output from different jobs do not mix
O2. Send stderr to stderr
O3. Send stdout to stdout
O4. Order of output can be same as order of input
O5. Stdout only contains stdout from the command
O6. Stderr only contains stdout from the command
Inputs Inputs
I1. Arguments can be read from stdin I1. Arguments can be read from stdin
I2. Arguments can be read from a file I2. Arguments can be read from a file
@ -1622,6 +1625,22 @@ Manipulation of input
M5. Arguments can be replaced with context M5. Arguments can be replaced with context
M6. Input can be treated as complete execution line M6. Input can be treated as complete execution line
Outputs
O1. Grouping output so output from different jobs do not mix
O2. Send stderr to stderr
O3. Send stdout to stdout
O4. Order of output can be same as order of input
O5. Stdout only contains stdout from the command
O6. Stderr only contains stdout from the command
Execution
E1. Running jobs in parallel
E2. List running jobs
E3. Finish running jobs, but do not start new jobs
E4. Number of running jobs can depend on number of cpus
E5. Finish running jobs, but do not start new jobs after first failure
E6. Number of running jobs can be adjusted while running
Remote execution Remote execution
R1. Jobs can be run on remote computers R1. Jobs can be run on remote computers
R2. Basefiles can be transferred R2. Basefiles can be transferred
@ -1646,23 +1665,53 @@ As every new version of the programs are not tested the table may be
outdated. Please file a bug-report if you find errors (See REPORTING outdated. Please file a bug-report if you find errors (See REPORTING
BUGS). BUGS).
parallel: E1 E2 E3 E4 E5 O1 O2 O3 O4 O5 O6 I1 I2 I3 I4 I5 I6 I7 M1 M2 parallel:
M3 M4 M5 M6 R1 R2 R3 R4 R5 R6 R7 R8 R9 S1 S2 I1 I2 I3 I4 I5 I6 I7
M1 M2 M3 M4 M5 M6
O1 O2 O3 O4 O5 O6
E1 E2 E3 E4 E5 E6
R1 R2 R3 R4 R5 R6 R7 R8 R9
S1 S2
xargs: E1 - - - - - O2 O3 - O5 O6 I1 I2 - - - - - - M2 M3 - xargs:
- - - - - - - x - - - - I1 I2 - - - - -
- M2 M3 - - -
- O2 O3 - O5 O6
E1 - - - - -
- - - - - x - - -
- -
find -exec: - - - x - x O2 O3 O4 O5 O6 - - - - - - - - M2 M3 - find -exec:
- - - - - - - - - - x x - - - x - x -
- M2 M3 - - - -
- O2 O3 O4 O5 O6
- - - - - - -
- - - - - - - - -
x x
make -j: E1 - - - E5 O1 O2 O3 - x O6 - - - - - - - - - - - - - - - - - - make -j:
- - - - - - - - - - -
- - - - - -
O1 O2 O3 - x O6
E1 - - - E5 -
- - - - - - - - -
- -
ppss: E1 E2 ?E3 E4 - O1 - - x - - I1 I2 - - - - I7 M1 - M3 - - M6 R1 R2 ppss:
R3 R4 - - ?R7 ? ? - - I1 I2 - - - - I7
M1 - M3 - - M6
O1 - - x - -
E1 E2 ?E3 E4 - -
R1 R2 R3 R4 - - ?R7 ? ?
- -
pexec: E1 - - E4 - O1 O2 O3 - O5 O6 I1 I2 - I4 I5 - - M1 - M3 - - M6 R1 pexec:
- - - - R6 - - - S1 - I1 I2 - I4 I5 - -
M1 - M3 - - M6
O1 O2 O3 - O5 O6
E1 - - E4 - E6
R1 - - - - R6 - - -
S1 -
xjobs: TODO - Please file a bug-report if you know what features xjobs xjobs: TODO - Please file a bug-report if you know what features xjobs
supports (See REPORTING BUGS). supports (See REPORTING BUGS).
@ -2590,10 +2639,7 @@ sub parse_options {
# Needs to be done after setting $Global::command and $Global::command_line_max_len # Needs to be done after setting $Global::command and $Global::command_line_max_len
# as '-m' influences the number of commands that needs to be run # as '-m' influences the number of commands that needs to be run
if(defined $::opt_P) { if(defined $::opt_P) {
for my $sshlogin (keys %Global::host) { compute_number_of_processes_for_sshlogins();
$Global::host{$sshlogin}{'max_no_of_running'} =
compute_number_of_processes($::opt_P,$sshlogin);
}
} else { } else {
for my $sshlogin (keys %Global::host) { for my $sshlogin (keys %Global::host) {
$Global::host{$sshlogin}{'max_no_of_running'} = $Global::host{$sshlogin}{'max_no_of_running'} =
@ -3102,6 +3148,13 @@ sub is_acceptable_command_line_length {
# Number of parallel processes to run # Number of parallel processes to run
sub compute_number_of_processes_for_sshlogins {
for my $sshlogin (keys %Global::host) {
$Global::host{$sshlogin}{'max_no_of_running'} =
compute_number_of_processes($::opt_P,$sshlogin);
}
}
sub compute_number_of_processes { sub compute_number_of_processes {
# Number of processes wanted and limited by system ressources # Number of processes wanted and limited by system ressources
# Returns: # Returns:
@ -3203,7 +3256,7 @@ sub processes_available_by_system_limit {
for (values %fh) { close $_ } for (values %fh) { close $_ }
# Cleanup: Kill the children # Cleanup: Kill the children
for my $pid (@children) { for my $pid (@children) {
kill 15, $pid; kill 9, $pid;
waitpid($pid,0); waitpid($pid,0);
} }
wait(); wait();
@ -3311,7 +3364,19 @@ sub user_requested_processes {
# -P 0 = infinity (or at least close) # -P 0 = infinity (or at least close)
$processes = 2**31; $processes = 2**31;
} }
} elsif (-f $opt_P) {
$Global::max_procs_file = $opt_P;
$Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9];
if(open(IN, $Global::max_procs_file)) {
my $opt_P_file = join("",<IN>);
close IN;
$processes = user_requested_processes($opt_P_file);
} else { } else {
print STDERR "Cannot open $opt_P\n";
exit(255);
}
} else {
print STDERR "Parsing of --jobs/-j/--max-procs/-P failed\n";
die_usage(); die_usage();
} }
if($processes < 1) { if($processes < 1) {
@ -3809,6 +3874,13 @@ sub start_more_jobs {
# number of jobs started # number of jobs started
my $jobs_started = 0; my $jobs_started = 0;
if(not $Global::start_no_new_jobs) { if(not $Global::start_no_new_jobs) {
if($Global::max_procs_file) {
my $mtime = (stat($Global::max_procs_file))[9];
if($mtime > $Global::max_procs_file_last_mod) {
$Global::max_procs_file_last_mod = $mtime;
compute_number_of_processes_for_sshlogins();
}
}
for my $sshlogin (keys %Global::host) { for my $sshlogin (keys %Global::host) {
debug("Running jobs on $sshlogin: $Global::host{$sshlogin}{'no_of_running'}\n"); debug("Running jobs on $sshlogin: $Global::host{$sshlogin}{'no_of_running'}\n");
while ($Global::host{$sshlogin}{'no_of_running'} < while ($Global::host{$sshlogin}{'no_of_running'} <
@ -3998,6 +4070,7 @@ sub print_job {
close $out; close $out;
close $err; close $err;
} }
sub __READING_AND_QUOTING_ARGUMENTS__ {} sub __READING_AND_QUOTING_ARGUMENTS__ {}
sub get_command_line_with_sshlogin { sub get_command_line_with_sshlogin {
@ -4218,6 +4291,7 @@ sub unget_arg {
} }
sub __REMOTE_SSH__ {} sub __REMOTE_SSH__ {}
sub read_sshloginfile { sub read_sshloginfile {
# Returns: N/A # Returns: N/A
my $file = shift; my $file = shift;

View file

@ -0,0 +1,13 @@
#!/bin/bash
echo '### Test of -j filename'
stdout parallel -j no_such_file echo ::: 1
echo '### Test of -j filename'
echo 3 >/tmp/jobs_to_run
parallel -j /tmp/jobs_to_run -v sleep 0.{} ::: 9 8 7 6 5
# Should give 7 8 9 5 6
echo '### Test of -j filename with file content changing'
(echo 1 >/tmp/jobs_to_run; sleep 3; echo 10 >/tmp/jobs_to_run) &
parallel -j /tmp/jobs_to_run -v sleep {} ::: 3.3 1.1 1.3 1.4 1.2 1 1 1 1 1 1 1 1 1 1 1

View file

@ -0,0 +1,31 @@
### Test of -j filename
Parsing of --jobs/-j/--max-procs/-P failed
Usage:
parallel [options] [command [arguments]] < list_of_arguments)
parallel [options] [command [arguments]] ::: arguments
parallel [options] [command [arguments]] :::: argfile(s)
See 'man parallel' for the options
### Test of -j filename
sleep 0.7
sleep 0.8
sleep 0.9
sleep 0.5
sleep 0.6
### Test of -j filename with file content changing
sleep 3.3
sleep 1
sleep 1
sleep 1
sleep 1
sleep 1
sleep 1
sleep 1.1
sleep 1.2
sleep 1.3
sleep 1.4
sleep 1
sleep 1
sleep 1
sleep 1
sleep 1