第四章:Python 实现¶
安装依赖¶
定义 Proto¶
// user.proto
syntax = "proto3";
package user;
service UserService {
rpc GetUser(GetUserRequest) returns (User);
rpc CreateUser(CreateUserRequest) returns (User);
rpc ListUsers(ListUsersRequest) returns (stream User);
}
message GetUserRequest {
int32 user_id = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
}
message ListUsersRequest {
int32 page_size = 1;
}
message User {
int32 id = 1;
string name = 2;
string email = 3;
}
编译 Proto¶
python -m grpc_tools.protoc \
-I./protos \
--python_out=./generated \
--grpc_python_out=./generated \
./protos/user.proto
实现服务端¶
# server.py
from concurrent import futures
import grpc
import user_pb2
import user_pb2_grpc
class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
"""用户服务实现"""
def __init__(self):
self.users = {}
self.next_id = 1
def GetUser(self, request, context):
"""获取用户"""
user_id = request.user_id
if user_id not in self.users:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f'User {user_id} not found')
return user_pb2.User()
return self.users[user_id]
def CreateUser(self, request, context):
"""创建用户"""
user = user_pb2.User(
id=self.next_id,
name=request.name,
email=request.email,
)
self.users[self.next_id] = user
self.next_id += 1
return user
def ListUsers(self, request, context):
"""列出用户(服务端流)"""
page_size = request.page_size or 10
count = 0
for user in self.users.values():
if count >= page_size:
break
yield user
count += 1
def serve():
"""启动服务"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
user_pb2_grpc.add_UserServiceServicer_to_server(
UserServiceServicer(),
server
)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
serve()
实现客户端¶
# client.py
import grpc
import user_pb2
import user_pb2_grpc
def run():
"""运行客户端"""
# 连接服务
channel = grpc.insecure_channel('localhost:50051')
stub = user_pb2_grpc.UserServiceStub(channel)
# 创建用户
print("Creating user...")
user = stub.CreateUser(user_pb2.CreateUserRequest(
name='Alice',
email='alice@example.com',
))
print(f"Created user: {user}")
# 获取用户
print("\nGetting user...")
user = stub.GetUser(user_pb2.GetUserRequest(
user_id=user.id,
))
print(f"Got user: {user}")
# 列出用户
print("\nListing users...")
for user in stub.ListUsers(user_pb2.ListUsersRequest(
page_size=10,
)):
print(f"User: {user}")
if __name__ == '__main__':
run()
异步实现¶
异步服务端¶
# async_server.py
import grpc
from grpc import aio
import user_pb2
import user_pb2_grpc
class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
"""异步用户服务实现"""
async def GetUser(self, request, context):
"""获取用户"""
# 模拟异步操作
await asyncio.sleep(0.1)
return user_pb2.User(
id=request.user_id,
name='Alice',
email='alice@example.com',
)
async def ListUsers(self, request, context):
"""列出用户"""
for i in range(10):
await asyncio.sleep(0.1)
yield user_pb2.User(
id=i,
name=f'User {i}',
email=f'user{i}@example.com',
)
async def serve():
"""启动异步服务"""
server = aio.server()
user_pb2_grpc.add_UserServiceServicer_to_server(
UserServiceServicer(),
server
)
server.add_insecure_port('[::]:50051')
await server.start()
print("Async server started on port 50051")
await server.wait_for_termination()
if __name__ == '__main__':
import asyncio
asyncio.run(serve())
异步客户端¶
# async_client.py
import asyncio
import grpc
from grpc import aio
import user_pb2
import user_pb2_grpc
async def run():
"""运行异步客户端"""
# 连接服务
async with aio.insecure_channel('localhost:50051') as channel:
stub = user_pb2_grpc.UserServiceStub(channel)
# 创建用户
user = await stub.CreateUser(user_pb2.CreateUserRequest(
name='Alice',
email='alice@example.com',
))
print(f"Created user: {user}")
# 列出用户
async for user in stub.ListUsers(user_pb2.ListUsersRequest()):
print(f"User: {user}")
if __name__ == '__main__':
asyncio.run(run())
错误处理¶
from grpc import StatusCode
def GetUser(self, request, context):
"""获取用户(带错误处理)"""
user_id = request.user_id
# 参数验证
if user_id <= 0:
context.set_code(StatusCode.INVALID_ARGUMENT)
context.set_details('User ID must be positive')
return user_pb2.User()
# 查询用户
user = get_user_from_db(user_id)
if user is None:
context.set_code(StatusCode.NOT_FOUND)
context.set_details(f'User {user_id} not found')
return user_pb2.User()
return user
小结¶
Python 实现要点:
- 安装依赖:grpcio、grpcio-tools
- 编译 Proto:生成 Python 代码
- 服务端:实现 Servicer、启动服务
- 客户端:创建 Stub、调用方法
- 异步:aio 模块
下一章我们将学习 Go 实现。