parcat: --rm, reading args from stdin.

This commit is contained in:
Ole Tange 2017-08-12 18:37:52 +02:00
parent bbd336643c
commit 67c4377715
9 changed files with 154 additions and 54 deletions

View file

@ -1,3 +1,6 @@
People who have helped GNU Parallel in different ways.
John Rusnak: Feedback on all documentation.
FrithMartin: Bug patch for orphan blocks.
Rasmus Villemoes: Code snips for signal processing.
Martin d'Anjou: Code snips for signal processing.

View file

@ -119,4 +119,4 @@ monitorman:
# If man page changed: open new pdfman
inotifywait -qmre MOVED_TO -e CLOSE_WRITE --format %w%f . | parallel -uj1 'echo {=/\.pod$$/ or skip()=};make -j && sudo make install; pdfman {/.} &'
EXTRA_DIST = CITATION CREDITS
EXTRA_DIST = CITATION CREDITS cc-by-sa.txt fdl.txt

View file

@ -275,7 +275,7 @@ top_build_prefix = @top_build_prefix@
top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
SUBDIRS = src
EXTRA_DIST = CITATION CREDITS
EXTRA_DIST = CITATION CREDITS cc-by-sa.txt fdl.txt
all: config.h
$(MAKE) $(AM_MAKEFLAGS) all-recursive

View file

@ -195,6 +195,8 @@ file:///home/tange/privat/parallel/doc/release_new_version
from:tange@gnu.org
to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm>
Subject: GNU Parallel 20170822 ('<<>>') released <<[stable]>>
@ -214,10 +216,24 @@ New in this release:
http://meta.askubuntu.com/a/16750/22307
http://meta.serverfault.com/a/9040/45704
* GNU Parallel was cited in:
* GNU Parallel was cited in: https://springerplus.springeropen.com/articles/10.1186/s40064-016-2022-y
https://dzone.com/articles/running-bash-commands-in-parallel-1
* https://medium.com/@nornagon/today-i-learned-gnu-parallel-plate-tectonics-9fcf24045e63
* https://www.upf.edu/web/sct-sit/gnu-parallel-tutorial
http://blogs.fluidinfo.com/terry/2017/08/05/do-stuff-on-things-in-parallel/
http://ino.pm/outreach/presentations/2014/03/genomics-wranglers/index.html#/5
http://www.ettemalab.org/using-for-loop-vs-gnu-parallel-for-blast/
https://medium.com/@nornagon/today-i-learned-gnu-parallel-plate-tectonics-9fcf24045e63
https://gxnotes.com/article/175866.html
<<Citation not OK: BAMClipper: removing primers from alignments to minimize false-negative mutations in amplicon next-generation sequencing https://www.nature.com/articles/s41598-017-01703-6>>
<<Wrong citation https://iris.sissa.it/retrieve/handle/20.500.11767/36149/10823/And%C3%B2_tesi.pdf>>

View file

