parallel: With many --sqlworkers minimize the number of duplicates.

This commit is contained in:
Ole Tange 2020-06-11 23:25:18 +02:00
parent b5b3d5dc3e
commit 3cd3f75200
7 changed files with 129 additions and 22 deletions

2
NEWS
View file

@ -1,3 +1,5 @@
20200522
New in this release:

View file

@ -188,7 +188,7 @@ from:tange@gnu.org
to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm>
Subject: GNU Parallel 20200622 ('SpaceX') released <<[stable]>>
Subject: GNU Parallel 20200622 ('SpaceX/Floyd') released <<[stable]>>
GNU Parallel 20200622 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/
@ -201,12 +201,18 @@ Quote of the month:
New in this release:
*
https://www.slideshare.net/hoffmanlab/gnu-parallel-194030490
https://www.openskysoftware.com/site-credits.htm
* Bug fixes and man page updates.
News about GNU Parallel:
https://bioinformaticsworkbook.org/Appendix/GNUparallel/GNU_parallel_examples.html#gsc.tab=0
http://pdebuyl.be/blog/2020/gnu-parallel-for-simulations.html
https://negfeedback.blogspot.com/2020/05/indispensable-command-line-tools.html
Get the book: GNU Parallel 2018 http://www.lulu.com/shop/ole-tange/gnu-parallel-2018/paperback/product-23558902.html

View file

