关于 Kafka 应用开发知识点的整理(一)

在项目中使用 Python 的 kafka 进行开发

Posted by pandaychen on June 29, 2020

0x00 前言

项目中使用 Kafka 作为消息队列,本篇文章就个人使用 Kafka 的一些经验做一些总结。大概从下面几个方面:

  1. Kafka 的基础知识总结
  2. Kakfa 的生产者分区选择
  3. Kafka 的消费者组机制
  4. Kafka 的 offset 机制?
  5. Kafka 的消费延迟问题如何监控和优化?
  6. 关于 Kafka 应用的场景

0x01 Kafka 的基本概念

Kafka 的基础知识可以参见 这篇文章。 Kafka 中,Topic 是逻辑上的概念,而 Partition 是物理上的概念,简而言之:

  • topic:理解为一个消息的集合(或者表),topic 存储在 broker 中,一个 topic 可以有多个 partition 分区,一个 topic 可以有多个 Producer 来 push 消息,一个 topic 可以有多个消费者向其 pull 消息,一个 topic 可以存在一个或多个 broker 中。写入不同的 topic 即写入不同的表
  • partition:是 topic 的子集(topic 下的物理分组),每个 partition 是一个有序的队列(大文件)。不同 partition 分配在不同的 broker 上进行水平扩展从而增加 kafka 并行处理能力,同 topic 下的不同分区信息是不同的,同一分区信息是有序的;每一个分区都有一个或者多个副本,其中会选举一个 leader,fowller 从 leader 拉取数据更新自己的 log(每个分区逻辑上对应一个 log 文件夹),消费者向 leader 中 pull 信息;partition 中每一条消息都有一个有序的 offset

Offset 机制

Partition 中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号(Offset),该编号可以唯一的定位当前分区的每一条消息,一般有两种 Offset:

1、生产者 offset
生产者的 offset 更新由 Kafka 完成,根据客户端设置的分区写入策略写入给定的 partition 中 kafka-producer-offset

2、消费者 offset
在 consumer group 机制中,每个 consumer 实例都会为它消费的 partition 维护属于自己的位置信息来记录当前消费了多少条信息。consumer group 使用 Kafka 的内部 topic __consumer_offsets 保存 offset 信息。由于一个 partition 只能固定的交给一个 consumer group 中的一个 consumer 消费,因此 Kafka 保存 offset 时并不直接为每个 consumer 保存,而是以 <group.id-topic-partition -> offset> 的格式保存。

kafka-consumer-offset

消费者 offset 的意义,个人理解有两点:

  • 开发者可以主动告知 Kafka,已经成功消费了哪些 offset 上的数据(消费完了 commit)
  • Kafka 反馈开发者,当前 partition 的消费位移到哪了(在此 offset 之前的数据开发者可能不需要)

正是由于消费者 offset 机制的存在,会导致重复消费及数据丢失这两种异常的 case,后文再讨论如何优化这些问题。

kafka-comsumer-offset

kafka 的生产消费模型

kafka-model

0x02 Kafka 生产者 Producer

Kakfa 的生产者流程如下: image

Producer 采用 push 的方式将消息发到 broker 上进行存储,然后由 consumer 采用 pull 模式订阅并消费消息。

kafka-producer-1

存储结构的要素

  • Msg: 消息,每个 msg 在 topic 下的不同 partiton 仅有一份,在 partition 中有一个唯一的 offset 用于定位
  • Offset:一个逻辑意义上的偏移,用于区分每一条消息,producer 可认为 partition 是一个大的串行文件,msg 存储时被分配了唯一的 offset
  • Replica: 副本,partition 的数据冗余备份,用于实现分布式的数据可靠性,但引入了不同副本间的数据一致性问题,带来了一定的复杂度
  • Leader/Follower: replica 的角色,leader replica 用来提供该 partition 的读写服务。Follower 不停的从 leader 侧同步写入的消息。它们之间的消息状态采用一致性策略来解决

当 Producer 发送一条消息(某个 topic 下的)到 broker 中, 会根据分配 partition 规则选择被存储到哪一个 partition(基于 partition 分区的选择),消息会均匀的分布到不同的 partition 里,这样就实现了水平扩展。另外,partition 本身作为文件,可以有多个分布于不同 broker 上的副本 replica(leader/follower),这样就实现了 Msg 的多副本。

kafka-partition

0x03 Kafka 消费者(组)

消费者组

