parallel: --skip-first-line for --pipe(part).

This commit is contained in:
Ole Tange 2021-10-09 22:29:36 +02:00
parent 072897c567
commit ccc54495bd
5 changed files with 74 additions and 17 deletions

View file

@ -74,7 +74,8 @@ run() {
# OK # OK
return 0 return 0
else else
keyservers="pgp.surf.nl keyservers="keyserver.ubuntu.com
pgp.surf.nl
keyserver.bazon.ru keyserver.bazon.ru
agora.cenditel.gob.ve agora.cenditel.gob.ve
pgp.benny-baumann.de" pgp.benny-baumann.de"

View file

@ -4,6 +4,9 @@
Quote of the month: Quote of the month:
GNU Parallelめっちゃ便利で偉い
-- аiгbus @airbus_P
I really liked GNU Parallel http://gnu.org/software/parallel/ I really liked GNU Parallel http://gnu.org/software/parallel/
one of the best tool to execute parallel jobs in the shell one of the best tool to execute parallel jobs in the shell
-- Luca Molteni @volothamp@twitter -- Luca Molteni @volothamp@twitter

View file

@ -566,14 +566,26 @@ sub pipe_part_files(@) {
"$file is not a seekable file."); "$file is not a seekable file.");
::wait_and_exit(255); ::wait_and_exit(255);
} }
my $header = find_header(\$buf,open_or_exit($file));
my $fh = open_or_exit($file);
# Number of bytes occupied by the skipped first line, including its
# newline. Stays 0 when --skip-first-line is not given.
my $firstlinelen = 0;
if($opt::skip_first_line) {
    my $newline;
    # Read a full line one byte at a time.
    # sysread returns 0 at EOF and undef on error. Test that return
    # value directly: testing the running total instead would stay
    # true at EOF and loop forever on a file without a newline.
    while(sysread($fh,$newline,1,0)) {
        $firstlinelen++;
        $newline eq "\n" and last;
    }
}
my $header = find_header(\$buf,$fh);
# find positions # find positions
my @pos = find_split_positions($file,int($Global::blocksize),$header); my @pos = find_split_positions($file,int($Global::blocksize),
$header,$firstlinelen);
# Make @cat_prepends # Make @cat_prepends
my @cat_prepends = (); my @cat_prepends = ();
for(my $i=0; $i<$#pos; $i++) { for(my $i=0; $i<$#pos; $i++) {
push(@cat_prepends, push(@cat_prepends,
cat_partial($file, 0, length($header), $pos[$i], $pos[$i+1])); cat_partial($file, $firstlinelen, $firstlinelen+length($header),
$pos[$i], $pos[$i+1]));
} }
return @cat_prepends; return @cat_prepends;
} }
@ -618,8 +630,8 @@ sub find_split_positions($$$) {
# $opt::recend # $opt::recend
# Returns: # Returns:
# @positions of block start/end # @positions of block start/end
my($file, $block, $header) = @_; my($file, $block, $header, $firstlinelen) = @_;
my $headerlen = length $header; my $skiplen = $firstlinelen + length $header;
my $size = -s $file; my $size = -s $file;
if(-b $file) { if(-b $file) {
# $file is a blockdevice # $file is a blockdevice
@ -627,7 +639,8 @@ sub find_split_positions($$$) {
} }
$block = int $block; $block = int $block;
if($opt::groupby) { if($opt::groupby) {
return split_positions_for_group_by($file,$size,$block,$header); return split_positions_for_group_by($file,$size,$block,
$header,$firstlinelen);
} }
# The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20 # The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
# The optimal dd blocksize for freebsd = 2^15..2^17 # The optimal dd blocksize for freebsd = 2^15..2^17
@ -637,8 +650,8 @@ sub find_split_positions($$$) {
my ($recstart,$recend) = recstartrecend(); my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart; my $recendrecstart = $recend.$recstart;
my $fh = ::open_or_exit($file); my $fh = ::open_or_exit($file);
push(@pos,$headerlen); push(@pos,$skiplen);
for(my $pos = $block+$headerlen; $pos < $size; $pos += $block) { for(my $pos = $block+$skiplen; $pos < $size; $pos += $block) {
my $buf; my $buf;
if($recendrecstart eq "") { if($recendrecstart eq "") {
# records ends anywhere # records ends anywhere
@ -745,14 +758,14 @@ sub split_positions_for_group_by($$$$) {
return($v,$vpos); return($v,$vpos);
} }
my ($file,$size,$block,$header) = @_; my ($file,$size,$block,$header,$firstlinelen) = @_;
my ($a,$b,$c,$apos,$bpos,$cpos); my ($a,$b,$c,$apos,$bpos,$cpos);
my @pos; my @pos;
$fh = open_or_exit($file); $fh = open_or_exit($file);
# Set $Global::group_by_column $Global::group_by_perlexpr # Set $Global::group_by_column $Global::group_by_perlexpr
group_by_loop($fh,$opt::recsep); group_by_loop($fh,$opt::recsep);
# $xpos = linestart, $x = value at $xpos, $apos < $bpos < $cpos # $xpos = linestart, $x = value at $xpos, $apos < $bpos < $cpos
$apos = length $header; $apos = $firstlinelen + length $header;
for(($a,$apos) = value_at($apos); $apos < $size;) { for(($a,$apos) = value_at($apos); $apos < $size;) {
push @pos, $apos; push @pos, $apos;
$bpos = $apos + $block; $bpos = $apos + $block;
@ -967,6 +980,13 @@ sub spreadstdin() {
my $in = *STDIN; my $in = *STDIN;
my $timeout = $Global::blocktimeout; my $timeout = $Global::blocktimeout;
if($opt::skip_first_line) {
my $newline;
# Read a full line one byte at a time
while(sysread($in,$newline,1,0)) {
$newline eq "\n" and last;
}
}
my $header = find_header(\$buf,$in); my $header = find_header(\$buf,$in);
my $anything_written; my $anything_written;
my $eof; my $eof;

View file

@ -8,6 +8,14 @@
# Each should be taking 1-3s and be possible to run in parallel # Each should be taking 1-3s and be possible to run in parallel
# I.e.: No race conditions, no logins # I.e.: No race conditions, no logins
par_skip_first_line() {
    # Check that --skip-first-line drops the first line of the input
    # before --header picks the header, for both --pipe and --pipepart.
    # The first line is deliberately long (seq 1..10000 joined on one
    # line) so it spans many --block 10 sized reads.
    local tmpfile
    tmpfile=$(mktemp)
    (echo $(seq 10000); echo MyHeader; seq 10) |
        parallel -k --skip-first-line --pipe --block 10 --header '1' cat
    (echo $(seq 10000); echo MyHeader; seq 10) > "$tmpfile"
    parallel -k --skip-first-line --pipepart -a "$tmpfile" --block 10 --header '1' cat
    # Do not leave the temporary file behind after the test
    rm -f "$tmpfile"
}
par_long_input() { par_long_input() {
echo '### Long input lines should not fail if they are not used' echo '### Long input lines should not fail if they are not used'
longline_tsv() { longline_tsv() {

View file

@ -838,6 +838,31 @@ par_seqreplace_long_line ### Test --seqreplace and line too long
par_seqreplace_long_line 9 1 1 101 par_seqreplace_long_line 9 1 1 101
par_seqreplace_long_line 90 1 1 201 par_seqreplace_long_line 90 1 1 201
par_seqreplace_long_line 1 parallel: Error: Command line too long (309 >= 210) at input 0: 100 par_seqreplace_long_line 1 parallel: Error: Command line too long (309 >= 210) at input 0: 100
par_skip_first_line MyHeader
par_skip_first_line 1
par_skip_first_line 2
par_skip_first_line 3
par_skip_first_line 4
par_skip_first_line 5
par_skip_first_line MyHeader
par_skip_first_line 6
par_skip_first_line 7
par_skip_first_line 8
par_skip_first_line 9
par_skip_first_line MyHeader
par_skip_first_line 10
par_skip_first_line MyHeader
par_skip_first_line 1
par_skip_first_line 2
par_skip_first_line 3
par_skip_first_line 4
par_skip_first_line 5
par_skip_first_line 6
par_skip_first_line MyHeader
par_skip_first_line 7
par_skip_first_line 8
par_skip_first_line 9
par_skip_first_line 10
par_sql_colsep ### SQL should add Vn columns for --colsep par_sql_colsep ### SQL should add Vn columns for --colsep
par_sql_colsep /a/A/1/11/ par_sql_colsep /a/A/1/11/
par_sql_colsep /a/A/2/22/ par_sql_colsep /a/A/2/22/