1、部署
1.1、修改flink-conf.yaml
1.1.1、flink-17
# ---- JobManager ----
jobmanager.rpc.address: boshi-122
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 2048m
# On a task failure, restart only the affected pipelined region instead of the whole job.
jobmanager.execution.failover-strategy: region

# ---- TaskManager memory ----
# The explicit sizes plus the derived network memory and JVM overhead must add up
# to the process size, otherwise Flink rejects the configuration at startup:
#   3072 task heap + 128 framework heap + 128 framework off-heap + 128 managed
#   + 128 network + 256 metaspace + 256 JVM overhead = 4096m
# Re-check this sum whenever any single value below is changed.
taskmanager.memory.process.size: 4096mb
taskmanager.memory.task.heap.size: 3072m
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.framework.off-heap.size: 128m
taskmanager.memory.managed.size: 128m
taskmanager.memory.network.max: 128m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

# ---- Slots and parallelism ----
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1

# ---- Timeouts (plain numbers are milliseconds) ----
akka.ask.timeout: 50s
web.timeout: 50000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# ---- Class loading ----
# Disabled so that late class loading by lingering library threads does not fail
# after a job's user-code classloader has been closed.
classloader.check-leaked-classloader: false

# ---- State ----
# NOTE(review): file:// paths must point to storage shared by the JobManager and
# every worker (e.g. an NFS mount); otherwise a checkpoint written on one host
# cannot be restored on another.
state.savepoints.dir: file:///data/flink/savepoint/
state.checkpoints.dir: file:///data/flink/checkpoint/

# ---- JVM ----
# The CMS collector options only work on JDK 8-13 (CMS was removed in JDK 14).
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
1.1.2、flink-1-13
# ---- JobManager ----
jobmanager.rpc.address: boshi-146
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m
# On a task failure, restart only the affected pipelined region instead of the whole job.
jobmanager.execution.failover-strategy: region

# ---- TaskManager memory ----
# The explicit sizes plus the derived network memory and JVM overhead must add up
# to the process size, otherwise Flink rejects the configuration at startup:
#   15360 task heap + 128 framework heap + 128 framework off-heap + 128 managed
#   + 128 network + 256 metaspace + 256 JVM overhead = 16384m
# Re-check this sum whenever any single value below is changed.
taskmanager.memory.process.size: 16384mb
taskmanager.memory.task.heap.size: 15360m
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.framework.off-heap.size: 128m
taskmanager.memory.managed.size: 128m
taskmanager.memory.network.max: 128m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

# ---- Slots and parallelism ----
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

# ---- Timeouts (plain numbers are milliseconds) ----
akka.ask.timeout: 100s
web.timeout: 100000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# ---- Class loading ----
# Disabled so that late class loading by lingering library threads does not fail
# after a job's user-code classloader has been closed.
classloader.check-leaked-classloader: false

# ---- State ----
# hdfs-ha is presumably an HDFS HA nameservice; every node then needs the Hadoop
# client configuration that defines it (TODO confirm HADOOP_CONF_DIR is set).
state.savepoints.dir: hdfs://hdfs-ha/flink/savepoint/
state.checkpoints.dir: hdfs://hdfs-ha/flink/checkpoint/

