跳转至

第六章:FastAPI 微服务实战

项目结构

fastapi-nacos-microservices/
├── common/                    # 公共模块
│   ├── __init__.py
│   ├── nacos_client.py       # Nacos 客户端封装
│   ├── config.py             # 配置管理
│   └── service_discovery.py  # 服务发现
├── user-service/             # 用户服务
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── models.py
│   ├── schemas.py
│   └── routers/
│       └── user.py
├── order-service/            # 订单服务
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── models.py
│   ├── schemas.py
│   └── routers/
│       └── order.py
├── gateway/                  # API 网关
│   ├── __init__.py
│   ├── main.py
│   └── routes.py
├── docker-compose.yml
└── requirements.txt

公共模块

Nacos 客户端封装

# common/nacos_client.py
import nacos
import socket
import threading
import time
import logging
from typing import Optional, Dict, Any
from contextlib import contextmanager

logger = logging.getLogger(__name__)


class NacosClient:
    """Nacos 客户端封装"""

    _instance: Optional['NacosClient'] = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """单例模式"""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(
        self,
        server_addresses: str,
        namespace: str = 'public',
        username: str = 'nacos',
        password: str = 'nacos'
    ):
        if hasattr(self, '_initialized') and self._initialized:
            return

        self.client = nacos.NacosClient(
            server_addresses=server_addresses,
            namespace=namespace,
            username=username,
            password=password
        )
        self.namespace = namespace
        self._initialized = True
        logger.info(f"Nacos 客户端初始化完成: {server_addresses}")

    @staticmethod
    def get_local_ip() -> str:
        """获取本机 IP"""
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(('8.8.8.8', 80))
            ip = s.getsockname()[0]
            s.close()
            return ip
        except Exception:
            return '127.0.0.1'

    def register_service(
        self,
        service_name: str,
        port: int,
        ip: str = None,
        weight: float = 1.0,
        metadata: Dict[str, Any] = None,
        group_name: str = 'DEFAULT_GROUP',
        ephemeral: bool = True
    ) -> bool:
        """注册服务"""
        ip = ip or self.get_local_ip()
        metadata = metadata or {}

        try:
            self.client.add_naming_instance(
                service_name=service_name,
                ip=ip,
                port=port,
                weight=weight,
                metadata=metadata,
                group_name=group_name,
                ephemeral=ephemeral
            )
            logger.info(f"服务注册成功: {service_name} @ {ip}:{port}")
            return True
        except Exception as e:
            logger.error(f"服务注册失败: {e}")
            return False

    def deregister_service(
        self,
        service_name: str,
        port: int,
        ip: str = None,
        group_name: str = 'DEFAULT_GROUP'
    ) -> bool:
        """注销服务"""
        ip = ip or self.get_local_ip()

        try:
            self.client.remove_naming_instance(
                service_name=service_name,
                ip=ip,
                port=port,
                group_name=group_name
            )
            logger.info(f"服务注销成功: {service_name}")
            return True
        except Exception as e:
            logger.error(f"服务注销失败: {e}")
            return False

    def get_instances(
        self,
        service_name: str,
        group_name: str = 'DEFAULT_GROUP',
        clusters: str = None,
        healthy_only: bool = True
    ) -> list:
        """获取服务实例列表"""
        try:
            result = self.client.list_naming_instance(
                service_name=service_name,
                group_name=group_name,
                clusters=clusters,
                healthy_only=healthy_only
            )
            return result.get('hosts', [])
        except Exception as e:
            logger.error(f"获取服务实例失败: {e}")
            return []

    def get_config(
        self,
        data_id: str,
        group: str = 'DEFAULT_GROUP'
    ) -> Optional[str]:
        """获取配置"""
        try:
            return self.client.get_config(
                data_id=data_id,
                group=group
            )
        except Exception as e:
            logger.error(f"获取配置失败: {e}")
            return None

    def publish_config(
        self,
        data_id: str,
        content: str,
        group: str = 'DEFAULT_GROUP',
        config_type: str = 'yaml'
    ) -> bool:
        """发布配置"""
        try:
            return self.client.publish_config(
                data_id=data_id,
                group=group,
                content=content,
                config_type=config_type
            )
        except Exception as e:
            logger.error(f"发布配置失败: {e}")
            return False


