? Kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的消息队列系统。
? 常见的消息队列:RabbitMQ,Redis ,zeroMQ ,ActiveMQ
Kafka的优势:
Broker:kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker
Topic:每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic
Partition:Partition是一个物理上的概念,每个Topic包含一个或者多个Partition
Producer:负责发布消息到kafka的Broker中。
Consumer:消息消费者,向kafka的broker中读取消息的客户端
Consumer Group:每一个Consumer属于一个特定的Consumer Group(可以为每个Consumer指定 groupName)
//可根据主题和内容发送
public ProducerRecord(String topic, V value)
//根据主题,key、内容发送
public ProducerRecord(String topic, K key, V value)
//根据主题、分区、key、内容发送
public ProducerRecord(String topic, Integer partition, K key, V value)
//根据主题、分区、时间戳、key,内容发送
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
如果没有指定分区号,也没有指定具体的key(轮询)
如果没有指定分区号,指定了具体的key(hash)
前缀+date.getTime() fixlog_1564388581914
如果指定了具体的分区号,(按照分区号)
自定义分区
解决:设置 -1 保证produce 写入所有副本算成功 producer.type = sync request.required.acks=-1/all
解决:不限制阻塞超时时间。就是一满生产者就阻
? broker采用分片副本机制,保证数据高可用。
拿到数据后,存储到hbase中或者mysql中,如果hbase或者mysql在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,那么kafka上的o?set值已经进行了修改了,但是hbase或者mysql中没有数据,这个时候就会出现数据丢失。 主要是因为offset提交使用了异步提交。
解决方案
第一步:通过offset确定数据保存在哪一个segment里面了,
第二步:查找对应的segment里面的index文件 。index文件都是key/value对的。key表示数据在log文件里面的顺序是第几条。value记录了这一条数据在全局的标号。如果能够直接找到对应的offset直接去获取对应的数据即可
? 如果index文件里面没有存储offset,就会查找offset最近的那一个offset,例如查找offset为7的数据找不到,那么就会去查找offset为6对应的数据,找到之后,再取下一条数据就是offset为7的数据。
earliest
latest
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
latest 这个设置容易丢失消息,假如Kafka出现问题,还有数据往topic中写,这个时候重启Kafka,这个设置会从最新的offset开始消费,中间出问题的哪些就不管了。