欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > Spark SQL----连接其他数据库的JDBC

Spark SQL----连接其他数据库的JDBC

2024/10/26 21:31:22 来源:https://blog.csdn.net/gabriel_wang_sh/article/details/137064443  浏览:    关键词:Spark SQL----连接其他数据库的JDBC

Spark SQL----连接其他数据库的JDBC

  • 数据源选项

Spark SQL还包括一个数据源,可以使用JDBC从其他数据库读取数据。与使用 JdbcRDD相比,应该优先使用此功能。这是因为结果以DataFrame的形式返回,并且可以很容易地在Spark SQL中进行处理或与其他数据源join。JDBC数据源也更容易从Java或Python中使用,因为它不需要用户提供ClassTag。(请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。
要开始,你需要在spark类路径中包含特定数据库的JDBC驱动程序。例如,要从Spark Shell连接到postgres,你需要运行以下命令:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

数据源选项

Spark支持以下JDBC不区分大小写的选项。JDBC的数据源选项可以通过以下方式设置:

  • 以下类的.option/.options方法
    • DataFrameReader
    • DataFrameWriter
  • CREATE TABLE USING DATA_SOURCE处的OPTIONS子句

对于连接属性,用户可以在数据源选项中指定JDBC连接属性。用户和密码通常作为登录到数据源的连接属性提供。

Property NameDefaultMeaningScope
url(none)要连接到的JDBC:subprotocol:subname 形式的JDBC URL。特定于源的连接属性可以在URL中指定。例如, jdbc:postgresql://localhost/test?user=fred&password=secretread/write
dbtable(none)应该读取或写入的JDBC表。请注意,当在读路径中使用它时,可以使用SQL查询的FROM子句中有效的任何内容。例如,除了完整的表,还可以在括号中使用子查询。不允许同时指定dbtable和query选项。read/write
query(none)将用于把数据读取到Spark中的查询。指定的查询将用括号括起来,并用作FROM子句中的子查询。Spark还将为子查询子句分配一个别名。例如,spark将向JDBC源发出以下形式的查询。SELECT <columns> FROM (<user_specified_query>) spark_gen_alias。 下面是使用此选项时的几个限制。1. 不允许同时指定dbtable和query选项。2. 不允许同时指定query和partitionColumn选项。当需要指定partitionColumn选项时,可以使用dbtable选项来指定子查询,并且可以使用作为dbtable的一部分提供的子查询别名来限定分区列。例子: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“query”, “select c1, c2 from t1”).load()read/write
prepareQuery(none)与query一起构成最终查询的前缀。由于指定的查询将作为FROM子句中的子查询括起来,而且有些数据库不支持子查询中的所有子句,因此prepareQuery属性提供了一种运行此类复杂查询的方法。作为示例,spark将向JDBC Source发出如下形式的查询。 <prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias。 下面是几个例子。1. MSSQL Server不接受子查询中的WITH子句,但可以将这样的查询拆分为prepareQuery和query: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“prepareQuery”, “WITH t AS (SELECT x, y FROM tbl)”).option(“query”, “SELECT * FROM t WHERE x > 10”).load() 2. MSSQL Server不接受子查询中的临时表子句,但可以将这样的查询拆分为prepareQuery和query: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“prepareQuery”, “(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)”).option(“query”, “SELECT * FROM #TempTable”).load()read/write
driver(none)用于连接到此URL的JDBC驱动程序的类名。read/write
partitionColumn, lowerBound, upperBound(none)如果指定了其中任何一个选项,则必须指定所有这些选项。此外,还必须指定numPartitions。它们描述了在从多个workers并行读取数据时如何对表进行分区。partitionColumn必须是相关表中的数字列、日期列或时间戳列。注意,lowerBound和upperBound仅用于决定分区步长(stride),而不是用于过滤表中的行。因此表中的所有行都将被分区并返回。此选项仅适用于读取。例子: spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“dbtable”, “(select c1, c2 from t1) as subq”).option(“partitionColumn”, “c1”).option(“lowerBound”, “1”).option(“upperBound”, “100”).option(“numPartitions”, “3”).load()read
numPartitions(none)在表读取和写入过程中可用于并行的最大分区数。这也决定了并发JDBC连接的最大数量。如果要写入的分区数超过了这个限制,我们会在写入之前通过调用coalesce(numPartitions)将其减少到这个限制。read/write
queryTimeout0驱动程序将等待语句对象执行的秒数到给定的秒数。零表示没有限制。在写路径中,这个选项取决于JDBC驱动程序如何实现API setQueryTimeout,例如,h2 JDBC驱动程序检查每个查询的超时,而不是整个JDBC batch。read/write
fetchsize0JDBC获取大小,它决定每次往返要获取多少行。这可以帮助JDBC驱动程序的性能,默认为低读取大小(例如Oracle的10行)。read
batchsize1000JDBC批处理大小,它决定每次往返要插入多少行。这有助于提高JDBC驱动程序的性能。此选项仅适用于写入。write
isolationLevelREAD_UNCOMMITTED事务隔离级别,适用于当前连接。它可以是NONE、READ_COMMITTED、READ_UNCOMMITTED、REPEATABLE_READ或SERIALIZABLE中的一个,对应于JDBC的Connection对象定义的标准事务隔离级别,默认为READ_UNCOMMITTED。请参考java.sql.Connection中的文档。write
sessionInitStatement(none)在向远程DB打开每个数据库会话之后,在开始读取数据之前,该选项执行自定义SQL语句(或PL/SQL块)。使用它来实现会话初始化代码。例子: option(“sessionInitStatement”, “”“BEGIN execute immediate ‘alter session set “_serial_direct_read”=true’; END;”“”)read
truncatefalse这是一个与JDBC writer 相关的选项。当启用“SaveMode.Overwrite”时,此选项会导致Spark截断现有表,而不是删除并重新创建它。这可以提高效率,并防止删除表元数据(如索引)。然而,在某些情况下,例如当新数据具有不同的schema时,它将不起作用。若出现故障,用户应关闭truncate选项以再次使用DROP TABLE。此外,由于TRUNCATE TABLE在DBMS之间的行为不同,使用它并不总是安全的。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect和OracleDialect支持此功能,而PostgresDialect和默认JDBCDirect不支持。对于未知和不受支持的JDBCDirect,用户选项truncate将被忽略。write
cascadeTruncate所讨论的JDBC数据库的默认级联truncate行为,在每个JDBCDialect中的isCascadeTruncate中指定这是一个与JDBC writer相关的选项。如果JDBC数据库(目前是PostgreSQL 和Oracle )启用并支持此选项,则允许执行TRUNCATE TABLE t CASCADE(在PostgreSQL 的情况下,执行TRUNCATE TABLE ONLY t CASCADE以防止无意中truncate descendant表)。这将影响其他表,因此应谨慎使用。write
createTableOptions这是一个与JDBC writer相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如, CREATE TABLE t (name string) ENGINE=InnoDB.).write
createTableColumnTypes(none)创建表时要使用的数据库列数据类型,而不是默认值。数据类型信息应该以与CREATE TABLE列语法相同的格式指定(例如:“name CHAR(64), comments VARCHAR(1024)”)。指定的类型应该是有效的spark sql数据类型。write
customSchema(none)用于从JDBC连接器读取数据的自定义schema。例如:id DECIMAL(38, 0), name STRING。你还可以指定部分字段,其他字段使用默认类型映射。例如,“id DECIMAL(38, 0)”。列名应该与JDBC表的相应列名相同。用户可以指定相应的Spark SQL数据类型,而不是使用默认值。read
pushDownPredicatetrue启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种情况下,Spark将尽可能地将filters下推到JDBC数据源。否则,如果设置为false,则不会将任何筛选器下推到JDBC数据源,因此所有filters都将由Spark处理。当Spark执行谓词过滤的速度比JDBC数据源快时,通常关闭谓词下推。read
pushDownAggregatetrue在V2 JDBC数据源中启用或禁用聚合下推的选项。默认值为true,在这种情况下,Spark将下推聚合(aggregates)到JDBC数据源。否则,如果设置为false,聚合将不会下推到JDBC数据源。当Spark执行聚合的速度比JDBC数据源更快时,聚合下推通常会关闭。请注意,当且仅当所有聚合函数和相关过滤器都可以向下推时,聚合才能下推。如果numPartitions等于1,或者group by key与partitionColumn相同,Spark将完全下推聚合到数据源,而不会对数据源输出应用最终聚合。否则,Spark将对数据源输出应用最终聚合。read
pushDownLimittrue启用或禁用LIMIT下推到V2 JDBC数据源的选项。LIMIT下推还包括LIMIT + SORT,也称为Top N运算符。默认值为true,在这种情况下,Spark将LIMIT或LIMIT+SORT一起下推到JDBC数据源。否则,如果设置为false,则LIMIT或LIMIT + SORT不会下推到JDBC数据源。如果numPartitions大于1,即使LIMIT或LIMIT with SORT被下推,Spark仍会对数据源的结果应用LIMIT 或LIMIT +SORT。否则,如果LIMIT或LIMIT+ SORT被下推,并且numPartitions等于1,Spark将不会对数据源的结果应用LIMIT 或LIMIT +SORT。read
pushDownOffsettrue启用或禁用OFFSET下推到V2 JDBC数据源的选项。默认值为true,在这种情况下,Spark将把OFFSET下推到JDBC数据源。否则,如果设置为false, Spark将不会尝试将OFFSET下推到JDBC数据源。如果pushDownOffset为true并且numPartitions等于1,OFFSET将被下推到JDBC数据源。否则,OFFSET不会被下推,Spark仍然对数据源的结果应用OFFSET。read
pushDownTableSampletrue启用或禁用TABLESAMPLE下推到V2 JDBC数据源的选项。默认值为true,在这种情况下,Spark将TABLESAMPLE下推到JDBC数据源。否则,如果该值设置为false,则不会将TABLESAMPLE下推到JDBC数据源。read
keytab(none)JDBC客户端的kerberos keytab文件的位置(必须通过spark-submit的–files选项或手动将其预先上传到所有节点)。当找到路径信息时,Spark认为keytab是手动分布的,否则假定为–files。如果同时定义了keytab和principal,那么Spark将尝试进行kerberos身份验证。read/write
principal(none)为JDBC客户端指定kerberos principal名称。如果同时定义了keytab和principal,那么Spark将尝试进行kerberos身份验证。read/write
refreshKrb5Configfalse此选项控制在建立新连接之前是否刷新JDBC客户端的kerberos配置。如果要刷新配置,请设置为true,否则设置为false。默认值为false。请注意,如果将此选项设置为true并尝试建立多个连接,则可能会出现争用情况。一种可能的情况如下。1.refreshKrb5Config标志被设置为具有安全上下文1。2.JDBC连接provider用于相应的DBMS。3.修改了krb5.conf,但JVM还没有意识到必须重新加载它。4.Spark成功验证安全上下文1。5.JVM从修改后的krb5.conf中加载安全上下文2。6.Spark恢复之前保存的安全上下文1。7.修改后的krb5.conf内容消失了read/write
connectionProvider(none)用于连接到此URL的JDBC连接provider的名称,例如db2、mssql。必须是加载了JDBC数据源的providers之一。当多个provider可以处理指定的驱动程序和选项时,用于消除歧义。所选的provider不能被spark.sql.sources.disabledJdbcConnProviderList禁用。read/write
preferTimestampNTZfalse当该选项设置为true时,所有时间戳都推断为TIMESTAMP WITHOUT TIME ZONE。否则,时间戳将被读取为带有本地时区的TIMESTAMP。read

请注意,JDBC驱动程序并不总是支持使用keytab的kerberos身份验证。
在使用keytab和principal配置选项之前,请确保满足以下要求:

  • 所包含的JDBC驱动程序版本支持使用keytab进行kerberos身份验证。
  • 有一个内置的连接provider,它支持所使用的数据库。
    以下数据库有一个内置的连接provider:
  • DB2
  • MariaDB
  • MS Sql
  • Oracle
  • PostgreSQL
    如果不满足要求,请考虑使用JdbcConnectionProvider developer API来处理自定义身份验证。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.load()jdbcDF2 = spark.read \.jdbc("jdbc:postgresql:dbserver", "schema.tablename",properties={"user": "username", "password": "password"})# Specifying dataframe column data types on read
jdbcDF3 = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.option("customSchema", "id DECIMAL(38, 0), name STRING") \.load()# Saving data to a JDBC source
jdbcDF.write \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.save()jdbcDF2.write \.jdbc("jdbc:postgresql:dbserver", "schema.tablename",properties={"user": "username", "password": "password"})# Specifying create table column data types on write
jdbcDF.write \.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \.jdbc("jdbc:postgresql:dbserver", "schema.tablename",properties={"user": "username", "password": "password"})

在Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com