class ServiceRegistrar:
    """服务注册器(带心跳)"""

    def __init__(
        self,
        nacos_client: NacosClient,
        service_name: str,
        port: int,
        heartbeat_interval: int = 5
    ):
        self.nacos = nacos_client
        self.service_name = service_name
        self.port = port
        self.ip = nacos_client.get_local_ip()
        self.heartbeat_interval = heartbeat_interval
        self._running = False
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """启动服务注册"""
        # 注册服务
        self.nacos.register_service(
            service_name=self.service_name,
            ip=self.ip,
            port=self.port
        )

        # 启动心跳线程
        self._running = True
        self._thread = threading.Thread(target=self._heartbeat, daemon=True)
        self._thread.start()
        logger.info(f"服务注册器启动: {self.service_name}")

    def stop(self):
        """停止服务注册"""
        self._running = False
        self.nacos.deregister_service(
            service_name=self.service_name,
            ip=self.ip,
            port=self.port
        )
        logger.info(f"服务注册器停止: {self.service_name}")

    def _heartbeat(self):
        """心跳线程"""
        while self._running:
            try:
                self.nacos.client.send_heartbeat(
                    service_name=self.service_name,
                    ip=self.ip,
                    port=self.port
                )
            except Exception as e:
                logger.warning(f"心跳发送失败: {e}")
            time.sleep(self.heartbeat_interval)


@contextmanager
def nacos_service(
    server_addresses: str,
    namespace: str,
    service_name: str,
    port: int
):
    """Nacos 服务上下文管理器"""
    client = NacosClient(
        server_addresses=server_addresses,
        namespace=namespace
    )
    registrar = ServiceRegistrar(client, service_name, port)

    try:
        registrar.start()
        yield client
    finally:
        registrar.stop()

配置管理

# common/config.py
import yaml
import json
import logging
from typing import Any, Dict, Optional
from pydantic import BaseSettings
from pydantic.fields import Field

logger = logging.getLogger(__name__)


class NacosConfigSettings(BaseSettings):
    """Nacos 配置"""

    nacos_server: str = Field('localhost:8848', env='NACOS_SERVER')
    nacos_namespace: str = Field('dev', env='NACOS_NAMESPACE')
    nacos_username: str = Field('nacos', env='NACOS_USERNAME')
    nacos_password: str = Field('nacos', env='NACOS_PASSWORD')
    nacos_group: str = Field('DEFAULT_GROUP', env='NACOS_GROUP')

    class Config:
        env_file = '.env'
        env_file_encoding = 'utf-8'


class ConfigManager:
    """配置管理器"""

    def __init__(self, nacos_client, data_id: str, group: str = 'DEFAULT_GROUP'):
        self.nacos = nacos_client
        self.data_id = data_id
        self.group = group
        self._config: Dict[str, Any] = {}
        self._callbacks = []

    def load(self) -> Dict[str, Any]:
        """加载配置"""
        content = self.nacos.get_config(
            data_id=self.data_id,
            group=self.group
        )

        if content:
            # 根据文件类型解析
            if self.data_id.endswith('.yaml') or self.data_id.endswith('.yml'):
                self._config = yaml.safe_load(content) or {}
            elif self.data_id.endswith('.json'):
                self._config = json.loads(content)
            else:
                self._config = {'content': content}

            logger.info(f"配置加载成功: {self.data_id}")
        else:
            logger.warning(f"配置不存在: {self.data_id}")

        return self._config

    def get(self, key: str, default: Any = None) -> Any:
        """获取配置项"""
        keys = key.split('.')
        value = self._config

        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
            else:
                return default

        return value if value is not None else default

    def reload(self):
        """重新加载配置"""
        old_config = self._config.copy()
        self.load()

        # 触发回调
        for callback in self._callbacks:
            try:
                callback(old_config, self._config)
            except Exception as e:
                logger.error(f"配置回调执行失败: {e}")

    def on_change(self, callback):
        """注册配置变更回调"""
        self._callbacks.append(callback)

