Python: How to use Parallel and delayed in such a way that the parallelized for loop stops when the output goes below a threshold?

Suppose I have the following code:

from numpy import arange, asarray  # recent SciPy releases no longer re-export these NumPy functions
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt

def func(x,y):
    return y/x
def main(y, xmin,xmax, dx):
    x = arange(xmin,xmax,dx)
    output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
    return x, asarray(output)
def demo():
    x,z = main(2.,1.,30.,.1)
    plt.plot(x,z, label='All values')
    plt.plot(x[z>.1],z[z>.1], label='desired range') ## This is better to do in main()
    plt.show()

demo()

I want to calculate output only while output > a given number (it can be assumed that the elements of output decrease monotonically as x increases) and then stop. I do NOT want to calculate for all values of x and filter afterwards, since that is inefficient for my purpose. Is there any way to do that using Parallel and delayed, or any other multiprocessing tool?

Answer:1

No threshold value was specified in the question, so I made one up. After testing, I had to reverse the condition to output < a given number for it to work properly.

I would use a pool, launch the tasks with a callback function that checks the stop condition, and terminate the pool once the condition is met. This introduces a race condition, however: tasks that are still running when the pool is terminated are not allowed to finish, so their results are omitted. I think this method requires minimal modification to your code and is very easy to read. The order of the results list is NOT guaranteed.

Pros: very little overhead
Cons: could have missing results.

Method 1)

from numpy import arange, asarray  # recent SciPy releases no longer re-export these NumPy functions
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    output.append(ret)
    if ret < stop_condition:
        worker_pool.terminate()


def func(x, y, ):
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


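output = []
stop_condition = 0.1

# Guard the entry point so worker processes can re-import this module safely
if __name__ == "__main__":
    worker_pool = multiprocessing.Pool()
    demo()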
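
Since the order of the results list is not guaranteed in Method 1, one way to recover it is to have each task return its input alongside its value and sort the collected pairs afterwards. The following is only a minimal sketch under that assumption; func_with_x and sort_results are hypothetical names that do not appear in the code above.

```python
import multiprocessing


def func_with_x(x, y):
    # Hypothetical variant of func: also return x so that each result
    # can be matched to its input after the pool has finished.
    return x, y / x


def sort_results(pairs):
    """Sort (x, value) pairs by x and split them into two lists."""
    ordered = sorted(pairs)
    xs = [x for x, _ in ordered]
    zs = [z for _, z in ordered]
    return xs, zs


if __name__ == "__main__":
    pairs = []
    with multiprocessing.Pool(2) as pool:
        # The callback runs in the parent process and may fire in any order.
        handles = [
            pool.apply_async(func_with_x, (x, 2.0), callback=pairs.append)
            for x in (4.0, 1.0, 2.0)
        ]
        for handle in handles:
            handle.wait()
    print(sort_results(pairs))
```

With the pairs sorted, x and the results line up again and can be plotted against each other, at the cost of sending one extra number back per task.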

Method 2)

This method has more overhead but allows tasks that have already started to finish.

from numpy import arange, asarray  # recent SciPy releases no longer re-export these NumPy functions
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    if ret is not None:
        if ret < stop_condition:
            worker_stop.value = 1
        else:
            output.append(ret)


def func(x, y, ):
    if worker_stop.value != 0:
        return None
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


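output = []
# worker_stop is read inside func in the workers; they only see this shared
# object if they inherit it, i.e. with the "fork" start method.
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1

if __name__ == "__main__":
    worker_pool = multiprocessing.Pool()
    demo()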

Method 3) Pros: no results will be left out.
Cons: this relies on private attributes of Pool and steps way outside what you would normally do.

Take Method 1 and add:

def stopPoolButLetRunningTaskFinish(pool):
    # Stop new tasks from being started by emptying the queue that all worker processes draw from
    while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
        pool._inqueue._reader.recv()
    # Send a sentinel to every worker process
    for a in range(len(pool._pool)):
        pool._inqueue.put(None)

Then change stop_condition_callback

def stop_condition_callback(ret):
    # func returns a plain number, so compare ret itself
    if ret < stop_condition:
        #worker_pool.terminate()
        stopPoolButLetRunningTaskFinish(worker_pool)
    else:
        output.append(ret)
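
If relying on private attributes is not acceptable, a different technique (not one of the three methods above) is to submit the work in small ordered batches and stop as soon as a value falls below the threshold. The sketch below uses the standard-library concurrent.futures instead of joblib; compute_until_below and batch_size are hypothetical names, and it assumes the output decreases monotonically with x, as stated in the question.

```python
from concurrent.futures import ProcessPoolExecutor
from itertools import islice, repeat


def func(x, y):
    return y / x


def compute_until_below(executor, xs, y, threshold, batch_size=8):
    """Evaluate func(x, y) batch by batch and stop at the first value below threshold.

    Returns the x values and results that stayed at or above the threshold, in order.
    """
    kept_x, kept_z = [], []
    it = iter(xs)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        # executor.map yields results in the same order as its inputs.
        for x, z in zip(batch, executor.map(func, batch, repeat(y))):
            if z < threshold:
                return kept_x, kept_z
            kept_x.append(x)
            kept_z.append(z)
    return kept_x, kept_z


if __name__ == "__main__":
    xs = [1.0 + 0.1 * i for i in range(290)]
    with ProcessPoolExecutor() as executor:
        kept_x, kept_z = compute_until_below(executor, xs, 2.0, 0.1)
    print("Number of results kept:", len(kept_x))
```

The results stay ordered and at most batch_size - 1 values are calculated unnecessarily, but workers idle briefly at the end of each batch, so batch_size trades wasted work against parallel efficiency.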