# ---- JVM ----
# The -XX:+PrintGC* / -XX:+UseGCLogFileRotation flags are JDK 8 syntax; JDK 9+
# replaces them with -Xlog:gc. NOTE(review): <LOG_DIR> is a placeholder for the
# container log directory; confirm it is expanded in the deployment mode used.
env.java.opts: -server -XX:+UseG1GC -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=100M
1.2、masters
# JobManager host and web UI port (host:port), read by bin/start-cluster.sh.
# boshi-122 matches jobmanager.rpc.address in the flink-17 flink-conf.yaml.
boshi-122:8081
1.3、workers
# TaskManager hosts, one per line; bin/start-cluster.sh starts a TaskManager on each.
boshi-129
boshi-137
boshi-144
boshi-166
2、提交任务
2.1、mysql-to-kafka-starrocks
-- Source: change stream of MySQL table db_enterprise_outer_resource.crawl_enterprise_website.
-- The hostname ip and the empty password look like redacted placeholders. Supply the
-- real values from a secret store instead of committing them.
-- scan.incremental.snapshot.enabled = false selects the legacy snapshot mode, which per
-- the Flink CDC docs takes a global read lock (RELOAD privilege required) and cannot
-- complete checkpoints until the initial snapshot has finished.
-- STRING is used instead of a bare VARCHAR: the Flink type reference defines VARCHAR
-- without a length as VARCHAR(1), while html holds whole web pages.
CREATE TABLE mysql_crawl_enterprise_website (
    `id`              INT,
    `eid`             STRING,
    `enterprise_name` STRING,
    `website`         STRING,
    `html`            STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'db_enterprise_outer_resource',
    'table-name' = 'crawl_enterprise_website',
    'scan.incremental.snapshot.enabled' = 'false'
);
-- ODS topic ods_crawl_enterprise_website. The first INSERT writes it and the second
-- INSERT reads it back. upsert-kafka keys every record by the primary key (id) and
-- writes deletes as tombstones. Reading it as a source adds a stateful
-- normalization step keyed by id.
-- NOTE(review): group.id source_province looks copied from another job. Confirm it is
-- intended. A per-topic group such as ods_crawl_enterprise_website is easier to monitor.
-- NOTE(review): max.request.size of about 512 MB only takes effect if broker
-- message.max.bytes, topic max.message.bytes and producer buffer.memory (default
-- 32 MB) are raised as well. Otherwise large html records are still rejected.
CREATE TABLE kafka_crawl_enterprise_website (
    `id`              INT,
    `eid`             STRING,
    `enterprise_name` STRING,
    `website`         STRING,
    `html`            STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_crawl_enterprise_website',
    'properties.bootstrap.servers' = 'ip:6667,ip:6667,ip:6667',
    'properties.group.id' = 'source_province',
    'properties.max.request.size' = '512000000',
    'properties.session.timeout.ms' = '60000',
    'properties.request.timeout.ms' = '40000',
    'key.format' = 'json',
    'value.format' = 'json'
);
-- Sink: StarRocks table ods.ods_crawl_enterprise_website, loaded through Stream Load.
-- jdbc-url uses the FE query port (9030) and load-url the FE HTTP port (8030).
-- Buffered rows are flushed at least every 3 s or once about 256 MB is buffered.
-- Each load is sent as a JSON array, which is why strip_outer_array must stay true.
-- ignore_json_size disables the Stream Load JSON body size check for large html values.
-- NOTE(review): for deletes coming from the upsert stream to be applied, the target
-- table presumably needs the StarRocks Primary Key model. Confirm it does.
CREATE TABLE starrock_ods_crawl_enterprise_website (
    `id`              INT,
    `eid`             STRING,
    `enterprise_name` STRING,
    `website`         STRING,
    `html`            STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://ip:9030',
    'load-url' = 'ip:8030',
    'database-name' = 'ods',
    'table-name' = 'ods_crawl_enterprise_website',
    'username' = 'starrocks',
    'password' = '',
    'sink.max-retries' = '5',
    'sink.buffer-flush.max-bytes' = '256000000',
    'sink.buffer-flush.interval-ms' = '3000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.properties.ignore_json_size' = 'true'
);
-- Each INSERT below is submitted as a separate streaming job unless both are wrapped
-- in a statement set. Columns are listed explicitly so that a schema change on either
-- side fails loudly instead of silently shifting values into the wrong column.
-- Job 1: MySQL change stream to the upsert-kafka ODS topic.
INSERT INTO kafka_crawl_enterprise_website
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM mysql_crawl_enterprise_website;
-- Job 2: ODS topic to StarRocks.
INSERT INTO starrock_ods_crawl_enterprise_website
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM kafka_crawl_enterprise_website;
2.2、提交参数
# Per-job submission parameters (Flink options, one key=value per line),
# presumably for the 2.1 job running on YARN.

# JobManager
jobmanager.memory.process.size=4096m

# TaskManager memory: the explicit sizes plus the derived network memory and
# JVM overhead must add up to the process size, otherwise Flink rejects them:
#   7168 task heap + 128 framework heap + 128 framework off-heap + 128 managed
#   + 128 network + 256 metaspace + 256 JVM overhead = 8192m
taskmanager.memory.process.size=8192m
taskmanager.memory.task.heap.size=7168m
taskmanager.memory.framework.heap.size=128m
taskmanager.memory.framework.off-heap.size=128m
taskmanager.memory.managed.size=128m
taskmanager.memory.network.max=128m
taskmanager.memory.jvm-metaspace.size=256m
taskmanager.memory.jvm-overhead.max=256m

# Parallelism 3 with 1 slot per TaskManager means 3 TaskManager containers,
# each requesting 1 YARN vcore.
parallelism.default=3
taskmanager.numberOfTaskSlots=1
yarn.containers.vcores=1

# Checkpointing. state.checkpoints.dir is configured in flink-conf.yaml, but no
# checkpoint interval is set anywhere in these notes, so no checkpoint is ever
# taken. A restarted job then cannot resume the mysql-cdc source from its binlog
# offset and must snapshot the whole table again.
# NOTE(review): drop this if the submission platform already sets an interval.
execution.checkpointing.interval=10min
# The legacy mysql-cdc snapshot (scan.incremental.snapshot.enabled=false) blocks
# checkpoints until it finishes, so tolerate checkpoints that expire meanwhile
# instead of failing the job.
execution.checkpointing.tolerable-failed-checkpoints=100