@ -12299,8 +12299,13 @@ sub new($) {
my $dbh;
if($driver eq "CSV") {
# CSV does not use normal dsn
$dbh = DBI->connect("dbi:CSV:", "", "", { f_dir => "$database", })
or die $DBI::errstr;
if(-d $database) {
$dbh = DBI->connect("dbi:CSV:", "", "", { f_dir => "$database", })
or die $DBI::errstr;
} else {
::error("$database is not a directory.");
::wait_and_exit(255);
}
} else {
$dbh = DBI->connect($dsn, $userid, $password,
{ RaiseError => 1, AutoInactiveDestroy => 1 })
@ -12311,7 +12316,6 @@ sub new($) {
$dbh->{'RaiseError'} = 1;
$dbh->{'ShowErrorStatement'} = 1;
$dbh->{'HandleError'} = sub {};
if(not defined $options{'table'}) {
::error("The DBURL ($dburl) must contain a table.");
::wait_and_exit(255);
@ -12650,26 +12654,58 @@ sub insert_records($) {
0, @$record_ref[1..$#$record_ref]);
}
sub get_record($) {
my $self = shift;
my @retval;
my $table = $self->table();
my @v_cols = map { ", V$_" } (1..$self->max_number_of_args());
my $v = $self->get("SELECT Seq, Command @v_cols FROM $table ".
"WHERE Exitval = -1000 ORDER BY Seq LIMIT 1;");
if($v->[0]) {
my $val_ref = $v->[0];
# Mark record as taken
my $seq = shift @$val_ref;
# Save the sequence number to use when running the job
$SQL::next_seq = $seq;
$self->update("SET Exitval = ? WHERE Seq = ".$seq, -1220);
my @command = split /\257 /, shift @$val_ref;
$SQL::command_ref = \@command;
for (@$val_ref) {
push @retval, Arg->new($_);
my $rand = "Reserved-".$$.rand();
my $v;
my $more_pending;
do {
if($self->{'driver'} eq "CSV") {
# Sub SELECT is not supported in CSV
# So to minimize the race condition below select a job at random
my $r = $self->get("SELECT Seq, Command @v_cols FROM $table ".
"WHERE Exitval = -1000 LIMIT 100;");
$v = [ sort { rand() > 0.5 } @$r ];
} else {
# Avoid race condition where multiple workers get the same job
# by setting Stdout to a unique string
# (SELECT * FROM (...) AS dummy) is needed due to sillyness in MySQL
$self->update("SET Stdout = ?,Exitval = ? ".
"WHERE Seq = (".
" SELECT * FROM (".
" SELECT min(Seq) FROM $table WHERE Exitval = -1000".
" ) AS dummy".
") AND Exitval = -1000;", $rand, -1210);
# If a parallel worker overwrote the unique string this will get nothing
$v = $self->get("SELECT Seq, Command @v_cols FROM $table ".
"WHERE Stdout = ?;", $rand);
}
}
if($v->[0]) {
my $val_ref = $v->[0];
# Mark record as taken
my $seq = shift @$val_ref;
# Save the sequence number to use when running the job
$SQL::next_seq = $seq;
$self->update("SET Exitval = ? WHERE Seq = ".$seq, -1220);
# Command is encoded with '\257 space' as splitting char
my @command = split /\257 /, shift @$val_ref;
$SQL::command_ref = \@command;
for (@$val_ref) {
push @retval, Arg->new($_);
}
} else {
# If the record was updated by another job in parallel,
# then we may not be done, so see if there are more jobs pending
$more_pending =
$self->get("SELECT Seq FROM $table WHERE Exitval = ?;", -1210);
}
} while (not $v->[0] and $more_pending->[0]);
if(@retval) {
return \@retval;
} else {

View file

@ -2380,8 +2380,11 @@ E.g.
sql:oracle://scott:tiger@ora.example.com/xe/parjob
postgresql://scott:tiger@pg.example.com/pgdb/parjob
pg:///parjob
sqlite3:///pardb/parjob.sqlite
csv:///%2Ftmp%2Fmydir/jobtable
sqlite3:///%2Ftmp%2Fpardb.sqlite/parjob
csv:///%2Ftmp%2Fpardb/parjob
Notice how / in the path of sqlite and CSV must be encoded as
%2F. Except the last / in CSV which must be a /.
It can also be an alias from ~/.sql/aliases:

View file

@ -6,9 +6,11 @@
export SQLITE=sqlite3:///%2Frun%2Fshm%2Fparallel.db
export PG=pg://`whoami`:`whoami`@lo/`whoami`
export MYSQL=mysql://`whoami`:`whoami`@lo/`whoami`
export CSV=csv:///%2Frun%2Fshm%2Fcsv
export DEBUG=false
rm /run/shm/parallel.db
rm -f /run/shm/parallel.db
mkdir -p /run/shm/csv
p_showsqlresult() {
SERVERURL=$1
@ -125,7 +127,8 @@ export -f $(compgen -A function | egrep 'p_|par_')
# Tested that -j0 in parallel is fastest (up to 15 jobs)
compgen -A function | grep par_ | sort |
stdout parallel -vj5 -k --tag --joblog /tmp/jl-`basename $0` p_wrapper \
:::: - ::: \$MYSQL \$PG \$SQLITE | perl -pe 's/tbl\d+/TBL99999/gi;' |
:::: - ::: \$MYSQL \$PG \$SQLITE \$CSV |
perl -pe 's/tbl\d+/TBL99999/gi;' |
perl -pe 's/(from TBL99999 order) .*/$1/g' |
perl -pe 's/ *\b'"$hostname"'\b */hostname/g' |
grep -v -- --------------- |

View file

@ -0,0 +1,45 @@
#!/bin/bash
# GNU Parallel SQL tests
# The tests must be able to run in parallel
export SQLITE=sqlite3:///%2Frun%2Fshm%2Fparallel.db
export PG=pg://`whoami`:`whoami`@lo/`whoami`
export MYSQL=mysql://`whoami`:`whoami`@lo/`whoami`
export CSV=csv:///%2Frun%2Fshm%2Fcsv
rm -f /run/shm/parallel.db
mkdir -p /run/shm/csv
par_few_duplicate_run() {
# Test that multiple --sqlworkers pulling from the same job table
# produce only a small number of duplicate job runs (at most 5%).
# $1 = base DBURL ($CSV/$MYSQL/$PG/$SQLITE); a random table name is appended
# so concurrent test runs do not collide.
echo '### With many workers there will be some duplicates'
TABLE=TBL$RANDOM
DBURL="$1"/$TABLE
# Queue 100 echo jobs into the SQL table via --sqlmaster
parallel --sqlmaster $DBURL echo ::: {1..100}
# Start 4 competing workers; each prints the argument of every job it runs,
# so the total line count equals the number of job executions.
lines=$( (
parallel --sqlworker $DBURL &
parallel --sqlworker $DBURL &
parallel --sqlworker $DBURL &
parallel --sqlworker $DBURL &
wait
) | wc -l)
# 100 jobs queued: more than 105 output lines means >5% ran more than once
if [ $lines -gt 105 ] ; then
echo Error: $lines are more than 5% duplicates
else
echo OK
fi
}
hostname=`hostname`
export -f $(compgen -A function | egrep 'p_|par_')
# Tested that -j0 in parallel is fastest (up to 15 jobs)
compgen -A function | grep par_ | sort |
stdout parallel -vj5 -k --tag --joblog /tmp/jl-`basename $0` {1} {2} \
:::: - ::: \$CSV \$MYSQL \$PG \$SQLITE |
perl -pe 's/tbl\d+/TBL99999/gi;' |
perl -pe 's/(from TBL99999 order) .*/$1/g' |
perl -pe 's/ *\b'"$hostname"'\b */hostname/g' |
grep -v -- --------------- |
perl -pe 's/ *\bhost\b */host/g' |
perl -pe 's/ +/ /g'

View file

@ -0,0 +1,12 @@
par_few_duplicate_run $CSV par_few_duplicate_run $CSV
par_few_duplicate_run $CSV ### With many workers there will be some duplicates
par_few_duplicate_run $CSV OK
par_few_duplicate_run $MYSQL par_few_duplicate_run $MYSQL
par_few_duplicate_run $MYSQL ### With many workers there will be some duplicates
par_few_duplicate_run $MYSQL OK
par_few_duplicate_run $PG par_few_duplicate_run $PG
par_few_duplicate_run $PG ### With many workers there will be some duplicates
par_few_duplicate_run $PG OK
par_few_duplicate_run $SQLITE par_few_duplicate_run $SQLITE
par_few_duplicate_run $SQLITE ### With many workers there will be some duplicates
par_few_duplicate_run $SQLITE OK