目录
?六 . Kafka 之所以具有高速的读写性能,主要有以下几个原因
应用场景:
? ? ? ? 应用解耦合:类似单点故障
? ? ? ? 异步处理: 减少处理时间
? ? ? ? 限流削峰 : 不管流量多大,放到消息队列中,都是按照一定的节奏进行处理
? ? ? ? 消息驱动的系统: 消息队列,消息生产者,消费者(负责对消息进行处理)
????????消息(message): 指的是数据,只不过这个数据存在一定流动状态
????????队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性
消息队列中两个角色:
? ? ? ? 生产者producer:生产/发送消息到消息队列中
? ? ? ? 消费者consumer:?从消息队列中获取消息
1. 基本介绍
????????kafka是一款消息队列的中间件产品;
????????kafka特点:
? ? ? ? ????????可靠性
? ? ? ? ????????可扩展性
? ? ? ? ????????耐用性
? ? ? ? ????????性能
?2. kafka架构
? ? ? ? 1. Kafka中集群节点叫broker ;
? ? ? ? 2. 集群的节点与节点之间,没有主从之分 ;?
? ? ? ? 3. 同一个分区的不同副本间中, 有主从关系 ,主是leader , 从是Follower
? ? ? ? 4. 同一个Partitions分区可以设置多个副本 , 但是副本数量不能超过集群broker节点的个数
? ? ? ? 5. zookeeper用来管理集群,以及管理元数据信息
? ? ? ? 6. Topic 主题 ,是业务层面对消息进行分类的
三台虚拟机启动Zookeeper
????????cd /export/server/zookeeper/bin
????????./zkServer.sh start
node1脚本启动kafka
?????????cd /export/onekey
????????./start-kafka.sh
????????./stop-kafka.sh
1.? 创建Topic
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test02 --partitions 4 --replication-factor 2 ?
参数:?
? ? ? ? -- bootstrap-server: Kafka集群中broker连接信息
? ? ? ? -- create : 指定操作类型 .这里是新建Topic
? ? ? ? -- topic: 指定要新建的Topic名称
? ? ? ? -- partitions :设置Topic的分区数
? ? ? ? -- relication-factor :设置Topic分区的副本数
?2.? 查看Topic
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list
参数说明:
?? ?--bootstrap-server: Kafka集群中broker连接信息
?? ?--list: 指定操作类型。这里是查看Kafka集群上所有可用的Topic列表
?3. 查看具体Topic
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic test04
参数说明:
?? ?--bootstrap-server: Kafka集群中broker连接信息
?? ?--describe: 指定操作类型。这里是查看具体Topic信息
?4. 模拟生产者Producer
./kafka-console-producer.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:
?? ?--broker-list: Kafka集群中broker连接信息
?? ?--topic: 指定要将消息发送到哪个具体的Topic
5. 模拟消费者 Consumer
?
./kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:
?? ?--bootstrap-server: Kafka集群中broker连接信息
?? ?--topic: 指定要从哪个Topic中消费消息
?? ?--from-beginning: 指定该参数以后,会从最旧的地方开始消费
?? ?latest: 消费者(默认)从最新的地方开始消费
?? ?--max-messages: 最多消费的条数。满足条数后,就会自动结束
?? ?--group: 指定消费组名称。一个消费者只能属于一个消费组;一个消费组里面可以有多个消费者。同一个Topic中的同一条数据,只能被同一个消费组中的一个消费者所消费
?? ?
6. 修改Topic
?
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 10
分区: 只能增大,不能减小。而且没有数量限制
副本: 既不能增大,也不能减小减小分区:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 1?修改副本数:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --replication-factor 2 --partitions 11
7. 删除Topic
?
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --delete --topic test01
参数说明:
?? ?--bootstrap-server: Kafka集群中broker连接信息
?? ?--delete: 指定操作类型。这里是删除Topic
?? ?--topic: 指定要删除哪个Topic
?8. 查看消费组中有多少个消费者
./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_01 --members --describe
?
? ? ? ? 分区的作用:
? ? ? ? ????????1- 避免单台服务器容量的限制
? ? ? ? ????????2- 提升Topic的吞吐量
? ? ? ????????? 3 - 分区数量不要超过Kafka集群中的broker节点个数的3倍
? ? ? ? 副本的作用:
? ? ? ? ? ? ? ? 1 - 提升数据安全性,但也会导致冗余过多
? ? ? ? ? ? ? ? 2- 副本个数不能超过集群的broker节点个数,推荐副本1-3个
? ? ? ? 消息存储机制
1-xx.log和xx.index它们的作用是什么?
????????答:
????????xx.log: 称之为segment片段文件,也就是一个Partition分区的数据,会被分成多个segment(log)片段文件进行存储。
????????xx.index: 称之为索引文件,该文件的作用是用来加快对xx.log文件内容检索的速度
2-xx.log和xx.index文件名称的意义?
????????答: 这个数字是xx.log文件中第一条消息的offset(偏移量)
3-为什么一个Partition分区的数据要分成多个xx.log(segment片段文件)文件进行存储?
????????答:
? ? ? 1- 如果一个文件的数据量过大,打开和关闭文件都非常消耗资源
? ? ? 2- 在一个大的文件中,检索内容也会非常消耗资源
? ? ? 3- Kafka只是用来临时存储消息数据。会定时将过期数据删除。如果数据放在一个文件中,删除的效率低;
????????如果数据分成了多个segment片段文件进行存储,删除的时候只需要判断segment文件最后修改时间,如果超过了保留时间,就直接将整个segment文件删除。该保留时间是通过server.properties文件中的log.retention.hours=168进行设置,默认保留168小时(7天)
????????
????????查询机制
查询步骤:
1- 首先先确定要读取哪个xx.log(segment片段)文件。368776该offset的消息在368769.log文件中
2- 查询xx.log对应的xx.index,查询该条消息的物理偏移量范围
3- 根据消息的物理偏移量范围去读取xx.log文件(底层是基于磁盘的顺序读取)
4- 最终就获取到了具体的消息内容
分发策略如下这些:
1- 随机分发策略:将消息发到到随机的某个分区上。Python支持,Java不支持
2- 指定分区策略:将消息发到指定的分区上面。Python支持,Java支持
3- Hash取模策略:对消息的key先取Hash值,再和分区数取模。Python支持,Java支持
4- 轮询策略:在Kafka的2.4及以上版本,已经更名成粘性分发策略。Python不支持,Java支持
5- 自定义分发策略:Python支持,Java支持
Kafka之所以具有高速的读写性能,主要有以下几个原因:
分布式架构:Kafka采用分布式架构,可以通过水平扩展来处理大规模的数据流。它将数据分成多个分区,并将这些分区分布在不同的节点上,实现了数据的并行处理和负载均衡,从而提高了读写性能。
零拷贝技术:Kafka使用零拷贝技术来减少数据在内存和磁盘之间的拷贝次数。它通过直接内存访问(DMA)技术,将数据从磁盘读取到内存或者从内存写入到磁盘,避免了数据的多次复制,减少了IO操作的开销,提高了读写性能。
批量写入和压缩:Kafka支持批量写入消息和消息的压缩。它可以将多个消息一次性写入到磁盘,减少了磁盘IO的次数,提高了写入性能。同时,Kafka还支持对消息进行压缩,减小了消息的存储空间,降低了网络传输的开销,进一步提高了读写性能。
高效的消息索引和存储结构:Kafka使用高效的消息索引和存储结构,例如日志结构和位移索引,可以快速地定位和检索消息。它采用追加写入的方式,顺序写入磁盘,减少了随机写入的开销,提高了读写性能。
综上所述,Kafka通过分布式架构、零拷贝技术、批量写入和压缩、高效的消息索引和存储结构等手段,实现了高速的读写性能,使其成为处理大规模数据流的理想选择。
count(1)会记null,
count(0)会记null,
count(*)会记null
?count(字段)不会记null
count (null)得到null
import os from pyspark import SparkConf, SparkContext, StorageLevel from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql import Window as win from pyspark.sql.types import StructType, IntegerType, StringType, StructField, FloatType # 绑定指定的Python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' if __name__ == '__main__': # 1- 创建SparkSession对象 spark = SparkSession.builder \ .config('spark.sql.shuffle.partitions', 1) \ .appName('new_sale') \ .master('local[*]') \ .getOrCreate() # 使用框架 spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
?