parallel: --sqlworker now reads everything from the database. Command+args are ignored.

This commit is contained in:
Ole Tange 2017-01-12 23:36:15 +01:00
parent dacefe9c3e
commit 84f4a855a5
10 changed files with 83 additions and 40 deletions

View file

@ -236,6 +236,7 @@ https://www2.eecs.berkeley.edu/Pubs/TechRpts/2016/EECS-2016-212.pdf
http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0168456#references
http://people.inf.ethz.ch/trayteld/papers/aerial/aerial.pdf
https://blog.razrlele.com/p/1843
https://www.javacodegeeks.com/2017/01/processing-image-documents-mapr-scale.html
* <<Possibly http://link.springer.com/chapter/10.1007%2F978-3-319-22053-6_46>>

View file

@ -8792,7 +8792,7 @@ sub populate {
if($opt::sqlmaster) {
# Insert the V1..Vn for this $seq in SQL table instead of generating one
$Global::sql->insert_records($self->seq(),$self->{'arg_list_flat_orig'});
$Global::sql->insert_records($self->seq(),$self->{'command'},$self->{'arg_list_flat_orig'});
}
}
@ -9478,6 +9478,27 @@ sub get {
my $cmd_line = shift @{$self->{'unget'}};
return ($cmd_line);
} else {
if($opt::sqlworker) {
# Get the sequence number from the SQL table
$self->set_seq($SQL::next_seq);
# Get the command from the SQL table
$self->{'command'} = $SQL::command_ref;
my @command;
# Recompute replace counts based on the read command
($self->{'replacecount'},
$self->{'len'}, @command) =
replacement_counts_and_lengths($self->{'transfer_files'},
$self->{'return_files'},
@$SQL::command_ref);
if("@command" =~ /^[^ \t\n=]*\257</) {
# Replacement string is (part of) the command (and not just
# argument or variable definition V1={})
# E.g. parallel {}, parallel my_{= s/_//=}, parallel {2}
# Do no quote (Otherwise it will fail if the input contains spaces)
$Global::noquote = 1;
}
}
my $cmd_line = CommandLine->new($self->seq(),
$self->{'command'},
$self->{'arg_queue'},
@ -9488,10 +9509,6 @@ sub get {
$self->{'replacecount'},
$self->{'len'},
);
if($opt::sqlworker) {
# Get the sequence number from the SQL table
$cmd_line->set_seq($SQL::next_seq);
}
$cmd_line->populate();
::debug("init","cmd_line->number_of_args ",
$cmd_line->number_of_args(), "\n");
@ -10671,13 +10688,17 @@ sub create_table {
sub insert_records {
my $self = shift;
my $seq = shift;
my $command_ref = shift;
my $record_ref = shift;
my $table = $self->table();
# For SQL encode the command with \257 space as split points
my $command = join("\257 ",@$command_ref);
my $v_cols = join ",", map { "V$_" } (1..$self->max_number_of_args());
# Two extra value due to $seq, Exitval, Send
my $v_vals = join ",", map { "?" } (1..$self->max_number_of_args()+3);
$self->run("INSERT INTO $table (Seq,Exitval,Send,$v_cols) ".
"VALUES ($v_vals);", $seq, -1000, 0, @$record_ref[1..$#$record_ref]);
my $v_vals = join ",", map { "?" } (1..$self->max_number_of_args()+4);
$self->run("INSERT INTO $table (Seq,Command,Exitval,Send,$v_cols) ".
"VALUES ($v_vals);", $seq, $command, -1000,
0, @$record_ref[1..$#$record_ref]);
}
sub get_record {
@ -10685,7 +10706,7 @@ sub get_record {
my @retval;
my $table = $self->table();
my $v_cols = join ",", map { "V$_" } (1..$self->max_number_of_args());
my $v = $self->get("SELECT Seq, $v_cols FROM $table ".
my $v = $self->get("SELECT Seq, Command, $v_cols FROM $table ".
"WHERE Exitval = -1000 ORDER BY Seq LIMIT 1;");
if($v->[0]) {
my $val_ref = $v->[0];
@ -10694,6 +10715,8 @@ sub get_record {
# Save the sequence number to use when running the job
$SQL::next_seq = $seq;
$self->update("SET Exitval = ? WHERE Seq = ".$seq, -1220);
my @command = split /\257 /, shift @$val_ref;
$SQL::command_ref = \@command;
for (@$val_ref) {
push @retval, Arg->new($_);
}

View file

@ -236,4 +236,5 @@ par_kill_children_timeout() {
export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | sort | parallel -j6 --tag -k '{} 2>&1'
compgen -A function | grep par_ | sort |
parallel --joblog /tmp/jl-`basename $0` -j10 --tag -k '{} 2>&1'

40
testsuite/tests-to-run/parallel-local-30s.sh Normal file → Executable file
View file

@ -42,27 +42,40 @@ par_memory_leak() {
par_linebuffer_matters_compress_tag() {
echo "### (--linebuffer) --compress --tag should give different output"
random_data_with_id_prepended() {
perl -pe 's/^/'$1'/' /dev/urandom |
pv -qL 300000 | head -c 10000000
}
export -f random_data_with_id_prepended
nolbfile=$(mktemp)
lbfile=$(mktemp)
controlfile=$(mktemp)
randomfile=$(mktemp)
# Random data because it does not compress well
# forcing the compress tool to spit out compressed blocks
head -c 10000000 /dev/urandom > $randomfile
nolb=$(seq 10 |
parallel -j0 --compress --tag random_data_with_id_prepended {#} |
field 1 | uniq)
lb=$(seq 10 |
parallel -j0 --linebuffer --compress --tag random_data_with_id_prepended {#} |
field 1 | uniq)
parallel -j0 --compress --tag --delay 1 "shuf $randomfile; sleep 1; shuf $randomfile; true" ::: {0..9} |
perl -ne '/^(\S+)\t/ and print "$1\n"' | uniq > $nolbfile &
parallel -j0 --compress --tag --delay 1 "shuf $randomfile; sleep 1; shuf $randomfile; true" ::: {0..9} |
perl -ne '/^(\S+)\t/ and print "$1\n"' | uniq > $controlfile &
parallel -j0 --line-buffer --compress --tag --delay 1 "shuf $randomfile; sleep 1; shuf $randomfile; true" ::: {0..9} |
perl -ne '/^(\S+)\t/ and print "$1\n"' | uniq > $lbfile &
wait
nolb="$(cat $nolbfile)"
control="$(cat $controlfile)"
lb="$(cat $lbfile)"
rm $nolbfile $lbfile $controlfile $randomfile
if [ "$nolb" == "$control" ] ; then
if [ "$lb" == "$nolb" ] ; then
echo "BAD: --linebuffer makes no difference"
else
echo "OK: --linebuffer makes a difference"
fi
else
echo "BAD: control and nolb are not the same"
fi
}
par_linebuffer_matters_compress() {
echo "### (--linebuffer) --compress --tag should give different output"
echo "### (--linebuffer) --compress should give different output"
random_data_with_id_prepended() {
perl -pe 's/^/'$1'/' /dev/urandom |
pv -qL 300000 | head -c 1000000
@ -89,4 +102,5 @@ par_memfree() {
}
export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | sort | parallel -j6 --tag -k '{} 2>&1'
compgen -A function | grep par_ | sort |
parallel -j0 --tag -k --joblog /tmp/jl-`basename $0` '{} 2>&1'

View file

@ -860,7 +860,6 @@ _EOF
export -f $(compgen -A function | grep par_)
# Tested with -j1..8
# -j6 was fastest
#compgen -A function | grep par_ | sort | parallel --delay $D -j$P --tag -k '{} 2>&1'
compgen -A function | grep par_ | sort | parallel --delay 0.1 -j2 --tag -k '{} 2>&1'
compgen -A function | grep par_ | sort |
parallel --joblog /tmp/jl-`basename $0` -j200% --tag -k '{} 2>&1'

View file

@ -62,4 +62,5 @@ par_keeporder() {
export -f $(compgen -A function | grep par_)
#compgen -A function | grep par_ | sort | parallel --delay $D -j$P --tag -k '{} 2>&1'
compgen -A function | grep par_ | sort | parallel --delay 0.1 -j10 --tag -k '{} 2>&1'
compgen -A function | grep par_ | sort |
parallel --joblog /tmp/jl-`basename $0` --delay 0.1 -j10 --tag -k '{} 2>&1'

View file

@ -39,3 +39,4 @@ rm /tmp/parallel-script-for-script3
stdout parallel --citation < /dev/null
touch ~/.parallel/will-cite
reset

View file

@ -2,3 +2,4 @@
# moved to parallel-local-ssh8.sh
# parallel-local-0.3s.sh
# parallel-local-10s.sh

View file

@ -8,7 +8,12 @@ par_keeporder job1
par_keeporder job2
par_path_remote_bash bug #47695: How to set $PATH on remote? Bash
par_path_remote_bash
par_path_remote_bash * Documentation: http://www.linuxmint.com
par_path_remote_bash * Documentation: https://help.ubuntu.com
par_path_remote_bash * Management: https://landscape.canonical.com
par_path_remote_bash * Support: https://ubuntu.com/advantage
par_path_remote_bash
par_path_remote_bash 0 updates are security updates.
par_path_remote_bash
par_path_remote_bash BASH Path before: /bin:/usr/bin with no parallel
par_path_remote_bash -bash: line 2: parallel: command not found
par_path_remote_bash ^^^^^^^^ Not found is OK
@ -16,7 +21,12 @@ par_path_remote_bash /bin:/usr/bin:/tmp OK
par_path_remote_bash
par_path_remote_csh bug #47695: How to set $PATH on remote? csh
par_path_remote_csh
par_path_remote_csh * Documentation: http://www.linuxmint.com
par_path_remote_csh * Documentation: https://help.ubuntu.com
par_path_remote_csh * Management: https://landscape.canonical.com
par_path_remote_csh * Support: https://ubuntu.com/advantage
par_path_remote_csh
par_path_remote_csh 0 updates are security updates.
par_path_remote_csh
par_path_remote_csh Warning: no access to tty (Bad file descriptor).
par_path_remote_csh Thus no job control in this shell.
par_path_remote_csh CSH Path before: /bin:/usr/bin with no parallel

View file

@ -3,14 +3,6 @@ parallel: Warning: Running 'parallel -j0 -N 9 --pipe parallel -j0' or
parallel: Warning: raising ulimit -n or /etc/security/limits.conf may help.
parallel: Warning: No more file handles.
parallel: Warning: Raising ulimit -n or /etc/security/limits.conf may help.
### Test --keep-order
job0
job1
job2
### Test --keeporder
job0
job1
job2
### Test SIGTERM
1
10