2
votes

spark streaming de kafka comment spécifier l'heure limite pour les événements interrogés

J'ai une application de streaming Spark qui fonctionne en fin de journée et consomme les événements kafka envoyés par l'application en amont. Actuellement, l'application en amont continue de pousser de nouvelles données toute la journée et mon consommateur finit par les consommer. Je veux limiter les événements consommés en fonction d'une coupure, disons 18 heures tous les jours. Y a-t-il un moyen de spécifier une coupure pour limiter les événements consommés en fonction d'une coupure, par exemple l'horodatage de l'événement kafka ou quelque chose. Voici le code consommateur

  KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))


0 commentaires

3 Réponses :


2
votes

Vous pouvez simplement filtrer les événements pendant le traitement sur la base de timeStamp ou time ou n'importe quel champ. Par exemple, supposons que votre événement soit JSON et qu'il ait un champ appelé hour qui est la valeur de l'heure de l'événement. Vous pouvez facilement choisir uniquement l'événement qui a été créé avant 6, comme ci-dessous.

directStream.foreachRDD { rdd =>
        val eventDfRDD = rdd.filter(record => {
          val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
          option.get("hour") < 1800
        })
      }


4 commentaires

Merci le problème avec la consommation, c'est que je me retrouve dans une boucle sans fin où de nouveaux événements continuent à venir et que je devrais le consommer et le laisser tomber. horodatage de l'événement


Limiter le consommateur signifie que vous devez traiter le message. Par conséquent, vous ne pouvez pas vraiment contrôler l'afflux de messages du côté des consommateurs. Cela doit être fait du côté des producteurs.


Merci, j'essaye quelque chose comme


Merci d'avoir suivi votre suggestion et mis en œuvre le filtre et ça a l'air bien



1
votes

Lorsque vous déclarez le contexte de streaming, nous pouvons mentionner le temps limite pour créer dsstream et nous pouvons transmettre cette valeur au paramètre createDirectStream. Veuillez trouver le code snap. Dans le code ci-dessous, 5 secondes comme temps de coupure. Ainsi, toutes les 5 secondes, DStream RDD sera créé.

sc = spark.sparkContext
ssc = StreamingContext(sc,5)
kvs = KafkaUtils.createDirectStream(ssc, ['Topic-name'], {"metadata.broker.list": 'Server-name:port-number'},valueDecoder=serializer.decode_message)


2 commentaires

Merci mais ce que je demandais, ce n'est pas l'intervalle de lots. Je demande si je peux limiter les messages consommés dans DStream, disons si je crée DSTream avec une coupure, par exemple, l'horodatage de l'événement KAfka à 17 heures. Dstream ne doit pas interroger d'autres messages. et RDD.count doit renvoyer 0


Merci d'avoir suivi votre suggestion et mis en œuvre le filtre et ça a l'air bien



0
votes

Voici la solution que j'ai implémentée

1: Stocker l'heure actuelle dans une variable lorsque la tâche Spark Streaming démarre

val cuttoffTime = System.currentTimeMillis ()

2: Créer DirectStream

val directKafkaStream=   KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))

3: Appliquer les critères de filtre Dans la boucle foreach, appliquez les critères de filtre comme ci-dessous

directKafkaStream.foreachRDD {rdd => val filteredRdd = rdd.filter (_. timestamp ()


2 commentaires

Vous consommez toujours les messages, donc, la prochaine fois que vous consommerez, vous ne consommerez pas les messages qui ont été filtrés. Qu'en est-il de l'utilisation du flux d'étincelle pour tous les traitements qui peuvent se produire à tout moment, de l'enregistrement dans hadoop / s3, puis de l'utilisation d'un travail d'étincelle régulier (et non d'un flux) pour traiter uniquement ceux pour une date donnée?


non je ne valide pas le décalage si l'horodatage est supérieur. donc le lendemain, il sera traité à nouveau