parallel: --pipe can now only read from STDIN.

This commit is contained in:
Ole Tange 2013-08-21 17:04:33 +02:00
parent 73c7f844d7
commit 81a8a116b7
10 changed files with 66 additions and 89 deletions

View file

@ -209,12 +209,12 @@ New in this release:
http://www.keylength.com/en/4/ the signing key was changed from http://www.keylength.com/en/4/ the signing key was changed from
1024D/ID:FFFFFFF1 to 9888R/ID:88888888. 1024D/ID:FFFFFFF1 to 9888R/ID:88888888.
* Job ad asking for GNU Parallel expertise
http://seattle.craigslist.org/est/sof/4006079352.html
* Agalma: an automated phylogenomics workflow * Agalma: an automated phylogenomics workflow
http://arxiv.org/pdf/1307.6432 http://arxiv.org/pdf/1307.6432
* Job ad asking for GNU Parallel expertise
http://seattle.craigslist.org/est/sof/4006079352.html
* Transient Beowulf Clustering with GNU Parallel and SSHfs * Transient Beowulf Clustering with GNU Parallel and SSHfs
http://www.reddit.com/r/linux/comments/1ka8mn/transient_beowulf_clustering_with_gnu_parallel/ http://www.reddit.com/r/linux/comments/1ka8mn/transient_beowulf_clustering_with_gnu_parallel/
@ -233,15 +233,21 @@ New in this release:
* Using GNU Parallel to roll-your-own Map Reduce! * Using GNU Parallel to roll-your-own Map Reduce!
http://www.rankfocus.com/?p=1 http://www.rankfocus.com/?p=1
* TimeMachineっぽいバックアップスクリプト
http://rio.tc/2013/07/timemachine-1.html
* Using GNU Parallel with Amazon * Using GNU Parallel with Amazon
media.amazonwebservices.com/AWS_Amazon_EMR_Best_Practices.pdf media.amazonwebservices.com/AWS_Amazon_EMR_Best_Practices.pdf
* Some useful comments on GNU Parallel * Some useful comments on GNU Parallel
https://news.ycombinator.com/item?id=6209767 https://news.ycombinator.com/item?id=6209767
* Using GNU Parallel to count reads mapping to genes for multiple BAMs
http://drduanehassane.com/blog/sequencing-resources
* TimeMachineっぽいバックアップスクリプト
http://rio.tc/2013/07/timemachine-1.html
* GNU ParallelでAndroid NDKの全バージョンを一括ダウンロード
http://qiita.com/mazgi/items/b10bf0ff3da2045d19ab
* Bug fixes and man page updates. * Bug fixes and man page updates.

View file

