#!/usr/bin/perl

# SPDX-FileCopyrightText: 2021-2022 Ole Tange, http://ole.tange.dk and Free Software Foundation, Inc.
# SPDX-License-Identifier: GPL-3.0-or-later

=pod

=head1 NAME

parsort - Sort (big files) in parallel


=head1 SYNOPSIS

B<parsort> I<options for sort>


=head1 DESCRIPTION

B<parsort> uses GNU B<sort> to sort in parallel. It works just like
B<sort> but faster on inputs with more than 1 M lines, if you have a
multicore machine.

Hopefully these ideas will make it into GNU B<sort> in the future.


=head1 EXAMPLE

Sort files:

  parsort *.txt > sorted.txt

Sort stdin (standard input) numerically:

  cat numbers | parsort -n > sorted.txt


=head1 PERFORMANCE

B<parsort> is faster on a file than on stdin (standard input), because
different parts of a file can be read in parallel.

On a 48 core machine you should see a speedup of 3x over B<sort>.


=head1 AUTHOR

Copyright (C) 2020-2022 Ole Tange,
http://ole.tange.dk and Free Software Foundation, Inc.


=head1 LICENSE

Copyright (C) 2012 Free Software Foundation, Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
at your option any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.


=head1 DEPENDENCIES

B<parsort> uses B<sort>, B<bash>, and B<parallel>.


=head1 SEE ALSO

B<sort>

=cut

use strict;
use Getopt::Long;
use POSIX qw(mkfifo);

$Global::progname = ($0 =~ m:(^|/)([^/]+)$:)[1];
$Global::version = 20221222;
$ENV{'TMPDIR'} ||= "/tmp";

sub parse_options {
    # Parse the command line in @ARGV.
    # Sets $opt::* and @Global::sortoptions (the shell quoted options
    # that are passed on to every 'sort').
    # After the call @ARGV only contains the file names.
    # Exits with 255 if an option is unknown.
    # Returns: N/A
    Getopt::Long::Configure("bundling","require_order");
    my @argv_before = @ARGV;
    GetOptions(
        "debug|D" => \$opt::D,
        "version" => \$opt::version,
        "verbose|v" => \$opt::verbose,
        "b|ignore-leading-blanks" => \$opt::ignore_leading_blanks,
        "d|dictionary-order" => \$opt::dictionary_order,
        "f|ignore-case" => \$opt::ignore_case,
        "g|general-numeric-sort" => \$opt::general_numeric_sort,
        "i|ignore-nonprinting" => \$opt::ignore_nonprinting,
        "M|month-sort" => \$opt::month_sort,
        "h|human-numeric-sort" => \$opt::human_numeric_sort,
        "n|numeric-sort" => \$opt::numeric_sort,
        "N|numascii" => \$opt::numascii,
        "r|reverse" => \$opt::reverse,
        "R|random-sort" => \$opt::random_sort,
        "sort=s" => \$opt::sort,
        "V|version-sort" => \$opt::version_sort,
        "k|key=s" => \@opt::key,
        "t|field-separator=s" => \$opt::field_separator,
        "z|zero-terminated" => \$opt::zero_terminated,
        "files0-from=s" => \$opt::files0_from,
        "random-source=s" => \$opt::dummy,
        "batch-size=s" => \$opt::dummy,
        "check=s" => \$opt::dummy,
        "c" => \$opt::dummy,
        "C" => \$opt::dummy,
        "compress-program=s" => \$opt::dummy,
        "T|temporary-directory=s" => \$opt::dummy,
        "parallel=s" => \$opt::dummy,
        "u|unique" => \$opt::dummy,
        "S|buffer-size=s" => \$opt::dummy,
        "s|stable" => \$opt::dummy,
        "help" => \$opt::dummy,
        ) || exit(255);
    # GetOptions removed the options from @ARGV: What is gone from
    # the front of the original @ARGV are the options
    my $consumed = @argv_before - @ARGV;
    @Global::sortoptions =
        sort_options_from_args(@argv_before[0 .. $consumed-1]);
}

sub sort_options_from_args {
    # Input:
    #   @args = the option part of the command line
    # Returns:
    #   @sortoptions = shell quoted options to pass to 'sort'
    # Options that only parsort understands are removed: 'sort' has
    # no -D/-v, and --files0-from cannot be combined with the
    # <(...) operands that parsort gives to 'sort'.
    # Bundled (-Dn) or abbreviated (--deb) forms are not recognized.
    my @args = @_;
    my @keep;
    while(@args) {
        my $arg = shift @args;
        next if $arg =~ /\A(?:-D|--debug|-v|--verbose)\z/;
        next if $arg =~ /\A--files0-from=/;
        if($arg eq "--files0-from") {
            # The value is in the next argument
            shift @args;
            next;
        }
        push @keep, $arg;
    }
    return shell_quote(@keep);
}

