Introduction

The context manager of Python's ThreadPoolExecutor (TPE) is a really neat way to run a bunch of (I/O) work in a thread pool, and then clean everything up when the context is exited.

Something like this:

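from concurrent.futures import ThreadPoolExecutor

# my_function, arg1, arg2 and N are placeholders for your own work
with ThreadPoolExecutor() as executor:
    for i in range(N):
        executor.submit(my_function, arg1, arg2)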

Where did my tasks go, attempt 1

Recently at work, however, I had to debug a case where the executor appeared to discard tasks that had been submitted later on.

This was unexpected, because the documentation states that the context manager shuts the executor down as if shutdown(wait=True) had been called, and hence waits for all work to finish.

On a first trace through the TPE code, it seemed that the thread _worker would read the executor's shutdown flag, and then just finish its current task.

However, I had missed a continue in the _worker's while loop. Because of it, a worker first empties the queue at shutdown, and only exits once it reaches a None task.

Long story short: TPE does exactly what you would expect. At context manager exit, by default it lets all threads empty out the queue, and then completes context exit.
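
A quick way to convince yourself of this is a minimal sketch like the one below. The slow_square task and the variable names are hypothetical, chosen only for illustration. Six tasks are queued on two workers and the context is exited straight away, so most of the tasks are still sitting in the queue when shutdown begins.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(n):
    # hypothetical stand-in for some I/O-bound work
    time.sleep(0.1)
    return n * n

with ThreadPoolExecutor(max_workers=2) as executor:
    # six tasks on two workers: most of them are still queued
    # when the with block ends
    futures = [executor.submit(slow_square, n) for n in range(6)]

# the context exit waited for the queue to drain, so every future is done
all_done = all(f.done() for f in futures)
results = [f.result() for f in futures]
print(all_done, results)
```

Every future reports done() and carries a result, even though the with block ended long before the last task started.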

Where did my tasks go, attempt 2

In our case, it turned out that the problem was a recursive task that kept on submitting itself as it traversed a possibly very deep hierarchical structure.

When the outer-level TPE's context manager exits, it calls shutdown, which pushes the None task that makes the worker threads terminate once they have drained the queue. Every task that was submitted before that point still runs to completion. Child tasks that those tasks try to submit afterwards, however, arrive too late and never run.
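
What "discarded" looks like in practice can be sketched as follows. The documentation states that submit() calls made after shutdown raise RuntimeError. In the sketch below (the parent function is a hypothetical name), that exception is raised inside the parent task and stored on the parent's future, where it goes unnoticed unless someone asks for it.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def parent(executor):
    # sleep long enough for the with block below to have exited
    time.sleep(0.5)
    # by now shutdown() has been called, so this submit raises RuntimeError
    return executor.submit(print, "child ran")

with ThreadPoolExecutor(max_workers=1) as executor:
    parent_future = executor.submit(parent, executor)
    # no waiting here: the context exits while parent is still sleeping

# the exception raised inside parent is stored on its future
error = parent_future.exception()
print(type(error).__name__, error)
```

If nobody calls result() or exception() on the parent future, the failure is silent, which is why the child tasks seem to simply vanish.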

I’ve added a self-contained demonstrator below.

The upshot of all of this is that if your TPE tasks are also going to be submitting tasks, ensure that you keep the context open long enough so that everything can be successfully submitted before it closes.

from concurrent.futures import ThreadPoolExecutor, wait
from threading import current_thread
import time

def some_task(task_id, executor):
    time.sleep(2)
    # report before returning, otherwise this line is never reached
    print(f"DONE with task {task_id} on thread {current_thread().name}")
    if task_id < 10:
        # each of the 4 initial tasks will submit a "child" task
        # with id 10,20,30,40
        return executor.submit(some_task, task_id * 10, executor)
    return None
    

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    for tid in range(4):
        # submit() always returns a Future, so no None check is needed
        futures.append(executor.submit(some_task, tid + 1, executor))
        
    # When we exit the TPE context, executor.shutdown(wait=True) is called.
    # This sets the executor's shutdown flag and pushes None onto the queue;
    # worker threads finish all tasks that were submitted before that point,
    # up to the None, and then all stop.
    # "Child" tasks that some_task tries to submit after shutdown has begun
    # are lost unless we keep the context open for long enough.

    # One way of doing that is by waiting for the first level of futures.
    # With wait(futures) in place, you should see the child tasks complete
    # after the "exiting context" output below. Comment it out to see
    # them being lost.
    wait(futures)
    
    print("=====> submitted all tasks, exiting context!")
    

print("thread pool closed with wait=True")
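
Waiting for the first level of futures is enough here because the hierarchy is only two levels deep. For deeper recursion, one possible approach (a sketch under my own assumptions, not something the executor provides) is to record every future in a shared list and keep waiting until a pass finds no new ones. The tracked_submit helper, the all_futures list and the completed list are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from threading import Lock
import time

all_futures = []        # every future submitted, parents and children
futures_lock = Lock()   # protects all_futures across worker threads
completed = []          # ids of the tasks that actually ran

def tracked_submit(executor, fn, *args):
    # hypothetical helper: submit the task and remember its future
    future = executor.submit(fn, *args)
    with futures_lock:
        all_futures.append(future)
    return future

def some_task(task_id, executor):
    time.sleep(0.1)
    if task_id < 10:
        # the child is registered before this task finishes
        tracked_submit(executor, some_task, task_id * 10, executor)
    completed.append(task_id)

with ThreadPoolExecutor(max_workers=2) as executor:
    for tid in range(4):
        tracked_submit(executor, some_task, tid + 1, executor)

    # keep the context open until a pass finds no new futures
    seen = 0
    while True:
        with futures_lock:
            snapshot = list(all_futures)
        if len(snapshot) == seen:
            break
        wait(snapshot)
        seen = len(snapshot)

print(sorted(completed))
```

The loop ends only when every known future is done and none of them registered a new one. Because a task registers its children before it returns, nothing can be submitted after that point, and the context can close safely.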