# Task definition and call
Refer to the Celery documentation.
# Task definition
In general we like to keep the task implementation separated from the task definition. Thanks to dataclasses, we reduce the need to duplicate parameter changes across the job call, the task definition, and the separate implementation code. See below.
Our use cases, and our code in general, do not need to know about the task definition. Retry configuration, manual retries and so on should be kept inside the task definition. Separation of concerns 😉.
```python
# tasks.py
from celery import shared_task

@shared_task()
def camera_motion_detected(data: dict) -> None:
    usecases.camera_motion_detected(...)
```

```python
# usecases/camera_motion.py
def camera_motion_detected(...):
    print('do something amazing!')
```
# Class methods as celery tasks
Since it seems that we cannot use class methods as tasks, we simply create a function task that calls the class method if needed. See the discussion about this topic on Stack Overflow: using class methods as celery tasks.
This is not an issue, because it is not our recommended way of working, as explained just before.
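For illustration, here is a minimal sketch of the pattern; the MotionDetector class, its handle method and the handle_motion task are hypothetical:

```python
# tasks.py -- a minimal sketch, MotionDetector is a hypothetical class
from celery import shared_task


class MotionDetector:
    @classmethod
    def handle(cls, camera_id: str) -> None:
        print(f'handling motion for camera {camera_id}')


@shared_task()
def handle_motion(camera_id: str) -> None:
    # The task itself stays a plain function and simply delegates
    # to the class method.
    MotionDetector.handle(camera_id)
```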
# Passing Data to jobs
Jobs are executed in another Docker container, in another process, so you can't share memory. You somehow have to get data from the core application to the worker processes.
# Serialized dataclasses
If you only have 2-3 parameters to pass around, that's fine. If you have more, I strongly suggest you create a Python dataclass, serialize it, send it to the task, and process it there. It will make your life easier and your code far more elegant and less error-prone.
```python
# a.py, will create a job
import dataclasses
from dataclasses import dataclass

@dataclass
class CameraTopic:
    type: str
    service: str
    device_id: str

def processing():
    in_data = CameraTopic(...)
    tasks.camera_motion_detected.apply_async(args=[dataclasses.asdict(in_data)])
```

```python
# tasks.py
from celery import shared_task

@shared_task()
def camera_motion_detected(data: dict) -> None:
    in_data = CameraTopic(**data)
    camera_motion.camera_motion_detected(in_data)
```
TIP
In general we like to keep the task implementation separated from the task definition. Thanks to this, we reduce the need to duplicate parameter changes across the job call, the task definition, and the implementation code.
If Python dataclasses don't cover your needs, you can use the attrs library. The same principles apply.
```python
# a.py, will create a job
import attr

@attr.s
class CameraTopic(object):
    ...

def processing():
    in_data = CameraTopic(...)
    tasks.camera_motion_detected.apply_async(args=[attr.asdict(in_data)])
```

```python
# tasks.py
from celery import shared_task

@shared_task()
def camera_motion_detected(data: dict) -> None:
    in_data = CameraTopic(**data)
    camera_motion.camera_motion_detected(in_data)
```
# Getting a Django Model Between Processes
But we want to pass our models around! How can we do this?
The usual approach is to define tasks to take the model ID as a parameter, and just pass that. If I need a Foo, I’ll have a parameter foo_id that I pass to the task. Then in the task, I’ll access the DB and pull that Foo instance out.
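For illustration, a minimal sketch of this pattern; the Foo model, the myapp module and the process_foo task are hypothetical:

```python
# tasks.py -- a minimal sketch, Foo and myapp are hypothetical
from celery import shared_task

from myapp.models import Foo


@shared_task()
def process_foo(foo_id: int) -> None:
    # Load a fresh instance from the database inside the worker process.
    foo = Foo.objects.get(pk=foo_id)
    print(f'processing {foo}')


# Caller side: pass only the primary key, never the instance itself.
# process_foo.apply_async(args=[foo.pk])
```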
WARNING
You could end up with a race condition. Don't forget to use database locks if necessary.
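One way to guard against concurrent modification is to take a row lock with select_for_update inside a transaction. A minimal sketch, again assuming a hypothetical Foo model with a status field:

```python
# a minimal sketch, Foo and its status field are hypothetical
from django.db import transaction

from myapp.models import Foo


def update_foo_safely(foo_id: int) -> None:
    with transaction.atomic():
        # select_for_update() locks the row until the transaction ends,
        # so a concurrent worker cannot modify it at the same time.
        foo = Foo.objects.select_for_update().get(pk=foo_id)
        foo.status = 'processed'
        foo.save()
```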
In general, I strongly discourage passing your model data through your jobs: you would need to serialize the data (with all the possible issues around relationships and so on), and you could hit a race condition where the data is modified before the task is executed.
Still, if you need this (you will have to argue why in the PR), you can use pickle or Django's built-in serialization/deserialization:
```python
from django.core.serializers import serialize, deserialize

# Note that this requires an iterable, so you have to wrap your
# instance in a list:
json_version = serialize('json', [some_class_instance])

# Now you have a JSON representation of the instance that knows its
# own type. Put it on the wire here, passing it to a task or whatever.

# Then in the task:
deserialized_objects = deserialize('json', json_version)

# This will produce a list of DeserializedObject instances that wrap
# the actual model, which will be available as
# deserialized_objects[i].object
```
# Dynamic Model
By dynamic model I mean that you don't know, in the task definition, which model you will receive. For example, it could be an AlarmStatus or a SomethingStatus; the processing is doable on both of these models. You have several options:
- Create 2 different tasks. Duplicated code leads to bad design; imagine if you grow to 3-4 models, or more...
- Add multiple parameters to the task: `alarm_status_pk`, `something_status_pk`, ... That leads to boilerplate: the function signature grows, and so does the `if`/`elif`/`else`.
- Tell the task which model it needs:
```python
from celery import shared_task
from django.apps import apps

@shared_task()
def some_task(model_name, model_id):
    Model = apps.get_model('django_app_name.{}'.format(model_name))
    obj = Model.objects.get(pk=model_id)
```
⚠️ You have to make sure the task is applicable to the model, otherwise you will get weird behavior and/or crashes.
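One way to enforce that is an explicit whitelist of model names. A minimal sketch extending the snippet above; the ALLOWED_MODELS set and its contents are assumptions:

```python
from celery import shared_task
from django.apps import apps

# Hypothetical whitelist of model names this task knows how to handle.
ALLOWED_MODELS = {'AlarmStatus', 'SomethingStatus'}


@shared_task()
def some_task(model_name, model_id):
    # Reject models the task was never designed for, instead of
    # failing later with confusing errors.
    if model_name not in ALLOWED_MODELS:
        raise ValueError(f'some_task cannot process model {model_name!r}')
    Model = apps.get_model('django_app_name.{}'.format(model_name))
    obj = Model.objects.get(pk=model_id)
```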