目录
4. 简述Spark on Yarn的两种部署模式的区别和特点
9. mapPartitions和foreachPartitions分区算子,相对map和foreach有什么优点?
10. 简述Spark持久化中缓存和checkpotin检查点的区别
14. 创建得到DataFrame的方式有哪些,适用于什么场景?
15. SparkSQL中数据清洗的API有哪些,各自作用是什么?
16. 设置SparkSQL的shuffle分区数的方式有哪几种?
17. 简述基于Pandas实现UDF和UDAF函数的步骤?
22. 简述Kafka 之所以具有高速的读写性能,主要有哪几个原因
查询机制:消费者在消费的时候,是如何找到对应offset偏移量的消息的
29. 结构化流中Sink输出模式有哪几类,各自特点是什么?
30. 结构化流中Sink输出终端常见的有哪几类,各自特点是什么?
spark是一款大数据统一分析引擎,底层数据结构是RDD
速度快(线程,基于内存的rdd,高效api)
易用性(多种语言python,java,scala等)
通用性(有sparksql,mlib等安装包)
兼容性(适配不同的资源调度,存储工具,可运行在多个系统中)
mapreduce是基于进程执行的,消耗资源多运算慢;
使用磁盘进行计算,反复IO读写效率低下;
API较为原始低级,实现复杂的API需要写很复杂的代码;
Spark基于线程做数据处理,创建时所需要的资源更少,运行速度更快;
引入了新的数据结构-RDD弹性分布式数据集,使得Spark可以基于内存进行数据处理,读写速度相较于磁盘更快
Spark提供了更丰富的编程API,能够轻松的实现功能开发;
两种方式分别是client客户端模式和cluster集群模式
两种方法的本质区别是driver进程运行的地方不一样
Client部署方式:Driver进程运行在你提交程序的那台机器上
? ? ? ? 优点是日志和运行结果都输出到了提交的那台机器上,方便查看结果
? ? ? ? 缺点是Driver进程和Yarn集群可能不在同一个集群中,会导致Driver进程和Excutor进程间进行数据交换的时候,效率较低
? ? ? ? 场景一般在开发和测试环境中使用
Cluster部署方式:Driver进程运行在集群中某台节点上
? ? ? ? 优点是Driver进程和Yarn集群在同一个集群中,Driver进程和Excutor进程间进行数据交换的时候效率比较高
? ? ? ? 缺点是查看日志与运行结果需要在18080或者8088的页面中进行查看
? ? ? ? 一般在生产环境中使用
????????DAGScheduler:DAG调度器,将job任务形成DAG有向无环图和划分Stage阶段;
? ? ? ? ? ? ? ? TaskScheduler:Task调度器,将Task线程分配给具体的Executor执行;
以client on spark 为例
1. 提交spark程序,在哪里提交程序,就在哪里启动Driver进程;
2. 由于Driver进程是java与scala语言编写的,无法直接执行python代码,所以需要将创建SparkContext对象的代码基于PY4J转为java后再创建对象;
? ? ? ? 2.1 Driver进程启动后,底层PY4J创建SparkContext顶级对象,同时还会创建DAGscheduler和TaskSchduler;? ? ? ? ? ? ? ??
3. Driver连接Master,根据资源的配置,向master申请资源来创建Executor
4. Master接收到资源申请,进行资源分配,分配的原则采用FIFO先进先出规则,制定资源分配方案并返回给Driver;
5. Driver连接到对应的worker从节点上,占用相应的资源,通知worker启动Excutor,启动后将信息反向注册回Driver;
6. Driver开始处理代码
? ? ? ? 6.1 Driver加载RDD相关的算子,根据算子间的依赖绘制DAG有向无环图和划分Stage阶段(一个Spark应用程序遇到Action算子后,就会触发一个Job任务的产生,Job任务会将它所依赖的所有算子都加载进来,形成一个Stage; 接着从Action算子从后往前进行回溯,遇到窄依赖就将算子放在同一个Stage当中; 如果遇到宽依赖,就经历shuffle阶段,划分形成新的Stage,最后一直回溯完成)
? ? ? ? 6.2 之后Driver需要确定任务分配给哪些Excutor进行执行,首先确定每个Stage阶段有多少个Task线程,将众多的Task线程放到Taskset集合中,DAG调度器将TaskSet集合给到Task调度器,Task调度器拿到Taskset集合以后,将Task分配给到具体Executor执行,底层是基于SchedulerBackend调度队列来实现的,
? ? ? ? 6.3 Driver通知对应的Executor进程来执行相应的任务
? ? ? ? 6.4 Executor开始执行具体的任务,因执行的是python函数,因此会调用服务器上的python解释器,将py函数和输入数据传输到python解释器,执行完后会将数据返回给Executor进程;
? ? ? ? 6.5 Executor在运行过程中,会判断是否需要将结果返回给Driver进程,如果需要就返回,如不需要就直接输出;
? ? ? ? 6.6 Driver会定时检查多个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束;
7. Driver调用sparkContext.stop()代码,通知Master回收资源,整个程序运行结束;
分成两类:?? ?Transformation算子和Action算子
Transformation算子:返回值是一个新的RDD;该算子运行后会不会立即执行,需要配合Action算子触发。
Action算子:返回值是None或者非RDD数据类型;算子运行后会立即执行,并且把之前的Transformation算子一并运行。
五大特性:
? ? ? ? 1. RDD由一系列分区组成
? ? ? ? 2. RDD计算相当于对RDD每个分区做计算
? ? ? ? 3. RDD之间有宽窄依赖;
? ? ? ? 4. KeyValue型RDD可以自定义分区;
? ? ? ? 5. 尽量让计算程序靠近数据源,移动数据不如移动计算程序;
五大特点:
? ? ? ? 1. 分区:RDD的分区是逻辑上的分区,并不是直接对数据进行分区操作,因为RDD本身不存储数据;
? ? ? ? 2. 只读:RDD只读的,对其进行增加改变本质都是创建新的RDD
? ? ? ? 3. 依赖: 存在宽窄依赖
? ? ? ? 4. 缓存:?如果在程序中多次使用同一个RDD可以将其缓存起来,该RDD只有第一次计算时会根据血缘关系得到分区的数据.
? ? ? ? 5. checkpoint检查点: 与缓存类似,但可以持久化保存.
重分区算子有:reparation , coalesce , Partitions by
repartition:调整RDD的分区数,得到一个新RDD,既可以增大也可以减小分区数,但都会触发shuffle.
coalesce: 默认只能减小分区,减小的过程中不会触发shuffle,如果将参数2的shuffle改为True也可以增大分区,但会触发shuffle
partitions by: 主要是针对kv类型的RDD进行重分区操作,可以增大也可以减少,但都会shuffle,用户自定义函数fn来指定分区方案.
优点:每次都对一整个分区进行操作,减少分区调用操作的次数,减少资源消耗,而且可以对分区内数据批量操作,提高效率.适用于文件的打开和关闭、数据库的连接和关闭等有反复消耗资源的操作.
区别有4点:
1. 主要作用
? ? ? ? 缓存是为了提升Spark程序运算效率.
? ? ? ? 检查点是为了提升Spark程序容错性.
2. 存储位置
? ? ? ? 缓存存储在内存或磁盘中,或Executor的堆外内存中.
? ? ? ? 检查点存储在磁盘或Hdfs中
3. 生命周期
? ? ? ? 缓存在程序结束或手动调用unpersist后被删除
? ? ? ? 检查点可永久保存在HDFS上,除非手动删除
4. 血缘关系
? ? ? ? 缓存不会切断RDD的血缘关系,因为缓存是不稳定的,如果发生故障,可以从头运行RDD
? ? ? ? 检查点会切断RDD的血缘关系,因为保存在安全的HDFS上,认为不会丢失
一同使用时先设置缓存再设置检查点,可以减少一次IO过程
在spark的运行过程中,遇到了一个action算子会生成一个job
一个job将action算子和其依赖的其他算子聚合起来形成一个stage
然后会向前回溯rdd算子,如果没有shuffle阶段(产生宽依赖)就把他们
放在一个stage里面,如果有shuffle发生(产生宽依赖)就会划分一个新的
stage放置这些算子
1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
?? ?DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
?? ?TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器
3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。
4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束
1. hive只能写SQL,spark既可以写SQL又可以写代码
2.?hive有元数据存储metastore,spark需要手动维护元数据
3. hive的运行基于磁盘,spark的运行基于内存
4. hive的计算引擎是MapReduce,spark基于spark RD
相同点:
1都是大数据分布式处理架构
2都是处理结构化数据
3都可以用yarn做集群资源调度
1.通过RDD得到一个DataFrame
场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么我可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。
2.内部初始化数据得到DataFrame
场景:一般用在开发和测试中。因为只能处理少量的数据
3. 读取外部文件得到DataFrame
text , json , csv
csv常设置的参数有path\header\sep\inferschema\encoding等
Drop duplicate 去重: 没有指定subset就默认一整行完全一样才会删除,指定了就将范围限定到指定字段.
dropna去空 : 没有指定参数,那只要有一个是null的那整行就被删除, 可以指定参数thresh,只有当null数量大于了thresh数才会删除整行.
fillna填充替换 :??DF.fillna(value={"name":"未知姓名","age":100}).show() ,?value必须传递参数,用于填充缺失值,subset限定缺失值替换范围。如果不是字典,那么只会替换字段类型匹配的空值,最常用的是value传递字典的形式。
shuffle分区数量默认200个,手动调整的方式如下:
1. 全局设置?: spark.sql.shuffle.partitions 数量
2. 动态分区:?在客户端通过submit命令提交的时候,动态设置shuffle分区数量,部署上线时,基于spark-submit提交运行的时候:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ./spark-submit --conf"spark.sql.shuffle.partition=数量"
3. 写死 : SparkSession.conf.set('spark.sql.shuffle.partitions',数量)
sparksql原生只能udf,借助第三方工具可以实现udaf
UDF步骤:
1.创建python自定义函数
? ? ? ? 要求输入类型和返回值类型都必须是pandas中的series类型
2.注册进SparkSQL
? ? ? ? 方式一:udf对象 = spark.udf.register(参数1,参数2)
? ? ? ? 方式二:udf对象 = F.pandas_udf(参数1,参数2)
? ? ? ? 方式三:语法糖装饰器写法
? ? ? ? ? ? ? ? @F.pandas_udf(returnTyep = 返回值SparkSQL的数据类型)放置到对应的python函数上
3.在代码中使用
UDAF步骤:
1.创建python自定义函数
? ? ? ? 要求输入参数类型是pandas中的Series对象,返回python中的标量数据类型
2.注册进SparkSQL
? ? ? ? 方式一: udf对象= spark.udf.resiter(参数1,参数2)
? ? ? ? 方式二: udf对象 = F.pandas_udf(参数1,参数2)
? ? ? ? 方式三: 语法糖装饰器写法
? ? ? ? ? ? ? ? @ F.pandas_udf(returnType= 返回值sparksql的数据类型) 放置到对应的python函数上
3.在代码中使用
1. UDF 一进一出,split等
2. UDAF 多进一出, 聚合函数 count,sum等
3. UDTF 一进多出, 表生成函数, explode炸裂函数等
如何将sparksql翻译成rdd的,基于catalys优化器来实施
1. 当catalys接收到客户端的代码,会先校验语法,通过后会根据执行顺序,生成未解析的逻辑计划(ats抽象语法树)
2. 对于ats抽象语法树加入元数据信息,确定一共涉及到哪些字段,字段的类型是什么,以及表其他相关元数据信息,加入元数据信息后,得到了未优化的逻辑计划.
3. 对未优化的逻辑计划执行优化操作,优化是通过优化器来执行的,在优化器匹配相对应的优化规则,Sparksql底层提供了一两百个优化规则,如:
? ? ? ? 谓词下推:也叫作断言下推,将数据过滤操作提前到数据扫描的时候进行,减少后续操作的数据量,提升效率;
? ? ? ? 列式裁剪:不加载与数据分析无关的字段,减少后续处理的数据量,提升效率;
4. 由于优化规则很多,导致会得到多个优化的逻辑计划,在转换为物理执行计划的过程中,会根据成本模型(运行耗时,资源消耗等)得到一个最优的物理执行计划;
5. 将物理执行计划通过code generation(代码生成器),转换成Spark RDD的代码;
6. 最后就是将SparkRDD代码部署到集群上运行
1. 应用解耦合
2. 异步处理
3. 限流削峰
4. 消息驱动系统
架构中的角色:
1. Producer生产者:负责将信息/数据发送到kafka中
2. consumer消费者:负责将信息从kafka中取出
3. broker : kafka集群中的节点,节点与节点之间没有主从之分
4. topic : 主题,是业务层面对消息进行分类的,一个topic可以有多个分区,分区数量没有限制
5. partiton分区: 一个分区可以有多个副本,副本的数量不超过broker集群的数量
6. leader 主副本: leader主副本会主动将信息副本发送到follwer从副本上
7. Follower从副本: Follower从副本被动接收Leader主副本发送过来的信息副本
8. Zookeeper:用来管理Kafka集群,管理信息的元数据
9. ISR同步列表: 存储和Leader主副本信息差距最小的一个副本,当leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follwer从副本变成Leader主副本,对外提供服务
1. 顺序写入磁盘 相比于随机写入,顺序写入磁盘的性能更高,减少了磁盘寻址时间,减少了IO操作的次数,提高了写入的速度
2. 零拷贝技术 直接从数据源进行读取,不用经过中间的数据拷贝过程,减少了网络IO和磁盘IO等过程,提升了效率
3. 分布式架构 分布式的架构实现了扩展与负载均衡,提高了整体的吞吐量和并发处理能力
分区机制:
? ? ? ? 1- 避免单台服务器容量的限制,分多个区可以避免单个分区的数据过大导致服务器无法存储
? ? ? ? 2- 提升topic的吞吐量,数据读写速度,利用多台服务器的数据读写能力,网络等资源
? ? ? ? 3- 分区的数量没有限制,但尽量不要超过kafka集群中broker节点个数的3倍
副本机制:?
? ? ? ? 1-通过多副本机制,提升数据的安全性,但副本过多会导致冗余过多
? ? ? ? 2- 副本数量有限制,不可超过kafka集群中broker节点个数,推荐分区的副本数量为1-3个
生产者产生的消息,是如何保存到具体分区上的
? ? ? ? 1- 随机分发策略
????????????????将消息发到到随机的某个分区上,还是发送到Leader主副本上。Python支持,Java不支持
????????????????当在发送数据的时候, 如果只传递了topic 和 value,没有指定key的时候, 那么此时就采用随机策略
? ? ? ? 2- 指定分区策略
????????????????将消息发到指定的分区上面。Python支持,Java支持
????????????????当在发送数据的时候, 如果指定了partition参数, 表示的采用指定分区的方案, 分区的编号从0开始
????????????????当指定了partition的参数后, 与DefaultPartitioner没有任何的关系
? ? ? ? 3- Hash取模策略
????????????????对消息的key先取Hash值,再和分区数取模。Python支持,Java支持
????????????????当在发送数据的时候, 如果传递了topic 和 value 以及key的时候, 那么此时就是采用hash取模策略
????????????????注意: 相同key的返回的hash值是一致的, 同样对应分区也是同一个。也就是要注意数据倾斜的问题
? ? ? ? 4- 轮询策略
? ? ? ? ? ? ? ? 在kafka的2.4及以上版本,已经更名为粘性分发策略,python不支持,java支持
? ? ? ? 5- 自定义分发策略
????????????????Python支持,Java支持
????????????????参考源代码DefaultPartitioner模仿写即可
JAVA中的轮询分发策略 和 粘性分发策略介绍
????????1- 轮询分发策略:kafka老版本的策略,当生产数据的时候,只有value但是没有key的时候,采用轮询
?? ?????????????????优点: 可以保证每个分区拿到的数据基本是一样,因为是一个一个的轮询的分发
?? ?????????????????缺点: 如果采用异步发送方式,意味着一批数据发送到broker端,由于是轮询策略,会将这一批数据拆分为多个小的批次,分别再写入到不同的分区里面去,????????????????????????????????写入进去以后,每个分区都会给予响应,会影响写入效率。
?? ?? ? ? ? 2- 粘性分发策略: kafka2.4版本及以上的策略,当生产数据的时候,只有value但是没有key的时候,采用粘性分发策略
?? ?????????????????优点: 在发送数据的时候,首先会随机的选取一个分区,然后尽可能将数据分发到这个分区上面去,也就是尽可能粘着这个分区。该分发方式,
????????????????????????????????在异步发送的操作中,效率比较高。
?? ?????????????????缺点: 在数据发送特别快的时候,可能会导致某个分区的数据比其他分区数据多很多,造成大量的数据集中在一个分区上面
消息存储机制
Kafka集群中的消息存储在一组称为“分区”的逻辑日志上。每个主题可以分成多个分区,每个分区都有一个唯一的标识符和一组不断增加的有序消息。这些分区可以分布在不同的Kafka节点上,以实现负载均衡和可伸缩性。
在每个Kafka节点上,每个分区都被存储为一个或多个文件(称为日志段),这些文件包含了该分区的所有消息。当消息被写入时,它们会被追加到最后一个日志段。当日志段达到一定大小(通过broker端参数log.segment.bytes进行配置)或时间(通过broker端参数log.segment.ms进行配置)时,将会创建一个新的日志段,原来的日志段将会被关闭。这种设计使得Kafka能够高效地追加消息,并且可以轻松地删除旧数据,同时保证消息的持久性和可靠性。此外,Kafka还提供了复制机制来确保数据的容错性和高可用性。
1-xx.log和xx.index它们的作用是什么? 答: xx.log: 称之为segment片段文件,也就是一个Partition分区的数据,会被分成多个segment(log)片段文件进行存储。 xx.index: 称之为索引文件,该文件的作用是用来加快对xx.log文件内容检索的速度
2-xx.log和xx.index文件名称的意义? 答: 这个数字是xx.log文件中第一条消息的offset(偏移量)。offset偏移量从0开始编号。
3-为什么一个Partition分区的数据要分成多个xx.log(segment片段文件)文件进行存储? 答: 1- 如果一个文件的数据量过大,打开和关闭文件都非常消耗资源 2- 在一个大的文件中,检索内容也会非常消耗资源 3- Kafka只是用来临时存储消息数据。会定时将过期数据删除。如果数据放在一个文件中,删除的效率低;如果数据分成了多个segment片段文件进行存储,删除的时候只需要判断segment文件最后修改时间,如果超过了保留时间,就直接将整个segment文件删除。该保留时间是通过server.properties文件中的log.retention.hours=168进行设置,默认保留168小时(7天)
查询机制:消费者在消费的时候,是如何找到对应offset偏移量的消息的
查询步骤:
1- 首先先确定要读取哪个xx.log(segment片段)文件。368776该offset的消息在368769.log文件中
2- 查询xx.log对应的xx.index,查询该条消息的物理偏移量范围
3- 根据消息的物理偏移量范围去读取xx.log文件(底层是基于磁盘的顺序读取)
4- 最终就获取到了具体的消息内容
Kafka集群中每分钟新产生400条数据,下游的一个消费者每分钟能够处理400条数据。
随着业务发展,Kafka集群中每分钟新产生1200条数据,下游的一个消费者每分钟能够处理400条数据。 答:会导致broker中积压的消息条数越来越多,造成消息处理不及时。可以增加消费者数量,并且将这些消费者放到同一个消费组当中
随着业务发展,Kafka集群中每分钟新产生1600条数据,下游的一个消费者每分钟能够处理400条数据。 答:会导致broker中积压的消息条数越来越多,造成消息处理不及时。再增加消费组中消费者的个数已经无法解决问题。
如何解决: 1- 增加消费组中消费者的个数 2- 提高下游消费者对消息的处理效率
Kafka消费者的负载均衡机制
1- 在同一个消费组中,消费者的个数最多不能超过Topic的分区数。如果超过了,就会有一些消费者处于闲置状态,消费不到任何数据。
2- 在同一个消费组中,一个Topic中一个分区的数据,只能被同个消费组中的一个消费者所消费,不能被同个消费组中多个消费者所消费。但是一个消费组内的一个消费者可以消费多个分区的数据。也就是分区和消费者的对应关系,多对一
3- 不同的消费组中的消费者,可以对一个Topic的数据同时消费,也就是不同消费组间没有任何关系。也就是Topic的数据能够被多个消费组中的消费者重复消费。
补充:
查看消费组中有多少个消费者,用来避免消费者个数超过分区个数。
./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_1 --members --describe
生产者保证数据不丢失:
????????生产者端将消息发给Kafka集群后,broker要给生产者响应信息,响应原理就是ack机制
? ? ? ? ack机制有三个参数,分别是 0,1,-1
? ? ? ? 0:生产给到集群,生产者不等待不接收broker返回的响应信息
? ? ? ? 1: 生产给到集群,集群中的分区对应leader主副本所在的broker给生产者返回响应信息
? ? ? ? -1: 生产给到集群,集群中的分区对应的所有副本给生产者返回响应信息
? ? ? ? 效率级别 0>1>-1
? ? ? ? 安全级别 -1>1>0
????????????????根据安全和效率的要求选择ack参数配置
Broker端如何保证数据不丢失:
? ? ? ? broker通过多副本机制保证数据不丢失,同时需要生产者将ack设置为-1,安全级别最高
消费端如何保证数据不丢失:
????????消费者消费消息的步骤:
? ? ? ? 1- 消费者首先连接到kafka集群中,进行消息消费
? ? ? ? 2- Kafka集群接收到消费者的请求后,会根据消费组id,查找上次消费消息对应的offset偏移量
? ? ? ? 3- 如果没有查找到offset,消费者默认从topic最新的地方开始消费
? ? ? ? 4- 如果有查找到offset,会从上次消费到的offset地方继续进行消费
? ? ? ? ? ? ? ? 4.1- 首先先确定要读取的这个偏移量在哪个segment文件当中
? ? ? ? ? ? ? ? 4.2- 查询这个segment文件对应的index文件,根据offset确定这个消息在log文件的什么位置,也就是确定消息的物理偏移量
? ? ? ? ? ? ? ? 4.3- 读取log文件,查询对应范围内的数据即可
? ? ? ? ? ? ? ? 4.4- 获取最终的消息数据
? ? ? ? 5- 消费者在消费的过程中,底层有个线程会定时的将消费的offset提交给到kafka集群,kafka集群会更新对应的offset的值;
1- 将消费者的 enable.auto.commit 属性设置为 false,并手动管理消费者的偏移量。这样可以确保消费者在处理完所有消息后才更新偏移量,避免重复消费数据。也就是将消息的消费、消息业务处理代码、offset提交代码放在同一个事务当中。
2- 使用幂等生产者或事务性生产者来确保消息只被发送一次。这样可以避免重复发送消息,从而避免消费者重复消费数据。
3- 在消息中加入唯一的ID
输出模式有三种:append , complete, update
????????append模式:
????????????????????????只支持追加,不支持聚合和排序,每次只打印追加的内容
????????????????????????适用于对数据进行累加计算的场景
????????complete模式:
????????????????????????每一次都是全量处理,因为数据量大,所以必须聚合,也可以支持排序
????????????????????????适用于需要获取完整数据集的场景,不适用无限数据流的场景
????????update模式:
????????????????????????就是支持聚合的append,有聚合操作,只会输出有变化和新增的内容,不支持排序
? ? ? ? ? ? ? ? ? ? ? ? 适用场景需要跟踪数据的变化,实时监控指标的更新等
1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式
2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式
3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式
4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式
5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式
Spark结构化流可以通过设置水印(watermark)来处理延迟到来的数据。
水印是一种基于时间的度量,表示数据流中已经处理的最新时间。可以将水印理解为一个延迟阈值,表示在当前时间点之前的所有数据都已经到达,而在此之后的数据可能还未到达。Spark结构化流会根据水印来判断哪些数据已经过期,从而进行数据清理和聚合操作。
具体来说,可以通过以下步骤来设置水印:
定义事件时间字段:在创建数据流时,需要指定事件时间字段,即数据中表示事件时间的列。
设置水印生成规则:使用
withWatermark()
方法来设置水印生成规则,该方法需要指定一个时间间隔作为水印的生成周期。处理延迟数据:在数据处理过程中,可以使用
window()
方法来对数据进行窗口操作,同时使用trigger()
方法来设置触发器,以便在水印到达窗口结束时间后触发数据处理。
常规处理小文件的办法:
1- 大数据框架提供的现有的工具或者命令
?? ?1.1- hadoop fs -getmerge /input/small_files/*.txt /output/merged_file.txt
?? ?1.2- hadoop archive -archiveName myhar.har -p /small_files /big_files
?? ?
2- 可以通过编写自定义的代码,将小文件读取进来,在代码中输出的时候,输出形成大的文件wholeTextFiles: 读取小文件。
?? ?1-支持本地文件系统和HDFS文件系统。参数minPartitions指定最小的分区数。
?? ?2-通过该方式读取文件,会尽可能使用少的分区数,可能会将多个小文件的数据放到同一个分区中进行处理。
? ? 3-一个文件要完整的存放在一个元组中,也就是不能将一个文件分成多个进行读取。文件是不可分割的。
? ? 4-RDD分区数量既受到minPartitions参数的影响,同时受到小文件个数的影响