Chapter 5: Python Clients

The kafka-python Library

Installation

pip install kafka-python

Producer

from kafka import KafkaProducer
import json

# Create the producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
# Simple send (fire-and-forget)
producer.send('test', value={'name': '张三', 'age': 25})

# Send with a key
producer.send('test', key='user_001', value={'name': '张三'})

# Send to a specific partition
producer.send('test', value={'name': '张三'}, partition=0)

# Synchronous send: block until the broker responds
future = producer.send('test', value={'name': '张三'})
result = future.get(timeout=10)
print(f'Send succeeded: partition={result.partition}, offset={result.offset}')

# Asynchronous callbacks
def on_send_success(record_metadata):
    print(f'Send succeeded: {record_metadata.topic} {record_metadata.partition} {record_metadata.offset}')

def on_send_error(excp):
    print(f'Send failed: {excp}')

producer.send('test', value={'name': '张三'}).add_callback(on_send_success).add_errback(on_send_error)

# Block until all buffered messages are sent
producer.flush()

# Close the producer
producer.close()
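
The producer constructor also accepts reliability and batching settings. A minimal sketch with illustrative values (the parameter names are real kafka-python options; the values are not recommendations):

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',                  # wait for all in-sync replicas to acknowledge
    retries=3,                   # retry transient send failures
    linger_ms=5,                 # wait up to 5 ms so sends can be batched
    batch_size=16384,            # maximum batch size in bytes
    compression_type='gzip',     # compress batches on the wire
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)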

Consumer

from kafka import KafkaConsumer
import json

# Create the consumer
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    key_deserializer=lambda b: b.decode('utf-8') if b else None,
    value_deserializer=lambda b: json.loads(b.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f'Topic: {message.topic}')
    print(f'Partition: {message.partition}')
    print(f'Offset: {message.offset}')
    print(f'Key: {message.key}')
    print(f'Value: {message.value}')
    print('---')

Consumer Configuration

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',

    # Offset management
    auto_offset_reset='earliest',      # earliest, latest, or none
    enable_auto_commit=True,           # commit offsets automatically
    auto_commit_interval_ms=5000,      # auto-commit interval

    # Consumption control
    max_poll_records=500,              # max records returned by a single poll
    max_poll_interval_ms=300000,       # max gap between poll() calls before
                                       # the consumer is considered failed

    # Heartbeat settings
    session_timeout_ms=10000,          # session timeout
    heartbeat_interval_ms=3000,        # heartbeat interval

    # Deserialization
    key_deserializer=lambda b: b.decode('utf-8') if b else None,
    value_deserializer=lambda b: json.loads(b.decode('utf-8'))
)
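
Besides iterating over the consumer, kafka-python also exposes an explicit poll() API that returns a dict mapping each TopicPartition to a batch of records. A short sketch:

while True:
    records = consumer.poll(timeout_ms=1000, max_records=100)
    for tp, messages in records.items():
        for message in messages:
            print(tp.partition, message.offset, message.value)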

Manual Offset Commits

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False  # disable auto-commit
)

for message in consumer:
    try:
        # Process the message (process_message is a placeholder for
        # your own logic)
        process_message(message.value)

        # Commit synchronously
        consumer.commit()

        # Or commit asynchronously:
        # consumer.commit_async()

    except Exception as e:
        print(f'Processing failed: {e}')
        # The offset is not committed, so the message will be
        # consumed again on the next run
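
Committing after every message is safe but adds a round trip per record. A common compromise, sketched below, is to commit every N messages, which bounds how much is reprocessed after a crash (BATCH_SIZE and process_message are placeholders):

BATCH_SIZE = 100
count = 0

for message in consumer:
    process_message(message.value)  # placeholder for your processing logic
    count += 1
    if count % BATCH_SIZE == 0:
        consumer.commit()  # synchronously commit the current position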

Consuming Specific Partitions

from kafka import TopicPartition

# Manually assign specific partitions (bypasses consumer-group rebalancing)
consumer.assign([
    TopicPartition('test', 0),
    TopicPartition('test', 1)
])

# Start from a specific offset
consumer.seek(TopicPartition('test', 0), 100)

# Seek to the earliest offset
consumer.seek_to_beginning(TopicPartition('test', 0))

# Seek to the latest offset
consumer.seek_to_end(TopicPartition('test', 0))

# Look up offsets by timestamp
import time
timestamp = int(time.time() * 1000) - 3600000  # one hour ago, in milliseconds
offsets = consumer.offsets_for_times({
    TopicPartition('test', 0): timestamp
})
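
offsets_for_times() maps each TopicPartition to an OffsetAndTimestamp, or None if the partition has no message at or after the timestamp, so the usual follow-up is to seek to the returned offsets:

for tp, offset_ts in offsets.items():
    if offset_ts is not None:
        consumer.seek(tp, offset_ts.offset)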

The confluent-kafka Library

Installation

pip install confluent-kafka

Producer

from confluent_kafka import Producer
import json

# Configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'python-producer'
}

# Create the producer
producer = Producer(conf)

# Delivery report callback, invoked once per message
def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Produce a message (asynchronous: the message is only queued locally)
producer.produce(
    'test',
    key='user_001',
    value=json.dumps({'name': '张三', 'age': 25}),
    callback=delivery_report
)

# Block until all outstanding messages are delivered
producer.flush()
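
produce() only enqueues the message in a local buffer, so high-volume producers should call poll() regularly to serve delivery callbacks and to cope with a full buffer. A sketch of that pattern (the loop and payload are illustrative):

for i in range(10000):
    payload = json.dumps({'seq': i})
    try:
        producer.produce('test', value=payload, callback=delivery_report)
    except BufferError:
        # The local queue is full: wait for deliveries, then retry once
        producer.poll(1.0)
        producer.produce('test', value=payload, callback=delivery_report)
    producer.poll(0)  # serve delivery callbacks without blocking

producer.flush()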

Consumer

from confluent_kafka import Consumer, KafkaError

# Configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

# Create the consumer
consumer = Consumer(conf)

# Subscribe to the topic
consumer.subscribe(['test'])

# Consume messages
try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            # Partition EOF events are only delivered when
            # 'enable.partition.eof' is set to True in the config
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'Reached end of partition: {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Consume error: {msg.error()}')
        else:
            # key() and value() return raw bytes (key may be None)
            print(f'Topic: {msg.topic()}')
            print(f'Partition: {msg.partition()}')
            print(f'Offset: {msg.offset()}')
            print(f'Key: {msg.key().decode("utf-8") if msg.key() else None}')
            print(f'Value: {msg.value().decode("utf-8")}')

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
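
subscribe() also accepts optional rebalance callbacks, useful for logging partition movement or flushing state before partitions are taken away. A minimal sketch:

def on_assign(consumer, partitions):
    print(f'Assigned: {partitions}')

def on_revoke(consumer, partitions):
    print(f'Revoked: {partitions}')

consumer.subscribe(['test'], on_assign=on_assign, on_revoke=on_revoke)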

Transaction Support

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-id'
}

producer = Producer(conf)

# Initialize transactions (fences any older producer with the same transactional.id)
producer.init_transactions()

try:
    # Begin the transaction
    producer.begin_transaction()

    # Produce messages inside the transaction
    producer.produce('topic1', value='message1')
    producer.produce('topic2', value='message2')

    # Commit the transaction
    producer.commit_transaction()

except Exception as e:
    # Abort (roll back) the transaction
    producer.abort_transaction()
    print(f'Transaction failed: {e}')

producer.flush()
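
The main use case for transactions is an exactly-once read-process-write loop, where the consumer's offsets are committed inside the producer's transaction. A sketch under assumed names ('input-topic', 'output-topic', and the IDs are placeholders):

from confluent_kafka import Producer, Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'txn-group',
    'enable.auto.commit': False,
    'isolation.level': 'read_committed'
})
consumer.subscribe(['input-topic'])

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'txn-pipeline-1'
})
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    producer.begin_transaction()
    try:
        producer.produce('output-topic', value=msg.value())
        # Commit the consumed offsets as part of the same transaction,
        # so the read and the write succeed or fail together
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata())
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()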

Practical Examples

Log Collection

import logging
from kafka import KafkaProducer
import json
from datetime import datetime

class KafkaLogHandler(logging.Handler):
    def __init__(self, bootstrap_servers, topic):
        super().__init__()
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.topic = topic

    def emit(self, record):
        try:
            log_entry = {
                'timestamp': datetime.now().isoformat(),
                'level': record.levelname,
                'logger': record.name,
                'message': record.getMessage(),
                'module': record.module,
                'line': record.lineno
            }
            self.producer.send(self.topic, value=log_entry)
        except Exception:
            # A logging handler should never raise; defer to the
            # standard error hook
            self.handleError(record)

    def close(self):
        self.producer.flush()
        self.producer.close()
        super().close()

# Usage
logger = logging.getLogger('app')
logger.setLevel(logging.INFO)
logger.addHandler(KafkaLogHandler(['localhost:9092'], 'app-logs'))

logger.info('User login succeeded')
logger.error('Database connection failed')

Message Processing Pipeline

from kafka import KafkaConsumer, KafkaProducer
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

class MessagePipeline:
    def __init__(self, input_topic, output_topic, bootstrap_servers):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            group_id='pipeline-group',
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.output_topic = output_topic
        self.executor = ThreadPoolExecutor(max_workers=10)

    def process_message(self, message):
        """Message-processing logic."""
        value = message.value

        # Example: data transformation
        processed = {
            'original': value,
            'processed_at': datetime.now().isoformat(),
            'status': 'processed'
        }

        return processed

    def run(self):
        # Note: auto-commit is enabled by default, so offsets can be
        # committed before the thread pool finishes processing; disable
        # it if you need at-least-once semantics
        for message in self.consumer:
            # Hand the message to the thread pool
            future = self.executor.submit(self.process_message, message)

            # When processing finishes, forward the result to the output topic
            def on_done(fut, msg=message):
                try:
                    result = fut.result()
                    self.producer.send(self.output_topic, value=result)
                except Exception as e:
                    print(f'Processing failed: {e}')

            future.add_done_callback(on_done)

# Usage
pipeline = MessagePipeline(
    input_topic='input-topic',
    output_topic='output-topic',
    bootstrap_servers=['localhost:9092']
)
pipeline.run()

Consumer Monitoring

from kafka import KafkaConsumer, TopicPartition
import json

class ConsumerMonitor:
    def __init__(self, bootstrap_servers, group_id):
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id

    def get_lag(self, topic):
        """Return the consumer group's lag for each partition of a topic."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id
        )

        partitions = consumer.partitions_for_topic(topic) or set()
        lag_info = {}

        for p in partitions:
            tp = TopicPartition(topic, p)

            # Offset committed by the consumer group
            committed = consumer.committed(tp)

            # Latest (end) offset of the partition
            end_offset = consumer.end_offsets([tp])[tp]

            if committed is not None:
                lag = end_offset - committed
                lag_info[p] = {
                    'committed': committed,
                    'end_offset': end_offset,
                    'lag': lag
                }

        consumer.close()
        return lag_info

# Usage
monitor = ConsumerMonitor(['localhost:9092'], 'my-group')
lag = monitor.get_lag('test')
print(json.dumps(lag, indent=2))
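
A simple way to put this to work is a polling loop that warns when total lag crosses a threshold (the threshold and interval below are illustrative):

import time

LAG_THRESHOLD = 1000

while True:
    lag = monitor.get_lag('test')
    total = sum(info['lag'] for info in lag.values())
    if total > LAG_THRESHOLD:
        print(f'WARNING: total lag {total} exceeds {LAG_THRESHOLD}')
    time.sleep(60)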

Summary

In this chapter we covered:

  • ✅ Using the kafka-python library
  • ✅ Using the confluent-kafka library
  • ✅ Producer and consumer programming
  • ✅ Transaction support
  • ✅ Practical examples

Next Chapter

Chapter 6: Cluster Deployment - learn how to deploy a Kafka cluster and configure it for high availability.