mirror of
https://git.savannah.gnu.org/git/parallel.git
synced 2024-11-22 05:57:54 +00:00
174 lines
3.9 KiB
Perl
Executable file
174 lines
3.9 KiB
Perl
Executable file
#!/usr/bin/perl
|
|
|
|
use Symbol qw(gensym);
|
|
use IPC::Open3;
|
|
use POSIX qw(:errno_h);
|
|
use IO::Select;
|
|
use strict;
|
|
use threads;
|
|
use threads::shared;
|
|
use Thread::Queue;
|
|
|
|
|
|
# Number of input files opened so far; incremented by the producer threads
# and compared with $files_to_open by the main loop
my $opened :shared = 0;
# Producer threads -> main thread: pairs of (input fileno, output fd)
my $q = Thread::Queue->new();
# Main thread -> producer threads: fileno that the main thread has taken over
my $okq = Thread::Queue->new();
# One producer thread per input file
my @producers;

if(not @ARGV) {
    if(-t *STDIN) {
        print "Usage:\n";
        print " parcat file(s)\n";
        print " cat argfile | parcat\n";
        # Without files no producer is started and the blocking dequeue
        # in the main loop would wait forever
        exit(1);
    } else {
        # Read arguments from stdin
        chomp(@ARGV = <STDIN>);
    }
}

my $files_to_open = 0;
# Default: fd = stdout
my $fd = 1;
for my $arg (@ARGV) {
    # --rm = remove file when opened
    if($arg =~ /^--rm$/) {
        $opt::rm = 1;
        next;
    }
    # -1 = output to fd 1, -2 = output to fd 2
    # The fd applies to the files that follow the option
    if($arg =~ /^-(\d+)$/) {
        $fd = $1;
        next;
    }
    # Anything else is a file: open it in its own thread
    push @producers, threads->create('producer', $arg, $fd);
    $files_to_open++;
}

