How to Multi-thread with Python to Speed up Your Code
As Python was first developed 29 years ago, it is unsurprising that it was designed as a largely linear programming language, at a time when single-core CPUs still dominated the market. In fact, CPython developers may still be feeling the heat when it comes to concurrency. Luckily, we Python noobs can lay back and enjoy the fruits of PEP 371, which officially added `multiprocessing` to the standard library back in 2008, and PEP 3156 (accepted in 2012), which brought `asyncio` into the standard library. In this first part of the series on parallelism with Python, we are going to look into multithreading and how to implement it with `multiprocessing`.
TL;DR: Parallelise a CPU-bound task with multiprocessing, and an I/O-bound task with multithreading.
A thread is a separate flow of execution. When you have a pool of worker threads, they will be executing close to concurrently. These threads share a common data space, hence the concept of the Global Interpreter Lock (GIL) is important when you try to multi-thread your Python script. What the GIL does, in short, is limit execution to one Python thread at a time to avoid inconsistent changes to shared memory; in long, it creates a thread-safe memory-management environment so that C libraries that are not thread-safe can be included in the Python ecosystem (or go read more on CPython; unfortunately this is out of my realm of knowledge). As such, a thread will lock out the other threads when it needs to use the CPU for computation. This makes threads less efficient for CPU-bound tasks and more suitable for I/O-bound tasks, e.g. networking, issuing database operations, etc.
A process can be understood as a separate Python process that has been forked from the parent process and has its own Python interpreter. Because of that, each process has its own GIL, and will not lock other processes out when executing on a CPU core. The price for avoiding the GIL bottleneck is a larger memory overhead, as a copy of the address space (or copy-on-write, where supported) is needed for every process. Because of that, processes are usually preferable for CPU-bound tasks, e.g. matrix manipulations.
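The distinction above can be demonstrated with a minimal sketch (the task, pool size, and sleep duration are hypothetical): for an I/O-bound task, the GIL is released while waiting, so a thread pool overlaps the waits instead of serialising them.

```python
import time
from multiprocessing.pool import ThreadPool

def io_task(_):
    # I/O-bound stand-in: the GIL is released while sleeping,
    # so worker threads can overlap their waits
    time.sleep(0.2)

start = time.perf_counter()
with ThreadPool(processes=4) as pool:
    pool.map(io_task, range(4))
threaded = time.perf_counter() - start

start = time.perf_counter()
for i in range(4):
    io_task(i)
serial = time.perf_counter() - start

# four overlapped sleeps take roughly one sleep's worth of wall time,
# while the serial loop pays for all four
print(f"threads: {threaded:.2f}s, serial: {serial:.2f}s")
```

Had `io_task` been CPU-bound instead, the GIL would have kept the four threads from running simultaneously and the two timings would be close.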
Python’s standard library `multiprocessing` has an interface for threading available via `multiprocessing.pool.ThreadPool`, which mirrors the process-based `multiprocessing.pool.Pool`. For seasoned Python veterans, `threading` was the original library for this. This interface provides the following methods, but each method has different restrictions on how arguments can be passed, and none gives us an easy way to track progress:
- `apply(func[, args[, kwds]])` (implemented as `apply_async( ... ).get()`)
- `apply_async(func[, args[, kwds[, callback[, error_callback]]]])`
- `map(func, iterable[, chunksize])` (implemented as `map_async( ... ).get()`)
- `map_async(func, iterable[, chunksize[, callback[, error_callback]]])`
- `imap(func, iterable[, chunksize])`
- `imap_unordered(func, iterable[, chunksize])`
- `starmap(func, iterable[, chunksize])` (implemented as `starmap_async( ... ).get()`)
- `starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])`
Before we dive into these methods and discuss their pros and cons, let’s set the scene first:
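The original setup cell is not shown in this extract, so here is a hypothetical reconstruction of what the snippets below assume; `n_job`, the body of `func`, and the sleep duration are guesses based on the reported timings, not the author's actual code.

```python
import time
import multiprocessing as mp
from multiprocessing import Pool
# the snippets below also use tqdm for progress bars (pip install tqdm);
# for real threads rather than processes, multiprocessing.pool.ThreadPool
# exposes the exact same interface as Pool

n_job = 4  # hypothetical worker count; the extract does not show the real value

def func(i, j=0):
    # hypothetical stand-in for the article's task: ~100 ms of simulated I/O
    # (the optional second arg is for the starmap examples later on)
    time.sleep(0.1)
    return i + j
```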
When running it with `%time func(0)`, we got the following results:
CPU times: user 89.6 ms, sys: 35.6 ms, total: 125 ms
Wall time: 127 ms
Let’s say we want to, for some reason, run it a hundred times:
n = 100
data = range(n)

# for loop
%%timeit -n 3
for i in data:
    func(i)
>>> 7.93 s ± 67.5 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

# for loop with tqdm
%%capture capture
%%timeit -n 3
for i in tqdm(data):
    func(i)
print(capture.stdout.strip())
>>> 7.93 s ± 67.5 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
Pass a function to the pool of threads. This comes in two variants: `.apply()` and `.apply_async()`. When using the `apply()` method, you need to pass a callable together with some optional args and/or kwargs. When executing the function, it blocks the calling thread until the result is ready or an exception has been raised. Hence, `apply_async()` is usually preferred.
When using `apply_async()`, instead of the actual result, it returns a `multiprocessing.pool.AsyncResult`, which is essentially a promise of a result that you can obtain with its `.get()` method. You can also pass in a `callback` and an `error_callback` function, which will be executed when the thread has finished its job and when it has failed (as it always seems to in my code), respectively.
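A minimal sketch of the two hooks (the `risky` function and the pool size are hypothetical): the callback receives each successful result, while the error callback receives the exception instance.

```python
from multiprocessing.pool import ThreadPool

def risky(i):
    # hypothetical task that fails for one particular input
    if i == 2:
        raise ValueError(f"bad input: {i}")
    return i * 10

done, failed = [], []
with ThreadPool(processes=2) as pool:
    promises = [
        pool.apply_async(
            risky, args=(i,),
            callback=done.append,          # runs on success, with the result
            error_callback=failed.append,  # runs on failure, with the exception
        )
        for i in range(4)
    ]
    pool.close()
    pool.join()  # ensure all callbacks have fired

print(sorted(done))  # [0, 10, 30]
print([type(e).__name__ for e in failed])  # ['ValueError']
```

Note that if you instead call `.get()` on the promise of a failed task, the exception is re-raised in the calling thread.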
Pros:
- passing functions to the worker pool one by one allows better flexibility in scheduling
- the only variant in Python’s implementation that supports both args and kwargs
Cons:
- assigning functions one by one increases overhead in the form of inter-thread communication
- `apply` will block the caller thread, and can easily be slower than the benchmark
- depending on the specific function, using `apply_async` may have worse throughput, as the payload cannot be passed in batches
APPLY
%%timeit -n 3
with Pool(processes=n_job) as pool:
    results = list(pool.apply(func, args=[i]) for i in data)
>>> 11.1 s ± 901 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
APPLY + TQDM
%%capture capture
%%timeit -n 1
with Pool(processes=n_job) as pool:
    results = list(tqdm((pool.apply(func, args=[i]) for i in data), total=n))
print(capture.stdout.strip())
>>> 11.3 s ± 1.19 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
APPLY_ASYNC
%%timeit -n 10
with Pool(processes=n_job) as pool:
    results = list(pool.apply_async(func, args=(i, )) for i in data)
    results = [r.get() for r in results]
>>> 3 s ± 159 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
APPLY_ASYNC + TQDM
%%capture capture
%%timeit -n 10
with Pool(processes=n_job) as pool:
    results = [pool.apply_async(func, args=(i, )) for i in data]
    results = [r.get() for r in tqdm(results)]
print(capture.stdout.strip())
>>> 3.08 s ± 108 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
APPLY_ASYNC + TQDM in Callback
%%capture capture
%%timeit -n 10
with tqdm(total=n) as pbar:
    with Pool(processes=n_job) as pool:
        def callback(*args):
            # update the progress bar on each completed task
            pbar.update()
            return
        results = [
            pool.apply_async(
                func,
                args=(i, ),
                callback=callback) for i in data]
        results = [r.get() for r in results]
print(capture.stdout.strip())
>>> 3.08 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Arguments:
- `func`: A callable for each individual thread to execute
- `iterable`: An iterable of args, one of which is passed to the function per call
- `chunksize`: An optional positive integer (default 1) that specifies the (approximate) size of the chunks, named tasks, that `iterable` will be chopped into. These tasks will be assigned to the threads for executing the flow defined in `func`.
- `callback`: (Only for `map_async`) An optional callable (default `None`) that will be called every time results are returned.
- `error_callback`: (Only for `map_async`) An optional callable (default `None`) that will be called every time an uncaught exception is raised in `func`.
Returns:
Pros:
- No need to iterate through the list of data, as this is handled by `map` or `map_async` directly
- `chunksize` allows better throughput
- Order is preserved, i.e. the order of the output matches the order of the input
A major disadvantage of `apply` and `apply_async` is that we need to iteratively execute `pool.apply` (or its asynchronous variant) for each set of args and/or kwargs. Un-optimised iteration is almost always a nightmare when it comes to scalability. In this case, `pool.map` is our dreamcatcher. All we need to do is pass in the iterable, and voilà.
Just like `apply`, `map` has an asynchronous variant, which usually performs better, since `map` blocks the calling thread until results have been returned.
Another very handy argument is `chunksize`, which accepts a natural number with a default of 1. Our iterable will be split into sections, called tasks, of length roughly `chunksize`. Each thread is assigned a task, which it must finish before asking for a new one. In essence, `chunksize == 1` gives you better flexibility in scheduling each task, while `chunksize > 1` gives you (in general) better throughput and fewer inter-thread communications. A general rule of thumb is to use `chunksize=1` if you do not know how long each task will take to finish (e.g. optimisation), and `chunksize=len(iterable) // n_job` if you expect the tasks to finish in roughly the same time.
Cons:
- `map` and `map_async` will convert the argument `iterable` into a `list` before execution, causing additional memory overhead
- `map` will block the calling thread until results have been returned
- `map` and `map_async` only allow one arg to be passed to the function, meaning that the other arguments (from the second onwards) must have a pre-defined or default value
- Be careful of memory usage, especially when the iterable gets longer
`map` and `map_async` also come with some disadvantages. One that is particularly annoying is that they only allow one arg. If your function accepts multiple args or kwargs, here are three ways (in general) to work around it:
1. Just make it happen
Not recommended; not maintainable.
You will need to specify every argument, even the optional ones, and may also need to wrap your target function to make it happen.
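A hypothetical sketch of this workaround (the target function, its arguments, and the pool size are all made up): wrap the multi-argument function in a one-arg shim that spells out every other argument.

```python
from multiprocessing.pool import ThreadPool

def func(i, scale=1, offset=0):
    # hypothetical multi-argument target function
    return i * scale + offset

def wrapped(i):
    # spell out every argument, even the optional ones
    return func(i, scale=2, offset=1)

with ThreadPool(processes=4) as pool:
    results = pool.map(wrapped, range(5))

print(results)  # [1, 3, 5, 7, 9]
```

Every change to the constants means editing the wrapper, which is exactly why this approach does not maintain well.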
2. Partial Method
Only if the varying argument is the first argument accepted by the function.
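A sketch of the partial method under that restriction (the function and its arguments are hypothetical): freeze the constant arguments as keywords with `functools.partial`, so the varying value lands in the first positional slot.

```python
from functools import partial
from multiprocessing.pool import ThreadPool

def func(i, scale=1, offset=0):
    # hypothetical target: the varying argument i comes first
    return i * scale + offset

# freeze the constants as keywords; i remains the first positional argument
frozen = partial(func, scale=2, offset=1)

with ThreadPool(processes=4) as pool:
    results = pool.map(frozen, range(5))

print(results)  # [1, 3, 5, 7, 9]
```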
3. Don’t use `map` or `map_async`
Yes, there are better alternatives, like `starmap`.
MAP
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = pool.map(func, data, chunksize=n // n_job)
>>> 3.16 s ± 64.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
MAP + TQDM
%%capture capture
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = list(tqdm(pool.map(func, data, chunksize=n // n_job)))
print(capture.stdout.strip())
>>> 3.19 s ± 92.8 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
MAP_ASYNC
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = pool.map_async(func, data, chunksize=n // n_job).get()
>>> 3.2 s ± 131 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Arguments:
- `func`: A callable for each individual thread to execute
- `iterable`: An iterable of args, one of which is passed to the function per call
- `chunksize`: An optional positive integer (default 1) that specifies the (approximate) size of the chunks, named tasks, that `iterable` will be chopped into. These tasks will be assigned to the threads for executing the flow defined in `func`.
Returns:
Pros:
- No need to iterate through the list of data, as this is handled by `imap` or `imap_unordered` directly
- `chunksize` allows better throughput
- The argument `iterable` will not be converted to a `list` before being passed to the threads, avoiding excessive memory use when `iterable` is a long, non-list iterable
- Results are kept as a generator instead of being converted into a `list` before returning
- `imap_unordered` will return results as soon as they complete, without preserving the order of the input iterable
`imap` is officially defined as a lazier version of `map`, meaning that it would neither cast your input iterable into a `list` before chopping it into tasks, nor cast the results into a `list` before returning. Instead it uses `iter()` and `next()` to delegate tasks. Unlike `map`, which returns a list of results, or `map_async`, which returns a promise of a result, `imap` and `imap_unordered` return results as soon as the worker threads yield them. Because of this difference, results are not cast into a list; instead they arrive as a generator, from which users can call `next()` to fetch the latest result. This is particularly useful if your program does not need to wait for all the results before starting any post-processing. On top of that, if the order of execution is not important to you, `imap_unordered` would theoretically be better, as it yields results as soon as execution is done, regardless of the order of the input iterable (i.e. if you pass in `(1, 2, 3)`, you may get the results `(3, 2, 1)`, with the result order completely determined at run time).
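A minimal sketch of consuming `imap_unordered` results as they arrive (the task, pool size, and sleep durations are hypothetical stand-ins):

```python
import time
import random
from multiprocessing.pool import ThreadPool

def task(i):
    # simulate I/O that finishes at unpredictable times
    time.sleep(random.uniform(0.01, 0.05))
    return i * i

with ThreadPool(processes=4) as pool:
    seen = []
    # each result can be post-processed the moment its thread yields it,
    # in completion order rather than input order
    for result in pool.imap_unordered(task, range(8)):
        seen.append(result)

print(sorted(seen))  # [0, 1, 4, 9, 16, 25, 36, 49]
```

The full set of results is only complete once the loop ends; anything inside the loop works on a partial view.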
Just like `map` and `map_async`, `imap` and its variant also accept `chunksize`, a natural number with a default of 1. The same rule of thumb applies: use `chunksize=1` if you do not know how long each task will take to finish (e.g. optimisation), and `chunksize=len(iterable) // n_job` if you expect the tasks to finish in roughly the same time.
Cons:
- You may be working with an incomplete set of results when interacting with the generator
- The conveniences of a `list` are not available when handling the result
- `imap` and `imap_unordered` only allow one arg to be passed to the function, meaning that the other arguments (from the second onwards) must have a pre-defined or default value
- `imap_unordered` will return results as soon as they complete, without preserving the order of the input iterable
- Be careful of memory usage, especially when the iterable gets longer
In short, don’t use `imap` and `imap_unordered` if you are not comfortable with generators or if you need the complete result before proceeding, and don’t use `imap_unordered` if order is important to you. If you want to convert the generator back into a list, shoo, go use `map` and `map_async` instead.
To work around the restriction of one arg, please refer to `map` and `map_async`.
IMAP
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = list(pool.imap(func, data, chunksize=n // n_job))
>>> 3.19 s ± 69.5 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
IMAP + TQDM
%%capture capture
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = list(tqdm(pool.imap(func, data, chunksize=n // n_job), total=n))
print(capture.stdout.strip())
>>> 3.16 s ± 93 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
IMAP_UNORDERED
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = list(pool.imap_unordered(func, data, chunksize=n // n_job))
>>> 3.13 s ± 65.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
IMAP_UNORDERED + TQDM
%%capture capture
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = list(tqdm(pool.imap_unordered(func, data, chunksize=n // n_job), total=n))
print(capture.stdout.strip())
>>> 3.19 s ± 173 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Arguments:
- `func`: A callable for each individual thread to execute
- `iterable`: An iterable of iterables of args that will be passed to and unpacked by the function
- `chunksize`: An optional positive integer (default 1) that specifies the (approximate) size of the chunks, named tasks, that `iterable` will be chopped into. These tasks will be assigned to the threads for executing the flow defined in `func`.
- `callback`: (Only for `starmap_async`) An optional callable (default `None`) that will be called every time results are returned.
- `error_callback`: (Only for `starmap_async`) An optional callable (default `None`) that will be called every time an uncaught exception is raised in `func`.
Returns:
Pros:
- Multiple args can be passed to `func`
- `chunksize` allows better throughput
- Order is preserved, i.e. the order of the output matches the order of the input
`starmap` and `starmap_async` were introduced in Python 3.3 to address exactly the issue of not being able to easily pass multiple args to the target function. Instead of passing in an iterable of args, we pass in an iterable of iterables of args, i.e. if we pass `[(0, 1), (1, 2)]` to function `f`, it will execute `f(0, 1)` and `f(1, 2)`. So remember to wrap your args into a tuple (memory does not grow on trees) even if there is only one arg (we don’t judge). For cases where you have a mix of constant and varying arguments, there are in general two ways to treat it:
1. Partial Method
Only if the varying arguments are the first arguments accepted by the function.
2. Repeat
By defining all the arguments for each of the function calls, this works for varying arguments at any position.
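A hypothetical sketch of the repeat approach (the function and its arguments are made up): pair the varying values with an `itertools.repeat` of the constant via `zip`, so every call receives a complete arg tuple.

```python
from itertools import repeat
from multiprocessing.pool import ThreadPool

def func(i, scale):
    # hypothetical target with a varying arg (i) and a constant arg (scale)
    return i * scale

with ThreadPool(processes=4) as pool:
    # zip(...) yields (0, 2), (1, 2), (2, 2), ... - one full arg tuple per call
    results = pool.starmap(func, zip(range(5), repeat(2)))

print(results)  # [0, 2, 4, 6, 8]
```

Because each tuple spells out every argument, the constant can sit at any position, not just the first.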
Cons:
- `starmap` and `starmap_async` will convert the argument `iterable` into a `list` before execution, causing additional memory overhead
- `starmap` will block the calling thread until results have been returned
- Be careful of memory usage, especially when the iterable gets longer
- `starmap` and `starmap_async` do not support kwargs, so you either use the Repeat Method above for all the arguments, or there is nothing obvious in the multiprocessing library that can help you
STARMAP
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = pool.starmap(func, zip(data, data), chunksize=n // n_job)
>>> 3.2 s ± 109 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
STARMAP_ASYNC
%%timeit -n 10
with mp.Pool(processes=n_job) as pool:
    results = pool.starmap_async(func, zip(data, data), chunksize=n // n_job).get()
>>> 3.18 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
From the timings above, we can see that, when used correctly, multithreading is able to speed up our simple I/O-heavy task. These numbers were obtained on an EC2 instance in its idle state, which means they will most likely differ from what you get. That said, `apply_async` has consistently outperformed the rest in my experience and experiments (plain `apply` blocks the caller and was the slowest of the lot), so if you would like to try multithreading, `apply_async` should be your best shout.
In the next part of this series, we are going to look into why `multiprocessing` may not be good enough for our daily routine, and also walk through some alternatives that will hopefully help you speed up your code. Let me know if you have learnt something new from this! And please also let me know if there are other neat tricks that I have missed!
Adios!