跳转至

第四章:Python 实现

安装依赖

# grpcio is the gRPC runtime (the `grpc` package); grpcio-tools provides the
# `grpc_tools.protoc` compiler used to generate Python code from .proto files.
pip install grpcio grpcio-tools

定义 Proto

// user.proto
// Defines the UserService API and the messages it exchanges.
syntax = "proto3";

package user;

// RPCs for fetching, creating and listing users.
service UserService {
  // Unary call: one GetUserRequest in, one User out.
  rpc GetUser(GetUserRequest) returns (User);
  // Unary call: one CreateUserRequest in, the resulting User out.
  rpc CreateUser(CreateUserRequest) returns (User);
  // Server-streaming call: one ListUsersRequest in, a stream of User out.
  rpc ListUsers(ListUsersRequest) returns (stream User);
}

// Request message for GetUser.
message GetUserRequest {
  int32 user_id = 1;  // id of the user to look up
}

// Request message for CreateUser.
message CreateUserRequest {
  string name = 1;
  string email = 2;
}

// Request message for ListUsers.
message ListUsersRequest {
  int32 page_size = 1;  // proto3 default is 0 when the field is left unset
}

// A user as returned by every RPC of UserService.
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

编译 Proto

# protoc does not create output directories; make sure the target exists,
# otherwise the command fails with "No such file or directory".
mkdir -p ./generated

# Writes user_pb2.py (messages) and user_pb2_grpc.py (stubs/servicers) into
# ./generated. Scripts that `import user_pb2` need that directory on their
# import path (e.g. run them from ./generated or add it to PYTHONPATH).
python -m grpc_tools.protoc \
  -I./protos \
  --python_out=./generated \
  --grpc_python_out=./generated \
  ./protos/user.proto

实现服务端

# server.py
from concurrent import futures
import grpc
import user_pb2
import user_pb2_grpc

class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
    """In-memory implementation of the ``UserService`` gRPC service.

    Users live in a dict keyed by their id and ids are handed out
    sequentially starting at 1. The gRPC server invokes these handlers from
    a thread pool, so every access to the shared state goes through a lock.
    """

    def __init__(self):
        # Imported here so the class brings its own dependency with it and
        # does not rely on the module's import block.
        import threading

        self.users = {}      # user id -> user_pb2.User
        self.next_id = 1     # id assigned to the next created user
        # Guards ``users`` and ``next_id`` against concurrent handler calls.
        self._lock = threading.Lock()

    def GetUser(self, request, context):
        """Return the stored user whose id equals ``request.user_id``.

        When no such user exists the call is marked ``NOT_FOUND`` on
        ``context`` and an empty ``User`` message is returned.
        """
        user_id = request.user_id

        with self._lock:
            user = self.users.get(user_id)

        if user is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f'User {user_id} not found')
            return user_pb2.User()

        return user

    def CreateUser(self, request, context):
        """Store a new user built from ``request`` and return it.

        The id allocation and the insert happen under one lock so two
        concurrent calls can never receive the same id.
        """
        with self._lock:
            user_id = self.next_id
            self.next_id += 1

            user = user_pb2.User(
                id=user_id,
                name=request.name,
                email=request.email,
            )
            self.users[user_id] = user

        return user

    def ListUsers(self, request, context):
        """Stream up to ``request.page_size`` stored users (server streaming).

        A ``page_size`` of 0 (the proto3 default for an unset field) falls
        back to 10; a negative value yields no users.
        """
        page_size = request.page_size or 10

        # Copy the page while holding the lock, then stream from the copy:
        # iterating the live dict would raise RuntimeError if another thread
        # created a user while this generator is suspended between yields.
        with self._lock:
            page = list(self.users.values())[:max(page_size, 0)]

        yield from page

