parallel: Removed some global variables. Code cleanup.

This commit is contained in:
Ole Tange 2010-12-19 12:58:36 +01:00
parent 27822174c2
commit ec51739620
2 changed files with 56 additions and 84 deletions

View file

@ -36,8 +36,17 @@ if($::opt_skip_first_line) {
<$fh>; <$fh>;
} }
my $command;
if(@ARGV) {
if($Global::quoting) {
$command = shell_quote(@ARGV);
} else {
$command = join(" ", @ARGV);
}
}
$Global::JobQueue = JobQueue->new( $Global::JobQueue = JobQueue->new(
join(" ",@ARGV),\@fhlist,$Global::Xargs,$number_of_args,\@Global::ret_files); $command,\@fhlist,$Global::Xargs,$number_of_args,\@Global::ret_files);
init_run_jobs(); init_run_jobs();
my $sem; my $sem;
@ -142,7 +151,7 @@ sub get_options_from_array {
"arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep, "arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep,
"trim=s" => \$::opt_trim, "trim=s" => \$::opt_trim,
"profile|J=s" => \$::opt_profile, "profile|J=s" => \$::opt_profile,
# xargs-compatibility - implemented, man, unittest # xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$::opt_P, "max-procs|P=s" => \$::opt_P,
"delimiter|d=s" => \$::opt_d, "delimiter|d=s" => \$::opt_d,
"max-chars|s=i" => \$::opt_s, "max-chars|s=i" => \$::opt_s,
@ -183,7 +192,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20101206; $Global::version = 20101217;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::debug = 0; $Global::debug = 0;
$Global::verbose = 0; $Global::verbose = 0;
@ -201,7 +210,6 @@ sub parse_options {
$Global::default_simultaneous_sshlogins = 9; $Global::default_simultaneous_sshlogins = 9;
$Global::exitstatus = 0; $Global::exitstatus = 0;
$Global::halt_on_error_exitstatus = 0; $Global::halt_on_error_exitstatus = 0;
$Global::total_jobs = 0;
$Global::arg_sep = ":::"; $Global::arg_sep = ":::";
$Global::arg_file_sep = "::::"; $Global::arg_file_sep = "::::";
$Global::trim = 'n'; $Global::trim = 'n';
@ -294,14 +302,6 @@ sub parse_options {
$::opt_progress = $::opt_eta; $::opt_progress = $::opt_eta;
} }
if(@ARGV) {
if($Global::quoting) {
$Global::command = shell_quote(@ARGV);
} else {
$Global::command = join(" ", @ARGV);
}
}
parse_sshlogin(); parse_sshlogin();
if(remote_hosts() and ($Global::xargs or $Global::Xargs) if(remote_hosts() and ($Global::xargs or $Global::Xargs)
@ -404,7 +404,6 @@ sub read_args_from_command_line {
@ARGV=(); @ARGV=();
if(defined $prepend) { if(defined $prepend) {
push(@Global::unget_argv, [Arg->new($prepend)]); push(@Global::unget_argv, [Arg->new($prepend)]);
$Global::total_jobs++;
} }
last; last;
} }
@ -420,7 +419,6 @@ sub read_args_from_command_line {
} }
} }
push(@Global::unget_argv, [Arg->new($arg)]); push(@Global::unget_argv, [Arg->new($arg)]);
$Global::total_jobs++;
} }
last; last;
} else { } else {
@ -891,14 +889,12 @@ sub start_more_jobs {
} }
sub start_another_job { sub start_another_job {
# Grab a job from @Global::command, start it at sshlogin # Grab a job from Global::JobQueue, start it at sshlogin
# and remember the pid, the STDOUT and the STDERR handles # and remember the pid, the STDOUT and the STDERR handles
# Returns: # Returns:
# 1 if another jobs was started # 1 if another jobs was started
# 0 otherwise # 0 otherwise
my $sshlogin = shift; my $sshlogin = shift;
# ::my_dump($sshlogin);
# ::my_dump($Global::JobQueue);
# Do we have enough file handles to start another job? # Do we have enough file handles to start another job?
if(enough_file_handles()) { if(enough_file_handles()) {
if($Global::JobQueue->empty()) { if($Global::JobQueue->empty()) {
@ -1015,15 +1011,9 @@ sub parse_sshlogin {
} }
} }
for my $sshlogin_string (@login) { for my $sshlogin_string (@login) {
if($sshlogin_string =~ s:^(\d*)/:: and $1) { my $sshlogin = SSHLogin->new($sshlogin_string);
# Override default autodetected ncpus unless zero or missing $sshlogin->set_maxlength(Limits::Command::max_length());
$Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string); $Global::host{$sshlogin->string()} = $sshlogin;
$Global::host{$sshlogin_string}->set_ncpus($1);
} else {
$Global::host{$sshlogin_string} = SSHLogin->new($sshlogin_string);
}
$Global::host{$sshlogin_string}->set_jobs_running(0);
$Global::host{$sshlogin_string}->set_maxlength(Limits::Command::max_length());
} }
#debug("sshlogin: ", my_dump(%Global::host),"\n"); #debug("sshlogin: ", my_dump(%Global::host),"\n");
if($::opt_transfer or @::opt_return or $::opt_cleanup or @::opt_basefile) { if($::opt_transfer or @::opt_return or $::opt_cleanup or @::opt_basefile) {
@ -1347,15 +1337,21 @@ package SSHLogin;
sub new { sub new {
my $class = shift; my $class = shift;
my $string = shift; my $sshlogin_string = shift;
my $ncpus;
if($sshlogin_string =~ s:^(\d*)/:: and $1) {
# Override default autodetected ncpus unless zero or missing
$ncpus = $1;
}
my $string = $sshlogin_string;
my @unget = (); my @unget = ();
return bless { return bless {
'string' => $string, 'string' => $string,
'jobs_running' => undef, 'jobs_running' => 0,
'jobs_completed' => 0, 'jobs_completed' => 0,
'maxlength' => undef, 'maxlength' => undef,
'max_jobs_running' => undef, 'max_jobs_running' => undef,
'ncpus' => undef, 'ncpus' => $ncpus,
'sshcommand' => undef, 'sshcommand' => undef,
'serverlogin' => undef, 'serverlogin' => undef,
'control_path_dir' => undef, 'control_path_dir' => undef,
@ -2096,6 +2092,7 @@ sub new {
my $commandline = shift; my $commandline = shift;
return bless { return bless {
'commandline' => $commandline, 'commandline' => $commandline,
'workdir' => undef,
'seq' => undef, 'seq' => undef,
'stdout' => undef, 'stdout' => undef,
'stderr' => undef, 'stderr' => undef,
@ -2248,7 +2245,6 @@ sub sshlogin_wrap {
if($serverlogin eq ":") { if($serverlogin eq ":") {
$self->{'sshlogin_wrap'} = $next_command_line; $self->{'sshlogin_wrap'} = $next_command_line;
} else { } else {
$Global::transfer_seq++;
# --transfer # --transfer
$pre .= $self->sshtransfer(); $pre .= $self->sshtransfer();
# --return # --return
@ -2263,7 +2259,7 @@ sub sshlogin_wrap {
'PARALLEL_PID=$PARALLEL_PID\;export PARALLEL_PID\;'; 'PARALLEL_PID=$PARALLEL_PID\;export PARALLEL_PID\;';
if($::opt_workdir) { if($::opt_workdir) {
$self->{'sshlogin_wrap'} = ($pre . "$sshcmd $serverlogin $parallel_env " $self->{'sshlogin_wrap'} = ($pre . "$sshcmd $serverlogin $parallel_env "
. ::shell_quote_scalar("cd ".workdir()." && ") . ::shell_quote_scalar("cd ".$self->workdir()." && ")
. ::shell_quote_scalar($next_command_line).";".$post); . ::shell_quote_scalar($next_command_line).";".$post);
} else { } else {
$self->{'sshlogin_wrap'} = ($pre . "$sshcmd $serverlogin $parallel_env " $self->{'sshlogin_wrap'} = ($pre . "$sshcmd $serverlogin $parallel_env "
@ -2304,7 +2300,7 @@ sub sshtransfer {
# Abs path: rsync -rlDzRE /home/tange/dir/subdir/file.gz server:/ # Abs path: rsync -rlDzRE /home/tange/dir/subdir/file.gz server:/
# Rel path: rsync -rlDzRE ./subdir/file.gz server:.parallel/tmp/tempid/ # Rel path: rsync -rlDzRE ./subdir/file.gz server:.parallel/tmp/tempid/
# Rel path: rsync -rlDzRE ./subdir/file.gz server:$workdir/ # Rel path: rsync -rlDzRE ./subdir/file.gz server:$workdir/
my $remote_workdir = workdir($file); my $remote_workdir = $self->workdir($file);
my $rsync_destdir = ($relpath ? $remote_workdir : "/"); my $rsync_destdir = ($relpath ? $remote_workdir : "/");
if($relpath) { if($relpath) {
$file = "./".$file; $file = "./".$file;
@ -2352,7 +2348,7 @@ sub sshreturn {
my $remove = $::opt_cleanup ? "--remove-source-files" : ""; my $remove = $::opt_cleanup ? "--remove-source-files" : "";
# If relative path: prepend workdir/./ to avoid problems if the dir contains ':' # If relative path: prepend workdir/./ to avoid problems if the dir contains ':'
# and to get the right relative return path # and to get the right relative return path
my $replaced = ($relpath ? workdir()."/./" : "") . $file; my $replaced = ($relpath ? $self->workdir()."/./" : "") . $file;
# --return # --return
# Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz / # Abs path: rsync -rlDzRE server:/home/tange/dir/subdir/file.gz /
# Rel path: rsync -rlDzRE server:./subsir/file.gz ./ # Rel path: rsync -rlDzRE server:./subsir/file.gz ./
@ -2370,7 +2366,7 @@ sub sshcleanup {
my ($sshlogin) = $self->sshlogin(); my ($sshlogin) = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand(); my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin(); my $serverlogin = $sshlogin->serverlogin();
my $workdir = workdir(); my $workdir = $self->workdir();
my $removeworkdir = ""; my $removeworkdir = "";
my $cleancmd = ""; my $cleancmd = "";
@ -2403,6 +2399,8 @@ sub cleanup {
sub workdir { sub workdir {
# Returns: # Returns:
# the workdir on a remote machine # the workdir on a remote machine
my $self = shift;
if(not defined $self->{'workdir'}) {
my $workdir; my $workdir;
if(defined $::opt_workdir) { if(defined $::opt_workdir) {
if($::opt_workdir ne "...") { if($::opt_workdir ne "...") {
@ -2411,12 +2409,14 @@ sub workdir {
$workdir =~ s:/+$::; # Remove ending / if any $workdir =~ s:/+$::; # Remove ending / if any
$workdir =~ s:^\./::g; # Remove starting ./ if any $workdir =~ s:^\./::g; # Remove starting ./ if any
} else { } else {
$workdir = ".parallel/tmp/".::hostname()."-".$$."-".$Global::transfer_seq; $workdir = ".parallel/tmp/".::hostname()."-".$$."-".$self->{'seq'};
} }
} else { } else {
$workdir = "."; $workdir = ".";
} }
return $workdir; $self->{'workdir'} = $workdir;
}
return $self->{'workdir'};
} }
sub parentdirs_of { sub parentdirs_of {
@ -2442,22 +2442,20 @@ sub start {
die "jkj2"; die "jkj2";
} }
my $command = $job->sshlogin_wrap(); my $command = $job->sshlogin_wrap();
my ($pid,$name); my $pid;
if($Global::grouped) { if($Global::grouped) {
my (%out,%err,$outname,$errname); my ($outfh,$errfh,$name);
# To group we create temporary files for STDOUT and STDERR # To group we create temporary files for STDOUT and STDERR
# To avoid the cleanup unlink the files immediately (but keep them open) # To avoid the cleanup unlink the files immediately (but keep them open)
$outname = ++$Private::TmpFilename; ($outfh,$name) = ::tempfile(SUFFIX => ".par");
($out{$outname},$name) = ::tempfile(SUFFIX => ".par");
unlink $name; unlink $name;
$errname = ++$Private::TmpFilename; ($errfh,$name) = ::tempfile(SUFFIX => ".par");
($err{$errname},$name) = ::tempfile(SUFFIX => ".par");
unlink $name; unlink $name;
open STDOUT, '>&', $out{$outname} or die "Can't redirect STDOUT: $!"; open STDOUT, '>&', $outfh or die "Can't redirect STDOUT: $!";
open STDERR, '>&', $err{$errname} or die "Can't dup STDOUT: $!"; open STDERR, '>&', $errfh or die "Can't dup STDOUT: $!";
$job->set_stdout($out{$outname}); $job->set_stdout($outfh);
$job->set_stderr($err{$errname}); $job->set_stderr($errfh);
} }
if($Global::interactive or $Global::stderr_verbose) { if($Global::interactive or $Global::stderr_verbose) {
@ -3011,7 +3009,7 @@ sub new {
my @unget = (); my @unget = ();
return bless { return bless {
'unget' => \@unget, 'unget' => \@unget,
'command' => $Global::command, 'command' => $command,
'arg_queue' => RecordQueue->new($read_from,$::opt_colsep), 'arg_queue' => RecordQueue->new($read_from,$::opt_colsep),
'context_replace' => $context_replace, 'context_replace' => $context_replace,
'max_number_of_args' => $max_number_of_args, 'max_number_of_args' => $max_number_of_args,

View file

@ -56,8 +56,7 @@ http://tinyogg.com/watch/TORaR/ and http://tinyogg.com/watch/hfxKj/
=item I<command> =item I<command>
Command to execute. If I<command> or the following arguments contain Command to execute. If I<command> or the following arguments contain
{} every instance will be substituted with the input line. Setting a {} every instance will be substituted with the input line.
command also invokes B<--file>.
If I<command> is given, GNU B<parallel> will behave similar to B<xargs>. If If I<command> is given, GNU B<parallel> will behave similar to B<xargs>. If
I<command> is not given GNU B<parallel> will behave similar to B<cat | sh>. I<command> is not given GNU B<parallel> will behave similar to B<cat | sh>.
@ -278,18 +277,6 @@ I<regexp> is a Perl Regular Expression:
http://perldoc.perl.org/perlre.html http://perldoc.perl.org/perlre.html
=item B<--command>
=item B<-c> (Use B<--command> as B<-c> may be removed in later versions)
Line is a command. The input line contains more than one argument or
the input line needs to be evaluated by the shell. This is the default
if I<command> is not set. Can be reversed with B<--file>.
Most people will never need this because GNU B<parallel> normally
selects the correct B<--file> or B<--command>.
=item B<--delimiter> I<delim> =item B<--delimiter> I<delim>
=item B<-d> I<delim> =item B<-d> I<delim>
@ -349,18 +336,6 @@ See also: B<--bg>
Implies B<--semaphore>. Implies B<--semaphore>.
=item B<--file>
=item B<-f> (Use B<--file> as B<-f> may be removed in later versions)
Line is a filename. The input line contains a filename that will be
quoted so it is not evaluated by the shell. This is the default if
I<command> is set. Can be reversed with B<--command>.
Most people will never need this because GNU B<parallel> normally
selects the correct B<--file> or B<--command>.
=item B<--group> =item B<--group>
=item B<-g> =item B<-g>
@ -1498,8 +1473,7 @@ To run 100 processes simultaneously do:
B<parallel -j 100 < jobs_to_run> B<parallel -j 100 < jobs_to_run>
As there is not a I<command> the option B<--command> is default As there is not a I<command> the jobs will be evaluated by the shell.
because the jobs needs to be evaluated by the shell.
=head1 EXAMPLE: Working as mutex and counting semaphore =head1 EXAMPLE: Working as mutex and counting semaphore