第六章:FastAPI 微服务实战¶
项目结构¶
fastapi-nacos-microservices/
├── common/ # 公共模块
│ ├── __init__.py
│ ├── nacos_client.py # Nacos 客户端封装
│ ├── config.py # 配置管理
│ └── service_discovery.py # 服务发现
├── user-service/ # 用户服务
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── models.py
│ ├── schemas.py
│ └── routers/
│ └── user.py
├── order-service/ # 订单服务
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── models.py
│ ├── schemas.py
│ └── routers/
│ └── order.py
├── gateway/ # API 网关
│ ├── __init__.py
│ ├── main.py
│ └── routes.py
├── docker-compose.yml
└── requirements.txt
公共模块¶
Nacos 客户端封装¶
# common/nacos_client.py
import nacos
import socket
import threading
import time
import logging
from typing import Optional, Dict, Any
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class NacosClient:
"""Nacos 客户端封装"""
_instance: Optional['NacosClient'] = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
"""单例模式"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
server_addresses: str,
namespace: str = 'public',
username: str = 'nacos',
password: str = 'nacos'
):
if hasattr(self, '_initialized') and self._initialized:
return
self.client = nacos.NacosClient(
server_addresses=server_addresses,
namespace=namespace,
username=username,
password=password
)
self.namespace = namespace
self._initialized = True
logger.info(f"Nacos 客户端初始化完成: {server_addresses}")
@staticmethod
def get_local_ip() -> str:
"""获取本机 IP"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return '127.0.0.1'
def register_service(
self,
service_name: str,
port: int,
ip: str = None,
weight: float = 1.0,
metadata: Dict[str, Any] = None,
group_name: str = 'DEFAULT_GROUP',
ephemeral: bool = True
) -> bool:
"""注册服务"""
ip = ip or self.get_local_ip()
metadata = metadata or {}
try:
self.client.add_naming_instance(
service_name=service_name,
ip=ip,
port=port,
weight=weight,
metadata=metadata,
group_name=group_name,
ephemeral=ephemeral
)
logger.info(f"服务注册成功: {service_name} @ {ip}:{port}")
return True
except Exception as e:
logger.error(f"服务注册失败: {e}")
return False
def deregister_service(
self,
service_name: str,
port: int,
ip: str = None,
group_name: str = 'DEFAULT_GROUP'
) -> bool:
"""注销服务"""
ip = ip or self.get_local_ip()
try:
self.client.remove_naming_instance(
service_name=service_name,
ip=ip,
port=port,
group_name=group_name
)
logger.info(f"服务注销成功: {service_name}")
return True
except Exception as e:
logger.error(f"服务注销失败: {e}")
return False
def get_instances(
self,
service_name: str,
group_name: str = 'DEFAULT_GROUP',
clusters: str = None,
healthy_only: bool = True
) -> list:
"""获取服务实例列表"""
try:
result = self.client.list_naming_instance(
service_name=service_name,
group_name=group_name,
clusters=clusters,
healthy_only=healthy_only
)
return result.get('hosts', [])
except Exception as e:
logger.error(f"获取服务实例失败: {e}")
return []
def get_config(
self,
data_id: str,
group: str = 'DEFAULT_GROUP'
) -> Optional[str]:
"""获取配置"""
try:
return self.client.get_config(
data_id=data_id,
group=group
)
except Exception as e:
logger.error(f"获取配置失败: {e}")
return None
def publish_config(
self,
data_id: str,
content: str,
group: str = 'DEFAULT_GROUP',
config_type: str = 'yaml'
) -> bool:
"""发布配置"""
try:
return self.client.publish_config(
data_id=data_id,
group=group,
content=content,
config_type=config_type
)
except Exception as e:
logger.error(f"发布配置失败: {e}")
return False
class ServiceRegistrar:
"""服务注册器(带心跳)"""
def __init__(
self,
nacos_client: NacosClient,
service_name: str,
port: int,
heartbeat_interval: int = 5
):
self.nacos = nacos_client
self.service_name = service_name
self.port = port
self.ip = nacos_client.get_local_ip()
self.heartbeat_interval = heartbeat_interval
self._running = False
self._thread: Optional[threading.Thread] = None
def start(self):
"""启动服务注册"""
# 注册服务
self.nacos.register_service(
service_name=self.service_name,
ip=self.ip,
port=self.port
)
# 启动心跳线程
self._running = True
self._thread = threading.Thread(target=self._heartbeat, daemon=True)
self._thread.start()
logger.info(f"服务注册器启动: {self.service_name}")
def stop(self):
"""停止服务注册"""
self._running = False
self.nacos.deregister_service(
service_name=self.service_name,
ip=self.ip,
port=self.port
)
logger.info(f"服务注册器停止: {self.service_name}")
def _heartbeat(self):
"""心跳线程"""
while self._running:
try:
self.nacos.client.send_heartbeat(
service_name=self.service_name,
ip=self.ip,
port=self.port
)
except Exception as e:
logger.warning(f"心跳发送失败: {e}")
time.sleep(self.heartbeat_interval)
@contextmanager
def nacos_service(
server_addresses: str,
namespace: str,
service_name: str,
port: int
):
"""Nacos 服务上下文管理器"""
client = NacosClient(
server_addresses=server_addresses,
namespace=namespace
)
registrar = ServiceRegistrar(client, service_name, port)
try:
registrar.start()
yield client
finally:
registrar.stop()
配置管理¶
# common/config.py
import yaml
import json
import logging
from typing import Any, Dict, Optional
from pydantic import BaseSettings
from pydantic.fields import Field
logger = logging.getLogger(__name__)
class NacosConfigSettings(BaseSettings):
"""Nacos 配置"""
nacos_server: str = Field('localhost:8848', env='NACOS_SERVER')
nacos_namespace: str = Field('dev', env='NACOS_NAMESPACE')
nacos_username: str = Field('nacos', env='NACOS_USERNAME')
nacos_password: str = Field('nacos', env='NACOS_PASSWORD')
nacos_group: str = Field('DEFAULT_GROUP', env='NACOS_GROUP')
class Config:
env_file = '.env'
env_file_encoding = 'utf-8'
class ConfigManager:
"""配置管理器"""
def __init__(self, nacos_client, data_id: str, group: str = 'DEFAULT_GROUP'):
self.nacos = nacos_client
self.data_id = data_id
self.group = group
self._config: Dict[str, Any] = {}
self._callbacks = []
def load(self) -> Dict[str, Any]:
"""加载配置"""
content = self.nacos.get_config(
data_id=self.data_id,
group=self.group
)
if content:
# 根据文件类型解析
if self.data_id.endswith('.yaml') or self.data_id.endswith('.yml'):
self._config = yaml.safe_load(content) or {}
elif self.data_id.endswith('.json'):
self._config = json.loads(content)
else:
self._config = {'content': content}
logger.info(f"配置加载成功: {self.data_id}")
else:
logger.warning(f"配置不存在: {self.data_id}")
return self._config
def get(self, key: str, default: Any = None) -> Any:
"""获取配置项"""
keys = key.split('.')
value = self._config
for k in keys:
if isinstance(value, dict):
value = value.get(k)
else:
return default
return value if value is not None else default
def reload(self):
"""重新加载配置"""
old_config = self._config.copy()
self.load()
# 触发回调
for callback in self._callbacks:
try:
callback(old_config, self._config)
except Exception as e:
logger.error(f"配置回调执行失败: {e}")
def on_change(self, callback):
"""注册配置变更回调"""
self._callbacks.append(callback)
服务发现¶
# common/service_discovery.py
import random
import logging
from typing import List, Dict, Optional
from .nacos_client import NacosClient
logger = logging.getLogger(__name__)
class ServiceDiscovery:
"""服务发现"""
def __init__(self, nacos_client: NacosClient):
self.nacos = nacos_client
self._cache: Dict[str, List[Dict]] = {}
self._round_robin_index: Dict[str, int] = {}
def get_instance(
self,
service_name: str,
group_name: str = 'DEFAULT_GROUP',
strategy: str = 'random'
) -> Optional[Dict]:
"""获取单个服务实例"""
instances = self.nacos.get_instances(
service_name=service_name,
group_name=group_name
)
if not instances:
logger.warning(f"没有可用的服务实例: {service_name}")
return None
if strategy == 'random':
return random.choice(instances)
elif strategy == 'round_robin':
return self._round_robin(service_name, instances)
elif strategy == 'weighted':
return self._weighted_random(instances)
else:
return instances[0]
def get_all_instances(
self,
service_name: str,
group_name: str = 'DEFAULT_GROUP'
) -> List[Dict]:
"""获取所有服务实例"""
return self.nacos.get_instances(
service_name=service_name,
group_name=group_name
)
def get_url(
self,
service_name: str,
path: str = '',
group_name: str = 'DEFAULT_GROUP',
strategy: str = 'random'
) -> Optional[str]:
"""获取服务 URL"""
instance = self.get_instance(service_name, group_name, strategy)
if instance:
return f"http://{instance['ip']}:{instance['port']}{path}"
return None
def _round_robin(self, service_name: str, instances: List[Dict]) -> Dict:
"""轮询选择"""
if service_name not in self._round_robin_index:
self._round_robin_index[service_name] = 0
index = self._round_robin_index[service_name] % len(instances)
self._round_robin_index[service_name] += 1
return instances[index]
def _weighted_random(self, instances: List[Dict]) -> Dict:
"""加权随机选择"""
total_weight = sum(inst.get('weight', 1.0) for inst in instances)
r = random.uniform(0, total_weight)
current_weight = 0
for instance in instances:
current_weight += instance.get('weight', 1.0)
if current_weight >= r:
return instance
return instances[-1]
class ServiceProxy:
"""服务代理(用于服务间调用)"""
def __init__(
self,
discovery: ServiceDiscovery,
service_name: str,
group_name: str = 'DEFAULT_GROUP'
):
self.discovery = discovery
self.service_name = service_name
self.group_name = group_name
def get_base_url(self) -> Optional[str]:
"""获取基础 URL"""
return self.discovery.get_url(
service_name=self.service_name,
group_name=self.group_name
)
async def request(
self,
method: str,
path: str,
**kwargs
):
"""发送请求"""
import httpx
base_url = self.get_base_url()
if not base_url:
raise Exception(f"服务不可用: {self.service_name}")
url = f"{base_url}{path}"
async with httpx.AsyncClient() as client:
response = await client.request(method, url, **kwargs)
return response
async def get(self, path: str, **kwargs):
"""GET 请求"""
return await self.request('GET', path, **kwargs)
async def post(self, path: str, **kwargs):
"""POST 请求"""
return await self.request('POST', path, **kwargs)
async def put(self, path: str, **kwargs):
"""PUT 请求"""
return await self.request('PUT', path, **kwargs)
async def delete(self, path: str, **kwargs):
"""DELETE 请求"""
return await self.request('DELETE', path, **kwargs)
用户服务¶
主程序¶
# user-service/main.py
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from common.nacos_client import NacosClient, ServiceRegistrar
from common.config import NacosConfigSettings, ConfigManager
from common.service_discovery import ServiceDiscovery
from routers import user
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 配置
settings = NacosConfigSettings()
# 全局变量
nacos_client: NacosClient = None
registrar: ServiceRegistrar = None
config_manager: ConfigManager = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期"""
global nacos_client, registrar, config_manager
# 初始化 Nacos 客户端
nacos_client = NacosClient(
server_addresses=settings.nacos_server,
namespace=settings.nacos_namespace,
username=settings.nacos_username,
password=settings.nacos_password
)
# 加载配置
config_manager = ConfigManager(
nacos_client=nacos_client,
data_id='user-service.yaml',
group=settings.nacos_group
)
config_manager.load()
# 注册服务
port = config_manager.get('server.port', 8001)
registrar = ServiceRegistrar(
nacos_client=nacos_client,
service_name='user-service',
port=port
)
registrar.start()
logger.info("用户服务启动完成")
yield
# 清理
if registrar:
registrar.stop()
logger.info("用户服务已停止")
# 创建应用
app = FastAPI(
title="用户服务",
description="用户管理微服务",
version="1.0.0",
lifespan=lifespan
)
# 跨域
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 路由
app.include_router(user.router, prefix="/api/users", tags=["用户管理"])
@app.get("/health")
async def health():
"""健康检查"""
return {"status": "healthy", "service": "user-service"}
@app.get("/info")
async def info():
"""服务信息"""
instances = nacos_client.get_instances('user-service')
return {
"service": "user-service",
"instances": len(instances),
"config": config_manager._config if config_manager else {}
}
if __name__ == "__main__":
import uvicorn
# 从 Nacos 获取端口配置
port = 8001
if config_manager:
port = config_manager.get('server.port', 8001)
uvicorn.run(app, host="0.0.0.0", port=port)
用户路由¶
# user-service/routers/user.py
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
import logging
router = APIRouter()
logger = logging.getLogger(__name__)
class UserCreate(BaseModel):
username: str
email: str
password: str
class UserUpdate(BaseModel):
email: Optional[str] = None
password: Optional[str] = None
class UserResponse(BaseModel):
id: int
username: str
email: str
# 模拟数据库
fake_db = {
1: {"id": 1, "username": "admin", "email": "admin@example.com"},
2: {"id": 2, "username": "user", "email": "user@example.com"}
}
next_id = 3
@router.get("/", response_model=List[UserResponse])
async def list_users():
"""获取用户列表"""
return list(fake_db.values())
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""获取用户详情"""
if user_id not in fake_db:
raise HTTPException(status_code=404, detail="用户不存在")
return fake_db[user_id]
@router.post("/", response_model=UserResponse)
async def create_user(user: UserCreate):
"""创建用户"""
global next_id
new_user = {
"id": next_id,
"username": user.username,
"email": user.email
}
fake_db[next_id] = new_user
next_id += 1
logger.info(f"创建用户: {user.username}")
return new_user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: UserUpdate):
"""更新用户"""
if user_id not in fake_db:
raise HTTPException(status_code=404, detail="用户不存在")
existing = fake_db[user_id]
if user.email:
existing["email"] = user.email
logger.info(f"更新用户: {user_id}")
return existing
@router.delete("/{user_id}")
async def delete_user(user_id: int):
"""删除用户"""
if user_id not in fake_db:
raise HTTPException(status_code=404, detail="用户不存在")
del fake_db[user_id]
logger.info(f"删除用户: {user_id}")
return {"message": "删除成功"}
订单服务¶
主程序¶
# order-service/main.py
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from common.nacos_client import NacosClient, ServiceRegistrar
from common.config import NacosConfigSettings, ConfigManager
from common.service_discovery import ServiceDiscovery, ServiceProxy
from routers import order
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
settings = NacosConfigSettings()
nacos_client: NacosClient = None
registrar: ServiceRegistrar = None
config_manager: ConfigManager = None
discovery: ServiceDiscovery = None
user_service: ServiceProxy = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期"""
global nacos_client, registrar, config_manager, discovery, user_service
# 初始化
nacos_client = NacosClient(
server_addresses=settings.nacos_server,
namespace=settings.nacos_namespace,
username=settings.nacos_username,
password=settings.nacos_password
)
config_manager = ConfigManager(
nacos_client=nacos_client,
data_id='order-service.yaml',
group=settings.nacos_group
)
config_manager.load()
# 服务发现
discovery = ServiceDiscovery(nacos_client)
user_service = ServiceProxy(discovery, 'user-service')
# 注册服务
port = config_manager.get('server.port', 8002)
registrar = ServiceRegistrar(
nacos_client=nacos_client,
service_name='order-service',
port=port
)
registrar.start()
logger.info("订单服务启动完成")
yield
if registrar:
registrar.stop()
logger.info("订单服务已停止")
app = FastAPI(
title="订单服务",
description="订单管理微服务",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(order.router, prefix="/api/orders", tags=["订单管理"])
@app.get("/health")
async def health():
"""健康检查"""
return {"status": "healthy", "service": "order-service"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8002)
订单路由(调用用户服务)¶
# order-service/routers/order.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import logging
import sys
sys.path.append('..')
from common.service_discovery import user_service
router = APIRouter()
logger = logging.getLogger(__name__)
class OrderCreate(BaseModel):
user_id: int
product: str
quantity: int
price: float
class OrderResponse(BaseModel):
id: int
user_id: int
user_name: Optional[str] = None
product: str
quantity: int
price: float
total: float
# 模拟数据库
fake_db = {
1: {"id": 1, "user_id": 1, "product": "iPhone", "quantity": 1, "price": 999.0}
}
next_id = 2
@router.get("/", response_model=List[OrderResponse])
async def list_orders():
"""获取订单列表"""
orders = []
for order in fake_db.values():
order_data = order.copy()
order_data["total"] = order["quantity"] * order["price"]
# 调用用户服务获取用户信息
try:
response = await user_service.get(f"/api/users/{order['user_id']}")
if response.status_code == 200:
user = response.json()
order_data["user_name"] = user["username"]
except Exception as e:
logger.warning(f"获取用户信息失败: {e}")
orders.append(order_data)
return orders
@router.get("/{order_id}", response_model=OrderResponse)
async def get_order(order_id: int):
"""获取订单详情"""
if order_id not in fake_db:
raise HTTPException(status_code=404, detail="订单不存在")
order = fake_db[order_id].copy()
order["total"] = order["quantity"] * order["price"]
# 获取用户信息
try:
response = await user_service.get(f"/api/users/{order['user_id']}")
if response.status_code == 200:
user = response.json()
order["user_name"] = user["username"]
except Exception as e:
logger.warning(f"获取用户信息失败: {e}")
return order
@router.post("/", response_model=OrderResponse)
async def create_order(order: OrderCreate):
"""创建订单"""
global next_id
# 验证用户是否存在
try:
response = await user_service.get(f"/api/users/{order.user_id}")
if response.status_code != 200:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
raise
except Exception as e:
logger.error(f"验证用户失败: {e}")
raise HTTPException(status_code=503, detail="用户服务不可用")
new_order = {
"id": next_id,
"user_id": order.user_id,
"product": order.product,
"quantity": order.quantity,
"price": order.price
}
fake_db[next_id] = new_order
next_id += 1
result = new_order.copy()
result["total"] = order.quantity * order.price
logger.info(f"创建订单: {new_order['id']}")
return result
@router.delete("/{order_id}")
async def delete_order(order_id: int):
"""删除订单"""
if order_id not in fake_db:
raise HTTPException(status_code=404, detail="订单不存在")
del fake_db[order_id]
logger.info(f"删除订单: {order_id}")
return {"message": "删除成功"}
API 网关¶
# gateway/main.py
import logging
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import httpx
from common.nacos_client import NacosClient
from common.service_discovery import ServiceDiscovery
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 初始化
nacos_client = NacosClient(
server_addresses='localhost:8848',
namespace='dev'
)
discovery = ServiceDiscovery(nacos_client)
app = FastAPI(
title="API 网关",
description="微服务 API 网关",
version="1.0.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 路由映射
ROUTE_MAP = {
"/api/users": "user-service",
"/api/orders": "order-service"
}
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(request: Request, path: str):
"""代理请求到后端服务"""
# 查找目标服务
target_service = None
for prefix, service in ROUTE_MAP.items():
if path.startswith(prefix[1:]): # 去掉开头的 /
target_service = service
break
if not target_service:
raise HTTPException(status_code=404, detail="路由不存在")
# 获取服务实例
url = discovery.get_url(target_service, f"/{path}")
if not url:
raise HTTPException(status_code=503, detail=f"服务不可用: {target_service}")
# 转发请求
async with httpx.AsyncClient() as client:
try:
# 构建请求
body = await request.body()
headers = dict(request.headers)
headers.pop("host", None)
response = await client.request(
method=request.method,
url=url,
headers=headers,
content=body,
params=request.query_params
)
return JSONResponse(
content=response.json() if response.headers.get("content-type", "").startswith("application/json") else {"data": response.text},
status_code=response.status_code
)
except Exception as e:
logger.error(f"代理请求失败: {e}")
raise HTTPException(status_code=500, detail="代理请求失败")
@app.get("/health")
async def health():
"""健康检查"""
return {"status": "healthy", "service": "gateway"}
@app.get("/services")
async def services():
"""查看所有服务状态"""
result = {}
for service_name in ["user-service", "order-service"]:
instances = discovery.get_all_instances(service_name)
result[service_name] = {
"count": len(instances),
"instances": [
{"ip": i["ip"], "port": i["port"], "healthy": i["healthy"]}
for i in instances
]
}
return result
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Docker Compose 部署¶
# docker-compose.yml
version: '3.8'
services:
nacos:
image: nacos/nacos-server:latest
container_name: nacos
environment:
- MODE=standalone
- PREFER_HOST_MODE=hostname
ports:
- "8848:8848"
- "9848:9848"
volumes:
- nacos_data:/home/nacos/data
networks:
- microservices
user-service:
build:
context: .
dockerfile: Dockerfile.user
container_name: user-service
environment:
- NACOS_SERVER=nacos:8848
- NACOS_NAMESPACE=dev
ports:
- "8001:8001"
depends_on:
- nacos
networks:
- microservices
order-service:
build:
context: .
dockerfile: Dockerfile.order
container_name: order-service
environment:
- NACOS_SERVER=nacos:8848
- NACOS_NAMESPACE=dev
ports:
- "8002:8002"
depends_on:
- nacos
networks:
- microservices
gateway:
build:
context: .
dockerfile: Dockerfile.gateway
container_name: gateway
environment:
- NACOS_SERVER=nacos:8848
- NACOS_NAMESPACE=dev
ports:
- "8000:8000"
depends_on:
- nacos
- user-service
- order-service
networks:
- microservices
volumes:
nacos_data:
networks:
microservices:
driver: bridge
运行测试¶
# 启动 Nacos
docker-compose up -d nacos
# 等待 Nacos 启动
sleep 30
# 启动服务
docker-compose up -d
# 查看服务状态
curl http://localhost:8000/services
# 测试用户服务
curl http://localhost:8000/api/users
# 测试订单服务(会调用用户服务)
curl http://localhost:8000/api/orders
# 创建订单
curl -X POST http://localhost:8000/api/orders \
-H "Content-Type: application/json" \
-d '{"user_id": 1, "product": "MacBook", "quantity": 2, "price": 1999.0}'
小结¶
本章实现了一个完整的 FastAPI 微服务架构:
- 公共模块:Nacos 客户端封装、配置管理、服务发现
- 用户服务:独立的用户管理微服务
- 订单服务:演示服务间调用
- API 网关:统一入口,路由转发
- Docker Compose:一键部署
下一章我们将学习负载均衡的详细实现。