parsort: Initial version.
This commit is contained in:
parent
e8f520f642
commit
39b4542bf2
19
Makefile
19
Makefile
|
@ -1,17 +1,18 @@
|
||||||
CMD = blink 2grep 2search burncpu duplicate-packets em encdir field forever \
|
CMD = blink 2grep 2search burncpu duplicate-packets em encdir field \
|
||||||
fxkill G gitnext gitundo goodpasswd histogram mtrr mirrorpdf \
|
forever fxkill G gitnext gitundo goodpasswd histogram mtrr \
|
||||||
neno off pdfman pidcmd plotpipe puniq ramusage rand rclean \
|
mirrorpdf neno off parsort pdfman pidcmd plotpipe puniq \
|
||||||
rina rn rrm seekmaniac shython sound-reload splitvideo stdout \
|
ramusage rand rclean rina rn rrm seekmaniac shython \
|
||||||
swapout T timestamp tracefile transpose upsidedown vid \
|
sound-reload splitvideo stdout swapout T timestamp tracefile \
|
||||||
w4it-for-port-open whitehash wifi-reload wssh ytv yyyymmdd
|
transpose upsidedown vid w4it-for-port-open whitehash \
|
||||||
|
wifi-reload wssh ytv yyyymmdd
|
||||||
|
|
||||||
all: blink/blink.1 2search/2grep.1 2search/2search.1 \
|
all: blink/blink.1 2search/2grep.1 2search/2search.1 \
|
||||||
burncpu/burncpu.1 encdir/encdir.1 G/G.1 gitnext/gitnext.1 \
|
burncpu/burncpu.1 encdir/encdir.1 G/G.1 gitnext/gitnext.1 \
|
||||||
gitundo/gitundo.1 goodpasswd/goodpasswd.1 \
|
gitundo/gitundo.1 goodpasswd/goodpasswd.1 \
|
||||||
histogram/histogram.1 mirrorpdf/mirrorpdf.1 neno/neno.1 \
|
histogram/histogram.1 mirrorpdf/mirrorpdf.1 neno/neno.1 \
|
||||||
off/off.1 pdfman/pdfman.1 pidcmd/pidcmd.1 plotpipe/plotpipe.1 \
|
off/off.1 parsort/parsort.1 pdfman/pdfman.1 pidcmd/pidcmd.1 \
|
||||||
puniq/puniq.1 rand/rand.1 rina/rina.1 rn/rn.1 rrm/rrm.1 \
|
plotpipe/plotpipe.1 puniq/puniq.1 rand/rand.1 rina/rina.1 \
|
||||||
seekmaniac/seekmaniac.1 shython/shython.1 \
|
rn/rn.1 rrm/rrm.1 seekmaniac/seekmaniac.1 shython/shython.1 \
|
||||||
sound-reload/sound-reload.1 splitvideo/splitvideo.1 \
|
sound-reload/sound-reload.1 splitvideo/splitvideo.1 \
|
||||||
stdout/stdout.1 timestamp/timestamp.1 tracefile/tracefile.1 \
|
stdout/stdout.1 timestamp/timestamp.1 tracefile/tracefile.1 \
|
||||||
transpose/transpose.1 T/T.1 upsidedown/upsidedown.1 vid/vid.1 \
|
transpose/transpose.1 T/T.1 upsidedown/upsidedown.1 vid/vid.1 \
|
||||||
|
|
214
parsort/parsort
Executable file
214
parsort/parsort
Executable file
|
@ -0,0 +1,214 @@
|
||||||
|
#!/usr/bin/perl
|
||||||
|
|
||||||
|
=pod
|
||||||
|
|
||||||
|
=head1 NAME
|
||||||
|
|
||||||
|
parsort - Sort in parallel
|
||||||
|
|
||||||
|
|
||||||
|
=head1 SYNOPSIS
|
||||||
|
|
||||||
|
B<parsort> I<options for sort>
|
||||||
|
|
||||||
|
|
||||||
|
=head1 DESCRIPTION
|
||||||
|
|
||||||
|
B<parsort> uses B<sort> to sort in parallel. It works just like
|
||||||
|
B<sort> but faster, if you have a multicore machine.
|
||||||
|
|
||||||
|
Hopefully these ideas will make it into GNU Sort in the future.
|
||||||
|
|
||||||
|
|
||||||
|
=head1 EXAMPLE
|
||||||
|
|
||||||
|
Sort files:
|
||||||
|
|
||||||
|
parsort *.txt > sorted.txt
|
||||||
|
|
||||||
|
Sort stdin (standard input) numerically:
|
||||||
|
|
||||||
|
cat numbers | parsort -n > sorted.txt
|
||||||
|
|
||||||
|
|
||||||
|
=head1 PERFORMANCE
|
||||||
|
|
||||||
|
B<parsort> is faster on files than on standard input, because files can be read in parallel.
|
||||||
|
|
||||||
|
On a 48 core machine you should see a speedup of 3x over B<sort>.
|
||||||
|
|
||||||
|
|
||||||
|
=head1 AUTHOR
|
||||||
|
|
||||||
|
Copyright (C) 2020 Ole Tange,
|
||||||
|
http://ole.tange.dk and Free Software Foundation, Inc.
|
||||||
|
|
||||||
|
|
||||||
|
=head1 LICENSE
|
||||||
|
|
||||||
|
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||||
|
|
||||||
|
This program is free software; you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
|
||||||
|
=head1 DEPENDENCIES
|
||||||
|
|
||||||
|
B<parsort> uses B<sort>, B<bash>, B<parallel>, and B<mbuffer>.
|
||||||
|
|
||||||
|
|
||||||
|
=head1 SEE ALSO
|
||||||
|
|
||||||
|
B<sort>
|
||||||
|
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
use strict;
|
||||||
|
use Getopt::Long;
|
||||||
|
use POSIX qw(mkfifo);
|
||||||
|
|
||||||
|
# Option parsing and global setup.
#
# "bundling" allows single letter options to be combined (-nr).
# "require_order" stops option parsing at the first non-option, so
# everything left in @ARGV afterwards is treated as input files.
Getopt::Long::Configure("bundling","require_order");

