FastAPI is a web framework for building APIs in Python. It is built on top of Starlette for the web parts and Pydantic for the data parts. FastAPI uses type hints to validate, serialize, and deserialize data, and it can generate interactive API documentation automatically. It is known for its high performance, thanks to features like asynchronous programming support and the use of modern Python standards.
Celery is a distributed task queue system. It allows you to offload tasks from your main application to a separate worker process. Celery uses message queues (such as RabbitMQ, Redis) to communicate between the main application and the worker processes. A Celery application consists of a producer (the main application that enqueues tasks), a broker (the message queue), and a consumer (the worker process that executes the tasks).
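The producer/broker/consumer split can be illustrated with a toy in-process model, where a plain `queue.Queue` stands in for the broker and a thread stands in for the worker (this is only a sketch of the architecture, not how Celery is implemented):

```python
import queue
import threading

# Toy model of the Celery architecture: the queue plays the broker,
# the main thread is the producer, and a worker thread is the consumer.
broker = queue.Queue()
results = {}

def worker():
    # Consumer: pull tasks off the broker and execute them
    while True:
        task_id, func, args = broker.get()
        if func is None:  # sentinel to shut the worker down
            break
        results[task_id] = func(*args)

def enqueue(task_id, func, *args):
    # Producer: put a task on the broker instead of running it inline
    broker.put((task_id, func, args))

t = threading.Thread(target=worker)
t.start()
enqueue("task-1", lambda x, y: x + y, 2, 3)
broker.put((None, None, None))  # stop the worker
t.join()
print(results["task-1"])  # 5
```

In real Celery, the producer and consumer are separate processes (often on separate machines), and the broker is an external service such as Redis or RabbitMQ.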
Integrating Celery with FastAPI decouples time-consuming tasks from the main request-response cycle of the FastAPI application. This ensures the application can respond to user requests quickly, even when long-running tasks are in progress. It also improves scalability: you can add more worker processes to handle the task load.
First, create a virtual environment and activate it:

```bash
python3 -m venv myenv
source myenv/bin/activate
```
Install the necessary packages:

```bash
pip install fastapi uvicorn celery redis
```
Here, we are using Redis as the message broker and result backend for Celery.
Create a file named `celery_app.py`:
```python
from celery import Celery

# Initialize Celery with Redis as both the broker and the result backend
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

@celery_app.task
def add(x, y):
    return x + y
```
In this code, we initialize a Celery application with Redis as the broker and result backend. We also define a simple task, `add`, that adds two numbers.
Create a file named `main.py`:
```python
from fastapi import FastAPI

from celery_app import add

app = FastAPI()

@app.get("/add")
async def perform_addition(x: int, y: int):
    task = add.delay(x, y)
    return {"task_id": task.id}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    result = add.AsyncResult(task_id)
    if result.ready():
        return {"status": "completed", "result": result.result}
    else:
        return {"status": "pending"}
```
In the `/add` endpoint, we enqueue the `add` task using the `delay` method and return the task ID. The `/result/{task_id}` endpoint allows us to check the status of the task and get the result if it is ready.
If you haven’t already, start the Redis server:

```bash
redis-server
```

Then start the Celery worker:

```bash
celery -A celery_app worker --loglevel=info
```

Finally, run the FastAPI application:

```bash
uvicorn main:app --reload
```
Now you can access the FastAPI endpoints at http://localhost:8000.
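On the client side, a caller typically polls `/result/{task_id}` until the status flips to `completed`. A minimal, generic polling helper might look like the sketch below; `fetch_status`, the simulated responses, and the interval/timeout values are hypothetical, not part of the API above:

```python
import time

def poll_until_done(fetch_status, interval=0.5, timeout=30):
    """Call fetch_status() until it reports completion or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        payload = fetch_status()
        if payload.get("status") == "completed":
            return payload["result"]
        time.sleep(interval)
    raise TimeoutError("task did not complete in time")

# Simulated responses standing in for GET /result/{task_id}
responses = iter([{"status": "pending"}, {"status": "completed", "result": 5}])
print(poll_until_done(lambda: next(responses), interval=0.01))  # 5
```

In a real client, `fetch_status` would issue an HTTP GET to `/result/{task_id}` and parse the JSON body.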
In Celery tasks, it’s important to handle errors properly. You can use try-except blocks in your tasks to catch and log errors. For example:
```python
import logging

logger = logging.getLogger(__name__)

@celery_app.task
def add(x, y):
    try:
        return x + y
    except Exception as e:
        # Log the error instead of letting it propagate
        logger.error(f"Error in add task: {e}")
        return None
```
You can use tools like Flower to monitor Celery tasks. Install Flower:

```bash
pip install flower
```

Start Flower:

```bash
celery -A celery_app flower
```
Flower provides a web-based interface to monitor Celery tasks, view task status, and manage workers.
If a task fails due to a transient error (e.g., network issue), you can configure Celery to retry the task. You can also implement an exponential backoff strategy to increase the time between retries.
```python
@celery_app.task(bind=True, default_retry_delay=300, max_retries=5)
def add(self, x, y):
    try:
        return x + y
    except Exception as e:
        # Retry with exponential backoff: 1s, 2s, 4s, 8s, 16s
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
```
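With `countdown=2 ** self.request.retries`, the delay doubles on each attempt. It is also common to cap the delay so a long retry chain doesn’t wait excessively; the helper and the 60-second cap below are just an illustrative sketch of the arithmetic:

```python
def backoff_delays(max_retries=5, cap=60):
    # Delay (seconds) before each retry: 2**0, 2**1, ..., capped at `cap`
    return [min(2 ** retry, cap) for retry in range(max_retries)]

print(backoff_delays())  # [1, 2, 4, 8, 16]
```

With `max_retries=8`, for example, the cap kicks in: the later delays flatten out at 60 seconds instead of growing to 64 and 128.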
If you don’t need the result of a task, don’t use a result backend. Storing results in a result backend can consume a significant amount of memory, especially for long-running tasks. If you do need the result, make sure to clean up old results regularly.
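If you do keep a result backend, Celery can expire stored results automatically via the `result_expires` setting; with the Redis backend this is enforced through key TTLs. The one-hour value below is just an example:

```python
# In celery_app.py: expire stored task results after one hour (example value)
celery_app.conf.result_expires = 3600
```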
Integrating Celery with FastAPI is a powerful way to handle time-consuming tasks in your web application. By offloading these tasks to a task queue, you can ensure that your FastAPI application remains responsive and scalable. We have covered the fundamental concepts, installation, setup, and integration process, and discussed error handling, monitoring, retries, and other best practices to help you build a robust and efficient application.