Consistency is really about the correctness of the result. Exactly-once means the data may be processed more than once (for example after a failure and replay), but it affects the result only once.
Three levels of guarantee: at-most-once, at-least-once, exactly-once.
Writing to an external system:

public class Flink02_KafkaToFlink {

    public static void main(String[] args) {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The default parallelism is the machine's maximum; fix it to 1 here
        env.setParallelism(1);
        // Enable checkpointing (required for the transactional, exactly-once sink)
        env.enableCheckpointing(1000L);

        // Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092")
                .setGroupId("flinkb")
                .setTopics("topicA")
                // Prefer the offsets committed by the consumer group; if none exist,
                // fall back to the reset strategy (LATEST here)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Any other Kafka properties go through the generic setProperty method;
                // read_committed makes the consumer skip records of uncommitted transactions
                .setProperty("isolation.level", "read_committed")
                .build();
        DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");

        // processing logic ...

        // Kafka sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("first")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // Delivery guarantee:
                //   AT_LEAST_ONCE: data may be duplicated, so downstream deduplication may be needed
                //   EXACTLY_ONCE:  exactly once, implemented with Kafka transactions
                // Note: the sink's default transaction timeout (1 hour) is larger than the broker's
                // transaction.max.timeout.ms (15 minutes by default), so it must be lowered.
                // .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("flink" + RandomUtils.nextInt(0, 100000))
                // .setProperty(ProducerConfig.RETRIES_CONFIG, "10")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000") // 10 minutes
                .build();

        // If the payload first needed to be serialized to JSON:
        // ds.map(JSON::toJSONString).sinkTo(kafkaSink);
        ds.sinkTo(kafkaSink); // write to Kafka through the transactional producer

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
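Exactly-once delivery on the Kafka sink is driven by checkpoints: the Kafka transaction is committed only when a checkpoint completes (two-phase commit). A minimal sketch of how the checkpoint settings above could be made explicit; the interval and timeout values are illustrative, not from the original:

        // Checkpointing drives the two-phase commit of the transactional Kafka sink.
        env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE); // explicit exactly-once checkpointing
        // Keep checkpoints (and therefore open transactions) short-lived: the checkpoint timeout
        // should stay well below the producer's TRANSACTION_TIMEOUT_CONFIG set above.
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L); // illustrative: 1 minute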
The Flink SQL interfaces still change between versions; when something changes, consult the official documentation.
The official documentation also describes the dependencies that Table/SQL programs need.
Table dependency breakdown
Three dependencies:
1. flink-table-api-java-uber-1.17.2.jar (all of the Java Table APIs)
2. flink-table-runtime-1.17.2.jar (the Table runtime)
3. flink-table-planner-loader-1.17.2.jar (the query planner, i.e. the SQL parser and optimizer)
Static import: add static after import and end the import with * to bring in everything. This is mainly for convenience when calling the $ method below; otherwise every $ call would need the Expressions class-name prefix (the import itself is shown after the snippet).
table.where($("vc").isGreaterOrEqual(100))
.select($("id"),$("vc"),$("ts"))
.execute()
.print();
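The static import that the bare $(...) calls rely on looks like this (Flink 1.17 package path):

import static org.apache.flink.table.api.Expressions.*; // lets $("field") be written without the Expressions. prefix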
The tables produced during stream processing are therefore dynamic tables, and the queries over them have to be continuous queries.
public class Flink04_TableToStreamQQ {

    public static void main(String[] args) {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The default parallelism is the machine's maximum; fix it to 1 here
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        line -> {
                            String[] fields = line.split(",");
                            return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                        }
                );

        Table table = tableEnv.fromDataStream(ds);
        tableEnv.createTemporaryView("t1", table);

        // SQL
        // Append-only query: each input row produces at most one new result row
        String appendSQL = "select `user`, url, ts from t1 where `user` <> 'zhangsan'";
        // Update query: previously emitted counts have to be updated as new rows arrive
        String updateSQL = "select `user`, count(*) cnt from t1 group by `user`";
        Table resultTable = tableEnv.sqlQuery(updateSQL);

        // Convert the table to a stream.
        // toDataStream() only works for append-only results; for the update query it fails with:
        // "doesn't support consuming update changes which is produced by node
        //  GroupAggregate(groupBy=[user], select=[user, COUNT(*) AS cnt])"
        // DataStream<Row> rowDs = tableEnv.toDataStream(resultTable);
        // When the query produces updates, use toChangelogStream(), which supports both
        // append-only and updating queries
        DataStream<Row> rowDs = tableEnv.toChangelogStream(resultTable);
        rowDs.print();

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
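For contrast, the append-only query (appendSQL) defined above never updates previously emitted rows, so it can go through toDataStream() directly. A minimal sketch reusing the variables from the class above:

        // Append-only result: toDataStream() is sufficient, no changelog needed.
        Table appendTable = tableEnv.sqlQuery(appendSQL);
        DataStream<Row> appendDs = tableEnv.toDataStream(appendTable);
        appendDs.print();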
public class Flink01_DataGenPrint {

    public static void main(String[] args) {
        // TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // 1. Prepare the table environment, created on top of the stream environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DataGen source table
        String createTable =
                " create table t1 ( " +
                "   id STRING , " +
                "   vc INT , " +
                "   ts BIGINT " +
                " ) WITH ( " +
                "   'connector' = 'datagen' , " +
                "   'rows-per-second' = '1' , " +
                "   'fields.id.kind' = 'random' , " +
                "   'fields.id.length' = '6' , " +
                "   'fields.vc.kind' = 'random' , " +
                "   'fields.vc.min' = '100' , " +
                "   'fields.vc.max' = '1000' , " +
                "   'fields.ts.kind' = 'sequence' , " +
                "   'fields.ts.start' = '1000000' , " +
                "   'fields.ts.end' = '100000000' " +
                " )";
        tableEnv.executeSql(createTable);

        // Query directly and print the result:
        // tableEnv.sqlQuery("select * from t1 where vc >= 200").execute().print();

        // Print sink table
        String sinkTable =
                " create table t2 ( " +
                "   id string, " +
                "   vc int, " +
                "   ts bigint " +
                " ) with ( " +
                "   'connector' = 'print', " +
                "   'print-identifier' = 'print>' " +
                " )";
        tableEnv.executeSql(sinkTable);

        tableEnv.executeSql("insert into t2 select id, vc, ts from t1 where vc >= 200");
    }
}
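The commented-out query above can also be expressed with the Table API and the $ helper introduced earlier; a minimal sketch, assuming t1 has already been registered as in the class above:

        // Table API equivalent of: select id, vc, ts from t1 where vc >= 200
        tableEnv.from("t1")
                .where($("vc").isGreaterOrEqual(200))
                .select($("id"), $("vc"), $("ts"))
                .execute()
                .print();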
public class Flink02_FileConnector {

    public static void main(String[] args) {
        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // File source
        String sourceTable =
                " create table t1 ( " +
                "   id STRING , " +
                "   vc INT , " +
                "   ts BIGINT, " +
                // "   `file.name` string not null METADATA, " + // omitted: on Windows the colon after the drive letter is not handled
                "   `file.size` bigint not null METADATA " +
                " ) WITH ( " +
                "   'connector' = 'filesystem' , " +
                "   'path' = 'input/ws.txt' , " +
                "   'format' = 'csv' " +
                " )";
        tableEnvironment.executeSql(sourceTable);
        // tableEnvironment.sqlQuery(" select * from t1 ").execute().print();

        // transformations ...

        // File sink
        String sinkTable =
                " create table t2 ( " +
                "   id STRING , " +
                "   vc INT , " +
                "   ts BIGINT, " +
                "   file_size bigint " +
                " ) WITH ( " +
                "   'connector' = 'filesystem' , " +
                "   'path' = 'output' , " +
                "   'format' = 'json' " +
                " )";
        tableEnvironment.executeSql(sinkTable);

        tableEnvironment.executeSql("insert into t2 " +
                "select id, vc, ts, `file.size` from t1");
    }
}
public class Flink03_KafkaConnector {

    public static void main(String[] args) {
        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Kafka source
        String sourceTable =
                " create table t1 ( " +
                "   id STRING , " +
                "   vc INT , " +
                "   ts BIGINT, " +
                "   `topic` string not null METADATA, " +
                "   `partition` int not null METADATA, " +
                "   `offset` bigint not null METADATA " +
                " ) WITH ( " +
                "   'connector' = 'kafka' , " +
                "   'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' , " +
                "   'topic' = 'topicA', " +
                "   'properties.group.id' = 'flinksql', " +
                "   'value.format' = 'csv', " +
                "   'scan.startup.mode' = 'group-offsets', " +
                "   'properties.auto.offset.reset' = 'latest' " +
                " )";
        // Create the source table
        tableEnvironment.executeSql(sourceTable);
        // Print the query result:
        // tableEnvironment.sqlQuery(" select * from t1 ").execute().print();

        // transformations ...

        // Kafka sink
        String sinkTable =
                " create table t2 ( " +
                "   id STRING , " +
                "   vc INT , " +
                "   ts BIGINT, " +
                "   `topic` string " +
                " ) WITH ( " +
                "   'connector' = 'kafka' , " +
                "   'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' , " +
                "   'topic' = 'topicB', " +
                "   'sink.delivery-guarantee' = 'at-least-once', " +
                // "   'properties.transaction.timeout.ms' = '', " +
                // "   'sink.transactional-id-prefix' = 'xf', " +
                "   'value.format' = 'json' " +
                " )";
        tableEnvironment.executeSql(sinkTable);

        tableEnvironment.executeSql("insert into t2 " +
                "select id, vc, ts, `topic` from t1");
    }
}
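To upgrade the SQL Kafka sink above from at-least-once to exactly-once, the commented-out options can be enabled; a sketch of the WITH options that change (the 10-minute timeout is illustrative and must stay below the broker's transaction.max.timeout.ms, 15 minutes by default). Consumers of topicB then also need read_committed isolation to see only committed rows.

        // Exactly-once variant of the t2 sink: only these WITH options change.
        String exactlyOnceOptions =
                "   'sink.delivery-guarantee' = 'exactly-once', " +
                "   'sink.transactional-id-prefix' = 'xf', " +
                "   'properties.transaction.timeout.ms' = '600000', " + // 10 minutes, below the broker's 15-minute max
                "   'value.format' = 'json' ";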