# Keep a copy of the full command line: The options consumed by
# GetOptions are passed on verbatim to 'sort' (see @Global::sortoptions).
my @ARGV_before = @ARGV;
GetOptions(
    "debug|D" => \$opt::D,
    "version" => \$opt::version,
    "verbose|v" => \$opt::verbose,
    "b|ignore-leading-blanks" => \$opt::ignore_leading_blanks,
    "d|dictionary-order" => \$opt::dictionary_order,
    "f|ignore-case" => \$opt::ignore_case,
    "g|general-numeric-sort" => \$opt::general_numeric_sort,
    "i|ignore-nonprinting" => \$opt::ignore_nonprinting,
    "M|month-sort" => \$opt::month_sort,
    "h|human-numeric-sort" => \$opt::human_numeric_sort,
    "n|numeric-sort" => \$opt::numeric_sort,
    "N|numascii" => \$opt::numascii,
    "r|reverse" => \$opt::reverse,
    "R|random-sort" => \$opt::random_sort,
    "sort=s" => \$opt::sort,
    "V|version-sort" => \$opt::version_sort,
    "k|key=s" => \@opt::key,
    "t|field-separator=s" => \$opt::field_separator,
    "z|zero-terminated" => \$opt::zero_terminated,
    ) || exit(255);