消费者组(Consumer Group)是由一个或多个消费者实例组成的 Group,具有可扩展性和可容错性的一种机制。消费者组内的 consumer 共享一个消费者组 Id(Group Id),组内的 consumer 共同对一个 topic 进行订阅和消费,同一个 group 中的 consumer 只能消费一个 partition 的消息,多余的 consumer 会空闲。

  1. 一个消费者群组消费一个主题中的消息,这种消费模式又称为点对点的消费方式,点对点的消费方式又被称为消息队列
  2. 一个主题中的消息被多个消费者群组共同消费,这种消费模式又称为发布 - 订阅模式

分区 Rebalance

0x04 Kafka 的生产者分区选择

本小节解决的问题是:生产者发送数据到哪个分区?如何体现生产者的扩展性?

生产者在生产数据的时候,可以为每条消息指定 Key,这样消息被发送到 broker 时,会根据分区规则选择被存储到哪一个分区中,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。

Kafka 的 Partition 发送规则是:

  1. 如果不指定 Message 的 Key,则一般采用顺序轮询各个 Partition 的方式;当然也可以使用自定义策略
  2. 如果指定了 Message 的 Key,则会根据消息的 hash 值和 topic 的分区数取模来获取 Partition 如果应用有消息顺序性(Message 有先后之分)的需要,则可以通过指定 Message 的 Key 和自定义分区类来将符合某种规则的消息发送到同一个分区。同一个分区消息是有序的,同一个分区只有一个消费者就可以保证消息的顺序性消费。

比如,在实现 OpenSSH 终端会话(Session)审计时,生产者端为每条消息都按照 SessionId 指定 key 值,这样可以保证每个会话的数据一定被顺序性消费的。

msg := &sarama.ProducerMessage{
        Topic: topic_name,
        Value: sarama.ByteEncoder(sendmsg),
        //Key:   nil,		// 如不指定 key, 则按照轮询方式写入每个 partition
        Key: sarama.StringEncoder(sessionid), // 指定 sessionid,将相同会话写入一个 partition
}

0x05 Kafka 的消费者组的意义

  1. 消费者组机制提供了 Kafka 消费端的高可用,实际项目中强烈推荐以此方式来部署消费端。
  2. 消费者组实现了一种发布 - 订阅模式的概念

0x06 消费 Lag 计算及意义

为什么要(如何)计算 Kafka 的消费延迟?这个问题是我在腾讯 T9 升 T10 的评审们提问过的。使用消息队列,在消费端一定要处理好这个问题。当生产者大批量写入数据时,Kafka Consumer 消费会存在延迟情况,我们需要查看消息堆积情况,就是所谓的消息 Lag。原因可能是因为消费端性能,亦或是消费端的消费速度本来就比较慢。

此时监控消费 Lag 的指标,非常有意义,是否需要对消费端进行扩容等等。那么如何计算消费 Lag 呢?

计算 Lag

在计算 Lag 之前先普及几个基本概念:

  • LEO(LOG-END-OFFSET): 这里主要是指堆 Consumer 可见的 Offset. 即 HW(High Watermark)
  • CURRENT-OFFSET: Consumer 消费到的具体位移 从此二者意义可知 \(Lag=LEO-CURRENT-OFFSET\),计算出来的值即为消费延迟的差值。

这里首先要记得,消费 Lag 是针对于某个消费者组 Id 的 Lag,通过官方方包里提供的 kafka-consumer-groups.sh 工具即可获取此值:

/data/home/user00/application/kafka_2.12-2.2.0/bin/kafka-consumer-groups.sh --describe --bootstrap-server 1.2.3.4:19092  --group group_name

比如,项目中的一个消费者组的 Lag 数据如下: image

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET     LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
topidc1             0          1057030         1057030         0               kafka-python-1.4.7-2d802e7f-157d-4768-8a1d-3b7727724c71     /1.1.1.1   kafka-python-1.4.7
topidc1             1          1118889         1118889         0               kafka-python-1.4.7.dev-fe05daa9-a6f3-47c5-8801-2a28b6a4b7a5 /2.2.2.2    kafka-python-1.4.7.dev

计算 LAG 的方法 1 – Broker

Broker 消费方式 offset 获取的实现思路为:

  1. 根据 Topic 获取消费该 Topic 的 Group 信息
  2. 通过 Consumer-API 读取 Broker 上指定 Group 和 Topic 的消费情况,可以获取到 clientId,CURRENT-OFFSET,patitionhost 等数据
  3. 通过 Consumer-API 获取 LogEndOffset(可见 offset)
  4. 将上面 2 处信息合并,计算 Lag

计算 LAG 的方法 2 –Zookeeper

