Je souhaite mettre en œuvre une solution dans laquelle toutes les tâches avec ETA sont stockées dans ma base de données, au lieu d'être planifiées sur le courtier. Je fais cela en raison de contraintes de mémoire sur mes travailleurs et de la nature de la charge de travail que j'ai. Celery planifie les travaux ETA à la mémoire des travailleurs.
Je souhaite y parvenir en utilisant quelque chose comme ci-dessous
@before_task_publish.connect def handle_tasks_with_eta(body, headers, **kwargs): # If the task is getting scheduled for future if headers.get("eta"): # Store the task in database with ETA, args, kwargs and sender # Prevent the task from getting scheduled <------ THIS IS WHERE I NEED HELP else: pass
Lever une exception personnalisée n'a pas aidé. Il se fait prendre à - https: // github.com/celery/celery/blob/bef4c1642586b89ed86ef61b5824cd7cfbd9aa55/celery/utils/dispatch/signal.py#L289
Merci d'avance.
3 Réponses :
Je pense qu'il pourrait être plus facile d'aborder le problème un peu différemment. J'ai travaillé sur un problème similaire et j'ai décidé d'utiliser deux files d'attente, une pour le tri des tâches et une autre pour leur exécution. J'ai ensuite utilisé un ouvrier avec l'option -c1
pour gérer la file d'attente de triage et un autre ouvrier pour faire le vrai travail.
Si vous avez besoin de pouvoir retracer les résultats jusqu'à la demande, vous pouvez spécifier le task_id
lors de l'exécution de la tâche de triage.
@app.task def triage(args, kwargs, eta=None, task_id=None): if eta: # store in database else: other_task.apply_async(args=args, kwargs=kwargs, task_id=task_id) @app.task def other_task(a1, a2, kw1=None): # do stuff
C'est certainement une approche intéressante.
J'ai fini par créer une implémentation interne pour réaliser la planification des tâches dans un magasin de données persistant. J'ai créé une classe, qui implémenterait des méthodes telles que create_or_update_schedule
et send_task_to_queue
. Mon implémentation de TaskScheduleHelper
ressemblait à quelque chose comme ci-dessous.
task_schedule_objects = TaskSchedule.objects.filter(status=TaskScheduleStatus.SCHEDULED, eta__lte=timezone.now()) for task_schedule_object in task_schedule_objects: TaskScheduleHelper.send_task_to_queue(task_schedule_object)
J'ai ensuite ajouté create_or_update_schedule
dans celery.app.task
afin que je puisse l'appeler sur la méthode des tâches.
XXX
Maintenant, je peux planifier une tâche en appelant po_schedule
dessus. Un objet sera créé dans la base de données correspondant à la clé unique de cette tâche a_unique_key
.
@app.task def divide(dividend, divisor): return dividend/divisor divide.po_schedule( (20, 5), key='a_unique_key', eta=timezone.now() + timedelta(hours=1) )
J'ai une tâche cron qui interroge périodiquement Table de base de données TaskSchedule
pour eta
, et appelle la méthode send_task_to_queue
de TaskScheduleHelper
.
from celery.app.task import Task Task.po_schedule = TaskScheduleHelper.create_or_update_schedule
Pour mon implémentation, j'ai utilisé le hook before_task_publish
que vous avez fait dans votre question d'origine. J'ai révoqué la tâche si je l'enregistre dans la base de données. De cette façon, je peux utiliser la fonction native apply_async
.
import uuid from ast import literal_eval from celery.execute import send_task from django.db import models from app.shared.models import TimestampMixin class DelayTask(TimestampMixin): id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) task = models.CharField(max_length=255) argsrepr = models.TextField() kwargsrepr = models.TextField() eta = models.DateTimeField(db_index=True) def send(self): send_task( self.task, args=literal_eval(self.argsrepr), kwargs=literal_eval(self.kwargsrepr), eta=self.eta, )
J'ai alors une PeriodicTask
qui s'exécute toutes les 10 minutes et envoie toutes les tâches qui doit s'exécuter dans les 10 prochaines minutes.
@shared_task def reschedule_tasks(): tasks = DelayTask.objects.filter(eta__lte=timezone.now() + timedelta(minutes=10)) for task in tasks: # Grab all the tasks that are going to run in the next 10 minutes and send them to celery. task.send() task.delete()
Voici mon modèle DelayTask
from datetime import datetime, timedelta from celery.signals import before_task_publish from django.utils import timezone @before_task_publish.connect def task_sent_handler(sender=None, headers=None, body=None, **kwargs): info = headers if "task" in headers else body if headers.get("eta"): eta = datetime.strptime(headers["eta"], '%Y-%m-%dT%H:%M:%S.%f%z') if eta - timezone.now() > timedelta(minutes=10): # Any tasks that are delayed over 10 minutes will be stored in the database until # it's time for them to run. from .models import DelayTask DelayTask.objects.create( task=headers["task"], eta=eta, argsrepr=headers["argsrepr"], kwargsrepr=headers["kwargsrepr"], ) from app.celery import app app.control.revoke(info["id"])
p >
C'est vraiment chouette. :)
Le remplacement de votre méthode
apply_async
ne serait-il pas plus approprié pour ce type de travail?@JRajan - Voulez-vous dire que le singe patche
Task.apply_async
et vérifieeta
danskwargs
?Oui. Quelque chose sur ces lignes. Soit monkeypatch la méthode Task, soit créez une classe CustomTask qui hérite de la classe Task de céleri et appliquez-la au module céleri.
@JRajan - Ouais, je considère comme une solution plausible. Je ne sais pas si l'application de correctifs aux API internes telles que
apply_async
est une bonne idée. Mais oui, c'est l'une des solutions que je cherche.@Abhishek Je cherche à faire la même chose. Où êtes-vous arrivé?
@chrislondon - J'ai ajouté ma mise en œuvre comme réponse. J'espère que cela aide. N'hésitez pas à commenter la réponse pour toute clarification et je serais heureux d'y répondre.