3
votes

Utilisation d'un ouvrier Celery pour interagir avec une base de données SQLAlchemy, y compris connaître l'utilisateur à partir de la demande

J'ai fait de nombreuses recherches à ce sujet, notamment en essayant des réponses comme ceci . Il semble que Celery n'ait pas accès au contexte de mon application Flask.

Je connais parfaitement mon objet céleri, ce qui décorera mes tâches, doit avoir accès au contexte de mon application Flask. Et je pense que cela devrait, car j'ai suivi ce guide pour créer mon objet de céleri. Je ne sais pas si la confusion réside quelque part dans le fait que j'utilise Flask-HTTPAuth.

Voici une partie de ce que j'ai.

@app.route("/item_loop")
@auth.login_required
def item_loop():
    result = loop.delay()
    return "It's running."

En cours d'exécution cette tâche à l'aide de Flask est cependant interdite. J'essaye de démarrer cette fonction en frappant le serveur (alors qu'il est autorisé!).

def make_celery(app):
    celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)

@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
    items = g.user.items
    for item in items:
        print(item)

Mais l'ouvrier Celery me dit la tâche soulevée inattendue: AttributeError ("'_ AppCtxGlobals 'objet n'a pas d'attribut' utilisateur '",) , ce qui, je pense, impliquerait, comme mentionné, mon objet céleri n'a pas le contexte d'application, même si j'ai utilisé le modèle d'usine recommandé.

p>


0 commentaires

3 Réponses :


2
votes

Pour récupérer l'utilisateur à partir de l'exécution d'une tâche, vous pouvez essayer de transmettre l'objet User (si le céleri peut le décaper), ou transmettre suffisamment d'informations pour que la tâche puisse récupérer l'objet User (par exemple, l'identifiant de l'utilisateur). Dans ce dernier cas, votre tâche ressemblerait à quelque chose comme

result = loop.delay(current_user.id)

et vous la lanceriez (en supposant que vous utilisez flask_login) via

@celery.task(bind=True, name="flask_app.item_loop")
def loop(self, user_id):
    user = User.query.get(user_id)
    items = user.items
    for item in items:
        print(item)


2 commentaires

Bonjour, cela a du sens et fonctionne, mais comment pourrais-je réécrire dans la base de données SQLAlchemy? db.session.commit () ne fonctionnera pas: le nom 'db' n'est pas défini .


Ensuite, vous aurez besoin d'une base de données qui a les mêmes paramètres de base de données que l'application Flask. Voir stackoverflow.com/questions/12044776/... pour une approche.



2
votes

Comme l'a noté @Dave W. Smith, plutôt que de s'appuyer sur g pour récupérer l'utilisateur, transmettre les informations de l'utilisateur comme argument à la tâche Celery pourrait être une meilleure approche. Selon Documentation Flask sur le contexte de l'application , la durée de vie de g est une requête. Étant donné que la tâche Celery est exécutée de manière asynchrone, elle serait exécutée dans un contexte d'application différent de celui de la requête où vous avez défini l'utilisateur.


0 commentaires

4
votes

Bien que les recommandations contenues dans les réponses de Dave et Greg soient valables, ce qu'elles manquent de souligner, c'est le malentendu que vous avez concernant l'utilisation d'un contexte d'application dans une tâche Celery.

Vous avez une application Flask, dans laquelle vous utilisez Flask-HTTPAuth. Vous avez probablement un gestionnaire verify_password qui définit g.user sur l'utilisateur authentifié. Cela signifie que pendant que vous traitez une demande, vous pouvez accéder à l'utilisateur en tant que g.user . Tout va bien.

Vous disposez également d'un ou de plusieurs nœuds de calcul Celery, qui sont des processus distincts qui n'ont pas de connexion directe avec le serveur Flask. La seule communication entre le serveur Flask et les processus de travail Celery s'effectue via le courtier de messages que vous utilisez (généralement Redis ou RabbitMQ).

Selon vos besoins, les ouvriers de Celery peuvent avoir besoin d'accéder à l'application Flask. Ceci est très courant lors de l'utilisation d'extensions Flask qui stockent leur configuration dans le dictionnaire app.config . Deux extensions courantes qui nécessitent cela sont Flask-SQLAlchemy et Flask-Mail. Sans accès à app.config , la tâche Celery n'aurait aucun moyen d'ouvrir une connexion à la base de données ou d'envoyer un e-mail, car elle ne connaîtrait pas les détails de la base de données et / ou du serveur de messagerie.

Pour permettre aux travailleurs Celery d'accéder à la configuration, la pratique acceptée consiste à créer des applications Flask en double dans chaque travailleur. Il s'agit d'applications secondaires qui ne sont en aucun cas connectées à l'objet d'application réel utilisé par le serveur Flask principal. Leur seul but est de conserver une copie du dictionnaire d'origine app.config accessible par votre tâche ou par toute extension Flask que votre tâche utilise.

Il est donc incorrect de s'attendre à ce qu'un g.user défini dans le serveur Flask soit également accessible en tant que g.user dans la tâche Celery, simplement parce que ce sont différents objets g , provenant de différentes instances d'application.

Si vous devez utiliser l'utilisateur authentifié dans la tâche Celery, vous devez passer le user_id (généralement g.user.id ) comme argument à ta tâche. Ensuite, dans votre tâche, vous pouvez charger l'utilisateur depuis la base de données en utilisant cet id . J'espère que cela vous aidera!


4 commentaires

Je pense que je suis. Le chargement de l'utilisateur dans la tâche me semble parfaitement logique et fonctionnera, j'ai confirmé. En fait, la réécriture dans la base de données semble être le problème. Je suis désolé, je ne peux tout simplement pas comprendre cela. Est-ce là que la chose en double de l'application Flask que vous avez mentionnée entre en jeu?


Si vous créez une application secondaire comme je l'ai indiqué dans la réponse, vous ne devriez avoir aucun problème à lire ou à écrire à partir de la base de données. Si vous rencontrez une erreur spécifique, vous devez développer cela dans votre question.


Bien que je ne sois pas sûr de ce que vous entendez par créer une application secondaire, je suppose que je l'ai fait, car je l'ai fait fonctionner, en étudiant vraiment le modèle fourni. Merci.


Tout ce que je veux dire par «secondaire», c'est que ce n'est pas l'instance Flask qui exécute le serveur.