#!/usr/bin/perl

=head1 NAME

parcat - cat files or fifos in parallel

=head1 SYNOPSIS

B<parcat> file(s)

=head1 DESCRIPTION

GNU B<parcat> reads files or fifos in parallel. It writes full lines
so there will be no problem with mixed-half-lines which you risk if
you use:

   (cat file1 & cat file2 &) | ...

=head1 EXAMPLES

=head2 Do be done

  mkfifo slot-{1..5}-digit-{0..9}
  parallel -j5 'seq 100000 | grep {} > slot-{%}-digit-{}' ::: {0..9} &
  parallel parcat slot-{1..5}-digit-{} '>' digit-{} ::: {0..9}

=head1 REPORTING BUGS

GNU B<parcat> is part of GNU B<parallel>. Report bugs to
<bug-parallel@gnu.org>.

=head1 AUTHOR

When using GNU B<parcat> for a publication please cite:

O. Tange (2011): GNU SQL - A Command Line Tool for Accessing Different
Databases Using DBURLs, ;login: The USENIX Magazine, April 2011:29-32.

Copyright (C) 2008,2009,2010 Ole Tange http://ole.tange.dk

Copyright (C) 2010,2011 Ole Tange, http://ole.tange.dk and Free
Software Foundation, Inc.

=head1 LICENSE

Copyright (C) 2007,2008,2009,2010,2011 Free Software Foundation, Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
at your option any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

=head2 Documentation license I

Permission is granted to copy, distribute and/or modify this documentation
under the terms of the GNU Free Documentation License, Version 1.3 or
any later version published by the Free Software Foundation; with no
Invariant Sections, with no Front-Cover Texts, and with no Back-Cover
Texts.  A copy of the license is included in the file fdl.txt.

=head2 Documentation license II

You are free:

=over 9

=item B<to Share>

to copy, distribute and transmit the work

=item B<to Remix>

to adapt the work

=back

Under the following conditions:

=over 9

=item B<Attribution>

You must attribute the work in the manner specified by the author or
licensor (but not in any way that suggests that they endorse you or
your use of the work).

=item B<Share Alike>

If you alter, transform, or build upon this work, you may distribute
the resulting work only under the same, similar or a compatible
license.

=back

With the understanding that:

=over 9

=item B<Waiver>

Any of the above conditions can be waived if you get permission from
the copyright holder.

=item B<Public Domain>

Where the work or any of its elements is in the public domain under
applicable law, that status is in no way affected by the license.

=item B<Other Rights>

In no way are any of the following rights affected by the license:

=over 9

=item *

Your fair dealing or fair use rights, or other applicable
copyright exceptions and limitations;

=item *

The author's moral rights;

=item *

Rights other persons may have either in the work itself or in
how the work is used, such as publicity or privacy rights.

=back

=item B<Notice>

For any reuse or distribution, you must make clear to others the
license terms of this work.

=back

A copy of the full license is included in the file as cc-by-sa.txt.

=head1 DEPENDENCIES

GNU B<parcat> uses Perl.

=head1 SEE ALSO

B<cat>(1), B<parallel>(1)

=cut

use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use IO::Select;
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Overview:
#   One producer thread per argument opens the file/fifo and hands the
#   file descriptor number to the main thread through $q.  The main
#   thread re-opens the descriptor, acknowledges through $okq, and then
#   multiplexes all inputs with select(), writing only complete lines
#   to STDOUT so lines from different inputs are never mixed.

# Producer -> main: file descriptor numbers of freshly opened inputs
my $q = Thread::Queue->new();
# Main -> producer: descriptor numbers the main thread has taken over
my $okq = Thread::Queue->new();
my @producers;

if(not @ARGV) {
    print "Usage:\n";
    print " parcat file(s)\n";
    # Without inputs no producer will ever enqueue a descriptor, so the
    # blocking dequeue below would wait forever: stop here instead.
    exit(1);
}

sub set_fh_non_blocking {
    # Set filehandle as non-blocking
    # Inputs:
    #   $fh = filehandle to be made non-blocking
    # Returns:
    #   N/A
    # Dies if the flags cannot be read or set.
    my $fh = shift;
    # F_GETFL hands back the flags as the return value ("0 but true"
    # for 0), not through the third argument.
    my $flags = fcntl($fh, F_GETFL, 0)
        or die "parcat: Cannot get flags of filehandle: $!\n";
    # Add O_NONBLOCK while keeping whatever flags were already set
    fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
        or die "parcat: Cannot set filehandle non-blocking: $!\n";
    return;
}

