0
votes

Intercepter une tâche de céleri dans avant_task_publish et l'empêcher de faire publier

Je souhaite mettre en œuvre une solution dans laquelle toutes les tâches avec ETA sont stockées dans ma base de données, au lieu d'être planifiées sur le courtier. Je fais cela en raison de contraintes de mémoire sur mes travailleurs et de la nature de la charge de travail que j'ai. Celery planifie les travaux ETA à la mémoire des travailleurs.

Je souhaite y parvenir en utilisant quelque chose comme ci-dessous

@before_task_publish.connect
def handle_tasks_with_eta(body, headers, **kwargs):
    # If the task is getting scheduled for future
    if headers.get("eta"):
        # Store the task in database with ETA, args, kwargs and sender
        # Prevent the task from getting scheduled <------ THIS IS WHERE I NEED HELP
    else:
        pass

Lever une exception personnalisée n'a pas aidé. Il se fait prendre à - https: // github.com/celery/celery/blob/bef4c1642586b89ed86ef61b5824cd7cfbd9aa55/celery/utils/dispatch/signal.py#L289

Merci d'avance.


6 commentaires

Le remplacement de votre méthode apply_async ne serait-il pas plus approprié pour ce type de travail?


@JRajan - Voulez-vous dire que le singe patche Task.apply_async et vérifie eta dans kwargs ?


Oui. Quelque chose sur ces lignes. Soit monkeypatch la méthode Task, soit créez une classe CustomTask qui hérite de la classe Task de céleri et appliquez-la au module céleri.


@JRajan - Ouais, je considère comme une solution plausible. Je ne sais pas si l'application de correctifs aux API internes telles que apply_async est une bonne idée. Mais oui, c'est l'une des solutions que je cherche.


@Abhishek Je cherche à faire la même chose. Où êtes-vous arrivé?


@chrislondon - J'ai ajouté ma mise en œuvre comme réponse. J'espère que cela aide. N'hésitez pas à commenter la réponse pour toute clarification et je serais heureux d'y répondre.


3 Réponses :


2
votes

Je pense qu'il pourrait être plus facile d'aborder le problème un peu différemment. J'ai travaillé sur un problème similaire et j'ai décidé d'utiliser deux files d'attente, une pour le tri des tâches et une autre pour leur exécution. J'ai ensuite utilisé un ouvrier avec l'option -c1 pour gérer la file d'attente de triage et un autre ouvrier pour faire le vrai travail.

Si vous avez besoin de pouvoir retracer les résultats jusqu'à la demande, vous pouvez spécifier le task_id lors de l'exécution de la tâche de triage.

@app.task
def triage(args, kwargs, eta=None, task_id=None):
    if eta:
        # store in database
    else:
        other_task.apply_async(args=args, kwargs=kwargs, task_id=task_id)

@app.task
def other_task(a1, a2, kw1=None):
    # do stuff


1 commentaires

C'est certainement une approche intéressante.



1
votes

J'ai fini par créer une implémentation interne pour réaliser la planification des tâches dans un magasin de données persistant. J'ai créé une classe, qui implémenterait des méthodes telles que create_or_update_schedule et send_task_to_queue . Mon implémentation de TaskScheduleHelper ressemblait à quelque chose comme ci-dessous.

task_schedule_objects = TaskSchedule.objects.filter(status=TaskScheduleStatus.SCHEDULED,
                                                            eta__lte=timezone.now())
for task_schedule_object in task_schedule_objects:
    TaskScheduleHelper.send_task_to_queue(task_schedule_object)

J'ai ensuite ajouté create_or_update_schedule dans celery.app.task afin que je puisse l'appeler sur la méthode des tâches.

XXX

Maintenant, je peux planifier une tâche en appelant po_schedule dessus. Un objet sera créé dans la base de données correspondant à la clé unique de cette tâche a_unique_key.

@app.task
def divide(dividend, divisor):
    return dividend/divisor

divide.po_schedule(
    (20, 5),
    key='a_unique_key',
    eta=timezone.now() + timedelta(hours=1)
)

J'ai une tâche cron qui interroge périodiquement Table de base de données TaskSchedule pour eta , et appelle la méthode send_task_to_queue de TaskScheduleHelper.

from celery.app.task import Task
Task.po_schedule = TaskScheduleHelper.create_or_update_schedule


0 commentaires

1
votes

Pour mon implémentation, j'ai utilisé le hook before_task_publish que vous avez fait dans votre question d'origine. J'ai révoqué la tâche si je l'enregistre dans la base de données. De cette façon, je peux utiliser la fonction native apply_async .

import uuid
from ast import literal_eval

from celery.execute import send_task
from django.db import models

from app.shared.models import TimestampMixin


class DelayTask(TimestampMixin):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    task = models.CharField(max_length=255)
    argsrepr = models.TextField()
    kwargsrepr = models.TextField()
    eta = models.DateTimeField(db_index=True)

    def send(self):
        send_task(
            self.task,
            args=literal_eval(self.argsrepr),
            kwargs=literal_eval(self.kwargsrepr),
            eta=self.eta,
        )

J'ai alors une PeriodicTask qui s'exécute toutes les 10 minutes et envoie toutes les tâches qui doit s'exécuter dans les 10 prochaines minutes.

@shared_task
def reschedule_tasks():
    tasks = DelayTask.objects.filter(eta__lte=timezone.now() + timedelta(minutes=10))

    for task in tasks:
        # Grab all the tasks that are going to run in the next 10 minutes and send them to celery.
        task.send()
        task.delete()

Voici mon modèle DelayTask

from datetime import datetime, timedelta

from celery.signals import before_task_publish
from django.utils import timezone


@before_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    info = headers if "task" in headers else body
    if headers.get("eta"):
        eta = datetime.strptime(headers["eta"], '%Y-%m-%dT%H:%M:%S.%f%z')
        if eta - timezone.now() > timedelta(minutes=10):
            # Any tasks that are delayed over 10 minutes will be stored in the database until
            # it's time for them to run.
            from .models import DelayTask
            DelayTask.objects.create(
                task=headers["task"],
                eta=eta,
                argsrepr=headers["argsrepr"],
                kwargsrepr=headers["kwargsrepr"],
            )

            from app.celery import app
            app.control.revoke(info["id"])

p >


1 commentaires

C'est vraiment chouette. :)