服务发现

# common/service_discovery.py
import random
import logging
from typing import List, Dict, Optional
from .nacos_client import NacosClient

logger = logging.getLogger(__name__)


class ServiceDiscovery:
    """服务发现"""

    def __init__(self, nacos_client: NacosClient):
        self.nacos = nacos_client
        self._cache: Dict[str, List[Dict]] = {}
        self._round_robin_index: Dict[str, int] = {}

    def get_instance(
        self,
        service_name: str,
        group_name: str = 'DEFAULT_GROUP',
        strategy: str = 'random'
    ) -> Optional[Dict]:
        """获取单个服务实例"""
        instances = self.nacos.get_instances(
            service_name=service_name,
            group_name=group_name
        )

        if not instances:
            logger.warning(f"没有可用的服务实例: {service_name}")
            return None

        if strategy == 'random':
            return random.choice(instances)
        elif strategy == 'round_robin':
            return self._round_robin(service_name, instances)
        elif strategy == 'weighted':
            return self._weighted_random(instances)
        else:
            return instances[0]

    def get_all_instances(
        self,
        service_name: str,
        group_name: str = 'DEFAULT_GROUP'
    ) -> List[Dict]:
        """获取所有服务实例"""
        return self.nacos.get_instances(
            service_name=service_name,
            group_name=group_name
        )

    def get_url(
        self,
        service_name: str,
        path: str = '',
        group_name: str = 'DEFAULT_GROUP',
        strategy: str = 'random'
    ) -> Optional[str]:
        """获取服务 URL"""
        instance = self.get_instance(service_name, group_name, strategy)
        if instance:
            return f"http://{instance['ip']}:{instance['port']}{path}"
        return None

    def _round_robin(self, service_name: str, instances: List[Dict]) -> Dict:
        """轮询选择"""
        if service_name not in self._round_robin_index:
            self._round_robin_index[service_name] = 0

        index = self._round_robin_index[service_name] % len(instances)
        self._round_robin_index[service_name] += 1
        return instances[index]

    def _weighted_random(self, instances: List[Dict]) -> Dict:
        """加权随机选择"""
        total_weight = sum(inst.get('weight', 1.0) for inst in instances)
        r = random.uniform(0, total_weight)
        current_weight = 0

        for instance in instances:
            current_weight += instance.get('weight', 1.0)
            if current_weight >= r:
                return instance

        return instances[-1]


class ServiceProxy:
    """服务代理(用于服务间调用)"""

    def __init__(
        self,
        discovery: ServiceDiscovery,
        service_name: str,
        group_name: str = 'DEFAULT_GROUP'
    ):
        self.discovery = discovery
        self.service_name = service_name
        self.group_name = group_name

    def get_base_url(self) -> Optional[str]:
        """获取基础 URL"""
        return self.discovery.get_url(
            service_name=self.service_name,
            group_name=self.group_name
        )

    async def request(
        self,
        method: str,
        path: str,
        **kwargs
    ):
        """发送请求"""
        import httpx

        base_url = self.get_base_url()
        if not base_url:
            raise Exception(f"服务不可用: {self.service_name}")

        url = f"{base_url}{path}"

        async with httpx.AsyncClient() as client:
            response = await client.request(method, url, **kwargs)
            return response

    async def get(self, path: str, **kwargs):
        """GET 请求"""
        return await self.request('GET', path, **kwargs)

    async def post(self, path: str, **kwargs):
        """POST 请求"""
        return await self.request('POST', path, **kwargs)

    async def put(self, path: str, **kwargs):
        """PUT 请求"""
        return await self.request('PUT', path, **kwargs)

    async def delete(self, path: str, **kwargs):
        """DELETE 请求"""
        return await self.request('DELETE', path, **kwargs)

用户服务

主程序

# user-service/main.py
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from common.nacos_client import NacosClient, ServiceRegistrar
from common.config import NacosConfigSettings, ConfigManager
from common.service_discovery import ServiceDiscovery

from routers import user

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 配置
settings = NacosConfigSettings()

