1
votes

Comment écrire / écrireStream chaque ligne d'une dataframe dans une table delta différente

Chaque ligne de mon dataframe a un contenu CSV.

Je m'efforce d'enregistrer chaque ligne dans une table différente et spécifique.

Je pense que je dois utiliser un foreach ou un UDF pour accomplir cela, mais cela ne fonctionne tout simplement pas.

Tout le contenu que j'ai réussi à trouver était comme de simples impressions à l'intérieur de foreachs ou des codes utilisant .collect () (que je ne veux vraiment pas utiliser).

J'ai également trouvé la méthode de répartition, mais cela ne me permet pas de choisir où chaque ligne ira.

rows = df.count()
df.repartition(rows).write.csv('save-dir')

Pouvez-vous me donner un simple et un exemple concret?


0 commentaires

3 Réponses :


1
votes

Enregistrer chaque ligne sous forme de tableau est une opération coûteuse et déconseillée. Mais ce que vous essayez peut être réalisé comme ceci -

df.write.format("delta").partitionBy("<primary-key-column>").save("/delta/save-dir")

Désormais, chaque ligne sera enregistrée au format .parquet et vous pourrez créer une table externe à partir de chaque partition. Cela ne fonctionnera que si vous avez une valeur unique pour chaque ligne, c'est-à-dire une clé primaire.


2 commentaires

Je n'ai pas de clé unique, en fait, de nombreuses lignes vont dans la même table


Le dataframe a les colonnes CSV | ID. J'utiliserai l'ID pour obtenir les informations sur l'endroit où enregistrer le CSV. Y compris le nom de la table, de la base de données, du schéma et de sparkSchema. C'est pourquoi j'ai besoin d'un foreach ou d'un UDF. C'est là que tout échoue



0
votes

Eh bien, à la fin de tout, comme toujours, c'est quelque chose de très simple, mais je ne vois rien de cela.

Fondamentalement, lorsque vous effectuez un foreach et que le dataframe que vous souhaitez enregistrer est construit à l'intérieur de la boucle. Le worker, contrairement au pilote, ne configurera pas automatiquement le chemin "/ dbfs /" lors de la sauvegarde, donc si vous n'ajoutez pas manuellement le "/ dbfs /", il sauvegardera les données localement dans le worker.

C'est pourquoi mes boucles ne fonctionnaient pas.


0 commentaires

0
votes

Avez-vous essayé .mode ("append"). repartionBy ("ID") , il créera un répertoire pour chaque ID, puis n'oubliez pas de mettre le mode


0 commentaires