Fixed: bug #32834: If input is generated slowly - do not complain.

-i with no args defaults to -i {}.
Use ::die_bug instead of die.
This commit is contained in:
Ole Tange 2011-03-20 22:40:12 +01:00
parent b4a51e40ee
commit c5d11674b0
9 changed files with 113 additions and 76 deletions

View file

@ -1,3 +1,8 @@
Fixed: bug #32834: If input is generated slowly - do not complain
-i with no args defaults to -i {}
Use ::die_bug instead of die.
test if block size is too small to match a record sep in both -N mode and normal
Example:

View file

@ -160,8 +160,8 @@ New in this release:
By putting --tollef in the site wide config file you can deinstall
Tollef's parallel and install GNU Parallel instead without any
change for users or scripts. This is useful for packagers that
currently do not distribute GNU Parallel because the command name
conflicts with Tollef's parallel.
currently rename GNU Parallel or simply do not distribute GNU
Parallel because the command name conflicts with Tollef's parallel.
* -L 0 -n 0, and -N 0 implemented. They will read one argument,
but insert 0 arguments on the command line. Useful if you just want
@ -176,6 +176,10 @@ New in this release:
* Man page examples translated into Japanese. Thanks to Koshigoe.
http://w.koshigoe.jp/study/?%5Bsystem%5D+GNU+parallel+%BB%C8%CD%D1%CE%E3#l13
* GNU Parallel will be presented at:
- LUGA, Augsburg, 2011-03-26, http://www.luga.de/Aktionen/LIT-2011/Programm
- OSAA.dk, Aarhus, 2011-04-12
* Video of presentation from FSCONS 2010-11-07. The presenter was
_really_ hoarse that day (Something to do with loads of alcohol the
night before). http://vimeo.com/20838834

View file

