4
votes

Comment obtenir la valeur de l'emplacement d'une table Hive à l'aide d'un objet Spark?

Je souhaite pouvoir récupérer la valeur location d'une table Hive à partir d'un objet Spark (SparkSession). Une façon d'obtenir cette valeur consiste à analyser la sortie de l'emplacement via la requête SQL suivante:

describe formatted <table name>

Je me demandais s'il existe un autre moyen d'obtenir l ' emplacement value sans avoir à analyser la sortie. Une API serait idéale au cas où la sortie de la commande ci-dessus change entre les versions de Hive. Si une dépendance externe est nécessaire, quelle serait-elle? Existe-t-il un exemple de code Spark permettant d'obtenir la valeur de l'emplacement?


0 commentaires

6 Réponses :


1
votes

Première approche

Vous pouvez utiliser nom_fichier_entrée avec dataframe.

cela vous donnera le chemin absolu du fichier pour un fichier pièce.

/ p>

package org.apache.spark.sql.hive

import java.net.URI

import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.SparkSession

class TableDetail {
  def getTableLocation(table: String, spark: SparkSession): URI = {
    val sessionState: SessionState = spark.sessionState
    val sharedState: SharedState = spark.sharedState
    val catalog: SessionCatalog = sessionState.catalog
    val sqlParser: ParserInterface = sessionState.sqlParser
    val client = sharedState.externalCatalog match {
      case catalog: HiveExternalCatalog => catalog.client
      case _: InMemoryCatalog => throw new IllegalArgumentException("In Memory catalog doesn't " +
        "support hive client API")
    }

    val idtfr = sqlParser.parseTableIdentifier(table)

    require(catalog.tableExists(idtfr), new IllegalArgumentException(idtfr + " done not exists"))
    val rawTable = client.getTable(idtfr.database.getOrElse("default"), idtfr.table)
    rawTable.location
  }
}

Et puis en extraire le chemin de la table.

Deuxième approche

C'est plus de vous pirater peut dire.

spark.read.table("zen.intent_master").select(input_file_name).take(1)


5 commentaires

Que faire si la table Hive ne contient aucun fichier? Comment puis-je obtenir la valeur de l'emplacement pour cela?


@codeshark J'ai mis à jour la réponse avec la deuxième approche, j'espère que cela fonctionnera dans votre cas.


Qu'est-ce que "nom_fichier_entrée"?


C'est une fonction d'étincelle. Vous pouvez utiliser avec import org.apache.spark.sql.functions._


Vous pouvez rechercher de la documentation pour plus de détails. spark.apache.org/docs/2.0.0/api/scala/...



3
votes

Voici la bonne réponse:

import org.apache.spark.sql.catalyst.TableIdentifier

lazy val tblMetadata = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName,Some(schema)))


1 commentaires

@GuilhermedeLazari ici c'est spark._jsparkSession.sessionState (). Catalog (). GetTableMetada‌ ta (spark.sparkContex‌ t._jvm.org.apache.sp‌ ark.sql.catalyst.Tab‌ leIdentifier ('table '‌, spark.sparkContext._jvm.scala.Some (' base de données '))). Storage (). L‌ ocationUri (). Get ()



2
votes

Vous pouvez également utiliser la méthode .toDF sur le tableau au format desc , puis filtrer à partir du dataframe.

DataframeAPI :

String = /location/part_table

Résultat:

scala> :paste
spark.sql("desc formatted data_db.part_table")
.collect()
.filter(r => r(0).equals("Location")) //filter on r(0) value
.map(r => r(1)) //get only the location
.mkString //convert as string
.split("8020")(1) //change the split based on your namenode port..etc


1 commentaires

Merci pour le mode coller :)



1
votes

Voici comment procéder dans PySpark:

 (spark.sql("desc formatted mydb.myschema")
       .filter("col_name=='Location'")
       .collect()[0].data_type)   


0 commentaires

1
votes

Utilisez ceci comme fonction réutilisable dans votre projet scala

+--------+------------------------------------+-------+
|col_name|data_type                           |comment|
+--------+--------------------------------------------+
|Location|file:/Users/hive/spark-warehouse/src|       |
+--------+------------------------------------+-------+

file:/Users/hive/spark-warehouse/src

l'appelant serait

    println(getHiveTablePath("src", spark)) // you can prefix schema if you have

Result (j'ai exécuté en local donc file: / ci-dessous si son hdfs hdfs: // viendra):

  def getHiveTablePath(tableName: String, spark: SparkSession):String =
    {
       import org.apache.spark.sql.functions._
      val sql: String = String.format("desc formatted %s", tableName)
      val result: DataFrame = spark.sql(sql).filter(col("col_name") === "Location")
      result.show(false) // just for debug purpose
      val info: String = result.collect().mkString(",")
      val path: String = info.split(',')(1)
      path
    }


0 commentaires

0
votes

UTILISER ExternalCatalog

scala> spark
res15: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4eba6e1f

scala> val metastore = spark.sharedState.externalCatalog
metastore: org.apache.spark.sql.catalyst.catalog.ExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog@24b05292

scala> val location = metastore.getTable("meta_data", "mock").location
location: java.net.URI = hdfs://10.1.5.9:4007/usr/hive/warehouse/meta_data.db/mock


0 commentaires