如何使用 JDBC 源在 (Py)Spark 中写入和读取数据?
- 2025-02-20 09:23:00
- admin 原创
- 28
问题描述:
这个问题的目的是记录:
在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤
JDBC 源可能存在的问题以及已知解决方案
经过微小的改动,这些方法应该可以与其他受支持的语言兼容,包括 Scala 和 R。
解决方案 1:
写入数据
提交应用程序或启动 shell 时包含适用的 JDBC 驱动程序。例如,您可以使用
--packages
:
bin/pyspark --packages group:name:version
或driver-class-path
结合jars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
这些属性也可以PYSPARK_SUBMIT_ARGS
在 JVM 实例启动之前使用环境变量来设置,或者使用conf/spark-defaults.conf
setspark.jars.packages
或spark.jars
/来设置spark.driver.extraClassPath
。
选择所需模式。Spark JDBC 编写器支持以下模式:
append
:将此:class:的内容附加DataFrame
到现有数据。
overwrite
:覆盖现有数据。
ignore
:如果数据已经存在,则默默忽略此操作。
error
(默认情况):如果数据已经存在,则抛出异常。
不支持更新插入或其他细粒度的修改
mode = ...
准备 JDBC URI,例如:
# You can encode credentials in URI or pass
# separately using properties argument
# of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
(可选)创建 JDBC 参数字典。
properties = {
"user": "foo",
"password": "bar"
}
properties
/options
也可用于设置支持的 JDBC 连接属性。
使用
DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
保存数据(pyspark.sql.DataFrameWriter
详情请参阅)。
已知问题:
--packages
使用(java.sql.SQLException: No suitable driver found for jdbc: ...
)包含驱动程序时无法找到合适的驱动程序
假设没有驱动程序版本不匹配的问题,为了解决这个问题,你可以将driver
类添加到properties
。例如:
properties = {
...
"driver": "org.postgresql.Driver"
}
使用
df.write.format("jdbc").options(...).save()
可能会导致:
java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource 不允许创建表作为选择。
解决方案未知。
在 Pyspark 1.3 中,您可以尝试直接调用 Java 方法:
df._jdf.insertIntoJDBC(url, "baz", True)
读取数据
按照写入数据中的步骤 1-4 进行操作
使用
sqlContext.read.jdbc
:
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
或者sqlContext.read.format("jdbc")
:
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
已知问题和陷阱:
找不到合适的驱动程序 - 请参阅:写入数据
Spark SQL 支持使用 JDBC 源的谓词下推,但并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是用有效的子查询替换
dbtable
/table
参数。例如:
+ spark谓词下推可以与JDBC一起工作吗?
+ 执行pyspark.sql.DataFrame.take需要一个多小时(4)
+ 如何使用 SQL 查询在 dbtable 中定义表?
默认情况下,JDBC 数据源使用单个执行器线程按顺序加载数据。为了确保分布式数据加载,您可以:
+ 提供分区`column`(必须是`IntegerType`)`lowerBound`,,,`upperBound`。`numPartitions`
+ 提供一个互斥谓词列表`predicates`,每个所需分区一个。看:
+ 通过 JDBC 从 RDBMS 读取数据时在 spark 中进行分区,
+ 从 JDBC 源迁移数据时如何优化分区?,
+ 如何使用 DataFrame 和 JDBC 连接来提高慢速 Spark 作业的性能?
+ 使用 JDBC 导入 Postgres 时如何对 Spark RDD 进行分区?
在分布式模式下(使用分区列或谓词),每个执行器都在自己的事务中运行。如果同时修改源数据库,则无法保证最终视图的一致性。
在哪里可以找到合适的驱动程序:
Maven 存储库(用于获取
--packages
选择所需版本所需的坐标,并从 Gradle 选项卡中复制数据compile-group:name:version
以替换相应字段)或Maven 中央存储库:
+ PostgreSQL
+ MySQL
其他选择
根据数据库的不同,可能存在专门的源,并且在某些情况下是首选:
Greenplum - Pivotal Greenplum-Spark 连接器
Apache Phoenix - Apache Spark 插件
Microsoft SQL Server -用于 Azure SQL 数据库和 SQL Server 的 Spark 连接器
Amazon Redshift - Databricks Redshift 连接器(当前版本仅在专有 Databricks Runtime 中可用。已停产的开源版本,可在 GitHub 上获取)。
解决方案 2:
下载mysql-connector-java驱动程序并保存在 spark jar 文件夹中,观察下面的 python 代码将数据写入“acotr1”,我们必须在 mysql 数据库中创建 acotr1 表结构
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:spark-2.1.0-bin-hadoop2.7jarsmysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"
df.write.jdbc(mysql_url,table="actor1",mode="append")
解决方案 3:
请参阅此链接下载 postgres 的 jdbc,并按照步骤下载 jar 文件
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html
jar 文件将在这样的路径中下载。“/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar”
如果你的spark版本是2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read \n.format("jdbc") \n.option("url", "jdbc:postgresql:postgres") \n.option("dbtable", "public.user_emp_tab") \n.option("user", "postgres") \n.option("password", "Jonsnow@100") \n.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
将文件保存为 python 并运行“python respectfilename.py”
- 2025年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 项目管理必备:盘点2024年13款好用的项目管理软件
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)