sub producer {
    # Open a file/fifo, set non blocking, enqueue fileno of the file handle
    # Runs in its own thread (presumably because opening a fifo can
    # block until a writer shows up).
    # Inputs:
    #   $file = name of file/fifo to open
    # Returns:
    #   N/A
    my $file = shift;
    open(my $fh, "<", $file) || do {
        print STDERR "parcat: Cannot open $file\n";
        exit(1);
    };
    # A die here would only end this thread and leave the main thread
    # waiting for a descriptor that never arrives: make it fatal for
    # the whole program like the open failure above.
    eval { set_fh_non_blocking($fh); 1; } or do {
        print STDERR $@;
        exit(1);
    };
    my $fd = fileno($fh);
    $q->enqueue($fd);
    # Get an OK that the main thread has opened the descriptor too;
    # only then may $fh go out of scope here.
    while(1) {
        my $ok = $okq->dequeue();
        if($ok == $fd) {
            last;
        }
        # Not ours - very unlikely to happen: put it back for its owner
        $okq->enqueue($ok);
    }
    return;
}

for (@ARGV) {
    push @producers, threads->create('producer', $_);
}

# State owned by the main thread only
my $s = IO::Select->new();
# Filehandles that are still being read (key = stringified handle)
my %fhr;
# Per filehandle: chunks read so far that do not yet end in a newline
my %buffer;
# Number of descriptors received from producers.  Counted in the main
# thread so no locking is needed and it cannot lag behind the queue.
my $files_added = 0;
my $files_total = @ARGV;

sub add_file {
    # Take over a descriptor opened by a producer
    # Inputs:
    #   $fd = file descriptor number
    # Returns:
    #   N/A
    my $fd = shift;
    # "<&=" makes a Perl handle on the existing descriptor (fdopen)
    open(my $fh, "<&=", $fd)
        || die "parcat: Cannot open file descriptor $fd: $!\n";
    $s->add($fh);
    $fhr{$fh}++;
    $files_added++;
    # Tell the producer now opened here and can be released
    $okq->enqueue($fd);
    # Initialize the buffer
    @{$buffer{$fh}} = ();
    return;
}

sub add_files {
    # Take over all descriptors currently queued (non-blocking dequeue)
    while(defined(my $fd = $q->dequeue_nb())) {
        add_file($fd);
    }
    return;
}

sub add_files_block {
    # Wait for one descriptor to be queued and take it over
    my $fd = $q->dequeue();
    add_file($fd);
    return;
}

sub write_all {
    # Write a string to STDOUT unbuffered, retrying short writes
    # Inputs:
    #   $data = string to write
    # Returns:
    #   N/A
    # Dies on write errors other than EINTR/EAGAIN.
    my ($data) = @_;
    my $len = length $data;
    my $off = 0;
    while($off < $len) {
        my $written = syswrite(STDOUT, $data, $len - $off, $off);
        if(defined $written) {
            $off += $written;
            next;
        }
        if($! == EINTR) {
            next;
        }
        if($! == EAGAIN or $! == EWOULDBLOCK) {
            # STDOUT is non-blocking and full: wait until writable
            my $wvec = '';
            vec($wvec, fileno(STDOUT), 1) = 1;
            select(undef, $wvec, undef, undef);
            next;
        }
        die "parcat: Cannot write to stdout: $!\n";
    }
    return;
}

do {
    # Wait until at least one file is opened
    add_files_block();
    while($q->pending or keys %fhr) {
        add_files();
        while(keys %fhr) {
            my @ready = $s->can_read(0.01);
            if(not @ready) {
                # Nothing to read right now: see if more files opened
                add_files();
            }
            for my $file (@ready) {
                my $buf;
                my $rv = sysread($file, $buf, 65536);
                if(not defined $rv) {
                    # Only an undef return means $! is fresh
                    if($! == EAGAIN or $! == EWOULDBLOCK or $! == EINTR) {
                        # Would block: Nothing read
                        next;
                    }
                    print STDERR "parcat: Read error: $!\n";
                }
                if(not $rv) {
                    # This file is done (end of file or read error)
                    $s->remove($file);
                    delete $fhr{$file};
                    # Flush the last (unterminated) line the same
                    # unbuffered way as all other output to keep order
                    for my $chunk (@{$buffer{$file}}) {
                        write_all($chunk);
                    }
                    delete $buffer{$file};
                    # Closing the $file causes it to block
                    # close $file;
                    add_files();
                    next;
                }
                # Find \n for full line
                my $i = rindex($buf,"\n") + 1;
                if($i) {
                    # Print everything up to and including the last \n
                    for my $chunk (@{$buffer{$file}}, substr($buf,0,$i)) {
                        write_all($chunk);
                    }
                    # @buffer = remaining half line
                    @{$buffer{$file}} = (substr($buf,$i));
                } else {
                    # Something read, but not a full line
                    push @{$buffer{$file}}, $buf;
                }
                # Keep reading the same file until it would block
                redo;
            }
        }
    }
} while($files_added < $files_total);

for (@producers) {
    $_->join();
}