
Celery

Overview

Celery is a simple, flexible, open-source (BSD-licensed) distributed task queue for the asynchronous processing of messages. With Celery, one can define units of work called "tasks" and dispatch them for execution, in a distributed way if desired. Celery is a Python package and as such is easily integrated into any Python project.

Typical use cases include a queue of uploaded images to resize in the background, long-running tasks initiated through a Web application's API, and a batch of emails scheduled for sending.

Celery is composed of two parts. On one side, one or more clients define the tasks to be run and enqueue or schedule them for execution; on the other side, one or more workers pick up these tasks, execute them and optionally store their return values. The two parts communicate through a message bus (such as RabbitMQ) acting as broker, while the return value of a task is made available back to the caller through a result backend. Serialization and deserialization are handled transparently by the Celery infrastructure.
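
To make these roles concrete, here is a deliberately simplified, single-process sketch that uses only the Python standard library: a queue.Queue stands in for the broker, a plain dict for the result backend, and JSON for the (de)serialization step. All names (broker, backend, send_task, worker) are hypothetical and are not Celery APIs; in a real deployment, clients and workers are separate processes talking to a real broker and backend.

```python
import json
import queue
import threading

# Hypothetical stand-ins, NOT Celery APIs:
broker = queue.Queue()   # plays the message broker (e.g. RabbitMQ)
backend = {}             # plays the result backend (e.g. Astra DB)
backend_lock = threading.Lock()

# The "task registry": task name -> plain Python function.
TASKS = {"add": lambda a, b: a + b}

def send_task(task_id, name, *args):
    """Client side: serialize the call and enqueue it on the 'broker'."""
    broker.put(json.dumps({"id": task_id, "task": name, "args": list(args)}))

def worker():
    """Worker side: consume messages, run tasks, store serialized results."""
    while True:
        raw = broker.get()
        if raw is None:          # sentinel message: stop the worker loop
            break
        msg = json.loads(raw)
        result = TASKS[msg["task"]](*msg["args"])
        with backend_lock:
            backend[msg["id"]] = json.dumps(result)

t = threading.Thread(target=worker)
t.start()
send_task("t1", "add", 2, 3)
send_task("t2", "add", 10, 20)
broker.put(None)                 # ask the worker to stop once the queue is drained
t.join()

# Client side again: deserialize the return values read from the "backend".
print({k: json.loads(v) for k, v in backend.items()})  # {'t1': 5, 't2': 30}
```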

Celery supports several backends for storing and exposing task results. Among the supported backends are Cassandra and (starting with v5.3) Astra DB.

In the following, we assume familiarity with the celeryconfig configuration object for Celery and with the use of Cassandra as a result backend. See the Celery documentation for more details.

Prerequisites

Keep the token information (Client ID and Client Secret) and the location of the Secure Connect Bundle file at hand: you will need them shortly for the Celery configuration.

Installation and Setup

This section walks through a minimal Celery setup that uses the Astra DB backend, from start to finish.

A task will be defined and executed through Celery, and its return value will then be retrieved on the client side. The example also needs a message bus: following the quickstart in Celery's documentation, it uses RabbitMQ running in Docker.

1. Start a message broker

Make sure you have a RabbitMQ instance running in Docker, for example with docker run -d -p 5672:5672 rabbitmq (it may take a while for the image to download and for the broker to complete startup).
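
Since startup can take a while, it may help to confirm that something is listening on the AMQP port before moving on. The following is a minimal sketch using only the standard library; the helper name broker_reachable is hypothetical, and a successful TCP connection only shows that the port is open, not that RabbitMQ accepts the guest credentials used later.

```python
import socket

def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only proves that something is listening on the port (e.g. RabbitMQ's
    AMQP listener); it does not authenticate or speak the AMQP protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # connection refused, timed out, unknown host, ...
        return False

if __name__ == "__main__":
    print("RabbitMQ port reachable:", broker_reachable())
```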

2. Define a task

Create a tasks.py module with the definition of a task, to be later executed through Celery:

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task
def sortWords(text, capitalize):
    # Rearrange the text so that words are in alphabetical order.
    words = text.split(' ')
    sortedWords = sorted(words, key=str.upper)
    return ' '.join([
        w if not capitalize else w.upper()
        for w in sortedWords
    ])
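
Because the task body is plain Python, its logic can be checked without a broker or a worker. The sketch below copies the body of sortWords into a standalone function (the name sort_words_local is hypothetical) and also shows why key=str.upper matters: Python's default string ordering compares code points, so every uppercase letter sorts before every lowercase one.

```python
def sort_words_local(text, capitalize):
    # Same logic as the sortWords task, without the @app.task decorator.
    words = text.split(' ')
    sorted_words = sorted(words, key=str.upper)  # case-insensitive ordering
    return ' '.join(w.upper() if capitalize else w for w in sorted_words)

words = 'In the land of another wizards day'.split(' ')
print(sorted(words))                 # ['In', 'another', 'day', 'land', 'of', 'the', 'wizards']
print(sorted(words, key=str.upper))  # ['another', 'day', 'In', 'land', 'of', 'the', 'wizards']
print(sort_words_local('storage yay my DB is powerful results Astra', False))
# Astra DB is my powerful results storage yay
```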

3. Configure Celery

Create a module celeryconfig.py in the same directory, providing (among other things) the broker and backend configuration:

broker_url = 'pyamqp://guest@localhost//'

broker_connection_retry_on_startup = True

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
enable_utc = True

result_backend = 'cassandra://'
cassandra_keyspace = 'celeryks'                       # REPLACE_ME
cassandra_table = 'celery_tasks'                      # REPLACE_ME
cassandra_read_consistency = 'LOCAL_QUORUM'    # ConsistencyLevel names are upper-case
cassandra_write_consistency = 'LOCAL_QUORUM'
cassandra_auth_provider = 'PlainTextAuthProvider'
cassandra_auth_kwargs = {
  'username': 'client-id-from-astra-token',           # REPLACE_ME
  'password': 'client-secret-from-astra-token',       # REPLACE_ME
}
cassandra_secure_bundle_path = '/path/to/secure-connect-database.zip'   # REPLACE_ME

In the above, replace the values marked REPLACE_ME with your own:

  • the name of the keyspace you created earlier in Astra DB;
  • the name of the table you want Celery to store results in (there is no need to create it beforehand);
  • the Client ID and Client Secret of the Astra DB token you generated earlier (as username and password, respectively, in cassandra_auth_kwargs);
  • the path to the Secure Connect Bundle you downloaded earlier.
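
Forgetting one of these replacements typically only surfaces when the worker first tries to connect. A small pre-flight check can catch it earlier; the sketch below is hypothetical (config_problems is not part of Celery) and assumes the placeholder strings shown above. It works on any object with the same attribute names, such as the celeryconfig module itself.

```python
import os
import types

# Placeholder values copied from the celeryconfig example above.
PLACEHOLDERS = {
    'client-id-from-astra-token',
    'client-secret-from-astra-token',
    '/path/to/secure-connect-database.zip',
}

def config_problems(cfg):
    """Return human-readable problems found in a celeryconfig-like object."""
    problems = []
    auth = getattr(cfg, 'cassandra_auth_kwargs', None) or {}
    for key in ('username', 'password'):
        value = auth.get(key)
        if not value or value in PLACEHOLDERS:
            problems.append(f'cassandra_auth_kwargs[{key!r}] is missing or still a placeholder')
    bundle = getattr(cfg, 'cassandra_secure_bundle_path', None)
    if not bundle or not os.path.isfile(bundle):
        problems.append(f'secure connect bundle not found: {bundle!r}')
    return problems

# Demo on an object that still holds the placeholder values.
example = types.SimpleNamespace(
    cassandra_auth_kwargs={'username': 'client-id-from-astra-token',
                           'password': 'client-secret-from-astra-token'},
    cassandra_secure_bundle_path='/path/to/secure-connect-database.zip',
)
for problem in config_problems(example):
    print('-', problem)
```

To check the real configuration, run import celeryconfig followed by config_problems(celeryconfig) from the directory that contains celeryconfig.py; an empty list means none of these checks failed.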

4. Start the worker

Start a Celery worker with:

celery -A tasks worker --loglevel=INFO

5. Run and check a task

In a different shell, open a Python REPL and enter the following commands to run a couple of tasks and retrieve their results:

from tasks import sortWords
sorted1 = sortWords.delay('storage yay my DB is powerful results Astra', False)
sorted1.ready()
# Returns:     True
# (as soon as the function completes, which here is almost immediately)

sorted1.get()
# Returns:     'Astra DB is my powerful results storage yay'

sorted2 = sortWords.delay('In the land of another wizards day', capitalize=True)
sorted2.get()
# Returns:     'ANOTHER DAY IN LAND OF THE WIZARDS'
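
Note that get() blocks until the result is available; Celery's AsyncResult.get() also accepts a timeout argument, raising celery.exceptions.TimeoutError when it expires. For code that prefers to check ready() explicitly (for example, to log progress between checks), a polling loop could look like the sketch below. The helper wait_for_result and the FakeResult stand-in are hypothetical and rely only on the ready() and get() methods used above.

```python
import time

def wait_for_result(async_result, timeout=10.0, interval=0.5):
    """Poll async_result.ready() until True, then return async_result.get().

    Raises the built-in TimeoutError if the result is not ready within
    `timeout` seconds. Works with any object exposing ready() and get().
    """
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f'result not ready after {timeout} s')
        time.sleep(interval)
    return async_result.get()

class FakeResult:
    """Hypothetical stand-in for AsyncResult: becomes ready after some polls."""
    def __init__(self, value, polls_needed):
        self.value = value
        self.polls_left = polls_needed

    def ready(self):
        self.polls_left -= 1
        return self.polls_left < 0

    def get(self):
        return self.value

print(wait_for_result(FakeResult('Astra DB is my powerful results storage yay', 2), interval=0.01))
```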

6. (Optional) Look at the database

Inspect the corresponding data stored in Astra DB: navigate to the CQL Console for the database you created and enter the following commands:

USE celeryks;               // <== enter your keyspace name here

DESCRIBE TABLES;            // the output, e.g. "celery_tasks", lists the tables

SELECT * FROM celery_tasks; // <== enter your table name here
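
As an alternative to the CQL Console, the same table can be read from Python. This sketch assumes the DataStax Python driver (the cassandra-driver package) is installed; the function name fetch_recent_task_rows is hypothetical, and its defaults simply mirror the example keyspace and table names above. Columns holding task results may be stored in an encoded, binary form rather than as readable text.

```python
def fetch_recent_task_rows(bundle_path, client_id, client_secret,
                           keyspace='celeryks', table='celery_tasks', limit=10):
    """Return up to `limit` rows of the Celery results table as dicts.

    The driver is imported lazily so that this module loads even where
    cassandra-driver is not installed.
    """
    from cassandra.auth import PlainTextAuthProvider
    from cassandra.cluster import Cluster

    cluster = Cluster(
        cloud={'secure_connect_bundle': bundle_path},
        auth_provider=PlainTextAuthProvider(client_id, client_secret),
    )
    try:
        session = cluster.connect(keyspace)
        # Table names cannot be bound as parameters: `table` must be trusted input.
        rows = session.execute(f'SELECT * FROM {table} LIMIT %s', (limit,))
        # The default row factory yields named tuples, which support _asdict().
        return [row._asdict() for row in rows]
    finally:
        cluster.shutdown()
```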

Additional configuration

Celery uses the DataStax Python driver for Cassandra; hence, the available connection parameters are the same as for any driver-based use of Cassandra in Python.

In particular, you may want to specify additional parameters through celeryconfig, such as the protocol version, the load-balancing policy and so on. Refer to the "Additional configuration" section in the Celery documentation for a more comprehensive setup.
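
For instance, the following lines could be appended to celeryconfig.py. This is a sketch: Celery passes the cassandra_options dictionary as keyword arguments to the driver's cassandra.cluster.Cluster, and cassandra_entry_ttl sets how many seconds stored results are kept before they expire (by default they never expire). The specific values (protocol version 4, a 10-second connection timeout, a one-day TTL) are assumptions to adapt to your database and needs.

```python
# Extra celeryconfig.py settings (illustrative values, adjust as needed).

# Forwarded by Celery as keyword arguments to cassandra.cluster.Cluster.
cassandra_options = {
    'protocol_version': 4,   # pin the native protocol version instead of negotiating it
    'connect_timeout': 10,   # seconds to wait when opening a connection
}

# Stored task results expire after this many seconds (one day here).
cassandra_entry_ttl = 24 * 60 * 60
```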


Last update: 2023-08-01