Add the gnuparallel Python package.

This commit is contained in:
Drew Frank 2012-10-07 22:48:25 -07:00
parent 141eb4d5ea
commit b2f8450bf3
23 changed files with 254 additions and 0 deletions

6
src/optional/genresults.sh Executable file
View file

@ -0,0 +1,6 @@
#!/bin/bash
#
# Generate the result files used to test the query modules.
#
# Each command runs `echo {a} {b}` once per combination of the values listed
# after each ":::".  With `--header :` the first value in each group ("a",
# "b") is the argument name, and `--result testresults/<prefix>` stores each
# run's stdout/stderr in files under testresults/ named from that prefix and
# the argument values.  (Run from a directory next to the `parallel` binary.)
../parallel --header : --result testresults/foo_ echo {a} {b} ::: a 1 2 ::: b 0.30 0.40
../parallel --header : --result testresults/bar_ echo {a} {b} ::: a 5 6 ::: b 0.70 0.80

4
src/optional/python/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
# Compiled Python bytecode.
*.pyc
# Artifacts produced by `python setup.py build` / `sdist` / `install`.
build
dist
*.egg-info

View file

@ -0,0 +1,9 @@
gnuparallel : Simple loading of GNU parallel result files.
The gnuparallel package provides a single function, `load`, which
loads results from files generated by GNU parallel into a Pandas
DataFrame object. See `help(gnuparallel.load)` for details.
Installation:
`python setup.py install`

View file

@ -0,0 +1,3 @@
"""Load GNU Parallel --results files into a Pandas DataFrame."""

# Use an explicit relative import: the implicit form (``from _loader
# import *``) is Python-2-only syntax and fails under Python 3, while the
# explicit form works on Python 2.5+ and Python 3 alike.
from ._loader import *

View file

@ -0,0 +1,143 @@
"""
A function for loading the --result files generated by GNU Parallel.
"""
__all__ = ['load']
from cStringIO import StringIO
import pandas as pd
import os
def load(_dir, _process=None, _format=None, _stream='stdout',
        _prefix=None, _infer_types=True, **options):
    """Load files generated with parallel's --result option.

    One use of GNU parallel is to call one command many times, each
    time with a different set of arguments. With the --result option,
    parallel will capture stdout and stderr from these processes and
    store them in files named according to the arguments of each
    individual call. This function provides easy loading of these
    result files into a Pandas DataFrame.

    Parameters
    ----------
    _dir : str
        Directory containing the results files.
    _process : function, optional
        Function that opens a results file and returns an object containing
        its results. If not provided, the resulting data frame will include
        a column containing the file names, not the actual results.
        If provided, the function should take a filename as its sole parameter.
        Whatever the function returns will be stored in the "res" column of
        the resulting DataFrame.
    _format : dict, optional
        Dictionary of format strings, used to convert any provided filter
        values to a format matching the results file names.
        For example, if the `foo` parameter to parallel was "0.10" and you pass
        foo=0.10 as an option, you will not find the intended file because
        str(0.10) == "0.1". To fix this, you should also include the key-value
        pair "foo": "%.2f" in the _format dict. This is usually only necessary
        for float-valued arguments where rounding or precision issues might
        affect the matching process.
    _stream : str, optional
        Specify either "stdout" or "stderr" to load results files from the
        corresponding stream. Default is "stdout".
    _prefix : str, optional
        Only load result files with a specific prefix. When using the --result
        option to parallel it is possible to specify a prefix for all of the
        result files. For example,
            parallel --result /some/dir/a_prefix ...
        would place all result files into the `/some/dir` directory and all of
        the file names would begin with "a_prefix". This parameter lets you
        filter based on this prefix. If None, allow any prefix. Default None.
    _infer_types : bool, optional
        Infer data types for option values. All option values are technically
        strings (since they were passed on the command line). When _infer_types
        is True, the resulting DataFrame will convert these values to inferred
        dtypes, e.g. the number 1 instead of "1". Default True.
    **options : kwargs
        Additional keyword arguments that will be used to filter the subset
        of results included in the output. The values can be either single
        values or iterables. If they are iterable, files corresponding to any
        of the included values will be considered a match.
        For example, passing `foo=[1,2,3]` will include results from files
        corresponding to runs where the parallel argument named `foo` had
        the value "1", "2", or "3".
        See also the _format parameter.

    Returns
    -------
    res : pandas.DataFrame
        A DataFrame with one column named for each of the parallel arguments
        and, depending on the _process argument, either:
        - A "res" column containing the results corresponding to each run.
        - A "resfile" column containing the names of the results files.
    """
    if _format is None:
        _format = dict()

    # Normalize every filter value to a container.  The isinstance check
    # keeps strings scalar on Python 3, where str grows an __iter__; on
    # Python 2 strings have no __iter__ so they fall through anyway.
    # (.items() rather than the Python-2-only .iteritems() so the code runs
    # on both major versions; replacing values for existing keys while
    # iterating is safe.)
    for k, v in options.items():
        if not hasattr(v, '__iter__') or isinstance(v, str):
            options[k] = [v]
    # Convert the filter values to the string form used in the file names,
    # applying any per-key format string from _format.
    for k, v in options.items():
        options[k] = set(_stringify(x, _format.get(k, '')) for x in v)
    options['_stream'] = [_stream]
    if _prefix:
        options['_prefix'] = [_prefix]

    # Iterate over results files and collect the matches.
    matches = []
    for entry in os.listdir(_dir):  # 'entry', not 'file': avoid shadowing the builtin
        metadata = _parse_name(entry)
        metadata['resfile'] = os.path.join(_dir, metadata['resfile'])
        if _select(metadata, options):
            matches.append(metadata)

    # Create a DataFrame from the matches.
    df = pd.DataFrame(matches)
    if _process and not df.empty:
        df['res'] = df.resfile.apply(_process)
        df = df.drop('resfile', axis=1)

    # Optionally try to convert string argument values to numeric types by
    # round-tripping through CSV and letting read_csv infer dtypes.  Skip
    # empty frames: modern pandas raises EmptyDataError when read_csv is
    # given no data.
    if _infer_types and not df.empty:
        try:
            from cStringIO import StringIO  # Python 2
        except ImportError:
            from io import StringIO  # Python 3
        buf = StringIO()
        df.to_csv(buf)
        df = pd.read_csv(StringIO(buf.getvalue()), index_col=0)
    return df
