Fixed bug #42999: --pipepart with remote does not work

Ole Tange 2014-08-25 00:44:19 +02:00
parent 6e612565a9
commit fd85a50d14
2 changed files with 41 additions and 14 deletions


@@ -123,8 +123,12 @@ if($opt::eta or $opt::bar) {
     $Global::JobQueue->total_jobs();
 }
 if($opt::pipepart) {
+    @Global::cat_partials =
+        map { pipe_part_files($_) } @opt::a;
     # Unget the command as many times as there are parts
     $Global::JobQueue->{'commandlinequeue'}->unget(
-        map { pipe_part_files($_) } @opt::a);
+        map { $Global::JobQueue->{'commandlinequeue'}->get() } @Global::cat_partials
+        );
 }
 for my $sshlogin (values %Global::host) {
     $sshlogin->max_jobs_running();
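
Note on the queue priming above: pipe_part_files() now returns only the cat-partial shell fragments, which are parked in @Global::cat_partials; the unget(map { get() } ...) round-trip then pops one real command object off the queue per part and immediately pushes them all back, so the queue holds exactly one runnable command per part while the fragments are attached later in sshlogin_wrap. A minimal sketch of that round-trip, assuming a toy array-backed queue (TinyQueue and all values here are hypothetical, not GNU Parallel's API):

    use strict;
    use warnings;

    package TinyQueue;
    sub new {
        my ($class, $template) = @_;
        # 'template' stands in for the real queue generating commands on demand
        bless { template => $template, ungotten => [] }, $class;
    }
    sub get {
        my $self = shift;
        return shift @{ $self->{ungotten} } if @{ $self->{ungotten} };
        return $self->{template};    # a fresh command from the template
    }
    sub unget {
        my $self = shift;
        unshift @{ $self->{ungotten} }, @_;
    }

    package main;
    # One cat-partial fragment per block of the input (made-up offsets):
    my @cat_partials = ('<f perl -e ... 0 11', '<f perl -e ... 11 22');
    my $queue = TinyQueue->new('wc -c');
    # Pop one command per part, push them all back: the queue now holds
    # exactly as many commands as there are parts.
    $queue->unget(map { $queue->get() } @cat_partials);
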
@@ -177,23 +181,22 @@ sub pipe_part_files {
     # Input:
     #   $file = the file to read
     # Returns:
-    #   @commands to run to pipe the blocks of the file to the command given
+    #   @commands that will cat_partial each part
     my ($file) = @_;
     my $buf = "";
     my $header = find_header(\$buf,open_or_exit($file));
     # find positions
     my @pos = find_split_positions($file,$opt::blocksize,length $header);
-    # unshift job with cat_partial
-    my @cmdlines;
+    # Make @cat_partials
+    my @cat_partials = ();
     for(my $i=0; $i<$#pos; $i++) {
-        my $cmd = $Global::JobQueue->{'commandlinequeue'}->get();
-        $cmd->{'replaced'} =
-            cat_partial($file, 0, length($header), $pos[$i], $pos[$i+1]) . "|" .
-            "(".$cmd->{'replaced'}.")";
-        ::debug("init", "Unget ", $cmd->{'replaced'}, "\n");
-        push(@cmdlines, $cmd);
+        push @cat_partials, cat_partial($file, 0, length($header), $pos[$i], $pos[$i+1]);
     }
-    return @cmdlines;
+    # Remote exec should look like:
+    #   ssh -oLogLevel=quiet lo 'eval `echo $SHELL | grep "/t\{0,1\}csh" > /dev/null && echo setenv PARALLEL_SEQ '$PARALLEL_SEQ'\; setenv PARALLEL_PID '$PARALLEL_PID' || echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' tty\ \>/dev/null\ \&\&\ stty\ isig\ -onlcr\ -echo\;echo\ \$SHELL\ \|\ grep\ \"/t\\\{0,1\\\}csh\"\ \>\ /dev/null\ \&\&\ setenv\ FOO\ /tmp/foo\ \|\|\ export\ FOO=/tmp/foo\; \(wc\ -\ \$FOO\)
+    # ssh -tt not allowed. Remote will die due to broken pipe anyway.
+    # TODO test remote with --fifo / --cat
+    return @cat_partials;
 }
 sub find_header {
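
The strings returned by pipe_part_files() are shell fragments like the one quoted in the sshlogin_wrap comment below: a perl one-liner that takes (offset, length) pairs as arguments, seeks to each offset of the file on stdin, and streams that many bytes to stdout. A standalone sketch of that reader with descriptive names (the file name and offsets in the usage line are made-up examples):

    use strict;
    use warnings;

    # Copy each (offset, length) byte range of the file on STDIN to STDOUT,
    # at most 32 KB per sysread -- the same idea as the generated one-liner.
    # Usage: < bigfile perl cat_partial_sketch.pl 0 11 11 22 | wc -c
    while (@ARGV) {
        sysseek(STDIN, shift @ARGV, 0) || die "sysseek: $!";
        my $left = shift @ARGV;        # bytes still to copy in this range
        my $buf;
        while (my $read = sysread(STDIN, $buf,
                                  $left > 32768 ? 32768 : $left)) {
            $left -= $read;
            syswrite(STDOUT, $buf) // die "syswrite: $!";
        }
    }
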
@@ -5193,11 +5196,22 @@ sub sshlogin_wrap {
     my $serverlogin = $sshlogin->serverlogin();
     my $next_command_line = $Global::envvar.$self->replaced();
     my ($pre,$post,$cleanup)=("","","");
     if($serverlogin eq ":") {
-        $self->{'sshlogin_wrap'} = $next_command_line;
+        if(@Global::cat_partials) {
+            # Prepend:
+            #   < /tmp/foo perl -e 'while(@ARGV) { sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 32768 ? 32768 : $left))){ $left -= $read; syswrite(STDOUT,$buf); } }' 0 0 0 11 |
+            $self->{'sshlogin_wrap'} = (pop @Global::cat_partials). "|".
+                $next_command_line;
+        } else {
+            $self->{'sshlogin_wrap'} = $next_command_line;
+        }
     } else {
         # --transfer
         $pre .= $self->sshtransfer();
+        if(@Global::cat_partials) {
+            $pre .= (pop @Global::cat_partials)."|";
+        }
         # --return
         $post .= $self->sshreturn();
         # --cleanup
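
This is the core of the fix: the cat-partial fragment is popped and glued onto the local side of the pipe, so the block is read from the local file and streamed over ssh, instead of being embedded in the remote command where the input file does not exist. A sketch of the two compositions with plain strings (the command, host, and offsets are hypothetical):

    use strict;
    use warnings;

    my $cat_partial = q{< /tmp/foo perl -e '...' 0 0 0 11};  # runs locally
    my $command     = q{wc -c};

    # Local job: the reader pipes straight into the command.
    my $local = "$cat_partial|$command";
    # Remote job: the reader stays on the local side; only the command
    # runs on the far end of the ssh connection.
    my $remote = "$cat_partial|ssh server $command";
    print "$local\n$remote\n";
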
@@ -5219,9 +5233,9 @@ sub sshlogin_wrap {
         . q{ PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' });
     my $remote_pre = "";
     my $ssh_options = "";
-    if($opt::pipe and $opt::ctrlc
+    if(($opt::pipe or $opt::pipepart) and $opt::ctrlc
        or
-       not $opt::pipe and not $opt::noctrlc) {
+       not ($opt::pipe or $opt::pipepart) and not $opt::noctrlc) {
         # TODO Determine if this is needed
         # Propagating CTRL-C to kill remote jobs requires
         # remote jobs to be run with a terminal.


@@ -1629,6 +1629,19 @@ The sshloginfile '-' is special, too, it read sshlogins from stdin
 If the sshloginfile is changed it will be re-read when a job finishes
 though at most once per second.
 
 This can be used to have a daemon that updates the sshloginfile to
 only contain servers that are up:
+
+  cp original.slf tmp2.slf
+
+  while [ 1 ] ; do
+    nice parallel --nonall -j0 -k --slf original.slf --tag echo | perl -pe 's/\t$//' > tmp.slf
+    if diff tmp.slf tmp2.slf; then
+      mv tmp.slf tmp2.slf
+    fi
+    sleep 10
+  done &
+
+  parallel --slf tmp2.slf ...
+
 =item B<--slotreplace> I<replace-str>