#!/usr/bin/perl

# transpose-par.pl: Parallelized transpose with paste.
#
# Header of the git patch this script was delivered in (kept for provenance):
#
#   From fc350ac6cc6cbf82f8dbd4e5afe9ee16a9a21f26 Mon Sep 17 00:00:00 2001
#   From: Ole Tange
#   Date: Mon, 28 Oct 2013 00:44:47 +0100
#   Subject: [PATCH] transpose-par.pl: Parallelized transpose with paste.
#   ---
#    transpose/transpose-par.pl |  37 ++++++++
#    transpose/transpose.pod    | 175 +++++++++++++++++++++++++++++++++++++
#    transpose/transposewrap.pl |  81 +++++++++++++++++
#    3 files changed, 293 insertions(+)
#    create mode 100755 transpose/transpose-par.pl
#    create mode 100644 transpose/transpose.pod
#    create mode 100755 transpose/transposewrap.pl
#
#   diff --git a/transpose/transpose-par.pl b/transpose/transpose-par.pl
#   new file mode 100755
#   index 0000000..8102969
#   --- /dev/null
#   +++ b/transpose/transpose-par.pl
#   @@ -0,0 +1,37 @@

# Alternative shebang that was left disabled in the original script:
#!/usr/local/bin/parallel --shebang-wrap --pipe --block 10m -k --files /usr/bin/perl | xargs paste

use strict;
use warnings;

use Text::CSV;
use File::Temp qw(tempfile tempdir);

# Transpose one block of CSV.
#
# Input:  CSV lines read with <> (STDIN, or the files named in @ARGV).
# Output: the transposed table on STDOUT, tab-separated: field number F of
#         input line N becomes field N of output row F.
# Dies:   if the CSV parser cannot be created or a line cannot be parsed.
#
# NOTE: fields are written as-is; a field that itself contains a tab or a
# newline is not escaped in the output.

my $csv;
my @table;              # $table[$field_index][$line_index]
my $line_count = 0;     # input lines read so far == width of every output row

while (my $line = <>) {
    if (not defined $csv) {
        # The parser is created on the first line so that its settings
        # can be guessed from the data.
        $csv = Text::CSV->new(guess_csv_setting($line))
            or die "Cannot use CSV: " . Text::CSV->error_diag();
    }
    if (not $csv->parse($line)) {
        die "CSV has unexpected format at input line " . $. . ": "
            . $csv->error_diag() . "\n";
    }
    # Each field of this input line goes into its own output row.
    my @fields = $csv->fields();
    for my $row (0 .. $#fields) {
        $table[$row][$line_count] = defined $fields[$row] ? $fields[$row] : '';
    }
    $line_count++;
}

for my $row (@table) {
    # Input lines may have differing numbers of fields. Pad every output
    # row to exactly $line_count fields; otherwise the columns shift when
    # the outputs of several blocks are later combined with paste.
    print join("\t",
               map { defined $_ ? $_ : '' } @{$row}[0 .. $line_count - 1]),
          "\n";
}

sub guess_csv_setting {
    # Based on a single line guess the csv_setting.
    # Currently no guessing is done: the line passed in is ignored and a
    # fixed setting is returned.
    # Returns: hashref of options for Text::CSV->new
    return { binary => 1 };
}

# ---------------------------------------------------------------------------
# Start of transpose/transpose.pod as added by the same patch:
#
#   diff --git a/transpose/transpose.pod b/transpose/transpose.pod
#   new file mode 100644
#   index 0000000..100c46c
#   --- /dev/null
#   +++ b/transpose/transpose.pod
#   @@ -0,0 +1,175 @@
#
#!/usr/bin/perl

=head1 NAME

transpose - transpose CSV file

=head1 SYNOPSIS

B<transpose> [-d I<delim>] [I<file>]

=head1 DESCRIPTION

B<transpose> will read a CSV file and print the transposed table (rows
and columns swapped) as tab-separated values.

=head1 OPTIONS

=over 9

=item I<file>

