This short article covers a template for setting up a Flask based web app with a background thread. Knowledge of Python and basic knowledge of Flask is required.

Introduction

Let’s use an example to demonstrate how to add a background thread to Flask app.

Imagine we have a Flask app that accepts some HTTP requests. During request processing, we need to do some side tasks that take longer to run. We do not want the user to wait until tasks are finished to receive a response. In addition, we want to keep our app as simple as possible so we cannot use dedicated software like Celery to run our long-running tasks. So, we decide to set up a separate background Python thread alongside our Flask app.

Setting up a background thread

The Flask-based web app will handle POST requests and submit long-running tasks to our background thread. When we terminate the Flask app, the background thread will have an opportunity to clean up its resources before stopping (so-called graceful shutdown).

Our sample project contains two files:

  • background_thread.py, where background thread is implemented;
  • app_factory.py, Flask app.

Let’s first take a look at the background_thread.py file:

import logging
import queue
import threading
import time
from queue import Queue
from abc import abstractmethod, ABC
from typing import Dict

TASKS_QUEUE = Queue()


class BackgroundThread(threading.Thread, ABC):
    def __init__(self):
        super().__init__()
        self._stop_event = threading.Event()

    def stop(self) -> None:
        self._stop_event.set()

    def _stopped(self) -> bool:
        return self._stop_event.is_set()

    @abstractmethod
    def startup(self) -> None:
        """
        Method that is called before the thread starts.
        Initialize all necessary resources here.
        :return: None
        """
        raise NotImplementedError()

    @abstractmethod
    def shutdown(self) -> None:
        """
        Method that is called shortly after stop() method was called.
        Use it to clean up all resources before thread stops.
        :return: None
        """
        raise NotImplementedError()

    @abstractmethod
    def handle(self) -> None:
        """
        Method that should contain business logic of the thread.
        Will be executed in the loop until stop() method is called.
        Must not block for a long time.
        :return: None
        """
        raise NotImplementedError()

    def run(self) -> None:
        """
        This method will be executed in a separate thread
        when start() method is called.
        :return: None
        """
        self.startup()
        while not self._stopped():
            self.handle()
        self.shutdown()


class NotificationThread(BackgroundThread):
    def startup(self) -> None:
        logging.info('NotificationThread started')

    def shutdown(self) -> None:
        logging.info('NotificationThread stopped')

    def handle(self) -> None:
        try:
            task = TASKS_QUEUE.get(block=False)
            # send_notification(task)
            logging.info(f'Notification for {task} was sent.')
        except queue.Empty:
            time.sleep(1)


class BackgroundThreadFactory:
    @staticmethod
    def create(thread_type: str) -> BackgroundThread:
        if thread_type == 'notification':
            return NotificationThread()

        # if thread_type == 'some_other_type':
        #     return SomeOtherThread()

        raise NotImplementedError('Specified thread type is not implemented.')

BackgroundThread inherits from the threading.Thread class to implement startup and shutdown mechanism. It acts as a base abstract class.

All concrete implementations that contain a business logic like NotificationThread inherit from BackgroundThread class.

To communicate to the thread that it needs to stop, threading.Event object is used. When the stop method is called, the internal flag of the self.stop_event object is set to True. In the run method, the while loop checks if stop_event is set and if yes, exits the loop.

BackgroundThreadFactory provides a convenient method to create different kinds of threads. This is useful if you expect the number of thread types to grow in the future. Otherwise, feel free to instantiate the background thread directly.

Now, let’s take a look at the app_factory.py file which provides code for setting up a Flask app:

import os
import logging
import signal
from flask import Flask, request, jsonify

from background_thread import BackgroundThreadFactory, TASKS_QUEUE

logging.basicConfig(level=logging.INFO, force=True)


def create_app():
    app = Flask(__name__)

    @app.route('/task', methods=['POST'])
    def submit_task():
        task = request.json
        logging.info(f'Received task: {task}')

        TASKS_QUEUE.put(task)
        return jsonify({'success': 'OK'})

    notification_thread = BackgroundThreadFactory.create('notification')

    # this condition is needed to prevent creating duplicated thread in Flask debug mode
    if not (app.debug or os.environ.get('FLASK_ENV') == 'development') or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        notification_thread.start()

        original_handler = signal.getsignal(signal.SIGINT)

        def sigint_handler(signum, frame):
            notification_thread.stop()

            # wait until thread is finished
            if notification_thread.is_alive():
                notification_thread.join()

            original_handler(signum, frame)

        try:
            signal.signal(signal.SIGINT, sigint_handler)
        except ValueError as e:
            logging.error(f'{e}. Continuing execution...')

    return app

The app and background thread will communicate via the Python queue.Queue object called TASKS_QUEUE. A queue is a simple thread-safe queue with put/get operations. New tasks will be put to the queue by the Flask app, and the background thread will get them from the queue and process.

Warning: In case of an app restart, items in the TASK_QUEUE will be lost. This is a simple implementation. If you have critical data, consider using dedicated software like Celery.

When a program is terminated, the SIGINT signal is received, and, using signal Python module, custom handler, sigint_handler, is executed. Before adding a custom handler, the original handler for the SIGINT signal is saved.

sigint_handler is stopping the background thread, waits until the thread stops, and executes the original handler to properly exit the program.

Warning: Graceful shutdown is not working in debug mode in Flask (when FLASK_ENV=development). After a couple of hours of debugging, I still cannot figure out why exactly. If I will ever do, expect updates to this article.

Warning: The background thread will not be started when using gunicorn with the environment variable FLASK_ENV=development.

Thank you for reading.