diff --git a/src/parallel b/src/parallel index fccea918..b683f218 100755 --- a/src/parallel +++ b/src/parallel @@ -2698,8 +2698,8 @@ sub parse_options { if(defined $::opt_trim) { $Global::trim = $::opt_trim; } if(defined $::opt_arg_sep) { $Global::arg_sep = $::opt_arg_sep; } if(defined $::opt_arg_file_sep) { $Global::arg_file_sep = $::opt_arg_file_sep; } - if(defined $::opt_number_of_cpus) { print no_of_cpus(),"\n"; wait_and_exit(0); } - if(defined $::opt_number_of_cores) { print no_of_cores(),"\n"; wait_and_exit(0); } + if(defined $::opt_number_of_cpus) { print SSHLogin::no_of_cpus(),"\n"; wait_and_exit(0); } + if(defined $::opt_number_of_cores) { print SSHLogin::no_of_cores(),"\n"; wait_and_exit(0); } if(defined $::opt_max_line_length_allowed) { print CommandMaxLength::real_max_length(),"\n"; wait_and_exit(0); } if(defined $::opt_version) { version(); wait_and_exit(0); } if(defined $::opt_show_limits) { show_limits(); } @@ -3289,7 +3289,7 @@ sub user_requested_processes { } -sub no_of_cpus { +sub _no_of_cpus { # Returns: # Number of physical CPUs if(not $Private::no_of_cpus) { @@ -3309,7 +3309,7 @@ sub no_of_cpus { return $Private::no_of_cpus; } -sub no_of_cores { +sub _no_of_cores { # Returns: # Number of CPU cores if(not $Private::no_of_cores) { @@ -3329,7 +3329,7 @@ sub no_of_cores { return $Private::no_of_cores; } -sub no_of_cpus_gnu_linux { +sub _no_of_cpus_gnu_linux { # Returns: # Number of physical CPUs on GNU/Linux my $no_of_cpus; @@ -3347,7 +3347,7 @@ sub no_of_cpus_gnu_linux { return $no_of_cpus; } -sub no_of_cores_gnu_linux { +sub _no_of_cores_gnu_linux { # Returns: # Number of CPU cores on GNU/Linux my $no_of_cores; @@ -3362,35 +3362,35 @@ sub no_of_cores_gnu_linux { return $no_of_cores; } -sub no_of_cpus_darwin { +sub _no_of_cpus_darwin { # Returns: # Number of physical CPUs on Mac Darwin my $no_of_cpus = `sysctl -a hw 2>/dev/null | grep -w physicalcpu | awk '{ print \$2 }'`; return $no_of_cpus; } -sub no_of_cores_darwin { +sub _no_of_cores_darwin { # Returns: # Number of CPU cores on Mac Darwin my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`; return $no_of_cores; } -sub no_of_cpus_freebsd { +sub _no_of_cpus_freebsd { # Returns: # Number of physical CPUs on FreeBSD my $no_of_cpus = `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`; return $no_of_cpus; } -sub no_of_cores_freebsd { +sub _no_of_cores_freebsd { # Returns: # Number of CPU cores on FreeBSD my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`; return $no_of_cores; } -sub no_of_cpus_solaris { +sub _no_of_cpus_solaris { # Returns: # Number of physical CPUs on Solaris if(-x "/usr/sbin/psrinfo") { @@ -3551,7 +3551,7 @@ sub drain_job_queue { # Returns: N/A if($::opt_progress) { do_not_reap(); - print init_progress(); + print $Global::original_stderr init_progress(); reap_if_needed(); } my $last_header=""; @@ -3563,15 +3563,15 @@ sub drain_job_queue { my %progress = progress(); do_not_reap(); if($last_header ne $progress{'header'}) { - print "\n",$progress{'header'},"\n"; + print $Global::original_stderr "\n",$progress{'header'},"\n"; $last_header = $progress{'header'}; } - print "\r",$progress{'status'}; + print $Global::original_stderr "\r",$progress{'status'}; reap_if_needed(); } } if($::opt_progress) { - print "\n"; + print $Global::original_stderr "\n"; } } @@ -3580,7 +3580,7 @@ sub toggle_progress { # Returns: N/A $::opt_progress = not $::opt_progress; if($::opt_progress) { - print init_progress(); + print $Global::original_stderr init_progress(); } } @@ -3924,7 +3924,7 @@ sub parse_sshlogin { if($sshlogin_string =~ s:^(\d*)/:: and $1) { # Override default autodetected ncpus unless zero or missing $Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string); - $Global::host{$sshlogin_string}->ncpus($1); + $Global::host{$sshlogin_string}->set_ncpus($1); } else { $Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string); } @@ -3957,23 +3957,6 @@ sub remote_hosts { return grep !/^:$/, keys %Global::host; } -#sub sshtransfer { -# # Return the sshcommand needed to transfer the file -# # Returns: -# # ssh command needed to transfer file to sshlogin -# return sshtransferreturn(@_,1,0); -#} - -sub _sshreturn { - # Return the sshcommand needed to returning the file - # Returns: - # ssh command needed to transfer file from sshlogin - my $removesource = $::opt_cleanup; - return sshtransferreturn(@_,0,$removesource); -} - - - sub setup_basefile { @@ -4292,6 +4275,15 @@ sub new { my @unget = (); return bless { 'string' => $string, + 'jobs_running' => undef, + 'jobs_completed' => undef, + 'maxlength' => undef, + 'max_jobs_running' => undef, + 'ncpus' => undef, + 'sshcommand' => undef, + 'serverlogin' => undef, + 'control_path_dir' => undef, + 'control_path' => undef, }, ref($class) || $class; } @@ -4341,17 +4333,11 @@ sub inc_jobs_completed { $self->{'jobs_completed'}++; } -sub max_line_length { - my $self = shift; - die "TODO"; -} - sub set_max_jobs_running { my $self = shift; $self->{'max_jobs_running'} = shift; } - sub max_jobs_running { my $self = shift; if(not defined $self->{'max_jobs_running'}) { @@ -4361,12 +4347,14 @@ sub max_jobs_running { return $self->{'max_jobs_running'}; } +sub set_ncpus { + my $self = shift; + $self->{'ncpus'} = shift; +} + sub ncpus { my $self = shift; - my $ncpus = shift; # undef if we do not want to set it - if(defined $ncpus) { - $self->{'ncpus'} = $ncpus; - } elsif(not defined $self->{'ncpus'}) { + if(not defined $self->{'ncpus'}) { my $sshcmd = $self->sshcommand(); my $serverlogin = $self->serverlogin(); if($serverlogin eq ":") { @@ -4388,7 +4376,7 @@ sub ncpus { $self->{'ncpus'} = $ncpu; } else { print STDERR ("Warning: Could not figure out ", - "number of cpus on $serverlogin. Using 1"); + "number of cpus on $serverlogin. Using 1\n"); $self->{'ncpus'} = 1; } } @@ -4399,41 +4387,35 @@ sub ncpus { sub no_of_cpus { # Returns: # Number of physical CPUs - if(not $Private::no_of_cpus) { - local $/="\n"; # If delimiter is set, then $/ will be wrong - my $no_of_cpus = (no_of_cpus_freebsd() - || no_of_cpus_darwin() - || no_of_cpus_solaris() - || no_of_cpus_gnu_linux() - ); - if($no_of_cpus) { - $Private::no_of_cpus = $no_of_cpus; - } else { - warn("Cannot figure out number of cpus. Using 1"); - $Private::no_of_cpus = 1; - } + local $/="\n"; # If delmiiter is set, then $/ will be wrong + my $no_of_cpus = (no_of_cpus_freebsd() + || no_of_cpus_darwin() + || no_of_cpus_solaris() + || no_of_cpus_gnu_linux() + ); + if($no_of_cpus) { + return $no_of_cpus; + } else { + warn("Cannot figure out number of cpus. Using 1"); + return 1; } - return $Private::no_of_cpus; } sub no_of_cores { # Returns: # Number of CPU cores - if(not $Private::no_of_cores) { - local $/="\n"; # If delimiter is set, then $/ will be wrong - my $no_of_cores = (no_of_cores_freebsd() - || no_of_cores_darwin() - || no_of_cores_solaris() - || no_of_cores_gnu_linux() - ); - if($no_of_cores) { - $Private::no_of_cores = $no_of_cores; - } else { - warn("Cannot figure out number of CPU cores. Using 1"); - $Private::no_of_cores = 1; - } + local $/="\n"; # If delimiter is set, then $/ will be wrong + my $no_of_cores = (no_of_cores_freebsd() + || no_of_cores_darwin() + || no_of_cores_solaris() + || no_of_cores_gnu_linux() + ); + if($no_of_cores) { + return $no_of_cores; + } else { + warn("Cannot figure out number of CPU cores. Using 1"); + return 1; } - return $Private::no_of_cores; } sub no_of_cpus_gnu_linux { @@ -4572,21 +4554,22 @@ sub sshcommand_of_sshlogin { # Normal ssh if($::opt_controlmaster) { # Use control_path to make ssh faster - my $control_path = control_path_dir()."/ssh-%r@%h:%p"; + my $control_path = $self->control_path_dir()."/ssh-%r@%h:%p"; $sshcmd = "ssh -S ".$control_path; $serverlogin = $sshlogin; #my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin; - my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin." sleep 1"; - if(not $Private::control_path{$control_path}++) { +# my $master = "ssh -MTS ".$self->control_path_dir()."/ssh-%r@%h:%p ".$serverlogin." sleep 1"; + my $master = "ssh -MTS $control_path $serverlogin sleep 1"; + if(not $self->{'control_path'}{$control_path}++) { # Master is not running for this control_path # Start it my $pid = fork(); if($pid) { $Global::sshmaster{$pid}++; } else { - debug($master,"\n"); + ::debug($master,"\n"); `$master`; - wait_and_exit(0); + ::wait_and_exit(0); } } } else { @@ -4601,12 +4584,15 @@ sub sshcommand_of_sshlogin { sub control_path_dir { # Returns: # path to directory - if(not $Private::control_path_dir) { - $Private::control_path_dir = - tempdir($ENV{'HOME'}."/.parallel/tmp/control_path_dir-XXXX", + my $self = shift; + if(not defined $self->{'control_path_dir'}) { + -e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel"; + -e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp"; + $self->{'control_path_dir'} = + File::Temp::tempdir($ENV{'HOME'}."/.parallel/tmp/control_path_dir-XXXX", CLEANUP => 1); } - return $Private::control_path_dir; + return $self->{'control_path_dir'}; } @@ -4659,6 +4645,16 @@ sub new { my $commandline = shift; return bless { 'commandline' => $commandline, + 'seq' => undef, + 'stdout' => undef, + 'stderr' => undef, + 'pid' => undef, + # hash of { SSHLogins => number of times the command failed there } + 'failed' => undef, + 'sshlogin' => undef, + # The commandline wrapped with rsync and ssh + 'sshlogin_wrap' => undef, + 'exitstatus' => undef, }, ref($class) || $class; } @@ -4887,9 +4883,29 @@ sub return { sub sshreturn { my $self = shift; my $sshlogin = $self->sshlogin(); + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); + my $rsync_opt = "-rlDzRE -e".::shell_quote_scalar($sshcmd); my $pre = ""; for my $file ($self->return()) { - $pre .= sshtransferreturn($sshlogin,$file,0,$::opt_cleanup).";"; + $file =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that + $file =~ s:^\./::g; # Remove ./ if any + my $relpath = ($file !~ m:^/:); # Is the path relative? + # Use different subdirs depending on abs or rel path + + # Return or cleanup + my @cmd = (); + my $rsync_destdir = ($relpath ? "./" : "/"); + my $ret_file = $file; + my $remove = $::opt_cleanup ? "--remove-source-files" : ""; + # If relative path: prepend workdir/./ to avoid problems if the dir contains ':' + # and to get the right relative return path + my $replaced = ($relpath ? workdir()."/./" : "") . $file; + # --return + # Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz / + # Rel path: rsync -rlDzRE server:./subsir/file.gz ./ + $pre .= "rsync $rsync_opt $remove $serverlogin:". + ::shell_quote_scalar($replaced) . " ".$rsync_destdir.";"; } return $pre; } @@ -4931,42 +4947,6 @@ sub cleanup { } } -sub sshtransferreturn { - # Returns: - # ssh comands needed to transfer file to/from sshlogin - my ($sshlogin,$file,$transfer,$removesource) = (@_); - my $sshcmd = $sshlogin->sshcommand(); - my $serverlogin = $sshlogin->serverlogin(); - my $rsync_opt = "-rlDzRE -e".::shell_quote_scalar($sshcmd); - $file =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that - $file =~ s:^\./::g; # Remove ./ if any - my $relpath = ($file !~ m:^/:); # Is the path relative? - # Use different subdirs depending on abs or rel path - if($transfer) { - die("use sshtransfer instead"); - } else { - # Return or cleanup - #my $noext = no_extension($file); # Remove .ext before prepending ./ - my @cmd = (); - my $rsync_destdir = ($relpath ? "./" : "/"); - #for my $ret_file (@Global::ret_files) { - my $ret_file = $file; - my $remove = $removesource ? "--remove-source-files" : ""; - # If relative path: prepend workdir/./ to avoid problems if the dir contains ':' - # and to get the right relative return path - my $replaced = ($relpath ? workdir()."/./" : "") . - $file; -# context_replace($ret_file,[$file]); - # --return - # Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz / - # Rel path: rsync -rlDzRE server:./subsir/file.gz ./ - push(@cmd, "rsync $rsync_opt $remove $serverlogin:". - ::shell_quote_scalar($replaced) . " ".$rsync_destdir); - #} - return join(";",@cmd); - } -} - sub workdir { # Returns: # the workdir on a remote machine @@ -5008,9 +4988,7 @@ sub start { Carp::confess($job); die "jkj2"; } -# my $commandline = $job->{'commandline'}; my $command = $job->sshlogin_wrap(); -# my $clean_command = $commandline->replaced(); my ($pid,$name); if($Global::grouped) { my (%out,%err,$outname,$errname); @@ -5057,7 +5035,6 @@ sub start { } $Global::total_running++; $Global::total_started++; - #print STDERR "LEN".length($command)."\n"; if(not $job->seq()) { # This is a new (non-retried) job: Give it a new seq $Private::job_start_sequence++; @@ -5068,7 +5045,6 @@ sub start { ::debug("$Global::total_running processes. Starting (".$job->seq()."): $command\n"); if(@::opt_a and $job->seq() == 1) { # Give STDIN to the first job if using -a - ::debug("seq=1\n"); $pid = ::open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) || die("open3 (with -a) failed. Report a bug to \n"); # Re-open to avoid complaining @@ -5077,17 +5053,13 @@ sub start { } elsif (not $Global::tty_taken and -c "/dev/tty" and open(DEVTTY, "/dev/tty")) { # Give /dev/tty to the command if no one else is using it - ::debug("tty free $command\n"); $pid = ::open3("<&DEVTTY", ">&STDOUT", ">&STDERR", $command) || die("open3 (with /dev/tty) failed. Report a bug to \n"); $Global::tty_taken = $pid; close DEVTTY; - ::debug("tty worked\n"); } else { - ::debug("gensym\n"); $pid = ::open3(::gensym, ">&STDOUT", ">&STDERR", $command) || die("open3 (with gensym) failed. Report a bug to \n"); - ::debug("gensym worked\n"); } $job->set_pid($pid); open STDOUT, ">&", $Global::original_stdout @@ -5108,7 +5080,6 @@ sub print { my $out = $self->stdout(); my $err = $self->stderr(); my ($command) = $self->sshlogin_wrap(); - # my ($clean_command) = $self->{'commandline'}->replaced(); if($Global::verbose and $Global::grouped) { if($Global::verbose == 1) { @@ -5204,6 +5175,7 @@ sub new { 'return_files' => $return_files, 'positional_replace' => \%positional_replace, 'multi_replace' => \%multi_replace, + 'replaced' => undef, }, ref($class) || $class; } @@ -5408,7 +5380,7 @@ sub number_of_replacements { sub replaced { my $self = shift; - if(not $self->{'replaced'}) { + if(not defined $self->{'replaced'}) { $self->{'replaced'} = $self->replace_placeholders($self->{'command'}); } if($::oodebug and length($self->{'replaced'}) != ($self->len())) { @@ -5799,13 +5771,11 @@ sub get { my $prepend = undef; my $empty = 1; for my $fh (@{$self->{'fhs'}}) { -# ::debug("Reading $fh\n"); if(eof($fh)) { if(defined $prepend) { push @record, Arg->new($prepend); $empty = 0; } else { -# push @record, undef; push @record, Arg->new($prepend||""); } next; @@ -5822,7 +5792,6 @@ sub get { push @record, Arg->new($prepend); $empty = 0; } else { -# push @record, undef; push @record, Arg->new($prepend); } ::debug("Is empty? $empty"); @@ -5844,15 +5813,12 @@ sub get { redo; } } -# ::debug("ArgLineQueue->get '",$arg,"'\n"); push @record, Arg->new($arg); $empty = 0; } if($empty) { -# ::debug("Return undef"); return undef; } else { -# ::debug("return [@record]"); return \@record; } } @@ -6074,6 +6040,6 @@ sub unlock { # Keep perl -w happy -$::opt_x = $::opt_workdir = $Private::control_path = $Semaphore::timeout = $Semaphore::wait = +$::opt_x = $::opt_workdir = $Semaphore::timeout = $Semaphore::wait = $::opt_skip_first_line = $::opt_shebang = 0 ; diff --git a/testsuite/tests-to-run/test41.sh b/testsuite/tests-to-run/test41.sh new file mode 100644 index 00000000..1878299b --- /dev/null +++ b/testsuite/tests-to-run/test41.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +SERVER1=parallel-server3 +SERVER2=parallel-server2 + +echo '### Test -M' + +seq 1 20 | parallel -k -M -S 9/$SERVER1,9/parallel@$SERVER2 echo diff --git a/testsuite/wanted-results/test41 b/testsuite/wanted-results/test41 new file mode 100644 index 00000000..07a863fc --- /dev/null +++ b/testsuite/wanted-results/test41 @@ -0,0 +1,21 @@ +### Test -M +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20