Django: gérer ses tâches asynchrones avec Celery

Cet article est découpé en plusieurs parties :

Tâches asynchrones, késako ?

Lorsque l’on construit une application Web, on a très vite besoin de gérer des tâches asynchrones, c’est-à-dire des tâches qui sortent du cycle habituel HTTP GET côté client / réponse serveur. Voici quelques exemples de tâches asynchrones :

  • traitement d’une image suite à son upload par l’utilisateur ;
  • envoi d’une notification par email ;
  • import journalier d’un catalogue de produits pour alimenter un webshop.

Celery, c’est le bien

Dans le monde Python / Django, c’est Celery qui est le plus utilisé pour gérer ce genre de tâches. Instagram l’utilise par exemple pour alimenter les feeds de ses utilisateurs.

On a donc un ensemble de tâches à traiter, parfois en volume conséquent. L’objectif va être de les répartir entre de multiples Workers, tournant éventuellement sur des machines différentes. Les workers surveillent constamment la Task Queue pour obtenir de nouvelles tâches à traiter.

Concrêtement, dans Celery, la communication entre le client qui crée les tâches et les workers se fait par le biais d’un Message Broker, par exemple Redis (“key-value in-memory store”).

Installation de Redis

On va donc commencer par installer Redis. Sous Debian et dérivés :

$ apt-get install redis-server

On va modifier le fichier de configuration /etc/redis/redis.conf pour changer le port par défaut et rajouter une authentification par mot de passe :

redis.conf download
bind 0.0.0.0
port 6234
requirepass cemotdepasseestincroyablementlongetd1ff1c1l3ouai$
$ sudo service redis-server restart
$ sudo tail -f /var/log/redis/redis-server.log

Si tout s’est bien passé, les logs devraient afficher le logo de Redis en Ascii-Art suivi d’un réjouissant “Server started”.

Installation et configuration de Celery

$ pip install celery celery-django

Le paquet celery-django facilite l’intégration de Celery dans un projet Django. Il va notamment rajouter des commandes utiles à manage.py pour manipuler Celery.

Côté fichiers, on aura :

  • tous les paramètres de Celery dans le settings.py du projet Django :
settings.py download
##
# CELERY
## 

CELERYD_HIJACK_ROOT_LOGGER = False

# http://celery.readthedocs.org/en/latest/configuration.html#celery-redirect-stdouts-level
CELERY_REDIRECT_STDOUTS = True # par défaut
CELERY_REDIRECT_STDOUTS_LEVEL = 'DEBUG'

BROKER_URL = 'redis://:cemotdepasseestincroyablementlongetd1ff1c1l3ouai$@127.0.0.1:6234'
CELERY_RESULT_BACKEND = 'redis://:cemotdepasseestincroyablementlongetd1ff1c1l3ouai$@127.0.0.1:6234'

CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']  # Ignore other content
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Paris'
CELERY_ENABLE_UTC = True

# pour tracker aussi les autres états custom
CELERY_TRACK_STARTED = True

CELERYD_POOL_RESTARTS = True
  • l’instanciation de notre client Celery dans celery.py :
celery.py download
from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
  • et la définition des tâches dans tasks.py :
tasks.py download
from __future__ import absolute_import

from celery import current_task, Task
from celery.utils.log import get_task_logger
logger = get_task_logger('celery')

from proj.celery import app

@app.task()
def test_task():
    logger.warning("celerrrrrrrrrry log")
    print "test"
    return True

Notre première tâche

Notre première tâche test_task utilise le mécanisme de logging de python, affiche “test” et renvoit True. Pour l’essayer, on va lancer un worker Celery :

$ python manage.py celery -A proj worker --autoreload

Le paramètre -A définit l’app concernée.

Le paramètre --autoreload va indiquer à Celery de surveiller le fichier tasks.py : en cas de modification, il sera automatiquement rechargé. Cela évite de relancer Celery à chaque fois que l’on ajoute ou modifie une tâche, très pratique en cours de développement. C’est par contre totalement déconseillé en production.

Une fois le worker lancé, on peut lui donner une tâche à traiter :

$ python manage.py shell

from proj.tasks import *
task_name.delay()

task_name.delay() est un raccourci pour task_name.apply_async() qui offre plus de possibilités via ses arguments. On peut par par exemple spécifier le nom de la queue dans laquelle insérer la tâche : task_name.apply_async(queue="urgent").

