3
votes

Comment gérer correctement les fichiers de parquet partitionnés générés à partir de Spark Streaming

Mon travail de streaming structuré Spark génère en continu des fichiers parquet que je souhaite supprimer après l'expiration (disons après 30 jours).

Je stocke mes données de parquet partitionnées avec la clé de partition étant la date de l'événement dans RFC3339 / ISO8601 afin que le ménage puisse être effectué assez facilement au niveau HDFS basé sur un travail cron (Supprimez tous les dossiers de parquet avec la clé de partition

Cependant, depuis que j'ai introduit Spark Streaming, Spark écrit les métadonnées dans un dossier nommé _spark_metadata à côté des données à écrire elles-mêmes. Si je supprime maintenant simplement les fichiers HDFS expirés et exécute un travail par lots Spark sur l'ensemble de données, le travail échouera en raison de fichiers introuvables. Le batchjob lira les métadonnées et s'attendra à ce que les fichiers déjà supprimés existent.

La solution la plus simple consiste simplement à désactiver la création du répertoire _spark_metadata , comme décrit ici: désactivation de _spark_metadata dans Structured Streaming dans spark 2.3.0 . Mais comme je ne veux pas perdre de performances lors de la lecture des données pour mon analyse par lots régulière, je me demande s'il n'y a pas de meilleure solution.

J'ai pensé, je pourrais alors simplement utiliser spark pour la suppression afin qu'il supprime les fichiers hdfs du parquet ET met à jour les métadonnées. Cependant, effectuer simplement un

session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));

ne fonctionne pas. DELETE est malheureusement une opération non prise en charge dans Spark ...

Existe-t-il une solution pour que je puisse supprimer les anciennes données tout en conservant le dossier _spark_metadata travaille?


0 commentaires

3 Réponses :


3
votes

Pour autant que je sache, l'objectif principal de _spark_metadata était de garantir la tolérance aux pannes et d'éviter de lister tous les fichiers à traiter:

Afin de gérer correctement les pannes partielles tout en maintenant exactement une fois la sémantique, les fichiers de chaque lot sont écrits dans un répertoire unique, puis ajouté de manière atomique à un journal de métadonnées. Quand un DataSource basé sur parquet est initialisé pour la lecture, nous recherchez ce répertoire de journal et utilisez-le à la place de la liste des fichiers lorsque présent.

https://github.com/apache/spark/commit4444f6bc4bbe7

Le lien que vous avez cité ( la désactivation de _spark_metadata dans Structured Streaming dans Spark 2.3.0 ) explique que le problème provenait d'un état de point de contrôle incohérent - le point de contrôle a généré des métadonnées mais plus tard, l'utilisateur les a supprimées manuellement et quand il a redémarré la requête, il a échoué car le point de contrôle devrait avoir un fichier de métadonnées.

Pour voir si l'absence de métadonnées échouera votre traitement par lots, veuillez consulter la méthode org.apache.spark.sql.execution.datasources.DataSource # resolutionRelation où vous pouvez trouver des correspondances de modèles avec 2 cas:

  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
    path match {
      case Seq(singlePath) =>
        try {
          val hdfsPath = new Path(singlePath)
          val fs = hdfsPath.getFileSystem(hadoopConf)
          if (fs.isDirectory(hdfsPath)) {
            fs.exists(new Path(hdfsPath, metadataDir))
          } else {
            false
          }
        } catch {
          case NonFatal(e) =>
            logWarning(s"Error while looking for metadata directory.")
            false
        }
      case _ => false
    }
  }

Et la méthode hasMetadata ressemble à:

  // We are reading from the results of a streaming query. Load files from the metadata log
  // instead of listing them using HDFS APIs.
  case (format: FileFormat, _)
      if FileStreamSink.hasMetadata(
        caseInsensitiveOptions.get("path").toSeq ++ paths,
        sparkSession.sessionState.newHadoopConf()) =>
  case (format: FileFormat, _) =>
    val globbedPaths =
      checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)

Comme vous pouvez le voir, là il n'y a aucun risque d'échec (du moins en lisant le code!). Si vous en avez, veuillez donner plus de contexte car le problème est peut-être ailleurs.

Concernant votre problème de performances, cette _spark_metadata ne contient que la liste des fichiers, donc bien sûr, Spark aura d'abord besoin pour lister les fichiers de votre répertoire d'entrée. Mais d'après mon expérience, ce n'est pas l'opération la plus coûteuse. Par exemple, la liste du répertoire avec 1297 fichiers sur AWS S3 prend environ 9 secondes. Après cela, c'est à vous de décider si vous souhaitez avoir un processus de nettoyage simple ou un traitement par lots légèrement plus lent. Si vous avez beaucoup plus de fichiers comme celui-là, peut-être devriez-vous les regrouper en plus gros, comme 256 Mo ou plus?

Néanmoins, si vous souhaitez conserver les _spark_metadata , peut-être il existe un moyen de supprimer des fichiers par votre application de nettoyage. Mais ce sera difficile car vous aurez 2 applications (streaming et nettoyage) travaillant sur les mêmes données.

Vous pouvez trouver plus d'informations sur _spark_metadata ici: Comment changer l'emplacement du répertoire _spark_metadata?

p>


1 commentaires

Merci pour ces infos. Une fois que j'ai supprimé les _spark_metadata, mes processus par lots ont fonctionné à nouveau, donc je suis à peu près sûr que ce sont les problèmes ici. Mais je ne savais pas qu'ils sont nécessaires pour la sémantique Exactly-Once-Semantics, donc les désactiver n'est probablement pas une bonne idée :) Alors maintenant, je suis passé à un autre mécanisme de chargement de parquet dans les jobs batch ( stackoverflow.com/questions/53479585/... ) afin que _spark_metadata soit conservé mais que les tâches par lots continuent à travail.