--pipepart --group-by implemented (except column names).

Ole Tange 2019-05-20 01:05:14 +02:00
parent 324a9f3a07
commit aa62104eb5
4 changed files with 198 additions and 22 deletions

View file

@@ -206,7 +206,7 @@ from:tange@gnu.org
to:parallel@gnu.org, bug-parallel@gnu.org
stable-bcc: Jesse Alama <jessealama@fastmail.fm>
Subject: GNU Parallel 20190522 ('') released <<[stable]>>
Subject: GNU Parallel 20190522 ('Akihito') released <<[stable]>>
GNU Parallel 20190522 ('') <<[stable]>> has been released. It is available for download at: http://ftpmirror.gnu.org/parallel/
@@ -223,6 +223,12 @@ Quote of the month:
New in this release:
* --group-by groups lines depending on the value of a column. The value can be computed (see the example after this list).
* How to compress (bzip / gzip) a very large text quickly? https://medium.com/@gchandra/how-to-compress-bzip-gzip-a-very-large-text-quickly-27c11f4c6681
* Simple tutorial to install & use GNU Parallel https://medium.com/@gchandra/simple-tutorial-to-install-use-gnu-parallel-79251120d618
* Introducing Parallel into Shell https://petelawson.com/post/parallel-in-shell/
* Bug fixes and man page updates.
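A minimal sketch of --group-by (table.csv is a placeholder for a comma separated file with the grouping value in column 1):

  cat table.csv | parallel --pipe --colsep , --group-by 1 -k wc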

View file

@@ -462,7 +462,7 @@ sub pipe_part_files(@) {
    }
    my $header = find_header(\$buf,open_or_exit($file));
    # find positions
    my @pos = find_split_positions($file,$Global::blocksize,length $header);
    my @pos = find_split_positions($file,$Global::blocksize,$header);
    # Make @cat_prepends
    my @cat_prepends = ();
    for(my $i=0; $i<$#pos; $i++) {
@@ -507,19 +507,23 @@ sub find_split_positions($$$) {
    # Input:
    #   $file = the file to read
    #   $block = (minimal) --block-size of each chunk
    #   $headerlen = length of header to be skipped
    #   $header = header to be skipped
    # Uses:
    #   $opt::recstart
    #   $opt::recend
    # Returns:
    #   @positions of block start/end
    my($file, $block, $headerlen) = @_;
    my($file, $block, $header) = @_;
    my $headerlen = length $header;
    my $size = -s $file;
    if(-b $file) {
        # $file is a blockdevice
        $size = size_of_block_dev($file);
    }
    $block = int $block;
    if($opt::groupby) {
        return split_positions_for_group_by($file,$size,$block,$header);
    }
    # The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
    # The optimal dd blocksize for freebsd = 2^15..2^17
    my $dd_block_size = 131072; # 2^17
@@ -563,12 +567,126 @@
    }
    if($pos[$#pos] != $size) {
        # Last splitpoint was not at end of the file: add it
        push(@pos,$size);
        push @pos, $size;
    }
    close $fh;
    return @pos;
}
sub split_positions_for_group_by($$$$) {
    # Find split positions in $file so that no chunk splits a group
    # of lines sharing the same value of the grouping column/expression
    # Input:
    #   $file = the file to split
    #   $size = size of $file in bytes
    #   $block = (minimal) --block-size of each chunk
    #   $header = header to be skipped
    # Returns:
    #   @positions of block start/end
    my($fh);
    sub value_at($) {
        # Return the group-by value of the first full line at or
        # after byte $pos, and the position where that line starts
        my $pos = shift;
        if($pos != 0) {
            seek($fh, $pos-1, 0) || die;
            # Read half line
            <$fh>;
        }
        # Read full line
        my $linepos = tell($fh);
        $_ = <$fh>;
        if(defined $_) {
            # Not end of file
            my @F;
            if(defined $Global::group_by_column) {
                $opt::colsep ||= "\t";
                @F = split /$opt::colsep/, $_;
                $_ = $F[$Global::group_by_column];
            }
            eval $Global::group_by_perlexpr;
        }
        return ($_,$linepos);
    }

    sub binary_search_end($$$) {
        # Binary search for the position where the run of value $s ends
        my ($s,$spos,$epos) = @_;
        # value_at($spos) == $s
        # value_at($epos) != $s
        my $posdif = $epos - $spos;
        my ($v,$vpos);
        while($posdif) {
            ($v,$vpos) = value_at($spos+$posdif);
            if($v eq $s) {
                $spos = $vpos;
                $posdif = $epos - $spos;
            } else {
                $epos = $vpos;
            }
            $posdif = int($posdif/2);
        }
        return($v,$vpos);
    }

    sub binary_search_start($$$) {
        # Binary search for the position where the run of value $s starts
        my ($s,$spos,$epos) = @_;
        # value_at($spos) != $s
        # value_at($epos) == $s
        my $posdif = $epos - $spos;
        my ($v,$vpos);
        while($posdif) {
            ($v,$vpos) = value_at($spos+$posdif);
            if($v eq $s) {
                $epos = $vpos;
            } else {
                $spos = $vpos;
                $posdif = $epos - $spos;
            }
            $posdif = int($posdif/2);
        }
        return($v,$vpos);
    }

    my ($file,$size,$block,$header) = @_;
    my ($a,$b,$c,$apos,$bpos,$cpos);
    my @pos;
    $fh = open_or_exit($file);
    # Set $Global::group_by_column $Global::group_by_perlexpr
    group_by_loop($opt::recsep);
    # $xpos = linestart, $x = value at $xpos, $apos < $bpos < $cpos
    $apos = length $header;
    for(($a,$apos) = value_at($apos); $apos < $size;) {
        push @pos, $apos;
        $bpos = $apos + $block;
        ($b,$bpos) = value_at($bpos);
        if(eof($fh)) {
            push @pos, $size; last;
        }
        $cpos = $bpos + $block;
        ($c,$cpos) = value_at($cpos);
        if($a eq $b) {
            while($b eq $c) {
                # Move bpos, cpos a block forward until $a == $b != $c
                $bpos = $cpos;
                $cpos += $block;
                ($c,$cpos) = value_at($cpos);
                if($cpos >= $size) {
                    $cpos = $size;
                    last;
                }
            }
            # $a == $b != $c
            # Binary search for $b ending between ($bpos,$cpos)
            ($b,$bpos) = binary_search_end($b,$bpos,$cpos);
        } else {
            if($b eq $c) {
                # $a != $b == $c
                # Binary search for $b starting between ($apos,$bpos)
                ($b,$bpos) = binary_search_start($b,$apos,$bpos);
            } else {
                # $a != $b != $c
                # Binary search for $b ending between ($bpos,$cpos)
                ($b,$bpos) = binary_search_end($b,$bpos,$cpos);
            }
        }
        ($a,$apos) = ($b,$bpos);
    }
    if($pos[$#pos] != $size) {
        # Last splitpoint was not at end of the file: add it
        push @pos, $size;
    }
    return @pos;
}
sub cat_partial($@) {
    # Efficient command to copy from byte X to byte Y
    # Input:
@@ -637,6 +755,8 @@ sub group_by_loop($) {
    }
    # What is left of $groupby is $perlexpr
    $perlexpr = $groupby;
    $Global::group_by_perlexpr = $perlexpr;
    $Global::group_by_column = $col;
    my $loop = ::spacefree(0,'{
        local $_=COLVALUE;
@@ -1792,7 +1912,7 @@ sub check_invalid_option_combinations() {
        ::wait_and_exit(255);
    }
    if($opt::groupby) {
        if(not $opt::pipe) {
        if(not $opt::pipe and not $opt::pipepart) {
            $opt::pipe = 1;
        }
        if($opt::remove_rec_sep) {
@@ -1807,12 +1927,6 @@ sub check_invalid_option_combinations() {
            ::error("--recend is not compatible with --groupby");
            ::wait_and_exit(255);
        }
        if($opt::pipepart) {
            # TODO This may be possible to do later
            # Finding split points might be a bitch though
            ::error("--pipepart is not compatible with --groupby");
            ::wait_and_exit(255);
        }
    }
}
@@ -12558,7 +12672,7 @@ sub main() {
        pipe_shard_setup();
    }
    if($opt::groupby) {
    if(not $opt::pipepart and $opt::groupby) {
        group_by_stdin_filter();
    }
    if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) {

View file

@@ -797,8 +797,8 @@ See also: B<--line-buffer> B<--ungroup>
=item B<--group-by> I<val> (alpha testing)
Group input by value. Combined with B<--pipe> B<--group-by> groups
lines with the same value into a record.
Group input by value. Combined with B<--pipe>/B<--pipepart>
B<--group-by> groups lines with the same value into a record.
The value can be computed from the full line or from a single column.
@@ -815,6 +815,8 @@ Use the value in the column numbered.
Treat the first line as a header and use the value in the column
named.
(Not supported with B<--pipepart>).
=item Z<> perl expression
Run the perl expression and use $_ as the value.
@@ -827,17 +829,19 @@ Put the value of the column in $_, run the perl expression, and use $_ as the
Put the value of the column in $_, run the perl expression, and use $_ as the value.
(Not supported with B<--pipepart>).
=back
Example:
  UserID, Consumption
  123, 1
  123, 2
  12-3, 1
  221, 3
  221, 1
  2/21, 5
  123,    1
  123,    2
  12-3,   1
  221,    3
  221,    1
  2/21,   5
If you want to group 123, 12-3, 221, and 2/21 into 4 records and pass
one record at a time to B<wc>:
@@ -861,7 +865,7 @@ UserID when grouping:
  cat table.csv | parallel --pipe --colsep , --header : \
    --group-by 'UserID s/\D//g' -kN1 wc
See also B<--shard>.
See also B<--shard>, B<--roundrobin>.
=item B<--help>
@@ -1975,6 +1979,8 @@ impossible to track which input block corresponds to which output.
B<--roundrobin> implies B<--pipe>, except if B<--pipepart> is given.
See also B<--group-by>, B<--shard>.
=item B<--rpl> 'I<tag> I<perl expression>'
@@ -2164,6 +2170,8 @@ I<shardkey> is small (<10), slower if it is big (>100).
B<--shard> requires B<--pipe> and a fixed numeric value for B<--jobs>.
See also B<--group-by>, B<--roundrobin>.
=item B<--shebang>
@@ -4462,6 +4470,46 @@ files:
$ tracefile -un ./configure | tail | parallel -j0 apt-file search
=head1 SPREADING BLOCKS OF DATA
B<--roundrobin>, B<--pipepart>, B<--shard>, and B<--group-by> are
all specialized versions of B<--pipe>.
In the following I<n> is the number of jobslots given by B<--jobs>. A
record starts with B<--recstart> and ends with B<--recend>. It is
typically a full line. A chunk is a number of full records that is
approximately the size of a block. A block can contain half records, a
chunk cannot.
B<--pipe> starts one job per chunk. It reads blocks from stdin
(standard input). It finds a record end near a block border and passes
a chunk to the program.
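For example, a sketch (B<bigfile> is a placeholder file name):

  cat bigfile | parallel --pipe --block 10M wc -l

Each invocation of B<wc> gets one chunk of roughly 10 MB of full lines.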
B<--pipepart> starts one job per chunk - just like normal
B<--pipe>. It first finds record endings near all block borders in the
file and then starts the jobs. By using B<--block -1> it will set the
block size to 1/I<n> * size-of-file. Used this way it will start I<n>
jobs in total.
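For example, a sketch (B<bigfile> must be a real, seekable file):

  parallel -j4 --pipepart -a bigfile --block -1 wc -l

starts 4 jobs, each reading roughly a quarter of the file.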
B<--roundrobin> starts I<n> jobs in total. It reads a block and
passes a chunk to whichever job is ready to read. It does not parse
the content except for identifying where a record ends to make sure it
only passes full records.
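For example, a sketch:

  cat bigfile | parallel -j4 --roundrobin wc -c

starts exactly 4 B<wc> processes and passes each chunk to whichever of them is ready to read.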
B<--shard> starts I<n> jobs in total. It parses each line to read the
value in the given column. Based on this value the line is passed to
one of the I<n> jobs. All lines having this value will be given to the
same jobslot.
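For example, a sketch (the shardkey is in column 1 of a comma separated file):

  cat table.csv | parallel -j4 --pipe --colsep , --shard 1 wc -l

sends all lines with a given value in column 1 to the same one of the 4 jobs.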
B<--group-by> starts one job per chunk. Record borders are not given
by B<--recend>/B<--recstart>. Instead a record is defined by a number
of lines having the same value in a given column. So the value of a
given column changes at a chunk border. With B<--pipe> every line is
parsed, with B<--pipepart> only a few lines are parsed to find the
chunk border.
B<--group-by> can be combined with B<--roundrobin> or B<--pipepart>.
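For example, a sketch (grouping on column 1 of a comma separated file; with B<--pipepart> the grouping column must be given by number, as column names are not supported):

  parallel -j4 --pipepart -a table.csv --colsep , --group-by 1 -k wc -l

Only a few lines around each block border are read to find a position where the value of column 1 changes.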
=head1 QUOTING
GNU B<parallel> is very liberal in quoting. You only need to quote

View file

@@ -189,5 +189,13 @@ par_test_build_and_install() {
    sudo parallel mv {} {.}
}

#par_crashing() {
#    echo '### bug #56322: sem crashed when running with input from seq'
#    echo "### This should not fail"
#    doit() { seq 100000000 | xargs -P 80 -n 1 sem true; }
#    export -f doit
#    parallel -j1 --timeout 100 --nice 11 doit ::: 1
#}
export -f $(compgen -A function | grep par_)
compgen -A function | grep par_ | sort | parallel -vj0 -k --tag --joblog /tmp/jl-`basename $0` '{} 2>&1'