Prepared for limiting to max proc (not only files)

This still does not work if -j 0 and #files_available > 2*#procs_available
So it fails one unittest
This commit is contained in:
Ole Tange 2009-02-18 02:57:38 +01:00
parent d5e29c73a2
commit 8202ef3a11
8 changed files with 209 additions and 29 deletions

151
parallel
View file

@ -323,7 +323,7 @@ use File::Temp qw/ tempfile tempdir /;
use Getopt::Std; use Getopt::Std;
my ($processes,$command); my ($processes,$command);
getopts("0cdefgj:qsuv") || die_usage(); getopts("0cdefgj:qsuvx") || die_usage();
# Defaults: # Defaults:
$Global::debug = 0; $Global::debug = 0;
@ -337,7 +337,10 @@ $/="\n";
$Global::debug = (defined $::opt_d); $Global::debug = (defined $::opt_d);
if(defined $::opt_j) { $processes = compute_number_of_processes($::opt_j); } if(defined $::opt_j) { $processes = compute_number_of_processes($::opt_j); }
if(defined $::opt_x) { $Global::xargs = 1; } if(defined $::opt_x) {
$Global::xargs = 1;
$Global::command_line_max_len = max_length_of_command_line();
}
if(defined $::opt_v) { $Global::verbose = 1; } if(defined $::opt_v) { $Global::verbose = 1; }
if(defined $::opt_s) { $Global::verbose = 0; } if(defined $::opt_s) { $Global::verbose = 0; }
if(defined $::opt_g) { $Global::grouped = 1; } if(defined $::opt_g) { $Global::grouped = 1; }
@ -356,11 +359,13 @@ if(@ARGV) {
init_run_jobs(); init_run_jobs();
DoNotReap(); DoNotReap();
while($Global::running_jobs < $processes while($Global::running_jobs < $processes
and and
start_another_job()) { start_another_job()) {
# skip # skip
} }
ReapIfNeeded(); ReapIfNeeded();
drain_job_queue(); drain_job_queue();
@ -370,28 +375,39 @@ drain_job_queue();
sub generate_command_line { sub generate_command_line {
my $command = shift; my $command = shift;
my @args = @_; my ($job_line,$last_good);
chomp(@args); my ($next_arg,@quoted_args,$arg_length);
if($Global::input_is_filename) { while ($next_arg = get_next_arg()) {
for my $arg (@args) { push (@quoted_args, $next_arg);
($arg) = (shell_quote($arg)); if(not $Global::xargs) {
last;
} else {
# Emulate xargs if there is a command and -x is set
$arg_length += length $next_arg + 1;
debug("arglen $arg_length\n");
$job_line_length = length($command) + 1 + $arg_length;
debug("linelen $job_line_length\n");
if($job_line_length >= $Global::command_line_max_len) {
unget_arg(pop @quoted_args);
if($quoted_args[0]) {
last;
} else {
die ("Command line too long at $next_arg");
}
}
} }
} }
my $line = join(" ",@args); if(@quoted_args) {
my ($job_line,$arg); my $arg=join(" ",@quoted_args);
if($command) {
$job_line = $command; $job_line = $command;
$arg = $line; if(defined $job_line and $job_line =~ s/{}/$arg/g) {
if($job_line =~ s/{}/$arg/g) {
# substituted {} with args # substituted {} with args
} else { } else {
# append args # append args
$job_line .= " $arg"; $job_line .= " $arg";
} }
} else { debug("Return jobline: $job_line\n");
$job_line = $line;
} }
return $job_line; return $job_line;
} }
@ -417,7 +433,38 @@ sub shell_quote {
# #
sub max_length_of_command_line { sub max_length_of_command_line {
# Find the max_length of a command line
# First find an upper bound
my $len = 2;
do {
$len += $len+1;
} while (acceptable_command_line_length($len));
# Then search for the actual max length between 0 and upper bound
return binary_find_max_length(0,$len);
}
sub binary_find_max_length {
# Given a lower and upper bound find the max_length of a command line
my ($lower, $upper) = (@_);
if($lower == $upper or $lower == $upper-1) { return $lower; }
my $middle = int (($upper-$lower)/2 + $lower);
$debug && print "$lower,$upper,$middle\n";
if (acceptable_command_line_length($middle)) {
return binary_find_max_length($middle,$upper);
} else {
return binary_find_max_length($lower,$middle);
}
}
sub acceptable_command_line_length {
# Test if this length can run
# This is done using external perl script to avoid warning
# (Can this be done prettier?)
my $len = shift;
my $testscript = q{'system ("true "."x"x$ARGV[0]); exit $?;'};
$debug && print "perl -e $testscript $len\n";
system "perl -e $testscript $len";
return not $?;
} }
sub compute_number_of_processes { sub compute_number_of_processes {
@ -448,6 +495,9 @@ sub compute_number_of_processes {
$processes = 1; $processes = 1;
} }
} }
# Have we asked for more processes than arguments?
$processes = min_of_args_and_processes($processes);
# Every simultaneous process uses 2 filehandles when grouping # Every simultaneous process uses 2 filehandles when grouping
# perl uses 7 for something? # perl uses 7 for something?
# parallel uses 1 for memory_usage # parallel uses 1 for memory_usage
@ -464,6 +514,46 @@ sub compute_number_of_processes {
return int $processes; return int $processes;
} }
sub min_of_args_and_processes {
my $processes = shift;
my $min_of_args_and_processes=0;
my @args=();
my $next_arg;
my $max_system_proc_reached=0;
my $time = time;
do {
$min_of_args_and_processes++;
$next_arg = get_next_arg();
if(defined $next_arg) {
push(@args, $next_arg);
}
$min_of_args_and_processes % 10 or $time=time;
# if($child = fork()) {
# push (@children,$child);
# } elsif(defined $child) {
# sleep 1000000;
# } else {
# $max_system_proc_reached = 1;
# }
# debug("Time to fork ten procs ", time-$time, " process ", $min_of_args_and_processes);
# if(time-$time > 1) {
# # It took more than 1 second to fork ten processes. We should stop forking.
# # Let us give the system a little slack
# $min_of_args_and_processes = int ($min_of_args_and_processes * 0.9)+1;
# $max_system_proc_reached = 1;
# }
} while($min_of_args_and_processes <= $processes
and defined $next_arg
and not $max_system_proc_reached);
# kill 9, @children;
unget_arg(@args);
return $min_of_args_and_processes;
}
sub NullReaper {
while (waitpid(-1, &WNOHANG) > 0) { }
}
sub compute_no_of_free_filehandles { sub compute_no_of_free_filehandles {
my $needed = shift; my $needed = shift;
my $i=1; my $i=1;
@ -530,13 +620,31 @@ sub init_run_jobs {
sub next_command_line { sub next_command_line {
my $cmd_line; my $cmd_line;
do { do {
$cmd_line = generate_command_line($Global::command);
} while (defined $cmd_line and $cmd_line =~ /^\s*$/); # Skip empty lines
return $cmd_line;
}
sub get_next_arg {
my $arg;
if(@Global::unget_arg) {
$arg = shift @Global::unget_arg;
} else {
if(eof STDIN) { if(eof STDIN) {
return undef; return undef;
} }
my $args = <STDIN>; $arg = <STDIN>;
$cmd_line = generate_command_line($Global::command, $args); chomp $arg;
} while ($cmd_line =~ /^\s*$/); # Skip empty lines if($Global::input_is_filename) {
return $cmd_line; ($arg) = shell_quote($arg);
}
}
debug("Next arg: ".$arg."\n");
return $arg;
}
sub unget_arg {
push @Global::unget_arg, @_;
} }
sub drain_job_queue { sub drain_job_queue {
@ -605,6 +713,9 @@ sub print_job {
# Only relevant for grouping # Only relevant for grouping
$Global::grouped or return; $Global::grouped or return;
my $fhs = shift; my $fhs = shift;
if(not defined $fhs) {
return;
}
my $out = $fhs->{out}; my $out = $fhs->{out};
my $err = $fhs->{err}; my $err = $fhs->{err};
my $command = $fhs->{command}; my $command = $fhs->{command};

View file

@ -373,15 +373,6 @@ parallel "wc {} \fR> \fB{}.wc"\fR using \fBxargs\fR seems to be impossible.
Filenames beginning with '\-' can cause some commands to give Filenames beginning with '\-' can cause some commands to give
unexpected results, as it will often be interpreted as an option. unexpected results, as it will often be interpreted as an option.
.PP .PP
This takes up all memory:
.PP
.Vb 1
\& seq 1 1000000000 | parallel very_loong_command
.Ve
.PP
Should be fixed by only generating a new command when needed
(i.e. when a command has finished).
.PP
If you have a lot of filehandles, then computing the max no If you have a lot of filehandles, then computing the max no
takes a long time. takes a long time.
.SH "REPORTING BUGS" .SH "REPORTING BUGS"
@ -390,6 +381,7 @@ Report bugs to <bug\-parallel@tange.dk>.
.SH "AUTHOR" .SH "AUTHOR"
.IX Header "AUTHOR" .IX Header "AUTHOR"
Copyright (C) 2007\-10\-18 Ole Tange, http://ole.tange.dk Copyright (C) 2007\-10\-18 Ole Tange, http://ole.tange.dk
Copyright (C) 2008\-2009 Ole Tange, http://ole.tange.dk
.SH "LICENSE" .SH "LICENSE"
.IX Header "LICENSE" .IX Header "LICENSE"
Copyright (C) 2007 Free Software Foundation, Inc. Copyright (C) 2007 Free Software Foundation, Inc.

View file

@ -1,2 +1,32 @@
b b
d d
1
10
2
3
4
5
6
7
8
9
1
10
2
3
4
5
6
7
8
9
1
10
2
3
4
5
6
7
8
9

View file

@ -0,0 +1,2 @@
Force outside the file handle limit
Start

View file

@ -3,3 +3,8 @@
cd input-files/test08 cd input-files/test08
ls | parallel -q perl -ne '/_PRE (\d+)/ and $p=$1; /hatchname> (\d+)/ and $1!=$p and print $ARGV,"\n"' ls | parallel -q perl -ne '/_PRE (\d+)/ and $p=$1; /hatchname> (\d+)/ and $1!=$p and print $ARGV,"\n"'
seq 1 10 | parallel -j 1 echo | sort
seq 1 10 | parallel -j 2 echo | sort
seq 1 10 | parallel -j 3 echo | sort

View file

@ -0,0 +1,8 @@
#!/bin/bash
echo Force outside the file handle limit
# 2009-02-17 Gave fork error
(echo echo Start;
seq 1 100000 | perl -pe 's/^/true /';
echo echo end) | parallel -uj 0

View file

@ -1,2 +1,32 @@
b b
d d
1
10
2
3
4
5
6
7
8
9
1
10
2
3
4
5
6
7
8
9
1
10
2
3
4
5
6
7
8
9

View file

@ -0,0 +1,2 @@
Force outside the file handle limit
Start