def _parse_name(file, sep='\t'):
"""Return a dict containing metadata extracted from the file name."""
tokens = file.split(sep)
prefix_stream = tokens[0]
metadata = {k:v for k,v in zip(tokens[1::2], tokens[2::2])}
stream_index = prefix_stream.find('stdout')
if stream_index == -1:
stream_index = prefix_stream.find('stderr')
prefix, stream = prefix_stream[:stream_index], prefix_stream[stream_index:]
metadata.update({'_prefix': prefix, '_stream': stream, 'resfile': file})
return metadata
def _select(metadata, filter):
"""Return true if the metadata entry matches the filter, False otherwise."""
if any(k not in metadata for k in filter):
return False
if any(all(v != metadata[k] for v in vs) for k,vs in filter.iteritems()):
return False
return True
def _stringify(x, fmt):
"""Return the string representation of x, using a format string if provided"""
if fmt:
return fmt % x
else:
return str(x)

15
src/optional/python/setup.py Executable file
View file

@ -0,0 +1,15 @@
#!/usr/bin/env python
"""Install script for the gnuparallel package."""

# Prefer setuptools: plain distutils does not understand the
# ``install_requires`` keyword, so the pandas dependency would be silently
# dropped.  Fall back to distutils only when setuptools is unavailable.
try:
    from setuptools import setup
except ImportError:
    from distutils.core import setup

setup(
    name = 'gnuparallel',
    version = '0.1',
    description = 'Load GNU parallel result files.',
    author = 'Drew Frank',
    author_email = 'drewfrank@gmail.com',
    packages = [
        'gnuparallel'
    ],
    install_requires = ['pandas']  # ignored under the distutils fallback
)

View file

@ -0,0 +1,66 @@
import pandas as pd
import unittest
from gnuparallel import load
# Directory holding the fixture files generated by genresults.sh.
result_dir = '../../testresults'


class TestLoader(unittest.TestCase):
    """Integration tests for gnuparallel.load against the checked-in
    result files (two prefixes, foo_ and bar_, with arguments a and b)."""

    def test_basics(self):
        # All 8 stdout result files load with one column per argument.
        df = load(result_dir)
        self.assertEqual(set(df.columns),
                         set(['a', 'b', '_prefix', 'resfile', '_stream']))
        self.assertEqual(df.shape[0], 8)

    def test_prefix(self):
        df = load(result_dir, _prefix='foo_')
        self.assertEqual(df.shape[0], 4)
        self.assertEqual(df.a.sum(), 6)
        df = load(result_dir, _prefix='bar_')
        self.assertEqual(df.shape[0], 4)
        self.assertEqual(df.a.sum(), 22)
        # A prefix matching no files yields an empty frame, not an error.
        df = load(result_dir, _prefix='BAD')
        self.assertTrue(df.empty)

    def test_filters(self):
        # Scalar and iterable filter values are both accepted.
        df = load(result_dir, a=2)
        self.assertEqual(df.shape[0], 2)
        self.assertEqual(df.a.sum(), 4)
        df = load(result_dir, a=[2])
        self.assertEqual(df.shape[0], 2)
        self.assertEqual(df.a.sum(), 4)
        df = load(result_dir, a=[1, 2])
        self.assertEqual(df.shape[0], 4)
        self.assertEqual(df.a.sum(), 6)
        df = load(result_dir, a=1000)
        self.assertTrue(df.empty)

    def test_infer_types(self):
        # Compare dtypes by name: the pd.np alias used previously was
        # deprecated in pandas 1.0 and removed in 2.0.
        df = load(result_dir)
        self.assertEqual(df.a.dtype, 'int64')
        df = load(result_dir, _infer_types=False)
        self.assertEqual(df.a.dtype, 'object')

    def test_format(self):
        # 0.3 stringifies to "0.3", which does not match the file's "0.30"
        # unless a format string is supplied.
        df = load(result_dir, b=0.3)
        self.assertTrue(df.empty)
        df = load(result_dir, b=0.3, _format={'b': '%.2f'})
        self.assertEqual(df.shape[0], 2)

    def test_stream(self):
        df = load(result_dir, _stream='stderr')
        self.assertTrue((df._stream == 'stderr').all())

    def test_process(self):
        # Sum the whitespace-separated numbers in each matched file using
        # only the stdlib (pd.np.loadtxt is gone in pandas 2.0).
        def sum_file(path):
            with open(path) as f:
                return sum(float(tok) for tok in f.read().split())
        df = load(result_dir, a=1, _process=sum_file)
        self.assertAlmostEqual(df.res[0], 1.4)


if __name__ == '__main__':
    unittest.main()

View file

@ -0,0 +1 @@
5 0.70

View file

@ -0,0 +1 @@
5 0.80

View file

@ -0,0 +1 @@
6 0.70

View file

@ -0,0 +1 @@
6 0.80

View file

@ -0,0 +1 @@
1 0.30

View file

@ -0,0 +1 @@
1 0.40

View file

@ -0,0 +1 @@
2 0.30

View file

@ -0,0 +1 @@
2 0.40