@ -16,24 +16,44 @@ my $okq = Thread::Queue->new();
my @producers;
# No file arguments given on the command line:
#  * interactive stdin: show usage and stop
#  * otherwise: take the arguments (one per line) from stdin
if(not @ARGV) {
    if(-t *STDIN) {
        print "Usage:\n";
        print " parcat file(s)\n";
        print " cat argfile | parcat\n";
        # There is nothing to read. Without exiting here no producer is
        # ever started and the main loop's blocking dequeue never returns.
        exit(1);
    } else {
        # Read arguments from stdin
        chomp(@ARGV = <STDIN>);
    }
}
my $files_to_open = 0;
# Default: fd = stdout
my $fd = 1;
for (@ARGV) {
push @producers, threads->create('producer', $_);
# --rm = remove file when opened
/^--rm$/ and do { $opt::rm = 1; next; };
# -1 = output to fd 1, -2 = output to fd 2
/^-(\d+)$/ and do { $fd = $1; next; };
push @producers, threads->create('producer', $_, $fd);
$files_to_open++;
}
sub producer {
# Open a file/fifo, set non blocking, enqueue fileno of the file handle
my $file = shift;
my $output_fd = shift;
open(my $fh, "<", $file) || do {
print STDERR "parcat: Cannot open $file\n";
exit(1);
};
# Remove file when it has been opened
if($opt::rm) {
unlink $file;
}
set_fh_non_blocking($fh);
$q->enqueue(fileno($fh));
$opened++;
# Pass the fileno to parent
$q->enqueue(fileno($fh),$output_fd);
# Get an OK that the $fh is opened and we can release the $fh
while(1) {
my $ok = $okq->dequeue();
@ -48,31 +68,38 @@ my $s = IO::Select->new();
my %buffer;
sub add_file {
my $fd = shift;
open(my $fh, "<&=", $fd) || die;
$s->add($fh);
my $infd = shift;
my $outfd = shift;
open(my $infh, "<&=", $infd) || die;
open(my $outfh, ">&=", $outfd) || die;
$s->add($infh);
# Tell the producer now opened here and can be released
$okq->enqueue($fd);
$okq->enqueue($infd);
# Initialize the buffer
@{$buffer{$fh}} = ();
@{$buffer{$infh}{$outfd}} = ();
$Global::fh{$outfd} = $outfh;
}
sub add_files {
# Non-blocking dequeue
while(defined(my $fd = $q->dequeue_nb())) {
add_file($fd);
my ($infd,$outfd);
do {
($infd,$outfd) = $q->dequeue_nb(2);
if(defined($outfd)) {
add_file($infd,$outfd);
}
} while(defined($outfd));
}
sub add_files_block {
# Blocking dequeue
my $fd = $q->dequeue();
add_file($fd);
my ($infd,$outfd) = $q->dequeue(2);
add_file($infd,$outfd);
}
my $fd;
my (@ready,$file,$rv,$buf);
my (@ready,$infh,$rv,$buf);
do {
# Wait until at least one file is opened
add_files_block();
@ -83,43 +110,46 @@ do {
if(not @ready) {
add_files();
}
for $file (@ready) {
$rv = sysread($file, $buf, 65536);
for $infh (@ready) {
# There is only one key, namely the output file descriptor
for my $outfd (keys %{$buffer{$infh}}) {
$rv = sysread($infh, $buf, 65536);
if (!$rv) {
if($! == EAGAIN) {
# Would block: Nothing read
next;
} else {
# Nothing read, but would not block:
# This file is done
$s->remove($file);
print @{$buffer{$file}};
delete $buffer{$file};
# Closing the $file causes it to block
# close $file;
$s->remove($infh);
# Flush the buffered partial line. The chunks must be
# concatenated with nothing between them: interpolating
# the array in a string would join them with $" (a space)
# and corrupt the output.
syswrite($Global::fh{$outfd}, join("", @{$buffer{$infh}{$outfd}}));
delete $buffer{$infh};
# Closing the $infh causes it to block
# close $infh;
add_files();
next;
}
}
# Find \n for full line
# Something read.
# Find the last \n: everything up to and including it is full lines.
# Only \n is searched for here; \r is not treated as a line end.
# $i is the length of the full-line part, or 0 if $buf has no \n.
my $i = (rindex($buf,"\n")+1);
if($i) {
# Print full line
for(@{$buffer{$file}}, substr($buf,0,$i)) {
syswrite(STDOUT,$_);
for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) {
syswrite($Global::fh{$outfd},$_);
}
# @buffer = remaining half line
@{$buffer{$file}} = (substr($buf,$i,$rv-$i));
redo;
$buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
} else {
# Something read, but not a full line
push @{$buffer{$file}}, $buf;
push @{$buffer{$infh}{$outfd}}, $buf;
}
redo;
}
}
}
}
} while($opened <= $#ARGV);
} while($opened < $files_to_open);
for (@producers) {
@ -139,4 +169,3 @@ sub set_fh_non_blocking {
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}

View file

@ -19,15 +19,20 @@ you use:
=head1 EXAMPLES
=head2 Do be done
=head2 Simple line buffered output
mkfifo slot-{1..5}-digit-{0..9}
parallel -j5 'seq 100000 | grep {} > slot-{%}-digit-{}' ::: {0..9} &
parallel parcat slot-{1..5}-digit-{} '>' digit-{} ::: {0..9}
GNU Parallel saves output to tempfiles. If the amount of data is
bigger than the free disk space, then you can use this technique to do
line buffering without saving to disk:
mkfifo slot-{1..5}
seq 10000000 | parallel -j5 --round --pipe 'cat > slot-{%}' &
parcat slot-{1..5} | wc
=head1 REPORTING BUGS
GNU B<parcat> is part of GNU B<parallel>. Report bugs to <bug-parallel@gnu.org>.
GNU B<parcat> is part of GNU B<parallel>. Report bugs to
<bug-parallel@gnu.org>.
=head1 AUTHOR

View file

@ -728,6 +728,25 @@ par_X_eta_div_zero() {
perl -pe 's/\d+/0/g'
}
par_parcat_args_stdin() {
    # Regression test for bug #51690: with no file arguments parcat must
    # read the file names (one per line) from stdin.
    echo 'bug #51690: parcat: read args from stdin'
    tmp1=$(tempfile)
    tmp2=$(tempfile)
    # Paths are quoted so a TMPDIR containing whitespace or glob
    # characters does not split or expand them.
    echo OK1 > "$tmp1"
    echo OK2 > "$tmp2"
    (echo "$tmp1"
     echo "$tmp2") | parcat
    rm "$tmp1" "$tmp2"
}
par_parcat_rm() {
    # Regression test for bug #51691: parcat --rm must unlink the input
    # as soon as it has been opened.
    echo 'bug #51691: parcat --rm remove fifo when opened'
    tmp1=$(tempfile)
    # Path is quoted so a TMPDIR containing whitespace or glob
    # characters does not split or expand it.
    echo OK1 > "$tmp1"
    parcat --rm "$tmp1"
    # rm failing means the file is already gone, which is the expected
    # outcome; if parcat left it behind, rm silently cleans it up and
    # the OK line is missing from the output.
    rm "$tmp1" 2>/dev/null || echo OK file removed
}
export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | sort |
parallel -j6 --tag -k --joblog +/tmp/jl-`basename $0` '{} 2>&1'

View file

@ -152,5 +152,28 @@ newlines"' ::: a b c d e | sort
) | perl -pe 's/\0/<null>/g;s/\d+/./g'
}
par_parcat_mixing() {
    # Two writers each emit a long "start" line, pause, then a long "end"
    # line into their own fifo. parcat reads both fifos; tr squeezes the
    # repeated letters so the output shows the order in which the lines
    # were passed on.
    echo 'parcat output should mix: a b a b'
    mktmpfifo() {
        # Print the path of a freshly created fifo with a unique name.
        local tmp
        tmp=$(tempfile)
        rm "$tmp"
        mkfifo "$tmp"
        echo "$tmp"
    }
    slow_output() {
        # Print <string> x 9000 + "start", wait 1 second, then
        # <string> x 9000 + "end". The string is passed as an argument
        # instead of being spliced into the Perl source.
        local string=$1
        perl -e 'print $ARGV[0] x 9000,"start\n"' "$string"
        sleep 1
        perl -e 'print $ARGV[0] x 9000,"end\n"' "$string"
    }
    tmp1=$(mktmpfifo)
    tmp2=$(mktmpfifo)
    slow_output a > "$tmp1" &
    # Give writer a a head start over writer b
    sleep 0.5
    slow_output b > "$tmp2" &
    parcat "$tmp1" "$tmp2" | tr -s ab
    # Do not leave the fifos behind in the temp dir
    rm -f "$tmp1" "$tmp2"
}
export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | sort | parallel -j6 --tag -k '{} 2>&1'

View file

@ -52,6 +52,11 @@ par_multiline_commands finish 2
par_multiline_commands parallel: Warning: Command lines contain newline. Forcing --null.
par_multiline_commands 3
par_multiline_commands finish 3
par_parcat_mixing parcat output should mix: a b a b
par_parcat_mixing astart
par_parcat_mixing bstart
par_parcat_mixing bend
par_parcat_mixing aend
par_pipepart_block ### --pipepart --block -# (# < 0)
par_pipepart_block 1
par_pipepart_block 2