diff --git a/src/parallel b/src/parallel index 4f76b74f..dcef9552 100755 --- a/src/parallel +++ b/src/parallel @@ -64,7 +64,7 @@ if($opt::sqlmaster) { $Global::sql->create_table($#input_source_fh+1); if($opt::sqlworker) { # Start a real --sqlworker in the background later - $Global::sqlworker = 1; + $Global::start_sqlworker = 1; $opt::sqlworker = undef; } } @@ -1223,7 +1223,7 @@ sub check_invalid_option_combinations { sub init_globals { # Defaults: - $Global::version = 20161123; + $Global::version = 20161208; $Global::progname = 'parallel'; $Global::infinity = 2**31; $Global::debug = 0; @@ -2460,9 +2460,9 @@ sub drain_job_queue { while($opt::sqlmaster and not $Global::sql->finished()) { # SQL master $sleep = ::reap_usleep($sleep); - if($Global::sqlworker) { + if($Global::start_sqlworker) { # Start an SQL worker as we are now sure there is work to do - $Global::sqlworker = 0; + $Global::start_sqlworker = 0; if(fork()) { # skip } else { @@ -8229,6 +8229,10 @@ sub linebuffer_print { } # Print up to and including the last \n print $out_fd substr($$partial,0,$i); + if($opt::sqlworker) { + push (@{$self->{'sqlworker'}{$fdno}}, + substr($$partial,0,$i)); + } # Remove the printed part substr($$partial,0,$i) = ""; } @@ -8247,6 +8251,9 @@ sub linebuffer_print { $$partial =~ s/^/$tag/gm; } print $out_fd $$partial; + if($opt::sqlworker) { + push (@{$self->{'sqlworker'}{$fdno}},$$partial); + } } # Release the memory undef $$partial; @@ -8256,24 +8263,9 @@ sub linebuffer_print { # decompress done: # copy to sql (if needed) # then close fh - if($opt::sqlworker) { - my @output; - my $tag = $self->tag(); - seek $in_fh, 0, 0; - while(<$in_fh>) { - push @output, $tag,$_; - } - if($fdno == 1) { - if(not $opt::results) { - $Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(), - join("",@output)); - } - } else { - if(not $opt::results) { - $Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(), - join("",@output)); - } - } + if($opt::sqlworker and not $opt::results) { + $Global::sql->output($fdno, $self->seq(), + join("",@{$self->{'sqlworker'}{$fdno}})); } close $in_fh; if($? and $opt::compress) { @@ -8308,15 +8300,9 @@ sub tag_print { } if($fdno == 1) { $self->add_returnsize($outputlength); - if($opt::sqlworker and not $opt::results) { - $Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(), - join("",@output)); - } - } else { - if($opt::sqlworker and not $opt::results) { - $Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(), - join("",@output)); - } + } + if($opt::sqlworker and not $opt::results) { + $Global::sql->output($fdno, $self->seq(), join("",@output)); } close $in_fh; if($? and $opt::compress) { @@ -8347,15 +8333,10 @@ sub normal_print { } if($fdno == 1) { $self->add_returnsize($outputlength); - if($opt::sqlworker and not $opt::results) { - $Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(), - join("",@output)); - } - } else { - if($opt::sqlworker and not $opt::results) { - $Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(), - join("",@output)); - } + } + if($opt::sqlworker and not $opt::results) { + $Global::sql->output($fdno, $self->seq(), + join("",@output)); } close $in_fh; if($? and $opt::compress) { @@ -10222,7 +10203,7 @@ sub get_alias { if($dburl) { return get_alias($dburl.$rest); } else { - Usage("$alias is not defined in @search"); + ::error("$alias is not defined in @search"); exit(-1); } } @@ -10406,6 +10387,17 @@ sub update { $self->run("UPDATE $table $stmt",@_); } +sub output { + my $self = shift; + my $fdno = shift; + my $seq = shift; + my @output = @_; + + my %column_of_fdno = ( 1 => "Stdout", 2 => "Stderr" ); + my $column = $column_of_fdno{$fdno}; + $self->update("SET $column = ? WHERE Seq = ".$seq, join("",@output)); +} + sub max_number_of_args { # Maximal number of args for this table my $self = shift; @@ -10526,7 +10518,7 @@ sub finished { # Check if there are any jobs left in the SQL table that do not # have a "real" exitval my $self = shift; - if($opt::wait) { + if($opt::wait or $Global::start_sqlworker) { my $table = $self->table(); my $rv = $self->get("select Seq,Exitval from $table where Exitval <= -1000 limit 1"); return not $rv->[0]; diff --git a/src/parallel.pod b/src/parallel.pod index f2b5d246..dcf7bce2 100644 --- a/src/parallel.pod +++ b/src/parallel.pod @@ -1925,6 +1925,9 @@ Execute jobs via SQL server. Read the input sources variables from the table pointed to by I. The I on the command line should be the same as given by B<--sqlmaster>. +If you have more than one B<--sqlworker> jobs may be run more than +once. + =item B<--ssh> I diff --git a/src/parallel_design.pod b/src/parallel_design.pod index b9693049..4699375c 100644 --- a/src/parallel_design.pod +++ b/src/parallel_design.pod @@ -76,6 +76,23 @@ where there is no easy way to convert an environment into commands that generate the environment. +=head3 env_parallel.* + +These are the files that implements the alias or function +B for a given shell. It could be argued that these +should be put in some obscure place under /usr/lib, but by putting +them in your path it becomes trivial to find the path to them and +B them: + + source `which env_parallel.foo` + +The beauty is that they can be put anywhere in the path without the +user having to know the location. So if the user's path includes +/afs/bin/i386_fc5 or /usr/pkg/parallel/bin or +/usr/local/parallel/20161222/sunos5.6/bin the files can be put in the +dir that makes most sense for the sysadmin. + + =head3 env_parallel.bash / env_parallel.zsh / env_parallel.ksh / env_parallel.pdksh B sets the function B. It uses @@ -731,6 +748,14 @@ commands (that cannot be parallelized) on each server, but run the same sequence on multiple servers. +=head2 --shuf + +When using B<--shuf> to shuffle the jobs, all jobs are read, then they +are shuffled, and finally executed. When using SQL this makes the +B<--sqlmaster> be the part that shuffles the jobs. The B<--sqlworker>s +simply executes according to Seq number. + + =head2 Buffering on disk GNU B buffers on disk in $TMPDIR using files, that are