? 某天正在摸鱼的小邓,突然接到任务需要1个月内掌握flink并接手前辈遗留下来的大数据计算项目,于是便有了此文。
1.flink?简单了解
? ? ?有状态的数据计算、流批一体、高吞吐、低延迟、灵活、可扩展性好
? ? ?发展历史:?
? ?Flink起源于一个叫作Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学在2010-2014年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl)领街开发2014年4月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。在德语中,“flink”一词表示“快速、灵巧”项目的1ogo是一只彩色的松鼠。
2014年8月,Flink第一个版本0.6正式发布,与此同时Fink的几位核心开发者创办Data Atisans公司
2014年12月,Flink项目完成孵化从apache毕业
2015年4月,Flink发布了里程碑式的重要版本0.9.0;
2019年1月,长期对Flink投入研发的阿里巴巴,以9000万欧元的价格收购了DataArtisans 公司2019年8月,阿里巴巴将内部版本Blink开源,合并入Fink 1.9.0版本。
查看:flink官网
2.环境准备?
? ?一台安装了java环境的liunx服务器(jdk8+)
3.下载flink安装包
wget?https://dlcdn.apache.org/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
?4.解压并安装
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz
修改配置文件(路径为解压后的conf目录如下:)
主要调整的配置如下:
# JobManager节点地址.
jobmanager.rpc.address: 10.26.141.203
jobmanager.bind-host: 0.0.0.0
rest.address: 10.26.141.203
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 10.26.141.203
启动并访问:
进入bin目录下执行启动脚本:
?bin/start-cluster.sh
打印StandaloneSession信息即为启动成功可以访问对应的webui界面如下:
5.提交任务jar并统计单词个数?
?新建maven项目命名为:FlinkLearn对应pom.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>FlinkLearn</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project
?统计单词的执行方法如下:
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// TODO 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2.读取数据:从文件读
// TODO 2. 读取数据: socket
DataStreamSource<String> socketDS = env.socketTextStream("10.26.141.203", 7777);
// TODO 3.处理数据: 切分、转换、分组、聚合
// TODO 3.1 切分、转换
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = socketDS
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// 按照 空格 切分
String[] words = value.split(" ");
for (String word : words) {
// 转换成 二元组 (word,1)
Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
// 通过 采集器 向下游发送数据
out.collect(wordsAndOne);
}
}
});
// TODO 3.2 分组
KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(
new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}
);
// TODO 3.3 聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);
// TODO 4.输出数据
sumDS.print();
// TODO 5.执行:类似 sparkstreaming最后 ssc.start()
env.execute();
}
}
打包并上传到界面:
打开socket 7777监听?
并执行任务:
可以看到有一个任务在执行:
在socket中输入?hello?dxy?可以再stdout 中看到?如下打印完成了一次单词统计: