第五章:LCEL 链式调用¶
LCEL(LangChain Expression Language)是 LangChain 0.3+ 的核心语法,用声明式方式组合组件。
LCEL 基础¶
管道操作符¶
使用 | 连接组件:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 组件
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# 组装链
chain = prompt | llm | parser
# 调用
result = chain.invoke({"topic": "程序员"})
执行流程¶
chain = prompt | llm | parser
# 等价于:
def manual_chain(inputs):
    # 1. prompt: dict → PromptValue
    prompt_value = prompt.invoke(inputs)
    # 2. llm: PromptValue → AIMessage
    ai_message = llm.invoke(prompt_value)
    # 3. parser: AIMessage → str
    output = parser.invoke(ai_message)
    return output
Runnable 接口¶
所有 LCEL 组件都实现 Runnable 接口:
同步方法¶
# 单次调用
result = runnable.invoke(input)
# 批量调用
results = runnable.batch([input1, input2, input3])
# 流式输出
for chunk in runnable.stream(input):
    print(chunk)
异步方法¶
import asyncio
# 异步单次
result = await runnable.ainvoke(input)
# 异步批量
results = await runnable.abatch([input1, input2])
# 异步流式
async for chunk in runnable.astream(input):
    print(chunk)
astream_events¶
获取详细执行事件:
async def run_with_events():
    async for event in chain.astream_events(
        {"topic": "Python"},
        version="v2"
    ):
        kind = event["event"]
        name = event["name"]
        if kind == "on_prompt_start":
            print(f"提示词开始: {name}")
        elif kind == "on_chat_model_start":  # 聊天模型的事件名是 on_chat_model_*
            print(f"LLM 开始: {name}")
        elif kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            print(chunk.content, end="")
        elif kind == "on_parser_start":
            print(f"\n解析开始: {name}")
asyncio.run(run_with_events())
RunnablePassthrough¶
传递输入数据:
直接传递¶
from langchain_core.runnables import RunnablePassthrough
# 直接传递输入
chain = RunnablePassthrough()
result = chain.invoke("hello") # "hello"
构建字典¶
from langchain_core.runnables import RunnablePassthrough
chain = RunnablePassthrough.assign(
    length=lambda x: len(x["text"]),
    uppercase=lambda x: x["text"].upper()
)
result = chain.invoke({"text": "hello"})
# {"text": "hello", "length": 5, "uppercase": "HELLO"}
在链中使用¶
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# 保留原始输入
# 注意:字典写法只有在与其他 Runnable 用 | 组合时才会被自动转换,
# 单独调用 invoke 需要显式包装为 RunnableParallel
chain = RunnableParallel({
    "original": RunnablePassthrough(),
    "result": prompt | llm | parser
})
result = chain.invoke({"topic": "Python"})
# {"original": {"topic": "Python"}, "result": "..."}
RunnableParallel¶
并行执行多个分支:
基本用法¶
from langchain_core.runnables import RunnableParallel
# 定义多个分支
joke_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话") | llm | parser
story_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | llm | parser
# 并行执行
chain = RunnableParallel(
joke=joke_chain,
story=story_chain
)
result = chain.invoke({"topic": "程序员"})
# {"joke": "...", "story": "..."}
字典语法¶
在链中使用字典时,字典会被自动转换为 RunnableParallel(仅当通过 | 与其他 Runnable 组合时);单独调用需显式包装。
组合使用¶
from operator import itemgetter
from langchain_core.runnables import RunnableParallel
chain = RunnableParallel({
    "topic": RunnablePassthrough(),
    "joke": ChatPromptTemplate.from_template("讲一个关于{topic}的笑话") | llm | parser,
    "story": ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | llm | parser
})
result = chain.invoke("程序员")
# {"topic": "程序员", "joke": "...", "story": "..."}
RunnableLambda¶
将自定义函数转为 Runnable:
基本用法¶
from langchain_core.runnables import RunnableLambda
def parse_output(text: str) -> str:
    return text.upper()
chain = prompt | llm | parser | RunnableLambda(parse_output)
result = chain.invoke({"topic": "Python"})
Lambda 表达式¶
复杂处理¶
def format_output(inputs):
    return f"""
主题: {inputs['topic']}
笑话: {inputs['joke']}
故事: {inputs['story']}
"""
chain = (
    RunnableParallel(
        topic=RunnablePassthrough(),
        joke=joke_chain,
        story=story_chain
    )
    | RunnableLambda(format_output)
)
RunnableSequence¶
顺序执行多个 Runnable:
from langchain_core.runnables import RunnableSequence
# 方式 1:管道语法
chain = prompt | llm | parser
# 方式 2:RunnableSequence
chain = RunnableSequence(prompt, llm, parser)
# 方式 3:pipe 方法
chain = prompt.pipe(llm).pipe(parser)
条件分支¶
RunnableBranch¶
根据条件选择执行分支:
from langchain_core.runnables import RunnableBranch
# 定义分支
joke_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话") | llm
story_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | llm
default_chain = ChatPromptTemplate.from_template("介绍{topic}") | llm
# 创建条件分支
branch = RunnableBranch(
    (lambda x: x.get("type") == "joke", joke_chain),
    (lambda x: x.get("type") == "story", story_chain),
    default_chain  # 默认分支
)
# 调用
result1 = branch.invoke({"type": "joke", "topic": "Python"})
result2 = branch.invoke({"type": "story", "topic": "Python"})
result3 = branch.invoke({"topic": "Python"}) # 使用默认
自定义条件¶
def check_length(inputs):
    return len(inputs.get("text", "")) > 100

branch = RunnableBranch(
    (check_length, long_text_chain),
    default_chain
)
重试和降级¶
with_retry¶
添加重试机制:
chain = prompt | llm | parser
# 添加重试
retry_chain = chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
    retry_if_exception_type=(Exception,)
)
result = retry_chain.invoke({"topic": "Python"})
with_fallbacks¶
添加降级方案:
from langchain_openai import ChatOpenAI
primary_llm = ChatOpenAI(model="gpt-4o")
fallback_llm = ChatOpenAI(model="gpt-4o-mini")
chain = (
    prompt
    | primary_llm.with_fallbacks([fallback_llm])
    | parser
)
# gpt-4o 失败时自动切换到 gpt-4o-mini
result = chain.invoke({"topic": "Python"})
多级降级¶
llm_with_fallbacks = primary_llm.with_fallbacks([
    ChatOpenAI(model="gpt-4o-mini"),
    ChatOpenAI(model="gpt-3.5-turbo"),
])
配置化¶
configurable_fields¶
运行时配置字段:
from langchain_openai import ChatOpenAI
from langchain_core.runnables import ConfigurableField
llm = ChatOpenAI(temperature=0).configurable_fields(
    temperature=ConfigurableField(
        id="temperature",
        name="Temperature",
        description="模型温度"
    )
)
chain = prompt | llm | parser
# 运行时配置
result = chain.invoke(
    {"topic": "Python"},
    config={"configurable": {"temperature": 0.9}}
)
configurable_alternatives¶
运行时切换组件:
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import ConfigurableField
llm = ChatOpenAI(model="gpt-4o-mini").configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="openai",
    anthropic=ChatAnthropic(model="claude-3-5-sonnet-20241022"),
    deepseek=ChatOpenAI(
        model="deepseek-chat",
        base_url="https://api.deepseek.com/v1"
    )
)
chain = prompt | llm | parser
# 使用 OpenAI(默认)
result1 = chain.invoke({"topic": "Python"})
# 使用 Anthropic
result2 = chain.invoke(
    {"topic": "Python"},
    config={"configurable": {"llm": "anthropic"}}
)
# 使用 DeepSeek
result3 = chain.invoke(
    {"topic": "Python"},
    config={"configurable": {"llm": "deepseek"}}
)
完整示例¶
RAG 链¶
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_chroma import Chroma
# 组件
llm = ChatOpenAI(model="gpt-4o-mini")
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(
    ["Python 是一种编程语言", "LangChain 是 LLM 框架"],
    embeddings
)
retriever = vectorstore.as_retriever()
# 提示词
prompt = ChatPromptTemplate.from_messages([
    ("system", "根据以下上下文回答问题:\n{context}"),
    ("human", "{question}")
])
# RAG 链
chain = (
    {
        "context": retriever | (lambda docs: "\n".join(d.page_content for d in docs)),
        "question": RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
)
result = chain.invoke("什么是 Python?")
多步骤处理链¶
from langchain_core.runnables import RunnableParallel, RunnableLambda
# 步骤 1:生成大纲
outline_prompt = ChatPromptTemplate.from_template(
    "为'{topic}'生成一个文章大纲"
)
outline_chain = outline_prompt | llm | StrOutputParser()
# 步骤 2:根据大纲写文章
article_prompt = ChatPromptTemplate.from_template(
    "根据以下大纲写一篇文章:\n{outline}"
)
article_chain = article_prompt | llm | StrOutputParser()
# 步骤 3:总结文章
summary_prompt = ChatPromptTemplate.from_template(
    "用一句话总结以下文章:\n{article}"
)
summary_chain = summary_prompt | llm | StrOutputParser()
# 组合
def process_article(inputs):
    outline = inputs["outline"]
    article = inputs["article"]
    summary = inputs["summary"]
    return f"""
## 大纲
{outline}
## 文章
{article}
## 总结
{summary}
"""
full_chain = (
    {"topic": RunnablePassthrough()}
    | RunnableParallel(
        outline=outline_chain,
    )
    | {
        "outline": itemgetter("outline"),
        "topic": RunnablePassthrough()
    }
    | RunnableParallel(
        outline=itemgetter("outline"),
        article=article_prompt | llm | StrOutputParser()
    )
    # ... 继续组合
)
小结¶
本章学习了:
- ✅ LCEL 管道语法
- ✅ Runnable 接口方法
- ✅ RunnablePassthrough 传递数据
- ✅ RunnableParallel 并行执行
- ✅ RunnableLambda 自定义函数
- ✅ RunnableBranch 条件分支
- ✅ 重试和降级机制
- ✅ 运行时配置
下一章¶
第六章:记忆系统 - 学习如何维护对话上下文。