parallel: Prepare for --round-robin with --pipepart.

--slotreplace for jobslot replacement string {%}.
Bugfix for --line-buffer.
Fixed bug #42272: Undefined subroutine &Job::dirname at line 4873.
Bugfix for sem when opening locking fail fails for a while.
--semaphoretimeout implemented.
This commit is contained in:
Ole Tange 2014-05-22 14:26:48 +02:00
parent b7b5725f6d
commit 30b54613e8

View file

@ -222,13 +222,13 @@ sub pipe_part_files {
my ($file) = @_; my ($file) = @_;
# find positions # find positions
my @pos = find_split_positions($file,$opt::blocksize); my @pos = find_split_positions($file,$opt::blocksize);
# unshift job with dd_prefix # unshift job with cat_partial
my @cmdlines; my @cmdlines;
for(my $i=0; $i<$#pos; $i++) { for(my $i=0; $i<$#pos; $i++) {
my $cmd = $Global::JobQueue->{'commandlinequeue'}->get(); my $cmd = $Global::JobQueue->{'commandlinequeue'}->get();
# TODO prepend --header (how?) # TODO prepend --header (how?)
$cmd->{'replaced'} = cat_partial($file, $pos[$i],$pos[$i+1])."|" . $cmd->{'replaced'} = cat_partial($file, $pos[$i],$pos[$i+1])."|" .
$cmd->{'replaced'}; "(".$cmd->{'replaced'}.")";
::debug("Unget ".$cmd->{'replaced'}."\n"); ::debug("Unget ".$cmd->{'replaced'}."\n");
push(@cmdlines, $cmd); push(@cmdlines, $cmd);
} }
@ -276,103 +276,16 @@ sub find_split_positions {
sub cat_partial { sub cat_partial {
# Input: # Input:
# $file = the file to read # $file = the file to read
# $start = start byte # ($start, $end, [$start2, $end2, ...]) = start byte, end byte
# $end = end byte
# Returns: # Returns:
# Efficent perl command to copy $start..$end to stdout # Efficient perl command to copy $start..$end, $start2..$end2, ... to stdout
my($file, $start, $end) = @_; my($file, @start_end) = @_;
my $len = $end - $start; my($start, $i);
# Convert start_end to start_len
my @start_len = map { if(++$i % 2) { $start = $_; } else { $_-$start } } @start_end;
return "<". shell_quote_scalar($file) . return "<". shell_quote_scalar($file) .
q{ perl -e 'sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 32768 ? 32768 : $left))){ $left -= $read; syswrite(STDOUT,$buf); }' } . q{ perl -e 'while(@ARGV) { sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 32768 ? 32768 : $left))){ $left -= $read; syswrite(STDOUT,$buf); } }' } .
" ".$start." ".$len; " @start_len";
}
sub _dd_prefix_part_job {
# Input:
# $file = the file to read
# $start = start byte
# $end = end byte
# Returns:
# Efficent dd command to copy $start..$end to stdout
my($file, $start, $end) = @_;
# The optimal blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
# The optimal blocksize for freebsd = 2^15..2^17
my $big_block = 131072;
my $small_block = 512;
# Copy:
# start .. 512*n: 1 byte at a time (1 MB/s)
# 512*n .. 131072*n: 512 bytes at a time (300 MB/s)
# 131072*n1 .. 131072*n2: 131072 bytes at a time (1 GB/s)
# 131072*n .. 512*n: 512 bytes at a time (medium speed)
# 512*n .. end: 1 byte at a time
# start = 1234;
# end = 4321;
# len = end - start = 3087;
my $len = $end - $start;
# copy1_start = start;
my $copy1_start = $start;
# copy1_len = (10 - 1234) % 10 = (small_block - copy1_start) % small_block = 6;
my $copy1_len = ($small_block - $copy1_start) % $small_block;
# copy1_bs = 1;
my $copy1_bs = 1;
# copy1_count = 6 / 1 = copy1_len / copy1_bs = 6;
my $copy1_count = $copy1_len / $copy1_bs;
# copy1_skip = 1234 = start / copy1_bs;
my $copy1_skip = $start / $copy1_bs;
# copy2_start = start + copy1_len = 1240;
my $copy2_start = $start + $copy1_len;
# copy2_len = (100 - 1240) % 100 = (big_block - copy2_start) % big_block = 60;
my $copy2_len = ($big_block - $copy2_start) % $big_block;
# copy2_bs = small_block = 10;
my $copy2_bs = $small_block;
# copy2_count = 60 / 10 = copy2_len / copy2_bs = 6;
my $copy2_count = $copy2_len / $copy2_bs;
# copy2_skip = 1240 / 10 = copy2_start / copy2_bs = 124
my $copy2_skip = $copy2_start / $copy2_bs;
# copy5_len = 4321 % 10 = end % small_block = 1;
my $copy5_len = $end % $small_block;
# copy5_start = 4321 - 1 = end - copy5_len = 4320;
my $copy5_start = $end - $copy5_len;
# copy5_bs = 1;
my $copy5_bs = 1;
# copy5_count = 1 / 1 = copy5_len / copy5_bs = 1;
my $copy5_count = $copy5_len / $copy5_bs;
# copy5_skip = 4320 / 1 = copy5_start / copy5_bs = 4320
my $copy5_skip = $copy5_start / $copy5_bs;
# copy4_len = 4320 % 100 = copy5_start % big_block = 20;
my $copy4_len = $copy5_start % $big_block;
# copy4_start = end - copy5_len - copy4_len = 4300;
my $copy4_start = $end - $copy5_len - $copy4_len;
# copy4_bs = small_block = 10;
my $copy4_bs = $small_block;
# copy4_count = 20 / 10 = copy4_len / copy4_bs = 2;
my $copy4_count = $copy4_len / $copy4_bs;
# copy4_skip = 4300 / 10 = copy4_start / copy4_bs = 430
my $copy4_skip = $copy4_start / $copy4_bs;
# copy3_start = start + copy1_len + copy2_len = 1300;
my $copy3_start = $start + $copy1_len + $copy2_len;
# copy3_len = 4300 - 1300 = copy4_start - copy3_start = 3000;
my $copy3_len = $copy4_start - $copy3_start;
# copy3_bs = big_block = 100;
my $copy3_bs = $big_block;
# copy3_count = 3000 / 100 = copy3_len / copy3_bs = 3000;
my $copy3_count = $copy3_len / $copy3_bs;
# copy3_skip = 1300 / 100 = copy3_start / copy3_bs = 13
my $copy3_skip = $copy3_start / $copy3_bs;
return
"dd if=$file bs=$copy1_bs skip=$copy1_skip count=$copy1_count iflag=fullblock;" .
"dd if=$file bs=$copy2_bs skip=$copy2_skip count=$copy2_count iflag=fullblock;" .
"dd if=$file bs=$copy3_bs skip=$copy3_skip count=$copy3_count iflag=fullblock;" .
"dd if=$file bs=$copy4_bs skip=$copy4_skip count=$copy4_count iflag=fullblock;" .
"dd if=$file bs=$copy5_bs skip=$copy5_skip count=$copy5_count iflag=fullblock;"
;
} }
sub spreadstdin { sub spreadstdin {
@ -679,7 +592,8 @@ sub options_hash {
"dirnamereplace|dnr=s" => \$opt::dirnamereplace, "dirnamereplace|dnr=s" => \$opt::dirnamereplace,
"basenameextensionreplace|bner=s" => \$opt::basenameextensionreplace, "basenameextensionreplace|bner=s" => \$opt::basenameextensionreplace,
"seqreplace=s" => \$opt::seqreplace, "seqreplace=s" => \$opt::seqreplace,
"jobs|j=s" => \$opt::P, "slotreplace=s" => \$opt::slotreplace,
"jobs|j=s" => \$opt::jobs,
"delay=f" => \$opt::delay, "delay=f" => \$opt::delay,
"sshdelay=f" => \$opt::sshdelay, "sshdelay=f" => \$opt::sshdelay,
"load=s" => \$opt::load, "load=s" => \$opt::load,
@ -744,7 +658,7 @@ sub options_hash {
"bibtex" => \$opt::bibtex, "bibtex" => \$opt::bibtex,
"nn|nonotice|no-notice" => \$opt::no_notice, "nn|nonotice|no-notice" => \$opt::no_notice,
# xargs-compatibility - implemented, man, testsuite # xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$opt::P, "max-procs|P=s" => \$opt::jobs,
"delimiter|d=s" => \$opt::d, "delimiter|d=s" => \$opt::d,
"max-chars|s=i" => \$opt::max_chars, "max-chars|s=i" => \$opt::max_chars,
"arg-file|a=s" => \@opt::a, "arg-file|a=s" => \@opt::a,
@ -833,6 +747,7 @@ sub parse_options {
$Global::replace{'{//}'} = '{//}'; $Global::replace{'{//}'} = '{//}';
$Global::replace{'{/.}'} = '{/.}'; $Global::replace{'{/.}'} = '{/.}';
$Global::replace{'{#}'} = '{#}'; $Global::replace{'{#}'} = '{#}';
$Global::replace{'{%}'} = '{%}';
$/="\n"; $/="\n";
$Global::ignore_empty = 0; $Global::ignore_empty = 0;
$Global::interactive = 0; $Global::interactive = 0;
@ -874,6 +789,9 @@ sub parse_options {
if(defined $opt::seqreplace) { if(defined $opt::seqreplace) {
$Global::replace{'{#}'} = $opt::seqreplace; $Global::replace{'{#}'} = $opt::seqreplace;
} }
if(defined $opt::slotreplace) {
$Global::replace{'{%}'} = $opt::slotreplace;
}
if(defined $opt::E) { $Global::end_of_file_string = $opt::E; } if(defined $opt::E) { $Global::end_of_file_string = $opt::E; }
if(defined $opt::max_args) { $Global::max_number_of_args = $opt::max_args; } if(defined $opt::max_args) { $Global::max_number_of_args = $opt::max_args; }
if(defined $opt::timeout) { $Global::timeoutq = TimeoutQueue->new($opt::timeout); } if(defined $opt::timeout) { $Global::timeoutq = TimeoutQueue->new($opt::timeout); }
@ -940,8 +858,8 @@ sub parse_options {
if(defined $opt::tty) { if(defined $opt::tty) {
# Defaults for --tty: -j1 -u # Defaults for --tty: -j1 -u
# Can be overridden with -jXXX -g # Can be overridden with -jXXX -g
if(not defined $opt::P) { if(not defined $opt::jobs) {
$opt::P = 1; $opt::jobs = 1;
} }
if(not defined $opt::group) { if(not defined $opt::group) {
$Global::grouped = 0; $Global::grouped = 0;
@ -1018,8 +936,8 @@ sub parse_options {
$Semaphore::fg = $opt::fg; $Semaphore::fg = $opt::fg;
$Semaphore::wait = $opt::wait; $Semaphore::wait = $opt::wait;
$Global::default_simultaneous_sshlogins = 1; $Global::default_simultaneous_sshlogins = 1;
if(not defined $opt::P) { if(not defined $opt::jobs) {
$opt::P = 1; $opt::jobs = 1;
} }
if($Global::interactive and $opt::bg) { if($Global::interactive and $opt::bg) {
::error("Jobs running in the ". ::error("Jobs running in the ".
@ -1056,8 +974,8 @@ sub parse_options {
::warning("Using -X or -m with --sshlogin may fail.\n"); ::warning("Using -X or -m with --sshlogin may fail.\n");
} }
if(not defined $opt::P) { if(not defined $opt::jobs) {
$opt::P = "100%"; $opt::jobs = "100%";
} }
open_joblog(); open_joblog();
} }
@ -2297,7 +2215,7 @@ sub onall {
# -P should only go to the first, and -S should not be copied at all. # -P should only go to the first, and -S should not be copied at all.
my $options = my $options =
join(" ", join(" ",
((defined $opt::P) ? "-P $opt::P" : ""), ((defined $opt::jobs) ? "-P $opt::jobs" : ""),
((defined $opt::u) ? "-u" : ""), ((defined $opt::u) ? "-u" : ""),
((defined $opt::group) ? "-g" : ""), ((defined $opt::group) ? "-g" : ""),
((defined $opt::keeporder) ? "--keeporder" : ""), ((defined $opt::keeporder) ? "--keeporder" : ""),
@ -2508,9 +2426,9 @@ sub usage {
"-j n Run n jobs in parallel", "-j n Run n jobs in parallel",
"-k Keep same order", "-k Keep same order",
"-X Multiple arguments with context replace", "-X Multiple arguments with context replace",
"--colsep regexp Split input on regexp for positional replacements", "--colsep regexp Split input on regexp for positional replacements",
"{} {.} {/} {/.} {#} Replacement strings", "{} {.} {/} {/.} {#} {%} Replacement strings",
"{3} {3.} {3/} {3/.} Positional replacement strings", "{3} {3.} {3/} {3/.} Positional replacement strings",
"", "",
"-S sshlogin Example: foo\@server.example.com", "-S sshlogin Example: foo\@server.example.com",
"--slf .. Use ~/.parallel/sshloginfile as the list of sshlogins", "--slf .. Use ~/.parallel/sshloginfile as the list of sshlogins",
@ -3224,7 +3142,7 @@ sub set_time_to_login {
sub max_jobs_running { sub max_jobs_running {
my $self = shift; my $self = shift;
if(not defined $self->{'max_jobs_running'}) { if(not defined $self->{'max_jobs_running'}) {
my $nproc = $self->compute_number_of_processes($opt::P); my $nproc = $self->compute_number_of_processes($opt::jobs);
$self->set_max_jobs_running($nproc); $self->set_max_jobs_running($nproc);
} }
return $self->{'max_jobs_running'}; return $self->{'max_jobs_running'};
@ -4307,13 +4225,14 @@ sub openoutputfiles {
my ($outfhw, $errfhw, $outname, $errname); my ($outfhw, $errfhw, $outname, $errname);
if($opt::results) { if($opt::results) {
my $args_as_dirname = $self->{'commandline'}->args_as_dirname(); my $args_as_dirname = $self->{'commandline'}->args_as_dirname();
# prefix/name1/val1/name2/val2/ # Output in: prefix/name1/val1/name2/val2/stdout
my $dir = $opt::results."/".$args_as_dirname; my $dir = $opt::results."/".$args_as_dirname;
if(eval{ File::Path::mkpath($dir); }) { if(eval{ File::Path::mkpath($dir); }) {
# OK # OK
} else { } else {
# mkpath failed: Argument probably too long. # mkpath failed: Argument probably too long.
# Set $Global::max_file_length # Set $Global::max_file_length, which will keep the individual
# dir names shorter than the max length
max_file_name_length($opt::results); max_file_name_length($opt::results);
$args_as_dirname = $self->{'commandline'}->args_as_dirname(); $args_as_dirname = $self->{'commandline'}->args_as_dirname();
# prefix/name1/val1/name2/val2/ # prefix/name1/val1/name2/val2/
@ -4382,16 +4301,18 @@ sub openoutputfiles {
$self->set_fh($fdno,'r',$fdr); $self->set_fh($fdno,'r',$fdr);
$self->set_fh($fdno,'rpid',$rpid); $self->set_fh($fdno,'rpid',$rpid);
# Unlink if required but only when cattail and compress_program has started. # Unlink if required but only when cattail and compress_program has started.
# TODO this is disabled for now # How do we know when cattail and compress have opened the files?
# Disabled for now.
# unlink $self->fh($fdno,"unlink"); # unlink $self->fh($fdno,"unlink");
} }
} else { } elsif($Global::grouped) {
# Set reading FD # Set reading FD if using --group (--ungroup does not need)
for my $fdno (1,2) { for my $fdno (1,2) {
# Re-open the file for reading # Re-open the file for reading
# so fdw can be closed seperately # so fdw can be closed seperately
# and fdr can be seeked seperately (for --line-buffer) # and fdr can be seeked seperately (for --line-buffer)
open(my $fdr,"<", $self->fh($fdno,'name')) || die; open(my $fdr,"<", $self->fh($fdno,'name')) ||
::die_bug("fdr: Cannot open ".$self->fh($fdno,'name'));
$self->set_fh($fdno,'r',$fdr); $self->set_fh($fdno,'r',$fdr);
# Unlink if required # Unlink if required
$Global::debug or unlink $self->fh($fdno,"unlink"); $Global::debug or unlink $self->fh($fdno,"unlink");
@ -4411,7 +4332,8 @@ sub openoutputfiles {
} }
sub max_file_name_length { sub max_file_name_length {
# Figure out the max length of a subdir and the max total length # Figure out the max length of a subdir
# TODO and the max total length
# Ext4 = 255,130816 # Ext4 = 255,130816
my $testdir = shift; my $testdir = shift;
@ -4427,6 +4349,8 @@ sub max_file_name_length {
my $min = $len/16; my $min = $len/16;
my $max = $len; my $max = $len;
while($max-$min > 5) { while($max-$min > 5) {
# If we are within 5 chars of the exact value:
# it is not worth the extra time to find the exact value
my $test = int(($min+$max)/2); my $test = int(($min+$max)/2);
$dir="x"x$test; $dir="x"x$test;
if(mkdir $testdir."/".$dir) { if(mkdir $testdir."/".$dir) {
@ -4871,7 +4795,7 @@ sub sshreturn {
} }
# Only load File::Basename if actually needed # Only load File::Basename if actually needed
$Global::use{"File::Basename"} ||= eval "use File::Basename; 1;"; $Global::use{"File::Basename"} ||= eval "use File::Basename; 1;";
$cd = ::shell_quote_file(dirname($file)); $cd = ::shell_quote_file(::dirname($file));
my $rsync_cd = '--rsync-path='.::shell_quote_scalar("cd $wd$cd; rsync"); my $rsync_cd = '--rsync-path='.::shell_quote_scalar("cd $wd$cd; rsync");
my $basename = ::shell_quote_scalar(::shell_quote_file(basename($file))); my $basename = ::shell_quote_scalar(::shell_quote_file(basename($file)));
# --return # --return
@ -5494,6 +5418,15 @@ sub seq {
return $self->{'seq'}; return $self->{'seq'};
} }
sub slot {
my $self = shift;
# $Global::max_jobs_running is 0 while computing $Global::max_jobs_running
# So assume it is huge
my $mod = ($Global::max_jobs_running || 1000000);
my $add = $Global::max_jobs_running ? 1 : 0;
return ($self->{'seq'} - $add) % $mod + $add;
}
sub populate { sub populate {
# Add arguments from arg_queue until the number of arguments or # Add arguments from arg_queue until the number of arguments or
# max line length is reached # max line length is reached
@ -5671,22 +5604,27 @@ sub args_as_string {
sub args_as_dirname { sub args_as_dirname {
# Returns: # Returns:
# all unmodified arguments joined with '/' (similar to {}) # all unmodified arguments joined with '/' (similar to {})
# \t \0 \\ and / are quoted # \t \0 \\ and / are quoted as: \t \0 \\ \_
# If $Global::max_file_length: Keep labels < $Global::max_file_length # If $Global::max_file_length: Keep subdirs < $Global::max_file_length
my $self = shift; my $self = shift;
my @res = (); my @res = ();
for my $rec_ref (@{$self->{'arg_list'}}) { for my $rec_ref (@{$self->{'arg_list'}}) {
# If headers are used, sort by them.
# Otherwise keep the order from the command line.
my @header_indexes_sorted = header_indexes_sorted($#$rec_ref+1); my @header_indexes_sorted = header_indexes_sorted($#$rec_ref+1);
for my $n (@header_indexes_sorted) { for my $n (@header_indexes_sorted) {
CORE::push(@res, CORE::push(@res,
$Global::input_source_header{$n}, $Global::input_source_header{$n},
map { my $s = $_; map { my $s = $_;
# \t \0 \\ and / are quoted as: \t \0 \\ \_
$s =~ s/\\/\\\\/g; $s =~ s/\\/\\\\/g;
$s =~ s/\t/\\t/g; $s =~ s/\t/\\t/g;
$s =~ s/\0/\\0/g; $s =~ s/\0/\\0/g;
$s =~ s:/:\\_:g; $s =~ s:/:\\_:g;
if($Global::max_file_length) { if($Global::max_file_length) {
# Keep each subdir shorter than the longest
# allowed file name
$s = substr($s,0,$Global::max_file_length); $s = substr($s,0,$Global::max_file_length);
} }
$s; } $s; }
@ -5828,7 +5766,7 @@ sub number_of_replacements {
} }
for my $k (keys %count) { for my $k (keys %count) {
if(defined $Global::replace{$k}) { if(defined $Global::replace{$k}) {
# {} {/} {//} {.} {/.} {#} # {} {/} {//} {.} {/.} {#} {%}
$context -= (length $Global::replace{$k}) * $count{$k}; $context -= (length $Global::replace{$k}) * $count{$k};
} else { } else {
# {n} # {n}
@ -5933,6 +5871,8 @@ sub context_replace_placeholders {
my $rep_inner_regexp = "(?:". join('|', map { my $s = $_; $s =~ s/(\W)/\\$1/g; $s } @rep_inner) . ")"; my $rep_inner_regexp = "(?:". join('|', map { my $s = $_; $s =~ s/(\W)/\\$1/g; $s } @rep_inner) . ")";
# Seq replace string: {#} # Seq replace string: {#}
my $rep_seq_regexp = '(?:'.::maybe_quote('\{\#\}').")"; my $rep_seq_regexp = '(?:'.::maybe_quote('\{\#\}').")";
# Slot replace string: {%}
my $rep_slot_regexp = '(?:'.::maybe_quote('\{\%\}').")";
# Normal replace strings # Normal replace strings
my $rep_str_regexp = multi_regexp(); my $rep_str_regexp = multi_regexp();
# Positional replace strings # Positional replace strings
@ -5941,7 +5881,7 @@ sub context_replace_placeholders {
# Fish out the words that have replacement strings in them # Fish out the words that have replacement strings in them
my $tt = $target; my $tt = $target;
my %word; my %word;
while($tt =~ s/(\S*(?:$rep_str_regexp|$rep_str_pos_regexp|$rep_seq_regexp)\S*)/\0/o) { while($tt =~ s/(\S*(?:$rep_str_regexp|$rep_str_pos_regexp|$rep_seq_regexp|$rep_slot_regexp)\S*)/\0/o) {
$word{$1} ||= 1; $word{$1} ||= 1;
} }
if(not %word) { if(not %word) {
@ -5957,6 +5897,8 @@ sub context_replace_placeholders {
# replace {#} if it exists # replace {#} if it exists
$word =~ s/$rep_seq_regexp/$self->seq()/geo; $word =~ s/$rep_seq_regexp/$self->seq()/geo;
# replace {%} if it exists
$word =~ s/$rep_slot_regexp/$self->slot()/geo;
if($word =~ /$rep_str_pos_regexp/o) { if($word =~ /$rep_str_pos_regexp/o) {
# There are positional replacement strings # There are positional replacement strings
my @argset; my @argset;
@ -6035,7 +5977,7 @@ sub simple_replace_placeholders {
} }
my $n = $#args+1; my $n = $#args+1;
# Which replace strings are used? # Which replace strings are used?
# {#} {} {/} {//} {.} {/.} {n} {n/} {n//} {n.} {n/.} # {#} {%} {} {/} {//} {.} {/.} {n} {n/} {n//} {n.} {n/.}
for my $used (keys %{$self->{'replacecount'}}) { for my $used (keys %{$self->{'replacecount'}}) {
# What are the replacement values for the replace strings? # What are the replacement values for the replace strings?
if(grep { $used eq $_ } qw({} {/} {//} {.} {/.})) { if(grep { $used eq $_ } qw({} {/} {//} {.} {/.})) {
@ -6062,6 +6004,9 @@ sub simple_replace_placeholders {
} elsif($used eq "{#}") { } elsif($used eq "{#}") {
# {#} # {#}
$replace{$Global::replace{$used}} = $self->seq(); $replace{$Global::replace{$used}} = $self->seq();
} elsif($used eq "{%}") {
# {%}
$replace{$Global::replace{$used}} = $self->slot();
} else { } else {
::die_bug('simple_replace_placeholders_20110530'); ::die_bug('simple_replace_placeholders_20110530');
} }
@ -6741,7 +6686,6 @@ sub set_remedian {
$rref->[1][$i/999%999] = (sort @{$rref->[0]})[$#{$rref->[0]}/2]; $rref->[1][$i/999%999] = (sort @{$rref->[0]})[$#{$rref->[0]}/2];
$rref->[2][$i/999/999%999] = (sort @{$rref->[1]})[$#{$rref->[1]}/2]; $rref->[2][$i/999/999%999] = (sort @{$rref->[1]})[$#{$rref->[1]}/2];
$self->{'remedian'} = (sort @{$rref->[2]})[$#{$rref->[2]}/2]; $self->{'remedian'} = (sort @{$rref->[2]})[$#{$rref->[2]}/2];
# die((sort @{$rref->[2]})[$#{$rref->[2]}/2]);
} }
sub update_delta_time { sub update_delta_time {
@ -6912,23 +6856,51 @@ sub nlinks {
sub lock { sub lock {
my $self = shift; my $self = shift;
my $sleep = 100; # 100 ms my $sleep = 100; # 100 ms
open $self->{'lockfh'}, ">", $self->{'lockfile'} my $total_sleep = 0;
or ::die_bug("Can't open semaphore file $self->{'lockfile'}: $!");
chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
while(not flock $self->{'lockfh'}, LOCK_EX()|LOCK_NB()) { my $locked = 0;
if ($! =~ m/Function not implemented/) { while(not $locked) {
::warning("flock: $!"); if(tell($self->{'lockfh'}) == -1) {
::warning("Will wait for a random while\n"); # File not open
::usleep(rand(5000)); open($self->{'lockfh'}, ">", $self->{'lockfile'})
last; or ::debug("Cannot open $self->{'lockfile'}");
} }
if($self->{'lockfh'}) {
::debug("Cannot lock $self->{'lockfile'}"); # File is open
# TODO if timeout: last chmod 0666, $self->{'lockfile'}; # assuming you want it a+rw
if(flock($self->{'lockfh'}, LOCK_EX()|LOCK_NB())) {
# The file is locked: No need to retry
$locked = 1;
last;
} else {
if ($! =~ m/Function not implemented/) {
::warning("flock: $!");
::warning("Will wait for a random while\n");
::usleep(rand(5000));
# File cannot be locked: No need to retry
$locked = 2;
last;
}
}
}
# Locking failed in first round
# Sleep and try again
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
# Random to avoid every sleeping job waking up at the same time # Random to avoid every sleeping job waking up at the same time
::usleep(rand()*$sleep); ::usleep(rand()*$sleep);
$total_sleep += $sleep;
if($opt::semaphoretimeout) {
if($total_sleep/1000 > $opt::semaphoretimeout) {
# Timeout: bail out
::warning("Semaphore timed out. Ignoring timeout.");
$locked = 3;
last;
}
} else {
if($total_sleep/1000 > 30) {
::warning("Semaphore stuck for 30 seconds. Consider using --semaphoretimeout.");
}
}
} }
::debug("locked $self->{'lockfile'}"); ::debug("locked $self->{'lockfile'}");
} }