optional: Python loader works with new --results format.

commit 701445aac6
parent 05a08c55b6
Drew Frank, 2012-12-21 20:01:23 -08:00
20 changed files with 25 additions and 59 deletions

View file

@@ -2,5 +2,4 @@
 #
 # Generate the result files used to test the query modules.
-../parallel --header : --result testresults/foo echo {a} {b} ::: a 1 2 ::: b 0.30 0.40
-../parallel --header : --result testresults/bar echo {a} {b} ::: a 5 6 ::: b 0.70 0.80
+../parallel --header : --result testresults echo {a} {b} ::: a 1 2 ::: b 0.30 0.40
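(With the new --results layout this commit targets, parallel writes one
directory level per header/value pair instead of packing them into the file
name, so the command above should leave files such as

    testresults/a/1/b/0.30/stdout
    testresults/a/1/b/0.30/stderr
    testresults/a/2/b/0.40/stdout

a sketch inferred from the resfile paths in the README diff below.)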

View file

@@ -14,7 +14,7 @@ Sample usage:
 1. Generate some results files by running parallel from the command line:

     # mkdir out
-    # parallel --header : --results out/pfx echo {arg1} {arg2} ::: arg1 1 2 ::: arg2 three four
+    # parallel --header : --results out echo {arg1} {arg2} ::: arg1 1 2 ::: arg2 three four

 2. Load the results using the gnuparallel Python package:

@@ -24,16 +24,13 @@ Sample usage:
     Type "help", "copyright", "credits" or "license" for more information.
     >>> import gnuparallel
     >>> help(gnuparallel.load)
-    !!! THIS PART IS BROKEN !!!
     >>> my_df = gnuparallel.load('out')
     >>> my_df
-      _prefix _stream arg1   arg2                          resfile
-    0     pfx  stdout    1  three  out/pfxstdout arg1 1 arg2 three
-    1     pfx  stdout    1   four   out/pfxstdout arg1 1 arg2 four
-    2     pfx  stdout    2  three  out/pfxstdout arg1 2 arg2 three
-    3     pfx  stdout    2   four   out/pfxstdout arg1 2 arg2 four
-    >>> my_df.tail(1)
-    3     pfx  stdout    2   four   out/pfxstdout arg1 2 arg2 four
+      _stream arg1   arg2                       resfile
+    0  stdout    2  three  out/arg1/2/arg2/three/stdout
+    1  stdout    2   four   out/arg1/2/arg2/four/stdout
+    2  stdout    1  three  out/arg1/1/arg2/three/stdout
+    3  stdout    1   four   out/arg1/1/arg2/four/stdout

 See documentation for the pandas project (http://pandas.pydata.org/) for
 instructions on how to access and manipulate the loaded results.
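(Beyond the session above, the load function's keyword filters and _process
hook, both exercised in the updated tests below, compose naturally; a
hypothetical session against the same "out" directory:

    >>> # Option filters are plain keyword arguments: keep only arg2 == "three".
    >>> gnuparallel.load('out', arg2='three').shape[0]
    2
    >>> # _process maps each result file to a value collected in a "res" column.
    >>> df = gnuparallel.load('out', _process=lambda f: open(f).read().strip())

The filter keyword names and values here are assumptions chosen to match the
README's example data.)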

View file

@@ -8,7 +8,7 @@ import pandas as pd
 import os

 def load(_dir, _process=None, _format=None, _stream='stdout',
-         _prefix=None, _infer_types=True, **options):
+         _infer_types=True, **options):
     """Load files generated with parallel's --result option.

     One use of GNU parallel is to call one command many times, each
@@ -43,14 +43,6 @@ def load(_dir, _process=None, _format=None, _stream='stdout',
     _stream : str, optional
         Specify either "stdout" or "stderr" to load results files from the
         corresponding stream. Default is "stdout".
-    _prefix : str, optional
-        Only load result files with a specific prefix. When using the --result
-        option to parallel it is possible to specify a prefix for all of the
-        result files. For example,
-            parallel --result /some/dir/a_prefix ...
-        would place all result files into the `/some/dir` directory and all of
-        the file names would begin with "a_prefix". This parameter lets you
-        filter based on this prefix. If None, allow any prefix. Default None.
     _infer_types : bool, optional
         Infer data types for option values. All option values are technically
         strings (since they were passed on the command line). When _infer_types
@@ -88,14 +80,15 @@ def load(_dir, _process=None, _format=None, _stream='stdout',
     for k,v in options.iteritems():
         options[k] = set(_stringify(x, _format.get(k, '')) for x in v)
     options['_stream'] = [_stream]
-    if _prefix:
-        options['_prefix'] = [_prefix]

     # Iterate over results files and collect the matches.
     matches = []
-    for file in os.listdir(_dir):
-        metadata = _parse_name(file)
-        metadata['resfile'] = os.path.join(_dir, metadata['resfile'])
+    normdir = os.path.normpath(_dir)
+    for path, file in _find_results(normdir):
+        # Don't include the root path as part of the metadata string.
+        metadata = _parse_path(path[len(normdir):])
+        metadata['_stream'] = file
+        metadata['resfile'] = os.path.join(path, file)
         if _select(metadata, options):
             matches.append(metadata)
@@ -117,19 +110,16 @@ def load(_dir, _process=None, _format=None, _stream='stdout',
     return df

-def _parse_name(file, sep='\t'):
-    """Return a dict containing metadata extracted from the file name."""
-    tokens = file.split(sep)
-    prefix_stream = tokens[0]
-    metadata = {k:v for k,v in zip(tokens[1::2], tokens[2::2])}
-    stream_index = prefix_stream.find('stdout')
-    if stream_index == -1:
-        stream_index = prefix_stream.find('stderr')
-    prefix, stream = prefix_stream[:stream_index], prefix_stream[stream_index:]
-    metadata.update({'_prefix': prefix, '_stream': stream, 'resfile': file})
-    return metadata
+def _find_results(root):
+    """Find all regular files in a directory."""
+    for (path, dirs, files) in os.walk(root):
+        for file in files:
+            yield (path, file)
+
+def _parse_path(path):
+    """Return a dict containing metadata extracted from a file's path."""
+    tokens = path.split(os.path.sep)
+    return {k:v for k,v in zip(tokens[1::2], tokens[2::2])}

 def _select(metadata, filter):
     """Return True if the metadata entry matches the filter, False otherwise."""

View file

@@ -9,20 +9,8 @@ class TestLoader(unittest.TestCase):

     def test_basics(self):
         df = load(result_dir)
-        self.assertEqual(set(df.columns), set(['a', 'b', '_prefix', 'resfile', '_stream']))
-        self.assertEqual(df.shape[0], 8)
-
-    def test_prefix(self):
-        df = load(result_dir, _prefix='foo_')
-        self.assertEqual(df.shape[0], 4)
-        self.assertEqual(df.a.sum(), 6)
-
-        df = load(result_dir, _prefix='bar_')
-        self.assertEqual(df.shape[0], 4)
-        self.assertEqual(df.a.sum(), 22)
-
-        df = load(result_dir, _prefix='BAD')
-        self.assertTrue(df.empty)
+        self.assertEqual(set(df.columns), set(['a', 'b', 'resfile', '_stream']))
+        self.assertEqual(df.shape[0], 4)

     def test_filters(self):
         df = load(result_dir, a=2)
@@ -60,7 +48,7 @@ class TestLoader(unittest.TestCase):

     def test_process(self):
         df = load(result_dir, a=1, _process=lambda x: pd.np.loadtxt(x).sum())
-        self.assertAlmostEqual(df.res[0], 1.4)
+        self.assertAlmostEqual(df.sum()['res'], 2.7)

 if __name__ == '__main__':
     unittest.main()
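(The new expected value follows from the test data below: with a=1 the two
result files contain "1 0.30" and "1 0.40", so loadtxt(f).sum() yields
1 + 0.30 = 1.3 for one file and 1 + 0.40 = 1.4 for the other, and summing the
res column gives 2.7. Checking the column sum rather than df.res[0]
presumably also makes the test independent of row order, which the
directory-walk loader no longer guarantees.)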

View file

@@ -1 +0,0 @@
-5 0.70

View file

@@ -1 +0,0 @@
-5 0.80

View file

@@ -1 +0,0 @@
-6 0.70

View file

@@ -1 +0,0 @@
-6 0.80

View file

@@ -1 +0,0 @@
-1 0.30

View file

@@ -1 +0,0 @@
-1 0.40

View file

@@ -1 +0,0 @@
-2 0.30

View file

@@ -1 +0,0 @@
-2 0.40