Zookeeper 消费方式 offset 获取的 实现思路为:

  1. 根据 topic 获取消费该 topic 的 group
  2. 读取 Zookeeper 上指定 group 和 topic 的消费情况,可以获取到 clientId,CURRENT-OFFSET,patition
  3. 通过 Consumer-API 获取 LogEndOffset(可见 offset)
  4. 将上面 2 处信息合并,计算 Lag

下面给出一段计算 Lag 的 Python 脚本:

#coding=utf-8

from kafka import SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition

def get_topic_offset(brokers, topic):
    """
    获取一个 topic 的 offset 值的和
    """
    client = SimpleClient(brokers)
    partitions = client.topic_partitions[topic]
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)
    return sum([r.offsets[0] for r in offsets_responses])


def get_group_offset(brokers, group_id, topic):
    """
    获取一个 topic 特定 group 已经消费的 offset 值的和
    """
    consumer = KafkaConsumer(bootstrap_servers=brokers,group_id=group_id)
    pts = [TopicPartition(topic=topic, partition=i) for i in consumer.partitions_for_topic(topic)]
    result = consumer._coordinator.fetch_committed_offsets(pts)
    return sum([r.offset for r in result.values()])


if __name__ == '__main__':
    topic_offset = get_topic_offset("kafka-cluster-addrlist", "topic_name")
    group_offset = get_group_offset("kafka-cluster-addrlist", "group_name", "topic_name")
    lag = topic_offset - group_offset
    print topic_offset,group_offset,lag

0x07 重复消费

由于数据存储在 Kafka 中,我们可以根据某个 Offset 来实现固定 Offset 区间的消费:这里注意,consumer_start_offset 一定要大于等于 consumer.beginning_offsets(consumer.assignment()) 计算出来的分区的起始 offset

#coding=utf-8

import os
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('topic_name', bootstrap_servers='1.1.1.1:19092,2.2.2.2:19092,3.3.3.3:19092')

#指定消费者开始的 offset 和结束的 offset
partition_id = 0
consumer_start_offset = 0
consumer_end_offset = 200000

print consumer.partitions_for_topic("topic_name") #获取 test 主题的分区信息
print consumer.topics() #获取主题列表
print consumer.subscription() #获取当前消费者订阅的主题
print consumer.assignment() #获取当前消费者 topic、分区信息
print "************************"
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
#consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第 5 个偏移量消费
consumer.seek(TopicPartition(topic=u'topic_name', partition=partition_id), consumer_start_offset) #重置偏移量,从第 5 个偏移量消费
count = 0
for message in consumer:
	if  message.offset> consumer_end_offset:
		break
	count+=1
	print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

print count

0x08 Python-kafka 库的基础用法

生产者

Kafka 的生产者代码如下:其中调用了 send 方法向 Kafka 写入数据,该方法的 原型如下send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

class KafkaProducer():
    def __init__(self, kafka_serverlist, kafkatopic):
        self.kafkatopic = kafkatopic
		self.serverslist = kafka_serverlist
        self.producer = KafkaProducer(bootstrap_servers = kafka_serverlist)

    def sendjsondata(self, params, key):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, key=key, parmas_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print e

消费者

KafkaConsumer 的文档 在此。

from kafka import KafkaConsumer

class Kafka_consumer():
    '''
    使用 Kafka—python 的消费模块
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                      bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort ))

    def consume_data(self):
        try:
            #其中返回的 message 为一个生成器
            #其中元素的 type 为 <class 'kafka.consumer.fetcher.ConsumerRecord'>
            for message in self.consumer:
                # print json.loads(message.value)
                yield message
        except KeyboardInterrupt, e:
            print e

测试

(TO BE CONTINUED)

def main():
    '''
    测试 consumer 和 producer
    :return:
    '''
    ## 测试生产模块
    #producer = Kafka_producer("127.0.0.1", 9092, "ranktest")
    #for id in range(10):
    #    params = '{abetst}:{null}---'+str(i)
    #    producer.sendjsondata(params)
    ## 测试消费模块
    #消费模块的返回格式为 ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,
    #\timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,
    #\serialized_key_size=-1, serialized_value_size=21)
    consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest')
    message = consumer.consume_data()
    for i in message:
        print i.value

0x09 Kafka 的应用场景

项目使用的 Kafka 整体模型如下图所示,N 个生产者,针对每个 Topic 默认开启 2 个消费者组来消费: image

0x0A 参考

转载请注明出处,本文采用 CC4.0 协议授权