Input CSV file. If none is given reads from STDIN (standard input).
+ + +=item B<-d> I<delim> - not implemented + +Use I<delim> as delimiter in input and output. + + +=back + + +=head1 EXAMPLES + +=head2 EXAMPLE: Transpose a big CSV file + + cat medium.csv | transpose > muidem.csv + +=head1 DESIGN + +B<transpose> is designed to deal efficiently with medium sized data +(up to 30 TB per file) on systems with 250 MB RAM per CPU core. It +works by chopping the input into 30 MB blocks. Each block is +transposed in parallel and saved to disk. Then these files are pasted +together and finally removed. + +=head1 REPORTING BUGS + +Report bugs to the author (see AUTHOR below). + + +=head1 AUTHOR + +Copyright (C) 2013 Ole Tange, http://ole.tange.dk and Free +Software Foundation, Inc. + + +=head1 LICENSE + +Copyright (C) 2013 Free Software Foundation, Inc. + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +at your option any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. + +=head2 Documentation license I + +Permission is granted to copy, distribute and/or modify this documentation +under the terms of the GNU Free Documentation License, Version 1.3 or +any later version published by the Free Software Foundation; with no +Invariant Sections, with no Front-Cover Texts, and with no Back-Cover +Texts. A copy of the license is included in the file fdl.txt. 
+ +=head2 Documentation license II + +You are free: + +=over 9 + +=item B<to Share> + +to copy, distribute and transmit the work + +=item B<to Remix> + +to adapt the work + +=back + +Under the following conditions: + +=over 9 + +=item B<Attribution> + +You must attribute the work in the manner specified by the author or +licensor (but not in any way that suggests that they endorse you or +your use of the work). + +=item B<Share Alike> + +If you alter, transform, or build upon this work, you may distribute +the resulting work only under the same, similar or a compatible +license. + +=back + +With the understanding that: + +=over 9 + +=item B<Waiver> + +Any of the above conditions can be waived if you get permission from +the copyright holder. + +=item B<Public Domain> + +Where the work or any of its elements is in the public domain under +applicable law, that status is in no way affected by the license. + +=item B<Other Rights> + +In no way are any of the following rights affected by the license: + +=over 2 + +=item * + +Your fair dealing or fair use rights, or other applicable +copyright exceptions and limitations; + +=item * + +The author's moral rights; + +=item * + +Rights other persons may have either in the work itself or in +how the work is used, such as publicity or privacy rights. + +=back + +=back + +=over 9 + +=item B<Notice> + +For any reuse or distribution, you must make clear to others the +license terms of this work. + +=back + +A copy of the full license is included in the file as cc-by-sa.txt. + +=head1 DEPENDENCIES + +B<transpose> uses Perl, B<parallel>, B<paste> and B<mkfifo>. 
=head1 SEE ALSO

B<parallel>(1), B<paste>(1), B<mkfifo>(1)

=cut

# ---------------------------------------------------------------------------
# Next file added by the patch:
#
#   diff --git a/transpose/transposewrap.pl b/transpose/transposewrap.pl
#   new file mode 100755
#   index 0000000..7055424
#   --- /dev/null
#   +++ b/transpose/transposewrap.pl
#   @@ -0,0 +1,81 @@
# ---------------------------------------------------------------------------
#!/usr/bin/perl

# transposewrap.pl - transpose CSV from STDIN using GNU parallel and paste.
#
# 1. STDIN is chopped into blocks by `parallel --pipe`; each block is
#    transposed by transpose-par.pl and saved to a file (`--files`).
# 2. The block files are pasted together side by side. Because all file
#    names may not fit on one command line, they are split into groups;
#    each group is pasted into its own fifo by a background paste.
# 3. A final paste of the fifos writes the result to STDOUT.
# 4. Fifos, block files and the temporary directory are removed.
#
# Exit status: 0 on success, non-zero if parallel or the final paste failed.

use strict;
use warnings;

use File::Temp qw(tempfile tempdir);

# Set to 1 to trace the external commands that are run.
$Global::debug = 0;

my $block = "30m";

# Step 1: transpose the blocks in parallel; one output file name per line.
my $transpose_cmd =
    "parallel --pipe --block $block -k --files -j150% transpose-par.pl";
debug("$transpose_cmd\n");
my @files = `$transpose_cmd`;
my $transpose_status = $?;
chomp(@files);
if ($transpose_status != 0) {
    # Some block could not be transposed: do not paste a partial result.
    unlink(@files);
    die "transposewrap: parallel failed (wait status $transpose_status)\n";
}

