一切皆 Runnable。 Prompt 模板、ChatModel、OutputParser、Retriever、甚至你自己写的普通函数,只要纳入 LCEL,都被统一成实现了 Runnable 协议的对象。这意味着它们共享同一套方法签名,因此可以无缝相互替换、相互组合。
|不是魔法,它只是Runnable.__or__的重载——把左右两个 Runnable 拼成一个新的RunnableSequence。
Runnable 协议:四个标准方法
任何 Runnable 都暴露同一组方法。理解它们的区别,等于一次性学会了 LangChain 里所有组件的调用方式——因为它们全都遵守这套协议。
| 方法 | 用途 | 输入/输出形态 | 典型场景 |
|---|---|---|---|
invoke(input) | 单次同步调用 | 单个输入 → 单个输出 | 最常用,一次问答 |
stream(input) | 流式同步调用 | 单个输入 → 输出的生成器(逐 token) | 聊天打字机效果 |
batch(inputs) | 批量同步调用 | 输入列表 → 输出列表(并发执行) | 一次处理多条数据 |
ainvoke(input) | 单次异步调用 | 同 invoke,但是 awaitable | FastAPI / 高并发后端 |
astream / abatch | 异步流式 / 异步批量 | stream / batch 的 async 版 | 异步服务里的流式与批处理 |
# 安装(langchain 0.3.x,组件已拆分到独立包)
# pip install langchain langchain-openai
import os
os.environ["OPENAI_API_KEY"] = "sk-..." # 或用环境变量 / .env
from langchain_openai import ChatOpenAI
# ChatModel 本身就是一个 Runnable,直接享有四个标准方法
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 1) invoke —— 单次同步
resp = model.invoke("用一句话解释什么是 Runnable")
print(resp.content)
# 2) stream —— 逐 token 流式输出(打字机效果)
for chunk in model.stream("写一首关于管道的两行小诗"):
print(chunk.content, end="", flush=True)
print()
# 3) batch —— 批量并发,输入列表对应输出列表
results = model.batch(["中国的首都", "日本的首都", "法国的首都"])
for r in results:
print(r.content)
# 4) ainvoke —— 异步(需在 async 函数里 await)
import asyncio
async def main():
r = await model.ainvoke("异步说一句你好")
print(r.content)
asyncio.run(main())
LCEL:用 | 组合管道
LCEL 的核心动作只有一个:用 | 把若干 Runnable 串成一条 RunnableSequence。执行时,输入从左流到右,上一个组件的输出自动作为下一个组件的输入。下面是 LangChain 里最经典的「三件套」管道。
# pip install langchain langchain-openai
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的{role}。"),
("human", "请解释这个概念:{topic}"),
])
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser() # 把 AIMessage 抽成纯字符串
# 组合即编排:一行声明一条数据流图
chain = prompt | model | parser
# 输入一个 dict —— 这里发生了「类型自动适配」
# dict 的 key 自动填充 prompt 模板里的 {role} / {topic} 变量
out = chain.invoke({"role": "计算机科学讲师", "topic": "什么是管道"})
print(out) # 直接是 str,已被 parser 解析
print(type(out)) # <class 'str'>
# 整条链也是 Runnable,照样能 stream / batch
for piece in chain.stream({"role": "诗人", "topic": "递归"}):
print(piece, end="", flush=True)
四个核心原语:透传、并行、自定义函数、条件路由
纯线性的 a | b | c 只能表达「顺序」。真实场景需要:保留原始输入、并行扇出、插入普通 Python 逻辑、按条件分流。LCEL 用四个原语覆盖这些需求。它们是搭建 RAG 与 Agent 链路的乐高积木。
# pip install langchain langchain-openai
from langchain_core.runnables import (
RunnablePassthrough,
RunnableParallel,
RunnableLambda,
RunnableBranch,
)
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# ---------- 1) RunnablePassthrough:原样透传输入 ----------
# 单独使用时,输入是什么就原样吐出来
assert RunnablePassthrough().invoke({"q": "hi"}) == {"q": "hi"}
# .assign:在保留原始 dict 的同时,并行注入新字段
# 下面把 {"name": ...} 扩展成 {"name": ..., "greeting": ...}
add_field = RunnablePassthrough.assign(
greeting=lambda x: f"你好,{x['name']}!"
)
print(add_field.invoke({"name": "小明"}))
# -> {'name': '小明', 'greeting': '你好,小明!'}
# ---------- 2) RunnableParallel:并行扇出(dict 字面量即可) ----------
# 同一个输入同时喂给多个分支,结果汇成一个 dict
parallel = RunnableParallel(
upper=RunnableLambda(lambda s: s.upper()),
length=RunnableLambda(lambda s: len(s)),
)
print(parallel.invoke("hello"))
# -> {'upper': 'HELLO', 'length': 5}
# 注意:在 | 管道里,直接写一个 dict 字面量会被自动包成 RunnableParallel
# ---------- 3) RunnableLambda:把普通函数变成 Runnable ----------
def strip_quotes(text: str) -> str:
return text.strip().strip('"').strip("「」")
clean = RunnableLambda(strip_quotes)
print(clean.invoke(' "裹了引号的文本" ')) # -> 裹了引号的文本
# ---------- 4) RunnableBranch:条件路由 ----------
# (condition, runnable) 元组按顺序匹配,最后一个参数是默认分支
cn_prompt = ChatPromptTemplate.from_template("用中文回答:{query}")
en_prompt = ChatPromptTemplate.from_template("Answer in English: {query}")
router = RunnableBranch(
# 含中文字符 -> 走中文分支
(lambda x: any('\u4e00' <= c <= '\u9fff' for c in x["query"]),
cn_prompt | model | StrOutputParser()),
# 默认分支:英文
en_prompt | model | StrOutputParser(),
)
print(router.invoke({"query": "什么是向量数据库?"})) # 走中文
print(router.invoke({"query": "What is RAG?"})) # 走英文
RunnablePassthrough()—— 原样透传,常作占位;.assign(k=fn)在保留原 dict 基础上并行追加新键,RAG 链里把检索到的 context 注入输入靠的就是它。RunnableParallel(...)—— 把同一输入扇出给多个分支并发执行,结果汇成 dict。在|链里直接写 dict 字面量会被自动转成它。RunnableLambda(fn)—— 把任意普通 Python 函数(清洗、格式化、调用外部 API)包成 Runnable,纳入管道。RunnableBranch((cond1, r1), (cond2, r2), default)—— 按条件路由到不同子链,类似 if/elif/else,但保持 Runnable 的可组合性。
实战:用四原语拼一条「带上下文」的微型 RAG 骨架
把前面四个原语组合起来,就能写出 RAG 问答的标准骨架。这里用一个假的 retriever 函数替代真实向量库(真实向量库见 RAG 章节),重点看数据流如何编排:检索与原始问题并行,汇成 dict,再喂给 prompt。
# pip install langchain langchain-openai
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 假装这是向量库检索:输入 query 字符串,返回相关文档拼成的 context
KB = {
"langchain": "LangChain 是用于构建 LLM 应用的开源框架,核心抽象是 Runnable。",
"lcel": "LCEL 是 LangChain 表达式语言,用 | 组合 Runnable 成管道。",
}
def retrieve(query: str) -> str:
hits = [v for k, v in KB.items() if k in query.lower()]
return "\n".join(hits) or "(无相关资料)"
retriever = RunnableLambda(retrieve)
prompt = ChatPromptTemplate.from_template(
"仅根据下面的资料回答问题。\n\n资料:\n{context}\n\n问题:{question}"
)
# 编排核心:用 dict 字面量做并行扇出
# - context:把输入丢给 retriever 检索
# - question:用 RunnablePassthrough 原样透传
# 二者汇成 {"context": ..., "question": ...} 正好对上 prompt 的两个变量
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)
print(rag_chain.invoke("什么是 lcel?"))
# 输入字符串同时流向 retriever(检索)和 passthrough(保留原问题),
# 这就是 RunnableParallel + Passthrough 的经典搭配。
✓推荐做法
- 组件优先用
langchain_core.runnables里的原语组合,让整条链保持 Runnable,从而免费获得 stream/batch/async - 用
|串管道时,先想清楚每一段的输入/输出类型(dict / str / Message),让相邻两段对得上 - 需要并行注入字段就用
RunnablePassthrough.assign,需要并行扇出多个独立结果就用 dict 字面量(RunnableParallel) - 把数据清洗、格式化等普通 Python 逻辑包成
RunnableLambda纳入管道,而不是在链外手动处理
✗不推荐
- 不要再用已弃用的
LLMChain/SequentialChain写新代码——LCEL 是 0.3.x 之后的官方推荐范式 - 不要在
.assign上游传非 dict 的数据,否则字段注入会失败 - 不要把整条链拆成一堆
invoke手动串联,那会丢掉流式与批量的统一能力
⚠常见误区
- 把 dict 字面量误当普通字典:在
|管道上下文里它会被自动转成 RunnableParallel,分支会被并发执行 - RunnableBranch 的条件函数接收的是「上游传下来的完整输入」,不是某个字段——记得自己取 key
- stream 一条含 RunnableParallel 的链时,并行分支的 token 顺序不保证,别假设严格交错
把 prompt | model | parser 改成「检索 + 透传 + 注入字段」的多分支链,且全程不用 LLMChain、整条链仍可 .stream() / .batch(),即视为掌握。
组合即编排:你写的不是一串顺序调用,而是在声明一张数据从哪流向哪的图。框架负责执行这张图,并自动赋予它流式、批量、异步与可观测的能力。
— LCEL 设计哲学
现在你已经握住了 LangChain 的方向盘:Runnable 给了统一接口,LCEL 给了组合语法,四个原语给了编排能力。下一章我们把镜头拉近,逐一拆解管道里最常用的三个零件——Prompt / ChatModel / OutputParser,看清每一段到底吃什么、吐什么。