前面我们已经搭建起了Flink 的基础环境,这一节我们就在上一节的基础上,进行编写我们的第一个Flink 程序,开始之前我们先看一下一个完整的Flink 程序是什么样的
为了演示Flink 程序结构,我们下面写了一个程序,这个程序我称之为 MiniProgram
,也就是流程序的最小结构
package com.kingcall.examples.stream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class MiniProgram {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> arras = new ArrayList<>();
arras.add("China");
arras.add("Japan");
arras.add("Russia");
DataStream<String> country= env.fromCollection(arras);
DataStream<String> filters = country.filter(new FilterFunction<String>() {
@Override
public boolean filter(String str) throws Exception {
return !"Japan".equals(str);
}
});
filters.print();
env.execute();
}
}
可以参考下面的注释,一个流程序就这样被创建出来了,一个标准的程序就是这样的结构
env
。流式应用需要用到 StreamExecutionEnvironment
。StreamExecutionEnvironment
。当调用 env.execute()
时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。filters.print()
打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString()
方法。‘
需要特别注意的是,Flink 的流程序需要我们在最后调用env.execute() 才可以执行,在此之前都是在定义程序的执行逻辑,只有最后的这一步骤才会提交任务并执行
如果没有调用 execute(),应用就不会运行
下面就是程序的输出
’
1> 和 5> 指出输出来自哪个 sub-task(即 thread),我们可以对上面的程序进行改造一下,也就是设置并行度env.setParallelism(1);
这个时候输出就没有>
这样的提示了
‘
上述示例用 env.fromElements(...)
方法构造 DataStream<Person>
。这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment
上还有一个 fromCollection(Collection)
方法。因此,你可以这样做:
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
另一个获取数据到流中的便捷方法是用 socket
DataStream<String> lines = env.socketTextStream("localhost", 9999)
或读取文件
DataStream<String> lines = env.readTextFile("file:///path");
在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。
上述示例用 filters.print()
打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString()
方法。
输出看起来类似于
1> Fred: age 35
2> Wilma: age 35
前面我们介绍了Flink程序的结构,我们是通过一个叫做最小程序的MiniProgram
进行说明的,我们之前一直说到Flink 程序的数据流,流说明数据是实时的流动的,那这里的数据有什么格式的要求吗,或者什么样的数据才能流动。
DataStream API 得名于特殊的 DataStream
类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。
DataStream
在用法上类似于常规的 Java 集合
,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream
API 操作来处理它们,DataStream
API 操作也叫作转换(transformation)。
你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream
。然后,你可以基于 DataStream
派生新的流,并使用 map、filter 等 API 方法把 DataStream
和派生的流连接在一起。
Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有
而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。
Flink 的原生序列化器可以高效地操作 tuples 和 POJOs
对于 Java,Flink 自带有 Tuple0
到 Tuple25
类型。
Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
// zero based index!
String name = person.f0;
Integer age = person.f1;
如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):
示例:
public class Person {
public String name;
public Integer age;
public Person() {}
public Person(String name, Integer age) {
. . .
}
}
Person person = new Person("Fred Flintstone", 35);
下面的程序将关于人的记录流作为输入,并且过滤后只包含成年人。
package com.kingcall.examples.stream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Adults {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {
}
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
下面就是程序的输出
’
每个 Flink 应用都需要有执行环境,在该示例中为 env
。流式应用需要用到 StreamExecutionEnvironment
。
DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment
。当调用 env.execute()
时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
注意,如果没有调用 execute(),应用就不会运行。
‘
在生产中,应用程序将在远程集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager 和 TaskManager 日志对于调试此类故障非常有用,但是更简单的是 Flink 支持在 IDE 内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink 的工作原理和内部细节,查看 Flink 源码也是非常好的方法。
Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。
Flink 程序看起来像一个转换 DataStream
的常规程序。每个程序由相同的基本部分组成:
执行环境(execution environment)
;