parallel: --progress to STDERR. Test of -M.

This commit is contained in:
Ole Tange 2010-11-28 04:11:45 +01:00
parent 16bf961945
commit 46cc1980fa
3 changed files with 131 additions and 136 deletions

View file

@ -2698,8 +2698,8 @@ sub parse_options {
if(defined $::opt_trim) { $Global::trim = $::opt_trim; }
if(defined $::opt_arg_sep) { $Global::arg_sep = $::opt_arg_sep; }
if(defined $::opt_arg_file_sep) { $Global::arg_file_sep = $::opt_arg_file_sep; }
if(defined $::opt_number_of_cpus) { print no_of_cpus(),"\n"; wait_and_exit(0); }
if(defined $::opt_number_of_cores) { print no_of_cores(),"\n"; wait_and_exit(0); }
if(defined $::opt_number_of_cpus) { print SSHLogin::no_of_cpus(),"\n"; wait_and_exit(0); }
if(defined $::opt_number_of_cores) { print SSHLogin::no_of_cores(),"\n"; wait_and_exit(0); }
if(defined $::opt_max_line_length_allowed) { print CommandMaxLength::real_max_length(),"\n"; wait_and_exit(0); }
if(defined $::opt_version) { version(); wait_and_exit(0); }
if(defined $::opt_show_limits) { show_limits(); }
@ -3289,7 +3289,7 @@ sub user_requested_processes {
}
sub no_of_cpus {
sub _no_of_cpus {
# Returns:
# Number of physical CPUs
if(not $Private::no_of_cpus) {
@ -3309,7 +3309,7 @@ sub no_of_cpus {
return $Private::no_of_cpus;
}
sub no_of_cores {
sub _no_of_cores {
# Returns:
# Number of CPU cores
if(not $Private::no_of_cores) {
@ -3329,7 +3329,7 @@ sub no_of_cores {
return $Private::no_of_cores;
}
sub no_of_cpus_gnu_linux {
sub _no_of_cpus_gnu_linux {
# Returns:
# Number of physical CPUs on GNU/Linux
my $no_of_cpus;
@ -3347,7 +3347,7 @@ sub no_of_cpus_gnu_linux {
return $no_of_cpus;
}
sub no_of_cores_gnu_linux {
sub _no_of_cores_gnu_linux {
# Returns:
# Number of CPU cores on GNU/Linux
my $no_of_cores;
@ -3362,35 +3362,35 @@ sub no_of_cores_gnu_linux {
return $no_of_cores;
}
sub no_of_cpus_darwin {
sub _no_of_cpus_darwin {
# Returns:
# Number of physical CPUs on Mac Darwin
my $no_of_cpus = `sysctl -a hw 2>/dev/null | grep -w physicalcpu | awk '{ print \$2 }'`;
return $no_of_cpus;
}
sub no_of_cores_darwin {
sub _no_of_cores_darwin {
# Returns:
# Number of CPU cores on Mac Darwin
my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`;
return $no_of_cores;
}
sub no_of_cpus_freebsd {
sub _no_of_cpus_freebsd {
# Returns:
# Number of physical CPUs on FreeBSD
my $no_of_cpus = `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`;
return $no_of_cpus;
}
sub no_of_cores_freebsd {
sub _no_of_cores_freebsd {
# Returns:
# Number of CPU cores on FreeBSD
my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`;
return $no_of_cores;
}
sub no_of_cpus_solaris {
sub _no_of_cpus_solaris {
# Returns:
# Number of physical CPUs on Solaris
if(-x "/usr/sbin/psrinfo") {
@ -3551,7 +3551,7 @@ sub drain_job_queue {
# Returns: N/A
if($::opt_progress) {
do_not_reap();
print init_progress();
print $Global::original_stderr init_progress();
reap_if_needed();
}
my $last_header="";
@ -3563,15 +3563,15 @@ sub drain_job_queue {
my %progress = progress();
do_not_reap();
if($last_header ne $progress{'header'}) {
print "\n",$progress{'header'},"\n";
print $Global::original_stderr "\n",$progress{'header'},"\n";
$last_header = $progress{'header'};
}
print "\r",$progress{'status'};
print $Global::original_stderr "\r",$progress{'status'};
reap_if_needed();
}
}
if($::opt_progress) {
print "\n";
print $Global::original_stderr "\n";
}
}
@ -3580,7 +3580,7 @@ sub toggle_progress {
# Returns: N/A
$::opt_progress = not $::opt_progress;
if($::opt_progress) {
print init_progress();
print $Global::original_stderr init_progress();
}
}
@ -3924,7 +3924,7 @@ sub parse_sshlogin {
if($sshlogin_string =~ s:^(\d*)/:: and $1) {
# Override default autodetected ncpus unless zero or missing
$Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string);
$Global::host{$sshlogin_string}->ncpus($1);
$Global::host{$sshlogin_string}->set_ncpus($1);
} else {
$Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string);
}
@ -3957,23 +3957,6 @@ sub remote_hosts {
return grep !/^:$/, keys %Global::host;
}
#sub sshtransfer {
# # Return the sshcommand needed to transfer the file
# # Returns:
# # ssh command needed to transfer file to sshlogin
# return sshtransferreturn(@_,1,0);
#}
sub _sshreturn {
# Return the sshcommand needed to returning the file
# Returns:
# ssh command needed to transfer file from sshlogin
my $removesource = $::opt_cleanup;
return sshtransferreturn(@_,0,$removesource);
}
sub setup_basefile {
@ -4292,6 +4275,15 @@ sub new {
my @unget = ();
return bless {
'string' => $string,
'jobs_running' => undef,
'jobs_completed' => undef,
'maxlength' => undef,
'max_jobs_running' => undef,
'ncpus' => undef,
'sshcommand' => undef,
'serverlogin' => undef,
'control_path_dir' => undef,
'control_path' => undef,
}, ref($class) || $class;
}
@ -4341,17 +4333,11 @@ sub inc_jobs_completed {
$self->{'jobs_completed'}++;
}
sub max_line_length {
my $self = shift;
die "TODO";
}
sub set_max_jobs_running {
my $self = shift;
$self->{'max_jobs_running'} = shift;
}
sub max_jobs_running {
my $self = shift;
if(not defined $self->{'max_jobs_running'}) {
@ -4361,12 +4347,14 @@ sub max_jobs_running {
return $self->{'max_jobs_running'};
}
sub set_ncpus {
my $self = shift;
$self->{'ncpus'} = shift;
}
sub ncpus {
my $self = shift;
my $ncpus = shift; # undef if we do not want to set it
if(defined $ncpus) {
$self->{'ncpus'} = $ncpus;
} elsif(not defined $self->{'ncpus'}) {
if(not defined $self->{'ncpus'}) {
my $sshcmd = $self->sshcommand();
my $serverlogin = $self->serverlogin();
if($serverlogin eq ":") {
@ -4388,7 +4376,7 @@ sub ncpus {
$self->{'ncpus'} = $ncpu;
} else {
print STDERR ("Warning: Could not figure out ",
"number of cpus on $serverlogin. Using 1");
"number of cpus on $serverlogin. Using 1\n");
$self->{'ncpus'} = 1;
}
}
@ -4399,27 +4387,23 @@ sub ncpus {
sub no_of_cpus {
# Returns:
# Number of physical CPUs
if(not $Private::no_of_cpus) {
local $/="\n"; # If delimiter is set, then $/ will be wrong
local $/="\n"; # If delmiiter is set, then $/ will be wrong
my $no_of_cpus = (no_of_cpus_freebsd()
|| no_of_cpus_darwin()
|| no_of_cpus_solaris()
|| no_of_cpus_gnu_linux()
);
if($no_of_cpus) {
$Private::no_of_cpus = $no_of_cpus;
return $no_of_cpus;
} else {
warn("Cannot figure out number of cpus. Using 1");
$Private::no_of_cpus = 1;
return 1;
}
}
return $Private::no_of_cpus;
}
sub no_of_cores {
# Returns:
# Number of CPU cores
if(not $Private::no_of_cores) {
local $/="\n"; # If delimiter is set, then $/ will be wrong
my $no_of_cores = (no_of_cores_freebsd()
|| no_of_cores_darwin()
@ -4427,14 +4411,12 @@ sub no_of_cores {
|| no_of_cores_gnu_linux()
);
if($no_of_cores) {
$Private::no_of_cores = $no_of_cores;
return $no_of_cores;
} else {
warn("Cannot figure out number of CPU cores. Using 1");
$Private::no_of_cores = 1;
return 1;
}
}
return $Private::no_of_cores;
}
sub no_of_cpus_gnu_linux {
# Returns:
@ -4572,21 +4554,22 @@ sub sshcommand_of_sshlogin {
# Normal ssh
if($::opt_controlmaster) {
# Use control_path to make ssh faster
my $control_path = control_path_dir()."/ssh-%r@%h:%p";
my $control_path = $self->control_path_dir()."/ssh-%r@%h:%p";
$sshcmd = "ssh -S ".$control_path;
$serverlogin = $sshlogin;
#my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin;
my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin." sleep 1";
if(not $Private::control_path{$control_path}++) {
# my $master = "ssh -MTS ".$self->control_path_dir()."/ssh-%r@%h:%p ".$serverlogin." sleep 1";
my $master = "ssh -MTS $control_path $serverlogin sleep 1";
if(not $self->{'control_path'}{$control_path}++) {
# Master is not running for this control_path
# Start it
my $pid = fork();
if($pid) {
$Global::sshmaster{$pid}++;
} else {
debug($master,"\n");
::debug($master,"\n");
`$master`;
wait_and_exit(0);
::wait_and_exit(0);
}
}
} else {
@ -4601,12 +4584,15 @@ sub sshcommand_of_sshlogin {
sub control_path_dir {
# Returns:
# path to directory
if(not $Private::control_path_dir) {
$Private::control_path_dir =
tempdir($ENV{'HOME'}."/.parallel/tmp/control_path_dir-XXXX",
my $self = shift;
if(not defined $self->{'control_path_dir'}) {
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
$self->{'control_path_dir'} =
File::Temp::tempdir($ENV{'HOME'}."/.parallel/tmp/control_path_dir-XXXX",
CLEANUP => 1);
}
return $Private::control_path_dir;
return $self->{'control_path_dir'};
}
@ -4659,6 +4645,16 @@ sub new {
my $commandline = shift;
return bless {
'commandline' => $commandline,
'seq' => undef,
'stdout' => undef,
'stderr' => undef,
'pid' => undef,
# hash of { SSHLogins => number of times the command failed there }
'failed' => undef,
'sshlogin' => undef,
# The commandline wrapped with rsync and ssh
'sshlogin_wrap' => undef,
'exitstatus' => undef,
}, ref($class) || $class;
}
@ -4887,9 +4883,29 @@ sub return {
sub sshreturn {
my $self = shift;
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $rsync_opt = "-rlDzRE -e".::shell_quote_scalar($sshcmd);
my $pre = "";
for my $file ($self->return()) {
$pre .= sshtransferreturn($sshlogin,$file,0,$::opt_cleanup).";";
$file =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that
$file =~ s:^\./::g; # Remove ./ if any
my $relpath = ($file !~ m:^/:); # Is the path relative?
# Use different subdirs depending on abs or rel path
# Return or cleanup
my @cmd = ();
my $rsync_destdir = ($relpath ? "./" : "/");
my $ret_file = $file;
my $remove = $::opt_cleanup ? "--remove-source-files" : "";
# If relative path: prepend workdir/./ to avoid problems if the dir contains ':'
# and to get the right relative return path
my $replaced = ($relpath ? workdir()."/./" : "") . $file;
# --return
# Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz /
# Rel path: rsync -rlDzRE server:./subsir/file.gz ./
$pre .= "rsync $rsync_opt $remove $serverlogin:".
::shell_quote_scalar($replaced) . " ".$rsync_destdir.";";
}
return $pre;
}
@ -4931,42 +4947,6 @@ sub cleanup {
}
}
sub sshtransferreturn {
# Returns:
# ssh comands needed to transfer file to/from sshlogin
my ($sshlogin,$file,$transfer,$removesource) = (@_);
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $rsync_opt = "-rlDzRE -e".::shell_quote_scalar($sshcmd);
$file =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that
$file =~ s:^\./::g; # Remove ./ if any
my $relpath = ($file !~ m:^/:); # Is the path relative?
# Use different subdirs depending on abs or rel path
if($transfer) {
die("use sshtransfer instead");
} else {
# Return or cleanup
#my $noext = no_extension($file); # Remove .ext before prepending ./
my @cmd = ();
my $rsync_destdir = ($relpath ? "./" : "/");
#for my $ret_file (@Global::ret_files) {
my $ret_file = $file;
my $remove = $removesource ? "--remove-source-files" : "";
# If relative path: prepend workdir/./ to avoid problems if the dir contains ':'
# and to get the right relative return path
my $replaced = ($relpath ? workdir()."/./" : "") .
$file;
# context_replace($ret_file,[$file]);
# --return
# Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz /
# Rel path: rsync -rlDzRE server:./subsir/file.gz ./
push(@cmd, "rsync $rsync_opt $remove $serverlogin:".
::shell_quote_scalar($replaced) . " ".$rsync_destdir);
#}
return join(";",@cmd);
}
}
sub workdir {
# Returns:
# the workdir on a remote machine
@ -5008,9 +4988,7 @@ sub start {
Carp::confess($job);
die "jkj2";
}
# my $commandline = $job->{'commandline'};
my $command = $job->sshlogin_wrap();
# my $clean_command = $commandline->replaced();
my ($pid,$name);
if($Global::grouped) {
my (%out,%err,$outname,$errname);
@ -5057,7 +5035,6 @@ sub start {
}
$Global::total_running++;
$Global::total_started++;
#print STDERR "LEN".length($command)."\n";
if(not $job->seq()) {
# This is a new (non-retried) job: Give it a new seq
$Private::job_start_sequence++;
@ -5068,7 +5045,6 @@ sub start {
::debug("$Global::total_running processes. Starting (".$job->seq()."): $command\n");
if(@::opt_a and $job->seq() == 1) {
# Give STDIN to the first job if using -a
::debug("seq=1\n");
$pid = ::open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) ||
die("open3 (with -a) failed. Report a bug to <bug-parallel\@gnu.org>\n");
# Re-open to avoid complaining
@ -5077,17 +5053,13 @@ sub start {
} elsif (not $Global::tty_taken and -c "/dev/tty" and
open(DEVTTY, "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it
::debug("tty free $command\n");
$pid = ::open3("<&DEVTTY", ">&STDOUT", ">&STDERR", $command) ||
die("open3 (with /dev/tty) failed. Report a bug to <bug-parallel\@gnu.org>\n");
$Global::tty_taken = $pid;
close DEVTTY;
::debug("tty worked\n");
} else {
::debug("gensym\n");
$pid = ::open3(::gensym, ">&STDOUT", ">&STDERR", $command) ||
die("open3 (with gensym) failed. Report a bug to <bug-parallel\@gnu.org>\n");
::debug("gensym worked\n");
}
$job->set_pid($pid);
open STDOUT, ">&", $Global::original_stdout
@ -5108,7 +5080,6 @@ sub print {
my $out = $self->stdout();
my $err = $self->stderr();
my ($command) = $self->sshlogin_wrap();
# my ($clean_command) = $self->{'commandline'}->replaced();
if($Global::verbose and $Global::grouped) {
if($Global::verbose == 1) {
@ -5204,6 +5175,7 @@ sub new {
'return_files' => $return_files,
'positional_replace' => \%positional_replace,
'multi_replace' => \%multi_replace,
'replaced' => undef,
}, ref($class) || $class;
}
@ -5408,7 +5380,7 @@ sub number_of_replacements {
sub replaced {
my $self = shift;
if(not $self->{'replaced'}) {
if(not defined $self->{'replaced'}) {
$self->{'replaced'} = $self->replace_placeholders($self->{'command'});
}
if($::oodebug and length($self->{'replaced'}) != ($self->len())) {
@ -5799,13 +5771,11 @@ sub get {
my $prepend = undef;
my $empty = 1;
for my $fh (@{$self->{'fhs'}}) {
# ::debug("Reading $fh\n");
if(eof($fh)) {
if(defined $prepend) {
push @record, Arg->new($prepend);
$empty = 0;
} else {
# push @record, undef;
push @record, Arg->new($prepend||"");
}
next;
@ -5822,7 +5792,6 @@ sub get {
push @record, Arg->new($prepend);
$empty = 0;
} else {
# push @record, undef;
push @record, Arg->new($prepend);
}
::debug("Is empty? $empty");
@ -5844,15 +5813,12 @@ sub get {
redo;
}
}
# ::debug("ArgLineQueue->get '",$arg,"'\n");
push @record, Arg->new($arg);
$empty = 0;
}
if($empty) {
# ::debug("Return undef");
return undef;
} else {
# ::debug("return [@record]");
return \@record;
}
}
@ -6074,6 +6040,6 @@ sub unlock {
# Keep perl -w happy
$::opt_x = $::opt_workdir = $Private::control_path = $Semaphore::timeout = $Semaphore::wait =
$::opt_x = $::opt_workdir = $Semaphore::timeout = $Semaphore::wait =
$::opt_skip_first_line = $::opt_shebang = 0 ;

View file

@ -0,0 +1,8 @@
#!/bin/bash
SERVER1=parallel-server3
SERVER2=parallel-server2
echo '### Test -M'
seq 1 20 | parallel -k -M -S 9/$SERVER1,9/parallel@$SERVER2 echo

View file

@ -0,0 +1,21 @@
### Test -M
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20