sub merge {
    # Input:
    #   @cmd = commands to 'cat' (part of) a file
    # Returns:
    #   @cmd = a single command that merges the output of all inputs
    # 'cat a' 'cat b' 'cat c' =>
    #   sort -m <(sort -m <(cat a) <(cat b)) <(sort -m <(cat c))
    my @cmd = @_;
    chomp(@cmd);
    while(@cmd > 1) {
        my @merged;
        # Merge the commands two and two. An odd command is merged alone.
        while(my @pair = splice(@cmd, 0, 2)) {
            my @inputs = map { "<($_)" } grep { length } @pair;
            # This looks like useless use of 'cat', but contrary to
            # naive belief it increases performance dramatically.
            push @merged, join(" ", "sort -m", @Global::sortoptions,
                               @inputs)." | cat";
        }
        @cmd = @merged;
    }
    return @cmd;
}

sub run_in_bash {
    # Input:
    #   @cmd = shell commands to run
    # Returns:
    #   $exitstatus = 0 if bash succeeded, otherwise non-zero
    # The commands use <(...) so they are incompatible with /bin/sh
    my @cmd = @_;
    open(my $bash,"|-","bash") ||
        die("$Global::progname: Cannot run bash: $!\n");
    print $bash @cmd;
    # close waits for bash and sets $?
    close $bash and return 0;
    return(($? >> 8) || 255);
}

sub recend_options {
    # Returns:
    #   @options = options telling GNU Parallel how records end
    # qw() does not interpolate and keeps quote characters, so the
    # value must be given as a plain string. GNU Parallel is expected
    # to translate the two characters \0 into NUL.
    return $opt::zero_terminated ? ('--recend', '\0') : ();
}

sub sort_files {
    # Input is files
    # Input:
    #   @files = names of the files to sort
    # Returns:
    #   $exitstatus = exit status of the bash running the sort
    my @files = @_;
    # Let GNU Parallel generate the commands to read parts of files
    # The commands split at \n (or \0)
    # and there will be at least one for each CPU thread
    open(my $par,"-|",qw(parallel), recend_options(),
         qw(--pipepart --block -1 --dryrun -vv sort),
         @Global::sortoptions, '::::', @files) ||
        die("$Global::progname: Cannot run parallel: $!\n");
    my @cmd = merge(<$par>);
    close $par;
    debug(@cmd, "\n");
    return run_in_bash(@cmd);
}

sub sort_stdin {
    # Input is stdin
    # Spread the input between n processes that each sort
    # n = number of CPU threads
    # Returns:
    #   $exitstatus = exit status of the bash running the sort
    my ($numthreads) = `parallel --number-of-threads` =~ /^(\d+)/;
    if(not $numthreads) {
        error("Cannot get the number of CPU threads from ".
              "'parallel --number-of-threads'.");
        exit(255);
    }
    # tmpfifo creates the fifos
    my @fifos = map { tmpfifo() } 1..$numthreads;
    # This trick removes the fifo as soon as it is connected in the other end
    # (rm fifo; ...) < fifo
    my @cmd = merge(map {
        my $fifo = Q($_);
        "(rm $fifo; sort @Global::sortoptions) < $fifo"
                    } @fifos);
    my $pid = fork();
    if(not defined $pid) {
        my $reason = $!;
        unlink @fifos;
        die("$Global::progname: Cannot fork: $reason\n");
    }
    if($pid == 0) {
        # Child: feed stdin to the fifos
        exec(qw(parallel -j), $numthreads, recend_options(),
             # 286k is the best mean value after testing 250..350
             qw(--block 286k --pipe --roundrobin cat > {} :::),@fifos);
        # Only reached if exec failed: The child must never continue
        # into the code of the parent
        error("Cannot run parallel: $!");
        POSIX::_exit(255);
    }
    return run_in_bash(@cmd);
}

