parallel/src/parsort

279 lines
7.1 KiB
Plaintext
Raw Normal View History

#!/usr/bin/perl
=pod
=head1 NAME
parsort - Sort (big files) in parallel
=head1 SYNOPSIS
B<parsort> I<options for sort>
=head1 DESCRIPTION
B<parsort> uses GNU B<sort> to sort in parallel. It works just like
B<sort>, but is faster on inputs of more than 1 million lines when run
on a multicore machine.
Hopefully these ideas will make it into GNU Sort in the future.

=head1 EXAMPLE

Sort files:

  parsort *.txt > sorted.txt

Sort stdin (standard input) numerically:

  cat numbers | parsort -n > sorted.txt

=head1 PERFORMANCE
B<parsort> is faster on files, because these can be read in parallel.
On a 48 core machine you should see a speedup of 3x over B<sort>.
=head1 AUTHOR
Copyright (C) 2020 Ole Tange,
http://ole.tange.dk and Free Software Foundation, Inc.
=head1 LICENSE
Copyright (C) 2012 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
=head1 DEPENDENCIES
B<parsort> uses B<sort>, B<bash>, B<parallel>, and B<mbuffer>.
=head1 SEE ALSO
B<sort>
=cut
use strict;
use Getopt::Long;
use POSIX qw(mkfifo);
# "bundling" lets single-letter options be combined (e.g. -nr);
# "require_order" stops option parsing at the first non-option argument,
# so whatever follows the options is left in @ARGV.
Getopt::Long::Configure("bundling","require_order");
# Keep a copy of the complete command line: GetOptions() removes the
# options it parses from @ARGV.
my @ARGV_before = @ARGV;
GetOptions(
# Options whose value is stored in a variable of their own
"debug|D" => \$opt::D,
"version" => \$opt::version,
"verbose|v" => \$opt::verbose,
"b|ignore-leading-blanks" => \$opt::ignore_leading_blanks,
"d|dictionary-order" => \$opt::dictionary_order,
"f|ignore-case" => \$opt::ignore_case,
"g|general-numeric-sort" => \$opt::general_numeric_sort,
"i|ignore-nonprinting" => \$opt::ignore_nonprinting,
"M|month-sort" => \$opt::month_sort,
"h|human-numeric-sort" => \$opt::human_numeric_sort,
"n|numeric-sort" => \$opt::numeric_sort,
"N|numascii" => \$opt::numascii,
"r|reverse" => \$opt::reverse,
"R|random-sort" => \$opt::random_sort,
"sort=s" => \$opt::sort,
"V|version-sort" => \$opt::version_sort,
"k|key=s" => \@opt::key,
"t|field-separator=s" => \$opt::field_separator,
"z|zero-terminated" => \$opt::zero_terminated,
"files0-from=s" => \$opt::files0_from,
# Options that are accepted so GetOptions() does not reject them; their
# values all land in $opt::dummy, which the visible code never reads.
# They still reach sort through @Global::sortoptions (set below).
"random-source=s" => \$opt::dummy,
"batch-size=s" => \$opt::dummy,
"check=s" => \$opt::dummy,
"c" => \$opt::dummy,
"C" => \$opt::dummy,
"compress-program=s" => \$opt::dummy,
"T|temporary-directory=s" => \$opt::dummy,
"parallel=s" => \$opt::dummy,
"u|unique" => \$opt::dummy,
"S|buffer-size=s" => \$opt::dummy,
"s|stable" => \$opt::dummy,
"help" => \$opt::dummy,
# Unknown or malformed options: exit with status 255
) || exit(255);
# Last path component of $0
$Global::progname = ($0 =~ m:(^|/)([^/]+)$:)[1];
$Global::version = 20200412;
# NOTE(review): version() is not defined in the visible part of this
# file - confirm it exists, otherwise --version dies here.
if($opt::version) { version(); exit 0; }
# The leading part of the original command line that GetOptions() removed
# from @ARGV, i.e. the options exactly as the user typed them. These are
# interpolated into every generated 'sort' command.
@Global::sortoptions = @ARGV_before[0..($#ARGV_before-$#ARGV-1)];
#if($opt::zero_terminated) { $/ = "\0"; }
# Directory used by tmpname() for the fifos
$ENV{'TMPDIR'} ||= "/tmp";
sub merge {
    # Build one shell command that merges the output of many commands.
    # Input:
    #   @_ = shell commands that each print sorted lines
    #        (a trailing newline on each command is removed)
    # Returns:
    #   the list of remaining commands: a single command when at least
    #   one command was given, the empty list otherwise
    #
    # In every round neighbouring commands are paired as
    #   sort -m <sort options> <(left) <(right) | <buffer command>
    # and the rounds repeat until only one command is left. The result
    # is a binary tree of 'sort -m' processes connected with bash
    # process substitution.
    my @pending = @_;
    chomp(@pending);
    while(@pending > 1) {
        my @next_round;
        while(@pending) {
            # Take up to two commands and wrap each one in <(...).
            # With an odd count the last round entry has no right side.
            my ($left, $right) =
                map { $_ ? "<($_)" : $_ } splice(@pending, 0, 2);
            $right = "" if not defined $right;
            push(@next_round,
                 "sort -m @Global::sortoptions $left $right | ".buffer());
        }
        @pending = @next_round;
    }
    return @pending;
}
sub sort_files {
    # Sort the given files and print the merged result on stdout.
    # Input:
    #   @_ = names of the files to sort
    my @files = @_;
    # GNU Parallel is only asked to print (--dryrun) the commands that
    # would sort each --pipepart chunk of the files; nothing is sorted yet.
    # The chunks are split at \n and there is at least one per CPU thread.
    my @dryrun = (qw(parallel --pipepart --block -1 --dryrun -vv sort),
                  @Global::sortoptions, '::::', @files);
    open(my $parallel, "-|", @dryrun) || die;
    my @chunk_sorters = <$parallel>;
    close $parallel;
    # Combine the per-chunk sort commands into a single merging command
    my @script = merge(@chunk_sorters);
    # The script uses <(...), which /bin/sh does not support: run it in bash
    open(my $shell, "|-", "bash") || die;
    print $shell @script;
    close $shell;
}
sub sort_stdin {
    # Sort stdin and print the merged result on stdout.
    # The input is spread round-robin between n fifos by a forked
    # 'parallel --pipe'; one sort process reads from each fifo and the
    # outputs of the sorts are combined by the command built by merge().
    # n = number of CPU threads
    # Returns: the result of closing the pipe to bash
    my $numthreads = `parallel --number-of-threads`;
    # Backticks keep the trailing newline: remove it before the value is
    # used as a number and as the argument of 'parallel -j'
    chomp($numthreads);
    if($numthreads !~ /^\d+$/ or $numthreads < 1) {
        die("$Global::progname: Cannot get the number of CPU threads ".
            "from 'parallel --number-of-threads'\n");
    }
    # tmpfifo() both picks the name and creates the fifo
    my @fifos = map { tmpfifo() } 1..$numthreads;
    # This trick removes the fifo as soon as it is connected in the other end
    # (rm fifo; ...) < fifo
    my @cmd = map { "(rm $_; sort @Global::sortoptions) < $_" } @fifos;
    @cmd = merge(@cmd);
    my $pid = fork();
    if(not defined $pid) {
        # Without the writer process nothing would ever feed the fifos
        my $err = $!;
        unlink(@fifos);
        die("$Global::progname: Cannot fork: $err\n");
    }
    if($pid == 0) {
        # Child: distribute stdin between the fifos
        exec(qw(parallel -j),$numthreads,
             # 1M 30M = 43s
             # 3M 30M = 59s
             # 300k 30M = 40-45s
             # 100k 30M = 47s
             # 500k 30M = 44s
             # 300k 10M = 41-45s
             # 256k 10M = 42-44s
             # 300k 3M = 42-45s
             # 300k - = 47s
             # 286k is the best mean value after testing 250..350
             qw(--block 286k --pipe --roundrobin ),buffer(),qw(> {} :::),@fifos);
        # exec only returns on failure: the child must never continue
        # into the code meant for the parent
        print STDERR "$Global::progname: Cannot run parallel: $!\n";
        exit(255);
    }
    # Parent: run the sort/merge tree
    # The command uses <(...) so it is incompatible with /bin/sh
    open(my $bash,"|-","bash") || die;
    print $bash @cmd;
    my $closed = close $bash;
    # Reap the process that fed the fifos
    waitpid($pid, 0);
    return $closed;
}
sub tmpname {
    # Select a name in $TMPDIR that does not exist
    # Do not create the file as it may be used for creating a socket (by tmux)
    # Input:
    #   $name = prefix of the file name
    # Returns:
    #   $tmpname = "$TMPDIR/$name" followed by 5 random characters from
    #     0-9a-zA-Z; nothing with that name existed when it was checked
    # Prints an error on stderr and exits with status 255 if $TMPDIR does
    # not exist or is not writable
    my $name = shift;
    my $tmpdir = $ENV{'TMPDIR'};
    if(not -w $tmpdir) {
        my @msg = -e $tmpdir
            ? ("Tmpdir '$tmpdir' is not writable.", "Try 'chmod +w $tmpdir'")
            : ("Tmpdir '$tmpdir' does not exist.", "Try 'mkdir $tmpdir'");
        # Report here: this script defines no error()/wait_and_exit()
        # helpers, so calling them would die with "Undefined subroutine"
        my $prog = $Global::progname || "parsort";
        print STDERR map { "$prog: Error: $_\n" } @msg;
        exit(255);
    }
    my @chars = (0..9, "a".."z", "A".."Z");
    my $tmpname;
    do {
        $tmpname = $tmpdir."/".$name.
            join("", map { $chars[rand(@chars)] } (1..5));
    } while(-e $tmpname);
    return $tmpname;
}
sub tmpfifo {
    # Find an unused name and create a fifo with that name
    # Returns:
    #   $tmpfifo = name of the newly created fifo (mode 0600)
    # Dies if the fifo cannot be created
    while(1) {
        my $tmpfifo = tmpname("psort");
        if(mkfifo($tmpfifo,0600)) {
            return $tmpfifo;
        }
        # EEXIST: the name was taken after tmpname() checked it, so just
        # pick another one. Any other error will not go away by retrying.
        $!{EEXIST} or die("Cannot create fifo '$tmpfifo': $!\n");
    }
}
{
    # Cached result of buffer(): computed on the first call only
    my $buffer;
    sub buffer {
        # Returns:
        #   $buffer = shell command used to buffer data between processes:
        #     mbuffer if it is found by which(), otherwise cat
        if(not defined $buffer) {
            if(which("mbuffer")) {
                # Use mbuffer if installed
                # 30M = 43s
                # 10M = 41-45s
                # 3M = 42-45s
                # -q: mbuffer gives errors when a pipe is closed
                $buffer = "mbuffer -v0 -q -m 30M";
            } else {
                $buffer = "cat";
            }
        }
        return $buffer;
    }
}
sub which {
    # Look up programs in $PATH.
    # Input:
    #   @_ = names of the programs to find
    # Returns:
    #   list context: every executable non-directory match, in the order
    #     of the programs given and, for each program, in $PATH order
    #   scalar context: the first match, or undef if there is none
    my @dirs = split(":", $ENV{'PATH'});
    my @found;
    for my $program (@_) {
        for my $dir (@dirs) {
            my $candidate = $dir."/".$program;
            if(-x $candidate and not -d $candidate) {
                push(@found, $candidate);
            }
        }
        # A name that includes a path is also tried as given
        if($program =~ m:/: and -x $program and not -d $program) {
            push(@found, $program);
        }
    }
    if(wantarray) {
        return @found;
    }
    return $found[0];
}
# Choose the input: files on the command line, files listed in
# --files0-from, or stdin
if(@ARGV) {
    sort_files(@ARGV);
} elsif(length $opt::files0_from) {
    # Read the NUL-separated file names.
    # $/ is changed only inside this block: sort_files() and merge() read
    # and chomp newline-terminated commands, so a global $/ = "\0" would
    # make them see all commands as one record and skip the merge step.
    my @files = do {
        local $/ = "\0";
        open(my $fh,"<",$opt::files0_from)
            || die("$Global::progname: Cannot open '$opt::files0_from': $!\n");
        my @names = <$fh>;
        close $fh;
        chomp(@names);
        @names;
    };
    sort_files(@files);
} else {
    sort_stdin();
}
# Test
# -z
# OK: cat bigfile | parsort
# OK: parsort -k4n files*.txt
# OK: parsort files*.txt
# OK: parsort "file with space"