Je reçois des données via Azure Eventhub et essayez de fermer si je ne reçois plus de données pour une valeur de 10 ou 15 secondes.
J'ai essayé de mettre en œuvre des arguments Je fais aussi référence à cette Exemple . p> Il existe une fonction "On_Error" qui peut fonctionner pour fermer le client lorsqu'il n'y a aucun message supplémentaire consommé. P> auth_timeout code> et
idel_timeout code> dans _ Consumer_Client , mais ni n'a travaillé. P>
consumer_client = EventHubConsumerClient.from_connection_string(conn_str=CONNECTION_STR,consumer_group='forceconsummer',eventhub_name=EVENTHUB_NAME, idle_timeout = 30, auth_timeout = 10)
consumer_client.receive(on_event=on_event, partition_id = "2", track_last_enqueued_event_properties=False, on_error=on_error, starting_position="@latest")
3 Réponses :
.Receive est un appel de blocage. Essayez de l'appeler dans un thread séparé afin que vous puissiez fermer le consommateur dans le fil principal. Vous pouvez créer de l'extrémité inférieure à l'extrémité de la dernière fois reçue et prendre une décision de près s'il a été un certain temps depuis le dernier événement reçu.
thread = threading.Thread( target=consumer_client.receive, kwargs={ "on_event": on_event, "on_partition_initialize": on_partition_initialize, "on_partition_close": on_partition_close, "on_error": on_error, "starting_position": "-1", # "-1" is from the beginning of the partition. } ) thread.daemon = True thread.start() time.sleep(RECEIVE_DURATION) consumer_client.close() thread.join()
Devrais-je ajouter ce code après consommer_client.receive code>?
J'ai eu ton point, mais je ne suis toujours pas sûr de définir la réception_duration. Vous vouliez probablement utiliser `temps.time () ', mais comment peut-il être fait au milieu de l'opération de consommateur_thread? Pourriez-vous spécifier un peu plus en détail?
Ne me laissez pas commenter, mais j'avais le même problème. Le délai d'inactivité de Builin Azure semble ne rien faire. P>
L'exemple de Serkant a fonctionné pour moi. Il suffit de remplacer (recevoir_duration) avec un int ... c'est-à-dire 15 secondes. Oh et importer un filetage p>
Je n'essaie pas de définir combien de temps ce fil devrait être exécuté. Je veux que cela soit fermé quand il n'y a pas d'activité dans la réception de messages du producteur.
Ouais. Moi aussi. Je devais ajouter un tas de choses mais c'était là que j'ai commencé. Je suis en train d'écrire tous les journaux reçus dans un fichier ... Je lui donne 15 secondes et je fais une boucle tandis que la boucle tandis que vérifie la fichible à partir du fichier de fichiers précédent. S'il ne change pas dans les 15 secondes .. Je ferme la connexion. Si c'est le cas, je réinitialise le compteur de boucle et répétez
Je pense que vous pouvez utiliser le Il existe un on_event code> ou
on_event_batch code> rappel pour atteindre l'objectif.
max_wait_time code> argument de mot-clé que vous pouvez passer à la
recevoir code> ou
réception_batch code> méthode. Vous pouvez utiliser Aucun / Vide Array en tant qu'indicateur pour suivre la durée de votre temps que vous n'avez pas reçu de message. P>
import os
import time
import functools
from azure.eventhub import EventHubConsumerClient
last_receive_none_time = None
duration_to_close_after_not_receiving_events = 15
def on_event(consumer_client, partition_context, event):
global last_receive_none_time
global duration_to_close_after_not_receiving_events
if not event:
print('not receiving event in partition {}'.format(partition_context.partition_id))
if not last_receive_none_time:
last_receive_none_time = time.time()
else:
cur_time = time.time()
not_receiving_event_duration = cur_time - last_receive_none_time
if not_receiving_event_duration > duration_to_close_after_not_receiving_events:
consumer_client.close()
else:
print('event received in partition {}'.format(partition_context.partition_id))
last_receive_none_time = None # reset the timer
if __name__ == '__main__':
consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STR,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
)
try:
with consumer_client:
on_event_callback_with_consumer_client = functools.partial(on_event, consumer_client)
consumer_client.receive(
on_event=on_event_callback_with_consumer_client,
starting_position="-1", # "-1" is from the beginning of the partition.
max_wait_time=5,
)
except KeyboardInterrupt:
print('Stopped receiving.')