前几篇内容讲解的都是环境的部署安装,下面就关于安装好的环境,开始着手程序的编写和实现。
批处理在flink中来说操作是有界的,比如对一个文件的单词进行统计,首选的话需要创建执行环境,此处使用的ExecutionEnviroment,下面是具体的执行代码
提示:先导入maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
提示:创建java运行类
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountBatchDemo {
public static void main(String[] args) throws Exception {
//TODO 1.创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//TODO 2.读取文件:从文件中读取
DataSource<String> lineDs = env.readTextFile("input/word.txt");
//TODO 3.切分、转换(word,1)
FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//TODO 3.1按照空格切分单词
String[] words = value.split(" ");
//TODO 3.2将单词转换为(word,1)格式
for (String word:words){
Tuple2<String,Integer> wordTuple2 = Tuple2.of(word,1);
out.collect(wordTuple2);
}
}
});
//TODO 4.按照word分组
UnsortedGrouping<Tuple2<String,Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);
//ToDO 5.各分组内聚合
AggregateOperator<Tuple2<String,Integer>> sum = wordAndOneGroupBy.sum(1);//1是位置,表示第二个袁术
//TODO 6.输出
sum.print();
}
}
读取的文件目录可以在项目文件中创建,也可以更改为hdfs路径
Flink流式处理首选要创建流式执行环境,此时用StreamExecutionEnvironment
提示:flink高版本maven依赖和低版本有很大区别
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
提示:类名需导入正确路径,防止程序出错
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
DataStreamSource<String> lineDS = env.socketTextStream("cdp1", 7777);
SingleOutputStreamOperator<Tuple2<String, Integer>> singleOutputStreamOperator = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for(String word:words){
out.collect(Tuple2.of(word,1));
}
}
});
KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> sum1 = tuple2StringKeyedStream.sum(1);
sum1.print();
env.execute();
}
}
该程序是监听cdp1这台机器的7777端口,可以在cdp1这台机器上执行命令nc -lk 7777开启7777端口(确保机器已安装netcat,如果未安装,则执行yum install -y netcat命令安装),在窗口内,输入单词,flink程序这边即可统计单词数量。
以上两个例子分别是批处理文件单词统计和流式处理统计7777端口输入的单词统计,也是开始接触flink的第一个demo,flink还有很多强大的功能,后续会结合具体的业务场景讲解具体的实现代码,好了,今天就讲到这里,后续会继续持续更新。