Conflict resolved

This commit is contained in:
Ole Tange 2011-01-21 19:03:12 +01:00
parent 0be50427fb
commit 495d832a73

View file

@ -85,36 +85,58 @@ sub spreadstdin {
my $record; my $record;
my $partial = ""; my $partial = "";
my $buf = ""; my $buf = "";
my $block_max = 1000_000;
my $rec_start = undef; my $rec_start = undef;
my $rec_end = undef; my $rec_end = undef;
my $err; my ($recstart,$recend,$recerror);
while(read(STDIN,$buf,$block_max)) { if(defined($::opt_recstart) and defined($::opt_recend)) {
# If both --recstart and --recend is given then both must match
$recstart = $::opt_recstart;
$recend = $::opt_recend;
$recerror = "Warning: --recend and --recstart unmatched";
} elsif(defined($::opt_recstart)) {
# If --recstart is given it must match start of record
$recstart = $::opt_recstart;
$recend = "";
$recerror = "Warning: --recstart unmatched";
} elsif(defined($::opt_recend)) {
# If --recend is given then it must match end of record
$recstart = "";
$recend = $::opt_recend;
$recerror = "Warning: --recend unmatched";
}
while(read(STDIN,$buf,$::opt_blocksize)) {
$record = $partial.$buf; $record = $partial.$buf;
if(eof STDIN) { if(eof STDIN) {
# There is no partial record at the end of file # There is no partial record at the end of file
$partial = ""; $partial = "";
} else { } else {
if(($::opt_recstart and $record =~ s/(.*)($::opt_recstart.*?)$/$1/os) if($record =~ s/(.*$recend)($recstart.*?)$/$1/os) {
or
($::opt_recend and $record =~ s/(.*$::opt_recend)(.*?)$/$1/os)) {
$partial = $2; $partial = $2;
} else {
print $Global::original_stderr $recerror,"\n";
} }
} }
::debug("Read record: ".length($record)."\n"); ::debug("Read record: ".length($record)."\n");
my $reap_needed = 0;
write_record: while(defined $record) { write_record: while(defined $record) {
# Sorting according to sequence is necessary for -k to work # Sorting according to sequence is necessary for -k to work
do_not_reap(); # If Global::running is changed the for loop has a race condition
for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) { for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) {
::debug("Looking at ",$job," ");
::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n");
if($job->remaining()) { if($job->remaining()) {
# Part of the job's last record has not finished being written # Part of the job's last record has not finished being written
$job->complete_write(); $job->complete_write();
} else { } else {
if($job->datawritten() > 0) { if($job->datawritten() > 0) {
# if opt -k: # There is no data remaining and we have written data before:
# close stdin # So this means we have completed writing a block.
# start another (maybe done by start more?) # close stdin
# This will cause the job to finish and when it dies we will spawn another job
my $fh = $job->stdin(); my $fh = $job->stdin();
close $fh; close $fh;
$reap_needed = 1;
} else { } else {
$job->write($record); $job->write($record);
$record = undef; $record = undef;
@ -124,6 +146,7 @@ sub spreadstdin {
} }
} }
} }
reap_if_needed();
# Rotate jobs to spread the input # Rotate jobs to spread the input
# @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]); # @jobs = ($jobs[$#jobs],@jobs[0..$#jobs-1]);
usleep($sleep); usleep($sleep);
@ -131,6 +154,10 @@ sub spreadstdin {
$max_sleep = max($max_sleep, $sleep); $max_sleep = max($max_sleep, $sleep);
$second++; $second++;
} }
if($reap_needed) {
# One of more jobs finished in last round
reap_if_needed();
}
} }
my $flush_done; my $flush_done;
do { do {
@ -240,10 +267,11 @@ sub get_options_from_array {
"arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep, "arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep,
"trim=s" => \$::opt_trim, "trim=s" => \$::opt_trim,
"profile|J=s" => \$::opt_profile, "profile|J=s" => \$::opt_profile,
"spreadstdin" => \$::opt_spreadstdin, "pipe|spreadstdin" => \$::opt_spreadstdin,
"recstart=s" => \$::opt_recstart, "recstart=s" => \$::opt_recstart,
"recend=s" => \$::opt_recend, "recend=s" => \$::opt_recend,
"files" => \$::opt_files, "files|output-as-files|outputasfiles" => \$::opt_files,
"block|block-size|blocksize=s" => \$::opt_blocksize,
# xargs-compatibility - implemented, man, testsuite # xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$::opt_P, "max-procs|P=s" => \$::opt_P,
"delimiter|d=s" => \$::opt_d, "delimiter|d=s" => \$::opt_d,
@ -353,6 +381,10 @@ sub parse_options {
if(defined @::opt_sshlogin) { @Global::sshlogin = @::opt_sshlogin; } if(defined @::opt_sshlogin) { @Global::sshlogin = @::opt_sshlogin; }
if(defined $::opt_sshloginfile) { read_sshloginfile($::opt_sshloginfile); } if(defined $::opt_sshloginfile) { read_sshloginfile($::opt_sshloginfile); }
if(defined @::opt_return) { push @Global::ret_files, @::opt_return; } if(defined @::opt_return) { push @Global::ret_files, @::opt_return; }
if(not defined $::opt_recstart and
not defined $::opt_recend) { $::opt_recend = "\n"; }
if(not defined $::opt_blocksize) { $::opt_blocksize = "1M"; }
$::opt_blocksize = multiply_binary_prefix($::opt_blocksize);
if(defined $::opt_semaphore) { $Global::semaphore = 1; } if(defined $::opt_semaphore) { $Global::semaphore = 1; }
if(defined $::opt_semaphoretimeout) { $Global::semaphore = 1; } if(defined $::opt_semaphoretimeout) { $Global::semaphore = 1; }
if(defined $::opt_semaphorename) { $Global::semaphore = 1; } if(defined $::opt_semaphorename) { $Global::semaphore = 1; }
@ -728,6 +760,18 @@ sub usleep {
select(undef, undef, undef, $ms/1000); select(undef, undef, undef, $ms/1000);
} }
sub multiply_binary_prefix {
# Evalualte numbers with binary prefix
# 13G = 13*1073741824 = 13958643712
my $s = shift;
$s =~ s/Ki?/*1024/gi;
$s =~ s/Mi?/*1048576/gi;
$s =~ s/Gi?/*1073741824/gi;
$s =~ s/Ti?/*1099511627776/gi;
$s = eval $s;
return $s;
}
sub __RUNNING_AND_PRINTING_THE_JOBS__ {} sub __RUNNING_AND_PRINTING_THE_JOBS__ {}
@ -756,6 +800,7 @@ sub init_run_jobs {
sub drain_job_queue { sub drain_job_queue {
# Returns: N/A # Returns: N/A
$Private::first_completed ||= time;
if($::opt_progress) { if($::opt_progress) {
do_not_reap(); do_not_reap();
print $Global::original_stderr init_progress(); print $Global::original_stderr init_progress();
@ -830,13 +875,17 @@ sub progress {
my $completed = 0; my $completed = 0;
for(@workers) { $completed += $Global::host{$_}->jobs_completed() } for(@workers) { $completed += $Global::host{$_}->jobs_completed() }
if($completed) { if($completed) {
$Private::first_completed ||= time; my $jobs_left = ($Global::JobQueue->total_jobs() - $completed);
my $avgtime = (time-$Private::first_completed)/$completed; my $avgtime = (time-$Private::first_completed)/$completed;
my $this_eta = ($Global::JobQueue->total_jobs() - $completed) * $avgtime; $Private::smoothed_avg_time ||= $avgtime;
$Private::eta ||= $this_eta; $Private::smoothed_avg_time = 0.90 * $Private::smoothed_avg_time + 0.10 * $avgtime;
my $this_eta = $jobs_left * $Private::smoothed_avg_time;
#my $this_eta = ($Global::JobQueue->total_jobs() - $completed) * $avgtime;
#$Private::eta ||= $this_eta;
# Smooth the eta so it does not jump wildly # Smooth the eta so it does not jump wildly
$Private::eta = 0.98 * $Private::eta + 0.02 * $this_eta; #$Private::eta = 0.95 * $Private::eta + 0.05 * $this_eta;
$eta = sprintf("ETA: %ds ", $Private::eta); #$eta = sprintf("ETA: %ds $avgtime", $Private::eta);
$eta = sprintf("ETA: %ds %dleft %.2favg ", $this_eta, $jobs_left, $avgtime);
} }
} }
@ -2585,7 +2634,9 @@ sub returnsize {
# This is called after the job has finished # This is called after the job has finished
my $self = shift; my $self = shift;
for my $file ($self->return()) { for my $file ($self->return()) {
$self->{'returnsize'} += (stat($file))[7]; if(-e $file) {
$self->{'returnsize'} += (stat($file))[7];
}
} }
return $self->{'returnsize'}; return $self->{'returnsize'};
} }