def serve():
    """Run the blocking gRPC server for ``UserService`` on port 50051.

    Builds a server backed by a 10-worker thread pool, registers a fresh
    ``UserServiceServicer`` on it, binds an insecure port on all interfaces
    and then blocks until the server terminates.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)

    servicer = UserServiceServicer()
    user_pb2_grpc.add_UserServiceServicer_to_server(servicer, grpc_server)

    grpc_server.add_insecure_port('[::]:50051')
    grpc_server.start()
    print("Server started on port 50051")

    # Keeps the main thread alive while worker threads handle the RPCs.
    grpc_server.wait_for_termination()

# Start the server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    serve()

实现客户端

# client.py
import grpc
import user_pb2
import user_pb2_grpc

def run():
    """Exercise every ``UserService`` RPC against a local server.

    Connects to ``localhost:50051`` over an insecure channel, creates a
    user, fetches it back by the id the server assigned, and finally prints
    the users received from the ``ListUsers`` stream (page size 10).

    Raises:
        grpc.RpcError: if any of the RPCs fails (for example when no server
            is listening on the target address).
    """
    # Using the channel as a context manager guarantees it is closed when
    # the block exits, even if one of the RPCs below raises.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_pb2_grpc.UserServiceStub(channel)

        # Create a user.
        print("Creating user...")
        created = stub.CreateUser(user_pb2.CreateUserRequest(
            name='Alice',
            email='alice@example.com',
        ))
        print(f"Created user: {created}")

        # Fetch the same user back by id.
        print("\nGetting user...")
        fetched = stub.GetUser(user_pb2.GetUserRequest(
            user_id=created.id,
        ))
        print(f"Got user: {fetched}")

        # ListUsers is server-streaming: the call returns an iterator that
        # yields each User as it arrives.
        print("\nListing users...")
        for listed in stub.ListUsers(user_pb2.ListUsersRequest(
            page_size=10,
        )):
            print(f"User: {listed}")

# Run the client demo only when this file is executed as a script.
if __name__ == '__main__':
    run()

异步实现

异步服务端

# async_server.py
import asyncio

import grpc
from grpc import aio
import user_pb2
import user_pb2_grpc

class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
    """Asynchronous demo implementation of the ``UserService`` gRPC service.

    The handlers are coroutines (or an async generator for the streaming
    RPC) meant to be served by a ``grpc.aio`` server. Nothing is persisted:
    ``GetUser`` and ``ListUsers`` return synthetic data and use
    ``asyncio.sleep`` to stand in for real asynchronous I/O.
    """

    def __init__(self):
        # Id handed out by the next CreateUser call.
        self._next_id = 1

    async def GetUser(self, request, context):
        """Return a synthetic user carrying the requested id."""
        # Simulate an asynchronous operation such as a database lookup.
        await asyncio.sleep(0.1)

        return user_pb2.User(
            id=request.user_id,
            name='Alice',
            email='alice@example.com',
        )

    async def CreateUser(self, request, context):
        """Return a new user built from ``request`` with a fresh sequential id.

        Without this handler the RPC would be reported as unimplemented to
        clients that call ``CreateUser``. The user is not stored anywhere.
        """
        # No ``await`` between reading and bumping the counter, so handlers
        # running on the same event loop cannot interleave here.
        user = user_pb2.User(
            id=self._next_id,
            name=request.name,
            email=request.email,
        )
        self._next_id += 1

        return user

    async def ListUsers(self, request, context):
        """Stream ``request.page_size`` synthetic users (server streaming).

        A ``page_size`` of 0 (the proto3 default for an unset field) falls
        back to 10; a negative value yields no users.
        """
        page_size = request.page_size or 10

        for i in range(page_size):
            # Simulate asynchronous work between streamed items.
            await asyncio.sleep(0.1)

            yield user_pb2.User(
                id=i,
                name=f'User {i}',
                email=f'user{i}@example.com',
            )

async def serve():
    """Run the asyncio gRPC server for ``UserService`` on port 50051.

    Creates a ``grpc.aio`` server, registers a fresh ``UserServiceServicer``
    on it, binds an insecure port on all interfaces, starts the server and
    then waits until it terminates.
    """
    grpc_server = aio.server()

    servicer = UserServiceServicer()
    user_pb2_grpc.add_UserServiceServicer_to_server(servicer, grpc_server)

    grpc_server.add_insecure_port('[::]:50051')
    await grpc_server.start()
    print("Async server started on port 50051")

    # Suspends this coroutine for as long as the server is running.
    await grpc_server.wait_for_termination()

# Start the server only when this file is executed as a script.
# asyncio is imported with the other modules at the top of the file, so the
# servicer's handlers can use it no matter how this module gets loaded.
if __name__ == '__main__':
    asyncio.run(serve())

异步客户端

# async_client.py
import asyncio
import grpc
from grpc import aio
import user_pb2
import user_pb2_grpc

async def run():
    """Call ``CreateUser`` and ``ListUsers`` on a local server asynchronously.

    Opens an insecure ``grpc.aio`` channel to ``localhost:50051`` (closed
    automatically when the ``async with`` block exits), creates one user and
    prints it, then prints every user received from the ListUsers stream.
    """
    async with aio.insecure_channel('localhost:50051') as channel:
        stub = user_pb2_grpc.UserServiceStub(channel)

        # Unary call: build the request first, then await the response.
        create_request = user_pb2.CreateUserRequest(
            name='Alice',
            email='alice@example.com',
        )
        created = await stub.CreateUser(create_request)
        print(f"Created user: {created}")

        # Streaming call: iterate the responses as they arrive.
        user_stream = stub.ListUsers(user_pb2.ListUsersRequest())
        async for listed in user_stream:
            print(f"User: {listed}")

# Run the async client demo only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(run())

错误处理

from grpc import StatusCode

def GetUser(self, request, context):
    """Look up a user by id, reporting failures through gRPC status codes.

    A non-positive ``request.user_id`` is rejected with ``INVALID_ARGUMENT``
    before any lookup happens; an id for which ``get_user_from_db`` returns
    ``None`` is reported as ``NOT_FOUND``. In both failure cases the status
    is set on ``context`` and an empty ``User`` message is returned.
    """
    requested_id = request.user_id

    found = None
    failure = None  # (status code, details) once something went wrong

    if requested_id <= 0:
        # Validate the argument first so invalid ids never reach the lookup.
        failure = (StatusCode.INVALID_ARGUMENT, 'User ID must be positive')
    else:
        found = get_user_from_db(requested_id)
        if found is None:
            failure = (StatusCode.NOT_FOUND, f'User {requested_id} not found')

    if failure is None:
        return found

    # Single place where an error status is attached to the call.
    status_code, details = failure
    context.set_code(status_code)
    context.set_details(details)
    return user_pb2.User()

小结

Python 实现要点:

  • 安装依赖:grpcio、grpcio-tools
  • 编译 Proto:生成 Python 代码
  • 服务端:实现 Servicer、启动服务
  • 客户端:创建 Stub、调用方法
  • 异步:使用 grpc.aio 模块,以 async/await 方式实现服务端与客户端

下一章我们将学习 Go 实现。