Source code for smpl.parallel.parallel

from multiprocessing import Process, Queue
from smpl.doc import append_doc


def queued(q, f, *args, **kwargs):
    q.put(f(*args, **kwargs))


[docs]def res(a): """ Parallel evaluation of the list generator from :func:`gen`. Return parallel executed values. Examples -------- >>> def twice(x): ... return x+x >>> for r in [calc(twice,i) for i in range(0,5)]: ... print(res(r)) 0 2 4 6 8 >>> res([calc(lambda x : x**3, i) for i in range(0,5)]) [0, 1, 8, 27, 64] """ if isinstance(a, list): return [next(k) for k in a] return next(a)
[docs]def gen(f, *args, **kwargs): """Generates parallel execution list generator.""" q = Queue() p = Process(target=queued, args=(q, f, *args), kwargs=kwargs) yield p.start() yield [q.get(), p.join()][0]
[docs]@append_doc(res) def calc(f, *args, **kwargs): g = gen(f, *args, **kwargs) next(g) return g
[docs]def par(f, *args, **kwargs): """ Parallel execution of f on each element of args and kwargs Examples -------- >>> par(lambda x : x**2, range(0,5)) [0, 1, 4, 9, 16] """ return res([calc(f, *[args[k][i] for k in range(len(args))], **{k: v[i] for k, v in kwargs.items()}) for i in range(len(args[0]) if len(args) > 0 else len(next(iter(kwargs.values()))))])
parallel = par if __name__ == "__main__": import doctest doctest.testmod()