parallel: --joblog implemented. Testsuite missing.

--spreadstdin prototype. Testsuite missing.
This commit is contained in:
Ole Tange 2011-01-18 18:15:42 +01:00
parent 6ee0aee609
commit 48454a36ea
7 changed files with 1069 additions and 933 deletions

View file

@ -1,12 +1,5 @@
max_jobs_running could be reset if -j is a changed file.
dummy children were not killed if cmd line was too long.
default number of jobs for --semaphore set to 1.
test25 had race condition.
test22 -j10 added.
test13 -j9 added.
test47 remote cleanup added before test.
test17 remote cleanup added before test.
parallel: spread arguments between all jobslots when reaching EOF of input
parallel: --joblog implemented. Testsuite missing.
parallel: --spreadstdin prototype. Testsuite missing.
codecoverage

View file

@ -143,12 +143,23 @@ download at: http://ftp.gnu.org/gnu/parallel/
New in this release:
* --joblog makes a simple log of completed jobs.
* -X now spreads arguments between job slots when reaching last
argument. Use -j1 to avoid this.
* People on the email list have voted -j+0 to be the new default
instead of -j9.
* First review in Polish. Thanks to Patryk Krawaczyński.
http://nfsec.pl/root/2458
* Review in Spanish (in print).
https://www.linux-magazine.es/issue/67/moreinfo.html
* Review in English. Thanks to Brian Gough.
http://blogs.fsfe.org/bjg/2011/01/gnu-parallel-a-map-operator-for-the-command-line/
* Review in French. Thanks to Denis Dordoigne.
http://linuxfr.org/2010/12/29/27715.html

View file

