CentOS安装maxwell

发布时间:2024年01月17日

一、简介

????????maxwell是由美国Zendesk公司开源,它通过读取mysql的binlog日志,将数据变更以JSON的方式发送给Kafka, Kinesis等流数据处理平台。
????????This is Maxwell’s daemon, a change data capture application that reads MySQL binlogs and writes data changes as JSON to Kafka, Kinesis, and other streaming platforms.

官网地址
源码地址

二、准备工作

本次实现mysql数据同步给kafka,所以mysql与kafka需要提前准备好。
mysql表结构如下:

CREATE TABLE `user_info` (
  `id` int NOT NULL AUTO_INCREMENT,
  `user_id` int NOT NULL,
  `username` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  `phone_number` varchar(30) DEFAULT NULL,
  `status` enum('active','inactive') DEFAULT 'active',
  `score` int unsigned DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=35464335 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

特别注意:需要启用mysql的binlog

vim /etc/my.cnf

增加如下内容:

#数据库id
server-id = 1
#该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row

三、安装

本次使用版本:V1.29.2
注:maxwell-1.30.0及以上版本不再支持JDK1.8

1、下载安装包

下载安装包V1.29.2

2、解压

tar -zxvf maxwell-1.29.2.tar.gz

3、编写配置文件

在解压目录下给了一个示例文件config.properties.example

cp config.properties.example config.properties

编辑内容参考如下:

# tl;dr config
log_level=info

#maxwell同步数据的去向,支持stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis等
producer=kafka
kafka.bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
#kafka的topic如下是动态配置
kafka_topic=%{database}_%{table}

# mysql login info
host=***********
port=3306
user=root
password=************
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8

4、启动maxwell

进入解压目录执行如下命令:

./bin/maxwell --config ./config.properties --daemon

查看日志已启动成功:
在这里插入图片描述

5、验证

启动一个消费者,在数据库操作数据,然后观察kafka

kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic hadoop_user_info

在这里插入图片描述

6、停止maxwell

ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

四、说明

maxwell同步输出格式如下:

1、更新数据

UPDATE `hadoop`.`user_info` SET `id` = '148254', `user_id` = '8321174' WHERE (`id` = '1482');
{
	"database": "hadoop",
	"table": "user_info",
	"type": "update",
	"ts": 1705314481,
	"xid": 10903,
	"commit": true,
	"data": {
		"id": 148254,
		"user_id": 8321174,
		"username": "batesanthony",
		"email": "justin62@example.com",
		"phone_number": "+1-982-342-3093x988",
		"status": "inactive",
		"score": 99
	},
	"old": {
		"id": 1482,
		"user_id": 832117
	}
}

2、插入数据

INSERT INTO `hadoop`.`user_info` (`id`, `user_id`, `username`, `email`, `phone_number`, `status`, `score`) VALUES ('14832247', '57377145', 'joseph90', 'tbarnett@example.net', '295-683-4540x37958', 'active', '100');
{
	"database": "hadoop",
	"table": "user_info",
	"type": "insert",
	"ts": 1705314503,
	"xid": 10966,
	"commit": true,
	"data": {
		"id": 14832247,
		"user_id": 57377145,
		"username": "joseph90",
		"email": "tbarnett@example.net",
		"phone_number": "295-683-4540x37958",
		"status": "active",
		"score": 100
	}
}

3、删除数据

DELETE FROM `hadoop`.`user_info` WHERE (`id` = '1483');
{
	"database": "hadoop",
	"table": "user_info",
	"type": "delete",
	"ts": 1705314531,
	"xid": 11056,
	"commit": true,
	"data": {
		"id": 1483,
		"user_id": 573771,
		"username": "joseph90",
		"email": "tbarnett@example.net",
		"phone_number": "295-683-4540x37958",
		"status": "active",
		"score": 100
	}
}

JSON字段说明如下:

字段说明
database同步数据所属的数据库
table同步数据所属的数据库表
type数据变更的类型(insert、update、delete)
ts数据同步的时间戳
xid事务id
commit事务提交标志
data同步的具体数据属性与值
old在update类型中,表示变更的相关字段之前的值

五、遇到问题

  • java.lang.RuntimeException: error: unhandled character set ‘utf8mb3’
    在这里插入图片描述

解决该问题可修改源码,然后重新打包替换掉对应的jar即可。

详情可以参考这篇文章

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

文章来源:https://blog.csdn.net/qq_26709459/article/details/135608072
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。