跳转至

第五章:LCEL 链式调用

LCEL(LangChain Expression Language)是 LangChain 0.3+ 的核心语法,用声明式方式组合组件。

LCEL 基础

管道操作符

使用 | 连接组件:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 组件
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
llm = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()

# 组装链
chain = prompt | llm | parser

# 调用
result = chain.invoke({"topic": "程序员"})

执行流程

chain = prompt | llm | parser

# 等价于:
def manual_chain(inputs):
    # 1. prompt: dict → PromptValue
    prompt_value = prompt.invoke(inputs)

    # 2. llm: PromptValue → AIMessage
    ai_message = llm.invoke(prompt_value)

    # 3. parser: AIMessage → str
    output = parser.invoke(ai_message)

    return output

Runnable 接口

所有 LCEL 组件都实现 Runnable 接口:

同步方法

# 单次调用
result = runnable.invoke(input)

# 批量调用
results = runnable.batch([input1, input2, input3])

# 流式输出
for chunk in runnable.stream(input):
    print(chunk)

异步方法

import asyncio

# 异步单次
result = await runnable.ainvoke(input)

# 异步批量
results = await runnable.abatch([input1, input2])

# 异步流式
async for chunk in runnable.astream(input):
    print(chunk)

astream_events

获取详细执行事件:

async def run_with_events():
    async for event in chain.astream_events(
        {"topic": "Python"},
        version="v2"
    ):
        kind = event["event"]
        name = event["name"]

        if kind == "on_prompt_start":
            print(f"提示词开始: {name}")
        elif kind == "on_llm_start":
            print(f"LLM 开始: {name}")
        elif kind == "on_llm_stream":
            chunk = event["data"]["chunk"]
            print(chunk.content, end="")
        elif kind == "on_parser_start":
            print(f"\n解析开始: {name}")

asyncio.run(run_with_events())

RunnablePassthrough

传递输入数据:

直接传递

from langchain_core.runnables import RunnablePassthrough

# 直接传递输入
chain = RunnablePassthrough()
result = chain.invoke("hello")  # "hello"

构建字典

from langchain_core.runnables import RunnablePassthrough

chain = RunnablePassthrough.assign(
    length=lambda x: len(x["text"]),
    uppercase=lambda x: x["text"].upper()
)

result = chain.invoke({"text": "hello"})
# {"text": "hello", "length": 5, "uppercase": "HELLO"}

在链中使用

from langchain_core.runnables import RunnablePassthrough

# 保留原始输入
chain = {
    "original": RunnablePassthrough(),
    "result": prompt | llm | parser
}

result = chain.invoke({"topic": "Python"})
# {"original": {"topic": "Python"}, "result": "..."}

RunnableParallel

并行执行多个分支:

基本用法

from langchain_core.runnables import RunnableParallel

# 定义多个分支
joke_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话") | llm | parser
story_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | llm | parser

# 并行执行
chain = RunnableParallel(
    joke=joke_chain,
    story=story_chain
)

result = chain.invoke({"topic": "程序员"})
# {"joke": "...", "story": "..."}

字典语法

# 字典语法等价于 RunnableParallel
chain = {
    "joke": joke_chain,
    "story": story_chain
}

组合使用

from operator import itemgetter

chain = {
    "topic": RunnablePassthrough(),
    "joke": ChatPromptTemplate.from_template("讲一个关于{topic}的笑话") | llm | parser,
    "story": ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | llm | parser
}

result = chain.invoke("程序员")
# {"topic": "程序员", "joke": "...", "story": "..."}

RunnableLambda

将自定义函数转为 Runnable:

基本用法

from langchain_core.runnables import RunnableLambda

def parse_output(text: str) -> str:
    return text.upper()

chain = prompt | llm | parser | RunnableLambda(parse_output)

result = chain.invoke({"topic": "Python"})

Lambda 表达式

chain = (
    prompt 
    | llm 
    | parser 
    | (lambda x: x[:100])  # 截取前100字符
)

复杂处理

def format_output(inputs):
    return f"""
主题: {inputs['topic']}
笑话: {inputs['joke']}
故事: {inputs['story']}
"""

chain = (
    RunnableParallel(
        topic=RunnablePassthrough(),
        joke=joke_chain,
        story=story_chain
    )
    | RunnableLambda(format_output)
)

RunnableSequence

顺序执行多个 Runnable:

from langchain_core.runnables import RunnableSequence

# 方式 1:管道语法
chain = prompt | llm | parser

# 方式 2:RunnableSequence
chain = RunnableSequence(prompt, llm, parser)

# 方式 3:first 和 next
chain = prompt.pipe(llm).pipe(parser)

条件分支

RunnableBranch

根据条件选择执行分支:

from langchain_core.runnables import RunnableBranch

# 定义分支
joke_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话") | llm
story_chain = ChatPromptTemplate.from_template("讲一个关于{topic}的故事") | llm
default_chain = ChatPromptTemplate.from_template("介绍{topic}") | llm

