J'ai besoin de lire un CSV et de publier sur un sujet Kafka au format Avro. Pendant la publication, je dois définir la clé de message comme la combinaison de deux attributs. Disons que j'ai un attribut appelé id et un attribut appelé group. J'ai besoin que ma clé de message soit id + "-" + group. Existe-t-il un moyen d'y parvenir dans Apache Nifi Flow? Définir la clé de message sur un seul attribut fonctionne très bien pour moi.
3 Réponses :
Oui, dans le PublishKafka_2_0
(ou quelle que soit la version que vous utilisez), définissez le paramètre < strong> Kafka Key pour construire votre clé de message en utilisant NiFi Expression Language . Pour votre exemple, l'expression $ {id} - $ {group}
le formera (par exemple id = myId
& group = MyGroup
-> myId-myGroup
).
Si vous ne remplissez pas cette propriété explicitement, le processeur recherche l'attribut kafka.key
, donc si vous aviez précédemment défini cette valeur, elle serait transmise.
Ah, donc le PublishKafkaRecord
publiera plusieurs messages sur Kafka, chacun en corrélation avec un enregistrement dans le fichier de flux NiFi unique. Dans ce cas, la propriété demande un champ (un terme d'enregistrement signifiant un élément du schéma d'enregistrement ) à utiliser pour remplir cette clé de message. Je suggérerais d'utiliser UpdateRecord
avant ce processeur pour ajouter un champ appelé messageKey
(ou ce que vous voulez) à chaque enregistrement à l'aide du langage d'expression, puis référencez ce champ dans la propriété du processeur de publication.
Merci pour votre réponse. J'utilise le processeur publishKafkaRecord_2_0 et il existe une propriété Message Key que j'utilise pour définir la clé. J'ai essayé la combinaison suggérée et elle publie une clé nulle. Quand j'essaye avec un champ (comme juste id), cela fonctionne. Mise à jour de ma question avec la capture d'écran du processeur publishKafkaRecord
Notez les (?) sur chaque propriété qui indiquent ce qui est autorisé ou non:
Lorsqu'un champ n'exclut pas les langages d'expression, utilisez un processeur updateAttribute pour définir la valeur combinée dont vous avez besoin. Ensuite, vous utilisez la valeur combinée en aval.
Merci. Je suis capable de référencer les attributs nifi intégrés comme uuid, filename, filesize, recordCount à partir du processeur updateAttribute. Comment puis-je faire référence aux champs de ma charge utile? Je lis un fichier csv et quand j'essaye de me référer à l'attribut comme $ {id}, cela ne fonctionne pas.
Le formulaire simple pour obtenir des valeurs CSV du contenu du fichier de flux à un attribut utilise SplitText -> ExtractText. Comme suggéré ci-dessous, UpdateRecord peut gérer la mise à jour du CSV à l'aide d'un lecteur / enregistreur CSV dans lequel vous indiquez à nifi les schémas à utiliser pour lire / écrire. UpateRecord est la méthode préférée.
Voici un exemple de CSV -> SplitText, ExtractText, ReplaceText to PutSQL. github.com/steven-dfheinz/NiFi-Templates votre flux ressemblerait mais vous changerait les valeurs extraites, comment elles sont reformées dans replaceText, puis les enverraient à PublishKafka. L'autre méthode est préférée car elle peut fonctionner sur l'ensemble du CSV, par rapport au temps de le séparer et de traiter des lignes séparées.
Cette discussion ne concerne pas le traitement des enregistrements, donc le champ dans ce cas ne fonctionnerait pas. La propriété attend un nom de champ d'enregistrement, pas une expression de langage d'expression composée comme la plupart des propriétés.
Merci pour vos contributions. J'ai dû changer ma conception initiale de production avec une combinaison de touches pour partitionner le fichier en fonction d'un champ spécifique à l'aide du processeur PartitionRecord. J'ai un champ de date dans mon fichier CSV et il peut y avoir plusieurs enregistrements par date. Je partitionne en fonction de ce champ de date et produit les rubriques kafka en utilisant le champ id comme clé par partition. Le nom de la rubrique kafka est dynamique et est suffixé avec la valeur de date. Puisque je prévois d'utiliser les flux Kafka pour lire les données de ces sujets, c'est une conception bien meilleure que la conception initiale.