如何使用 JDBC 源在 (Py)Spark 中写入和读取数据?

2025-02-20 09:23:00
admin
原创
28
摘要:问题描述:这个问题的目的是记录:在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤JDBC 源可能存在的问题以及已知解决方案经过微小的改动,这些方法应该可以与其他受支持的语言兼容,包括 Scala 和 R。解决方案 1:写入数据提交应用程序或启动 shell 时包含适用的 JDBC 驱动程序。...

问题描述:

这个问题的目的是记录:

  • 在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤

  • JDBC 源可能存在的问题以及已知解决方案

经过微小的改动,这些方法应该可以与其他受支持的语言兼容,包括 Scala 和 R。


解决方案 1:

写入数据

  1. 提交应用程序或启动 shell 时包含适用的 JDBC 驱动程序。例如,您可以使用--packages

 bin/pyspark --packages group:name:version  

driver-class-path结合jars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

这些属性也可以PYSPARK_SUBMIT_ARGS在 JVM 实例启动之前使用环境变量来设置,或者使用conf/spark-defaults.confsetspark.jars.packagesspark.jars/来设置spark.driver.extraClassPath

  1. 选择所需模式。Spark JDBC 编写器支持以下模式:

  • append:将此:class:的内容附加DataFrame到现有数据。

    • overwrite:覆盖现有数据。

    • ignore:如果数据已经存在,则默默忽略此操作。

    • error(默认情况):如果数据已经存在,则抛出异常。

不支持更新插入或其他细粒度的修改

 mode = ...
  1. 准备 JDBC URI,例如:

 # You can encode credentials in URI or pass
 # separately using properties argument
 # of jdbc method or options

 url = "jdbc:postgresql://localhost/foobar"
  1. (可选)创建 JDBC 参数字典。

 properties = {
     "user": "foo",
     "password": "bar"
 }

properties/options也可用于设置支持的 JDBC 连接属性。

  1. 使用DataFrame.write.jdbc

 df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

保存数据(pyspark.sql.DataFrameWriter详情请参阅)。

已知问题

  • --packages使用( java.sql.SQLException: No suitable driver found for jdbc: ...)包含驱动程序时无法找到合适的驱动程序

假设没有驱动程序版本不匹配的问题,为了解决这个问题,你可以将driver类添加到properties。例如:

  properties = {
      ...
      "driver": "org.postgresql.Driver"
  }
  • 使用df.write.format("jdbc").options(...).save()可能会导致:

java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource 不允许创建表作为选择。

解决方案未知。

  • 在 Pyspark 1.3 中,您可以尝试直接调用 Java 方法:

  df._jdf.insertIntoJDBC(url, "baz", True)

读取数据

  1. 按照写入数据中的步骤 1-4 进行操作

  2. 使用sqlContext.read.jdbc

 sqlContext.read.jdbc(url=url, table="baz", properties=properties)

或者sqlContext.read.format("jdbc")

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())

已知问题和陷阱

  • 找不到合适的驱动程序 - 请参阅:写入数据

  • Spark SQL 支持使用 JDBC 源的谓词下推,但并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是用有效的子查询替换dbtable/table参数。例如:

+ spark谓词下推可以与JDBC一起工作吗?
+ 执行pyspark.sql.DataFrame.take需要一个多小时(4)
+ 如何使用 SQL 查询在 dbtable 中定义表?
  • 默认情况下,JDBC 数据源使用单个执行器线程按顺序加载数据。为了确保分布式数据加载,您可以:

+ 提供分区`column`(必须是`IntegerType`)`lowerBound`,,,`upperBound`。`numPartitions`
+ 提供一个互斥谓词列表`predicates`,每个所需分区一个。看:
+ 通过 JDBC 从 RDBMS 读取数据时在 spark 中进行分区,
+ 从 JDBC 源迁移数据时如何优化分区?,
+ 如何使用 DataFrame 和 JDBC 连接来提高慢速 Spark 作业的性能?
+ 使用 JDBC 导入 Postgres 时如何对 Spark RDD 进行分区?
  • 在分布式模式下(使用分区列或谓词),每个执行器都在自己的事务中运行。如果同时修改源数据库,则无法保证最终视图的一致性。

在哪里可以找到合适的驱动程序:

  • Maven 存储库(用于获取--packages选择所需版本所需的坐标,并从 Gradle 选项卡中复制数据compile-group:name:version以替换相应字段)或Maven 中央存储库:

+ PostgreSQL
+ MySQL

其他选择

根据数据库的不同,可能存在专门的源,并且在某些情况下是首选:

  • Greenplum - Pivotal Greenplum-Spark 连接器

  • Apache Phoenix - Apache Spark 插件

  • Microsoft SQL Server -用于 Azure SQL 数据库和 SQL Server 的 Spark 连接器

  • Amazon Redshift - Databricks Redshift 连接器(当前版本仅在专有 Databricks Runtime 中可用。已停产的开源版本,可在 GitHub 上获取)。

解决方案 2:

下载mysql-connector-java驱动程序并保存在 spark jar 文件夹中,观察下面的 python 代码将数据写入“acotr1”,我们必须在 mysql 数据库中创建 acotr1 表结构

    spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:spark-2.1.0-bin-hadoop2.7jarsmysql-connector-java-5.1.41-bin.jar').getOrCreate()

    sc = spark.sparkContext

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()

    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"

    df.write.jdbc(mysql_url,table="actor1",mode="append")

解决方案 3:

请参阅此链接下载 postgres 的 jdbc,并按照步骤下载 jar 文件

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html
jar 文件将在这样的路径中下载。“/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar”

如果你的spark版本是2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \n.format("jdbc") \n.option("url", "jdbc:postgresql:postgres") \n.option("dbtable", "public.user_emp_tab") \n.option("user", "postgres") \n.option("password", "Jonsnow@100") \n.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

将文件保存为 python 并运行“python respectfilename.py”

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1325  
  IPD(Integrated Product Development)流程作为一种先进的产品开发管理模式,在众多企业中得到了广泛应用。它涵盖了从产品概念产生到产品退市的整个生命周期,通过整合跨部门团队、优化流程等方式,显著提升产品开发的效率和质量,进而为项目的成功奠定坚实基础。深入探究IPD流程的五个阶段与项目成功之间...
IPD流程分为几个阶段   4  
  华为作为全球知名的科技企业,其成功背后的管理体系备受关注。IPD(集成产品开发)流程作为华为核心的产品开发管理模式,其中的创新管理与实践更是蕴含着丰富的经验和深刻的智慧,对众多企业具有重要的借鉴意义。IPD流程的核心架构IPD流程旨在打破部门墙,实现跨部门的高效协作,将产品开发视为一个整体的流程。它涵盖了从市场需求分析...
华为IPD是什么   3  
  IPD(Integrated Product Development)研发管理体系作为一种先进的产品开发模式,在众多企业的发展历程中发挥了至关重要的作用。它不仅仅是一套流程,更是一种理念,一种能够全方位提升企业竞争力,推动企业持续发展的有效工具。深入探究IPD研发管理体系如何助力企业持续发展,对于众多渴望在市场中立足并...
IPD管理流程   3  
  IPD(Integrated Product Development)流程管理旨在通过整合产品开发流程、团队和资源,实现产品的快速、高质量交付。在这一过程中,有效降低成本是企业提升竞争力的关键。通过优化IPD流程管理中的各个环节,可以在不牺牲产品质量和性能的前提下,实现成本的显著降低,为企业创造更大的价值。优化产品规划...
IPD流程分为几个阶段   4  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用