Mon travail de streaming structuré Spark génère en continu des fichiers parquet que je souhaite supprimer après l'expiration (disons après 30 jours).
Je stocke mes données de parquet partitionnées avec la clé de partition étant la date de l'événement dans RFC3339 / ISO8601 afin que le ménage puisse être effectué assez facilement au niveau HDFS basé sur un travail cron (Supprimez tous les dossiers de parquet avec la clé de partition Cependant, depuis que j'ai introduit Spark Streaming, Spark écrit les métadonnées dans un dossier nommé La solution la plus simple consiste simplement à désactiver la création du répertoire J'ai pensé, je pourrais alors simplement utiliser spark pour la suppression afin qu'il supprime les fichiers hdfs du parquet ET met à jour les métadonnées. Cependant, effectuer simplement un ne fonctionne pas. Existe-t-il une solution pour que je puisse supprimer les anciennes données tout en conservant le dossier _spark_metadata
à côté des données à écrire elles-mêmes. Si je supprime maintenant simplement les fichiers HDFS expirés et exécute un travail par lots Spark sur l'ensemble de données, le travail échouera en raison de fichiers introuvables. Le batchjob lira les métadonnées et s'attendra à ce que les fichiers déjà supprimés existent. _spark_metadata
, comme décrit ici: désactivation de _spark_metadata dans Structured Streaming dans spark 2.3.0 . Mais comme je ne veux pas perdre de performances lors de la lecture des données pour mon analyse par lots régulière, je me demande s'il n'y a pas de meilleure solution. session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));
DELETE
est malheureusement une opération non prise en charge dans Spark ... _spark_metadata
travaille?
3 Réponses :
Pour autant que je sache, l'objectif principal de _spark_metadata
était de garantir la tolérance aux pannes et d'éviter de lister tous les fichiers à traiter:
Afin de gérer correctement les pannes partielles tout en maintenant exactement une fois la sémantique, les fichiers de chaque lot sont écrits dans un répertoire unique, puis ajouté de manière atomique à un journal de métadonnées. Quand un
DataSource
basé sur parquet est initialisé pour la lecture, nous recherchez ce répertoire de journal et utilisez-le à la place de la liste des fichiers lorsque présent.
https://github.com/apache/spark/commit4444f6bc4bbe7
Le lien que vous avez cité ( la désactivation de _spark_metadata dans Structured Streaming dans Spark 2.3.0 ) explique que le problème provenait d'un état de point de contrôle incohérent - le point de contrôle a généré des métadonnées mais plus tard, l'utilisateur les a supprimées manuellement et quand il a redémarré la requête, il a échoué car le point de contrôle devrait avoir un fichier de métadonnées.
Pour voir si l'absence de métadonnées échouera votre traitement par lots, veuillez consulter la méthode org.apache.spark.sql.execution.datasources.DataSource # resolutionRelation où vous pouvez trouver des correspondances de modèles avec 2 cas:
def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = { path match { case Seq(singlePath) => try { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) if (fs.isDirectory(hdfsPath)) { fs.exists(new Path(hdfsPath, metadataDir)) } else { false } } catch { case NonFatal(e) => logWarning(s"Error while looking for metadata directory.") false } case _ => false } }
Et la méthode hasMetadata
ressemble à:
// We are reading from the results of a streaming query. Load files from the metadata log // instead of listing them using HDFS APIs. case (format: FileFormat, _) if FileStreamSink.hasMetadata( caseInsensitiveOptions.get("path").toSeq ++ paths, sparkSession.sessionState.newHadoopConf()) => case (format: FileFormat, _) => val globbedPaths = checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
Comme vous pouvez le voir, là il n'y a aucun risque d'échec (du moins en lisant le code!). Si vous en avez, veuillez donner plus de contexte car le problème est peut-être ailleurs.
Concernant votre problème de performances, cette _spark_metadata
ne contient que la liste des fichiers, donc bien sûr, Spark aura d'abord besoin pour lister les fichiers de votre répertoire d'entrée. Mais d'après mon expérience, ce n'est pas l'opération la plus coûteuse. Par exemple, la liste du répertoire avec 1297 fichiers sur AWS S3 prend environ 9 secondes. Après cela, c'est à vous de décider si vous souhaitez avoir un processus de nettoyage simple ou un traitement par lots légèrement plus lent. Si vous avez beaucoup plus de fichiers comme celui-là, peut-être devriez-vous les regrouper en plus gros, comme 256 Mo ou plus?
Néanmoins, si vous souhaitez conserver les _spark_metadata
, peut-être il existe un moyen de supprimer des fichiers par votre application de nettoyage. Mais ce sera difficile car vous aurez 2 applications (streaming et nettoyage) travaillant sur les mêmes données.
Vous pouvez trouver plus d'informations sur _spark_metadata
ici: Comment changer l'emplacement du répertoire _spark_metadata?
p>
Merci pour ces infos. Une fois que j'ai supprimé les _spark_metadata, mes processus par lots ont fonctionné à nouveau, donc je suis à peu près sûr que ce sont les problèmes ici. Mais je ne savais pas qu'ils sont nécessaires pour la sémantique Exactly-Once-Semantics, donc les désactiver n'est probablement pas une bonne idée :) Alors maintenant, je suis passé à un autre mécanisme de chargement de parquet dans les jobs batch ( stackoverflow.com/questions/53479585/... ) afin que _spark_metadata soit conservé mais que les tâches par lots continuent à travail.
Il s'agit en fait de l'un des problèmes connus du streaming structuré ( SPARK-24295 a >) bien que cela ne se produise qu'avec des fichiers d'entrée massifs, et les utilisateurs finaux prennent leurs propres solutions de contournement. Par exemple, arrêtez la requête -> supprimez les anciens fichiers d'entrée -> manipulez les métadonnées manuellement pour les purger -> redémarrez la requête.
Étant donné que la manipulation manuelle des métadonnées n'est ni triviale ni idéale (étant donné qu'elle devrait arrêter la requête en continu et forcer les utilisateurs finaux à comprendre le format des métadonnées), SPARK-27188 est proposé comme alternative - il applique la rétention et purge les fichiers d'entrée obsolètes des métadonnées.
Merci de m'avoir signalé les problèmes appropriés. Comme ces tickets sont toujours ouverts, je conclus qu'il n'y a pas encore de solution appropriée mais seulement quelques solutions de contournement. J'ai maintenant opté pour ( stackoverflow.com/questions/53479585/… a>). Cela répond à ma question pour le moment et j'espère que ces tickets seront bientôt résolus :)
Pour autant que je sache, celui-ci a trois options pour résoudre ce problème:
1) Utilisez spark.load (filePathsUsingGlobRegex)
pour charger uniquement les fichiers qui doivent être lus, de cette façon, spark n'a pas besoin de charger tous les fichiers et n'a donc pas besoin de spark_metadata.
Avantages : vous bénéficiez toujours de spark_metadata (les lectures sont plus rapides, la sémantique exactement une fois est toujours assurée)
Inconvénients : Vous devez créer vous-même le chemin des fichiers, ce qui peut être plus compliqué si vous avez des données stockées dans diverses stratégies de partitionnement.
2) Ne créez pas spark_metadata dans le répertoire de sortie désactivation de _spark_metadata dans la diffusion en continu structurée dans Spark 2.3.0
Avantages : le nettoyage est simple
Inconvénients : vous perdez le bénéfice de spark_metadata.
3) Comprendre et mettre à jour le fichier spark_metadata pendant la mise à niveau, supprimez les anciens fichiers.
Avantages : vous bénéficiez à la fois des avantages de la rétention et de spark_metadata.
Inconvénients : Vous devez modifier manuellement les _spark_metadata, ce qui peut être un code difficile / compliqué à maintenir. Étant donné que cette étincelle est interne et peut changer.