# 创建条件分支
branch = RunnableBranch(
    (lambda x: x.get("type") == "joke", joke_chain),
    (lambda x: x.get("type") == "story", story_chain),
    default_chain  # 默认分支
)

# 调用
result1 = branch.invoke({"type": "joke", "topic": "Python"})
result2 = branch.invoke({"type": "story", "topic": "Python"})
result3 = branch.invoke({"topic": "Python"})  # 使用默认

自定义条件

def check_length(inputs):
    return len(inputs.get("text", "")) > 100

branch = RunnableBranch(
    (check_length, long_text_chain),
    default_chain
)

重试和降级

with_retry

添加重试机制:

chain = prompt | llm | parser

# 添加重试
retry_chain = chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
    retry_if_exception_type=(Exception,)
)

result = retry_chain.invoke({"topic": "Python"})

with_fallbacks

添加降级方案:

from langchain_openai import ChatOpenAI

primary_llm = ChatOpenAI(model="gpt-4o")
fallback_llm = ChatOpenAI(model="gpt-4o-mini")

chain = (
    prompt 
    | primary_llm.with_fallbacks([fallback_llm]) 
    | parser
)

# gpt-4o 失败时自动切换到 gpt-4o-mini
result = chain.invoke({"topic": "Python"})

多级降级

llm_with_fallbacks = primary_llm.with_fallbacks([
    ChatOpenAI(model="gpt-4o-mini"),
    ChatOpenAI(model="gpt-3.5-turbo"),
])

配置化

configurable_fields

运行时配置字段:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0).configurable_fields(
    temperature=ConfigurableField(
        id="temperature",
        name="Temperature",
        description="模型温度"
    )
)

chain = prompt | llm | parser

# 运行时配置
result = chain.invoke(
    {"topic": "Python"},
    config={"configurable": {"temperature": 0.9}}
)

configurable_alternatives

运行时切换组件:

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

llm = ChatOpenAI(model="gpt-4o-mini").configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="openai",
    anthropic=ChatAnthropic(model="claude-3-5-sonnet-20241022"),
    deepseek=ChatOpenAI(
        model="deepseek-chat",
        base_url="https://api.deepseek.com/v1"
    )
)

chain = prompt | llm | parser

# 使用 OpenAI(默认)
result1 = chain.invoke({"topic": "Python"})

# 使用 Anthropic
result2 = chain.invoke(
    {"topic": "Python"},
    config={"configurable": {"llm": "anthropic"}}
)

# 使用 DeepSeek
result3 = chain.invoke(
    {"topic": "Python"},
    config={"configurable": {"llm": "deepseek"}}
)

完整示例

RAG 链

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_chroma import Chroma

# 组件
llm = ChatOpenAI(model="gpt-4o-mini")
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(
    ["Python 是一种编程语言", "LangChain 是 LLM 框架"],
    embeddings
)
retriever = vectorstore.as_retriever()

# 提示词
prompt = ChatPromptTemplate.from_messages([
    ("system", "根据以下上下文回答问题:\n{context}"),
    ("human", "{question}")
])

# RAG 链
chain = (
    {
        "context": retriever | (lambda docs: "\n".join(d.page_content for d in docs)),
        "question": RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
)

result = chain.invoke("什么是 Python?")

多步骤处理链

from langchain_core.runnables import RunnableParallel, RunnableLambda

# 步骤 1:生成大纲
outline_prompt = ChatPromptTemplate.from_template(
    "为'{topic}'生成一个文章大纲"
)
outline_chain = outline_prompt | llm | StrOutputParser()

# 步骤 2:根据大纲写文章
article_prompt = ChatPromptTemplate.from_template(
    "根据以下大纲写一篇文章:\n{outline}"
)
article_chain = article_prompt | llm | StrOutputParser()

# 步骤 3:总结文章
summary_prompt = ChatPromptTemplate.from_template(
    "用一句话总结以下文章:\n{article}"
)
summary_chain = summary_prompt | llm | StrOutputParser()

# 组合
def process_article(inputs):
    outline = inputs["outline"]
    article = inputs["article"]
    summary = inputs["summary"]
    return f"""
## 大纲
{outline}

## 文章
{article}

## 总结
{summary}
"""

full_chain = (
    {"topic": RunnablePassthrough()}
    | RunnableParallel(
        outline=outline_chain,
    )
    | {
        "outline": itemgetter("outline"),
        "topic": RunnablePassthrough()
    }
    | RunnableParallel(
        outline=itemgetter("outline"),
        article=article_prompt | llm | StrOutputParser()
    )
    # ... 继续组合
)

小结

本章学习了:

  • ✅ LCEL 管道语法
  • ✅ Runnable 接口方法
  • ✅ RunnablePassthrough 传递数据
  • ✅ RunnableParallel 并行执行
  • ✅ RunnableLambda 自定义函数
  • ✅ RunnableBranch 条件分支
  • ✅ 重试和降级机制
  • ✅ 运行时配置

下一章

第六章:记忆系统 - 学习如何维护对话上下文。