my $max_line_length_allowed = `parallel --max-line-length-allowed`;
$max_line_length_allowed = '' unless defined $max_line_length_allowed;
chomp($max_line_length_allowed);
if ($max_line_length_allowed !~ /^\d+$/) {
    unlink(@files);
    die "transposewrap: cannot determine the maximal command line length "
        . "from 'parallel --max-line-length-allowed'\n";
}

# Step 2: paste groups of block files into fifos.
my $tmp = File::Temp::tempdir(CLEANUP => 0);
my @fifos;              # fifos created so far, in output order
my @group;              # shell-quoted block files for the next paste
my $group_len = 0;      # sum of (length + 1 separator) over @group

# Start a background paste of @group into the next fifo.
my $flush_group = sub {
    my $fifo = fifo_name($tmp, scalar @fifos);
    push @fifos, $fifo;
    if (not start_paste($fifo, @group)) {
        remove_temp_files($tmp, @fifos, @files);
        die "transposewrap: could not create fifo $fifo\n";
    }
    @group = ();
    $group_len = 0;
};

for my $file (@files) {
    my $arg = shell_quote($file);
    my $prefix_len = length(paste_prefix(fifo_name($tmp, scalar @fifos)));
    # "+ 4": the separator before $arg plus the "(" and " &)" that
    # start_paste wraps around the command, minus the separator that
    # $group_len counts once too often.
    # A group always gets at least one file, even an over-long one,
    # so the loop always makes progress.
    if (@group
        and $prefix_len + $group_len + length($arg) + 4
            > $max_line_length_allowed) {
        $flush_group->();
    }
    push @group, $arg;
    $group_len += length($arg) + 1;
}
$flush_group->() if @group;

# Step 3: paste the fifos to STDOUT. List form: no shell, no quoting issues.
my $exit_code = 0;
if (@fifos) {
    debug("paste @fifos\n");
    if (system("paste", @fifos) != 0) {
        warn "transposewrap: paste failed (wait status $?)\n";
        $exit_code = 1;
    }
}

# Step 4: the final paste has read every fifo to EOF, so the background
# pastes are done with the block files.
remove_temp_files($tmp, @fifos, @files);
exit($exit_code);

sub fifo_name {
    # Name of fifo number $number in directory $dir.
    # The original incremented the path with ++, which is a numeric
    # increment for a string containing "/" and would have produced "1".
    # Returns: "$dir/NNNNNNN"
    my ($dir, $number) = @_;
    return sprintf("%s/%07d", $dir, $number);
}

sub paste_prefix {
    # Shell command prefix that creates $fifo and pastes into it;
    # the files to paste are appended by the caller.
    # Returns: string ending in a space
    my ($fifo) = @_;
    my $quoted = shell_quote($fifo);
    return "mkfifo $quoted; paste > $quoted ";
}

sub start_paste {
    # Create $fifo and start a background paste of @quoted_files into it.
    # The subshell runs mkfifo in the foreground and only puts paste in
    # the background, so when system() returns the fifo already exists
    # (or mkfifo has failed). No fork or polling is needed.
    # Returns: true if the fifo exists afterwards
    my ($fifo, @quoted_files) = @_;
    my $shell_cmd = "(" . paste_prefix($fifo) . join(" ", @quoted_files) . " &)";
    debug("$shell_cmd\n");
    system($shell_cmd);
    return -p $fifo;
}

sub shell_quote {
    # Quote $str so that a POSIX shell sees it as one literal word.
    # Returns: the single-quoted string
    my ($str) = @_;
    $str =~ s/'/'\\''/g;
    return "'$str'";
}

sub remove_temp_files {
    # Remove @paths (missing ones are ignored) and then the directory $dir.
    # Returns: N/A
    my ($dir, @paths) = @_;
    unlink(@paths);
    rmdir($dir) or warn "transposewrap: cannot remove $dir: $!\n";
    return;
}

sub debug {
    # Print the arguments if $Global::debug is set. Output goes to the
    # handle in $Global::fd{1} if one was saved there, else to STDOUT.
    # Returns: N/A
    $Global::debug or return;
    # map, not grep: undef becomes "", while "0" and "" are kept.
    my @msg = map { defined $_ ? $_ : "" } @_;
    if ($Global::fd{1}) {
        # Original stdout was saved
        my $stdout = $Global::fd{1};
        print $stdout @msg;
    } else {
        print @msg;
    }
    return;
}