#!/usr/bin/perl

# parcat - concatenate several files/fifos in parallel.
#
# Every input is opened in its own producer thread, so an open() that
# blocks (e.g. on a fifo without a writer yet) does not hold up the
# other inputs.  The main thread multiplexes all opened inputs with
# select() and copies them to the chosen output file descriptor,
# writing only complete lines so lines from different inputs are never
# mixed.
#
# Usage:
#   parcat [--rm] [-FD] file(s)
#   cat argfile | parcat
#
#   --rm  unlink each file as soon as it has been opened
#   -FD   send the following files to file descriptor FD (default: 1)

use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use IO::Select;
use strict;
use threads;
use threads::shared;
use Thread::Queue;

# Producers -> main thread: pairs of (input fileno, output fd)
my $q = Thread::Queue->new();
# Main thread -> producers: fileno of an input the main thread now owns
my $okq = Thread::Queue->new();
my @producers;

if(not @ARGV) {
    if(-t *STDIN) {
        print "Usage:\n";
        print " parcat file(s)\n";
        print " cat argfile | parcat\n";
        # Nothing to do: without this the main loop would wait forever
        # for a producer that is never started
        exit(1);
    } else {
        # Read arguments from stdin
        chomp(@ARGV = <STDIN>);
    }
}

my $files_to_open = 0;
# Default: fd = stdout
my $fd = 1;
for my $arg (@ARGV) {
    # --rm = remove file when opened
    if($arg =~ /^--rm$/) {
        $opt::rm = 1;
        next;
    }
    # -1 = output to fd 1, -2 = output to fd 2
    if($arg =~ /^-(\d+)$/) {
        $fd = $1;
        next;
    }
    push @producers, threads->create('producer', $arg, $fd);
    $files_to_open++;
}

if(not $files_to_open) {
    # Empty argument list: no producer will ever enqueue anything, so
    # the blocking dequeue in the main loop would hang
    exit(0);
}

sub producer {
    # Open a file/fifo, set non blocking, enqueue fileno of the file handle
    # Inputs:
    #   $file = name of the file/fifo to open for reading
    #   $output_fd = file descriptor the content should be written to
    # Returns:
    #   N/A
    my ($file, $output_fd) = @_;
    open(my $fh, "<", $file) || do {
        print STDERR "parcat: Cannot open $file\n";
        exit(1);
    };
    # Remove file when it has been opened
    if($opt::rm) {
        unlink $file;
    }
    set_fh_non_blocking($fh);
    # Pass the fileno to parent
    $q->enqueue(fileno($fh),$output_fd);
    # Get an OK that the $fh is opened and we can release the $fh
    while(1) {
        my $ok = $okq->dequeue();
        if($ok == fileno($fh)) {
            last;
        }
        # Not ours - very unlikely to happen
        $okq->enqueue($ok);
    }
    return;
}

my $s = IO::Select->new();
# $buffer{input handle}{output fd} = [ chunks of a not yet complete line ]
my %buffer;
# Number of inputs the main thread has taken over from the producers.
# Only touched by the main thread, so it needs no locking, and it only
# reaches $files_to_open when every input really has been dequeued.
my $files_added = 0;

sub add_file {
    # Take over an input opened by a producer and start watching it
    # Inputs:
    #   $infd = fileno of the input opened by the producer
    #   $outfd = file descriptor the input should be copied to
    # Returns:
    #   N/A
    my ($infd, $outfd) = @_;
    open(my $infh, "<&=", $infd) || die;
    open(my $outfh, ">&=", $outfd) || die;
    $s->add($infh);
    # Tell the producer now opened here and can be released
    $okq->enqueue($infd);
    # Initialize the buffer
    @{$buffer{$infh}{$outfd}} = ();
    $Global::fh{$outfd} = $outfh;
    $files_added++;
    return;
}

sub add_files {
    # Non-blocking dequeue: add every input that is ready right now
    while(1) {
        my ($infd,$outfd) = $q->dequeue_nb(2);
        if(not defined($outfd)) {
            last;
        }
        add_file($infd,$outfd);
    }
    return;
}

sub add_files_block {
    # Blocking dequeue: wait for the next input and add it
    my ($infd,$outfd) = $q->dequeue(2);
    add_file($infd,$outfd);
    return;
}

do {
    # Wait until at least one file is opened
    add_files_block();
    while($q->pending() or keys %buffer) {
        add_files();
        while(keys %buffer) {
            my @ready = $s->can_read(0.01);
            if(not @ready) {
                add_files();
            }
            for my $infh (@ready) {
                # There is only one key, namely the output file descriptor
              OUTFD:
                for my $outfd (keys %{$buffer{$infh}}) {
                    my $buf;
                    my $rv = sysread($infh, $buf, 65536);
                    if(not $rv) {
                        # $! is only meaningful when sysread failed
                        # (undef); at EOF (0) it may hold a stale EAGAIN
                        if(not defined $rv and $! == EAGAIN) {
                            # Would block: Nothing read
                            next OUTFD;
                        }
                        # Nothing read, but would not block:
                        # This file is done
                        $s->remove($infh);
                        # Flush the unterminated last line chunk by
                        # chunk, so nothing is inserted between chunks
                        for my $chunk (@{$buffer{$infh}{$outfd}}) {
                            syswrite($Global::fh{$outfd},$chunk);
                        }
                        delete $buffer{$infh};
                        # Closing the $infh causes it to block
                        # close $infh;
                        add_files();
                        next OUTFD;
                    }
                    # Something read.
                    # Find the last \n: everything up to it is full lines
                    my $i = (rindex($buf,"\n")+1);
                    if($i) {
                        # Print buffered half line followed by the full lines
                        for my $chunk (@{$buffer{$infh}{$outfd}},
                                       substr($buf,0,$i)) {
                            syswrite($Global::fh{$outfd},$chunk);
                        }
                        # @buffer = remaining half line
                        $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
                    } else {
                        # Something read, but not a full line
                        push @{$buffer{$infh}{$outfd}}, $buf;
                    }
                    # Keep reading this input until it would block or ends
                    redo OUTFD;
                }
            }
        }
    }
} while($files_added < $files_to_open);

for my $thr (@producers) {
    $thr->join();
}

sub set_fh_non_blocking {
    # Set filehandle as non-blocking
    # Inputs:
    #   $fh = filehandle to be made non-blocking
    # Returns:
    #   N/A
    my $fh = shift;
    # F_GETFL hands back the flags as the return value of fcntl
    my $flags = fcntl($fh, F_GETFL, 0) or die $!;
    # Add non-blocking to the existing flags and set them on the filehandle
    fcntl($fh, F_SETFL, $flags | O_NONBLOCK) or die $!;
    return;
}