# 全局变量
nacos_client: NacosClient = None
registrar: ServiceRegistrar = None
config_manager: ConfigManager = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期"""
    global nacos_client, registrar, config_manager

    # 初始化 Nacos 客户端
    nacos_client = NacosClient(
        server_addresses=settings.nacos_server,
        namespace=settings.nacos_namespace,
        username=settings.nacos_username,
        password=settings.nacos_password
    )

    # 加载配置
    config_manager = ConfigManager(
        nacos_client=nacos_client,
        data_id='user-service.yaml',
        group=settings.nacos_group
    )
    config_manager.load()

    # 注册服务
    port = config_manager.get('server.port', 8001)
    registrar = ServiceRegistrar(
        nacos_client=nacos_client,
        service_name='user-service',
        port=port
    )
    registrar.start()

    logger.info("用户服务启动完成")

    yield

    # 清理
    if registrar:
        registrar.stop()
    logger.info("用户服务已停止")


# 创建应用
app = FastAPI(
    title="用户服务",
    description="用户管理微服务",
    version="1.0.0",
    lifespan=lifespan
)

# 跨域
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 路由
app.include_router(user.router, prefix="/api/users", tags=["用户管理"])


@app.get("/health")
async def health():
    """健康检查"""
    return {"status": "healthy", "service": "user-service"}


@app.get("/info")
async def info():
    """服务信息"""
    instances = nacos_client.get_instances('user-service')
    return {
        "service": "user-service",
        "instances": len(instances),
        "config": config_manager._config if config_manager else {}
    }


if __name__ == "__main__":
    import uvicorn

    # 从 Nacos 获取端口配置
    port = 8001
    if config_manager:
        port = config_manager.get('server.port', 8001)

    uvicorn.run(app, host="0.0.0.0", port=port)

用户路由

# user-service/routers/user.py
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
import logging

router = APIRouter()
logger = logging.getLogger(__name__)


class UserCreate(BaseModel):
    username: str
    email: str
    password: str


class UserUpdate(BaseModel):
    email: Optional[str] = None
    password: Optional[str] = None


class UserResponse(BaseModel):
    id: int
    username: str
    email: str


# 模拟数据库
fake_db = {
    1: {"id": 1, "username": "admin", "email": "admin@example.com"},
    2: {"id": 2, "username": "user", "email": "user@example.com"}
}
next_id = 3


@router.get("/", response_model=List[UserResponse])
async def list_users():
    """获取用户列表"""
    return list(fake_db.values())


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """获取用户详情"""
    if user_id not in fake_db:
        raise HTTPException(status_code=404, detail="用户不存在")
    return fake_db[user_id]


@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate):
    """创建用户"""
    global next_id

    new_user = {
        "id": next_id,
        "username": user.username,
        "email": user.email
    }
    fake_db[next_id] = new_user
    next_id += 1

    logger.info(f"创建用户: {user.username}")
    return new_user


@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: UserUpdate):
    """更新用户"""
    if user_id not in fake_db:
        raise HTTPException(status_code=404, detail="用户不存在")

    existing = fake_db[user_id]
    if user.email:
        existing["email"] = user.email

    logger.info(f"更新用户: {user_id}")
    return existing


@router.delete("/{user_id}")
async def delete_user(user_id: int):
    """删除用户"""
    if user_id not in fake_db:
        raise HTTPException(status_code=404, detail="用户不存在")

    del fake_db[user_id]
    logger.info(f"删除用户: {user_id}")
    return {"message": "删除成功"}

订单服务

主程序

# order-service/main.py
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from common.nacos_client import NacosClient, ServiceRegistrar
from common.config import NacosConfigSettings, ConfigManager
from common.service_discovery import ServiceDiscovery, ServiceProxy

from routers import order

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

settings = NacosConfigSettings()

nacos_client: NacosClient = None
registrar: ServiceRegistrar = None
config_manager: ConfigManager = None
discovery: ServiceDiscovery = None
user_service: ServiceProxy = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期"""
    global nacos_client, registrar, config_manager, discovery, user_service

    # 初始化
    nacos_client = NacosClient(
        server_addresses=settings.nacos_server,
        namespace=settings.nacos_namespace,
        username=settings.nacos_username,
        password=settings.nacos_password
    )

    config_manager = ConfigManager(
        nacos_client=nacos_client,
        data_id='order-service.yaml',
        group=settings.nacos_group
    )
    config_manager.load()

    # 服务发现
    discovery = ServiceDiscovery(nacos_client)
    user_service = ServiceProxy(discovery, 'user-service')

    # 注册服务
    port = config_manager.get('server.port', 8002)
    registrar = ServiceRegistrar(
        nacos_client=nacos_client,
        service_name='order-service',
        port=port
    )
    registrar.start()

    logger.info("订单服务启动完成")

    yield

    if registrar:
        registrar.stop()
    logger.info("订单服务已停止")