# Name of this program without the directory part
$Global::progname = ($0 =~ m:(^|/)([^/]+)$:)[1];
$Global::version = 20200411;
if($opt::version) {
    # No version() sub exists in this file: Print the version here
    print "$Global::progname $Global::version\n";
    exit 0;
}
# -z: Set Perl's input record separator to NUL
if($opt::zero_terminated) { $/ = "\0"; }
# The options are the leading part of the original command line:
# Everything that GetOptions removed from the front of @ARGV
@Global::sortoptions = @ARGV_before[0..($#ARGV_before-$#ARGV-1)];
# Temporary fifos are created in $TMPDIR
$ENV{'TMPDIR'} ||= "/tmp";
|
||||||
|
|
||||||
|
sub merge {
    # Combine commands that each output sorted data into a single
    # command that outputs all the data merged in sorted order.
    #
    # The commands are combined in pairs into:
    #   sort -m OPTIONS <(A) <(B) | mbuffer -v0 -q -m 30M;
    # and the combined commands are combined again, round after round,
    # until only one command is left.
    #
    # Input:
    #   @cmd = commands to 'cat' (part of) a file
    # Uses:
    #   @Global::sortoptions = options given to every 'sort -m'
    # Returns:
    #   @cmd = a single command (it uses <(...), so it needs bash);
    #          with 0 or 1 input commands the (chomped) input is returned
    my @cmd = @_;
    chomp(@cmd);
    # The options are pasted into a shell command line: Quote options
    # containing shell special chars, so e.g. -t'|' or -t' ' reach
    # sort unchanged. Options of safe chars only are left as they are.
    my $sortoptions = join " ", map {
        my $opt = $_;
        if($opt eq "" or $opt =~ m{[^A-Za-z0-9_+=:.,/%@-]}) {
            $opt =~ s/'/'\\''/g;
            $opt = "'$opt'";
        }
        $opt;
    } @Global::sortoptions;
    while($#cmd > 0) {
        my @merged;
        while(@cmd) {
            my ($left, $right) = splice(@cmd, 0, 2);
            $left &&= "<($left)";
            $right &&= "<($right)";
            # Odd number of commands: The last one has no partner
            defined $right or $right = "";
            # Ignore errors from mbuffer - it gives errors when a pipe is closed
            push @merged, "sort -m $sortoptions $left $right | mbuffer -v0 -q -m 30M;";
        }
        @cmd = @merged;
    }
    return @cmd;
}
|
||||||
|
|
||||||
|
sub tmpname {
    # Select a name in $TMPDIR that does not exist
    # Do not create the file: The caller decides what to create
    # (e.g. tmpfifo() makes a fifo with this name)
    # Remember the name in %Global::unlink to avoid hitting the same name twice
    # Input:
    #   $name = prefix of the file name
    # Uses:
    #   $ENV{'TMPDIR'} = dir in which the name is selected
    # Returns:
    #   $tmpname = $TMPDIR/$name followed by 5 random chars from 0-9a-zA-Z
    # Exits with 255 if $TMPDIR does not exist or is not writable
    my $name = shift;
    my $tmpdir = $ENV{'TMPDIR'};
    if(not -w $tmpdir) {
        my $progname = $Global::progname || "parsort";
        my @msg;
        if(not -e $tmpdir) {
            @msg = ("Tmpdir '$tmpdir' does not exist.","Try 'mkdir $tmpdir'");
        } else {
            @msg = ("Tmpdir '$tmpdir' is not writable.","Try 'chmod +w $tmpdir'");
        }
        print STDERR map { "$progname: Error: $_\n" } @msg;
        exit(255);
    }
    my @chars = (0..9,"a".."z","A".."Z");
    my $tmpname;
    do {
        $tmpname = $tmpdir."/".$name.
            join"", map { $chars[rand(62)] } (1..5);
        # Retry if the file exists or the name was already handed out
    } while(-e $tmpname or $Global::unlink{$tmpname}++);
    return $tmpname;
}
|
||||||
|
|
||||||
|
sub tmpfifo {
    # Find an unused name and mkfifo on it
    # Returns:
    #   $tmpfifo = name of the new fifo (created with mode 0600)
    # Dies if the fifo cannot be created
    my $tmpfifo = tmpname("psort");
    mkfifo($tmpfifo,0600)
        or die "parsort: Cannot create fifo '$tmpfifo': $!\n";
    return $tmpfifo;
}
|
||||||
|
|
||||||
|
sub sort_files {
    # Sort the files given on the command line (@ARGV) to stdout
    # Uses:
    #   @ARGV = files to sort
    #   @Global::sortoptions = options to pass to sort
    my @files = @ARGV;
    # Let GNU Parallel generate the commands to read parts of files
    # The commands split at \n and there will be at least one for each CPU thread
    open(my $par,"-|",qw(parallel --pipepart --block -1 --dryrun -vv sort),
         @Global::sortoptions, '::::', @files)
        || die "parsort: Cannot run parallel: $!\n";
    my @lines;
    {
        # The list of commands is read one command per line - also when
        # -z has set $/ to \0 for the data
        # NOTE(review): with -z the parts of the files are still split at \n
        local $/ = "\n";
        @lines = <$par>;
        chomp(@lines);
    }
    if(not close $par) {
        warn "parsort: parallel failed to generate the sort commands (exit ".
            ($? >> 8).")\n";
    }
    my @cmd = merge(@lines);
    # The command uses <(...) so it is incompatible with /bin/sh
    open(my $bash,"|-","bash") || die "parsort: Cannot run bash: $!\n";
    print $bash @cmd;
    # The exit value is not checked: mbuffer gives errors when a pipe is closed
    close $bash;
}
|
||||||
|
|
||||||
|
sub sort_stdin {
    # Sort stdin (standard input) to stdout
    #
    # One fifo is created per CPU thread. A child process runs GNU
    # Parallel, which spreads blocks of stdin round robin over the
    # fifos. Bash runs one 'sort' per fifo and merges the sorted
    # output with the command built by merge().
    # Uses:
    #   @Global::sortoptions = options to pass to sort
    my $threads_out = `parallel --number-of-threads`;
    # Take the digits only: The output ends in a newline
    my ($numthreads) =
        defined $threads_out ? ($threads_out =~ /^\s*(\d+)/) : ();
    if(not $numthreads) {
        die "parsort: Cannot get the number of threads from ".
            "'parallel --number-of-threads'\n";
    }
    # tmpfifo() creates the fifos
    my @fifos = map { tmpfifo() } 1..$numthreads;
    # Names and options are pasted into a shell command line: Quote
    # strings containing shell special chars; leave safe strings as is
    my $shellquote = sub {
        my $str = shift;
        if($str eq "" or $str =~ m{[^A-Za-z0-9_+=:.,/%@-]}) {
            $str =~ s/'/'\\''/g;
            $str = "'$str'";
        }
        return $str;
    };
    my $sortoptions = join " ", map { $shellquote->($_) } @Global::sortoptions;
    # This trick removes the fifo as soon as it is connected in the other end
    # (rm fifo; ...) < fifo
    my @cmd = map {
        my $fifo = $shellquote->($_);
        "(rm $fifo; sort $sortoptions) < $fifo";
    } @fifos;
    @cmd = merge(@cmd);
    my $pid = fork;
    if(not defined $pid) {
        die "parsort: Cannot fork: $!\n";
    }
    if(not $pid) {
        # Child: Feed stdin to the fifos
        # Timings noted for different values - presumably
        # '--block' size, mbuffer '-m' size = run time:
        # 1M 30M = 43s
        # 3M 30M = 59s
        # 300k 30M = 40-45s
        # 100k 30M = 47s
        # 500k 30M = 44s
        # 300k 10M = 41-45s
        # 256k 10M = 44s
        # 300k 3M = 42-45s
        # 300k - = 47s
        exec(qw(parallel -j),$numthreads,
             qw(--block 256k --pipe --roundrobin mbuffer -v0 -q -m 10M > {} :::),@fifos)
            or die "parsort: Cannot run parallel: $!\n";
    }
    # Parent: Sort and merge what arrives on the fifos
    # The command uses <(...) so it is incompatible with /bin/sh
    open(my $bash,"|-","bash") || die "parsort: Cannot run bash: $!\n";
    print $bash @cmd;
    close $bash;
}
|
||||||
|
|
||||||
|
# Files on the command line are sorted by sort_files();
# with no files, standard input is sorted by sort_stdin()
my $sorter = @ARGV ? \&sort_files : \&sort_stdin;
$sorter->();
|
||||||
|
|
||||||
|
# Test
|
||||||
|
# -z
|
||||||
|
# OK: cat bigfile | parsort
|
||||||
|
# OK: parsort -k4n files*.txt
|
||||||
|
# OK: parsort files*.txt
|
||||||
|
# OK: parsort "file with space"
|
||||||
|
|
Loading…
Reference in a new issue