@ -68,7 +68,7 @@ if(@ARGV) {
my @fhlist; my @fhlist;
@fhlist = map { open_or_exit($_) } @opt::a; @fhlist = map { open_or_exit($_) } @opt::a;
if(not @fhlist) { if(not @fhlist and not $opt::pipe) {
@fhlist = (*STDIN); @fhlist = (*STDIN);
} }
if($opt::skip_first_line) { if($opt::skip_first_line) {
@ -280,7 +280,7 @@ if($Global::semaphore) {
$SIG{TERM} = \&start_no_new_jobs; $SIG{TERM} = \&start_no_new_jobs;
start_more_jobs(); start_more_jobs();
if($opt::pipe) { if($opt::pipe) {
spreadstdin(@fhlist); spreadstdin();
} }
::debug("Start draining\n"); ::debug("Start draining\n");
drain_job_queue(); drain_job_queue();
@ -301,7 +301,6 @@ sub __PIPE_MODE__ {}
sub spreadstdin { sub spreadstdin {
# read a record # read a record
# Spawn a job and print the record to it. # Spawn a job and print the record to it.
my @fhlist = @_; # Filehandles to read from (Defaults to STDIN)
my $buf = ""; my $buf = "";
my $header = ""; my $header = "";
if($opt::header) { if($opt::header) {
@ -343,7 +342,7 @@ sub spreadstdin {
my $chunk_number = 1; my $chunk_number = 1;
my $one_time_through; my $one_time_through;
my $blocksize = $opt::blocksize; my $blocksize = $opt::blocksize;
for my $in (@fhlist) { my $in = *STDIN;
piperead: while(1) { piperead: while(1) {
my $anything_written = 0; my $anything_written = 0;
if(not read($in,substr($buf,length $buf,0),$blocksize)) { if(not read($in,substr($buf,length $buf,0),$blocksize)) {
@ -426,7 +425,6 @@ sub spreadstdin {
::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n"); ::warning("A full record was not matched in a block. Increasing to --blocksize ".$blocksize."\n");
} }
} }
}
# If there is anything left in the buffer write it # If there is anything left in the buffer write it
substr($buf,0,0) = ""; substr($buf,0,0) = "";
write_record_to_pipe($chunk_number++,\$header,\$buf,$recstart,$recend,length $buf); write_record_to_pipe($chunk_number++,\$header,\$buf,$recstart,$recend,length $buf);
@ -731,7 +729,7 @@ sub get_options_from_array {
sub parse_options { sub parse_options {
# Returns: N/A # Returns: N/A
# Defaults: # Defaults:
$Global::version = 20130817; $Global::version = 20130819;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
@ -1220,8 +1218,8 @@ sub read_args_from_command_line {
} }
} }
if($group eq $Global::arg_file_sep if($group eq $Global::arg_file_sep
or ($opt::internal_pipe_means_argfiles and $opt::pipe) or ($opt::internal_pipe_means_argfiles and $opt::pipe)
) { ) {
# Group of file names on the command line. # Group of file names on the command line.
# Append args into -a # Append args into -a
@ -1506,7 +1504,7 @@ sub start_another_job {
return 0; return 0;
} }
} while ($job->is_already_in_joblog()); } while ($job->is_already_in_joblog());
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n"); debug("Command to run on '".$job->sshlogin()->string()."': '".$job->replaced()."'\n");
if($job->start()) { if($job->start()) {
$Global::running{$job->pid()} = $job; $Global::running{$job->pid()} = $job;
if($opt::pipe) { if($opt::pipe) {
@ -2847,8 +2845,8 @@ sub processes_available_by_system_limit {
not $more_filehandles and last; not $more_filehandles and last;
$max_system_proc_reached and last; $max_system_proc_reached and last;
my $before_getting_arg = time; my $before_getting_arg = time;
if($Global::semaphore) { if($Global::semaphore or $opt::pipe) {
# Skip # Skip: No need to get args
} elsif(defined $opt::retries and $count_jobs_already_read) { } elsif(defined $opt::retries and $count_jobs_already_read) {
# For retries we may need to run all jobs on this sshlogin # For retries we may need to run all jobs on this sshlogin
# so include the already read jobs for this sshlogin # so include the already read jobs for this sshlogin
@ -4677,10 +4675,15 @@ sub new {
if($sum == 0) { if($sum == 0) {
if($command eq "") { if($command eq "") {
$command = $Global::replace{'{}'}; $command = $Global::replace{'{}'};
} else { } elsif($opt::pipe) {
# With --pipe you can have ::: or not
if(@opt::a) {
$command .=" ".$Global::replace{'{}'};
}
} else {
# Add {} to the command if there are no {...}'s # Add {} to the command if there are no {...}'s
$command .=" ".$Global::replace{'{}'}; $command .=" ".$Global::replace{'{}'};
} }
($sum,$len->{'no_args'},$len->{'context'},$len->{'contextgroups'}, ($sum,$len->{'no_args'},$len->{'context'},$len->{'contextgroups'},
%replacecount) = number_of_replacements($command,$context_replace); %replacecount) = number_of_replacements($command,$context_replace);
} }
@ -4730,11 +4733,15 @@ sub populate {
# max line length is reached # max line length is reached
# Returns: N/A # Returns: N/A
my $self = shift; my $self = shift;
if($opt::pipe) { # if($opt::pipe) {
# --pipe => Do no read any args # # --pipe => Do no read any args
$self->push([Arg->new("")]); # $self->push([Arg->new("")]);
return; # return;
} # }
# if($opt::roundrobin) {
# $self->push([Arg->new("")]);
# return;
# }
my $next_arg; my $next_arg;
while (not $self->{'arg_queue'}->empty()) { while (not $self->{'arg_queue'}->empty()) {
$next_arg = $self->{'arg_queue'}->get(); $next_arg = $self->{'arg_queue'}->get();
@ -5096,6 +5103,8 @@ sub replace_placeholders {
my $context_replace = $self->{'context_replace'}; my $context_replace = $self->{'context_replace'};
my $replaced; my $replaced;
# print ::my_dump($self);
if($self->{'context_replace'}) { if($self->{'context_replace'}) {
$replaced = $self->context_replace_placeholders($target,$quote); $replaced = $self->context_replace_placeholders($target,$quote);
} else { } else {
@ -5105,6 +5114,11 @@ sub replace_placeholders {
} }
sub context_replace_placeholders { sub context_replace_placeholders {
# Replace foo{}bar with fooargbar
# Input:
# target = foo{}bar
# quote = should this be quoted?
# Returns: $target
my $self = shift; my $self = shift;
my $target = shift; my $target = shift;
my $quote = shift; my $quote = shift;
@ -5143,6 +5157,10 @@ sub context_replace_placeholders {
while($tt =~ s/(\S*(?:$rep_str_regexp|$rep_str_pos_regexp|$rep_seq_regexp)\S*)/\0/o) { while($tt =~ s/(\S*(?:$rep_str_regexp|$rep_str_pos_regexp|$rep_seq_regexp)\S*)/\0/o) {
$word{$1} ||= 1; $word{$1} ||= 1;
} }
if(not %word) {
# The line did no contain any replacementstrings => return unchanged
return $target;
}
# For each word: Generate the replacement string for that word. # For each word: Generate the replacement string for that word.
for my $origword (keys %word) { for my $origword (keys %word) {
my @pos_replacements=(); my @pos_replacements=();
@ -5778,7 +5796,7 @@ sub read_arg_from_fh {
sub expand_combinations { sub expand_combinations {
# Input: # Input:
# ([xmin,xmax], [ymin,ymax], ...) # ([xmin,xmax], [ymin,ymax], ...)
# Returns ([x,y,...],[x,y,...]) # Returns: ([x,y,...],[x,y,...])
# where xmin <= x <= xmax and ymin <= y <= ymax # where xmin <= x <= xmax and ymin <= y <= ymax
my $minmax_ref = shift; my $minmax_ref = shift;
my $xmin = $$minmax_ref[0]; my $xmin = $$minmax_ref[0];

View file

@ -59,8 +59,6 @@ after the list of options. That will give you an idea of what GNU
@strong{parallel} is capable of. @strong{parallel} is capable of.
You can also watch the intro video for a quick introduction: You can also watch the intro video for a quick introduction:
http://tinyogg.com/watch/TORaR/ http://tinyogg.com/watch/hfxKj/ and
http://tinyogg.com/watch/YQuXd/ or
http://www.youtube.com/playlist?list=PL284C9FF2488BC6D1 http://www.youtube.com/playlist?list=PL284C9FF2488BC6D1
@chapter OPTIONS @chapter OPTIONS

View file

@ -5,7 +5,7 @@ seq 4 5 >/tmp/in45
echo "### Test basic --shebang-wrap" echo "### Test basic --shebang-wrap"
cat <<EOF > /tmp/basic--shebang-wrap cat <<EOF > /tmp/basic--shebang-wrap
#!/usr/local/bin/parallel --shebang-wrap /usr/bin/perl #!/usr/local/bin/parallel --shebang-wrap -k /usr/bin/perl
print "Shebang from perl with args @ARGV\n"; print "Shebang from perl with args @ARGV\n";
EOF EOF
@ -13,11 +13,11 @@ EOF
chmod 755 /tmp/basic--shebang-wrap chmod 755 /tmp/basic--shebang-wrap
/tmp/basic--shebang-wrap arg1 arg2 /tmp/basic--shebang-wrap arg1 arg2
echo "### Test basic --shebang-wrap Same as" echo "### Test basic --shebang-wrap Same as"
parallel /usr/bin/perl /tmp/basic--shebang-wrap ::: arg1 arg2 parallel -k /usr/bin/perl /tmp/basic--shebang-wrap ::: arg1 arg2
echo "### Test basic --shebang-wrap stdin" echo "### Test basic --shebang-wrap stdin"
(echo arg1; echo arg2) | /tmp/basic--shebang-wrap (echo arg1; echo arg2) | /tmp/basic--shebang-wrap
echo "### Test basic --shebang-wrap Same as" echo "### Test basic --shebang-wrap Same as"
(echo arg1; echo arg2) | parallel /usr/bin/perl /tmp/basic--shebang-wrap (echo arg1; echo arg2) | parallel -k /usr/bin/perl /tmp/basic--shebang-wrap
rm /tmp/basic--shebang-wrap rm /tmp/basic--shebang-wrap
@ -47,12 +47,6 @@ print "Shebang from perl with args @ARGV\n";
EOF EOF
chmod 755 /tmp/pipe--shebang-wrap chmod 755 /tmp/pipe--shebang-wrap
echo Suboptimal
/tmp/pipe--shebang-wrap :::: /tmp/in12 /tmp/in45
echo Optimal
/tmp/pipe--shebang-wrap /tmp/in12 /tmp/in45
echo "### Test --shebang-wrap --pipe with parser options Same as"
parallel -k --pipe /usr/bin/perl\ -p /tmp/pipe--shebang-wrap :::: /tmp/in12 /tmp/in45
echo "### Test --shebang-wrap --pipe with parser options stdin" echo "### Test --shebang-wrap --pipe with parser options stdin"
cat /tmp/in12 /tmp/in45 | /tmp/pipe--shebang-wrap cat /tmp/in12 /tmp/in45 | /tmp/pipe--shebang-wrap
echo "### Test --shebang-wrap --pipe with parser options Same as" echo "### Test --shebang-wrap --pipe with parser options Same as"

View file

@ -1,13 +1,18 @@
#!/bin/bash #!/bin/bash
echo '### Test --joblog with exitval and Test --joblog with signal' echo '### Test --joblog with exitval and Test --joblog with signal -- timing dependent'
parallel --joblog /tmp/parallel_joblog_exitval 'sleep {} && echo sleep was not killed=BAD' ::: 100 2>/dev/null & rm /tmp/parallel_sleep
parallel --joblog /tmp/parallel_joblog_signal 'sleep {}' ::: 100 2>/dev/null & #parallel --joblog /tmp/parallel_joblog_signal 'echo foo >/tmp/parallel_sleep; sleep {}' ::: 30 2>/dev/null &
parallel --joblog /tmp/parallel_joblog_signal 'sleep {}' ::: 30 2>/dev/null &
parallel --joblog /tmp/parallel_joblog_exitval 'echo foo >/tmp/parallel_sleep; sleep {} && echo sleep was not killed=BAD' ::: 30 2>/dev/null &
while [ ! -e /tmp/parallel_sleep ] ; do
sleep 1
done
sleep 1 sleep 1
killall -6 sleep killall -6 sleep
sleep 2 wait
grep -q 134 /tmp/parallel_joblog_exitval && echo exitval OK grep -q 134 /tmp/parallel_joblog_exitval && echo exitval=128+6 OK
grep -q '[^0-9]6[^0-9]' /tmp/parallel_joblog_signal && echo signal OK grep -q '[^0-9]6[^0-9]' /tmp/parallel_joblog_signal && echo signal OK
rm /tmp/parallel_joblog_exitval /tmp/parallel_joblog_signal #rm /tmp/parallel_joblog_exitval /tmp/parallel_joblog_signal

View file

@ -43,8 +43,4 @@ echo '### Test --wd .';
ssh $SSHLOGIN2 mkdir -p mydir; ssh $SSHLOGIN2 mkdir -p mydir;
mkdir -p $HOME/mydir; cd $HOME/mydir; mkdir -p $HOME/mydir; cd $HOME/mydir;
parallel --workdir . -S $SSHLOGIN2 ::: pwd parallel --workdir . -S $SSHLOGIN2 ::: pwd
echo '### bug #35544: --pipe would read from :::: (-a)';
parallel -j1 --pipe echo foo\;cat ::: a b c :::: <(seq 3) <(seq 4 6)
EOF EOF

View file

@ -47,33 +47,6 @@ Shebang from perl with args
Shebang from perl with args Shebang from perl with args
5 5
### Test --shebang-wrap --pipe with parser options ### Test --shebang-wrap --pipe with parser options
Suboptimal
Shebang from perl with args
1
Shebang from perl with args
2
Shebang from perl with args
4
Shebang from perl with args
5
Optimal
Shebang from perl with args
1
Shebang from perl with args
2
Shebang from perl with args
4
Shebang from perl with args
5
### Test --shebang-wrap --pipe with parser options Same as
Shebang from perl with args
1
Shebang from perl with args
2
Shebang from perl with args
4
Shebang from perl with args
5
### Test --shebang-wrap --pipe with parser options stdin ### Test --shebang-wrap --pipe with parser options stdin
Shebang from perl with args Shebang from perl with args
1 1

View file

@ -91,7 +91,7 @@ echo '### bug #39360: --joblog does not work with --pipe'
### bug #39360: --joblog does not work with --pipe ### bug #39360: --joblog does not work with --pipe
seq 100 | parallel --joblog - --pipe wc | tr '0-9' 'X' seq 100 | parallel --joblog - --pipe wc | tr '0-9' 'X'
Seq Host Starttime Runtime Send Receive Exitval Signal Command Seq Host Starttime Runtime Send Receive Exitval Signal Command
X : XXXXXXXXXX.XXX X.XXX X X X X wc X : XXXXXXXXXX.XXX X.XXX X X X X wc
XXX XXX XXX XXX XXX XXX
echo '### bug #39572: --tty and --joblog do not work' echo '### bug #39572: --tty and --joblog do not work'
### bug #39572: --tty and --joblog do not work ### bug #39572: --tty and --joblog do not work

View file

@ -1,3 +1,3 @@
### Test --joblog with exitval and Test --joblog with signal ### Test --joblog with exitval and Test --joblog with signal -- timing dependent
exitval OK exitval=128+6 OK
signal OK signal OK

View file

@ -70,16 +70,3 @@ redhat9.tange.dk
/home/parallel/mydir /home/parallel/mydir
### Test --wd . ### Test --wd .
/home/parallel/mydir /home/parallel/mydir
### bug #35544: --pipe would read from :::: (-a)
foo
a
b
c
foo
1
2
3
foo
4
5
6