app = FastAPI(
    title="订单服务",
    description="订单管理微服务",
    version="1.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(order.router, prefix="/api/orders", tags=["订单管理"])


@app.get("/health")
async def health():
    """健康检查"""
    return {"status": "healthy", "service": "order-service"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)

订单路由(调用用户服务)

# order-service/routers/order.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import logging
import sys
sys.path.append('..')

from common.service_discovery import user_service

router = APIRouter()
logger = logging.getLogger(__name__)


class OrderCreate(BaseModel):
    user_id: int
    product: str
    quantity: int
    price: float


class OrderResponse(BaseModel):
    id: int
    user_id: int
    user_name: Optional[str] = None
    product: str
    quantity: int
    price: float
    total: float


# 模拟数据库
fake_db = {
    1: {"id": 1, "user_id": 1, "product": "iPhone", "quantity": 1, "price": 999.0}
}
next_id = 2


@router.get("/", response_model=List[OrderResponse])
async def list_orders():
    """获取订单列表"""
    orders = []
    for order in fake_db.values():
        order_data = order.copy()
        order_data["total"] = order["quantity"] * order["price"]

        # 调用用户服务获取用户信息
        try:
            response = await user_service.get(f"/api/users/{order['user_id']}")
            if response.status_code == 200:
                user = response.json()
                order_data["user_name"] = user["username"]
        except Exception as e:
            logger.warning(f"获取用户信息失败: {e}")

        orders.append(order_data)

    return orders


@router.get("/{order_id}", response_model=OrderResponse)
async def get_order(order_id: int):
    """获取订单详情"""
    if order_id not in fake_db:
        raise HTTPException(status_code=404, detail="订单不存在")

    order = fake_db[order_id].copy()
    order["total"] = order["quantity"] * order["price"]

    # 获取用户信息
    try:
        response = await user_service.get(f"/api/users/{order['user_id']}")
        if response.status_code == 200:
            user = response.json()
            order["user_name"] = user["username"]
    except Exception as e:
        logger.warning(f"获取用户信息失败: {e}")

    return order


@router.post("/", response_model=OrderResponse)
async def create_order(order: OrderCreate):
    """创建订单"""
    global next_id

    # 验证用户是否存在
    try:
        response = await user_service.get(f"/api/users/{order.user_id}")
        if response.status_code != 200:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"验证用户失败: {e}")
        raise HTTPException(status_code=503, detail="用户服务不可用")

    new_order = {
        "id": next_id,
        "user_id": order.user_id,
        "product": order.product,
        "quantity": order.quantity,
        "price": order.price
    }
    fake_db[next_id] = new_order
    next_id += 1

    result = new_order.copy()
    result["total"] = order.quantity * order.price

    logger.info(f"创建订单: {new_order['id']}")
    return result


@router.delete("/{order_id}")
async def delete_order(order_id: int):
    """删除订单"""
    if order_id not in fake_db:
        raise HTTPException(status_code=404, detail="订单不存在")

    del fake_db[order_id]
    logger.info(f"删除订单: {order_id}")
    return {"message": "删除成功"}

API 网关

# gateway/main.py
import logging
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import httpx

from common.nacos_client import NacosClient
from common.service_discovery import ServiceDiscovery

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 初始化
nacos_client = NacosClient(
    server_addresses='localhost:8848',
    namespace='dev'
)
discovery = ServiceDiscovery(nacos_client)

