一切皆 Runnable。 Prompt 模板、ChatModel、OutputParser、Retriever、甚至你自己写的普通函数,只要纳入 LCEL,都被统一成实现了 Runnable 协议的对象。这意味着它们共享同一套方法签名,因此可以无缝相互替换、相互组合| 不是魔法,它只是 Runnable.__or__ 的重载——把左右两个 Runnable 拼成一个新的 RunnableSequence

Runnable 协议:四个标准方法

任何 Runnable 都暴露同一组方法。理解它们的区别,等于一次性学会了 LangChain 里所有组件的调用方式——因为它们全都遵守这套协议。

方法用途输入/输出形态典型场景
invoke(input)单次同步调用单个输入 → 单个输出最常用,一次问答
stream(input)流式同步调用单个输入 → 输出的生成器(逐 token)聊天打字机效果
batch(inputs)批量同步调用输入列表 → 输出列表(并发执行)一次处理多条数据
ainvoke(input)单次异步调用同 invoke,但是 awaitableFastAPI / 高并发后端
astream / abatch异步流式 / 异步批量stream / batch 的 async 版异步服务里的流式与批处理
口诀记忆口诀:i-s-b 三件套(invoke 单发、stream 流式、batch 批量),前面加 a 就是异步版(a = async)。
# 安装(langchain 0.3.x,组件已拆分到独立包)
# pip install langchain langchain-openai

import os
os.environ["OPENAI_API_KEY"] = "sk-..."  # 或用环境变量 / .env

from langchain_openai import ChatOpenAI

# ChatModel 本身就是一个 Runnable,直接享有四个标准方法
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 1) invoke —— 单次同步
resp = model.invoke("用一句话解释什么是 Runnable")
print(resp.content)

# 2) stream —— 逐 token 流式输出(打字机效果)
for chunk in model.stream("写一首关于管道的两行小诗"):
    print(chunk.content, end="", flush=True)
print()

# 3) batch —— 批量并发,输入列表对应输出列表
results = model.batch(["中国的首都", "日本的首都", "法国的首都"])
for r in results:
    print(r.content)

# 4) ainvoke —— 异步(需在 async 函数里 await)
import asyncio
async def main():
    r = await model.ainvoke("异步说一句你好")
    print(r.content)
asyncio.run(main())

LCEL:用 | 组合管道

LCEL 的核心动作只有一个:用 | 把若干 Runnable 串成一条 RunnableSequence。执行时,输入从左流到右,上一个组件的输出自动作为下一个组件的输入。下面是 LangChain 里最经典的「三件套」管道。

# pip install langchain langchain-openai

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的{role}。"),
    ("human", "请解释这个概念:{topic}"),
])
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()  # 把 AIMessage 抽成纯字符串

# 组合即编排:一行声明一条数据流图
chain = prompt | model | parser

# 输入一个 dict —— 这里发生了「类型自动适配」
# dict 的 key 自动填充 prompt 模板里的 {role} / {topic} 变量
out = chain.invoke({"role": "计算机科学讲师", "topic": "什么是管道"})
print(out)        # 直接是 str,已被 parser 解析
print(type(out))  # <class 'str'>

# 整条链也是 Runnable,照样能 stream / batch
for piece in chain.stream({"role": "诗人", "topic": "递归"}):
    print(piece, end="", flush=True)

四个核心原语:透传、并行、自定义函数、条件路由

纯线性的 a | b | c 只能表达「顺序」。真实场景需要:保留原始输入、并行扇出、插入普通 Python 逻辑、按条件分流。LCEL 用四个原语覆盖这些需求。它们是搭建 RAG 与 Agent 链路的乐高积木。

# pip install langchain langchain-openai

from langchain_core.runnables import (
    RunnablePassthrough,
    RunnableParallel,
    RunnableLambda,
    RunnableBranch,
)
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# ---------- 1) RunnablePassthrough:原样透传输入 ----------
# 单独使用时,输入是什么就原样吐出来
assert RunnablePassthrough().invoke({"q": "hi"}) == {"q": "hi"}

# .assign:在保留原始 dict 的同时,并行注入新字段
# 下面把 {"name": ...} 扩展成 {"name": ..., "greeting": ...}
add_field = RunnablePassthrough.assign(
    greeting=lambda x: f"你好,{x['name']}!"
)
print(add_field.invoke({"name": "小明"}))
# -> {'name': '小明', 'greeting': '你好,小明!'}

# ---------- 2) RunnableParallel:并行扇出(dict 字面量即可) ----------
# 同一个输入同时喂给多个分支,结果汇成一个 dict
parallel = RunnableParallel(
    upper=RunnableLambda(lambda s: s.upper()),
    length=RunnableLambda(lambda s: len(s)),
)
print(parallel.invoke("hello"))
# -> {'upper': 'HELLO', 'length': 5}
# 注意:在 | 管道里,直接写一个 dict 字面量会被自动包成 RunnableParallel

# ---------- 3) RunnableLambda:把普通函数变成 Runnable ----------
def strip_quotes(text: str) -> str:
    return text.strip().strip('"').strip("「」")

