Source code for fundamentals.fmultiprocess

#!/usr/local/bin/python
# encoding: utf-8
"""
*A function to quickly add multiprocessing to any program*

Author
: David Young
"""

import os

# Pin the TERM environment variable to "vt100" for this process as a side
# effect of importing this module.
os.environ.update({"TERM": "vt100"})


def fmultiprocess(
    log,
    function,
    inputArray,
    poolSize=False,
    timeout=3600,
    turnOffMP=False,
    progressBar=False,
    mute=False,
    **kwargs,
):
    """Run ``function`` over every item of ``inputArray``, by default in a pool of worker processes

    **Key Arguments**

    - ``log`` -- logger
    - ``function`` -- the function to multiprocess. It is called as ``function(item, **kwargs)``; ``log`` is additionally passed as a keyword argument when the function's signature contains a ``log`` parameter.
    - ``inputArray`` -- the array to be iterated over
    - ``poolSize`` -- limit the number of CPU that are used in multiprocess job. Default **False** (use the CPU count reported by ``psutil``)
    - ``timeout`` -- time in sec after which to raise a timeout error if the processes have not completed. A falsy value falls back to 3600.
    - ``turnOffMP`` -- turn off multiprocessing. Useful for profiling and debugging. Default **False**
    - ``progressBar`` -- add a progress bar (uses ``tqdm``)
    - ``mute`` -- mute terminal output from child processes (non-file log handlers are set to WARNING and ``sys.stdout`` is redirected to ``os.devnull`` inside each worker)
    - ``**kwargs`` -- extra keyword arguments forwarded to every call of ``function``

    **Return**

    - ``resultArray`` -- the array of results

    **Raises**

    - ``TimeoutError`` -- if ``progressBar`` is on and the jobs have not completed within ``timeout`` seconds
    - any exception raised while collecting the results from the pool is logged and re-raised. In every error case the worker pool is terminated before the exception propagates.

    **Usage**

    ```python
    from fundamentals.fmultiprocess import fmultiprocess
    # DEFINE AN INPUT ARRAY
    inputArray = range(10000)
    results = fmultiprocess(log=log, function=functionName, poolSize=10, timeout=300,
                            inputArray=inputArray, otherFunctionKeyword="cheese")
    ```
    """
    log.debug("starting the ``multiprocess`` function")

    import inspect
    import time
    from functools import partial

    # ONLY HAND THE LOGGER TO THE TARGET FUNCTION IF IT DECLARES A ``log`` PARAMETER
    if "log" in inspect.signature(function).parameters:
        mapfunc = partial(function, log=log, **kwargs)
    else:
        mapfunc = partial(function, **kwargs)

    # SERIAL PATH - NEEDS NEITHER ``multiprocess`` NOR ``psutil``.
    # THE ``== False`` COMPARISON (RATHER THAN TRUTHINESS) IS KEPT ON PURPOSE SO
    # THAT NON-BOOL VALUES SUCH AS ``None`` KEEP SELECTING THE SERIAL PATH.
    if not (turnOffMP == False):  # noqa: E712
        resultArray = [mapfunc(item) for item in inputArray]
        log.debug("completed the ``multiprocess`` function")
        return resultArray

    import multiprocess as mp
    import psutil
    from ctypes import c_int32

    # ``len()`` IS NEEDED FOR CHUNKING AND THE PROGRESS BAR; ``map_async`` WOULD
    # MATERIALISE AN UNSIZED ITERABLE ANYWAY
    if not hasattr(inputArray, "__len__"):
        inputArray = list(inputArray)
    totalItems = len(inputArray)

    # ``psutil.cpu_count()`` CAN RETURN None WHEN THE COUNT IS UNDETERMINED
    cpuCount = psutil.cpu_count() or 1

    # DEFINE POOL SIZE - NUMBER OF CPU CORES TO USE (DEFAULT = ALL)
    if not poolSize:
        poolSize = cpuCount

    # AIM FOR ROUGHLY 3 CHUNKS PER CPU, NEVER LESS THAN 1 ITEM PER CHUNK
    chunksize = max(1, (totalItems + 1) // (cpuCount * 3))

    if not timeout:
        # 1 HRS
        timeout = 60 * 60

    def startFunc(log, l, c):
        # POOL INITIALISER - RUNS ONCE IN EVERY WORKER. PUBLISHES THE SHARED
        # COUNTER AND LOCK AS WORKER-SIDE GLOBALS FOR ``thisFunction``
        global counter_lock
        global counter
        counter = c
        counter_lock = l
        if mute:
            # MUTE STDOUT AND PRINTING TO TERMINAL
            import logging
            import os
            import sys

            for handler in log.handlers:
                if not isinstance(handler, logging.FileHandler):
                    handler.setLevel(logging.WARNING)
            sys.stdout = open(os.devnull, "w")

    def thisFunction(item):
        # WORKER-SIDE WRAPPER: RUN THE JOB, THEN BUMP THE SHARED COUNTER.
        # ``counter`` / ``counter_lock`` ARE THE GLOBALS SET BY ``startFunc``
        result = mapfunc(item)
        with counter_lock:
            counter.value += 1
        return result

    # COUNTER AND LOCK FOR PROGRESS BAR
    # (DELIBERATELY NOT NAMED ``counter`` / ``counter_lock``: THAT WOULD TURN THE
    # WORKER-SIDE GLOBALS IN ``thisFunction`` INTO CLOSURE VARIABLES)
    progressCounter = mp.Value(c_int32)
    progressLock = mp.Lock()

    def drainProgress(pbar):
        # MOVE WHATEVER THE WORKERS HAVE COUNTED SINCE THE LAST CALL ONTO THE BAR
        if progressCounter.value != 0:
            with progressLock:
                increment = progressCounter.value
                progressCounter.value = 0
            pbar.update(n=increment)

    pool = mp.Pool(
        processes=poolSize,
        initializer=startFunc,
        initargs=(log, progressLock, progressCounter),
    )
    try:
        # MONOTONIC CLOCK SO WALL-CLOCK ADJUSTMENTS CANNOT TRIGGER/DELAY THE TIMEOUT
        deadline = time.monotonic() + timeout
        futureArray = pool.map_async(thisFunction, inputArray, chunksize=chunksize)

        if progressBar:
            import tqdm

            with tqdm.tqdm(total=totalItems) as pbar:
                while not futureArray.ready():
                    if time.monotonic() > deadline:
                        raise TimeoutError(
                            f"The timeout limit of {timeout}s has been reached"
                        )
                    drainProgress(pbar)
                    time.sleep(0.5)
                # PICK UP ANYTHING COUNTED BETWEEN THE LAST POLL AND COMPLETION
                drainProgress(pbar)

        try:
            resultArray = futureArray.get(timeout=timeout)
        except Exception as e:
            log.error(f"Multiprocessing error: {e}")
            raise
    except BaseException:
        # TIMEOUT, WORKER ERROR OR INTERRUPT - DO NOT LEAVE WORKERS RUNNING
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        # ``join`` IS VALID AFTER EITHER ``close`` OR ``terminate``
        pool.join()

    log.debug("completed the ``multiprocess`` function")
    return resultArray