【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

发布时间:2023年12月25日

需求描述:

1、数据从 Kafka 写入 Mysql。

2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。

3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。

4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。

5、读取时使用自定义 Source,写入时使用自定义 Sink。

6、消费 Kafka 数据时自定义反序列化。

1)导入依赖

这里的依赖比较冗余,大家可以根据各自需求做删除或保留。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gaei.cn.x5l</groupId>
    <artifactId>x8vbusiness</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.10</scala.version>
        <flink.version>1.14.0</flink.version>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
        <hive.version>3.1.2</hive.version>

        <mongo.driver.version>3.12.6</mongo.driver.version>
        <mongo.driver.core.version>4.3.1</mongo.driver.core.version>

    </properties>
    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <exclusions>-->
            <!--                <exclusion>-->
            <!--                    <groupId>mysql</groupId>-->
            <!--                    <artifactId>mysql-connector-java</artifactId>-->
            <!--                </exclusion>-->
            <!--            </exclusions>-->
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- 基础依赖  开始-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- 基础依赖  结束-->
        <!-- TABLE  开始-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- 使用 hive sql时注销,其他时候可以放开 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- TABLE  结束-->
        <!-- sql  开始-->
        <!-- sql解析 开始 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- sql解析 结束 -->
        <!-- sql连接 kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- sql  结束-->
        <!-- 检查点 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
            <scope>compile</scope>
        </dependency>

        <!-- 本地监控任务 开始 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- 本地监控任务 结束 -->
        <!-- DataStream 开始 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <!-- hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- 重点,容易被忽略的jar -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>


        </dependency>
        <!-- rocksdb_2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- 其他 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.jyaml</groupId>
            <artifactId>jyaml</artifactId>
            <version>1.3</version>
        </dependency>


        <!-- TABLE  开始-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <!--            <version>${flink.version}</version>-->
            <version>1.13.5</version>
            <scope>provided</scope>
        </dependency>


        <!-- TABLE  结束-->


        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.3</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--            <version>5.1.44</version>-->
            <version>8.0.27</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.8</version>
        </dependency>



        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>bson</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>


        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-core</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>

        <!--    使用 mongodb-driver 重新打包成的 custom-mongo-core  -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.6</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                                <!-- flink sql 需要  -->
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>

            </plugins>
        </pluginManagement>

    </build>

</project>

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1

# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo

mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456

#envType=PRE
envType=PRD

# mysql  druid 连接池生产环境连接池配置
druid.driverClassName=com.mysql.jdbc.Driver
#生产
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# 初始化连接数
druid.initialSize=1
# 最大连接数
druid.maxActive=5
# 最大等待时间
druid.maxWait=3000

2.3.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <property name="LOG_LEVEL" value="ERROR" />
    </Properties>

    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>

    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>
</configuration>

3)util

3.1.KafkaMysqlUtils

public class KafkaUtils {
    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(List<String> topic) throws IOException {
        Properties prop1 = confFromYaml();
        //认证环境
        String envType = prop1.getProperty("envType");


        Properties prop = new Properties();

        System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false  "
                + "serviceName=\"" + "kafka" + "\" "
                + "useKeyTab=true "
                + "keyTab=\"" + "/opt/conf/test.keytab" + "\" "
                + "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");
        prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));
        prop.put("group.id", "Kafka2Mysql_test");
        prop.put("auto.offset.reset", "earliest");
        prop.put("enable.auto.commit", "false");
        prop.put("max.poll.interval.ms", "60000");
        prop.put("max.poll.records", "3000");
        prop.put("session.timeout.ms", "600000");

