flink学习之旅(-)

发布时间:2024年01月23日

? 某天正在摸鱼的小邓,突然接到任务需要1个月内掌握flink并接手前辈遗留下来的大数据计算项目,于是便有了此文。

1.flink?简单了解

? ? ?有状态的数据计算、流批一体、高吞吐、低延迟、灵活、可扩展性好

? ? ?发展历史:?

? ?Flink起源于一个叫作Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学在2010-2014年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl)领街开发2014年4月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。在德语中,“flink”一词表示“快速、灵巧”项目的1ogo是一只彩色的松鼠。
2014年8月,Flink第一个版本0.6正式发布,与此同时Fink的几位核心开发者创办Data Atisans公司
2014年12月,Flink项目完成孵化从apache毕业
2015年4月,Flink发布了里程碑式的重要版本0.9.0;
2019年1月,长期对Flink投入研发的阿里巴巴,以9000万欧元的价格收购了DataArtisans 公司2019年8月,阿里巴巴将内部版本Blink开源,合并入Fink 1.9.0版本。

查看:flink官网

2.环境准备?

? ?一台安装了java环境的liunx服务器(jdk8+)

3.下载flink安装包

wget?https://dlcdn.apache.org/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz

?4.解压并安装

tar -zxvf flink-1.17.0-bin-scala_2.12.tgz

修改配置文件(路径为解压后的conf目录如下:)

主要调整的配置如下:

# JobManager节点地址.

jobmanager.rpc.address: 10.26.141.203

jobmanager.bind-host: 0.0.0.0

rest.address: 10.26.141.203

rest.bind-address: 0.0.0.0

# TaskManager节点地址.需要配置为当前机器名

taskmanager.bind-host: 0.0.0.0

taskmanager.host: 10.26.141.203

启动并访问:

进入bin目录下执行启动脚本:

?bin/start-cluster.sh

打印StandaloneSession信息即为启动成功可以访问对应的webui界面如下:

5.提交任务jar并统计单词个数?

?新建maven项目命名为:FlinkLearn对应pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>FlinkLearn</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.17.0</flink.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers combine.children="append">
                <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project

?统计单词的执行方法如下:

public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2.读取数据:从文件读
      // TODO 2. 读取数据: socket
      DataStreamSource<String> socketDS = env.socketTextStream("10.26.141.203", 7777);

        // TODO 3.处理数据: 切分、转换、分组、聚合
        // TODO 3.1 切分、转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = socketDS
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // 按照 空格 切分
                        String[] words = value.split(" ");
                        for (String word : words) {
                            // 转换成 二元组 (word,1)
                            Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                            // 通过 采集器 向下游发送数据
                            out.collect(wordsAndOne);
                        }
                    }
                });
        // TODO 3.2 分组
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }
        );
        // TODO 3.3 聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

        // TODO 4.输出数据
        sumDS.print();

        // TODO 5.执行:类似 sparkstreaming最后 ssc.start()
        env.execute();
    }
}

打包并上传到界面:

打开socket 7777监听?

并执行任务:

可以看到有一个任务在执行:

在socket中输入?hello?dxy?可以再stdout 中看到?如下打印完成了一次单词统计:

文章来源:https://blog.csdn.net/u012440725/article/details/135752025
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。