if(not $files_to_open) {
    # Only options or an empty argument list were given: there is nothing
    # to read and nobody will ever enqueue a file for the main loop
    exit(0);
}
|
|
|
|
sub producer {
    # Open a file/fifo, set non blocking, enqueue fileno of the file handle
    # Runs as the body of a thread (started with threads->create)
    # Inputs:
    #   $file = name of the file/fifo to open for reading
    #   $output_fd = file descriptor number the content should be written to
    # Returns:
    #   N/A
    # Exits the program with status 1 if $file cannot be opened
    my ($file, $output_fd) = @_;
    open(my $fh, "<", $file) || do {
        print STDERR "parcat: Cannot open $file: $!\n";
        exit(1);
    };
    # Remove file when it has been opened
    if($opt::rm) {
        unlink $file;
    }
    set_fh_non_blocking($fh);
    {
        # ++ on a shared variable is a read-modify-write: take the lock
        # so concurrent producers cannot lose an increment
        lock($opened);
        $opened++;
    }
    my $fileno = fileno($fh);
    # Pass the fileno to parent
    $q->enqueue($fileno, $output_fd);
    # Get an OK that the $fh is opened and we can release the $fh
    while(1) {
        my $ok = $okq->dequeue();
        if($ok == $fileno) { last; }
        # Not ours - very unlikely to happen
        # Put it back so the producer it belongs to can pick it up
        $okq->enqueue($ok);
    }
    return;
}
|
|
|
|
# Set of input filehandles that are watched for readable data
my $s = IO::Select->new;
# $buffer{input filehandle}{output fd} = [ chunks read so far that do
# not yet form a full line ]
my %buffer = ();
|
|
|
|
sub add_file {
    # Take over an input file descriptor opened by a producer thread
    # and start watching it for data
    # Inputs:
    #   $infd = file descriptor number to read from
    #   $outfd = file descriptor number the content is written to
    # Returns:
    #   N/A
    # Dies if a filehandle cannot be created for one of the descriptors
    my ($infd, $outfd) = @_;
    # "&=" makes a filehandle for the existing descriptor (fdopen)
    open(my $infh, "<&=", $infd)
        or die "parcat: Cannot open input file descriptor $infd: $!\n";
    open(my $outfh, ">&=", $outfd)
        or die "parcat: Cannot open output file descriptor $outfd: $!\n";
    $s->add($infh);
    # Tell the producer now opened here and can be released
    $okq->enqueue($infd);
    # Initialize the buffer
    $buffer{$infh}{$outfd} = [];
    $Global::fh{$outfd} = $outfh;
    return;
}
|
|
|
|
sub add_files {
    # Drain the producer queue without blocking: every queued
    # (input fileno, output fd) pair is handed to add_file()
    while(1) {
        my ($new_infd, $new_outfd) = $q->dequeue_nb(2);
        # No output fd means no pair was waiting in the queue
        last if not defined($new_outfd);
        add_file($new_infd, $new_outfd);
    }
}
|
|
|
|
sub add_files_block {
    # Wait until a producer has queued an (input fileno, output fd)
    # pair, then register that pair with add_file()
    my @pair = $q->dequeue(2);
    add_file(@pair);
}
|
|
|
|
|
|
# Main loop: copy data from all opened inputs to their output file
# descriptor. Data is only written up to the last newline read; the
# remaining partial line is kept in %buffer until the rest arrives or
# the input ends.
my ($rv, $buf);
do {
    # Wait until at least one file is opened
    add_files_block();
    while($q->pending or keys %buffer) {
        add_files();
        while(keys %buffer) {
            my @ready = $s->can_read(0.01);
            if(not @ready) {
                # Nothing to read right now: pick up newly opened files
                add_files();
            }
            for my $infh (@ready) {
                # There is only one key, namely the output file descriptor
                for my $outfd (keys %{$buffer{$infh}}) {
                    $rv = sysread($infh, $buf, 65536);
                    if(not $rv) {
                        # sysread gives undef on error and 0 at end of
                        # file. $! is only meaningful after an error: it
                        # may hold a stale EAGAIN when 0 is returned.
                        if(not defined $rv and ($! == EAGAIN or $! == EINTR)) {
                            # Would block or interrupted: Nothing read
                            next;
                        }
                        # End of file (or an unrecoverable read error):
                        # This file is done
                        $s->remove($infh);
                        # Flush the last line even if it has no newline
                        for(@{$buffer{$infh}{$outfd}}) {
                            syswrite($Global::fh{$outfd},$_);
                        }
                        delete $buffer{$infh};
                        # Closing the $infh causes it to block
                        # close $infh;
                        add_files();
                        next;
                    }
                    # Something read.
                    # Find the last \n: everything up to it is full lines
                    my $i = (rindex($buf,"\n")+1);
                    if($i) {
                        # Print full line
                        for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) {
                            syswrite($Global::fh{$outfd},$_);
                        }
                        # @buffer = remaining half line
                        $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
                    } else {
                        # Something read, but not a full line
                        push @{$buffer{$infh}{$outfd}}, $buf;
                    }
                    # Keep reading this input until it would block or ends
                    redo;
                }
            }
        }
    }
} while($opened < $files_to_open);
|
|
|
|
|
|
# Wait for every producer thread to finish
$_->join() for @producers;
|
|
|
|
sub set_fh_non_blocking {
    # Set filehandle as non-blocking
    # Inputs:
    #   $fh = filehandle to be made non-blocking
    # Returns:
    #   N/A
    # Dies with $! if the flags cannot be read or changed
    my $fh = shift;
    # Load Fcntl the first time it is needed
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
    # Get the current flags on the filehandle.
    # F_GETFL hands the flags back as the return value of fcntl()
    # ("0 but true" if no flag is set), not through the third argument.
    my $flags = fcntl($fh, &F_GETFL(), 0);
    defined($flags) || die $!;
    # Add non-blocking to the flags and keep all flags already set
    fcntl($fh, &F_SETFL(), $flags | &O_NONBLOCK()) || die $!;
    return;
}
|