kafka是一种高吞吐量的分布式发布订阅消息消息队列,有如下特性:
可扩展性:Kafka可以处理大规模的数据流,并支持高并发的生产和消费操作。它可以水平扩展以适应负载的增长。
持久性:Kafka将消息持久化到磁盘,允许消息在发布和消费之间进行持久存储。这使得消费者能够根据自己的节奏处理数据,并且不会因为未及时消费而丢失数据。
可靠性:Kafka通过在多个服务器上复制分区来提供容错性。如果某个服务器故障,仍然可以从其他副本读取数据。
实时处理:Kafka支持实时数据流的处理,允许应用程序实时地处理和分析数据。
生态系统:Kafka有一个丰富的生态系统,提供了各种工具和集成,如Kafka?Connect用于数据导入和导出,Kafka?Streams用于流处理,以及Kafka客户端库可用于多种编程语言。
*在Kafka有几个比较重要的概念:
broker
用于标识每一个Kafka服务,当然同一台服务器上可以开多个broker,只要他们的broker?id不相同即可
Topic
消息主题,从逻辑上区分不同的消息类型
Partition
用于存放消息的队列,存放的消息都是有序的,同一主题topic可以分多个partition,如分多个partition时,同样会以如partition1存放1,3,5消息,partition2存放2,4,6消息。
Produce
消息生产者,生产消息,可指定向哪个topic,topic哪个分区中生成消息。
Consumer
消息消费者,消费消息,同一消息只能被同一个consumer?group中的consumer所消费。consumer是通过offset进行标识消息被消费的位置。当然consumer的个数取决于此topic所划分的partition,如同一group中的consumer个数大于partition的个数,多出的consumer将不会处理消息。
from confluent_kafka import Producer
# bootstrap.servers 对应的服务器地址,可以有多个mybroker
p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for data in some_data_source:
# Trigger any available delivery report callbacks from previous produce() calls
p.poll(0)
# Asynchronously produce a message, the delivery report callback
# will be triggered from poll() above, or flush() below, when the message has
# been successfully delivered or failed permanently.
# 修改此处设定partition= n, n代表几个分区
p.produce('mytopic', data.encode('utf-8'), partition=3, callback=delivery_report)
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
# =============================分割线=============================
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'mybroker', # Producer中对应的bootstrap.servers 对应的服务器地址
'group.id': 'mygroup',
'auto.offset.reset': 'earliest' # topic的一些配置
})
#c.subscribe(['mytopic']) # 指定消费的topic,与生产时的topic一致
# 此处修改,需要消费的分区
tp = TopicPartition("mytopic", 0, 0) # 第一个0是分区,第二个0是auto.offset.reset,不能为负数
c.assign([tp])
c.seek(tp)
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
Producer?:消息生产者,就是向kafka?broker发消息的客户端;
Consumer?:消息消费者,向kafka?broker取消息的客户端;
Consumer?Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费组消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者**。
Broker?:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Topic?:可以理解为一个队列,生产者和消费者面向的都是一个topic;
Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
关于Zookeeper:
Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。
工作机制:
是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接收观察者的注册,一旦这些数据状态发生变化,Zookeeper就将负责通知已经在Zookeeper上的那些观察者做出相应的反应。
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。比如1-237
producer发送的数据封装成一个ProducerRecord对象。
(1)指明?partition?的情况下,直接将指明的值直接作为?partiton?值;
(2)没有指明?partition?值但有?key?的情况下,将?key?的?hash?值与?topic?的?partition?数进行取余得到?partition?值;
(3)既没有?partition?值又没有?key?值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与?topic?可用的?partition?总数取余得到?partition?值,也就是常说的?round-robin?算法。
一个consumer?group中有多个consumer,一个?topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,一是roundrobin(默认的),一是range。
副本数据同步策略
方案 | 优点 | 缺点 |
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
Kafka选择了第二种方案,原因如下:
1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
比如说3台机器,推选一个leader,每个人手里都有一票,可以投自己,发现一个人只要半数以上的票就已经可以成为leader
??采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
??Leader维护了一个动态的in-sync?replica?set?(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks参数配置:
acks:
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据
-1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。?
自动提交offset的相关参数:
enable.auto.commit:是否开启自动提交offset功能
auto.commit.interval.ms:自动提交offset的时间间隔
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka?broker。
Producer拦截器(interceptor)是在Kafka?0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor?chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata,?Exception):
该方法会在消息从RecordAccumulator成功发送到Kafka?Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
优先选择?RabbitMQ?的条件:?高级灵活的路由规则;?消息时序控制(控制消息过期或者消息延迟);?高级的容错处理能力,在消费者更有可能处理消息不成功的情景中(瞬时或者持久);?更简单的消费者实现。?
优先选择?Kafka?的条件:?严格的消息顺序;?延长消息留存时间,包括过去消息重放的可能;?传统解决方案无法满足的高伸缩能力。
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。
AMQP(Advanced?Message?Queuing?Protocol)?高级消息队列协议:高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
AMQP中增加了Exchange和Binging的角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到哪个队列。
Broker?:?标识消息队列服务器实体rabbitmq-server
v-host?:?Virtual?Host?虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是?/。
Exchange:?交换器用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
RabbitMQ?的交换机有四种类型:fanout、direct、topic、headers。
Direct,完全匹配型交换机,此种类型交换机,通过RoutingKey路由键将交换机和队列进行绑定,?消息被发送到exchange时,需要根据消息的RoutingKey,来进行匹配,只将消息发送到完全匹配到此RoutingKey的队列。
Fanout,扇出类型交换机,此种交换机,会将消息分发给所有绑定了此交换机的队列,此时RoutingKey参数无效。
Topic,主题类型交换机,此种交换机与Direct类似,也是需要通过routingkey路由键进行匹配分发,区别在于Topic可以进行模糊匹配,Direct是完全匹配。Topic中,将routingkey通过"."来分为多个部分,通过如下功能字符来进行匹配:
"*":代表一个部分
"#":代表一个或多个部分
Headers,headers信息类型交换机,此类型交换机不通过routingkey路由键来分发消息,而是通过消息内容中的headers属性来进行匹配
Queue?:?消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding?:?绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Channel?:?信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Connection?:?网络连接,比如一个TCP连接。
Routing?key?路由键:生产者将消息发送到交换机时,会在消息头上携带一个?key,这个?key就是routing?key,来指定这个消息的路由规则。等同于数据库中的where条件一样
Binding?key?绑定键:在绑定Exchange交换机与Queue队列时,一般会指定一个binding?key,生产者将消息发送给Exchange时,消息头上会携带一个routing?key,当binding?key与routing?key相匹配时,消息将会被路由到对应的Queue中。
GitHub - dpkp/kafka-python: Python client for Apache Kafka
`pip?install?kafka-python`
创建topic
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = []
topic_list.append(NewTopic(name="topic_by_python", num_partitions=3, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
消费topic
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer('profile',
group_id='reader_2',
auto_offset_reset='earliest',
max_poll_interval_ms=600000,
request_timeout_ms=605000,
connections_max_idle_ms=700000,
max_poll_records=100,
bootstrap_servers=['localhost:9092'])
count = 0
for data in consumer:
value = data.value
info = json.loads(value)
print(info)
count += 1
if count == 5:
break
consumer.commit() # 读取少量数据才需要这个
往topic生产数据
import json
import datetime
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], retries=1)
for i in range(10):
data = {'index': i, 'name': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
producer.send('profile', json.dumps(data).encode())
print(data)
GitHub - confluentinc/confluent-kafka-python: Confluent's Kafka Python Client
使用文档:https://docs.confluent.io/current/clients/confluent-kafka-python/index.html
Producer
from confluent_kafka import Producer
# bootstrap.servers 对应的服务器地址,可以有多个mybroker
p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
def delivery_report(err, msg):
? ?""" Called once for each message produced to indicate delivery result.
? ? ? Triggered by poll() or flush(). """
? ?if err is not None:
? ? ? ?print('Message delivery failed: {}'.format(err))
? ?else:
? ? ? ?print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for data in some_data_source:
? ?# Trigger any available delivery report callbacks from previous produce() calls
? ?p.poll(0)
? ?# Asynchronously produce a message, the delivery report callback
? ?# will be triggered from poll() above, or flush() below, when the message has
? ?# been successfully delivered or failed permanently.
? ?p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
Consumer
from confluent_kafka import Consumer, KafkaError
c = Consumer({
? ?'bootstrap.servers': 'mybroker', # Producer中对应的bootstrap.servers 对应的服务器地址
? ?'group.id': 'mygroup',
? ?'auto.offset.reset': 'earliest' # topic的一些配置
})
c.subscribe(['mytopic']) # 指定消费的topic,与生产时的topic一致
while True:
? ?msg = c.poll(1.0)
? ?if msg is None:
? ? ? ?continue
? ?if msg.error():
? ? ? ?print("Consumer error: {}".format(msg.error()))
? ? ? ?continue
? ?print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()