parallel: Saving to SQL is now done using sql->output().

This commit is contained in:
Ole Tange 2016-12-14 03:53:33 +01:00
parent 0c41e2d2df
commit e305356898
3 changed files with 62 additions and 42 deletions

View file

@ -64,7 +64,7 @@ if($opt::sqlmaster) {
$Global::sql->create_table($#input_source_fh+1); $Global::sql->create_table($#input_source_fh+1);
if($opt::sqlworker) { if($opt::sqlworker) {
# Start a real --sqlworker in the background later # Start a real --sqlworker in the background later
$Global::sqlworker = 1; $Global::start_sqlworker = 1;
$opt::sqlworker = undef; $opt::sqlworker = undef;
} }
} }
@ -1223,7 +1223,7 @@ sub check_invalid_option_combinations {
sub init_globals { sub init_globals {
# Defaults: # Defaults:
$Global::version = 20161123; $Global::version = 20161208;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -2460,9 +2460,9 @@ sub drain_job_queue {
while($opt::sqlmaster and not $Global::sql->finished()) { while($opt::sqlmaster and not $Global::sql->finished()) {
# SQL master # SQL master
$sleep = ::reap_usleep($sleep); $sleep = ::reap_usleep($sleep);
if($Global::sqlworker) { if($Global::start_sqlworker) {
# Start an SQL worker as we are now sure there is work to do # Start an SQL worker as we are now sure there is work to do
$Global::sqlworker = 0; $Global::start_sqlworker = 0;
if(fork()) { if(fork()) {
# skip # skip
} else { } else {
@ -8229,6 +8229,10 @@ sub linebuffer_print {
} }
# Print up to and including the last \n # Print up to and including the last \n
print $out_fd substr($$partial,0,$i); print $out_fd substr($$partial,0,$i);
if($opt::sqlworker) {
push (@{$self->{'sqlworker'}{$fdno}},
substr($$partial,0,$i));
}
# Remove the printed part # Remove the printed part
substr($$partial,0,$i) = ""; substr($$partial,0,$i) = "";
} }
@ -8247,6 +8251,9 @@ sub linebuffer_print {
$$partial =~ s/^/$tag/gm; $$partial =~ s/^/$tag/gm;
} }
print $out_fd $$partial; print $out_fd $$partial;
if($opt::sqlworker) {
push (@{$self->{'sqlworker'}{$fdno}},$$partial);
}
} }
# Release the memory # Release the memory
undef $$partial; undef $$partial;
@ -8256,24 +8263,9 @@ sub linebuffer_print {
# decompress done: # decompress done:
# copy to sql (if needed) # copy to sql (if needed)
# then close fh # then close fh
if($opt::sqlworker) { if($opt::sqlworker and not $opt::results) {
my @output; $Global::sql->output($fdno, $self->seq(),
my $tag = $self->tag(); join("",@{$self->{'sqlworker'}{$fdno}}));
seek $in_fh, 0, 0;
while(<$in_fh>) {
push @output, $tag,$_;
}
if($fdno == 1) {
if(not $opt::results) {
$Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(),
join("",@output));
}
} else {
if(not $opt::results) {
$Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(),
join("",@output));
}
}
} }
close $in_fh; close $in_fh;
if($? and $opt::compress) { if($? and $opt::compress) {
@ -8308,15 +8300,9 @@ sub tag_print {
} }
if($fdno == 1) { if($fdno == 1) {
$self->add_returnsize($outputlength); $self->add_returnsize($outputlength);
if($opt::sqlworker and not $opt::results) {
$Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(),
join("",@output));
} }
} else {
if($opt::sqlworker and not $opt::results) { if($opt::sqlworker and not $opt::results) {
$Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(), $Global::sql->output($fdno, $self->seq(), join("",@output));
join("",@output));
}
} }
close $in_fh; close $in_fh;
if($? and $opt::compress) { if($? and $opt::compress) {
@ -8347,16 +8333,11 @@ sub normal_print {
} }
if($fdno == 1) { if($fdno == 1) {
$self->add_returnsize($outputlength); $self->add_returnsize($outputlength);
if($opt::sqlworker and not $opt::results) {
$Global::sql->update("SET Stdout = ? WHERE Seq = ".$self->seq(),
join("",@output));
} }
} else {
if($opt::sqlworker and not $opt::results) { if($opt::sqlworker and not $opt::results) {
$Global::sql->update("SET Stderr = ? WHERE Seq = ".$self->seq(), $Global::sql->output($fdno, $self->seq(),
join("",@output)); join("",@output));
} }
}
close $in_fh; close $in_fh;
if($? and $opt::compress) { if($? and $opt::compress) {
::error($opt::decompress_program." failed."); ::error($opt::decompress_program." failed.");
@ -10222,7 +10203,7 @@ sub get_alias {
if($dburl) { if($dburl) {
return get_alias($dburl.$rest); return get_alias($dburl.$rest);
} else { } else {
Usage("$alias is not defined in @search"); ::error("$alias is not defined in @search");
exit(-1); exit(-1);
} }
} }
@ -10406,6 +10387,17 @@ sub update {
$self->run("UPDATE $table $stmt",@_); $self->run("UPDATE $table $stmt",@_);
} }
sub output {
my $self = shift;
my $fdno = shift;
my $seq = shift;
my @output = @_;
my %column_of_fdno = ( 1 => "Stdout", 2 => "Stderr" );
my $column = $column_of_fdno{$fdno};
$self->update("SET $column = ? WHERE Seq = ".$seq, join("",@output));
}
sub max_number_of_args { sub max_number_of_args {
# Maximal number of args for this table # Maximal number of args for this table
my $self = shift; my $self = shift;
@ -10526,7 +10518,7 @@ sub finished {
# Check if there are any jobs left in the SQL table that do not # Check if there are any jobs left in the SQL table that do not
# have a "real" exitval # have a "real" exitval
my $self = shift; my $self = shift;
if($opt::wait) { if($opt::wait or $Global::start_sqlworker) {
my $table = $self->table(); my $table = $self->table();
my $rv = $self->get("select Seq,Exitval from $table where Exitval <= -1000 limit 1"); my $rv = $self->get("select Seq,Exitval from $table where Exitval <= -1000 limit 1");
return not $rv->[0]; return not $rv->[0];

View file

@ -1925,6 +1925,9 @@ Execute jobs via SQL server. Read the input sources variables from the
table pointed to by I<DBURL>. The I<command> on the command line table pointed to by I<DBURL>. The I<command> on the command line
should be the same as given by B<--sqlmaster>. should be the same as given by B<--sqlmaster>.
If you have more than one B<--sqlworker> jobs may be run more than
once.
=item B<--ssh> I<sshcommand> =item B<--ssh> I<sshcommand>

View file

@ -76,6 +76,23 @@ where there is no easy way to convert an environment into commands
that generate the environment. that generate the environment.
=head3 env_parallel.*
These are the files that implements the alias or function
B<env_parallel> for a given shell. It could be argued that these
should be put in some obscure place under /usr/lib, but by putting
them in your path it becomes trivial to find the path to them and
B<source> them:
source `which env_parallel.foo`
The beauty is that they can be put anywhere in the path without the
user having to know the location. So if the user's path includes
/afs/bin/i386_fc5 or /usr/pkg/parallel/bin or
/usr/local/parallel/20161222/sunos5.6/bin the files can be put in the
dir that makes most sense for the sysadmin.
=head3 env_parallel.bash / env_parallel.zsh / env_parallel.ksh / env_parallel.pdksh =head3 env_parallel.bash / env_parallel.zsh / env_parallel.ksh / env_parallel.pdksh
B<env_parallel.(bash|ksh|pdksh|zsh)> sets the function B<env_parallel>. It uses B<env_parallel.(bash|ksh|pdksh|zsh)> sets the function B<env_parallel>. It uses
@ -731,6 +748,14 @@ commands (that cannot be parallelized) on each server, but run the
same sequence on multiple servers. same sequence on multiple servers.
=head2 --shuf
When using B<--shuf> to shuffle the jobs, all jobs are read, then they
are shuffled, and finally executed. When using SQL this makes the
B<--sqlmaster> be the part that shuffles the jobs. The B<--sqlworker>s
simply executes according to Seq number.
=head2 Buffering on disk =head2 Buffering on disk
GNU B<parallel> buffers on disk in $TMPDIR using files, that are GNU B<parallel> buffers on disk in $TMPDIR using files, that are