Ces méthodes renvoient toutes les deux un objet AsyncResult qui permet de suivre le traitement de la tâche.

r = task_name.delay()
print r.id

Si l’on a besoin de suivre l’avancement d’une tâche dont on ne connaît que l’id (par les logs par exemple), on peut à tout instant lancer un shell et interroger Celery :

$ python manage.ph shell

from celery.result import AsyncResult
r = AsyncResult(id)

Définir plusieurs queues

Pour l’instant, on n’a qu’un worker et qu’une seule queue : “celery”, la queue par défaut (modifiable par le paramêtre CELERY_DEFAULT_QUEUE).

Il y a plusieurs manières d’en créer d’autres. Par défaut, toute queue non définie explicitement sera automatiquement créée :

CELERY_CREATE_MISSING_QUEUES = True

Il suffit donc de préciser dans le décorateur de la tâche la queue à laquelle on souhaite l’associer :

@app.task(queue='test')
def test_task():
    logger.warning("celerrrrrrrrrry log")
    print "test"
    return True

On va alors utiliser l’argument -Q en ligne de commande pour spécifier à un worker la ou les queues qu’il doit écouter :

$ python manage.py celery -A proj worker --autoreload -Q test

Vous l’aurez compris, les queues vont permettre de répartir la charge et de prioritiser certaines tâches. Si l’on a par exemple des tâches très longues et/ou très coûteuses en CPU, on les mettra dans une queue à part traitée par des workers dédiés.

Monitoring ses tâches

Flower est une micro application web qui permet de monitorer facilement ses tâches : https://github.com/mher/flower

Elle fonctionne “out of the box” :

$ pip install flower
$ python manage.py celery flower --basic_auth=admin:admin --address=0.0.0.0 -A proj

En pointant votre navigateur à l’url http://localhost:5555/, vous aurez une vue en temps réel des évènements Celery. Tout l’historique étant stocké en mémoire par Flower, c’est une application que l’on n’utilisera qu’en phase de développement.

Lier des tâches entre elles

Celery permet d’implémenter de véritables workflows. Un exemple pour commencer :

(
download_profile_picture.si(account.profile_image_url, account.get_profile_picture_path())
|
analyze_profile_picture.si(account.user_id)
)
.delay()

Dans cet exemple, on crée dynamiquement une nouvelle tâche en chaînant deux tâches grâce à l’opérateur |. La séquentialité de ces deux tâches est garantie par Celery : download_profile_picture s’exécutera toujours avant analyze_profile_picture. Par contre, les deux tâches ne s’exécuteront pas forcément sur le même worker. Il faut donc s’assurer que tous les workers aient bien accès aux même ressources. En l’occurence, les photos de profil sont stockées sur un disque partagé.

La méthode .si() contrairement à .s() permet de créer une sous-tâche immutable, c’est-à-dire qu’elle ne prendra pas le retour de la tâche précédente en argument.

Supprimer des tâches en attente

Si l’on se rend compte d’un bug dans le code, on peut avoir besoin de supprimer des tâches en attente.

On va commencer par lister les queues actives :

$ python manage.py celery -A proj inspect active_queues

Pour vider toutes les queues :

$ python manage.py celery -A proj purge

Pour vider complètement le contenu d’une queue donnée :

$ python manage.py celery -A proj amqp queue.purge nom_de_la_queue

Attention toutefois, cela ne suffit pas. Par défaut, un worker se réserve des tâches :

CELERYD_PREFETCH_MULTIPLIER = 4

Pour lister ces tâches réservées :

$ python manage.py celery -A proj inspect reserved

Pour qu’elles ne soient pas traitées, il va falloir les révoquer une-à-une à la main :

$ python manage.ph shell

from celery.result import AsyncResult
r = AsyncResult(id)
r.revoke()

Conclusion

Celery est une bibliothèque puissante capable d’assumer la distribution et le traitement de millions de tâches par jour. Pour des projets de moindre envergure, il peut être intéressant de se pencher sur des alternatives plus légères telles que Huye, autoproclamée “little task queue”.

Dans un prochain article, je tâcherai d’approfondir l’utilisation de Celery dans le cas des tâches planifiées (cron-like) et des tâches longues. J’aborderai également la problématique de la mise en production et de la mise à jour des noeuds. En attendant, tout feedback est le bienvenu !