diff --git a/src/parallel b/src/parallel index 289dce71..8f99dc9f 100755 --- a/src/parallel +++ b/src/parallel @@ -2497,17 +2497,18 @@ if($::opt_skip_first_line) { } $Global::CommandLineQueue = CommandLineQueue->new(join(" ",@ARGV),\@fhlist,$Global::Xargs,$number_of_args,\@Global::ret_files); +$Global::JobQueue = JobQueue->new($Global::CommandLineQueue); init_run_jobs(); -if(defined $::opt_P) { - compute_number_of_processes_for_sshlogins(); -} +#if(defined $::opt_P) { +# compute_number_of_processes_for_sshlogins(); +#} my $sem; if($Global::semaphore) { # $Global::host{':'}{'max_no_of_running'} must be set - if(not defined $Global::host{':'}{'max_no_of_running'}) { - compute_number_of_processes_for_sshlogins(); - } + #if(not defined $Global::host{':'}{'max_no_of_running'}) { + #compute_number_of_processes_for_sshlogins(); + #} $sem = acquire_semaphore(); } $SIG{TERM} = \&start_no_new_jobs; @@ -2528,7 +2529,8 @@ sub acquire_semaphore { # Acquires semaphore. If needed: spawns to the background # Returns: # The semaphore to be released when jobs is complete - my $sem = Semaphore->new($Semaphore::name,$Global::host{':'}{'max_no_of_running'}); + $Global::host{':'} = SSHLogin->new(":"); + my $sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running()); $sem->acquire(); debug("run"); if($Semaphore::fg) { @@ -2543,7 +2545,7 @@ sub acquire_semaphore { # child # Get a semaphore for this pid die "Can't start a new session: $!" if setsid() == -1; - $sem = Semaphore->new($Semaphore::name,$Global::host{':'}{'max_no_of_running'}); + $sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running()); $sem->acquire(); } } @@ -2643,7 +2645,7 @@ sub get_options_from_array { sub parse_options { # Returns: N/A # Defaults: - $Global::version = 20101122; + $Global::version = 20101126; $Global::progname = 'parallel'; $Global::debug = 0; $Global::verbose = 0; @@ -2794,9 +2796,8 @@ sub parse_options { if(defined $::opt_P) { # compute_number_of_processes_for_sshlogins(); } else { - for my $sshlogin (keys %Global::host) { - $Global::host{$sshlogin}{'max_no_of_running'} = - $Global::default_simultaneous_sshlogins; + for my $sshlogin (values %Global::host) { + $sshlogin->set_max_jobs_running($Global::default_simultaneous_sshlogins); } } } @@ -3055,12 +3056,6 @@ sub __NUMBER_OF_PROCESSES_FILEHANDLES_MAX_LENGTH_OF_COMMAND_LINE__ {} # Number of parallel processes to run -sub compute_number_of_processes_for_sshlogins { - for my $sshlogin (keys %Global::host) { - $Global::host{$sshlogin}{'max_no_of_running'} = - compute_number_of_processes($::opt_P,$sshlogin); - } -} sub compute_number_of_processes { # Number of processes wanted and limited by system resources @@ -3069,6 +3064,9 @@ sub compute_number_of_processes { my $opt_P = shift; my $sshlogin = shift; my $wanted_processes = user_requested_processes($opt_P,$sshlogin); + if(not defined $wanted_processes) { + $wanted_processes = $Global::default_simultaneous_sshlogins; + } debug("Wanted procs: $wanted_processes\n"); my $system_limit = processes_available_by_system_limit($wanted_processes,$sshlogin); @@ -3129,6 +3127,7 @@ sub processes_available_by_system_limit { } elsif(defined $child) { # The child takes one process slot # It will be killed later + $SIG{TERM} = $Global::original_sigterm; sleep 100000; wait_and_exit(0); } else { @@ -3160,10 +3159,10 @@ sub processes_available_by_system_limit { kill 9, $pid; waitpid($pid,0); } - wait(); + #wait(); # Cleanup: Unget the command_lines (and args_refs) $Global::CommandLineQueue->unget(@command_lines); - if($sshlogin ne ":" and + if($sshlogin->string() ne ":" and $system_limit > $Global::default_simultaneous_sshlogins) { $system_limit = simultaneous_sshlogin_limit($sshlogin,$system_limit); @@ -3179,7 +3178,8 @@ sub simultaneous_sshlogin { # Number of succesful logins my $sshlogin = shift; my $wanted_processes = shift; - my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); my $cmd = "$sshcmd $serverlogin echo simultaneouslogin 2>&1 &"x$wanted_processes; debug("Trying $wanted_processes logins at $serverlogin"); open (SIMUL, "($cmd)|grep simultaneouslogin | wc -l|") or die; @@ -3195,7 +3195,8 @@ sub simultaneous_sshlogin_limit { # min($wanted_processes,$working_simultaneous_ssh_logins-1) my $sshlogin = shift; my $wanted_processes = shift; - my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); # Try twice because it guesses wrong sometimes # Choose the minimal my $ssh_limit = @@ -3249,16 +3250,16 @@ sub user_requested_processes { # E.g. -P +2 my $j = $1; $processes = - no_of_processing_units_sshlogin($sshlogin) + $j; + $sshlogin->ncpus() + $j; } elsif ($opt_P =~ /^-(\d+)$/) { # E.g. -P -2 my $j = $1; $processes = - no_of_processing_units_sshlogin($sshlogin) - $j; + $sshlogin->ncpus() - $j; } elsif ($opt_P =~ /^(\d+)\%$/) { my $j = $1; $processes = - no_of_processing_units_sshlogin($sshlogin) * $j / 100; + $sshlogin->ncpus() * $j / 100; } elsif ($opt_P =~ /^(\d+)$/) { $processes = $1; if($processes == 0) { @@ -3287,39 +3288,6 @@ sub user_requested_processes { return $processes; } -sub no_of_processing_units_sshlogin { - # Number of processing units (CPUs or cores) at this sshlogin - # Returns: - # number of CPUs or cores at the sshlogin - my $sshlogin = shift; - my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); - if(not $Global::host{$sshlogin}{'ncpus'}) { - if($serverlogin eq ":") { - if($::opt_use_cpus_instead_of_cores) { - $Global::host{$sshlogin}{'ncpus'} = no_of_cpus(); - } else { - $Global::host{$sshlogin}{'ncpus'} = no_of_cores(); - } - } else { - my $ncpu; - if($::opt_use_cpus_instead_of_cores) { - $ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cpus); - chomp($ncpu); - } else { - $ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cores); - chomp($ncpu); - } - if($ncpu =~ /^[0-9]+$/) { - $Global::host{$sshlogin}{'ncpus'} = $ncpu; - } else { - print STDERR ("Warning: Could not figure out ", - "number of cpus on $serverlogin. Using 1"); - $Global::host{$sshlogin}{'ncpus'} = 1; - } - } - } - return $Global::host{$sshlogin}{'ncpus'}; -} sub no_of_cpus { # Returns: @@ -3622,7 +3590,7 @@ sub init_progress { $|=1; my %progress = progress(); return ("\nComputers / CPU cores / Max jobs to run\n", - $progress{'workerlist'},"\n"); + $progress{'workerlist'}); } sub progress { @@ -3636,15 +3604,17 @@ sub progress { my %sshlogin = map { $_ eq ":" ? ($_=>"local") : ($_=>$_) } @workers; my $workerno = 1; my %workerno = map { ($_=>$workerno++) } @workers; - my $workerlist = join("\n", map { - $workerno{$_}.":".$sshlogin{$_} ." / ". - ($Global::host{$_}{'ncpus'} || "-") ." / ". - $Global::host{$_}{'max_no_of_running'} - } @workers); + my $workerlist = ""; + for my $w (@workers) { + $workerlist .= + $workerno{$w}.":".$sshlogin{$w} ." / ". + ($Global::host{$w}->ncpus() || "-")." / ". + $Global::host{$w}->max_jobs_running()."\n"; + } my $eta = ""; if($::opt_eta) { my $completed = 0; - for(@workers) { $completed += ($Global::host{$_}{'completed'}||0) } + for(@workers) { $completed += ($Global::host{$_}->jobs_completed()||0) } if($completed) { $Private::first_completed ||= time; my $avgtime = (time-$Private::first_completed)/$completed; @@ -3663,8 +3633,8 @@ sub progress { join(" ",map { if($Global::total_started) { - my $completed = ($Global::host{$_}{'completed'}||0); - my $running = $Global::host{$_}{'no_of_running'}; + my $completed = ($Global::host{$_}->jobs_completed()||0); + my $running = $Global::host{$_}->jobs_running(); my $time = $completed ? (time-$^T)/($completed) : "0"; sprintf("%s:%d/%d/%d%%/%.1fs ", $sshlogin{$_}, $running, $completed, @@ -3679,8 +3649,8 @@ sub progress { $status = $eta . join(" ",map { - my $completed = ($Global::host{$_}{'completed'}||0); - my $running = $Global::host{$_}{'no_of_running'}; + my $completed = ($Global::host{$_}->jobs_completed()||0); + my $running = $Global::host{$_}->jobs_running(); my $time = $completed ? (time-$^T)/($completed) : "0"; sprintf("%s:%d/%d/%d%%/%.1fs ", $workerno{$_}, $running, $completed, @@ -3694,10 +3664,11 @@ sub progress { $status = $eta . join(" ",map { sprintf("%s:%d/%d/%d%%", - $sshlogin{$_}, $Global::host{$_}{'no_of_running'}, - ($Global::host{$_}{'completed'}||0), - ($Global::host{$_}{'no_of_running'}+ - ($Global::host{$_}{'completed'}||0))*100 + $sshlogin{$_}, + $Global::host{$_}->jobs_running(), + ($Global::host{$_}->jobs_completed()||0), + ($Global::host{$_}->jobs_running()+ + ($Global::host{$_}->jobs_completed()||0))*100 / $Global::total_started) } @workers); } @@ -3707,10 +3678,11 @@ sub progress { $status = $eta . join(" ",map { sprintf("%s:%d/%d/%d%%", - $workerno{$_}, $Global::host{$_}{'no_of_running'}, - ($Global::host{$_}{'completed'}||0), - ($Global::host{$_}{'no_of_running'}+ - ($Global::host{$_}{'completed'}||0))*100 + $workerno{$_}, + $Global::host{$_}->jobs_running(), + ($Global::host{$_}->jobs_completed()||0), + ($Global::host{$_}->jobs_running()+ + ($Global::host{$_}->jobs_completed()||0))*100 / $Global::total_started) } @workers); } @@ -3720,8 +3692,8 @@ sub progress { $status = $eta . join(" ",map { sprintf("%s:%d/%d", - $sshlogin{$_}, $Global::host{$_}{'no_of_running'}, - ($Global::host{$_}{'completed'}||0)) } + $sshlogin{$_}, $Global::host{$_}->jobs_running(), + ($Global::host{$_}->jobs_completed()||0)) } @workers); } if(length $status > $termcols) { @@ -3730,8 +3702,8 @@ sub progress { $status = $eta . join(" ",map { sprintf("%s:%d/%d", - $sshlogin{$_}, $Global::host{$_}{'no_of_running'}, - ($Global::host{$_}{'completed'}||0)) } + $sshlogin{$_}, $Global::host{$_}->jobs_running(), + ($Global::host{$_}->jobs_completed()||0)) } @workers); } if(length $status > $termcols) { @@ -3740,8 +3712,8 @@ sub progress { $status = $eta . join(" ",map { sprintf("%s:%d/%d", - $workerno{$_}, $Global::host{$_}{'no_of_running'}, - ($Global::host{$_}{'completed'}||0)) } + $workerno{$_}, $Global::host{$_}->jobs_running(), + ($Global::host{$_}->jobs_completed()||0)) } @workers); } if(length $status > $termcols) { @@ -3751,7 +3723,7 @@ sub progress { join(" ",map { sprintf("%s:%d", $sshlogin{$_}, - ($Global::host{$_}{'completed'}||0)) } + ($Global::host{$_}->jobs_completed()||0)) } @workers); } if(length $status > $termcols) { @@ -3761,7 +3733,7 @@ sub progress { join(" ",map { sprintf("%s:%d", $workerno{$_}, - ($Global::host{$_}{'completed'}||0)) } + ($Global::host{$_}->jobs_completed()||0)) } @workers); } return ("workerlist" => $workerlist, "header" => $header, "status" => $status); @@ -3791,24 +3763,27 @@ sub start_more_jobs { my $mtime = (stat($Global::max_procs_file))[9]; if($mtime > $Global::max_procs_file_last_mod) { $Global::max_procs_file_last_mod = $mtime; - compute_number_of_processes_for_sshlogins(); + for my $sshlogin (values %Global::host) { + $sshlogin->set_max_jobs_running(undef); + } } } - for my $sshlogin (keys %Global::host) { - debug("Running jobs on $sshlogin: $Global::host{$sshlogin}{'no_of_running'}\n"); - while ($Global::host{$sshlogin}{'no_of_running'} < - $Global::host{$sshlogin}{'max_no_of_running'}) { + for my $sshlogin (values %Global::host) { + debug("Running jobs on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n"); + while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) { if($Global::CommandLineQueue->empty()) { last; } + debug("Try starting a job on ".$sshlogin->string()."\n"); if(start_another_job($sshlogin) == 0) { # No more jobs to start last; } - $Global::host{$sshlogin}{'no_of_running'}++; + debug("Job started on ".$sshlogin->string()."\n"); + $sshlogin->inc_jobs_running(); $jobs_started++; } - debug("Running jobs on $sshlogin: $Global::host{$sshlogin}{'no_of_running'}\n"); + debug("Running jobs on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n"); } } return $jobs_started; @@ -3827,16 +3802,15 @@ sub start_another_job { # No more commands to run return 0; } else { - my ($command,$clean_command) = get_command_line_with_sshlogin($sshlogin); - if(not defined $command) { + my ($job) = get_job_with_sshlogin($sshlogin); + if(not defined $job) { # No command available for that sshlogin return 0; } - debug("Command to run on '$sshlogin': '$command'\n"); - my %jobinfo = start_job($command,$sshlogin,$clean_command); - if(%jobinfo) { - $Global::running{$jobinfo{"pid"}} = \%jobinfo; - debug("Started as seq ".$jobinfo{'seq'},"\n"); + debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); + if($job->start()) { + $Global::running{$job->pid()} = $job; + debug("Started as seq ".$job->seq(),"\n"); return 1; } else { # If interactive says: Dont run the job, then skip it and run the next @@ -3849,231 +3823,66 @@ sub start_another_job { } } -sub start_job { - # Setup STDOUT and STDERR for a job and start it. - # Returns: - # "seq" => sequence number of job - # "pid" => process id - # "out" => STDOUT filehandle (if grouped) - # "err" => STDERR filehandle (if grouped) - # "sshlogin" => sshlogin - # "command" => command being run - # "clean_command" => command being run without wrapping - my $command = shift; - my $sshlogin = shift; - my $clean_command = shift; - my ($pid,$out,$err,%out,%err,$outname,$errname,$name); - if($Global::grouped) { - # To group we create temporary files for STDOUT and STDERR - # To avoid the cleanup unlink the files immediately (but keep them open) - $outname = ++$Private::TmpFilename; - ($out{$outname},$name) = tempfile(SUFFIX => ".par"); - unlink $name; - $errname = ++$Private::TmpFilename; - ($err{$errname},$name) = tempfile(SUFFIX => ".par"); - unlink $name; - open STDOUT, '>&', $out{$outname} or die "Can't redirect STDOUT: $!"; - open STDERR, '>&', $err{$errname} or die "Can't dup STDOUT: $!"; - } - if($Global::interactive or $Global::stderr_verbose) { - if($Global::interactive) { - print $Global::original_stderr "$command ?..."; - open(TTY,"/dev/tty") || die; - my $answer = ; - close TTY; - my $run_yes = ($answer =~ /^\s*y/i); - if (not $run_yes) { - open STDOUT, ">&", $Global::original_stdout - or die "Can't dup \$oldout: $!"; - open STDERR, ">&", $Global::original_stderr - or die "Can't dup \$oldout: $!"; - return; - } - } else { - print $Global::original_stderr "$command\n"; - } - } - if($Global::verbose and not $Global::grouped) { - if($Global::verbose == 1) { - print STDOUT $clean_command,"\n"; - } else { - # Verbose level > 1: Print the rsync and stuff - print STDOUT $command,"\n"; - } - } - $Global::total_running++; - $Global::total_started++; - #print STDERR "LEN".length($command)."\n"; - my $job_start_sequence; - if($Global::failed{$clean_command}{'seq'}) { - # This is a retried job: Keep the old seq - $job_start_sequence = $Global::failed{$clean_command}{'seq'}; - } else { - # This is a new (non-retried) job: Give it a new seq - $Private::job_start_sequence++; - $job_start_sequence = $Private::job_start_sequence; - } - $ENV{'PARALLEL_SEQ'} = $job_start_sequence; - $ENV{'PARALLEL_PID'} = $$; - debug("$Global::total_running processes. Starting ($job_start_sequence): $command\n"); - if(@::opt_a and $job_start_sequence == 1) { - # Give STDIN to the first job if using -a - $pid = open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) || - die("open3 (with -a) failed. Report a bug to \n"); - # Re-open to avoid complaining - open STDIN, "<&", $Global::original_stdin - or die "Can't dup \$Global::original_stdin: $!"; - } elsif (not $Global::tty_taken and -c "/dev/tty" and - open(DEVTTY, "/dev/tty")) { - # Give /dev/tty to the command if no one else is using it - $pid = open3("<&DEVTTY", ">&STDOUT", ">&STDERR", $command) || - die("open3 (with /dev/tty) failed. Report a bug to \n"); - $Global::tty_taken = $pid; - close DEVTTY; - } else { - $pid = open3(gensym, ">&STDOUT", ">&STDERR", $command) || - die("open3 (with gensym) failed. Report a bug to \n"); - } - debug("started: $command\n"); - open STDOUT, ">&", $Global::original_stdout - or die "Can't dup \$Global::original_stdout: $!"; - open STDERR, ">&", $Global::original_stderr - or die "Can't dup \$Global::original_stderr: $!"; - if($Global::grouped) { - return ("seq" => $job_start_sequence, - "pid" => $pid, - "out" => $out{$outname}, - "err" => $err{$errname}, - "sshlogin" => $sshlogin, - "command" => $command, - "clean_command" => $clean_command); - } else { - return ("seq" => $job_start_sequence, - "pid" => $pid, - "sshlogin" => $sshlogin, - "command" => $command, - "clean_command" => $clean_command); - } -} - -sub print_job { - # Print the output of the jobs - # Returns: N/A - # Only relevant for grouping - $Global::grouped or return; - my $fhs = shift; - if(not defined $fhs) { - return; - } - my $out = $fhs->{out}; - my $err = $fhs->{err}; - my $command = $fhs->{command}; - my $clean_command = $fhs->{clean_command}; - - debug(">>joboutput $command\n"); - if($Global::verbose and $Global::grouped) { - if($Global::verbose == 1) { - print STDOUT $clean_command,"\n"; - } else { - # Verbose level > 1: Print the rsync and stuff - print STDOUT $command,"\n"; - } - # If STDOUT and STDERR are merged, we want the command to be printed first - # so flush to avoid STDOUT being buffered - flush STDOUT; - } - seek $_, 0, 0 for $out, $err; - if($Global::debug) { - print STDERR "ERR:\n"; - } - my $buf; - while(sysread($err,$buf,1000_000)) { - print STDERR $buf; - } - flush STDERR; - if($Global::debug) { - print STDOUT "OUT:\n"; - } - while(sysread($out,$buf,1000_000)) { - print STDOUT $buf; - } - flush STDOUT; - debug("<empty()) { - Carp::confess("get_command_line_with_sshlogin should never be called if empty"); + Carp::confess("get_job_with_sshlogin should never be called if empty"); } - my ($next_command_ref) = $Global::CommandLineQueue->get(); - if(not defined $next_command_ref) { - # No more commands + + my ($job) = $Global::JobQueue->get(); + if(not defined $job) { + # No more jobs return undef; } - my ($next_command_line) = $next_command_ref->replaced(); + + if($::oodebug and not defined $job->{'commandline'}) { + Carp::confess("get_job_with_sshlogin job->commandline should never be empty"); + } + my ($next_command_line) = $job->replaced(); my ($clean_command) = $next_command_line; if($clean_command =~ /^\s*$/) { # Do not run empty lines if(not $Global::CommandLineQueue->empty()) { - return get_command_line_with_sshlogin($sshlogin); + return get_job_with_sshlogin($sshlogin); } else { return undef; } } + $job->set_sshlogin($sshlogin); if($::opt_retries and $clean_command and - $Global::failed{$clean_command}{'count'}{$sshlogin}) { + $job->failed_here()) { # This command with these args failed for this sshlogin - my $min_failures = - min(map { $Global::failed{$clean_command}{'count'}{$_} } - keys %Global::host); - if($Global::failed{$clean_command}{'count'}{$sshlogin} == $min_failures) { + my ($no_of_failed_sshlogins,$min_failures) = $job->min_failures(); + if($no_of_failed_sshlogins < keys %Global::host or + $job->failed_here() == $min_failures) { # It failed the same or more times on another host: # run it on this host } else { # If it failed fewer times on another host: # Find another job to run - my @next_job_to_run; - if(not $Global::CommandLineQueue->empty()) { - @next_job_to_run = get_command_line_with_sshlogin($sshlogin); + my $nextjob; + if(not $Global::JobQueue->empty()) { + $nextjob = get_job_with_sshlogin($sshlogin); } # Push the command back on the queue - $Global::CommandLineQueue->unget($next_command_ref); - return @next_job_to_run; + $Global::JobQueue->unget($job); + return $nextjob; } } - return $next_command_ref->sshlogin_wrap($sshlogin); + return $job; } -sub workdir { - # Returns: - # the workdir on a remote machine - my $workdir; - if(defined $::opt_workdir) { - if($::opt_workdir ne "...") { - $workdir = $::opt_workdir; - $workdir =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that - $workdir =~ s:/+$::; # Remove ending / if any - $workdir =~ s:^\./::g; # Remove starting ./ if any - } else { - $workdir = ".parallel/tmp/".hostname()."-".$$."-".$Global::transfer_seq; - } - } else { - $workdir = "."; - } - return $workdir; -} + sub __REMOTE_SSH__ {} @@ -4108,15 +3917,16 @@ sub parse_sshlogin { } } } - for my $sshlogin (@login) { - if($sshlogin =~ s:^(\d*)/::) { + for my $sshlogin_string (@login) { + if($sshlogin_string =~ s:^(\d*)/:: and $1) { # Override default autodetected ncpus unless zero or missing - if($1) { - $Global::host{$sshlogin}{'ncpus'} = $1; - } + $Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string); + $Global::host{$sshlogin_string}->ncpus($1); + } else { + $Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string); } - $Global::host{$sshlogin}{'no_of_running'} = 0; - $Global::host{$sshlogin}{'maxlength'} = Limits::Command::max_length(); + $Global::host{$sshlogin_string}->set_jobs_running(0); + $Global::host{$sshlogin_string}->set_maxlength(Limits::Command::max_length()); } debug("sshlogin: ", my_dump(%Global::host),"\n"); if($::opt_transfer or @::opt_return or $::opt_cleanup or @::opt_basefile) { @@ -4144,59 +3954,6 @@ sub remote_hosts { return grep !/^:$/, keys %Global::host; } -sub sshcommand_of_sshlogin { - # 'server' -> ('ssh -S /tmp/parallel-ssh-RANDOM/host-','server') - # 'user@server' -> ('ssh','user@server') - # 'myssh user@server' -> ('myssh','user@server') - # 'myssh -l user server' -> ('myssh -l user','server') - # '/usr/local/bin/myssh -l user server' -> ('/usr/local/bin/myssh -l user','server') - # Returns: - # sshcommand - defaults to 'ssh' - # login@host - my $sshlogin = shift; - my ($sshcmd, $serverlogin); - if($sshlogin =~ /(.+) (\S+)$/) { - # Own ssh command - $sshcmd = $1; $serverlogin = $2; - } else { - # Normal ssh - if($::opt_controlmaster) { - # Use control_path to make ssh faster - my $control_path = control_path_dir()."/ssh-%r@%h:%p"; - $sshcmd = "ssh -S ".$control_path; - $serverlogin = $sshlogin; - #my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin; - my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin." sleep 1"; - if(not $Private::control_path{$control_path}++) { - # Master is not running for this control_path - # Start it - my $pid = fork(); - if($pid) { - $Global::sshmaster{$pid}++; - } else { - debug($master,"\n"); - `$master`; - wait_and_exit(0); - } - } - } else { - $sshcmd = "ssh"; $serverlogin = $sshlogin; - } - } - return ($sshcmd, $serverlogin); -} - -sub control_path_dir { - # Returns: - # path to directory - if(not $Private::control_path_dir) { - $Private::control_path_dir = - tempdir($ENV{'HOME'}."/.parallel/tmp/control_path_dir-XXXX", - CLEANUP => 1); - } - return $Private::control_path_dir; -} - #sub sshtransfer { # # Return the sshcommand needed to transfer the file # # Returns: @@ -4204,7 +3961,7 @@ sub control_path_dir { # return sshtransferreturn(@_,1,0); #} -sub sshreturn { +sub _sshreturn { # Return the sshcommand needed to returning the file # Returns: # ssh command needed to transfer file from sshlogin @@ -4212,95 +3969,19 @@ sub sshreturn { return sshtransferreturn(@_,0,$removesource); } -sub sshcleanup { - # Return the sshcommand needed to remove the file - # Returns: - # ssh command needed to remove file from sshlogin - my ($sshlogin,$file) = (@_); - my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); - my $workdir = workdir(); - my $removeworkdir = ""; - my @subworkdirs = parentdirs_of($file); - if(@subworkdirs) { - $removeworkdir = "; rmdir 2>/dev/null ".join(" ",map { $workdir."/".$_ } @subworkdirs); - } - my $relpath = ($file !~ m:^/:); # Is the path relative? - my $cleandir = ($relpath ? $workdir : ""); - return "$sshcmd $serverlogin rm -f ".shell_quote($file.$removeworkdir)." "; -} -sub parentdirs_of { - # Return: - # all parentdirs except . of this dir or file sorted descending by length - my $d = shift; - my @parents = (); - while($d =~ s:/[^/]+$::) { - if($d ne ".") { - push @parents, $d; - } - } - return @parents; -} -sub sshtransferreturn { - # Returns: - # ssh comands needed to transfer file to/from sshlogin - my ($sshlogin,$file,$transfer,$removesource) = (@_); - my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); - my $rsync_opt = "-rlDzRE -e".shell_quote_scalar($sshcmd); - $file =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that - $file =~ s:^\./::g; # Remove ./ if any - my $relpath = ($file !~ m:^/:); # Is the path relative? - # Use different subdirs depending on abs or rel path - if($transfer) { - # Abs path: rsync -rlDzRE /home/tange/dir/subdir/file.gz server:/ - # Rel path: rsync -rlDzRE ./subdir/file.gz server:.parallel/tmp/tempid/ - # Rel path: rsync -rlDzRE ./subdir/file.gz server:$workdir/ - my $remote_workdir = workdir($file); - my $rsync_destdir = ($relpath ? $remote_workdir : "/"); - if($relpath) { - $file = "./".$file; - } - if(-r shell_unquote($file)) { - my $mkremote_workdir = - $remote_workdir eq "." ? "true" : "ssh $serverlogin mkdir -p $rsync_destdir"; - return "$mkremote_workdir; rsync $rsync_opt $file $serverlogin:$rsync_destdir"; - } else { - print STDERR "Warning: $file is not readable and will not be transferred\n"; - return "true"; # dummy command to run - } - } else { - # Return or cleanup - #my $noext = no_extension($file); # Remove .ext before prepending ./ - my @cmd = (); - my $rsync_destdir = ($relpath ? "./" : "/"); - #for my $ret_file (@Global::ret_files) { - my $ret_file = $file; - my $remove = $removesource ? "--remove-source-files" : ""; - # If relative path: prepend workdir/./ to avoid problems if the dir contains ':' - # and to get the right relative return path - my $replaced = ($relpath ? workdir()."/./" : "") . - $file; -# context_replace($ret_file,[$file]); - # --return - # Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz / - # Rel path: rsync -rlDzRE server:./subsir/file.gz ./ - push(@cmd, "rsync $rsync_opt $remove $serverlogin:". - shell_quote_scalar($replaced) . " ".$rsync_destdir); - #} - return join(";",@cmd); - } -} sub setup_basefile { # Transfer basefiles to each $sshlogin # This needs to be done before first jobs on $sshlogin is run # Returns: N/A my $cmd = ""; - for my $sshlogin (keys %Global::host) { - if($sshlogin eq ":") { next } - my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); + for my $sshlogin (values %Global::host) { + if($sshlogin->string() eq ":") { next } + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); my $rsync_opt = "-rlDzR -e".shell_quote_scalar($sshcmd); for my $file (@::opt_basefile) { my $f = $file; @@ -4321,9 +4002,10 @@ sub cleanup_basefile { # Remove the basefiles transferred # Returns: N/A my $cmd=""; - for my $sshlogin (keys %Global::host) { - if($sshlogin eq ":") { next } - my ($sshcmd,$serverlogin) = sshcommand_of_sshlogin($sshlogin); + for my $sshlogin (values %Global::host) { + if($sshlogin->string() eq ":") { next } + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); for my $file (@::opt_basefile) { $cmd .= "$sshcmd $serverlogin rm -f ".shell_quote_scalar(shell_quote_scalar($file))."&"; } @@ -4338,7 +4020,7 @@ sub __SIGNAL_HANDLING__ {} sub list_running_jobs { # Returns: N/A for my $v (values %Global::running) { - print STDERR "$Global::progname: ",$v->{'command'},"\n"; + print STDERR "$Global::progname: ",$v->replaced(),"\n"; } } @@ -4392,36 +4074,28 @@ sub reaper { } # Ignore processes that we did not start $Global::running{$stiff} or next; - $Global::running{$stiff}{'exitstatus'} = $? >> 8; - debug("died ($Global::running{$stiff}{'exitstatus'}): $Global::running{$stiff}{'seq'}"); + $Global::running{$stiff}->set_exitstatus($? >> 8); + debug("died (".$Global::running{$stiff}->exitstatus()."): ".$Global::running{$stiff}->seq()); if($stiff == $Global::tty_taken) { # The process that died had the tty => release it $Global::tty_taken = 0; } my $retry_job = 0; if ($::opt_retries) { - my $clean_command = $Global::running{$stiff}{'clean_command'}; - my $sshlogin = $Global::running{$stiff}{'sshlogin'}; - if(not $Global::running{$stiff}{'exitstatus'}) { + my $job = $Global::running{$stiff}; + my $clean_command = $job->replaced(); + if(not $job->exitstatus()) { # Completed with success. If there is a recorded failure: forget it - delete $Global::failed{$clean_command}; + $job->reset_failed_here(); } else { # The job failed. Should it be retried? - $Global::failed{$clean_command}{'count'}{$sshlogin}++; - $Global::failed{$clean_command}{'seq'} = $Global::running{$stiff}{'seq'}; - my $total_failures = - sum(map { $Global::failed{$clean_command}{'count'}{$_} } - keys %Global::host); - if($total_failures == $::opt_retries) { + $job->add_failed_here(); + if($job->total_failed() == $::opt_retries) { # This has been retried enough $retry_job = 0; - delete $Global::failed{$clean_command}; } else { # This command should be retried - # $Global::CommandLineQueue ||= CommandLineQueue->new($Global::argfile); - # Convert $clean_command to the relevant command object - my $cmdobject = $Global::clean_commands{$clean_command}; - $Global::CommandLineQueue->unget($cmdobject); + $Global::CommandLineQueue->unget($job); $retry_job = 1; } } @@ -4429,24 +4103,24 @@ sub reaper { if(not $retry_job) { # Force printing now if the job failed and we are going to exit - my $print_now = ($Global::running{$stiff}{'exitstatus'} and + my $print_now = ($Global::running{$stiff}->exitstatus() and $::opt_halt_on_error and $::opt_halt_on_error == 2); if($Global::keeporder and not $print_now) { - $Private::print_later{$Global::running{$stiff}{"seq"}} = + $Private::print_later{$Global::running{$stiff}->seq()} = $Global::running{$stiff}; $Private::job_end_sequence ||= 1; debug("Looking for: $Private::job_end_sequence ". - "Current: ".$Global::running{$stiff}{"seq"}."\n"); + "Current: ".$Global::running{$stiff}->seq()."\n"); while($Private::print_later{$Private::job_end_sequence}) { debug("Found job end $Private::job_end_sequence"); - print_job($Private::print_later{$Private::job_end_sequence}); + $Private::print_later{$Private::job_end_sequence}->print(); delete $Private::print_later{$Private::job_end_sequence}; $Private::job_end_sequence++; } } else { - print_job ($Global::running{$stiff}); + $Global::running{$stiff}->print(); } - if($Global::running{$stiff}{'exitstatus'}) { + if($Global::running{$stiff}->exitstatus()) { # The jobs had a exit status <> 0, so error $Global::exitstatus++; if($::opt_halt_on_error) { @@ -4455,21 +4129,21 @@ sub reaper { print STDERR ("$Global::progname: Starting no more jobs. ", "Waiting for ", scalar(keys %Global::running), " jobs to finish. This job failed:\n", - $Global::running{$stiff}{"command"},"\n"); + $Global::running{$stiff}->replaced(),"\n"); $Global::start_no_new_jobs++; - $Global::halt_on_error_exitstatus = $Global::running{$stiff}{'exitstatus'}; + $Global::halt_on_error_exitstatus = $Global::running{$stiff}->exitstatus(); } elsif($::opt_halt_on_error == 2) { # If halt on error == 2 we should exit immediately print STDERR ("$Global::progname: This job failed:\n", - $Global::running{$stiff}{"command"},"\n"); - exit ($Global::running{$stiff}{'exitstatus'}); + $Global::running{$stiff}->replaced(),"\n"); + exit ($Global::running{$stiff}->exitstatus()); } } } } - my $sshlogin = $Global::running{$stiff}{'sshlogin'}; - $Global::host{$sshlogin}{'no_of_running'}--; - $Global::host{$sshlogin}{'completed'}++; + my $sshlogin = $Global::running{$stiff}->sshlogin(); + $sshlogin->dec_jobs_running(); + $sshlogin->inc_jobs_completed(); $Global::total_running--; $Global::total_completed++; delete $Global::running{$stiff}; @@ -4606,9 +4280,870 @@ sub my_dump { ##### OO Parts below ### -package JobSlot; +package SSHLogin; + +sub new { + my $class = shift; + my $string = shift; + my @unget = (); + return bless { + 'string' => $string, + }, ref($class) || $class; +} + +sub string { + my $self = shift; + return $self->{'string'}; +} + +sub jobs_running { + my $self = shift; + + return ($self->{'jobs_running'} || "0"); +} + +sub inc_jobs_running { + my $self = shift; + $self->{'jobs_running'}++; +} + +sub dec_jobs_running { + my $self = shift; + $self->{'jobs_running'}--; +} + +sub set_jobs_running { + my $self = shift; + $self->{'jobs_running'} = 0; +} + +sub set_maxlength { + my $self = shift; + $self->{'maxlength'} = shift; +} + +sub maxlength { + my $self = shift; + return $self->{'maxlength'}; +} + +sub jobs_completed { + my $self = shift; + return $self->{'jobs_completed'}; +} + +sub inc_jobs_completed { + my $self = shift; + $self->{'jobs_completed'}++; +} + +sub max_line_length { + my $self = shift; + die "TODO"; +} + +sub set_max_jobs_running { + my $self = shift; + $self->{'max_jobs_running'} = shift; +} +sub max_jobs_running { + my $self = shift; + if(not defined $self->{'max_jobs_running'}) { + $self->{'max_jobs_running'} = + ::compute_number_of_processes($::opt_P,$self); + } + return $self->{'max_jobs_running'}; +} + +sub ncpus { + my $self = shift; + my $ncpus = shift; # undef if we do not want to set it + if(defined $ncpus) { + $self->{'ncpus'} = $ncpus; + } elsif(not defined $self->{'ncpus'}) { + my $sshcmd = $self->sshcommand(); + my $serverlogin = $self->serverlogin(); + if($serverlogin eq ":") { + if($::opt_use_cpus_instead_of_cores) { + $self->{'ncpus'} = no_of_cpus(); + } else { + $self->{'ncpus'} = no_of_cores(); + } + } else { + my $ncpu; + if($::opt_use_cpus_instead_of_cores) { + $ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cpus); + chomp($ncpu); + } else { + $ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cores); + chomp($ncpu); + } + if($ncpu =~ /^[0-9]+$/) { + $self->{'ncpus'} = $ncpu; + } else { + print STDERR ("Warning: Could not figure out ", + "number of cpus on $serverlogin. Using 1"); + $self->{'ncpus'} = 1; + } + } + } + return $self->{'ncpus'}; +} + +sub no_of_cpus { + # Returns: + # Number of physical CPUs + if(not $Private::no_of_cpus) { + local $/="\n"; # If delimiter is set, then $/ will be wrong + my $no_of_cpus = (no_of_cpus_freebsd() + || no_of_cpus_darwin() + || no_of_cpus_solaris() + || no_of_cpus_gnu_linux() + ); + if($no_of_cpus) { + $Private::no_of_cpus = $no_of_cpus; + } else { + warn("Cannot figure out number of cpus. Using 1"); + $Private::no_of_cpus = 1; + } + } + return $Private::no_of_cpus; +} + +sub no_of_cores { + # Returns: + # Number of CPU cores + if(not $Private::no_of_cores) { + local $/="\n"; # If delimiter is set, then $/ will be wrong + my $no_of_cores = (no_of_cores_freebsd() + || no_of_cores_darwin() + || no_of_cores_solaris() + || no_of_cores_gnu_linux() + ); + if($no_of_cores) { + $Private::no_of_cores = $no_of_cores; + } else { + warn("Cannot figure out number of CPU cores. Using 1"); + $Private::no_of_cores = 1; + } + } + return $Private::no_of_cores; +} + +sub no_of_cpus_gnu_linux { + # Returns: + # Number of physical CPUs on GNU/Linux + my $no_of_cpus; + if(-e "/proc/cpuinfo") { + $no_of_cpus = 0; + my %seen; + open(IN,"cat /proc/cpuinfo|") || return undef; + while() { + if(/^physical id.*[:](.*)/ and not $seen{$1}++) { + $no_of_cpus++; + } + } + close IN; + } + return $no_of_cpus; +} + +sub no_of_cores_gnu_linux { + # Returns: + # Number of CPU cores on GNU/Linux + my $no_of_cores; + if(-e "/proc/cpuinfo") { + $no_of_cores = 0; + open(IN,"cat /proc/cpuinfo|") || return undef; + while() { + /^processor.*[:]/ and $no_of_cores++; + } + close IN; + } + return $no_of_cores; +} + +sub no_of_cpus_darwin { + # Returns: + # Number of physical CPUs on Mac Darwin + my $no_of_cpus = `sysctl -a hw 2>/dev/null | grep -w physicalcpu | awk '{ print \$2 }'`; + return $no_of_cpus; +} + +sub no_of_cores_darwin { + # Returns: + # Number of CPU cores on Mac Darwin + my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`; + return $no_of_cores; +} + +sub no_of_cpus_freebsd { + # Returns: + # Number of physical CPUs on FreeBSD + my $no_of_cpus = `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`; + return $no_of_cpus; +} + +sub no_of_cores_freebsd { + # Returns: + # Number of CPU cores on FreeBSD + my $no_of_cores = `sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`; + return $no_of_cores; +} + +sub no_of_cpus_solaris { + # Returns: + # Number of physical CPUs on Solaris + if(-x "/usr/sbin/psrinfo") { + my @psrinfo = `/usr/sbin/psrinfo`; + if($#psrinfo >= 0) { + return $#psrinfo +1; + } + } + if(-x "/usr/sbin/prtconf") { + my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`; + if($#prtconf >= 0) { + return $#prtconf +1; + } + } + return undef; +} + +sub no_of_cores_solaris { + # Returns: + # Number of CPU cores on Solaris + if(-x "/usr/sbin/psrinfo") { + my @psrinfo = `/usr/sbin/psrinfo`; + if($#psrinfo >= 0) { + return $#psrinfo +1; + } + } + if(-x "/usr/sbin/prtconf") { + my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`; + if($#prtconf >= 0) { + return $#prtconf +1; + } + } + return undef; +} + +sub sshcommand { + my $self = shift; + if (not defined $self->{'sshcommand'}) { + $self->sshcommand_of_sshlogin(); + } + return $self->{'sshcommand'}; +} + +sub serverlogin { + my $self = shift; + if (not defined $self->{'serverlogin'}) { + $self->sshcommand_of_sshlogin(); + } + return $self->{'serverlogin'}; +} + +sub sshcommand_of_sshlogin { + # 'server' -> ('ssh -S /tmp/parallel-ssh-RANDOM/host-','server') + # 'user@server' -> ('ssh','user@server') + # 'myssh user@server' -> ('myssh','user@server') + # 'myssh -l user server' -> ('myssh -l user','server') + # '/usr/local/bin/myssh -l user server' -> ('/usr/local/bin/myssh -l user','server') + # Returns: + # sshcommand - defaults to 'ssh' + # login@host + my $self = shift; + my $sshlogin = $self->{'string'}; + my ($sshcmd, $serverlogin); + if($::oodebug and not defined $sshlogin) { + Carp::confess("No sshlogin"); + die; + } + if($sshlogin =~ /(.+) (\S+)$/) { + # Own ssh command + $sshcmd = $1; $serverlogin = $2; + } else { + # Normal ssh + if($::opt_controlmaster) { + # Use control_path to make ssh faster + my $control_path = control_path_dir()."/ssh-%r@%h:%p"; + $sshcmd = "ssh -S ".$control_path; + $serverlogin = $sshlogin; + #my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin; + my $master = "ssh -MTS ".control_path_dir()."/ssh-%r@%h:%p ".$serverlogin." sleep 1"; + if(not $Private::control_path{$control_path}++) { + # Master is not running for this control_path + # Start it + my $pid = fork(); + if($pid) { + $Global::sshmaster{$pid}++; + } else { + debug($master,"\n"); + `$master`; + wait_and_exit(0); + } + } + } else { + $sshcmd = "ssh"; $serverlogin = $sshlogin; + } + } + $self->{'sshcommand'} = $sshcmd; + $self->{'serverlogin'} = $serverlogin; +} + + +sub control_path_dir { + # Returns: + # path to directory + if(not $Private::control_path_dir) { + $Private::control_path_dir = + tempdir($ENV{'HOME'}."/.parallel/tmp/control_path_dir-XXXX", + CLEANUP => 1); + } + return $Private::control_path_dir; +} + + + +package JobQueue; + +sub new { + my $class = shift; + my $commandlinequeue = shift; + my @unget = (); + return bless { + 'unget' => \@unget, + 'commandlinequeue' => $commandlinequeue, + }, ref($class) || $class; +} + +sub get { + my $self = shift; + + if(@{$self->{'unget'}}) { + my $job = shift @{$self->{'unget'}}; + return ($job); + } else { + my $commandline = $self->{'commandlinequeue'}->get(); + if(defined $commandline) { + my $job = Job->new($commandline); + return $job; + } else { + return undef; + } + } +} + +sub unget { + my $self = shift; + unshift @{$self->{'unget'}}, @_; +} + +sub empty { + my $self = shift; + my $empty = (not @{$self->{'unget'}}) && $self->{'commandlinequeue'}->empty(); + ::debug("JobQueue->empty $empty\n"); + return $empty; +} + +package Job; + +sub new { + my $class = shift; + my $commandline = shift; + return bless { + 'commandline' => $commandline, + }, ref($class) || $class; +} + +sub replaced { + my $self = shift; + return $self->{'commandline'}->replaced(); +} + +sub set_seq { + my $self = shift; + my $seq = shift; + $self->{'seq'} = $seq; +} + +sub seq { + my $self = shift; + return $self->{'seq'}; +} + +sub set_stdout { + my $self = shift; + my $stdout = shift; + $self->{'stdout'} = $stdout; +} + +sub stdout { + my $self = shift; + return $self->{'stdout'}; +} + +sub stderr { + my $self = shift; + return $self->{'stderr'}; +} + +sub set_stderr { + my $self = shift; + my $stderr = shift; + $self->{'stderr'} = $stderr; +} + +sub pid { + my $self = shift; + return $self->{'pid'}; +} + +sub set_pid { + my $self = shift; + my $pid = shift; + $self->{'pid'} = $pid; +} + +sub failed { + # return number of times failed for this $sshlogin + my $self = shift; + my $sshlogin = shift; + return $self->{'failed'}{$sshlogin}; +} + +sub failed_here { + # return number of times failed for the current $sshlogin + my $self = shift; + return $self->{'failed'}{$self->sshlogin()}; +} + +sub add_failed { + # increase the number of times failed for this $sshlogin + my $self = shift; + my $sshlogin = shift; + $self->{'failed'}{$sshlogin}++; +} + +sub add_failed_here { + # increase the number of times failed for the current $sshlogin + my $self = shift; + $self->{'failed'}{$self->sshlogin()}++; +} + +sub reset_failed { + # increase the number of times failed for this $sshlogin + my $self = shift; + my $sshlogin = shift; + delete $self->{'failed'}{$sshlogin}; +} + +sub reset_failed_here { + # increase the number of times failed for this $sshlogin + my $self = shift; + delete $self->{'failed'}{$self->sshlogin()}; +} + +sub min_failed { + # Returns: + # the number of sshlogins this command has failed on + # the minimal number of times this command has failed + my $self = shift; + my $min_failures = + min(map { $self->{'failed'}{$_} } + keys %{$self->{'failed'}}); + my $number_of_sshlogins_failed_on = scalar keys %{$self->{'failed'}}; + return ($number_of_sshlogins_failed_on,$min_failures); +} + +sub total_failed { + # Returns: + # the number of times this command has failed + my $self = shift; + my $total_failures = 0; + for (values %{$self->{'failed'}}) { + $total_failures += $_; + } + return ($total_failures); +} + +sub set_sshlogin { + my $self = shift; + my $sshlogin = shift; + $self->{'sshlogin'} = $sshlogin; +} + +sub sshlogin { + my $self = shift; + return $self->{'sshlogin'}; +} + +sub sshlogin_wrap { + # Wrap the command with the commands needed to run remotely + my $self = shift; + if(not defined $self->{'sshlogin_wrap'}) { + my $sshlogin = $self->sshlogin(); + if($::oodebug and not defined $sshlogin) { + Carp::confess("No sshlogin"); + die; + } + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); + my ($next_command_line) = $self->replaced(); + my ($pre,$post,$cleanup)=("","",""); + if($serverlogin ne ":") { + $Global::transfer_seq++; + # --transfer + $pre .= $self->sshtransfer(); + # --return + $post .= $self->sshreturn(); + # --cleanup + $post .= $self->sshcleanup(); + if($post) { + # We need to save the exit status of the job + $post = '_EXIT_status=$?; '.$post.' exit $_EXIT_status;'; + } + my $parallel_env = 'PARALLEL_SEQ=$PARALLEL_SEQ\;export PARALLEL_SEQ\;'. + 'PARALLEL_PID=$PARALLEL_PID\;export PARALLEL_PID\;'; + if($::opt_workdir) { + $self->{'sshlogin_wrap'} = ($pre . "$sshcmd $serverlogin $parallel_env " + . ::shell_quote_scalar("cd ".workdir()." && ") + . ::shell_quote_scalar($next_command_line).";".$post); + } else { + $self->{'sshlogin_wrap'} = ($pre . "$sshcmd $serverlogin $parallel_env " + .::shell_quote_scalar($next_command_line).";".$post); + } + } else { + $self->{'sshlogin_wrap'} = $next_command_line; + } + } + return $self->{'sshlogin_wrap'}; +} + +sub transfer { + # Files to transfer + my $self = shift; + my @transfer = (); + if($::opt_transfer) { + for my $record (@{$self->{'commandline'}{'arg_list'}}) { + # Merge arguments from records into args + for my $arg (@$record) { + CORE::push @transfer, $arg->orig(); + } + } + } + return @transfer; +} + +sub sshtransfer { + my $self = shift; + my $sshlogin = $self->sshlogin(); + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); + my $rsync_opt = "-rlDzRE -e".::shell_quote_scalar($sshcmd); + my $pre = ""; + for my $file ($self->transfer()) { + $file =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that + $file =~ s:^\./::g; # Remove ./ if any + my $relpath = ($file !~ m:^/:); # Is the path relative? + # Use different subdirs depending on abs or rel path + # Abs path: rsync -rlDzRE /home/tange/dir/subdir/file.gz server:/ + # Rel path: rsync -rlDzRE ./subdir/file.gz server:.parallel/tmp/tempid/ + # Rel path: rsync -rlDzRE ./subdir/file.gz server:$workdir/ + my $remote_workdir = workdir($file); + my $rsync_destdir = ($relpath ? $remote_workdir : "/"); + if($relpath) { + $file = "./".$file; + } + if(-r $file) { + my $mkremote_workdir = + $remote_workdir eq "." ? "true" : "ssh $serverlogin mkdir -p $rsync_destdir"; + $pre .= "$mkremote_workdir; rsync $rsync_opt ".::shell_quote_scalar($file)." $serverlogin:$rsync_destdir;"; + } else { + print STDERR "Warning: $file is not readable and will not be transferred\n"; + } + } + return $pre; +} + +sub return { + # Files to return + # Quoted and with {...} substituted + my $self = shift; + my @return = (); + for my $return (@{$self->{'commandline'}{'return_files'}}) { + CORE::push @return, $self->{'commandline'}->replace_placeholders($return); + } + return @return; +} + +sub sshreturn { + my $self = shift; + my $sshlogin = $self->sshlogin(); + my $pre = ""; + for my $file ($self->return()) { + $pre .= sshtransferreturn($sshlogin,$file,0,$::opt_cleanup).";"; + } + return $pre; +} + +sub sshcleanup { + # Return the sshcommand needed to remove the file + # Returns: + # ssh command needed to remove files from sshlogin + my $self = shift; + my ($sshlogin) = $self->sshlogin(); + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); + my $workdir = workdir(); + my $removeworkdir = ""; + my $cleancmd = ""; + + for my $file ($self->cleanup()) { + my @subworkdirs = parentdirs_of($file); + $file = ::shell_quote_scalar($file); + if(@subworkdirs) { + $removeworkdir = "; rmdir 2>/dev/null ".join(" ",map { ::shell_quote_scalar($workdir."/".$_) } @subworkdirs); + } + my $relpath = ($file !~ m:^/:); # Is the path relative? + my $cleandir = ($relpath ? $workdir : ""); + $cleancmd .= "$sshcmd $serverlogin rm -f ".::shell_quote_scalar($file.$removeworkdir).";"; + } + return $cleancmd; +} + +sub cleanup { + # Returns: + # Files to remove at cleanup + my $self = shift; + if($::opt_cleanup) { + my @transfer = $self->transfer(); + return @transfer; + } else { + return (); + } +} + +sub sshtransferreturn { + # Returns: + # ssh comands needed to transfer file to/from sshlogin + my ($sshlogin,$file,$transfer,$removesource) = (@_); + my $sshcmd = $sshlogin->sshcommand(); + my $serverlogin = $sshlogin->serverlogin(); + my $rsync_opt = "-rlDzRE -e".::shell_quote_scalar($sshcmd); + $file =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that + $file =~ s:^\./::g; # Remove ./ if any + my $relpath = ($file !~ m:^/:); # Is the path relative? + # Use different subdirs depending on abs or rel path + if($transfer) { + die("use sshtransfer instead"); + } else { + # Return or cleanup + #my $noext = no_extension($file); # Remove .ext before prepending ./ + my @cmd = (); + my $rsync_destdir = ($relpath ? "./" : "/"); + #for my $ret_file (@Global::ret_files) { + my $ret_file = $file; + my $remove = $removesource ? "--remove-source-files" : ""; + # If relative path: prepend workdir/./ to avoid problems if the dir contains ':' + # and to get the right relative return path + my $replaced = ($relpath ? workdir()."/./" : "") . + $file; +# context_replace($ret_file,[$file]); + # --return + # Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz / + # Rel path: rsync -rlDzRE server:./subsir/file.gz ./ + push(@cmd, "rsync $rsync_opt $remove $serverlogin:". + ::shell_quote_scalar($replaced) . " ".$rsync_destdir); + #} + return join(";",@cmd); + } +} + +sub workdir { + # Returns: + # the workdir on a remote machine + my $workdir; + if(defined $::opt_workdir) { + if($::opt_workdir ne "...") { + $workdir = $::opt_workdir; + $workdir =~ s:/\./:/:g; # Rsync treats /./ special. We dont want that + $workdir =~ s:/+$::; # Remove ending / if any + $workdir =~ s:^\./::g; # Remove starting ./ if any + } else { + $workdir = ".parallel/tmp/".::hostname()."-".$$."-".$Global::transfer_seq; + } + } else { + $workdir = "."; + } + return $workdir; +} + +sub parentdirs_of { + # Return: + # all parentdirs except . of this dir or file - sorted descending by length + my $d = shift; + my @parents = (); + while($d =~ s:/[^/]+$::) { + if($d ne ".") { + push @parents, $d; + } + } + return @parents; +} + +sub start { + # Setup STDOUT and STDERR for a job and start it. + # Returns: + # job-object or undef if job not to run + my $job = shift; +# my $commandline = $job->{'commandline'}; + my $command = $job->sshlogin_wrap(); +# my $clean_command = $commandline->replaced(); + my ($pid,$name); + if($Global::grouped) { + my (%out,%err,$outname,$errname); + # To group we create temporary files for STDOUT and STDERR + # To avoid the cleanup unlink the files immediately (but keep them open) + $outname = ++$Private::TmpFilename; + ($out{$outname},$name) = ::tempfile(SUFFIX => ".par"); + unlink $name; + $errname = ++$Private::TmpFilename; + ($err{$errname},$name) = ::tempfile(SUFFIX => ".par"); + unlink $name; + + open STDOUT, '>&', $out{$outname} or die "Can't redirect STDOUT: $!"; + open STDERR, '>&', $err{$errname} or die "Can't dup STDOUT: $!"; + $job->set_stdout($out{$outname}); + $job->set_stderr($err{$errname}); + } + + if($Global::interactive or $Global::stderr_verbose) { + if($Global::interactive) { + print $Global::original_stderr "$command ?..."; + open(TTY,"/dev/tty") || die; + my $answer = ; + close TTY; + my $run_yes = ($answer =~ /^\s*y/i); + if (not $run_yes) { + open STDOUT, ">&", $Global::original_stdout + or die "Can't dup \$oldout: $!"; + open STDERR, ">&", $Global::original_stderr + or die "Can't dup \$oldout: $!"; + return undef; + } + } else { + print $Global::original_stderr "$command\n"; + } + } + if($Global::verbose and not $Global::grouped) { + if($Global::verbose == 1) { + print STDOUT $job->replaced(),"\n"; + } else { + # Verbose level > 1: Print the rsync and stuff + print STDOUT $command,"\n"; + } + } + $Global::total_running++; + $Global::total_started++; + #print STDERR "LEN".length($command)."\n"; + if(not $job->seq()) { + # This is a new (non-retried) job: Give it a new seq + $Private::job_start_sequence++; + $job->set_seq($Private::job_start_sequence); + } + $ENV{'PARALLEL_SEQ'} = $job->seq(); + $ENV{'PARALLEL_PID'} = $$; + ::debug("$Global::total_running processes. Starting (".$job->seq()."): $command\n"); + if(@::opt_a and $job->seq() == 1) { + # Give STDIN to the first job if using -a + ::debug("seq=1\n"); + $pid = ::open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) || + die("open3 (with -a) failed. Report a bug to \n"); + # Re-open to avoid complaining + open STDIN, "<&", $Global::original_stdin + or die "Can't dup \$Global::original_stdin: $!"; + } elsif (not $Global::tty_taken and -c "/dev/tty" and + open(DEVTTY, "/dev/tty")) { + # Give /dev/tty to the command if no one else is using it + ::debug("tty free $command\n"); + $pid = ::open3("<&DEVTTY", ">&STDOUT", ">&STDERR", $command) || + die("open3 (with /dev/tty) failed. Report a bug to \n"); + $Global::tty_taken = $pid; + close DEVTTY; + ::debug("tty worked\n"); + } else { + ::debug("gensym\n"); + $pid = ::open3(::gensym, ">&STDOUT", ">&STDERR", $command) || + die("open3 (with gensym) failed. Report a bug to \n"); + ::debug("gensym worked\n"); + } + $job->set_pid($pid); + open STDOUT, ">&", $Global::original_stdout + or die "Can't dup \$Global::original_stdout: $!"; + open STDERR, ">&", $Global::original_stderr + or die "Can't dup \$Global::original_stderr: $!"; + ::debug("started: $command\n"); + return $job; +} + +sub print { + # Print the output of the jobs + # Returns: N/A + + my $self = shift; + ::debug(">>joboutput ".$self->replaced()."\n"); + # Only relevant for grouping + $Global::grouped or return; + my $out = $self->stdout(); + my $err = $self->stderr(); + my ($command) = $self->sshlogin_wrap(); + # my ($clean_command) = $self->{'commandline'}->replaced(); + + if($Global::verbose and $Global::grouped) { + if($Global::verbose == 1) { + print STDOUT $self->replaced(),"\n"; + } else { + # Verbose level > 1: Print the rsync and stuff + print STDOUT $command,"\n"; + } + # If STDOUT and STDERR are merged, we want the command to be printed first + # so flush to avoid STDOUT being buffered + flush STDOUT; + } + seek $_, 0, 0 for $out, $err; + if($Global::debug) { + print STDERR "ERR:\n"; + } + my $buf; + while(sysread($err,$buf,1000_000)) { + print STDERR $buf; + } + flush STDERR; + if($Global::debug) { + print STDOUT "OUT:\n"; + } + while(sysread($out,$buf,1000_000)) { + print STDOUT $buf; + } + flush STDOUT; + ::debug("<{'exitstatus'}; +} + +sub set_exitstatus { + my $self = shift; + my $exitstatus = shift; + $self->{'exitstatus'} = $exitstatus; +} package CommandLine; @@ -4650,9 +5185,6 @@ sub new { } } -# if(not $max_number_of_args) { -# $max_number_of_args = 2**31; # As many as possible (-X or -m) -# } return bless { 'command' => $command, 'len' => $len, @@ -4866,102 +5398,6 @@ sub number_of_replacements { return ($sum,$no_args,$context,$number_of_context_groups,%count); } -sub sshlogin_wrap { - # Wrap the command with the commands needed to run remotely - my $self = shift; - my $sshlogin = shift; - my ($sshcmd,$serverlogin) = ::sshcommand_of_sshlogin($sshlogin); - my ($next_command_line) = $self->replaced(); - my ($clean_command) = $next_command_line; - my ($pre,$post,$cleanup)=("","",""); - if($serverlogin ne ":") { - $Global::transfer_seq++; - # --transfer - $pre .= $self->sshtransfer($sshlogin); - - for my $file ($self->cleanup()) { - $cleanup .= ::sshcleanup($sshlogin,::shell_quote_scalar($file)).";"; - } - for my $file ($self->return()) { - $post .= ::sshreturn($sshlogin,$file).";"; - } - $post.=$cleanup; - if($post) { - # We need to save the exit status of the job - $post = '_EXIT_status=$?; '.$post.' exit $_EXIT_status;'; - } - my $parallel_env = 'PARALLEL_SEQ=$PARALLEL_SEQ\;export PARALLEL_SEQ\;'. - 'PARALLEL_PID=$PARALLEL_PID\;export PARALLEL_PID\;'; - if($::opt_workdir) { - return ($pre . "$sshcmd $serverlogin $parallel_env " - . ::shell_quote_scalar("cd ".::workdir()." && ") - . ::shell_quote_scalar($next_command_line).";".$post,$clean_command); - } else { - return ($pre . "$sshcmd $serverlogin $parallel_env " - .::shell_quote_scalar($next_command_line).";".$post,$clean_command); - } - } else { - return ($next_command_line,$clean_command); - } -} - -sub transfer { - # Files to transfer - my $self = shift; - my @transfer = (); - if($::opt_transfer) { - for my $record (@{$self->{'arg_list'}}) { - # Merge arguments from records into args - for my $arg (@$record) { - CORE::push @transfer, $arg->orig(); - } - } - } - return @transfer; -} - -sub sshtransfer { - my $self = shift; - my $sshlogin = shift; - my $pre = ""; - for my $file ($self->transfer()) { - $pre .= ::sshtransferreturn($sshlogin,::shell_quote_scalar($file),1,0).";"; - } - return $pre; -} - -sub return { - # Files to return - # Quoted and with {...} substituted - my $self = shift; - my @return = (); - for my $return (@{$self->{'return_files'}}) { - CORE::push @return, $self->replace_placeholders($return); - } - return @return; -} - -sub sshreturn { - my $self = shift; - my $sshlogin = shift; - my $pre = ""; - for my $file ($self->transfer()) { - $pre .= ::sshtransferreturn($sshlogin,$file,0,$::opt_cleanup).";"; - } - return $pre; -} - -sub cleanup { - # Files to remove at cleanup - my $self = shift; - if($::opt_cleanup) { - my @transfer = $self->transfer(); - return @transfer; - } else { - return (); - } -} - sub replaced { my $self = shift; if(not $self->{'replaced'}) { @@ -5110,8 +5546,6 @@ sub get { # We did not get more args - maybe at EOF string? return undef; } else { - # TODO get rid of this hash (needed in sub reaper) - $Global::clean_commands{$cmd_line->replaced()} = $cmd_line; return ($cmd_line); } } @@ -5145,6 +5579,7 @@ sub size { return $self->{'size'}; } + package Limits::Command; # Maximal command line length (for -m and -X) diff --git a/testsuite/tests-to-run/test13.sh b/testsuite/tests-to-run/test13.sh index 8328f2e3..abb49d1f 100755 --- a/testsuite/tests-to-run/test13.sh +++ b/testsuite/tests-to-run/test13.sh @@ -6,10 +6,10 @@ ulimit -n 50 | stdout parallel -k -j0 echo '### Test --keep-order' -(seq 0 2) | parallel --keep-order -j100% -S 1/:,2/parallel@server2 -q perl -e 'sleep 1;print "job{}\n";exit({})' +(seq 0 2) | parallel --keep-order -j100% -S 1/:,2/parallel@parallel-server2 -q perl -e 'sleep 1;print "job{}\n";exit({})' echo '### Test --keeporder' -(seq 0 2) | parallel --keeporder -j100% -S 1/:,2/parallel@server2 -q perl -e 'sleep 1;print "job{}\n";exit({})' +(seq 0 2) | parallel --keeporder -j100% -S 1/:,2/parallel@parallel-server2 -q perl -e 'sleep 1;print "job{}\n";exit({})' echo '### Test SIGTERM' (sleep 5; killall parallel -TERM) & seq 1 100 | stdout parallel -k sleep 3';' echo | sort diff --git a/testsuite/wanted-results/test30 b/testsuite/wanted-results/test30 index a094c913..969b7cf5 100644 --- a/testsuite/wanted-results/test30 +++ b/testsuite/wanted-results/test30 @@ -3,12 +3,12 @@ ### Test of --eta with no jobs Computers / CPU cores / Max jobs to run -1:local / - / 9 +1:local / 2 / 9 ### Test of --progress 16 ### Test of --progress with no jobs Computers / CPU cores / Max jobs to run -1:local / - / 9 +1:local / 2 / 9