@ -1,13 +1,15 @@
#!/usr/bin/perl -w
use IPC::Open3;
use Fcntl;
use Symbol qw(gensym);
use IO::File;
use POSIX qw(:sys_wait_h setsid ceil);
use POSIX qw(:sys_wait_h setsid ceil :errno_h);
use File::Temp qw(tempfile tempdir);
use Getopt::Long;
use strict;
use Carp;
use Time::HiRes qw(usleep);
$::oodebug=0;
$Global::original_sigterm = $SIG{TERM};
@ -51,6 +53,7 @@ for my $sshlogin (values %Global::host) {
$sshlogin->max_jobs_running();
}
init_run_jobs();
my $sem;
if($Global::semaphore) {
@ -58,6 +61,9 @@ if($Global::semaphore) {
}
$SIG{TERM} = \&start_no_new_jobs;
start_more_jobs();
if($::opt_spreadstdin) {
spreadstdin();
}
reap_if_needed();
drain_job_queue();
cleanup();
@ -70,6 +76,48 @@ if($::opt_halt_on_error) {
wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
}
sub spreadstdin {
    # Read STDIN in chunks of lines and hand each chunk to the first
    # running job that has no unwritten data pending.
    # Uses:
    #   %Global::running = jobs whose stdin receives the records
    # Returns: N/A
    # NOTE(review): @jobs is a snapshot taken on entry and the loop
    # assumes at least one running job - confirm against the caller.
    my @jobs = values %Global::running;
    # Debug counters: records handed out / polling rounds with no free job.
    # Initialized so the final print never sees an undefined value.
    my $first = 0;
    my $second = 0;
    my $sleep = 1;
    while(defined(my $line = <STDIN>)) {
        # The line read by the loop condition is part of the record:
        # dropping it would lose one line of input per chunk.
        my @rec = ($line);
        for(my $t = 0; $t < 1000 and not eof(STDIN); $t++) {
            push @rec, scalar(<STDIN>);
        }
        my $record = join("",@rec);
        write_record: while(defined $record) {
            for my $job (@jobs) {
                if($job->remaining()) {
                    # Job still has unwritten data: push that first
                    $job->complete_write();
                } else {
                    $job->write($record);
                    $record = undef;
                    $first++;
                    $sleep=1;
                    last write_record;
                }
            }
            # No job was ready: rotate jobs to spread the input and
            # back off a little longer before polling again
            @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]);
            usleep($sleep);
            $sleep *=1.1;
            $second++;
        }
    }
    for my $job (values %Global::running) {
        # The handles are non-blocking, so part of the last record may
        # still be pending. Drain it before closing or it is truncated.
        my $flush_sleep = 1;
        while($job->remaining()) {
            $job->complete_write();
            usleep($flush_sleep);
            $flush_sleep *= 1.1;
        }
        my $fh = $job->stdin();
        close $fh;
    }
    $Global::start_no_new_jobs = 1;
    print STDERR $first," ",$second,"\n";
}
sub acquire_semaphore {
# Acquires semaphore. If needed: spawns to the background
# Returns:
@ -156,6 +204,7 @@ sub get_options_from_array {
"arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep,
"trim=s" => \$::opt_trim,
"profile|J=s" => \$::opt_profile,
"spreadstdin" => \$::opt_spreadstdin,
# xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$::opt_P,
"delimiter|d=s" => \$::opt_d,
@ -197,7 +246,7 @@ sub get_options_from_array {
sub parse_options {
# Returns: N/A
# Defaults:
$Global::version = 20110111;
$Global::version = 20110118;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
@ -340,14 +389,16 @@ sub parse_options {
if(not defined $::opt_P) {
$::opt_P = "+0";
#for my $sshlogin (values %Global::host) {
# $sshlogin->set_max_jobs_running($Global::default_simultaneous_sshlogins);
#}
}
if($::opt_joblog) {
if(not open($Global::joblog,">$::opt_joblog")) {
print STDERR "Cannot write to --joblog $::opt_joblog\n";
::wait_and_exit(255);
} else {
print $Global::joblog
join("\t", "Seq", "Host", "Starttime", "Runtime",
"Trans", "Return", "Command"
). "\n";
}
}
}
@ -561,7 +612,7 @@ sub enough_file_handles {
my $enough_filehandles = 1;
# We need a filehandle for STDOUT and STDERR
# open3 uses 2 extra filehandles temporarily
for my $i (1..4) {
for my $i (1..6) {
$enough_filehandles &&= open($fh{$i},"</dev/null");
}
for (values %fh) { close $_; }
@ -658,14 +709,6 @@ sub init_run_jobs {
}
}
sub __login_and_host {
    # Extract the last whitespace-free token of an sshlogin string.
    # Returns:
    #   login@hostname
    # Dies if the string contains no non-whitespace token at its end.
    my ($sshlogin) = @_;
    if($sshlogin =~ /(\S+$)/) {
        return $1;
    }
    die;
}
sub drain_job_queue {
# Returns: N/A
if($::opt_progress) {
@ -901,7 +944,7 @@ sub start_more_jobs {
next;
}
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($Global::JobQueue->empty()) {
if($Global::JobQueue->empty() and not $::opt_spreadstdin) {
last;
}
debug("Try starting a job on ".$sshlogin->string()."\n");
@ -930,7 +973,7 @@ sub start_another_job {
my $sshlogin = shift;
# Do we have enough file handles to start another job?
if(enough_file_handles()) {
if($Global::JobQueue->empty()) {
if($Global::JobQueue->empty() and not $::opt_spreadstdin) {
# No more commands to run
return 0;
} else {
@ -1661,8 +1704,8 @@ sub processes_available_by_system_limit {
$system_limit++;
# Every simultaneous process uses 2 filehandles when grouping
$more_filehandles = open($fh{$system_limit*2},"</dev/null")
&& open($fh{$system_limit*2+1},"</dev/null");
$more_filehandles = open($fh{$system_limit*10},"</dev/null")
&& open($fh{$system_limit*10+2},"</dev/null");
# System process limit
$system_limit % 10 or $time=time;
@ -2150,11 +2193,15 @@ sub new {
my $class = shift;
my $commandline = shift;
return bless {
'commandline' => $commandline,
'workdir' => undef,
'seq' => undef,
'stdout' => undef,
'stderr' => undef,
'commandline' => $commandline, # The commandline with no args
'workdir' => undef, # --workdir
'seq' => undef, # $PARALLEL_SEQ
'stdin' => undef, # filehandle for stdin (used for --spreadstdin)
'stdout' => undef, # filehandle for stdout (used for --group)
'stderr' => undef, # filehandle for stderr (used for --group)
'remaining' => "", # remaining data not sent to stdin (used for --spreadstdin)
'transfersize' => 0, # size of files using --transfer
'returnsize' => 0, # size of files using --return
'pid' => undef,
# hash of { SSHLogins => number of times the command failed there }
'failed' => undef,
@ -2203,6 +2250,52 @@ sub set_stderr {
$self->{'stderr'} = $stderr;
}
sub stdin {
    # Returns:
    #   the value stored under the job's 'stdin' key
    my ($self) = @_;
    return $self->{'stdin'};
}
sub set_stdin {
    # Store the filehandle used as the job's stdin and make it
    # non-blocking.
    # Input:
    #   $stdin = filehandle connected to the job's stdin
    # Dies if the file status flags cannot be read or changed.
    my $self = shift;
    my $stdin = shift;
    # Add O_NONBLOCK to the flags already set on the handle.
    # F_SETFL replaces the whole flag set, so setting only O_NONBLOCK
    # would silently clear flags such as O_APPEND.
    my $flags = fcntl($stdin, ::F_GETFL, 0)
        or die "Couldn't get flags for HANDLE: $!\n";
    fcntl($stdin, ::F_SETFL, $flags | ::O_NONBLOCK)
        or die "Couldn't set flags for HANDLE: $!\n";
    $self->{'stdin'} = $stdin;
}
sub write {
    # Queue data for the job's stdin and try to send it right away.
    # Input:
    #   $data = string to append to the pending ('remaining') data
    # Empty data is ignored: nothing is queued and no write is tried.
    my ($self, $data) = @_;
    if(length($data)) {
        $self->{'remaining'} .= $data;
        $self->complete_write();
    }
}
sub complete_write {
    # Try to write the pending ('remaining') data to the job's stdin
    # with a single syswrite and drop whatever part was accepted.
    # Returns: N/A
    my $self = shift;
    my $in = $self->{'stdin'};
    ::debug("complete_write\n");
    my $len = syswrite($in,$self->{'remaining'});
    if(defined $len) {
        # Remove the part that was written
        substr($self->{'remaining'},0,$len) = "";
    } elsif($! == ::EAGAIN) {
        # write would block; keep the data and let the caller retry
    } else {
        # Any other error would fail again on every retry, so a caller
        # that polls remaining() would spin forever. Give up on the
        # pending data instead of passing undef to substr.
        ::debug("complete_write failed: $!\n");
        $self->{'remaining'} = "";
    }
}
sub remaining {
    # Returns:
    #   number of characters still waiting to be written to stdin
    #   undef if the 'remaining' buffer itself is undefined
    my ($self) = @_;
    my $pending = $self->{'remaining'};
    return undef if not defined $pending;
    return length $pending;
}
sub pid {
my $self = shift;
return $self->{'pid'};
@ -2361,17 +2454,25 @@ sub transfer {
# Files to transfer
my $self = shift;
my @transfer = ();
$self->{'transfersize'} = 0;
if($::opt_transfer) {
for my $record (@{$self->{'commandline'}{'arg_list'}}) {
# Merge arguments from records into args
for my $arg (@$record) {
CORE::push @transfer, $arg->orig();
# filesize
$self->{'transfersize'} += (stat($arg->orig()))[7];
}
}
}
return @transfer;
}
sub transfersize {
    # Returns:
    #   the value stored under the job's 'transfersize' key
    my ($self) = @_;
    return $self->{'transfersize'};
}
sub sshtransfer {
my $self = shift;
my $sshlogin = $self->sshlogin();
@ -2415,6 +2516,15 @@ sub return {
return @return;
}
sub returnsize {
    # Sum the sizes of the files listed by $self->return().
    # This is called after the job has finished
    # Returns:
    #   total size in bytes of the files that could be stat'ed
    my $self = shift;
    # Start from zero on every call so that calling this twice does
    # not double the total.
    my $size = 0;
    for my $file ($self->return()) {
        my $bytes = (stat($file))[7];
        # stat fails (undef) for files that do not exist: skip those
        if(defined $bytes) {
            $size += $bytes;
        }
    }
    $self->{'returnsize'} = $size;
    return $size;
}
sub sshreturn {
my $self = shift;
my $sshlogin = $self->sshlogin();
@ -2529,6 +2639,24 @@ sub start {
die "jkj2";
}
my $command = $job->sshlogin_wrap();
if($Global::interactive or $Global::stderr_verbose) {
if($Global::interactive) {
print $Global::original_stderr "$command ?...";
open(TTY,"/dev/tty") || die;
my $answer = <TTY>;
close TTY;
my $run_yes = ($answer =~ /^\s*y/i);
if (not $run_yes) {
$command = "true"; # Run the command 'true'
}
} else {
print $Global::original_stderr "$command\n";
}
}
local (*IN,*OUT,*ERR);
my $pid;
if($Global::grouped) {
my ($outfh,$errfh,$name);
@ -2539,36 +2667,20 @@ sub start {
($errfh,$name) = ::tempfile(SUFFIX => ".par");
unlink $name;
open STDOUT, '>&', $outfh or die "Can't redirect STDOUT: $!";
open STDERR, '>&', $errfh or die "Can't dup STDOUT: $!";
open OUT, '>&', $outfh or die "Can't redirect STDOUT: $!";
open ERR, '>&', $errfh or die "Can't dup STDOUT: $!";
$job->set_stdout($outfh);
$job->set_stderr($errfh);
} else {
(*OUT,*ERR)=(*STDOUT,*STDERR);
}
if($Global::interactive or $Global::stderr_verbose) {
if($Global::interactive) {
print $Global::original_stderr "$command ?...";
open(TTY,"/dev/tty") || die;
my $answer = <TTY>;
close TTY;
my $run_yes = ($answer =~ /^\s*y/i);
if (not $run_yes) {
open STDOUT, ">&", $Global::original_stdout
or die "Can't dup \$oldout: $!";
open STDERR, ">&", $Global::original_stderr
or die "Can't dup \$oldout: $!";
$command = "true"; # Run the command 'true'
}
} else {
print $Global::original_stderr "$command\n";
}
}
if(($::opt_dryrun or $Global::verbose) and not $Global::grouped) {
if($Global::verbose <= 1) {
print STDOUT $job->replaced(),"\n";
print OUT $job->replaced(),"\n";
} else {
# Verbose level > 1: Print the rsync and stuff
print STDOUT $command,"\n";
print OUT $command,"\n";
}
}
if($::opt_dryrun) {
@ -2579,9 +2691,15 @@ sub start {
$ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$;
::debug("$Global::total_running processes. Starting (".$job->seq()."): $command\n");
if(@::opt_a and $job->seq() == 1) {
if($::opt_spreadstdin) {
my ($in);
$pid = ::open3($in, ">&OUT", ">&ERR", $command) ||
die("open3 (with spreadstdin) failed. Report a bug to <bug-parallel\@gnu.org>\n");
$job->set_stdin($in);
} elsif(@::opt_a and $job->seq() == 1) {
# Give STDIN to the first job if using -a
$pid = ::open3("<&STDIN", ">&STDOUT", ">&STDERR", $command) ||
*IN = *STDIN;
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $command) ||
die("open3 (with -a) failed. Report a bug to <bug-parallel\@gnu.org>\n");
# Re-open to avoid complaining
open STDIN, "<&", $Global::original_stdin
@ -2589,20 +2707,17 @@ sub start {
} elsif ($::opt_tty and not $Global::tty_taken and -c "/dev/tty" and
open(DEVTTY, "/dev/tty")) {
# Give /dev/tty to the command if no one else is using it
$pid = ::open3("<&DEVTTY", ">&STDOUT", ">&STDERR", $command) ||
*IN = *DEVTTY;
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $command) ||
die("open3 (with /dev/tty) failed. Report a bug to <bug-parallel\@gnu.org>\n");
$Global::tty_taken = $pid;
close DEVTTY;
} else {
$pid = ::open3(::gensym, ">&STDOUT", ">&STDERR", $command) ||
$pid = ::open3(::gensym, ">&OUT", ">&ERR", $command) ||
die("open3 (with gensym) failed. Report a bug to <bug-parallel\@gnu.org>\n");
}
$job->set_pid($pid);
$job->set_starttime();
open STDOUT, ">&", $Global::original_stdout
or die "Can't dup \$Global::original_stdout: $!";
open STDERR, ">&", $Global::original_stderr
or die "Can't dup \$Global::original_stderr: $!";
return $job;
}
@ -2656,7 +2771,7 @@ sub print {
}
printf $Global::joblog
join("\t", $self->seq(), $self->sshlogin()->string(),
$self->starttime(), $self->runtime(), $cmd
$self->starttime(), $self->runtime(), $self->transfersize(), $self->returnsize(), $cmd
). "\n";
flush $Global::joblog;
}
@ -2763,7 +2878,11 @@ sub populate {
# Add arguments from arg_queue until the number of arguments or
# max line length is reached
my $self = shift;
# my $first_time_empty = 1;
if($::opt_spreadstdin) {
# Do not read any args
$self->push([Arg->new("")]);
return;
}
my $next_arg;
while (not $self->{'arg_queue'}->empty()) {
$next_arg = $self->{'arg_queue'}->get();
@ -3011,8 +3130,8 @@ sub replace_placeholders {
my $target = shift;
my $quote_special_chars = shift;
my $context_replace = $self->{'context_replace'};
my $context_regexp = $context_replace ? '\S*' : ''; # Regexp to match surrounding context
if($self->number_of_args() == 0) {
Carp::confess("0 args should never call replaced");
}
@ -3171,7 +3290,7 @@ sub get {
);
$cmd_line->populate();
::debug("cmd_line->number_of_args ".$cmd_line->number_of_args()."\n");
if($cmd_line->number_of_args() == 0) {
if(not $::opt_spreadstdin and $cmd_line->number_of_args() == 0) {
# We did not get more args - maybe at EOF string?
return undef;
} else {

View file

@ -390,6 +390,17 @@ specified, and for B<-I>{} otherwise. This option is deprecated;
use B<-I> instead.
=item B<--joblog> I<logfile> (beta testing)
Logfile for executed jobs. Saves a list of the executed jobs to
I<logfile> in the following format: sequence number, sshlogin, start
time as seconds since epoch, run time in seconds, bytes in files
transferred, bytes in files returned, command run.
To convert the times into ISO-8601 strict do:
B<perl -a -F"\t" -ne 'chomp($F[2]=`date -d \@$F[2] +%FT%T`); print join("\t",@F)'>
=item B<--jobs> I<N>
=item B<-j> I<N>

View file

@ -239,8 +239,9 @@ stdout parallel -k -l1 -n4 echo < files.xi
echo '### -l2 echo < files.xi'
stdout xargs -l2 echo < files.xi
stdout parallel -k -l2 echo < files.xi
echo '### -s30 -t echo < stairs.xi'
echo '### -s30 -t echo < stairs.xi - xargs'
stdout xargs -s30 -t echo < stairs.xi
echo '### -s30 -t echo < stairs.xi - parallel'
stdout parallel -k -X -s30 -t echo < stairs.xi
echo '### -t echo this plus that < space.xi'
stdout xargs -t echo this plus that < space.xi

View file

@ -38,7 +38,7 @@ echo '### Test empty input'
</dev/null parallel -j +0 echo
echo '### Test -m'
seq 1 2 | parallel -m echo
seq 1 2 | parallel -k -m echo
echo '### Test :::'
parallel echo ::: 1

View file

@ -1055,7 +1055,7 @@ FIRST with 'single quotes' as well. IS OK
/src/gnu/autoconf-1.11/install.sh /src/gnu/autoconf-1.11/autoconf.info
/src/gnu/autoconf-1.11/standards.texi /src/gnu/autoconf-1.11/make-stds.texi
/src/gnu/autoconf-1.11/standards.info /src/gnu/autoconf-1.11/texinfo.tex
### -s30 -t echo < stairs.xi
### -s30 -t echo < stairs.xi - xargs
echo 1 22 333 4444 55555
1 22 333 4444 55555
echo 666666 7777777 88888888
@ -1066,6 +1066,7 @@ echo 55555 666666 7777777
55555 666666 7777777
echo 88888888 999999999
88888888 999999999
### -s30 -t echo < stairs.xi - parallel
echo 1 22 333 4444 55555
echo 666666 7777777 88888888
1 22 333 4444 55555