[Data Sync] A First Look at SeaTunnel: A 5,000-Word, Step-by-Step Guide to Getting Started with Oracle-CDC

2024/11/30 12:27:23  Source: https://blog.csdn.net/weixin_54625990/article/details/141393473

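What is Apache SeaTunnel?
A next-generation, high-performance, distributed framework for integrating massive volumes of data.
It supports over a hundred data sources and transfers data quickly and accurately. Its rich, extensible, plugin-based connector design makes complex integrations easier to run.
It is used for data migration and real-time stream processing tasks.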

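Download and install the JDK

Java must be installed (Java 8 or 11; other versions above Java 8 should in theory also work) and JAVA_HOME must be set.

This walkthrough installs 8u381, in which most known bugs have been fixed. JDK download page: https://www.oracle.com/java/technologies/javase/javase8u211-later-archive-downloads.html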

# Extract
tar xvf jdk-8u381-linux-x64.tar.gz
# Configure environment variables: add these 2 lines to ~/.bash_profile
cat ~/.bash_profile
export JAVA_HOME="/data/jdk-8u381-linux-x64"
export PATH=$PATH:$JAVA_HOME/bin
# Apply the environment variables
source ~/.bash_profile
# Verify
java -version
java version "1.8.0_381"
Java(TM) SE Runtime Environment (build 1.8.0_381-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.381-b09, mixed mode)
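
Since the requirement above is stated in terms of the Java major version (8 or 11), a small helper can turn the version string printed by java -version into that number. This is only a sketch: the function name java_major_version is hypothetical, and it assumes the two common version schemes ("1.8.0_381" and "11.0.x").

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of SeaTunnel): map a Java version string
# to its major version, e.g. 1.8.0_381 -> 8, 11.0.20 -> 11.
java_major_version() {
  local v="$1"
  case "$v" in
    1.*) v="${v#1.}" ;;          # legacy scheme: drop the leading "1."
  esac
  printf '%s\n' "${v%%[._-]*}"   # keep everything before the first separator
}
```

The version string itself can be taken from the first line of `java -version 2>&1` (the command writes to stderr), for example with `awk -F'"' '/version/ {print $2; exit}'`.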

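Download and install SeaTunnel

Installation is simple: download, extract, and it works out of the box. When this article was written the latest version was 2.3.5, so all download links here point to 2.3.5; the community's latest release is now 2.3.7.

Download the package from the official site: [bin] apache-seatunnel-2.3.5-bin.tar.gz

Reference documentation

  • https://seatunnel.apache.org/zh-CN/docs/2.3.5/start-v2/locally/deployment
  • https://seatunnel.apache.org/zh-CN/docs/2.3.5/seatunnel-engine/deployment

Download and extract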

cd /data/
export version="2.3.5"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

Configure environment variables

cat /etc/profile.d/seatunnel.sh
export SEATUNNEL_HOME=/data/apache-seatunnel-2.3.5
export PATH=$PATH:$SEATUNNEL_HOME/bin
# Apply the environment variables
source /etc/profile.d/seatunnel.sh

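Install connectors

Configure the plugin file (as needed)

There are many connectors, and downloading all of them costs time and disk space. This walkthrough adds only Oracle-CDC and a few commonly used connectors. The configuration file is as follows:

cat config/plugin_config
--connectors-v2--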
connector-cdc-mysql
connector-cdc-mongodb
connector-cdc-oracle
connector-elasticsearch
connector-http-wechat
connector-jdbc
connector-kafka
connector-mongodb
connector-redis
--end--

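Install connector plugins automatically (pinned version)

Note that since version 2.2.0-beta the binary package no longer ships connector dependencies by default, so the first time you use it you need to run the following command to install the connectors.

To install manually, the packages are available at: https://repo.maven.apache.org/maven2/org/apache/seatunnel/

# 2.3.5 is the currently installed version
sh bin/install-plugin.sh 2.3.5

Installation takes a while, and the more connectors are configured, the longer it takes. Part of the log is shown below: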

Install SeaTunnel connectors plugins, usage version is 2.3.5
install connector :  connector-cdc-oracle
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:get (default-cli) @ standalone-pom ---
[WARNING] The artifact xml-apis:xml-apis:jar:2.0.2 has been relocated to xml-apis:xml-apis:jar:1.0.b2
[INFO] Resolving org.apache.seatunnel:connector-cdc-oracle:jar:2.3.5 with transitive dependencies
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-cdc-oracle/2.3.5/connector-cdc-oracle-2.3.5.pom
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-cdc-oracle/2.3.5/connector-cdc-oracle-2.3.5.pom (3.2 kB at 2.3 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-cdc-oracle/2.3.5/connector-cdc-oracle-2.3.5.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-cdc-oracle/2.3.5/connector-cdc-oracle-2.3.5.jar (31 MB at 411 kB/s)
[WARNING] destination/dest parameter is deprecated: it will disappear in future version.
[INFO] Copying /root/.m2/repository/org/apache/seatunnel/connector-cdc-oracle/2.3.5/connector-cdc-oracle-2.3.5.jar to /data/apache-seatunnel-2.3.5/connectors
[WARNING] Notice transitive dependencies won't be copied.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:17 min
[INFO] Finished at: 2024-07-25T11:34:52+08:00
[INFO] ------------------------------------------------------------------------
## remaining output omitted ##

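After installation, the connectors are stored in: $SEATUNNEL_HOME/connectors/ [Important]

Once installation finishes, verify that the connector jars are present in that directory.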
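
One way to do that check is to compare config/plugin_config against the jars on disk. The sketch below assumes the jar naming seen in the install log (connector-name-version.jar directly under connectors/); the function name missing_connectors is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every connector listed in plugin_config that has
# no <name>-<version>.jar in the connectors directory.
missing_connectors() {
  local plugin_config="$1" connectors_dir="$2" version="$3" name
  while IFS= read -r name || [ -n "$name" ]; do
    # Strip stray whitespace (including a trailing carriage return)
    name="$(printf '%s' "$name" | tr -d '[:space:]')"
    case "$name" in
      connector-*) ;;        # a connector entry
      *) continue ;;         # markers such as --connectors-v2-- / --end--
    esac
    [ -f "${connectors_dir}/${name}-${version}.jar" ] || printf '%s\n' "$name"
  done < "$plugin_config"
}
```

For the layout used in this article, a call would look like `missing_connectors "$SEATUNNEL_HOME/config/plugin_config" "$SEATUNNEL_HOME/connectors" 2.3.5`; empty output means every listed connector was found.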

Oracle CDC connection test

Oracle 11g is used as the example here:

Install the JDBC driver

Download page: https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8/12.2.0.1

After downloading, place the JDBC driver in: $SEATUNNEL_HOME/lib
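
For a command-line download, the jar URL can be derived from the Maven coordinates on that page using the standard Maven repository layout. The sketch below is an assumption-laden convenience, not something from the SeaTunnel docs: maven_jar_url is a hypothetical name, and it assumes the artifact is published under the same repository base URL used for the connectors earlier in this article.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the jar URL for group:artifact:version using the
# standard Maven layout (group dots become slashes).
maven_jar_url() {
  local group="$1" artifact="$2" version="$3"
  local base="${4:-https://repo.maven.apache.org/maven2}"   # assumed base URL
  local group_path
  group_path="$(printf '%s' "$group" | tr '.' '/')"
  printf '%s/%s/%s/%s/%s-%s.jar\n' \
    "$base" "$group_path" "$artifact" "$version" "$artifact" "$version"
}
```

With that in place, `wget -P "$SEATUNNEL_HOME/lib" "$(maven_jar_url com.oracle.database.jdbc ojdbc8 12.2.0.1)"` would fetch the driver straight into the lib directory.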

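Source database configuration

This step requires a database restart, so proceed with caution in production!

Enable archive logging and supplemental logging (check free disk space first, so that the archive logs do not fill the disk)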

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
alter system set log_archive_format='ARC%S_%R.%T.arc';
shutdown immediate
startup mount;
alter database archivelog;
alter database open;
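
After the restart it is worth confirming that both settings took effect. The sketch below only prints a check query against V$DATABASE; the function name print_cdc_prereq_checks is hypothetical, and the expected values in the comment are what ARCHIVELOG mode and ALL-column supplemental logging should report.

```shell
#!/usr/bin/env bash
# Hypothetical helper: emit a SQL check for the two prerequisites set above.
# The quoted heredoc delimiter keeps the shell from expanding "$database".
print_cdc_prereq_checks() {
  cat <<'SQL'
-- Expect LOG_MODE = ARCHIVELOG and SUPPLEMENTAL_LOG_DATA_ALL = YES
SELECT log_mode, supplemental_log_data_all FROM v$database;
SQL
}
```

Assuming a local, OS-authenticated SYSDBA login is available, the output can be piped into SQL*Plus: `print_cdc_prereq_checks | sqlplus -S "/ as sysdba"`.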

Create a tablespace and user on the source database, and grant privileges

-- Create the tablespace and user
CREATE TABLESPACE logminer_tbs DATAFILE '/mnt/oracle/oradata/logminer_tbs.dbf' SIZE 25M AUTOEXTEND ON NEXT 512M MAXSIZE 32767M;
CREATE USER LOGMINER_USER IDENTIFIED BY Vir123 DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
-- Grant privileges to the user
GRANT CREATE SESSION TO logminer_user;
GRANT SELECT ON V_$DATABASE to logminer_user;
GRANT SELECT ON V_$LOG TO logminer_user;
GRANT SELECT ON V_$LOGFILE TO logminer_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
-- Grant table privileges
GRANT SELECT ANY TABLE TO logminer_user;
GRANT ANALYZE ANY TO logminer_user;
-- Additional grants to resolve the insufficient-privileges error
GRANT SELECT ANY TRANSACTION TO logminer_user;
GRANT SELECT ON V_$TRANSACTION TO logminer_user;

Add a SeaTunnel job configuration file

Configuration file directory: $SEATUNNEL_HOME/config

cd $SEATUNNEL_HOME
cat config/oracdc_console
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Oracle-CDC {
    driver = "oracle.jdbc.driver.OracleDriver"
    result_table_name = "customers"
    username = "LOGMINER_USER"
    password = "Vir123"
    database-names = ["VIRDB"]
    schema-names = ["VUSER"]
    table-names = ["VIRDB.VUSER.CT_DB_LABORATORY_BAK"]
    base-url = "jdbc:oracle:thin:@10.10.10.10:1521/VIRDB"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
  }
}

transform {
}

sink {
  Console {
    source_table_name = "customers"
  }
}

Start the job locally and watch the log

sh ./bin/seatunnel.sh -e local --config ./config/oracdc_console

After a row is inserted on the source side, the log updates.

The summary shows the total count going from 11 to 12.

Common issues

Job fails at startup

Error message: Unable to create a source for identifier 'Jdbc'

===============================================================================
2024-07-24 14:38:08,672 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,
2024-07-24 14:38:08,673 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2024-07-24 14:38:08,673 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed
2024-07-24 14:38:08,674 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Jdbc'.
    at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:332)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:188)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:156)
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:149)
    ... 2 more
Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - Failed connecting to jdbc:oracle:thin:@10.10.100.140:1521:jszx via JDBC.
    at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.getConnection(AbstractJdbcCatalog.java:123)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.open(AbstractJdbcCatalog.java:129)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils.getTables(JdbcCatalogUtils.java:78)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource.<init>(JdbcSource.java:57)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceFactory.lambda$createSource$0(JdbcSourceFactory.java:78)
    at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112)
    at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73)
    ... 7 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@10.10.100.140:1521:jszx
    at java.sql.DriverManager.getConnection(DriverManager.java:689)
    at java.sql.DriverManager.getConnection(DriverManager.java:247)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog.getConnection(AbstractJdbcCatalog.java:119)
    ... 13 more

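Solution:

(1) Wrong driver setting

In this setup the driver had to be oracle.jdbc.driver.OracleDriver rather than oracle.jdbc.OracleDriver.

(2) Wrong driver jar

The previously used ojdbc14-10.2.0.3.0.jar is not supported. Re-download the JDBC driver as described in "Install the JDBC driver" above.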
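
Because "No suitable driver found" usually comes down to which jar is on the classpath, a quick look at the lib directory can help. The sketch below is a hypothetical helper (check_oracle_driver is not a SeaTunnel command); it flags ojdbc14 only because that jar failed in the setup described here.

```shell
#!/usr/bin/env bash
# Hypothetical helper: list Oracle JDBC jars in a lib directory.
# Returns 0 if at least one jar other than ojdbc14 is present, 1 otherwise.
check_oracle_driver() {
  local lib_dir="$1" jar found=1
  for jar in "$lib_dir"/ojdbc*.jar; do
    [ -e "$jar" ] || continue            # the glob matched nothing
    case "$(basename "$jar")" in
      ojdbc14*) echo "unsupported here: $jar" ;;   # failed in this article's test
      *)        echo "ok: $jar"; found=0 ;;
    esac
  done
  return "$found"
}
```

Running `check_oracle_driver "$SEATUNNEL_HOME/lib"` before starting the job gives a quick yes/no on whether a usable driver jar is in place.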

Oracle-CDC connector not found

===============================================================================
2024-07-24 17:15:37,657 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,
2024-07-24 17:15:37,657 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2024-07-24 17:15:37,657 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed
2024-07-24 17:15:37,661 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.lang.RuntimeException: Plugin PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Oracle-CDC'} not found.
    at org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery.createPluginInstance(AbstractPluginDiscovery.java:234)
    at org.apache.seatunnel.engine.core.parse.ConnectorInstanceLoader.loadSourceInstance(ConnectorInstanceLoader.java:61)
    at org.apache.seatunnel.engine.core.parse.JobConfigParser.parseSource(JobConfigParser.java:77)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:327)
    at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:188)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
    at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:156)
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:149)
    ... 2 more
2024-07-24 17:15:37,661 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -

Solution:

Focus on this line:

"Caused by: java.lang.RuntimeException: Plugin PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Oracle-CDC'} not found."

The error says the Oracle-CDC plugin cannot be found. Check $SEATUNNEL_HOME/connectors/plugin-mapping.properties for the connector names currently supported, add the connector to the plugin configuration, and then install it.

cat $SEATUNNEL_HOME/config/plugin_config
--connectors-v2--
connector-cdc-oracle
--end--
# Run the connector plugin installation again
sh bin/install-plugin.sh 2.3.5
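
To find which connector artifact a plugin name maps to, the mapping file can be searched directly. The sketch below assumes entries of the form `seatunnel.source.<Name> = <connector-artifact>`; that format is an assumption about the file, and connector_for_source is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: look up the connector artifact for a source plugin name
# in plugin-mapping.properties (assumed format: seatunnel.source.<Name> = <artifact>).
connector_for_source() {
  local mapping_file="$1" source_name="$2"
  awk -F'=' -v key="seatunnel.source.${source_name}" '
    {
      k = $1; v = $2
      gsub(/[ \t]/, "", k)       # trim spaces around the key
      gsub(/[ \t\r]/, "", v)     # trim spaces around the value
    }
    k == key { print v; exit }
  ' "$mapping_file"
}
```

For example, `connector_for_source "$SEATUNNEL_HOME/connectors/plugin-mapping.properties" Oracle-CDC` should print the artifact name to add to config/plugin_config; empty output means no such entry was found.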

Permission issues

ORA-01031: insufficient privileges

===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:160)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:111)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
    ... 5 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:341)
    at org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask$RedoLogSplitReadTask.execute(OracleRedoLogFetchTask.java:130)
    at org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask.execute(OracleRedoLogFetchTask.java:73)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:106)
    ... 5 more
Caused by: java.sql.SQLSyntaxErrorException: ORA-01031: insufficient privileges
    at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:494)
    at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:446)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1054)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:623)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:252)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:612)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:226)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:59)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:747)
    at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:904)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1082)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3780)
    at oracle.jdbc.driver.T4CPreparedStatement.executeInternal(T4CPreparedStatement.java:1343)
    at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3822)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1165)
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:291)
    ... 8 more

Solution: grant the TRANSACTION privileges

GRANT SELECT ANY TRANSACTION TO logminer_user; 
GRANT SELECT ON V_$TRANSACTION TO logminer_user;

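Overall, installing and configuring SeaTunnel is straightforward. If you are new to it and run into problems such as unfamiliar documentation, components you forgot to download, or confusion about where JARs and connectors belong and how they are configured, you can get help through the community's GitHub, mailing lists, and WeChat groups, as well as through community articles and events.

Finally, thanks to everyone in the Apache SeaTunnel community for their support!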

References
  • Download: https://seatunnel.apache.org/zh-CN/download
  • Documentation: https://seatunnel.apache.org/zh-CN/docs/2.3.5/connector-v2/source/Oracle-CDC
  • Source code: https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e
  • Issue record: https://github.com/apache/seatunnel/issues/6799

This article was published with support from WhaleOps (白鲸开源科技)!
