Chapter 5: Python Clients¶
The kafka-python Library¶
Installation¶
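The library can be installed from PyPI, for example:

pip install kafka-python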
Producer¶
from kafka import KafkaProducer
import json

# Create the producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
# Fire-and-forget send
producer.send('test', value={'name': '张三', 'age': 25})

# Send with a key
producer.send('test', key='user_001', value={'name': '张三'})

# Send to a specific partition
producer.send('test', value={'name': '张三'}, partition=0)

# Synchronous send: block until the broker acknowledges
future = producer.send('test', value={'name': '张三'})
result = future.get(timeout=10)
print(f'Send succeeded: partition={result.partition}, offset={result.offset}')

# Asynchronous send with callbacks
def on_send_success(record_metadata):
    print(f'Send succeeded: {record_metadata.topic} {record_metadata.partition} {record_metadata.offset}')

def on_send_error(excp):
    print(f'Send failed: {excp}')

producer.send('test', value={'name': '张三'}).add_callback(on_send_success).add_errback(on_send_error)

# Make sure all buffered messages are sent
producer.flush()

# Close the producer
producer.close()
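Beyond the serializers, KafkaProducer also accepts reliability and batching options as keyword arguments. A minimal sketch with illustrative values (not tuned recommendations):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',                # wait for all in-sync replicas to acknowledge
    retries=3,                 # retry transient send failures
    linger_ms=5,               # wait up to 5 ms to batch records together
    batch_size=16384,          # maximum batch size in bytes
    compression_type='gzip',   # compress batches before sending
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)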
Consumer¶
from kafka import KafkaConsumer
import json

# Create the consumer
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    key_deserializer=lambda b: b.decode('utf-8') if b else None,
    value_deserializer=lambda b: json.loads(b.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f'Topic: {message.topic}')
    print(f'Partition: {message.partition}')
    print(f'Offset: {message.offset}')
    print(f'Key: {message.key}')
    print(f'Value: {message.value}')
    print('---')
Consumer Configuration¶
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    # Offset management
    auto_offset_reset='earliest',   # earliest, latest, or none
    enable_auto_commit=True,        # commit offsets automatically
    auto_commit_interval_ms=5000,   # auto-commit interval
    # Polling control
    max_poll_records=500,           # max records returned per poll
    max_poll_interval_ms=300000,    # max delay between polls before the consumer is considered failed
    # Heartbeat settings
    session_timeout_ms=10000,       # session timeout
    heartbeat_interval_ms=3000,     # heartbeat interval
    # Deserialization
    key_deserializer=lambda b: b.decode('utf-8') if b else None,
    value_deserializer=lambda b: json.loads(b.decode('utf-8'))
)
Manual Offset Commits¶
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False  # disable auto-commit
)

for message in consumer:
    try:
        # Process the message (user-defined processing function)
        process_message(message.value)
        # Commit synchronously
        consumer.commit()
        # Or commit asynchronously
        # consumer.commit_async()
    except Exception as e:
        print(f'Processing failed: {e}')
        # Do not commit the offset, so the message is consumed again next time
Consuming Specific Partitions¶
from kafka import TopicPartition

# Assign specific partitions (instead of subscribing to the topic)
consumer.assign([
    TopicPartition('test', 0),
    TopicPartition('test', 1)
])

# Start from a specific offset
consumer.seek(TopicPartition('test', 0), 100)

# Seek to the beginning
consumer.seek_to_beginning(TopicPartition('test', 0))

# Seek to the end
consumer.seek_to_end(TopicPartition('test', 0))

# Look up offsets by timestamp
import time
timestamp = int(time.time() * 1000) - 3600000  # one hour ago
offsets = consumer.offsets_for_times({
    TopicPartition('test', 0): timestamp
})
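offsets_for_times returns a mapping from TopicPartition to an OffsetAndTimestamp (or None when no message exists at or after the timestamp); to actually rewind, seek each partition to the returned offset. A minimal continuation of the snippet above:

for tp, offset_ts in offsets.items():
    if offset_ts is not None:
        # Rewind this partition to the first offset at or after the timestamp
        consumer.seek(tp, offset_ts.offset)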
The confluent-kafka Library¶
Installation¶
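The library can be installed from PyPI, for example:

pip install confluent-kafka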
Producer¶
from confluent_kafka import Producer
import json

# Configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'python-producer'
}

# Create the producer
producer = Producer(conf)

# Delivery callback
def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Produce a message
producer.produce(
    'test',
    key='user_001',
    value=json.dumps({'name': '张三', 'age': 25}),
    callback=delivery_report
)

# Wait until all outstanding messages are delivered
producer.flush()
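Delivery callbacks are only invoked from poll() or flush(), so when producing in a loop it is common to call poll(0) regularly and to handle BufferError when the local queue fills up. A minimal sketch continuing the snippet above (the loop and payload are illustrative):

for i in range(1000):
    payload = json.dumps({'n': i})
    try:
        producer.produce('test', key=str(i), value=payload, callback=delivery_report)
    except BufferError:
        # Local queue is full: serve delivery callbacks, then retry once
        producer.poll(1)
        producer.produce('test', key=str(i), value=payload, callback=delivery_report)
    # Serve delivery callbacks for earlier produce() calls
    producer.poll(0)

producer.flush()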
Consumer¶
from confluent_kafka import Consumer, KafkaError
import json

# Configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

# Create the consumer
consumer = Consumer(conf)

# Subscribe to the topic
consumer.subscribe(['test'])

# Consume messages
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # _PARTITION_EOF events are only reported if 'enable.partition.eof' is set to True
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'Reached end of partition: {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Consumer error: {msg.error()}')
        else:
            print(f'Topic: {msg.topic()}')
            print(f'Partition: {msg.partition()}')
            print(f'Offset: {msg.offset()}')
            print(f'Key: {msg.key()}')
            print(f'Value: {msg.value()}')
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
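confluent-kafka also supports manual commits by disabling enable.auto.commit and calling commit() after processing. A minimal sketch, where handle() is a hypothetical processing function:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # commit explicitly after processing
})
consumer.subscribe(['test'])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    handle(msg.value())  # hypothetical processing function
    # Synchronously commit the offset of this message
    consumer.commit(message=msg, asynchronous=False)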
Transaction Support¶
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-id'
}
producer = Producer(conf)

# Initialize transactions (called once, before the first transaction)
producer.init_transactions()

try:
    # Begin the transaction
    producer.begin_transaction()
    # Produce messages within the transaction
    producer.produce('topic1', value='message1')
    producer.produce('topic2', value='message2')
    # Commit the transaction
    producer.commit_transaction()
except Exception as e:
    # Abort the transaction on failure
    producer.abort_transaction()
    print(f'Transaction failed: {e}')

producer.flush()
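On the consuming side, whether messages from aborted transactions are visible is controlled by the consumer's isolation level. A minimal consumer config sketch (the group id is illustrative):

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'txn-reader',             # illustrative group id
    'auto.offset.reset': 'earliest',
    'isolation.level': 'read_committed'   # ignore messages from aborted transactions
})
consumer.subscribe(['topic1', 'topic2'])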
Practical Examples¶
Log Collection¶
import logging
from kafka import KafkaProducer
import json
from datetime import datetime

class KafkaLogHandler(logging.Handler):
    def __init__(self, bootstrap_servers, topic):
        super().__init__()
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.topic = topic

    def emit(self, record):
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'line': record.lineno
        }
        self.producer.send(self.topic, value=log_entry)

    def close(self):
        self.producer.flush()
        self.producer.close()
        super().close()

# Usage
logger = logging.getLogger('app')
logger.setLevel(logging.INFO)
logger.addHandler(KafkaLogHandler(['localhost:9092'], 'app-logs'))

logger.info('User logged in successfully')
logger.error('Database connection failed')
Message Processing Pipeline¶
from kafka import KafkaConsumer, KafkaProducer
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

class MessagePipeline:
    def __init__(self, input_topic, output_topic, bootstrap_servers):
        self.output_topic = output_topic
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            group_id='pipeline-group',
            auto_offset_reset='earliest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.executor = ThreadPoolExecutor(max_workers=10)

    def process_message(self, message):
        """Message processing logic."""
        value = message.value
        # Example: transform the data
        processed = {
            'original': value,
            'processed_at': datetime.now().isoformat(),
            'status': 'processed'
        }
        return processed

    def run(self):
        for message in self.consumer:
            # Submit the message to the thread pool
            future = self.executor.submit(self.process_message, message)

            # Forward the result to the output topic once processing finishes
            def on_done(fut, msg=message):
                try:
                    result = fut.result()
                    self.producer.send(self.output_topic, value=result)
                except Exception as e:
                    print(f'Processing failed: {e}')

            future.add_done_callback(on_done)

# Usage
pipeline = MessagePipeline(
    input_topic='input-topic',
    output_topic='output-topic',
    bootstrap_servers=['localhost:9092']
)
pipeline.run()
Consumer Monitoring¶
from kafka import KafkaConsumer, TopicPartition
import json

class ConsumerMonitor:
    def __init__(self, bootstrap_servers, group_id):
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id

    def get_lag(self, topic):
        """Return the consumer group lag per partition."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id
        )
        partitions = consumer.partitions_for_topic(topic)
        lag_info = {}
        for p in partitions:
            tp = TopicPartition(topic, p)
            # Offset committed by the consumer group
            committed = consumer.committed(tp)
            # Log-end offset of the partition
            end_offset = consumer.end_offsets([tp])[tp]
            if committed is not None:
                lag_info[p] = {
                    'committed': committed,
                    'end_offset': end_offset,
                    'lag': end_offset - committed
                }
        consumer.close()
        return lag_info

# Usage
monitor = ConsumerMonitor(['localhost:9092'], 'my-group')
lag = monitor.get_lag('test')
print(json.dumps(lag, indent=2))
Summary¶
In this chapter we covered:
- ✅ Using the kafka-python library
- ✅ Using the confluent-kafka library
- ✅ Producer and consumer programming
- ✅ Transaction support
- ✅ Practical examples
Next Chapter¶
Chapter 6: Cluster Deployment - covers Kafka cluster deployment and high-availability configuration.