@ -14,7 +14,7 @@ $::oodebug=0;
$SIG{TERM} ||= sub { exit 0; }; # $SIG{TERM} is not set on Mac OS X
%Global::original_sig = %SIG;
$SIG{TERM} = sub {}; # Dummy until jobs really start
open $Global::original_stderr, ">&STDERR" or die "Can't dup STDERR: $!";
open $Global::original_stderr, ">&STDERR" or ::die_bug("Can't dup STDERR: $!");
do_not_reap();
parse_options();
@ -264,7 +264,7 @@ sub acquire_semaphore {
} else {
# child
# Get a semaphore for this pid
die "Can't start a new session: $!" if setsid() == -1;
::die_bug("Can't start a new session: $!") if setsid() == -1;
$sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running());
$sem->acquire();
}
@ -423,7 +423,9 @@ sub parse_options {
if(defined $::opt_verbose) { $Global::stderr_verbose = 1; }
if(defined $::opt_I) { $Global::replace{'{}'} = $::opt_I; }
if(defined $::opt_U) { $Global::replace{'{.}'} = $::opt_U; }
if(defined $::opt_i) { $Global::replace{'{}'} = $::opt_i; }
if(defined $::opt_i) {
$Global::replace{'{}'} = $::opt_i eq "" ? "{}" : $::opt_i;
}
if(defined $::opt_basenamereplace) { $Global::replace{'{/}'} = $::opt_basenamereplace; }
if(defined $::opt_basenameextensionreplace) {
$Global::replace{'{/.}'} = $::opt_basenameextensionreplace;
@ -600,7 +602,7 @@ sub read_options {
}
for my $profile (@profiles) {
if(-r $profile) {
open (IN, "<", $profile) || die;
open (IN, "<", $profile) || ::die_bug("read-profile: $profile");
while(<IN>) {
/^\s*\#/ and next;
chomp;
@ -879,9 +881,12 @@ sub __RUNNING_AND_PRINTING_THE_JOBS__ {}
sub init_run_jobs {
# Remember the original STDOUT and STDERR
# Returns: N/A
open $Global::original_stdout, ">&STDOUT" or die "Can't dup STDOUT: $!";
open $Global::original_stderr, ">&STDERR" or die "Can't dup STDERR: $!";
open $Global::original_stdin, "<&STDIN" or die "Can't dup STDIN: $!";
open $Global::original_stdout, ">&STDOUT" or
::die_bug("Can't dup STDOUT: $!");
open $Global::original_stderr, ">&STDERR" or
::die_bug("Can't dup STDERR: $!");
open $Global::original_stdin, "<&STDIN" or
::die_bug("Can't dup STDIN: $!");
$Global::total_running = 0;
$Global::total_started = 0;
$Global::tty_taken = 0;
@ -1262,7 +1267,7 @@ sub read_sshloginfile {
if($file eq "..") {
$file = $ENV{'HOME'}."/.parallel/sshloginfile";
}
open(IN, $file) || die "Cannot open $file";
open(IN, $file) || ::die_bug("Cannot open $file");
while(<IN>) {
chomp;
/^\s*#/ and next;
@ -1738,7 +1743,7 @@ sub loadavg {
# Should we update the loadavg file?
my $update_loadavg_file = 0;
if(-r $self->{'loadavg_file'}) {
open(UPTIME,"<".$self->{'loadavg_file'}) || die;
open(UPTIME,"<".$self->{'loadavg_file'}) || ::die_bug("loadavg_file-r");
my $uptime_out = <UPTIME>;
close UPTIME;
# load average: 0.76, 1.53, 1.45
@ -1818,7 +1823,7 @@ sub compute_max_loadavg {
}
} elsif (-f $loadspec) {
# TODO this needs to be done for $loadspec
die;
::die_bug("loadspec-unimplemented");
$Global::max_procs_file = $loadspec;
$Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9];
if(open(IN, $Global::max_procs_file)) {
@ -1895,10 +1900,13 @@ sub processes_available_by_system_limit {
open($fh{"init-$i"},"</dev/null");
}
my $count_jobs_already_read = $Global::JobQueue->next_seq();
my $wait_time_for_getting_args = 0;
my $start_time = time;
while(1) {
$system_limit >= $wanted_processes and last;
not $more_filehandles and last;
$max_system_proc_reached and last;
my $before_getting_arg = time;
if($Global::semaphore) {
} elsif(defined $::opt_retries and $count_jobs_already_read) {
# For retries we may need to run all jobs on this sshlogin
@ -1920,6 +1928,7 @@ sub processes_available_by_system_limit {
push(@jobs, $job);
}
}
$wait_time_for_getting_args += time - $before_getting_arg;
$system_limit++;
# Every simultaneous process uses 2 filehandles when grouping
@ -1927,7 +1936,6 @@ sub processes_available_by_system_limit {
&& open($fh{$system_limit*10+2},"</dev/null");
# System process limit
$system_limit % 10 or $time=time;
my $child;
if($child = fork()) {
push (@children,$child);
@ -1941,13 +1949,19 @@ sub processes_available_by_system_limit {
} else {
$max_system_proc_reached = 1;
}
::debug("Time to fork ten procs: ", time-$time, " (processes so far: ", $system_limit,")\n");
if(time-$time > 2 and not $slow_spawining_warning_printed) {
# It took more than 2 second to fork ten processes.
my $forktime = time - $time - $wait_time_for_getting_args;
::debug("Time to fork $system_limit procs: $wait_time_for_getting_args ",
$forktime,
" (processes so far: ", $system_limit,")\n");
if($system_limit > 10 and
$forktime > 1 and
$forktime > $system_limit * 0.01
and not $slow_spawining_warning_printed) {
# It took more than 0.01 second to fork a processes on avg.
# Give the user a warning. He can press Ctrl-C if this
# sucks.
print $Global::original_stderr
("parallel: Warning: Starting 10 extra processes takes > 2 sec.\n",
("parallel: Warning: Starting $system_limit processes took > $forktime sec.\n",
"Consider adjusting -j. Press CTRL-C to stop.\n");
$slow_spawining_warning_printed = 1;
}
@ -2022,7 +2036,8 @@ sub simultaneous_sshlogin {
my $serverlogin = $self->serverlogin();
my $cmd = "$sshcmd $serverlogin echo simultaneouslogin 2>&1 &"x$wanted_processes;
::debug("Trying $wanted_processes logins at $serverlogin");
open (SIMUL, "($cmd)|grep simultaneouslogin | wc -l|") or die;
open (SIMUL, "($cmd)|grep simultaneouslogin | wc -l|") or
::die_bug("simultaneouslogin");
my $ssh_limit = <SIMUL>;
close SIMUL;
chomp $ssh_limit;
@ -2281,10 +2296,6 @@ sub sshcommand_of_sshlogin {
# login@host
my $self = shift;
my ($sshcmd, $serverlogin);
if($::oodebug and not defined $self->{'string'}) {
Carp::confess("No sshlogin");
die;
}
if($self->{'string'} =~ /(.+) (\S+)$/) {
# Own ssh command
$sshcmd = $1; $serverlogin = $2;
@ -2497,7 +2508,8 @@ sub set_stdin {
my $self = shift;
my $stdin = shift;
# set non-blocking
fcntl($stdin, ::F_SETFL, ::O_NONBLOCK) or die "Couldn't set flags for HANDLE: $!\n";
fcntl($stdin, ::F_SETFL, ::O_NONBLOCK) or
::die_bug("Couldn't set flags for HANDLE: $!");
$self->{'stdin'} = $stdin;
}
@ -2659,10 +2671,6 @@ sub sshlogin_wrap {
my $self = shift;
if(not defined $self->{'sshlogin_wrap'}) {
my $sshlogin = $self->sshlogin();
if($::oodebug and not defined $sshlogin) {
Carp::confess("No sshlogin");
die;
}
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $next_command_line = $self->replaced();
@ -2883,17 +2891,12 @@ sub start {
# Returns:
# job-object or undef if job not to run
my $job = shift;
if($::oodebug and $job->{'commandline'}->{'commandline'}) {
Carp::confess($job);
die "jkj2";
}
my $command = $job->sshlogin_wrap();
if($Global::interactive or $Global::stderr_verbose) {
if($Global::interactive) {
print $Global::original_stderr "$command ?...";
open(TTY,"/dev/tty") || die;
open(TTY,"/dev/tty") || ::die_bug("interactive-tty");
my $answer = <TTY>;
close TTY;
my $run_yes = ($answer =~ /^\s*y/i);
@ -2917,8 +2920,8 @@ sub start {
($errfh,$name) = ::tempfile(SUFFIX => ".par");
unlink $name;
open OUT, '>&', $outfh or die "Can't redirect STDOUT: $!";
open ERR, '>&', $errfh or die "Can't dup STDOUT: $!";
open OUT, '>&', $outfh or ::die_bug("Can't redirect STDOUT: $!");
open ERR, '>&', $errfh or ::die_bug("Can't dup STDOUT: $!");
$job->set_stdout($outfh);
$job->set_stderr($errfh);
} else {
@ -2944,27 +2947,27 @@ sub start {
if($::opt_pipe) {
my ($in);
$pid = ::open3($in, ">&OUT", ">&ERR", $command) ||
die("open3 (with spreadstdin) failed. Report a bug to <bug-parallel\@gnu.org>\n");
::die_bug("open3-pipe");
$job->set_stdin($in);
} elsif(@::opt_a and $job->seq() == 1) {
# Give STDIN to the first job if using -a
*IN = *STDIN;
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $command) ||
die("open3 (with -a) failed. Report a bug to <bug-parallel\@gnu.org>\n");
::die_bug("open3-a");
# Re-open to avoid complaining
open STDIN, "<&", $Global::original_stdin
or die "Can't dup \$Global::original_stdin: $!";
or ::die_bug("dup-\$Global::original_stdin: $!");
} elsif ($::opt_tty and not $Global::tty_taken and -c "/dev/tty" and
open(DEVTTY, "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it
*IN = *DEVTTY;
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $command) ||
die("open3 (with /dev/tty) failed. Report a bug to <bug-parallel\@gnu.org>\n");
::die_bug("open3-/dev/tty");
$Global::tty_taken = $pid;
close DEVTTY;
} else {
$pid = ::open3(::gensym, ">&OUT", ">&ERR", $command) ||
die("open3 (with gensym) failed. Report a bug to <bug-parallel\@gnu.org>\n");
::die_bug("open3-gensym");
}
$job->set_pid($pid);
$job->set_starttime();
@ -3971,7 +3974,7 @@ sub new {
-d $parallel_locks or mkdir $parallel_locks;
my $lockdir = "$parallel_locks/$id";
my $lockfile = $lockdir.".lock";
if($count < 1) { die "Semaphore count = $count"; }
if($count < 1) { ::die_bug("semaphore-count: $count"); }
return bless {
'lockfile' => $lockfile,
'lockfh' => Symbol::gensym(),
@ -4038,7 +4041,8 @@ sub atomic_link_if_count_less_than {
if($self->nlinks() < $self->{'count'}) {
-d $self->{'lockdir'} || mkdir $self->{'lockdir'};
if(not -e $self->{'idfile'}) {
open (A, ">", $self->{'idfile'}) or die ">$self->{'idfile'}";
open (A, ">", $self->{'idfile'}) or
::die_bug("write_idfile: $self->{'idfile'}");
close A;
}
$retval = link $self->{'idfile'}, $self->{'pidfile'};
@ -4061,7 +4065,7 @@ sub nlinks {
sub lock {
my $self = shift;
open $self->{'lockfh'}, ">", $self->{'lockfile'}
or die "Can't open semaphore file $self->{'lockfile'}: $!";
or ::die_bug("Can't open semaphore file $self->{'lockfile'}: $!");
chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) {
::debug("Cannot lock $self->{'lockfile'}");

View file

@ -18,22 +18,23 @@ B<#!/usr/bin/parallel> --shebang [options] [I<command> [arguments]]
=head1 DESCRIPTION
GNU B<parallel> is a shell tool for executing jobs concurrently locally
or using remote computers. A job is typically a single command or a
small script that has to be run for each of the lines in the
input. The typical input is a list of files, a list of hosts, a list
of users, a list of URLs, or a list of tables.
GNU B<parallel> is a shell tool for executing jobs in parallel using one
or more computers. A job is can be a single command or a small script
that has to be run for each of the lines in the input. The typical
input is a list of files, a list of hosts, a list of users, a list of
URLs, or a list of tables. A job can also be a command that reads from
a pipe. GNU B<parallel> can then split the input and pipe it into
commands in parallel.
If you use B<xargs> today you will find GNU B<parallel> very easy to
use as GNU B<parallel> is written to have the same options as
B<xargs>. If you write loops in shell, you will find GNU B<parallel>
may be able to replace most of the loops and make them run faster by
running several jobs simultaneously.
If you use xargs and tee today you will find GNU B<parallel> very easy to
use as GNU B<parallel> is written to have the same options as xargs. If
you write loops in shell, you will find GNU B<parallel> may be able to
replace most of the loops and make them run faster by running several
jobs in parallel.
GNU B<parallel> makes sure output from the commands is the same output
as you would get had you run the commands sequentially. This makes it
possible to use output from GNU B<parallel> as input for other
programs.
GNU B<parallel> makes sure output from the commands is the same output as
you would get had you run the commands sequentially. This makes it
possible to use output from GNU B<parallel> as input for other programs.
For each line of input GNU B<parallel> will execute I<command> with
the line as arguments. If no I<command> is given, the line of input is
@ -436,7 +437,7 @@ B<perl -a -F"\t" -ne 'chomp($F[2]=`date -d \@$F[2] +%FT%T`); print join("\t",@F)
=item B<-P> I<N>
Number of jobslots. Run up to N jobs in parallel. 0 means as many as
possible. Default is +0 which will run one job per CPU core.
possible. Default is 100% which will run one job per CPU core.
If B<--semaphore> is set default is 1 thus making a mutex.
@ -2647,6 +2648,24 @@ you file a bug-report.
Report bugs to <bug-parallel@gnu.org> or
https://savannah.gnu.org/bugs/?func=additem&group=parallel
Your bugreport should always include:
=over 2
=item *
The output of B<parallel --version>. If you are not running the latest
released version you should specify why you believe the problem is not
fixed in that version.
=item *
A complete example that others can run that shows the problem. A
combination of B<seq>, B<echo>, and B<sleep> can reproduce most
errors.
=back
=head1 AUTHOR

View file

@ -30,7 +30,8 @@ GNU B<sql> is often used in combination with GNU B<parallel>.
=item I<dburl>
A DBURL has the following syntax:
[sql:]vendor://[[user][:password]@][host][:port]/[database][?sqlquery]
[sql:]vendor://
[[user][:password]@][host][:port]/[database][?sqlquery]
See the section DBURL below.
@ -164,7 +165,8 @@ For this to work B<--shebang> or B<-Y> must be set as the first option.
=head1 DBURL
A DBURL has the following syntax:
[sql:]vendor://[[user][:password]@][host][:port]/[database][?sqlquery]
[sql:]vendor://
[[user][:password]@][host][:port]/[database][?sqlquery]
To quote special characters use %-encoding specified in
http://tools.ietf.org/html/rfc3986#section-2.1 (E.g. a password
@ -175,6 +177,7 @@ Examples:
sql:oracle://scott:tiger@ora.example.com/xe
postgresql://scott:tiger@pg.example.com/pgdb
pg:///
postgresqlssl://scott@pg.example.com:3333/pgdb
sql:sqlite2:////tmp/db.sqlite?SELECT * FROM foo;
sqlite3:///../db.sqlite3?SELECT%20*%20FROM%20foo;

View file

@ -1,4 +1,14 @@
#!/bin/bash
echo '### Test slow arguments generation - https://savannah.gnu.org/bugs/?32834'
seq 1 3 | parallel -j1 "sleep 2; echo {}" | parallel -kj2 echo
#seq 1000000000 1000000010 | pv -L 10 -q | stdout parallel -j 10 echo
echo '### Test too slow spawning'
seq 1000000000 1000000010 | pv -L 10 -q | stdout parallel -j 10 echo
killall -9 burnP6 2>/dev/null
seq 1 2 | parallel -j2 -N0 timeout -k 25 26 burnP6 &
sleep 1
seq 1 1000 |
stdout nice nice parallel -s 100 -uj0 true |
perl -pe '/parallel: Warning: Starting \d+ processes took/ and do {close STDIN; `killall -9 burnP6`; print "OK\n"; exit }'
killall -9 burnP6 2>/dev/null

View file

@ -290,7 +290,7 @@ Output from -h and --help
6
6
### Test --version: Version output (just check we get the same amount of lines)
7
12
### Test --verbose and -t
echo bar
echo car

View file

@ -22,7 +22,7 @@
2
### Test of --retries on unreachable host
ssh: connect to host 4.3.2.1 port 22: Connection timed out
parallel: Warning: Could not figure out number of cpus on 4.3.2.1. Using 1
parallel: Warning: Could not figure out number of cpus on 4.3.2.1 (). Using 1
echo 1
1
echo 2

View file

@ -1,14 +1,6 @@
### Test slow arguments generation - https://savannah.gnu.org/bugs/?32834
1
2
3
### Test too slow spawning
parallel: Warning: Starting 10 extra processes takes > 2 sec.
Consider adjusting -j. Press CTRL-C to stop.
1000000000
1000000001
1000000002
1000000003
1000000004
1000000005
1000000006
1000000007
1000000008
1000000009
1000000010
OK