sub tmpname {
    # Select a name that does not exist
    # Do not create the file as it may be used for creating a socket (by tmux)
    # Input:
    #   $name = prefix of the file name
    # Returns:
    #   $tmpname = unused file name in $TMPDIR
    # Exits with 255 if $TMPDIR is missing or not writable
    my $name = shift;
    my $tmpdir = $ENV{'TMPDIR'};
    if(not -w $tmpdir) {
        if(not -e $tmpdir) {
            ::error("Tmpdir '$tmpdir' does not exist.","Try 'mkdir ".
                    Q($tmpdir)."'");
        } else {
            ::error("Tmpdir '$tmpdir' is not writable.","Try 'chmod +w ".
                    Q($tmpdir)."'");
        }
        exit(255);
    }
    my @chars = (0..9,"a".."z","A".."Z");
    my $tmpname;
    do {
        $tmpname = $tmpdir."/".$name.
            join"", map { $chars[rand(62)] } (1..5);
    } while(-e $tmpname);
    return $tmpname;
}

sub tmpfifo {
    # Find an unused name and mkfifo on it
    # Returns:
    #   $tmpfifo = name of the new fifo
    # Exits with 255 if the fifo cannot be created
    while(1) {
        my $tmpfifo = tmpname("psort");
        mkfifo($tmpfifo,0600) and return $tmpfifo;
        if(not $!{EEXIST}) {
            error("Cannot create fifo '$tmpfifo': $!");
            exit(255);
        }
        # Somebody else took the name after tmpname tested it: Try again
    }
}

sub debug {
    # Print all arguments to stderr if -D is given
    # undef is printed as the empty string
    # Returns: N/A
    $opt::D or return;
    print STDERR map { defined $_ ? $_ : "" } @_;
}

sub version() {
    # Print the version information
    # Returns: N/A
    print join
        ("\n",
         "GNU $Global::progname $Global::version",
         "Copyright (C) 2020-2022 Ole Tange, http://ole.tange.dk and Free Software",
         "Foundation, Inc.",
         "License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>",
         "This is free software: you are free to change and redistribute it.",
         "GNU $Global::progname comes with no warranty.",
         "",
         "Web site: https://www.gnu.org/software/parallel\n",
        );
}

sub shell_quote(@) {
    # Input:
    #   @strings = strings to be quoted
    # Returns:
    #   @shell_quoted_strings = string quoted as needed by the shell
    #   (in scalar context: the quoted strings joined by space)
    my @quoted = map { Q($_) } @_;
    return wantarray ? @quoted : join(" ", @quoted);
}

