parallel/src/parcat

143 lines
2.9 KiB
Plaintext
Raw Normal View History

2016-09-04 08:42:04 +00:00
#!/usr/bin/perl
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use IO::Select;
use strict;
use threads;
use threads::shared;
use Thread::Queue;
my $opened :shared;
my $q = Thread::Queue->new();
my $okq = Thread::Queue->new();
my @producers;
if(not @ARGV) {
print "Usage:\n";
print " parcat file(s)\n";
}
for (@ARGV) {
push @producers, threads->create('producer', $_);
}
sub producer {
# Open a file/fifo, set non blocking, enqueue fileno of the file handle
my $file = shift;
open(my $fh, "<", $file) || do {
print STDERR "parcat: Cannot open $file\n";
exit(1);
};
set_fh_non_blocking($fh);
$q->enqueue(fileno($fh));
$opened++;
# Get an OK that the $fh is opened and we can release the $fh
while(1) {
my $ok = $okq->dequeue();
if($ok == fileno($fh)) { last; }
# Not ours - very unlikely to happen
$okq->enqueue($ok);
}
return;
}
my $s = IO::Select->new();
my %buffer;
sub add_file {
my $fd = shift;
open(my $fh, "<&=", $fd) || die;
$s->add($fh);
# Tell the producer now opened here and can be released
$okq->enqueue($fd);
# Initialize the buffer
@{$buffer{$fh}} = ();
}
sub add_files {
# Non-blocking dequeue
while(defined(my $fd = $q->dequeue_nb())) {
add_file($fd);
}
}
sub add_files_block {
# Blocking dequeue
my $fd = $q->dequeue();
add_file($fd);
}
my $fd;
my (@ready,$file,$rv,$buf);
do {
# Wait until at least one file is opened
add_files_block();
while($q->pending or keys %buffer) {
2016-09-04 08:42:04 +00:00
add_files();
while(keys %buffer) {
2016-09-04 08:42:04 +00:00
@ready = $s->can_read(0.01);
if(not @ready) {
add_files();
}
for $file (@ready) {
$rv = sysread($file, $buf, 65536);
if (!$rv) {
if($! == EAGAIN) {
# Would block: Nothing read
next;
} else {
# This file is done
$s->remove($file);
print @{$buffer{$file}};
delete $buffer{$file};
# Closing the $file causes it to block
# close $file;
add_files();
next;
}
}
# Find \n for full line
my $i = (rindex($buf,"\n")+1);
if($i) {
# Print full line
for(@{$buffer{$file}}, substr($buf,0,$i)) {
syswrite(STDOUT,$_);
}
# @buffer = remaining half line
@{$buffer{$file}} = (substr($buf,$i,$rv-$i));
redo;
} else {
# Something read, but not a full line
push @{$buffer{$file}}, $buf;
redo;
}
}
}
}
} while($opened <= $#ARGV);
for (@producers) {
$_->join();
}
sub set_fh_non_blocking {
# Set filehandle as non-blocking
# Inputs:
# $fh = filehandle to be blocking
# Returns:
# N/A
my $fh = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
my $flags;
fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}