J'ai un énorme ensemble de données dans le serveur SQL, je veux connecter le serveur SQL avec python, puis utiliser pyspark pour exécuter la requête.
J'ai vu le pilote JDBC mais je ne trouve pas le moyen de le faire, je l'ai fait avec PYODBC mais pas avec une étincelle.
Toute aide serait appréciée.
3 Réponses :
Veuillez utiliser ce qui suit pour vous connecter à Microsoft SQL:
def connect_to_sql( spark, jdbc_hostname, jdbc_port, database, data_table, username, password ): jdbc_url = "jdbc:sqlserver://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database) connection_details = { "user": username, "password": password, "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", } df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details) return df
spark
est un objet SparkSession
, et le reste est assez clair .
Vous pouvez également transmettre des requêtes déroulantes à read.jdbc
merci d'avoir répondu, je pense que je suis sur le point de le résoudre, mais j'ai des problèmes avec la session Spark.
merci pour la réponse, je reçois une erreur avec ma sparksession, ce que j'ai est ceci: spark = SparkSession.builder \ .config ('spark.jars', r'D: \ Usuarios \ aospinam \ Downloads \ Microsoft JDBC Driver 7.4 pour SQL Server \ sqljdbc_7.4 \ enu \ mssql-jdbc-7.4.1.jre8 ') \ .appName ("app2") \ .master ("local"). getOrCreate ()
@AndresOspina vous n'aurez pas besoin d'ajouter les bocaux. Vous pouvez vous débarrasser de cette configuration
Comment savoir si j'ai une SparkSession bien établie?
@AndresOspina SparkSession
getOrCreate
n'est pas une fonction d'évaluation paresseuse. Ainsi, lorsque vous le créez, vous devriez obtenir l'erreur immédiatement. Faites-moi savoir quelle est l'erreur.
Py4JError Traceback (most recent call last) <ipython-input-15-a1c9096af940> in <module> 1 pushdown_query="SELECT TOP (1000) [iIDEstado],[iIDTipoEstado],[sEstado] ,[rOrden] ,[bAbierto],[sCampoAux1],[sCampoAux2]FROM [dbGestionFlota].[dbo].[tblSGSEstado]" ----> 2 df1 = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties) 3 display(df1) c:\programdata\anaconda3\lib\site-packages\pyspark\sql\readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties) 546 jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)() 547 for k in properties: --> 548 jprop.setProperty(k, properties[k]) 549 if column is not None: 550 assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified" C:\ProgramData\Anaconda3\Lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args) 1255 answer = self.gateway_client.send_command(command) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) 1258 1259 for temp_arg in temp_args: c:\programdata\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() C:\ProgramData\Anaconda3\Lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 330 raise Py4JError( 331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". --> 332 format(target_id, ".", name, value)) 333 else: 334 raise Py4JError( Py4JError: An error occurred while calling o29.setProperty. Trace: py4j.Py4JException: Method setProperty([class java.lang.String, class java.lang.Boolean]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:274) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Unknown Source)
J'utilise la fonction de pissall (connect_to_sql) mais je l'ai un peu modifiée.
df = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/<database_name>").option("driver","com.mysql.jdbc.Driver").option("dbtable",<table_name>).option("user",<user>).option("password",<password>).load()
ou vous pouvez utiliser la méthode SparkSession .read
from pyspark.sql import SparkSession def connect_to_sql( spark, jdbc_hostname, jdbc_port, database, data_table, username, password ): jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database) connection_details = { "user": username, "password": password, "driver": "com.mysql.jdbc.Driver", } df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details) return df if __name__=='__main__': spark = SparkSession \ .builder \ .appName('test') \ .master('local[*]') \ .enableHiveSupport() \ .config("spark.driver.extraClassPath", <path to mysql-connector-java-5.1.49-bin.jar>) \ .getOrCreate() df = connect_to_sql(spark, 'localhost', <port>, <database_name>, <table_name>, <user>, <password>)
p>