clean = RunnableLambda(strip_quotes)
print(clean.invoke('  "裹了引号的文本"  '))  # -> 裹了引号的文本

# ---------- 4) RunnableBranch:条件路由 ----------
# (condition, runnable) 元组按顺序匹配,最后一个参数是默认分支
cn_prompt = ChatPromptTemplate.from_template("用中文回答:{query}")
en_prompt = ChatPromptTemplate.from_template("Answer in English: {query}")

router = RunnableBranch(
    # 含中文字符 -> 走中文分支
    (lambda x: any('\u4e00' <= c <= '\u9fff' for c in x["query"]),
     cn_prompt | model | StrOutputParser()),
    # 默认分支:英文
    en_prompt | model | StrOutputParser(),
)
print(router.invoke({"query": "什么是向量数据库?"}))  # 走中文
print(router.invoke({"query": "What is RAG?"}))            # 走英文
  1. RunnablePassthrough() —— 原样透传,常作占位;.assign(k=fn) 在保留原 dict 基础上并行追加新键,RAG 链里把检索到的 context 注入输入靠的就是它。
  2. RunnableParallel(...) —— 把同一输入扇出给多个分支并发执行,结果汇成 dict。在 | 链里直接写 dict 字面量会被自动转成它。
  3. RunnableLambda(fn) —— 把任意普通 Python 函数(清洗、格式化、调用外部 API)包成 Runnable,纳入管道。
  4. RunnableBranch((cond1, r1), (cond2, r2), default) —— 按条件路由到不同子链,类似 if/elif/else,但保持 Runnable 的可组合性。

实战:用四原语拼一条「带上下文」的微型 RAG 骨架

把前面四个原语组合起来,就能写出 RAG 问答的标准骨架。这里用一个假的 retriever 函数替代真实向量库(真实向量库见 RAG 章节),重点看数据流如何编排:检索与原始问题并行,汇成 dict,再喂给 prompt。

# pip install langchain langchain-openai

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 假装这是向量库检索:输入 query 字符串,返回相关文档拼成的 context
KB = {
    "langchain": "LangChain 是用于构建 LLM 应用的开源框架,核心抽象是 Runnable。",
    "lcel": "LCEL 是 LangChain 表达式语言,用 | 组合 Runnable 成管道。",
}
def retrieve(query: str) -> str:
    hits = [v for k, v in KB.items() if k in query.lower()]
    return "\n".join(hits) or "(无相关资料)"

retriever = RunnableLambda(retrieve)

prompt = ChatPromptTemplate.from_template(
    "仅根据下面的资料回答问题。\n\n资料:\n{context}\n\n问题:{question}"
)

# 编排核心:用 dict 字面量做并行扇出
#   - context:把输入丢给 retriever 检索
#   - question:用 RunnablePassthrough 原样透传
# 二者汇成 {"context": ..., "question": ...} 正好对上 prompt 的两个变量
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

print(rag_chain.invoke("什么是 lcel?"))
# 输入字符串同时流向 retriever(检索)和 passthrough(保留原问题),
# 这就是 RunnableParallel + Passthrough 的经典搭配。
推荐做法
  • 组件优先用 langchain_core.runnables 里的原语组合,让整条链保持 Runnable,从而免费获得 stream/batch/async
  • | 串管道时,先想清楚每一段的输入/输出类型(dict / str / Message),让相邻两段对得上
  • 需要并行注入字段就用 RunnablePassthrough.assign,需要并行扇出多个独立结果就用 dict 字面量(RunnableParallel)
  • 把数据清洗、格式化等普通 Python 逻辑包成 RunnableLambda 纳入管道,而不是在链外手动处理
不推荐
  • 不要再用已弃用的 LLMChain / SequentialChain 写新代码——LCEL 是 0.3.x 之后的官方推荐范式
  • 不要在 .assign 上游传非 dict 的数据,否则字段注入会失败
  • 不要把整条链拆成一堆 invoke 手动串联,那会丢掉流式与批量的统一能力
常见误区
  • 把 dict 字面量误当普通字典:在 | 管道上下文里它会被自动转成 RunnableParallel,分支会被并发执行
  • RunnableBranch 的条件函数接收的是「上游传下来的完整输入」,不是某个字段——记得自己取 key
  • stream 一条含 RunnableParallel 的链时,并行分支的 token 顺序不保证,别假设严格交错

prompt | model | parser 改成「检索 + 透传 + 注入字段」的多分支链,且全程不用 LLMChain、整条链仍可 .stream() / .batch(),即视为掌握。

组合即编排:你写的不是一串顺序调用,而是在声明一张数据从哪流向哪的图。框架负责执行这张图,并自动赋予它流式、批量、异步与可观测的能力。

— LCEL 设计哲学

现在你已经握住了 LangChain 的方向盘:Runnable 给了统一接口,LCEL 给了组合语法,四个原语给了编排能力。下一章我们把镜头拉近,逐一拆解管道里最常用的三个零件——Prompt / ChatModel / OutputParser,看清每一段到底吃什么、吐什么。