Marius Stanca.

Asynchronously executing callables (Python 2.7)

2018-03-16

Parallel tasks can be launched with the concurrent.futures library, using either threads (ThreadPoolExecutor) or separate processes (ProcessPoolExecutor). Both implement the same interface, which is defined by the abstract Executor class.
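To make the shared interface concrete, here is a minimal sketch. The helper names double() and run_all() are my own, made up for illustration. run_all() only calls Executor.map(), so in principle it accepts either kind of pool; I only run it with a thread pool here.

```python
from __future__ import print_function

from concurrent.futures import ThreadPoolExecutor


def double(x):
    # Hypothetical stand-in for real work.
    return x + x


def run_all(executor, func, items):
    # Uses only Executor.map(), which both pool types provide.
    # map() yields the results in the same order as the inputs.
    return list(executor.map(func, items))


with ThreadPoolExecutor(max_workers=3) as executor:
    results = run_all(executor, double, [1, 2, 3, 4, 5])

print(results)
```

Because run_all() never mentions threads or processes, the choice of pool stays in one place: the line that creates the executor.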

The library is not part of the Python 2.7 standard library, so you need to install the backport first: pip install futures. From Python 3.2 onward it ships with the standard library.

Let's look at an example of how this works.
No concurrency:

#!/usr/bin/env python

from Queue import Queue
import random
import time

q = Queue()
numList = [1,2,3,4,5,6,7,8,9,10]

def calc(x):
    if random.randint(0,1):
        time.sleep(0.1)
    result = x + x
    q.put(result)

def main():
    for num in numList:
        calc(num)

    while not q.empty():
        print(q.get())

if __name__ == "__main__":
    main()

I have a list of numbers, a calc() function that performs the calculation (an addition), and a Queue for storing the results. The jobs are executed one after another, but they are independent of each other, so they could be processed in parallel.

ThreadPoolExecutor:

#!/usr/bin/env python

from Queue import Queue
import concurrent.futures
import random
import time

q = Queue()
numList = [1,2,3,4,5,6,7,8,9,10]

def calc(x):
    if random.randint(0,1):
        time.sleep(0.1)

    result = x + x
    q.put(result)

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        for num in numList:
            executor.submit(calc, num)

    while not q.empty():
        print(q.get())


if __name__ == "__main__":
    main()

Here we have a pool to which the jobs are submitted. The pool has 3 worker threads: each thread takes a job from the pool, executes it, and, when it is done, takes the next job to process. Leaving the with block waits for all submitted jobs to finish, so the queue is complete by the time it is read.
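One thing the example above does not use: executor.submit() returns a Future object. As a variation (a sketch, not a replacement for the version above), calc() can simply return its result and the Queue can be dropped. The num_list and results names are my own choices here.

```python
from __future__ import print_function

import concurrent.futures
import random
import time


def calc(x):
    if random.randint(0, 1):
        time.sleep(0.1)
    return x + x


def main():
    num_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Map each Future back to the number it was submitted with.
        futures = {executor.submit(calc, num): num for num in num_list}
        # as_completed() yields futures as they finish, in no fixed order.
        for future in concurrent.futures.as_completed(futures):
            # result() returns calc()'s value, or re-raises its exception.
            results[futures[future]] = future.result()
    return results


if __name__ == "__main__":
    results = main()
    for num in sorted(results):
        print(num, results[num])
```

Calling result() also surfaces errors. If the Future is thrown away, an exception raised inside the job is never seen.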

ProcessPoolExecutor:

#!/usr/bin/env python

import sys
import redis
import concurrent.futures

r = redis.Redis()
numList = [1,2,3,4,5,6,7,8,9,10]

def check_server():
    try:
        r.info()
    except redis.exceptions.ConnectionError:
        sys.stderr.write("Error: cannot connect to redis server. Is the server running?\n")
        sys.exit(1)

def calc(x):
    result = x + x
    r.rpush("test", result)

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for num in numList:
            executor.submit(calc, num)

    print(r.lrange("test", 0, -1))


if __name__ == "__main__":
    check_server()
    r.delete("test")
    main()

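Same as the previous example, but I replaced ThreadPoolExecutor with ProcessPoolExecutor.
The other change is that I no longer use Queue; I use Redis to store the list of results. Queue is made for threads, not processes: each worker process would get its own copy of the queue, so the results would never reach the parent.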
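If Redis is only there to carry the results back, the futures can do that job too. This is a sketch under that assumption, not the approach used above: calc() returns its value and executor.map() collects the values in the parent process. The num_list name is my own.

```python
from __future__ import print_function

import concurrent.futures


def calc(x):
    # Defined at module level so it can be pickled for the worker processes.
    return x + x


def main():
    num_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # Arguments and return values are pickled and passed between
        # the parent and the workers; results come back in input order.
        results = list(executor.map(calc, num_list))
    print(results)


if __name__ == "__main__":
    main()
```

Redis is still the better fit when the results must outlive the script or be read by other programs.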

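Now you have an idea of how to use the futures library. Which solution is better? It depends on your problem, but as a rule of thumb threads are good for I/O-bound tasks and processes are good for CPU-bound tasks, because in CPython the global interpreter lock lets only one thread execute Python bytecode at a time.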
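A rough way to see the I/O side of this rule is to time some blocking calls. In this sketch time.sleep() stands in for a network or disk wait, and fake_io(), timed(), run_serial() and run_threaded() are names I made up for the illustration.

```python
from __future__ import print_function

import concurrent.futures
import time


def fake_io(delay):
    # time.sleep() stands in for a blocking call such as a network request.
    time.sleep(delay)
    return delay


def timed(func):
    # Return how many seconds func() took to run.
    start = time.time()
    func()
    return time.time() - start


def run_serial(delays):
    return [fake_io(d) for d in delays]


def run_threaded(delays):
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        return list(executor.map(fake_io, delays))


delays = [0.1] * 6
serial = timed(lambda: run_serial(delays))
threaded = timed(lambda: run_threaded(delays))
print("serial:   %.2fs" % serial)
print("threaded: %.2fs" % threaded)
```

With three workers the six waits overlap, so the threaded run should finish noticeably sooner. The exact numbers depend on the machine.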

Note

In Python 3.x, the Queue module has been renamed to queue.
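If the same script has to run on both versions, one common pattern is to try the new name first and fall back to the old one. A small sketch:

```python
from __future__ import print_function

try:
    from queue import Queue  # Python 3 name
except ImportError:
    from Queue import Queue  # Python 2 name

q = Queue()
q.put(2)
print(q.get())
```

The rest of the code can then use Queue without caring which interpreter is running it.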


© 2018. mariuss.me. Built with Python and Skeleton.