app = FastAPI(
    title="API 网关",
    description="微服务 API 网关",
    version="1.0.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 路由映射
ROUTE_MAP = {
    "/api/users": "user-service",
    "/api/orders": "order-service"
}


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(request: Request, path: str):
    """代理请求到后端服务"""
    # 查找目标服务
    target_service = None
    for prefix, service in ROUTE_MAP.items():
        if path.startswith(prefix[1:]):  # 去掉开头的 /
            target_service = service
            break

    if not target_service:
        raise HTTPException(status_code=404, detail="路由不存在")

    # 获取服务实例
    url = discovery.get_url(target_service, f"/{path}")
    if not url:
        raise HTTPException(status_code=503, detail=f"服务不可用: {target_service}")

    # 转发请求
    async with httpx.AsyncClient() as client:
        try:
            # 构建请求
            body = await request.body()
            headers = dict(request.headers)
            headers.pop("host", None)

            response = await client.request(
                method=request.method,
                url=url,
                headers=headers,
                content=body,
                params=request.query_params
            )

            return JSONResponse(
                content=response.json() if response.headers.get("content-type", "").startswith("application/json") else {"data": response.text},
                status_code=response.status_code
            )
        except Exception as e:
            logger.error(f"代理请求失败: {e}")
            raise HTTPException(status_code=500, detail="代理请求失败")


@app.get("/health")
async def health():
    """健康检查"""
    return {"status": "healthy", "service": "gateway"}


@app.get("/services")
async def services():
    """查看所有服务状态"""
    result = {}
    for service_name in ["user-service", "order-service"]:
        instances = discovery.get_all_instances(service_name)
        result[service_name] = {
            "count": len(instances),
            "instances": [
                {"ip": i["ip"], "port": i["port"], "healthy": i["healthy"]}
                for i in instances
            ]
        }
    return result


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Docker Compose 部署

# docker-compose.yml
version: '3.8'

services:
  nacos:
    image: nacos/nacos-server:latest
    container_name: nacos
    environment:
      - MODE=standalone
      - PREFER_HOST_MODE=hostname
    ports:
      - "8848:8848"
      - "9848:9848"
    volumes:
      - nacos_data:/home/nacos/data
    networks:
      - microservices

  user-service:
    build:
      context: .
      dockerfile: Dockerfile.user
    container_name: user-service
    environment:
      - NACOS_SERVER=nacos:8848
      - NACOS_NAMESPACE=dev
    ports:
      - "8001:8001"
    depends_on:
      - nacos
    networks:
      - microservices

  order-service:
    build:
      context: .
      dockerfile: Dockerfile.order
    container_name: order-service
    environment:
      - NACOS_SERVER=nacos:8848
      - NACOS_NAMESPACE=dev
    ports:
      - "8002:8002"
    depends_on:
      - nacos
    networks:
      - microservices

  gateway:
    build:
      context: .
      dockerfile: Dockerfile.gateway
    container_name: gateway
    environment:
      - NACOS_SERVER=nacos:8848
      - NACOS_NAMESPACE=dev
    ports:
      - "8000:8000"
    depends_on:
      - nacos
      - user-service
      - order-service
    networks:
      - microservices

volumes:
  nacos_data:

networks:
  microservices:
    driver: bridge

运行测试

# 启动 Nacos
docker-compose up -d nacos

# 等待 Nacos 启动
sleep 30

# 启动服务
docker-compose up -d

# 查看服务状态
curl http://localhost:8000/services

# 测试用户服务
curl http://localhost:8000/api/users

# 测试订单服务(会调用用户服务)
curl http://localhost:8000/api/orders

# 创建订单
curl -X POST http://localhost:8000/api/orders \
  -H "Content-Type: application/json" \
  -d '{"user_id": 1, "product": "MacBook", "quantity": 2, "price": 1999.0}'

小结

本章实现了一个完整的 FastAPI 微服务架构:

  1. 公共模块:Nacos 客户端封装、配置管理、服务发现
  2. 用户服务:独立的用户管理微服务
  3. 订单服务:演示服务间调用
  4. API 网关:统一入口,路由转发
  5. Docker Compose:一键部署

下一章我们将学习负载均衡的详细实现。