Spark Streaming简介与代码实例

发布时间:2024年01月03日

背景:

Spark Streaming是准实时流处理框架,处理响应时间一般以分钟为单位,处理实时数据的延迟时间一般是秒级别的;其他容易混淆的例如Storm实时流处理框架,处理响应是毫秒级。

在我们项目实施选择流框架时需要看具体业务场景:使用MapReduce和Spark进行大数据处理,能够解决很多生产环境下的计算问题,但是随着业务逐渐丰富,数据逐渐丰富,这种批处理在很多场景已经不能满足生产环境的需要了,体现例如①离线计算一般就会建立一个数据仓库,数据量大的情况下,计算耗时也会很长。②例如一个业务场景,需要在根绝客户访问一个网站时的浏览、点击行为,实时做出一些业务上的反馈,时延太长这个数据也流失了很多价值。③现在技术发展的需要,许多机器学习和人工智能应用需要大量的实时数据进行训练和优化。

数据是源源不断产生,计算程序也是一直存在的,即实时计算。

1.流式计算和批处理的关系

批处理和流式本来就存在某种微妙的关系,不是完全隔离的。Spark Streaming充分利用了这种微妙关系,将其发挥到极致。批量处理是Spark Streaming流式处理的一个窗口特别大的特例,实际上,如果我们定时执行某个Spark程序,或者每天执行一次,也相当于是流失计算,不过是以天为事件窗口。但是如果细加观察,Spark Streaming的每个batch又都是一个批处理,只是因为这个批处理可以足够小,看起来就像数据在真实流动一样,所以我们也称之为流式处理。

2.主流的流式计算框架

流式计算最具代表性的框架之一就是Storm。在Storm中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。这个拓扑将会被提交给集群,由集群中的主控节点(Master node)分发代码,将任务分配给工作节点(Worker node)执行。一个拓扑中包括spout和bolt两种角色,其中spout发送消息,负责将数据流以tuple元组的形式发送出去;而bolt则负责转换这些数据流,在bolt中可以完成计算、过滤等操作,bolt自身也可以随机将数据发送给其他bolt。由spout发射出的tuple是不可变数组,对应着固定的键值对。

Spark Streaming是核心Spark API的一个扩展,它并不会像Storm那样一次一个地处理数据流,而是在处理前按时间间隔预先将其切分为一段一段的批处理作业。Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集);RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。

除了Spark,Flink也是类似Spark的计算框架,Flink是一个针对流数据和批数据的分布式处理引擎。它主要是由Java代码实现。对Flink而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。Flink会把所有任务当成流来处理,这也是其最大的特点。Flink可以支持本地的快速迭代,以及一些环形的迭代任务,并且Flink可以定制化内存管理。在这点,如果要对比Flink和Spark的话,Flink并没有将内存完全交给应用层。这也是为什么Spark相对于Flink,更容易出现OOM的原因(out ofmemory)。就框架本身与应用场景来说,Flink更相似与Storm。

3.自定义流式计算举例

为了更好理解流式计算思想,我们来举例一个更具体的流式计算的程序。常见的实时计算需要有数据源、消息队列、数据处理。我们的数据源来自Socket,消息队列为了保证线程安全,我们使用Java自带的BlockingQueue,而数据处理就通过一个独立线程读取消息队列的内容处理,结果我们放在ConcurrentHashMap中,保证线程安全。

Spark Streaming的基本原理是将输入数据流以时间片(秒级)为单位进行拆分,然后以类似批处理的方式处理每个时间片数据,见下图:

首先,Spark Streaming把实时输入数据流以时间片Δt(如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多个块。使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD提供的接口,如Map、Reduce、Filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。下图显示了Spark Streaming程序到Spark Job的转换:

Spark Streaming把程序中对DStream的操作转换为DStream Graph,对于每个时间片,DSteam Graph都会产生一个RDD Graph;针对每个输出操作(如Print、Foreach等),SparkStreaming都会创建一个Spark Action;对于每个Spark Action,Spark Streaming都会产生一个相应的Spark Job,Spark会调度Task到相应的Spark Executor上执行。

Spark Streaming的一些常用组件如下:

1.StreamingContext:Spark Streaming中Driver端的上下文对象,初始化的时候会构造Spark Streaming应用程序需要使用的组件,比如DStreamGraph、JobScheduler 等

2.JobGenerator:主要是从DStream产生Job,且根据指定时间执行checkpoint。它维护着一个定时器,该定时器在批处理时间到来的时候会生成作业的操作。

3.JobScheduler:主要用于调度Job。JobScheduler主要通过JobGenerator产生Job,并且通过ReceiverTracker管理流数据接收器Receiver。

4.ReceiverTracker:管理各个Executor上的Receiver的元数据。它在启动的时候,需要根据流数据接收器Receiver分发策略通知对应的Executor中的ReceiverSupervisor(接收器管理着)启动,然后再由ReceiverSupervisor来启动对应节点的Receiver。

数据源:

数据源程序,使用Java编写一个程序,使用socket来向7777端口发送数据:

Package test;

import java.io.Bufferedwriter;

import java.io.IOException;

import java.io.Outputstream;

import java.net.ServerSocket;

import java.net.socket;

import java.io.Outputstreamwriter;



public class DataGenerator{

???????? public static void main(string[] args) throws IOException{

????????????????? //设置发送端口为7777

????????????????? ServerSocket ss = new ServerSocket(7777);

????????????????? Socket accept = ss.accept();

????????????????? Outputstream outputstream = accept.getoutputstream();

????????????????? Bufferedwriter writer = new Bufferedwriter(new Outputstreamhriter(outputstream));

????????????????? //发送的字符串

????????????????? String[] words = new String[]{"hello Hadoop\n", "hello spark\n", "world hello\n", "hello\n", "hadoop\n"};

????????????????? while (true){

????????????????????????? try{

?????????????????????????????????? Thread.sleep(1000);

????????????????????????? }catch (InterruptedException e){

?????????????????????????

????????????????????????? }

????????????????????????? //随机发送一个字符串

????????????????????????? writer.write(words[(int)(Math.random() * 5)]);

????????????????????????? writer.flush();

????????????????? }

???????? }

}

数据接收处理:

import org.apache.spark.streaming.streamingContext

import org.apache.spark.streaming.streamingContext

import org.apache.spark.streaming.dstream.Dstream

import org.apache.spark.streaming.Duration

import org.apache.spark.streaming.seconds

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

import org.apache.spark. storage.storageLevel



val sparkconf = new SparkConf().setApplame("NetworkWordCount").setMaster("localhost")

//设置每秒处理一次

val ssc = new streamingContext(sc, Seconds(1))

//使用socket发送数据,ip为localhost,端门为7777

val lines =ssc.socketTextstream("localhost",7777,StorageLevel.MEMORY_AND_DISK_SER)

//flatMap以空格分隔

val words = lines.flatmap(_.split(" "))

//对每一组数据各个字符串数量累加

val wordCounts = words.map(x => (x,1)).reduceBykey(_+_)

//对每一组数据各个字符串数量累加,每10秒一次,统计最近30秒的结果

val wordCounts = words.map(x => (x,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a+b),Seconds(30),Seconds(10))

//输出



wordCounts.print()

ssc.start()

ssc.awaitTermination()

运行数据源:

把刚刚的Java程序打包,用spark-submit执行,我们将打包好的程序放到某一个目录,例如/opt下,命名为hadoop-streaming.jar,使用spark-submit提交(命令制定类名、主机名、UI端口号、Jar包路径):

nohup .../你的路径/bin/spark-submit –class test.DataGenerator –master spark://localhost:9000 /opt/Hadoop-streaming.jar &

运行数据接收、处理程序:

进入spark-shell来运行上面写好的“数据接收处理”的代码,可收到结果。

文章来源:https://blog.csdn.net/qq_30168227/article/details/135344025
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。