//        List<String> topics = Stream.of(prop.getProperty("topics").split(",", -1))
//                .collect(Collectors.toList());

        prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");


        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(topic, new CustomDeSerializationSchema(), prop);

        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);

        return consumer;
    }

    public static void main(String[] args) throws Exception {
        Properties druidConf = KafkaUtils.getDruidConf();
        if (druidConf == null) {
            throw new RuntimeException("缺少druid相关配置信息,请检查");
        }

        DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
        Connection connection = dataSource.getConnection();
        PreparedStatement showDatabases = connection.prepareStatement("\n" +
                "select count(*) from tab_factory");
        ResultSet resultSet = showDatabases.executeQuery();
        while (resultSet.next()) {
            String string = resultSet.getString(1);
            System.out.println(string);
        }
        resultSet.close();
        showDatabases.close();

        connection.close();


    }

    public static Properties getDruidConf() {
        try {
            Properties prop = confFromYaml();
            String driverClassName = prop.get("druid.driverClassName").toString();
            String url = prop.get("druid.url").toString();
            String username = prop.get("druid.username").toString();
            String password = prop.get("druid.password").toString();
            String initialSize = prop.get("druid.initialSize").toString();
            String maxActive = prop.get("druid.maxActive").toString();
            String maxWait = prop.get("druid.maxWait").toString();

            Properties p = new Properties();
            p.put("driverClassName", driverClassName);
            p.put("url", url);
            p.put("username", username);
            p.put("password", password);
            p.put("initialSize", initialSize);
            p.put("maxActive", maxActive);
            p.put("maxWait", maxWait);
//            p.forEach((k,v)-> System.out.println("连接池属性 "+k+"="+v));
            return p;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }


    // envType     PRE  PRD
    public static Map<String, String> getKafkaKerberos(String envType) {
        Map<String, String> map = new HashMap<>();
        if ("PRD".equalsIgnoreCase(envType)) {
            map.put("principal", "prd@PRD.PRD.COM");
            map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");
        } else if ("PRE".equalsIgnoreCase(envType)) {
            map.put("principal", "pre@PRE.PRE.COM");
            map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");
        } /*else if ("TEST".equalsIgnoreCase(envType)) {
            map.put("principal","test@TEST.TEST.COM");
            map.put("bootstrap.servers","test@TEST.TEST.COM");
        } */ else {
            System.out.println("没有该" + envType + "环境");
            throw new RuntimeException("没有该" + envType + "环境");
        }

        return map;
    }

    public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {
        Properties prop = confFromYaml();
        env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE);//这里会造成offset提交的延迟
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));
        env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                Integer.valueOf(prop.getProperty("restartStrategy")), // 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次
                Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // 延时
        ));
        // 设置状态后端存储方式
//        env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
//        env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));
        return env;

    }

    public static Properties confFromYaml() {
        Properties prop = new Properties();
        InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
        try {
            prop.load(resourceStream);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (resourceStream != null) {
                    resourceStream.close();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        return prop;
    }
}

3.2.CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    private static String encoding = "UTF8";

    //是否表示l流的最后一条元素,设置为false,表示数据会源源不断的到来
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    //这里返回一个ConsumerRecord<String,String>类型的数据,除了原数据还包括topic,offset,partition等信息
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] key = (record.key() == null ? "".getBytes() : record.key());
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /*这里我没有进行空值判断,生产一定记得处理*/
                new  String(key, encoding),
                new  String(record.value(), encoding));
    }

    //指定数据的输入类型
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
        });
    }
}

4)po

4.1.TableBean

@Data
public class TableBean {
    private String database;
    private String table;
    private String primaryKey;

    private TableBean() {
    }

    public TableBean(String database, String table, String primaryKey) {
        this.database = '`' + database + '`';
        this.table = '`' + table + '`';
        this.primaryKey = primaryKey;
    }
}

5)kafkacdc2mysql

5.1.Kafka2MysqlApp

public class Kafka2MysqlApp {
    // key 是 topic 名,value是对应数据库表中的主键列名
    private static final Map<String, TableBean> map = new HashMap<>();

    static {
    	//表名这里没有进行配置,后面根据实际业务进行配置即可
        map.put("mysql_tab1", new TableBean("db1", "", "alarm_id"));
        map.put("mysql_tab2", new TableBean("db2", "", "id"));
    }

