目录
4、flinkCDC同步oracle至Apache Doris
?????????Apache Doris(以前称为Palo)是一个开源的大数据分析数据库项目,是由百度公司发起的一个分布式 SQL 数据仓库。它的设计目标是支持低延迟、高吞吐量的交互式 SQL 查询,可以用于实时报表、在线分析处理等场景。Apache Doris 提供了分布式的、可伸缩的架构,支持高并发的大规模数据查询和分析。它提供了高可靠性、容错性和高性能的特性,支持实时数据加载、提交、分析和查询。进一步地,Apache Doris 支持各种数据格式和数据源,包括结构化数据、半结构化数据和非结构化数据等。在 Apache Doris 中,用户可以使用标准的 SQL 查询语言来进行数据分析,从而更方便地进行数据探索和分析。它也提供了对 Hadoop 生态系统的深度集成,可以直接在 HDFS 上查询数据。
doris要求有java环境,请预先安装java,要求jdk8版本。
然后去官网下载最新版本:Download - Apache Doris
我这里下载apache-doris-2.0.3-bin-x64版本
先设置linux句柄数:
vi /etc/security/limits.conf
写入以下内容:
* soft nofile 65536
* hard nofile 65536
然后关闭系统交换分区:
swapoff -a
登录linux创建目录,然后把doris上传至该目录:
mkdir /usr/doris/
解压:
tar -zxvf apache-doris-2.0.3-bin-x64.tar.gz
修改FE配置文件:
vi /usr/doris/apache-doris-2.0.3-bin-x64/fe/conf/fe.conf
修改以下值:
priority_networks=ip/24 这个ip就是你本机的ip
修改BE配置文件:
vi /usr/doris/apache-doris-2.0.3-bin-x64/be/conf/be.conf
修改以下值:
priority_networks=ip/24 这个ip就是你本机的ip
启动FE:
/usr/doris/apache-doris-2.0.3-bin-x64/fe/bin/start_fe.sh --daemon
这个 --daemon是后台运行的意思。
输入jps命令查看是否启动成功,有显示DorisFE即是成功启动FE:
然后配置BE,在 FE 中添加所有 BE 节点
BE 节点需要先在 FE 中添加,才可加入集群。可以使用 mysql-client(下载MySQL 5.7) 连接到 FE:
./mysql-client -h fe_host -P query_port -uroot
其中 fe_host 为 FE 所在节点 ip;query_port 在 fe/conf/fe.conf 中的,默认是9030;默认使用 root 账户,无密码登录。登录后,执行以下命令来添加每一个 BE:
ALTER SYSTEM ADD BACKEND "be_host:heartbeat-service_port";
其中 be_host 为 BE 所在节点 ip;heartbeat_service_port 在 be/conf/be.conf 中
启动 BE
bin/start_be.sh --daemon
使用mysql-client查看BE状态:
SHOW PROC '/backends'\G;
?
jps查看FE、BE是否启动成功,看到有DorisFE、DorisBE即为启动成功。
首先oracle要开启日志归档,创建同步账号并授权,这个自行百度解决。
本次测试主要是通过?Flink Doris Connector来实现0代码同步oracle,Flink Doris Connector?可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。详情请参考官网:Flink Doris Connector - Apache Doris
开始同步前准备以下jar包,上传至flink/lib目录底下,然后重启flink:
1、flink-sql-connector-oracle-cdc-3.0.0.jar
2、flink-sql-connector-mysql-cdc-3.0.0.jar
3、mysql-connector-java-8.0.33.jar
4、flink-doris-connector-1.16-1.5.0.jar
如果不知道怎么下载依赖,可以找任意一个maven项目,在pom.xml中添加以下依赖,然后去找到你的maven目录找到conf/settings.xml配置的maven仓库目录中找到这些jar包就行。
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
现在开始操作。进入flink目录,假设flink安装在/usr/flink中,具体安装请看我的上一篇文章:初识flinkCDC。那么:
cd /usr/flink/flink-1.18.0
输入以下命令,按需求修改成你的信息:
bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.0.jar \
oracle-sync-database \
--database test_db \
--oracle-conf url=jdbc:oracle:thin:@oracle数据库IP:1521/数据库服务名 \
--oracle-conf username=有归档日志权限的oracle用户名 \
--oracle-conf password=有归档日志权限的oracle用户密码 \
--oracle-conf database-name=数据库 \
--oracle-conf schema-name=数据库schema \
--including-tables "table1|table2表示同步table1、table2两张表" \
--sink-conf fenodes=doris数据库IP:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://doris数据库IP:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
看到有Job has been submitted with JobID 954809be83d2493466ae3e98f09587b5字样说明flink任务提交成功,接下来我们去flink web UI看看,状态为running即为即为成功。:
如果任务报错可以点击任务进入详情查看exception,根据提示修改错。
至此数据同步完成。可用navicat或者其他工具连接doris,就可以看到oracle数据实时同步到doris啦。