一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
日志分析
网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
大屏看板统计
可以实时的查看网站注册数量,订单数量,购买数量,金额等。
公交实时数据
可以随时更新公交车方位,计算多久到达站牌等
实时文章分值计算
头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。
Hadoop
Apche Storm
Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。
Kafka Stream
可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
除了Kafka外,无任何外部依赖
充分利用Kafka分区机制实现水平扩展和顺序性保证
通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
支持正好一次处理语义
提供记录级的处理能力,从而实现毫秒级的低延迟
支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
(1)数据结构类似于map,如下图,key-value键值对
(2)KStream
KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回4
了alice
。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据
(1)需求分析,求单词个数(word count)
(2)引入依赖
<dependency> ? ?<groupId>org.apache.kafka</groupId> ? ?<artifactId>kafka-streams</artifactId> ? ?<exclusions> ? ? ? ?<exclusion> ? ? ? ? ? ?<artifactId>connect-json</artifactId> ? ? ? ? ? ?<groupId>org.apache.kafka</groupId> ? ? ? ?</exclusion> ? ? ? ?<exclusion> ? ? ? ? ? ?<groupId>org.apache.kafka</groupId> ? ? ? ? ? ?<artifactId>kafka-clients</artifactId> ? ? ? ?</exclusion> ? ?</exclusions> </dependency>
(3)创建原生的kafka staream入门案例
? import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; ? import java.time.Duration; import java.util.Arrays; import java.util.Properties; ? /** * 流式处理 */ public class KafkaStreamQuickStart { ? ? ?public static void main(String[] args) { ? ? ? ? ?//kafka的配置信心 ? ? ? ?Properties prop = new Properties(); ? ? ? ?prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); ? ? ? ?prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); ? ? ? ?prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); ? ? ? ?prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart"); ? ? ? ? ?//stream 构建器 ? ? ? ?StreamsBuilder streamsBuilder = new StreamsBuilder(); ? ? ? ? ?//流式计算 ? ? ? ?streamProcessor(streamsBuilder); ? ? ? ? ? ?//创建kafkaStream对象 ? ? ? ?KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop); ? ? ? ?//开启流式计算 ? ? ? ?kafkaStreams.start(); ? } ? ? ?/** ? ? * 流式计算 ? ? * 消息的内容:hello kafka hello itcast ? ? * @param streamsBuilder ? ? */ ? ?private static void streamProcessor(StreamsBuilder streamsBuilder) { ? ? ? ?//创建kstream对象,同时指定从那个topic中接收消息 ? ? ? ?KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input"); ? ? ? ?/** ? ? ? ? * 处理消息的value ? ? ? ? */ ? ? ? ?stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public Iterable<String> apply(String value) { ? ? ? ? ? ? ? ?return Arrays.asList(value.split(" ")); ? ? ? ? ? } ? ? ? }) ? ? ? ? ? ? ? ?//按照value进行聚合处理 ? ? ? ? ? ? ? .groupBy((key,value)->value) ? ? ? ? ? ? ? ?//时间窗口 ? ? ? ? ? ? ? .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) ? ? ? ? ? ? ? ?//统计单词的个数 ? ? ? ? ? ? ? .count() ? ? ? ? ? ? ? ?//转换为kStream ? ? ? ? ? ? ? .toStream() ? ? ? ? ? ? ? .map((key,value)->{ ? ? ? ? ? ? ? ? ? ?System.out.println("key:"+key+",vlaue:"+value); ? ? ? ? ? ? ? ? ? ?return new KeyValue<>(key.key().toString(),value.toString()); ? ? ? ? ? ? ? }) ? ? ? ? ? ? ? ?//发送消息 ? ? ? ? ? ? ? .to("itcast-topic-out"); ? } }
(4)测试准备
使用生产者在topic为:itcast_topic_input中发送多条消息
使用消费者接收topic为:itcast_topic_out
结果:
通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
(1)自定配置参数
? import lombok.Getter; import lombok.Setter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; ? import java.util.HashMap; import java.util.Map; ? /** * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数 */ ? @Setter @Getter @Configuration @EnableKafkaStreams @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig { ? ?private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024; ? ?private String hosts; ? ?private String group; ? ?@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) ? ?public KafkaStreamsConfiguration defaultKafkaStreamsConfig() { ? ? ? ?Map<String, Object> props = new HashMap<>(); ? ? ? ?props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); ? ? ? ?props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid"); ? ? ? ?props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid"); ? ? ? ?props.put(StreamsConfig.RETRIES_CONFIG, 10); ? ? ? ?props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); ? ? ? ?props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); ? ? ? ?return new KafkaStreamsConfiguration(props); ? } }
修改application.yml文件,在最下方添加自定义配置
kafka: hosts: 192.168.200.130:9092 group: ${spring.application.name}
(2)新增配置类,创建KStream对象,进行聚合
? import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; ? import java.time.Duration; import java.util.Arrays; ? @Configuration @Slf4j public class KafkaStreamHelloListener { ? ? ?@Bean ? ?public KStream<String,String> kStream(StreamsBuilder streamsBuilder){ ? ? ? ?//创建kstream对象,同时指定从那个topic中接收消息 ? ? ? ?KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input"); ? ? ? ?stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public Iterable<String> apply(String value) { ? ? ? ? ? ? ? ?return Arrays.asList(value.split(" ")); ? ? ? ? ? } ? ? ? }) ? ? ? ? ? ? ? ?//根据value进行聚合分组 ? ? ? ? ? ? ? .groupBy((key,value)->value) ? ? ? ? ? ? ? ?//聚合计算时间间隔 ? ? ? ? ? ? ? .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) ? ? ? ? ? ? ? ?//求单词的个数 ? ? ? ? ? ? ? .count() ? ? ? ? ? ? ? .toStream() ? ? ? ? ? ? ? ?//处理后的结果转换为string字符串 ? ? ? ? ? ? ? .map((key,value)->{ ? ? ? ? ? ? ? ? ? ?System.out.println("key:"+key+",value:"+value); ? ? ? ? ? ? ? ? ? ?return new KeyValue<>(key.key().toString(),value.toString()); ? ? ? ? ? ? ? }) ? ? ? ? ? ? ? ?//发送消息 ? ? ? ? ? ? ? .to("itcast-topic-out"); ? ? ? ?return stream; ? } }
测试:
启动微服务,正常发送消息,可以正常接收到消息