1
votes

Connexion Pyspark au serveur Microsoft SQL?

J'ai un énorme ensemble de données dans le serveur SQL, je veux connecter le serveur SQL avec python, puis utiliser pyspark pour exécuter la requête.

J'ai vu le pilote JDBC mais je ne trouve pas le moyen de le faire, je l'ai fait avec PYODBC mais pas avec une étincelle.

Toute aide serait appréciée.


0 commentaires

3 Réponses :


1
votes

Veuillez utiliser ce qui suit pour vous connecter à Microsoft SQL:

def connect_to_sql(
    spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
    jdbc_url = "jdbc:sqlserver://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)

    connection_details = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

    df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
    return df

spark est un objet SparkSession , et le reste est assez clair .

Vous pouvez également transmettre des requêtes déroulantes à read.jdbc


5 commentaires

merci d'avoir répondu, je pense que je suis sur le point de le résoudre, mais j'ai des problèmes avec la session Spark.


merci pour la réponse, je reçois une erreur avec ma sparksession, ce que j'ai est ceci: spark = SparkSession.builder \ .config ('spark.jars', r'D: \ Usuarios \ aospinam \ Downloads \ Microsoft JDBC Driver 7.4 pour SQL Server \ sqljdbc_7.4 \ enu \ mssql-jdbc-7.4.1.jre8 ') \ .appName ("app2") \ .master ("local"). getOrCreate ()


@AndresOspina vous n'aurez pas besoin d'ajouter les bocaux. Vous pouvez vous débarrasser de cette configuration


Comment savoir si j'ai une SparkSession bien établie?


@AndresOspina SparkSession getOrCreate n'est pas une fonction d'évaluation paresseuse. Ainsi, lorsque vous le créez, vous devriez obtenir l'erreur immédiatement. Faites-moi savoir quelle est l'erreur.



0
votes
Py4JError                                 Traceback (most recent call last)
<ipython-input-15-a1c9096af940> in <module>
      1 pushdown_query="SELECT TOP (1000) [iIDEstado],[iIDTipoEstado],[sEstado] ,[rOrden] ,[bAbierto],[sCampoAux1],[sCampoAux2]FROM [dbGestionFlota].[dbo].[tblSGSEstado]"
----> 2 df1 = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
      3 display(df1)

c:\programdata\anaconda3\lib\site-packages\pyspark\sql\readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
    546         jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
    547         for k in properties:
--> 548             jprop.setProperty(k, properties[k])
    549         if column is not None:
    550             assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified"

C:\ProgramData\Anaconda3\Lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

c:\programdata\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\ProgramData\Anaconda3\Lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    330                 raise Py4JError(
    331                     "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
--> 332                     format(target_id, ".", name, value))
    333         else:
    334             raise Py4JError(

Py4JError: An error occurred while calling o29.setProperty. Trace:
py4j.Py4JException: Method setProperty([class java.lang.String, class java.lang.Boolean]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)

0 commentaires

0
votes

J'utilise la fonction de pissall (connect_to_sql) mais je l'ai un peu modifiée.

df = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/<database_name>").option("driver","com.mysql.jdbc.Driver").option("dbtable",<table_name>).option("user",<user>).option("password",<password>).load()

ou vous pouvez utiliser la méthode SparkSession .read

from pyspark.sql import SparkSession

def connect_to_sql(
    spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
    jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)

    connection_details = {
        "user": username,
        "password": password,
        "driver": "com.mysql.jdbc.Driver",
    }

    df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
    return df

if __name__=='__main__':
    spark = SparkSession \
        .builder \
        .appName('test') \
        .master('local[*]') \
        .enableHiveSupport() \
        .config("spark.driver.extraClassPath", <path to mysql-connector-java-5.1.49-bin.jar>) \
        .getOrCreate()

    df = connect_to_sql(spark, 'localhost', <port>, <database_name>, <table_name>, <user>, <password>)

p>


0 commentaires