Monday, January 9, 2012

Shutting worker threads down gracefully after a signal in Python

Recently I wrote about a bug in Python's handling of signals in multi-threaded programs. The upstream Python developers suggested that, to handle signals properly in multi-threaded programs across several operating systems, we should use some of the newer APIs in the signal module to make sure we get the behavior we want.

A common scenario for threads and signals is a daemon with long-running worker threads that need to exit gracefully when SIGTERM or some other signal is received. The basic idea is:
  • Set up signal handlers
  • Spawn worker threads
  • Wait for the signal (SIGTERM in the code below), and wake up immediately when it is received
  • Shut down the workers gracefully

The challenge is to write the simplest code that does this portably (at least on FreeBSD and Linux) and that uses no CPU in the main thread while waiting: we want to sleep completely, not wake up periodically to check for signals.

This is the best solution I've come up with (skip down to the bottom; that's where the interesting stuff is):

import errno
import fcntl
import os
import signal
import threading

NUM_THREADS = 2
_shutdown = False


class Worker(threading.Thread):

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self._stop_event = threading.Event()

    def run(self):
        # Do something.
        while not self._stop_event.isSet():
            print 'hi from %s' % (self.getName(),)
            self._stop_event.wait(10)

    def shutdown(self):
        self._stop_event.set()
        print 'shutdown %s' % (self.getName(),)
        self.join()


def sig_handler(signum, frame):
    print 'handled'
    global _shutdown
    _shutdown = True


if __name__ == '__main__':

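    # Set up signal handling. The write end of the pipe is made non-blocking
    # and registered as the wakeup fd, so a byte is written to it whenever a
    # signal arrives.
    pipe_r, pipe_w = os.pipe()
    flags = fcntl.fcntl(pipe_w, fcntl.F_GETFL, 0)
    flags |= os.O_NONBLOCK
    fcntl.fcntl(pipe_w, fcntl.F_SETFL, flags)
    signal.set_wakeup_fd(pipe_w)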

    signal.signal(signal.SIGTERM, sig_handler)

    # Start worker threads.
    workers = [Worker() for i in xrange(NUM_THREADS)]
    for worker in workers:
        worker.start()

    # Sleep until woken by a signal.
    while not _shutdown:
        while True:
            try:
                os.read(pipe_r, 1)
                break
            except OSError, e:
                if e.errno != errno.EINTR:
                    raise

    # Shut down worker threads gracefully.
    for worker in workers:
        worker.shutdown()

Basically, we have to use set_wakeup_fd() to ensure that we can reliably wake up when a signal is delivered. The obvious function to use here (signal.pause()) doesn't work
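
The listing above is Python 2. As a rough sketch only, and not the code from this post, the same set_wakeup_fd() pattern might look like this on Python 3.5 or later, where os.read() retries by itself after EINTR and os.set_blocking() replaces the fcntl calls. The names run_until_signal and on_ready are made up for illustration; on_ready is a hypothetical hook that lets the function be exercised without an external kill.

```python
import os
import signal
import threading


class Worker(threading.Thread):
    """Worker that loops until asked to stop."""

    def __init__(self):
        super().__init__()
        self._stop_event = threading.Event()

    def run(self):
        while not self._stop_event.is_set():
            # Do something, then sleep until the timeout or shutdown().
            self._stop_event.wait(10)

    def shutdown(self):
        self._stop_event.set()
        self.join()


def run_until_signal(num_threads=2, signum=signal.SIGTERM, on_ready=None):
    """Start workers, sleep until signum arrives, then stop the workers.

    Must be called from the main thread. on_ready is a hypothetical hook
    (an assumption of this sketch) called once everything is set up.
    """
    received = []

    def handler(sig, frame):
        received.append(sig)

    # The wakeup fd must be non-blocking; Python writes one byte to it
    # each time a signal arrives.
    pipe_r, pipe_w = os.pipe()
    os.set_blocking(pipe_w, False)
    old_fd = signal.set_wakeup_fd(pipe_w)
    old_handler = signal.signal(signum, handler)

    workers = [Worker() for _ in range(num_threads)]
    try:
        for worker in workers:
            worker.start()
        if on_ready is not None:
            on_ready()
        # Sleep in read() until a signal writes to the pipe. No EINTR loop
        # is needed here: since Python 3.5 os.read() retries automatically.
        while not received:
            os.read(pipe_r, 1)
    finally:
        for worker in workers:
            worker.shutdown()
        # Restore the previous signal state and release the pipe.
        signal.signal(signum, old_handler)
        signal.set_wakeup_fd(old_fd)
        os.close(pipe_r)
        os.close(pipe_w)
    return received
```

Calling run_until_signal() from the main thread of a script and then sending the process SIGTERM (for example with kill) should make it return after the workers have been joined.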
