src/parallel: --trc works for minimal examples.

This commit is contained in:
Ole Tange 2010-05-24 02:48:40 +02:00
parent fc7fba878b
commit 26868cdc4f
3 changed files with 165 additions and 70 deletions

View file

@ -911,7 +911,7 @@ the currently running jobs are finished before exiting.
=head1 DIFFERENCES BETWEEN xargs/find -exec AND parallel
B<xargs> and B<find -exec> offer some of the same possibilites as
GNU B<parallel>.
GNU B<parallel>.
B<find -exec> only works on files. So processing other input (such as
hosts or URLs) will require creating these inputs as files. B<find
@ -924,7 +924,7 @@ B<xargs> deals badly with special characters (such as space, ' and
touch 'not important_file'
ls not* | xargs rm
mkdir -p '12" records'
ls | xargs rmdir
ls | xargs rmdir
You can specify B<-0> or B<-d "\n">, but many input generators are not
optimized for using B<NUL> as separator but are optimized for
@ -948,7 +948,7 @@ B<xargs> has no support for keeping the order of the output, therefore
if running jobs in parallel using B<xargs> the output of the second
job cannot be postponed till the first job is done.
B<xargs> has no support for context replace, so you will have to create the
B<xargs> has no support for context replace, so you will have to create the
arguments.
If you use a replace string in B<xargs> (B<-I>) you can not force
@ -957,17 +957,17 @@ B<xargs> to use more than one argument.
Quoting in B<xargs> works like B<-q> in GNU B<parallel>. This means
composed commands and redirection requires using B<bash -c>.
B<ls | parallel "wc {} >> B<{}.wc">
B<ls | parallel "wc {} >> B<{}.wc">
becomes
becomes
B<ls | xargs -d "\n" -P10 -I {} bash -c "wc {} >>B< {}.wc">
and
and
B<ls | parallel "echo {}; ls {}|wc">
B<ls | parallel "echo {}; ls {}|wc">
becomes
becomes
B<ls | xargs -d "\n" -P10 -I {} bash -c "echo {}; ls {}|wc">
@ -1161,7 +1161,7 @@ sub parse_options {
$Global::argfile = *STDIN;
$Global::interactive = 0;
$Global::stderr_verbose = 0;
Getopt::Long::Configure ("bundling","require_order");
GetOptions("debug|D" => \$::opt_D,
"xargs|m" => \$::opt_m,
@ -1181,8 +1181,12 @@ sub parse_options {
"max-line-length-allowed" => \$::opt_max_line_length_allowed,
"number-of-cpus" => \$::opt_number_of_cpus,
"number-of-cores" => \$::opt_number_of_cores,
"sshlogin|S=s" => \@Global::sshlogin,
"sshlogin|S=s" => \@::opt_sshlogin,
"sshloginfile=s" => \$::opt_sshloginfile,
"return=s" => \@::opt_return,
"trc=s" => \@::opt_trc,
"transfer" => \$::opt_transfer,
"cleanup" => \$::opt_cleanup,
# xargs-compatibility - implemented, man, unittest
"max-procs|P=s" => \$::opt_P,
"delimiter|d=s" => \$::opt_d,
@ -1200,7 +1204,6 @@ sub parse_options {
## xargs-compatibility - implemented, man - unittest missing
"interactive|p" => \$::opt_p,
## How to unittest? tty skal emuleres
# xargs-compatibility - unimplemented
"L=i" => \$::opt_L,
"max-lines|l:i" => \$::opt_l,
@ -1236,7 +1239,14 @@ sub parse_options {
if(defined $::opt_max_line_length_allowed) { print real_max_length(),"\n"; exit(0); }
if(defined $::opt_version) { version(); exit(0); }
if(defined $::opt_show_limits) { show_limits(); }
if(defined @::opt_sshlogin) { @Global::sshlogin = @::opt_sshlogin; }
if(defined $::opt_sshloginfile) { read_sshloginfile($::opt_sshloginfile); }
if(defined @::opt_return) { push @Global::ret_files, @::opt_return; }
if(defined @::opt_trc) {
push @Global::ret_files, @::opt_trc;
$::opt_transfer = 1;
$::opt_cleanup = 1;
}
if(defined $::opt_a) {
if(not open(ARGFILE,"<".$::opt_a)) {
@ -1246,11 +1256,11 @@ sub parse_options {
$Global::argfile = *ARGFILE;
}
if(@ARGV) {
if(@ARGV) {
if($Global::quoting) {
$Global::command = join(" ", shell_quote(@ARGV));
$Global::command = shell_quote(@ARGV);
} else {
$Global::command = join(" ", @ARGV);
$Global::command = join(" ", @ARGV);
}
}
@ -1258,7 +1268,7 @@ sub parse_options {
# Needs to be done after setting $Global::command and $Global::command_line_max_len
# as '-m' influences the number of commands that needs to be run
if(defined $::opt_P) {
if(defined $::opt_P) {
for my $sshlogin (keys %Global::host) {
$Global::host{$sshlogin}{'max_no_of_running'} =
compute_number_of_processes($::opt_P,$sshlogin);
@ -1276,6 +1286,12 @@ sub parse_options {
# Generating the command line
#
sub no_extension {
my $no_ext = shift;
$no_ext =~ s/\.[^\.]*$//; # Remove .ext from argument
return $no_ext;
}
sub generate_command_line {
my $command = shift;
my ($job_line,$last_good);
@ -1290,8 +1306,7 @@ sub generate_command_line {
# max number of lines (-L) =
# number_of_read_lines = 0
while (defined($next_arg = get_next_arg())) {
my $next_arg_no_ext = $next_arg;
$next_arg_no_ext =~ s/\.[^\.]*$//; # Remove .ext from argument
my $next_arg_no_ext = no_extension($next_arg);
# if defined max_number_of_lines
# number_of_read_lines++
# if $next_arg =~ /\w$/ then number_of_read_lines--
@ -1349,7 +1364,7 @@ sub generate_command_line {
}
debug("Return jobline: $job_line\n");
}
return $job_line;
return ($job_line,\@quoted_args);
}
@ -1364,7 +1379,7 @@ sub xargs_computations {
# Count number of {.}'s on the command line
my $no_of_no_ext = ($command =~ s/\Q$Global::replace_no_ext\E/$Global::replace_no_ext/go);
my $number_of_substitution_no_ext = $no_of_no_ext || 0;
# Count
# Count
my $c = $command;
if($Global::xargs) {
# remove all {}s
@ -1376,7 +1391,6 @@ sub xargs_computations {
if($Global::Xargs) {
$c =~ s/\S*\Q$Global::replacestring\E\S*//go;
$c =~ s/\S*\Q$Global::replace_no_ext\E\S*//go;
$length_of_command_no_args = length($c) - 1;
$length_of_context = length($command) - $length_of_command_no_args
- $no_of_replace * length($Global::replacestring)
@ -1384,7 +1398,7 @@ sub xargs_computations {
$spaces = 0;
}
@Calculated::xargs_computations =
@Calculated::xargs_computations =
($number_of_substitution, $number_of_substitution_no_ext,
$spaces,$length_of_command_no_args,$length_of_context);
}
@ -1395,7 +1409,7 @@ sub xargs_computations {
sub shell_quote {
# Quote the string so shell will not expand any special chars
my (@strings) = (@_);
my $arg;
my $arg;
for $arg (@strings) {
$arg =~ s/\\/\\\\/g;
@ -1403,8 +1417,7 @@ sub shell_quote {
$arg =~ s/([\002-\011\013-\032])/\\$1/g;
$arg =~ s/([\n])/'\n'/g; # filenames with '\n' is quoted using \'
}
#return wantarray ? @strings : "@strings";
return @strings;
return wantarray ? @strings : "@strings";
}
# Replace foo{}bar or foo{.}bar
@ -1508,7 +1521,7 @@ sub processes_available_by_system_limit {
my $sshlogin = shift;
my $system_limit=0;
my @command_lines=();
my $next_command_line;
my ($next_command_line, $args_ref);
my $more_filehandles;
my $max_system_proc_reached=0;
my $spawning_too_slow=0;
@ -1528,13 +1541,13 @@ sub processes_available_by_system_limit {
# If there are no more command lines, then we have a process
# per command line, so no need to go further
$next_command_line = next_command_line();
if(defined $next_command_line) {
push(@command_lines, $next_command_line);
($next_command_line, $args_ref) = next_command_line();
if(defined $next_command_line) {
push(@command_lines, $next_command_line, $args_ref);
}
# Every simultaneous process uses 2 filehandles when grouping
$more_filehandles = open($fh{$system_limit*2},"</dev/null")
$more_filehandles = open($fh{$system_limit*2},"</dev/null")
&& open($fh{$system_limit*2+1},"</dev/null");
# System process limit
@ -1559,7 +1572,7 @@ sub processes_available_by_system_limit {
$system_limit = int ($system_limit * 0.9)+1;
$spawning_too_slow = 1;
}
} while($system_limit < $wanted_processes
} while($system_limit < $wanted_processes
and defined $next_command_line
and $more_filehandles
and not $max_system_proc_reached
@ -1585,7 +1598,7 @@ sub processes_available_by_system_limit {
waitpid($pid,0);
}
wait();
# Cleanup: Unget the command_lines
# Cleanup: Unget the command_lines (and args_refs)
unget_command_line(@command_lines);
return $system_limit;
}
@ -1727,26 +1740,49 @@ sub init_run_jobs {
$SIG{TERM} = \&StartNoNewJobs;
}
sub login_and_host {
my $sshlogin = shift;
$sshlogin =~ /(\S+$)/ or die;
return $1;
}
sub next_command_line_with_sshlogin {
my $sshlogin = shift;
my $next_command_line = next_command_line();
my ($next_command_line, $args_ref) = next_command_line();
my ($pre,$post)=("","");
if($next_command_line and $sshlogin ne ":") {
return "ssh $sshlogin ".join("",shell_quote($next_command_line));
my $remote = login_and_host($sshlogin);
for my $file (@$args_ref) {
$file =~ s:^([^/]):./$1:; # If relative path: prepend ./ (to avoid problems with ':')
if($::opt_transfer) {
$pre = "rsync -az $file $remote:".shell_quote($file)." ;";
}
for my $ret_file (@Global::ret_files) {
my $remove = $::opt_cleanup ? "--remove-source-files" : "";
my $replaced = context_replace($ret_file,[$file],[no_extension($file)]);
$post .= "rsync -az $remove $remote:".shell_quote($replaced)." $replaced ;";
}
if($::opt_cleanup) {
$post .= "ssh $sshlogin rm ".shell_quote($file).";";
}
}
return "$pre ssh $sshlogin ".shell_quote($next_command_line)."; $post";
} else {
return $next_command_line;
}
}
sub next_command_line {
my $cmd_line;
my ($cmd_line,$args_ref);
if(@Global::unget_next_command_line) {
$cmd_line = shift @Global::unget_next_command_line;
$args_ref = shift @Global::unget_next_command_line;
} else {
do {
$cmd_line = generate_command_line($Global::command);
($cmd_line,$args_ref) = generate_command_line($Global::command);
} while (defined $cmd_line and $cmd_line =~ /^\s*$/); # Skip empty lines
}
return $cmd_line;
return ($cmd_line,$args_ref);
}
sub unget_command_line {
@ -1774,10 +1810,10 @@ sub get_next_arg {
}
}
if($Global::input_is_filename) {
($arg) = shell_quote($arg);
$arg = shell_quote($arg);
}
}
debug("Next arg: ".$arg."\n");
debug("Next arg: ".$arg."\n");
return $arg;
}
@ -1797,7 +1833,7 @@ sub start_more_jobs {
if(not $Global::StartNoNewJobs) {
for my $sshlogin (keys %Global::host) {
debug("Running jobs on $sshlogin: $Global::host{$sshlogin}{'no_of_running'}\n");
while ($Global::host{$sshlogin}{'no_of_running'} <
while ($Global::host{$sshlogin}{'no_of_running'} <
$Global::host{$sshlogin}{'max_no_of_running'}) {
if(start_another_job($sshlogin) == 0) {
# No more jobs to start
@ -1813,7 +1849,7 @@ sub start_more_jobs {
}
sub start_another_job {
# Grab a job from @Global::command, start it
# Grab a job from @Global::command, start it
# and remember the pid, the STDOUT and the STDERR handles
# Return 1.
# If no more jobs: do nothing and return 0
@ -1887,12 +1923,12 @@ sub start_job {
if($::opt_a and $Global::job_start_sequence == 1) {
# Give STDIN to the first job if using -a
$pid = open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) ||
$pid = open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) ||
die("open3 failed. Report a bug to <bug-parallel\@gnu.org>\n");
# Re-open to avoid complaining
open STDIN, "<&", $Global::original_stdin or die "Can't dup \$Global::original_stdin: $!";
} else {
$pid = open3(gensym, ">&STDOUT", ">&STDERR", $command) ||
$pid = open3(gensym, ">&STDOUT", ">&STDERR", $command) ||
die("open3 failed. Report a bug to <bug-parallel\@gnu.org>\n");
}
debug("started: $command\n");
@ -1908,7 +1944,7 @@ sub start_job {
"command" => $command);
} else {
return ("seq" => $Global::job_start_sequence,
"pid" => $pid,
"pid" => $pid,
"sshlogin" => $sshlogin,
"command" => $command);
}
@ -1978,7 +2014,7 @@ sub parse_sshlogin {
if($sshlogin =~ s:^(\d+)/::) {
# Override default autodetected ncpus
$Global::host{$sshlogin}{'ncpus'} = $1;
}
}
$Global::host{$sshlogin}{'no_of_running'} = 0;
$Global::host{$sshlogin}{'maxlength'} = max_length_of_command_line();
}
@ -1998,7 +2034,7 @@ sub ListRunningJobs {
sub StartNoNewJobs {
print STDERR ("$Global::progname: SIGTERM received. No new jobs will be started.\n",
"$Global::progname: Waiting for these ", scalar(keys %Global::running),
"$Global::progname: Waiting for these ", scalar(keys %Global::running),
" jobs to finish. Send SIGTERM again to stop now.\n");
ListRunningJobs();
$Global::StartNoNewJobs++;
@ -2022,7 +2058,7 @@ sub ReapIfNeeded {
$Global::SigChildCaught = 0;
Reaper();
}
$SIG{CHLD} = \&Reaper;
$SIG{CHLD} = \&Reaper;
}
sub Reaper {
@ -2172,10 +2208,10 @@ $main::opt_E = $main::opt_r = $Global::xargs = $Global::keeporder = 0;
# Per host variables:
# Can depend on OS
#$Global::command_line_max_len =
#$Global::command_line_max_len =
# Can depend on processes_available_by_system_limit
#$Global::processes_to_run =
#
#
# $sshlogin, $ncpus
# $Global::running_jobs = 0;
# $Global::running{$pid}{'seq'} = printsequence
@ -2193,3 +2229,9 @@ $main::opt_E = $main::opt_r = $Global::xargs = $Global::keeporder = 0;
# blokkene samlet med den mest egnede algoritme.
# Moved parse options to parse_options
# TODO max_line_length on remote
# TODO compute how many can be transferred within max_line_length
# TODO Unittest with filename that is long and requires a lot of quoting. Will there be to many
# TODO Unittest --trc without -S should warn
# TODO Unittest --transfer file called ' : & ) *.jpg'

View file

@ -1,53 +1,53 @@
1
2
3
sleep 1; echo 1
1
sleep 1; echo 2
2
ssh nlv.pi.dk sleep\ 1\;\ echo\ 3
sleep 1; echo 1
1
ssh nlv.pi.dk sleep\ 1\;\ echo\ 3;
3
ssh nlv.pi.dk sleep\ 1\;\ echo\ \\\>/tmp/fire
sleep 1; echo \>/tmp/fire
>/tmp/fire
sleep 1; echo 5
5
sleep 1; echo 6
ssh nlv.pi.dk sleep\ 1\;\ echo\ 6;
6
ssh nlv.pi.dk sleep\ 1\;\ echo\ 7
sleep 1; echo 7
7
ssh nlv.pi.dk sleep\ 1\;\ echo\ 8
sleep 1; echo 8
8
sleep 1; echo 9
ssh nlv.pi.dk sleep\ 1\;\ echo\ 9;
9
sleep 1; echo 10
10
sleep 1; hostname; echo 1
alpha
1
sleep 1; hostname; echo 2
alpha
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 2;
nlv.pi.dk
2
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 3
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 3;
nlv.pi.dk
3
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ \\\>/tmp/fire
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ \\\>/tmp/fire;
nlv.pi.dk
>/tmp/fire
sleep 1; hostname; echo 5
alpha
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 5;
nlv.pi.dk
5
sleep 1; hostname; echo 6
alpha
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 6;
nlv.pi.dk
6
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 7
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 7;
nlv.pi.dk
7
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 8
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 8;
nlv.pi.dk
8
sleep 1; hostname; echo 9
alpha
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 9;
nlv.pi.dk
9
sleep 1; hostname; echo 10
alpha
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 10;
nlv.pi.dk
10

View file

@ -0,0 +1,53 @@
1
2
3
sleep 1; echo 2
2
sleep 1; echo 1
1
ssh nlv.pi.dk sleep\ 1\;\ echo\ 3;
3
sleep 1; echo \>/tmp/fire
>/tmp/fire
sleep 1; echo 5
5
ssh nlv.pi.dk sleep\ 1\;\ echo\ 6;
6
sleep 1; echo 7
7
sleep 1; echo 8
8
ssh nlv.pi.dk sleep\ 1\;\ echo\ 9;
9
sleep 1; echo 10
10
sleep 1; hostname; echo 1
alpha
1
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 2;
nlv.pi.dk
2
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 3;
nlv.pi.dk
3
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ \\\>/tmp/fire;
nlv.pi.dk
>/tmp/fire
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 5;
nlv.pi.dk
5
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 6;
nlv.pi.dk
6
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 7;
nlv.pi.dk
7
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 8;
nlv.pi.dk
8
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 9;
nlv.pi.dk
9
ssh nlv.pi.dk sleep\ 1\;\ hostname\;\ echo\ 10;
nlv.pi.dk
10