9
votes

Attendez un seul message rabbbitmq avec un délai d'attente

J'aimerais envoyer un message à un serveur rabbbitmq, puis attendez un message de réponse (sur une file d'attente "Répondre à"). Bien sûr, je ne veux pas attendre pour toujours au cas où le traitement de l'application ces messages est en panne - il doit être un délai d'attente. Cela ressemble à une tâche très fondamentale, mais je ne trouve pas un moyen de faire cela. Je rencontre maintenant ce problème avec py-amqplib et le Rabbitmq .NET Client .

La meilleure solution que j'ai jusqu'à présent est de sonder l'utilisation de BASIC_GET CODE> avec SLEEZ CODE> ENTRE-ENTRE, mais c'est assez moche: P>

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)


0 commentaires

5 Réponses :


1
votes

Cela semble casser toute l'idée d'une transformation asynchrone, mais si vous devez, je pense que la bonne façon de le faire est d'utiliser un RPCClient .


1 commentaires

Bien que la RPCClient elle-même ne soit pas utile pour moi, en regardant sa mise en œuvre révèle que l'approche d'utilisation: Créez un weueuingbasicConsumer et attendez sur sa file d'attente, qui prend en charge un délai d'attente. Ce n'est pas aussi complexe dans .net que je craignais.



2
votes

Il y a un exemple ici en utilisant qpid avec un msg = q.get (timeout = 1) qui devrait faire ce que vous voulez faire . Désolé, je ne sais pas quelles autres bibliothèques client AMQP mettent en œuvre les délaisols (et en particulier, je ne connais pas les deux spécifiques que vous avez mentionnés).


1 commentaires

En regardant la source de QPID, il semble utiliser exactement la même approche que le client .NET: basic_consume avec une file d'attente et en attente de la file d'attente avec un délai d'attente. On dirait que c'est ce que je vais devoir faire.



9
votes

Voici ce que j'ai fini par faire dans le client .NET: xxx

Malheureusement, je ne peux pas faire la même chose avec Py-amqplib, car sa méthode basic_consume Ne pas appeler le rappel à moins que vous appeliez canal.wait () et canal.wait () ne prend pas en charge les délaisols! Cette limitation idiote (que je continue à courir) signifie que si vous ne recevez jamais un autre message, votre fil est congelé pour toujours.


0 commentaires

8
votes

Je viens d'ajouter un support de timeout pour amqplib dans carotte .

Ceci est une sous-classe de amqplib.client0_8.Connection :

http://github.com/ Demandez / carotte / blob / maître / carotte / backends / pyamqplib.py # l19-97

wait_multi est une version de canal.wait capable de recevoir sur un numéro arbitraire des canaux.

Je suppose que cela pourrait être fusionné en amont à un moment donné.


2 commentaires

Maintenant c'est ce que j'appelle une "bonne réponse": "C'est réparé"! Accepter - dans l'espoir que est fusionné dans AMQPLIB.


@Emp haha ​​:) drôle :)



1
votes

lapin vous permet maintenant d'ajouter des événements de timeout. Il suffit de envelopper votre code dans une prise d'essai, puis de lancer des exceptions dans le délai d'attente et de déconnecter les gestionnaires: xxx


0 commentaires