    public static void main(String[] args) throws Exception {

        ArrayList<String> topicList = new ArrayList<>(map.keySet());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
        KafkaUtils.setupFlinkEnv(env);

        RichSinkFunction<ConsumerRecord<String, String>> sinkFunction =
                new RichSinkFunction<ConsumerRecord<String, String>>() {
                    DataSource dataSource = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        initDruidDataSource();
                    }

                    private void initDruidDataSource() throws Exception {
                        Properties druidConf = KafkaUtils.getDruidConf();
                        if (druidConf == null) {
                            throw new RuntimeException("缺少druid相关配置信息,请检查");
                        }

                        dataSource = DruidDataSourceFactory.createDataSource(druidConf);
                    }

                    @Override
                    public void close() throws Exception {

                    }

                    @Override
                    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
                        if (dataSource == null) {
                            throw new RuntimeException("连接池未初始化");
                        }
                        String operationType = "";
                        String keyId = "";
                        String sql = "";
                        try (Connection connection = dataSource.getConnection()) {
                        	//定义表名
                            String table_name = record.topic();

                            JSONObject jsonObject = JSONObject.parseObject(record.value());
                            operationType = jsonObject.getString("operationType");
                            jsonObject.remove("operationType");
                            String primaryKey = map.get(record.topic()).getPrimaryKey();
                            String database = map.get(record.topic()).getDatabase();
                            keyId = jsonObject.getString(primaryKey);
                            List<String> columns = new ArrayList<>();
                            List<String> columnValues = new ArrayList<>();

                            jsonObject.forEach((k, v) -> {
                                columns.add(k);
                                columnValues.add(v.toString());
                            });

                            if ("INSERT".equals(operationType)) {
                                try {
                                    sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";

                                    PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                    preparedStatement.setObject(1, keyId);
                                    preparedStatement.executeUpdate();
                                    preparedStatement.close();
                                } catch (Exception ignore) {
                                }


                                StringBuilder sb = new StringBuilder();
                                sb.append("insert into ").append(database).append(".").append(table_name).append("(");
                                for (String column : columns) {
                                    sb.append("`").append(column).append("`,");
                                }
                                sb.append(") values(");
                                for (String columnValue : columnValues) {
                                    sb.append("?,");
                                }
                                sb.append(")");
                                //去除最后一个逗号
                                sql = sb.toString().replace(",)", ")");

                                PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                for (int i = 0; i < columnValues.size(); i++) {
                                    preparedStatement.setObject(i + 1, columnValues.get(i));
                                }
                                preparedStatement.executeUpdate();
                                preparedStatement.close();


                            } else if ("UPDATE".equals(operationType)) {

                                StringBuilder sb = new StringBuilder();
                                sb.append("update ").append(database).append(".").append(table_name).append(" set ");
                                for (String column : columns) {
                                    sb.append("`").append(column).append("`= ?,");
                                }
                                String sqlPre = sb.substring(0, sb.length() - 1);
                                sql = sqlPre + " where " + primaryKey + "='" + keyId + "'";

                                PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                for (int i = 0; i < columnValues.size(); i++) {
                                    preparedStatement.setObject(i + 1, columnValues.get(i));
                                }
                                preparedStatement.executeUpdate();
                                preparedStatement.close();

                            } else if ("DELETE".equals(operationType)) {
                                sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";

                                PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                preparedStatement.setObject(1, keyId);
                                preparedStatement.executeUpdate();
                                preparedStatement.close();
                            }
                        } catch (Exception e) {
                            System.out.printf("mysql同步操作(%s)有误,主键是%s,原因是%s,对应topic数据是%s%n", operationType, keyId, e.getMessage(), record);
                            System.out.println("执行sql语句为 " + sql);
                            throw new RuntimeException(e);
                        }
                    }
                };

        env.addSource(KafkaUtils.getKafkaConsumer(topicList)).addSink(sinkFunction);

        env.execute("kafka2mysql synchronization " + topicList.toString());
    }
}
文章来源:https://blog.csdn.net/weixin_53543905/article/details/135201505
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。