3
votes

Groupe de consommateurs Kafka et partitions avec streaming structuré Spark

J'ai un sujet Kafka avec 3 partitions et je consomme ces données en utilisant le streaming structuré Spark. J'ai 3 consommateurs (disons le groupe de consommateurs A) lisant chacun à partir d'une seule partition, tout est fichier de travail jusqu'à ici.

J'ai une nouvelle exigence pour lire à partir du même sujet et je veux le paralléliser en créant 3 consommateurs (disons le groupe de consommateurs B) à nouveau chaque lecture à partir d'une seule partition. Comme j'utilise le streaming structuré, je ne peux pas mentionner explicitement group.id .

Les consommateurs de différents groupes pointant vers une seule / même partition liront-ils toutes les données?


1 commentaires

Je ne sais pas comment Spark fonctionne avec cela, mais si la question est de savoir si la lecture est indépendante entre les groupes, oui; Vous aurez deux consommateurs pour chaque partition, chacun avec son propre identifiant de groupe, lisant tous les messages indépendamment


3 Réponses :


0
votes

L'utilisation peut utiliser group.id comme ci-dessous pour le streaming

String processingGroup = "processingGroupA";

Dataset<Row> raw_df = sparkSession
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", consumerAppProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
                      .option("subscribe", topicName) 
                      .option("startingOffsets", "latest")
                      .option("group.id",  processingGroup)
                      .load();


0 commentaires

0
votes

Sauf si vous utilisez Spark 3.x ou version ultérieure, vous ne pourrez pas définir le group.id dans votre flux d'entrée Kafka. En utilisant Spark 3.x, vous pouvez, comme vous l'avez mentionné, avoir deux travaux de streaming structuré différents fournissant deux group.id différents pour vous assurer que chaque travail lit tous les messages du sujet indépendamment de l'autre travail.

Pour les versions Spark <= 2.4.x, Spark lui-même créera un groupe de consommateurs unique pour vous, comme vous pouvez le rechercher dans le code sur GitHub :

// Each running query should use its own group id. Otherwise, the query may be only 
// assigned partial data since Kafka will assign partitions to multiple consumers having
// the same group id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

Ainsi, également dans ce cas, le fait d'avoir deux jobs de streaming différents garantira que vous avez deux ConsumerGroup différents, ce qui permet aux deux jobs de lire tous les messages du sujet indépendamment de l'autre job.


0 commentaires

1
votes

À partir de la documentation Spark 3.0.1:

Par défaut, chaque requête génère un identifiant de groupe unique pour la lecture des données. Cela garantit que chaque source Kafka a son propre groupe de consommateurs qui ne subit aucune interférence de la part d'un autre consommateur, et peut donc lire toutes les partitions de ses sujets souscrits.

Donc, si vous utilisez l'option assign et que vous mentionnez la partition à utiliser, il lira toutes les données d'une partition spécifique car par sa nature par défaut, ce sera un groupe de consommateurs différent (group.id). assign option assign prend la chaîne json comme valeur et peut également avoir plusieurs partitions de différents sujets. Par exemple, {"topicA":[0,1],"topicB":[2,4]} .

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("assign", "{"topic-name":[0]}")
  .load()


0 commentaires