sub shell_quote_scalar_rc($) {
    # Quote for the rc-shell
    # Input:
    #   $string = string to be quoted (undef is returned unchanged)
    # Returns:
    #   $shell_quoted = string quoted as needed by rc
    my $s = $_[0];
    defined $s or return $s;
    my $replaced = ($s =~ s/'/''/g)
        +
        ($s =~ s/([\n\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\<\=\>\~\|\; \"\!\$\&\'\202-\377]+)/'$1'/g);
    if($replaced) {
        # A string was replaced
        # No need to test for "" or \0
    } elsif($s eq "") {
        $s = "''";
    } elsif($s eq "\0") {
        $s = "";
    }
    return $s;
}

sub shell_quote_scalar_csh($) {
    # Quote for (t)csh
    # Input:
    #   $string = string to be quoted (undef is returned unchanged)
    # Returns:
    #   $shell_quoted = string quoted as needed by (t)csh
    my $s = $_[0];
    defined $s or return $s;
    my $replaced =
        ($s =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\<\=\>\~\|\; \"\!\$\&\'\202-\377])/\\$1/g)
        +
        # quote newline in csh as \\\n
        ($s =~ s/[\n]/"\\\n"/g);
    if($replaced) {
        # A string was replaced
        # No need to test for "" or \0
    } elsif($s eq "") {
        $s = "''";
    } elsif($s eq "\0") {
        $s = "";
    }
    return $s;
}

sub shell_quote_scalar_default($) {
    # Quote for other shells (Bourne compatibles)
    # Inputs:
    #   $string = string to be quoted
    # Returns:
    #   $shell_quoted = string quoted as needed by the shell
    my $s = $_[0];
    $s eq "" and return "''";
    # No quoting needed if the string only contains safe chars
    $s =~ /[^-_.+a-z0-9\/]/i or return $s;
    $s =~ s/'/'"'"'/g; # "-quote single quotes
    $s = "'$s'";       # '-quote entire string
    $s =~ s/^''//;     # Remove unneeded '' at ends
    $s =~ s/''$//;     # (faster than s/^''|''$//g)
    return $s;
}

sub shell_quote_scalar($) {
    # Quote the string so the shell will not expand any special chars
    # Inputs:
    #   $string = string to be quoted
    # Returns:
    #   $shell_quoted = string quoted as needed by the shell

    # Speed optimization: Choose the correct shell_quote_scalar_*
    # and call that directly from now on
    no warnings 'redefine';
    if($Global::cshell) {
        # (t)csh
        *shell_quote_scalar = \&shell_quote_scalar_csh;
    } elsif($Global::shell =~ m:(^|/)rc$:) {
        # rc-shell
        *shell_quote_scalar = \&shell_quote_scalar_rc;
    } else {
        # other shells
        *shell_quote_scalar = \&shell_quote_scalar_default;
    }
    # The sub is now redefined. Call it
    return shell_quote_scalar($_[0]);
}

sub Q($) {
    # Q alias for ::shell_quote_scalar
    # The first call replaces Q with the shell specific quoter
    my $ret = shell_quote_scalar($_[0]);
    no warnings 'redefine';
    *Q = \&::shell_quote_scalar;
    return $ret;
}

sub status(@) {
    # Print each argument followed by newline
    # on $Global::status_fd (default: stderr)
    # Returns: N/A
    my @w = @_;
    my $fh = $Global::status_fd || *STDERR;
    print {$fh} map { ($_, "\n") } @w;
    $fh->flush();
}

sub status_no_nl(@) {
    # Print the arguments as they are
    # on $Global::status_fd (default: stderr)
    # Returns: N/A
    my @w = @_;
    my $fh = $Global::status_fd || *STDERR;
    print {$fh} @w;
    $fh->flush();
}

sub warning(@) {
    # Print one warning line per argument
    # Returns: N/A
    my @w = @_;
    my $prog = $Global::progname || "parsort";
    status_no_nl(map { ($prog, ": Warning: ", $_, "\n"); } @w);
}

{
    my %warnings;
    sub warning_once(@) {
        # Like warning() but only the first time this warning is given
        # Returns: N/A
        my @w = @_;
        my $prog = $Global::progname || "parsort";
        # Use the text as key: @w as key would be the number of arguments
        $warnings{join("\0",@w)}++ or
            status_no_nl(map { ($prog, ": Warning: ", $_, "\n"); } @w);
    }
}

sub error(@) {
    # Print one error line per argument
    # Returns: N/A
    my @w = @_;
    my $prog = $Global::progname || "parsort";
    status(map { ($prog.": Error: ". $_); } @w);
}

sub die_bug($) {
    # Tell the user how to report a bug and exit with 255
    # Input:
    #   $bugid = id identifying the place in the code
    my $bugid = shift;
    print STDERR
        ("$Global::progname: This should not happen. You have found a bug. ",
         "Please follow\n",
         "https://www.gnu.org/software/parallel/man.html#REPORTING-BUGS\n",
         "\n",
         "Include this in the report:\n",
         "* The version number: $Global::version\n",
         "* The bugid: $bugid\n",
         "* The command line being run\n",
         "* The files being read (put the files on a webserver if they are big)\n",
         "\n",
         "If you get the error on smaller/fewer files, please include those instead.\n");
    exit(255);
}

sub read_files0 {
    # Input:
    #   $source = file containing \0 terminated file names
    #             ('-' = read the names from stdin)
    # Returns:
    #   @files = the file names
    my $source = shift;
    my $fh;
    if($source eq "-") {
        $fh = *STDIN;
    } elsif(not open($fh,"<",$source)) {
        error("Cannot open '$source': $!");
        exit(255);
    }
    # $/ must only be changed while reading the names:
    # merge() depends on reading and chomping \n terminated lines
    local $/ = "\0";
    my @files = <$fh>;
    chomp(@files);
    return @files;
}

sub main {
    # Parse the command line and sort files or stdin
    # Returns:
    #   $exitstatus = value to exit with
    parse_options();
    if($opt::version) {
        version();
        return 0;
    }
    if(@ARGV) {
        return sort_files(@ARGV);
    } elsif(length $opt::files0_from) {
        return sort_files(read_files0($opt::files0_from));
    } else {
        return sort_stdin();
    }
}

# Only sort when run as a program: This makes it possible to load the
# file (do/require) to test the functions
exit(main()) unless caller;

# Test
# -z
# OK: cat bigfile | parsort
# OK: parsort -k4n files*.txt
# OK: parsort files*.txt
# OK: parsort "file with space"