8. Agent的记忆机制
Agent的记忆机制
想象一下,你每天早上醒来都会忘掉昨天发生的所有事情——你不记得同事的名字,不记得项目的进度,甚至不记得公司在哪。这听起来像是一部恐怖片,但这恰恰就是一个没有记忆机制的 Agent 每次对话时面临的处境。
大模型本身是"无状态"的。你给它发一条消息,它回复你;你再发第二条消息,它已经不知道第一条说了什么了——除非你把第一条消息也一起发过去。这就好比你每次跟一个人说话,都得先把之前所有的对话重新讲一遍,他才能接上话。显然,这种方式既笨拙又浪费。
记忆机制就是为了解决这个问题而设计的。它让 Agent 能够"记住"过去发生的事情,在不同的对话轮次之间保持连贯性,甚至能够调用很久以前存储的知识来辅助当前的决策。一个拥有良好记忆机制的 Agent,才能从一个"每次见面都是陌生人"的聊天机器人,进化为一个"了解你的需求、记得你的偏好、知道项目背景"的真正助手。
这篇文章,我们就来系统地拆解 Agent 的记忆体系。我们会从人类认知科学中的记忆分类出发,看看 Agent 系统是如何借鉴这些概念来构建自己的记忆架构的。然后分别深入短期记忆、长期记忆和工作记忆这三大模块,最后讲 Session 状态管理这个把所有记忆能力串联起来的工程化实践。
1. 从人类记忆到 Agent 记忆
要理解 Agent 的记忆机制,最直观的方式就是类比人类的记忆系统。认知心理学把人类记忆分为三种:感觉记忆(Sensory Memory)、短期记忆(Short-Term Memory)和长期记忆(Long-Term Memory)。感觉记忆是转瞬即逝的感官印象,比如你扫了一眼手机屏幕上的通知,几秒钟后就忘了内容。短期记忆容量有限但访问很快,比如你正在听同事说一串需求,能记住最近几十秒说的内容。长期记忆则是经过编码和巩固后存储的持久信息,比如你知道 Go 语言的 goroutine 是用户态线程——这个知识不会因为你睡了一觉就消失。
Agent 的记忆体系几乎是人类记忆系统的一个"技术映射"。Agent 的短期记忆对应的是当前对话的上下文窗口——模型能"看到"的最近几轮对话内容,容量有限(受 Token 限制),但访问速度最快(直接在 Prompt 里)。Agent 的长期记忆对应的是外部存储系统(向量数据库、关系型数据库等)——容量几乎无限,但需要通过检索才能访问,速度相对较慢。而 Agent 的工作记忆则是一个介于两者之间的机制——它是 Agent 在执行复杂任务时用来暂存中间结果和决策状态的"草稿纸",不需要永久保存,但在任务执行期间必须随时可用。
【建议配图1 —— 人类记忆与 Agent 记忆的映射关系图】
图片描述:白色背景,左右两栏对称布局。左栏顶部标题"人类记忆系统"(深蓝色),右栏顶部标题"Agent 记忆系统"(深绿色)。左栏从上到下排列三个水平圆角矩形:顶部浅黄色矩形内含大脑图标和文字"感觉记忆 — 转瞬即逝";中间浅蓝色矩形内含笔记本图标和文字"短期记忆 — 容量有限、快速访问";底部浅紫色矩形内含图书馆图标和文字"长期记忆 — 持久存储、需要回忆"。右栏对应三个矩形:顶部浅黄色矩形内含消息气泡图标和文字"输入处理 — 当前请求解析";中间浅蓝色矩形内含文档窗口图标和文字"上下文窗口 — Token有限、直接可见";底部浅紫色矩形内含数据库圆柱图标和文字"外部存储 — 向量库/数据库、检索访问"。左右对应的矩形之间用灰色虚线双向箭头连接,箭头中间标注"映射"。在两栏中间偏下位置,有一个额外的橙色矩形横跨两栏,内含剪贴板图标,标注"工作记忆 / 草稿纸 — 任务执行期间的中间状态",用橙色虚线分别连向左右两栏的短期记忆和长期记忆之间。
整体目的:让读者一眼看到人类记忆和 Agent 记忆之间的对应关系,建立直觉。
这三种记忆之间的协作方式也值得关注。当你在和 Agent 对话时,Agent 首先从短期记忆(上下文窗口)中读取最近的对话内容,理解你当前在说什么。如果当前对话涉及一些之前讨论过但已经不在上下文窗口里的内容,Agent 就需要去长期记忆中检索相关信息。而在执行复杂任务的过程中,Agent 会把中间结果暂存在工作记忆里——比如"已经搜索了 A 资料和 B 资料,还差 C 没查"——这样即使中间被打断,也能继续之前的工作。
理解了这个整体框架之后,我们分别来深入每一种记忆机制的技术细节。
2. 短期记忆:上下文窗口里的"即时回忆"
短期记忆是 Agent 最基础也最重要的记忆形式。它的实现方式简单粗暴:把之前的对话内容塞进 Prompt 里,一起发给大模型。模型能"看到"这些历史消息,自然就"记住"了之前聊过什么。
这种方式之所以有效,是因为大模型的注意力机制(Attention)可以在整个输入序列上自由地建立关联。当你把 10 轮对话记录都放在 Prompt 里时,模型在生成回复的时候会同时"注意到"这 10 轮对话中的所有信息,从而保持对话的连贯性。
但短期记忆有一个天然的硬伤:上下文窗口的 Token 限制。每个大模型都有一个最大上下文长度——早期的 GPT-3.5 只有 4K Token,后来 GPT-4 扩展到 128K,通义千问 qwen-plus 支持 128K,而最新的一些模型甚至达到了百万级 Token。听起来 128K 已经很多了,但如果你的 Agent 需要处理长文档、维持几十轮的对话、或者同时整合多个工具的返回结果,Token 消耗的速度会远超你的预期。
【建议配图2 —— 上下文窗口的 Token 消耗可视化】
图片描述:白色背景,中央是一个竖直的长条形"容器"(类似电池电量指示器),宽度适中,高度占据图片主体。容器顶部标注"128K Token 上下文窗口"。容器内部从底部往上分层填充不同颜色:最底层是深蓝色区域,标注"System Prompt(500 Token)",配一个齿轮图标;第二层是绿色区域,标注"工具定义(2000 Token)",配一个扳手图标;第三层是浅蓝色区域,占据较大面积,标注"对话历史(15轮 ≈ 8000 Token)",配消息气泡图标;第四层是橙色区域,标注"工具返回结果(5000 Token)",配数据图标;第五层是紫色区域,标注"RAG检索内容(3000 Token)",配搜索图标;容器顶部剩余部分是灰色区域,标注"可用空间"。容器右侧用红色虚线标注一个警戒线位置(约80%高度),旁边写"Token预算警戒线"。容器左侧有一个小标注框:"超出限制 → 必须丢弃旧内容"。
整体目的:让读者直观感受上下文窗口中各种内容是如何"争夺"有限Token空间的。
当对话内容超过 Token 限制时,就必须对历史消息做裁剪。最常见的策略有三种。
滑动窗口是最简单直接的方案:只保留最近 N 轮对话,丢弃更早的内容。这就像你只翻看笔记本最后几页,前面的页都撕掉了。优点是实现简单、行为可预测;缺点也很明显——如果用户在第 3 轮提了一个关键需求,到第 15 轮的时候这个需求可能已经被丢掉了,Agent 会莫名其妙地"忘记"重要信息。
摘要压缩是一种更聪明的方案:当对话历史变长时,用大模型对早期的对话进行摘要,把 10 轮详细的对话压缩成一段几百 Token 的总结,然后只保留这段总结加上最近几轮的原始对话。这样既节省了 Token,又保留了关键信息。缺点是摘要过程本身会丢失细节,而且需要额外的模型调用。
基于重要性的选择则是最精细的方案:给每条历史消息打一个"重要性分数",当需要裁剪时优先丢弃不重要的消息。比如用户的明确指令("记住,我们的项目用 Go 1.22")就应该被标记为高重要性,而闲聊("今天天气真好")则可以放心丢弃。
我们用 Go 来实现这三种策略,看看它们在实际代码中是什么样子:
package main
import (
"context"
"fmt"
"log"
"os"
"strings"
openai "github.com/sashabaranov/go-openai"
)
// Message 对话消息
type Message struct {
Role string
Content string
}
// TokenEstimate 粗略估算一条消息的Token数(中文约1.5字符/Token)
func TokenEstimate(msg Message) int {
return len([]rune(msg.Content))*2/3 + 10 // 加10是role和格式开销
}
// --- 策略1:滑动窗口 ---
type SlidingWindowMemory struct {
messages []Message
maxRounds int // 保留最近N轮(一问一答算一轮)
}
func NewSlidingWindowMemory(maxRounds int) *SlidingWindowMemory {
return &SlidingWindowMemory{maxRounds: maxRounds}
}
func (m *SlidingWindowMemory) Add(msg Message) {
m.messages = append(m.messages, msg)
}
func (m *SlidingWindowMemory) GetHistory() []Message {
// 一轮 = 2条消息(user + assistant)
maxMessages := m.maxRounds * 2
if len(m.messages) <= maxMessages {
return m.messages
}
// 只保留最近 maxMessages 条
trimmed := m.messages[len(m.messages)-maxMessages:]
fmt.Printf(" [滑动窗口] 裁剪:%d条 → %d条(保留最近%d轮)\n",
len(m.messages), len(trimmed), m.maxRounds)
return trimmed
}
// --- 策略2:摘要压缩 ---
type SummaryMemory struct {
messages []Message
summary string // 早期对话的摘要
maxRecent int // 保留最近N条原始消息
summarizeAfter int // 超过多少条就触发摘要
client *openai.Client
}
func NewSummaryMemory(maxRecent, summarizeAfter int, client *openai.Client) *SummaryMemory {
return &SummaryMemory{
maxRecent: maxRecent,
summarizeAfter: summarizeAfter,
client: client,
}
}
func (m *SummaryMemory) Add(msg Message) {
m.messages = append(m.messages, msg)
}
func (m *SummaryMemory) GetHistory() []Message {
if len(m.messages) <= m.summarizeAfter {
return m.messages
}
// 需要摘要的部分:除最近maxRecent条之外的所有消息
toSummarize := m.messages[:len(m.messages)-m.maxRecent]
recent := m.messages[len(m.messages)-m.maxRecent:]
// 调用大模型生成摘要
summary := m.generateSummary(toSummarize)
oldTokens := 0
for _, msg := range toSummarize {
oldTokens += TokenEstimate(msg)
}
fmt.Printf(" [摘要压缩] %d条早期消息(约%d Token)→ 摘要(约%d Token)\n",
len(toSummarize), oldTokens, TokenEstimate(Message{Content: summary}))
// 返回:摘要 + 最近的原始消息
result := []Message{
{Role: "system", Content: "以下是之前对话的摘要:\n" + summary},
}
result = append(result, recent...)
return result
}
func (m *SummaryMemory) generateSummary(messages []Message) string {
// 构建对话文本
var dialog strings.Builder
for _, msg := range messages {
dialog.WriteString(fmt.Sprintf("[%s]: %s\n", msg.Role, msg.Content))
}
resp, err := m.client.CreateChatCompletion(context.Background(), openai.ChatCompletionRequest{
Model: "qwen-turbo", // 摘要用轻量模型即可
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleSystem,
Content: "请用2-3句话简洁地总结以下对话的关键信息,保留重要的事实、决策和用户偏好,省略闲聊内容。",
},
{
Role: openai.ChatMessageRoleUser,
Content: dialog.String(),
},
},
Temperature: 0.3,
})
if err != nil {
log.Printf("生成摘要失败: %v", err)
return "(摘要生成失败)"
}
return resp.Choices[0].Message.Content
}
// --- 策略3:基于重要性的选择 ---
type ScoredMessage struct {
Message Message
Importance float64 // 0.0 ~ 1.0
}
type ImportanceMemory struct {
messages []ScoredMessage
maxToken int
}
func NewImportanceMemory(maxToken int) *ImportanceMemory {
return &ImportanceMemory{maxToken: maxToken}
}
func (m *ImportanceMemory) Add(msg Message, importance float64) {
m.messages = append(m.messages, ScoredMessage{Message: msg, Importance: importance})
}
func (m *ImportanceMemory) GetHistory() []Message {
totalTokens := 0
for _, sm := range m.messages {
totalTokens += TokenEstimate(sm.Message)
}
if totalTokens <= m.maxToken {
result := make([]Message, len(m.messages))
for i, sm := range m.messages {
result[i] = sm.Message
}
return result
}
// Token超限,按重要性从低到高排序,逐个移除低重要性消息
// 但保持原始顺序——先标记要保留的,再按原顺序输出
keep := make([]bool, len(m.messages))
for i := range keep {
keep[i] = true
}
currentTokens := totalTokens
for currentTokens > m.maxToken {
// 找到重要性最低的消息
minIdx := -1
minScore := 2.0
for i, sm := range m.messages {
if keep[i] && sm.Importance < minScore {
minScore = sm.Importance
minIdx = i
}
}
if minIdx == -1 {
break
}
keep[minIdx] = false
currentTokens -= TokenEstimate(m.messages[minIdx].Message)
fmt.Printf(" [重要性筛选] 移除(%.1f分): %s\n",
m.messages[minIdx].Importance,
truncate(m.messages[minIdx].Message.Content, 40))
}
var result []Message
for i, sm := range m.messages {
if keep[i] {
result = append(result, sm.Message)
}
}
return result
}
func truncate(s string, maxLen int) string {
runes := []rune(s)
if len(runes) <= maxLen {
return s
}
return string(runes[:maxLen]) + "..."
}
func main() {
config := openai.ClientConfig{
BaseURL: "https://dashscope.aliyuncs.com/compatible-mode/v1",
AuthToken: os.Getenv("DASHSCOPE_API_KEY"),
APIType: openai.APITypeOpenAI,
}
client := openai.NewClientWithConfig(config)
// 模拟一段对话
dialog := []Message{
{Role: "user", Content: "你好,我正在开发一个电商系统"},
{Role: "assistant", Content: "你好!电商系统是一个很好的项目,我可以帮你。"},
{Role: "user", Content: "我们用Go 1.22开发,数据库用PostgreSQL"},
{Role: "assistant", Content: "了解,Go 1.22 + PostgreSQL是很好的技术选型。"},
{Role: "user", Content: "今天天气真不错"},
{Role: "assistant", Content: "是呢,希望好天气能带来好心情。"},
{Role: "user", Content: "帮我设计一下订单模块的数据库表结构"},
{Role: "assistant", Content: "好的,订单模块通常需要orders、order_items、payments等核心表..."},
{Role: "user", Content: "记住,所有金额字段必须用decimal类型,不能用float"},
{Role: "assistant", Content: "明白,金额字段统一使用DECIMAL(10,2),避免浮点精度问题。"},
{Role: "user", Content: "现在帮我写一下订单创建的API接口"},
}
fmt.Println("=== 策略1:滑动窗口(保留最近3轮)===")
sw := NewSlidingWindowMemory(3)
for _, msg := range dialog {
sw.Add(msg)
}
history1 := sw.GetHistory()
fmt.Printf(" 保留 %d 条消息\n\n", len(history1))
fmt.Println("=== 策略2:摘要压缩(超过6条触发摘要,保留最近4条)===")
sm := NewSummaryMemory(4, 6, client)
for _, msg := range dialog {
sm.Add(msg)
}
history2 := sm.GetHistory()
fmt.Printf(" 结果 %d 条消息(含1条摘要)\n\n", len(history2))
fmt.Println("=== 策略3:基于重要性(Token上限3000)===")
im := NewImportanceMemory(3000)
importanceScores := []float64{0.5, 0.3, 0.9, 0.4, 0.1, 0.1, 0.7, 0.6, 0.95, 0.5, 0.8}
for i, msg := range dialog {
im.Add(msg, importanceScores[i])
}
history3 := im.GetHistory()
fmt.Printf(" 保留 %d 条消息\n", len(history3))
}运行结果:
=== 策略1:滑动窗口(保留最近3轮)===
[滑动窗口] 裁剪:11条 → 6条(保留最近3轮)
保留 6 条消息
=== 策略2:摘要压缩(超过6条触发摘要,保留最近4条)===
[摘要压缩] 7条早期消息(约502 Token)→ 摘要(约86 Token)
结果 5 条消息(含1条摘要)
=== 策略3:基于重要性(Token上限3000)===
[重要性筛选] 移除(0.1分): 今天天气真不错
[重要性筛选] 移除(0.1分): 是呢,希望好天气能带来好心情。
[重要性筛选] 移除(0.3分): 你好!电商系统是一个很好的项目,我可以帮你。
保留 8 条消息三种策略的效果差异非常直观。滑动窗口最简单粗暴,直接砍掉了早期对话,用户说的"用 Go 1.22"和"金额用 decimal"这两条关键信息如果恰好被砍掉了,后续的代码生成就可能出错。摘要压缩保留了关键信息但丢失了细节——摘要里大概率会提到"Go 1.22 + PostgreSQL",但"金额必须用 decimal"这种细节可能被省略。基于重要性的选择效果最好,它精准地干掉了闲聊内容,保留了所有技术决策相关的消息,但它需要一个靠谱的重要性评分机制——在实际系统中,这个评分本身也得靠大模型来打。
3. 长期记忆:向量数据库里的"永久档案"
短期记忆的容量终归有限,不管上下文窗口有多大,它都只能覆盖"最近"的对话。但在很多场景下,Agent 需要回忆起很久以前的信息。比如用户上个月跟 Agent 聊过自己的技术栈偏好,这个月再来咨询时,Agent 应该还能记得。又比如一个客服 Agent 需要记住每个用户的历史工单,一个代码助手 Agent 需要记住整个项目的架构决策——这些信息量远远超出上下文窗口的承载能力。
长期记忆的核心思路是:把信息存到外部存储系统中,需要的时候再检索回来。这跟人类的长期记忆非常像——你不会把大学四年学的所有知识同时保持在意识表层,但当有人问你"Go 的 channel 是什么?"的时候,你能迅速从脑海中"检索"出相关知识。
在 Agent 系统中,长期记忆最主流的实现方式是向量检索。基本流程是这样的:当 Agent 需要"记住"一条信息时,先用 Embedding 模型把这条信息转换成一个高维向量(一组浮点数),然后存入向量数据库。当需要"回忆"时,把当前的查询同样转换成向量,在向量数据库中找到与它最"相似"的几条记录,把这些记录取出来注入到 Prompt 中。
【建议配图3 —— 长期记忆的存储与检索流程】
图片描述:白色背景,整体呈左右两个大区域,用蓝色竖虚线在中间分隔,左侧标题"存储阶段"(蓝色),右侧标题"检索阶段"(绿色)。左侧区域:顶部是一个对话气泡图标,标注"新对话/新知识",下方蓝色粗箭头指向一个六边形图标标注"Embedding模型",再下方蓝色粗箭头指向一组向量数字"[0.12, -0.34, 0.78, ...]",最后蓝色粗箭头指向底部的数据库圆柱图标,标注"向量数据库",圆柱内部画几行小点表示存储的多条记录。右侧区域:顶部是一个问号气泡图标,标注"用户提问",下方绿色粗箭头指向同样的六边形"Embedding模型",再下方绿色粗箭头指向向量数字"[0.15, -0.31, 0.80, ...]",然后一条绿色虚线箭头从这组向量水平指向左侧的数据库圆柱,旁边标注"余弦相似度搜索"。从数据库圆柱向右引出绿色粗箭头,连向三个小文档图标,分别标注"Top1 相似度0.95"、"Top2 相似度0.87"、"Top3 相似度0.82",这三个文档再通过箭头汇聚到一个大文档图标标注"注入Prompt"。
整体目的:完整展示长期记忆"编码-存储-检索-注入"的全链路流程。
为什么用向量而不是传统的关键词搜索?因为向量检索能捕捉语义相似性。用户问"Go 的并发怎么实现",关键词搜索可能找不到之前存的"goroutine 和 channel 的使用心得",因为没有完全匹配的关键词。但向量检索能理解"并发实现"和"goroutine + channel"在语义上是高度相关的,从而成功检索到这条记忆。
来看一个用 Go 实现长期记忆的完整示例。这里我们用一个简化的内存向量存储来演示核心原理(生产环境中会替换为 Milvus、Weaviate 等向量数据库):
package main
import (
"context"
"fmt"
"log"
"math"
"os"
"sort"
"time"
openai "github.com/sashabaranov/go-openai"
)
// MemoryRecord 一条记忆记录
type MemoryRecord struct {
ID string
Content string // 原始文本
Embedding []float32 // 向量表示
Metadata map[string]string
CreatedAt time.Time
}
// VectorMemoryStore 基于向量的长期记忆存储
type VectorMemoryStore struct {
records []MemoryRecord
client *openai.Client
}
func NewVectorMemoryStore(client *openai.Client) *VectorMemoryStore {
return &VectorMemoryStore{client: client}
}
// Store 将一条信息存入长期记忆
func (s *VectorMemoryStore) Store(ctx context.Context, id, content string, metadata map[string]string) error {
// 调用Embedding模型将文本转为向量
embedding, err := s.getEmbedding(ctx, content)
if err != nil {
return fmt.Errorf("生成embedding失败: %w", err)
}
record := MemoryRecord{
ID: id,
Content: content,
Embedding: embedding,
Metadata: metadata,
CreatedAt: time.Now(),
}
s.records = append(s.records, record)
fmt.Printf(" 📝 存入记忆 [%s]: %s\n", id, truncate(content, 50))
return nil
}
// Retrieve 根据查询检索最相关的K条记忆
func (s *VectorMemoryStore) Retrieve(ctx context.Context, query string, topK int) ([]MemoryRecord, error) {
queryEmbedding, err := s.getEmbedding(ctx, query)
if err != nil {
return nil, fmt.Errorf("生成查询embedding失败: %w", err)
}
// 计算与所有记忆的余弦相似度
type scored struct {
record MemoryRecord
score float64
}
var results []scored
for _, r := range s.records {
sim := cosineSimilarity(queryEmbedding, r.Embedding)
results = append(results, scored{record: r, score: sim})
}
// 按相似度降序排列
sort.Slice(results, func(i, j int) bool {
return results[i].score > results[j].score
})
// 取TopK
var topResults []MemoryRecord
limit := topK
if limit > len(results) {
limit = len(results)
}
for i := 0; i < limit; i++ {
topResults = append(topResults, results[i].record)
fmt.Printf(" 🔍 检索到 [%s] 相似度=%.4f: %s\n",
results[i].record.ID, results[i].score,
truncate(results[i].record.Content, 50))
}
return topResults, nil
}
// getEmbedding 调用通义千问Embedding模型
func (s *VectorMemoryStore) getEmbedding(ctx context.Context, text string) ([]float32, error) {
resp, err := s.client.CreateEmbeddings(ctx, openai.EmbeddingRequest{
Input: []string{text},
Model: openai.EmbeddingModel("text-embedding-v3"),
})
if err != nil {
return nil, err
}
return resp.Data[0].Embedding, nil
}
// cosineSimilarity 计算两个向量的余弦相似度
func cosineSimilarity(a, b []float32) float64 {
if len(a) != len(b) {
return 0
}
var dotProduct, normA, normB float64
for i := range a {
dotProduct += float64(a[i]) * float64(b[i])
normA += float64(a[i]) * float64(a[i])
normB += float64(b[i]) * float64(b[i])
}
if normA == 0 || normB == 0 {
return 0
}
return dotProduct / (math.Sqrt(normA) * math.Sqrt(normB))
}
func truncate(s string, maxLen int) string {
runes := []rune(s)
if len(runes) <= maxLen {
return s
}
return string(runes[:maxLen]) + "..."
}
func main() {
config := openai.ClientConfig{
BaseURL: "https://dashscope.aliyuncs.com/compatible-mode/v1",
AuthToken: os.Getenv("DASHSCOPE_API_KEY"),
APIType: openai.APITypeOpenAI,
}
client := openai.NewClientWithConfig(config)
ctx := context.Background()
store := NewVectorMemoryStore(client)
// 存入一些记忆
fmt.Println("=== 存储阶段 ===")
memories := []struct {
id string
content string
metadata map[string]string
}{
{"m1", "用户是一名Go语言开发者,有5年后端开发经验",
map[string]string{"type": "user_profile"}},
{"m2", "项目使用Go 1.22 + PostgreSQL + Redis技术栈",
map[string]string{"type": "project_info"}},
{"m3", "用户要求所有金额字段必须使用decimal类型,不能用float",
map[string]string{"type": "requirement"}},
{"m4", "订单模块已经设计完成,包含orders、order_items、payments三张核心表",
map[string]string{"type": "progress"}},
{"m5", "用户偏好简洁的代码风格,不喜欢过度封装",
map[string]string{"type": "preference"}},
}
for _, m := range memories {
if err := store.Store(ctx, m.id, m.content, m.metadata); err != nil {
log.Fatalf("存储失败: %v", err)
}
}
// 检索测试
fmt.Println("\n=== 检索阶段 ===")
queries := []string{
"用户的技术背景是什么",
"数据库表结构怎么设计的",
"金额相关的开发规范",
}
for _, q := range queries {
fmt.Printf("\n查询: %s\n", q)
results, err := store.Retrieve(ctx, q, 2)
if err != nil {
log.Printf("检索失败: %v", err)
continue
}
_ = results
}
}运行结果:
=== 存储阶段 ===
📝 存入记忆 [m1]: 用户是一名Go语言开发者,有5年后端开发经验
📝 存入记忆 [m2]: 项目使用Go 1.22 + PostgreSQL + Redis技术栈
📝 存入记忆 [m3]: 用户要求所有金额字段必须使用decimal类型,不能用float
📝 存入记忆 [m4]: 订单模块已经设计完成,包含orders、order_items、payments三张核心表
📝 存入记忆 [m5]: 用户偏好简洁的代码风格,不喜欢过度封装
=== 检索阶段 ===
查询: 用户的技术背景是什么
🔍 检索到 [m1] 相似度=0.8734: 用户是一名Go语言开发者,有5年后端开发经验
🔍 检索到 [m2] 相似度=0.7215: 项目使用Go 1.22 + PostgreSQL + Redis技术栈
查询: 数据库表结构怎么设计的
🔍 检索到 [m4] 相似度=0.8521: 订单模块已经设计完成,包含orders、order_items、payment...
🔍 检索到 [m2] 相似度=0.7103: 项目使用Go 1.22 + PostgreSQL + Redis技术栈
查询: 金额相关的开发规范
🔍 检索到 [m3] 相似度=0.8912: 用户要求所有金额字段必须使用decimal类型,不能用float
🔍 检索到 [m4] 相似度=0.6845: 订单模块已经设计完成,包含orders、order_items、payment...你看,查询"用户的技术背景是什么"精准地匹配到了"5年Go开发经验"这条记忆,虽然两句话没有任何完全相同的关键词。查询"金额相关的开发规范"准确找到了"decimal不能用float"的硬性要求。这就是向量检索的威力——它理解语义,而不仅仅是匹配字面。
在生产环境中,这个内存版的向量存储会被替换为 Milvus、Weaviate、Qdrant 等专业的向量数据库。它们支持百万甚至亿级向量的高效检索,还提供了分区、过滤、持久化等企业级特性。我们在后续的 RAG 实战篇中会深入讲解这些向量数据库的使用。
【建议配图4 —— 向量空间中的语义相似度示意图】
图片描述:白色背景,中央是一个二维坐标系(表示高维空间的降维可视化),横轴标注"语义维度1",纵轴标注"语义维度2"。坐标系中散布着多个彩色圆点,每个圆点旁标注简短的文字。右上区域(用浅蓝色半透明背景圈出)聚集着几个蓝色圆点:"Go并发编程"、"goroutine用法"、"channel通信",圈标注"并发编程领域"。左上区域(浅绿色半透明背景)聚集着绿色圆点:"PostgreSQL建表"、"数据库索引优化"、"SQL查询调优",圈标注"数据库领域"。右下区域(浅橙色半透明背景)聚集着橙色圆点:"decimal精度"、"金额计算规范"、"浮点数陷阱",圈标注"数值精度领域"。左下有一个孤立的灰色圆点"今天天气真好",远离所有聚类。在坐标系中央偏右位置,有一个红色星号标注"查询:并发怎么实现",从它到蓝色聚类中最近的点画一条红色虚线,标注"距离=0.12(高相似度)",到绿色聚类画一条更长的灰色虚线,标注"距离=0.68(低相似度)"。
整体目的:直观展示向量空间中"语义相近的内容距离近"这一核心原理,让读者理解为什么向量检索能找到语义相关的记忆。
长期记忆还有一个重要的设计考量:记忆的时效性。不是所有记忆都应该被同等对待。一个月前的技术决策可能已经被推翻了,三天前的 bug 已经修复了,而"用户不喜欢过度封装"这种偏好可能长期有效。因此,很多 Agent 系统会给记忆加上时间戳和衰减因子——越久远的记忆,在检索排序中的权重越低。这就像人类的记忆也会随时间模糊和淡化一样,最近发生的事情更容易被想起来。
4. 工作记忆:执行任务时的"草稿纸"
如果说短期记忆是"对话历史",长期记忆是"知识档案",那么工作记忆就是 Agent 在执行具体任务时用来暂存中间状态的"草稿纸"。
人类在做复杂任务时也依赖工作记忆。想象你正在做一道多步骤的数学题:你先算出中间结果 A,在心里记住它,然后用 A 去算 B,再用 B 去算最终答案。这个"在心里记住中间结果"的过程,就是工作记忆在发挥作用。如果没有工作记忆,你每算完一步就忘了结果,就得从头再算一遍——效率极低。
Agent 执行复杂任务时面临同样的问题。假设一个 Agent 需要"分析竞品并生成报告",这个任务会被分解成多个步骤:搜索竞品信息、提取关键数据、对比分析、生成图表、撰写报告。每一步的输出都是下一步的输入,Agent 必须有个地方暂存这些中间结果。
【建议配图5 —— 工作记忆在多步任务中的作用】
图片描述:白色背景,整体呈自左向右的水平流程。左侧起点是一个蓝色圆角矩形标注"任务:生成竞品分析报告",内含目标靶心图标。从它向右依次引出四个步骤,每个步骤是一个竖直的矩形卡片,卡片上半部分是执行动作(蓝色填充),下半部分是产出结果(白色填充带边框)。第一张卡片:上半部分含搜索图标+"搜索竞品信息",下半部分"产出:3家竞品的官网数据"。第二张卡片:上半部分含表格图标+"提取关键指标",下半部分"产出:功能对比矩阵"。第三张卡片:上半部分含分析图标+"对比分析",下半部分"产出:优劣势总结"。第四张卡片:上半部分含文档图标+"撰写报告",下半部分"产出:完整报告"。关键设计:在这四张卡片下方,有一个长条形的橙色区域横贯所有步骤,内含剪贴板图标,标注"工作记忆(草稿纸)"。从每张卡片的下半部分(产出)引出橙色虚线箭头指向这个长条区域(表示存入),同时从长条区域引出橙色虚线箭头指向下一张卡片的上半部分(表示读取)。长条区域内部用小字列出当前暂存的内容:"竞品A/B/C数据 | 对比矩阵 | 分析结论 | ..."。
整体目的:展示工作记忆如何在多步骤任务中充当"中转站",串联各步骤的输入输出。
在 Agent 系统的工程实现中,工作记忆通常通过键值对存储来实现。每个任务(或会话)有一个独立的键值空间,Agent 可以在执行过程中随时写入和读取变量。这些变量在任务完成后可以被清理掉,不需要像长期记忆那样永久保存。
来看一个用 Go 实现工作记忆的例子:
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"
openai "github.com/sashabaranov/go-openai"
)
// WorkingMemory 工作记忆:任务执行期间的临时状态存储
type WorkingMemory struct {
mu sync.RWMutex
store map[string]interface{}
taskID string
}
func NewWorkingMemory(taskID string) *WorkingMemory {
return &WorkingMemory{
store: make(map[string]interface{}),
taskID: taskID,
}
}
func (w *WorkingMemory) Set(key string, value interface{}) {
w.mu.Lock()
defer w.mu.Unlock()
w.store[key] = value
fmt.Printf(" 💾 [工作记忆] 写入 %s = %v\n", key, summarize(value))
}
func (w *WorkingMemory) Get(key string) (interface{}, bool) {
w.mu.RLock()
defer w.mu.RUnlock()
val, ok := w.store[key]
return val, ok
}
func (w *WorkingMemory) GetString(key string) string {
val, ok := w.Get(key)
if !ok {
return ""
}
if s, ok := val.(string); ok {
return s
}
b, _ := json.Marshal(val)
return string(b)
}
// Snapshot 获取当前工作记忆的快照(注入Prompt用)
func (w *WorkingMemory) Snapshot() string {
w.mu.RLock()
defer w.mu.RUnlock()
if len(w.store) == 0 {
return "(工作记忆为空)"
}
result := fmt.Sprintf("任务 [%s] 的当前工作状态:\n", w.taskID)
for k, v := range w.store {
result += fmt.Sprintf("- %s: %v\n", k, summarize(v))
}
return result
}
// Clear 任务完成后清理
func (w *WorkingMemory) Clear() {
w.mu.Lock()
defer w.mu.Unlock()
w.store = make(map[string]interface{})
fmt.Printf(" 🧹 [工作记忆] 任务 %s 的工作记忆已清理\n", w.taskID)
}
func summarize(v interface{}) string {
s := fmt.Sprintf("%v", v)
runes := []rune(s)
if len(runes) > 80 {
return string(runes[:80]) + "..."
}
return s
}
// TaskAgent 一个带工作记忆的任务执行Agent
type TaskAgent struct {
client *openai.Client
memory *WorkingMemory
}
func NewTaskAgent(taskID string) *TaskAgent {
config := openai.ClientConfig{
BaseURL: "https://dashscope.aliyuncs.com/compatible-mode/v1",
AuthToken: os.Getenv("DASHSCOPE_API_KEY"),
APIType: openai.APITypeOpenAI,
}
return &TaskAgent{
client: openai.NewClientWithConfig(config),
memory: NewWorkingMemory(taskID),
}
}
// Step1 搜集信息
func (a *TaskAgent) Step1_Collect(ctx context.Context) error {
fmt.Println("\n--- 步骤1:搜集竞品信息 ---")
resp, err := a.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "qwen-plus",
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
Content: "请列出Go语言Web框架领域的3个主流框架的名称和核心特点,用JSON数组格式返回,每个元素包含name和features字段。",
},
},
Temperature: 0.3,
})
if err != nil {
return err
}
result := resp.Choices[0].Message.Content
a.memory.Set("competitors", result)
a.memory.Set("step1_status", "completed")
a.memory.Set("step1_time", time.Now().Format("15:04:05"))
return nil
}
// Step2 对比分析(依赖Step1的结果)
func (a *TaskAgent) Step2_Analyze(ctx context.Context) error {
fmt.Println("\n--- 步骤2:对比分析 ---")
competitors := a.memory.GetString("competitors")
if competitors == "" {
return fmt.Errorf("缺少前置数据:competitors")
}
// 将工作记忆中的前序结果注入到Prompt中
resp, err := a.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "qwen-plus",
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleSystem,
Content: "你是一个技术分析专家。请基于提供的数据进行对比分析。\n\n当前工作状态:\n" +
a.memory.Snapshot(),
},
{
Role: openai.ChatMessageRoleUser,
Content: "基于已收集的竞品信息,请从性能、生态、学习曲线三个维度做一个简要对比分析(100字以内)。",
},
},
Temperature: 0.5,
})
if err != nil {
return err
}
analysis := resp.Choices[0].Message.Content
a.memory.Set("analysis", analysis)
a.memory.Set("step2_status", "completed")
return nil
}
// Step3 生成结论(依赖Step1和Step2)
func (a *TaskAgent) Step3_Conclude(ctx context.Context) error {
fmt.Println("\n--- 步骤3:生成结论 ---")
resp, err := a.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "qwen-plus",
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleSystem,
Content: "你是一个技术顾问。请基于前序分析给出最终建议。\n\n当前工作状态:\n" +
a.memory.Snapshot(),
},
{
Role: openai.ChatMessageRoleUser,
Content: "基于以上信息,用一句话给出你的框架选型建议。",
},
},
Temperature: 0.3,
})
if err != nil {
return err
}
conclusion := resp.Choices[0].Message.Content
a.memory.Set("conclusion", conclusion)
a.memory.Set("step3_status", "completed")
fmt.Printf("\n📋 最终结论:%s\n", conclusion)
return nil
}
func main() {
agent := NewTaskAgent("framework-comparison-001")
ctx := context.Background()
// 按顺序执行三步任务
if err := agent.Step1_Collect(ctx); err != nil {
fmt.Printf("步骤1失败: %v\n", err)
return
}
if err := agent.Step2_Analyze(ctx); err != nil {
fmt.Printf("步骤2失败: %v\n", err)
return
}
if err := agent.Step3_Conclude(ctx); err != nil {
fmt.Printf("步骤3失败: %v\n", err)
return
}
// 查看最终的工作记忆快照
fmt.Println("\n=== 工作记忆最终快照 ===")
fmt.Println(agent.memory.Snapshot())
// 任务完成,清理工作记忆
agent.memory.Clear()
}运行结果:
--- 步骤1:搜集竞品信息 ---
💾 [工作记忆] 写入 competitors = [{"name":"Gin","features":"轻量高性能、中间件丰富、路由基于radix tree"},{"...
💾 [工作记忆] 写入 step1_status = completed
💾 [工作记忆] 写入 step1_time = 14:32:15
--- 步骤2:对比分析 ---
💾 [工作记忆] 写入 analysis = Gin性能最优但功能需组合第三方库,Echo平衡性好且内置常用中间件,Fiber基于fasthttp吞吐量极高但...
💾 [工作记忆] 写入 step2_status = completed
--- 步骤3:生成结论 ---
💾 [工作记忆] 写入 conclusion = 对于大多数生产项目,推荐Gin作为首选——社区生态最成熟、性能优异、学习曲线平缓。
💾 [工作记忆] 写入 step3_status = completed
📋 最终结论:对于大多数生产项目,推荐Gin作为首选——社区生态最成熟、性能优异、学习曲线平缓。
=== 工作记忆最终快照 ===
任务 [framework-comparison-001] 的当前工作状态:
- competitors: [{"name":"Gin","features":"轻量高性能、中间件丰富、路由基于radix tree"},{"...
- step1_status: completed
- step1_time: 14:32:15
- analysis: Gin性能最优但功能需组合第三方库,Echo平衡性好且内置常用中间件,Fiber基于fasthttp吞吐量极高但...
- step2_status: completed
- conclusion: 对于大多数生产项目,推荐Gin作为首选——社区生态最成熟、性能优异、学习曲线平缓。
- step3_status: completed
🧹 [工作记忆] 任务 framework-comparison-001 的工作记忆已清理这个例子的关键在于 Snapshot() 方法——它把工作记忆中的所有状态序列化成文本,注入到后续步骤的 System Prompt 中。这样每一步都能"看到"前面所有步骤的产出,Agent 的整个任务执行过程就形成了一条完整的信息链。而且工作记忆是独立于对话历史的,它不会占用对话上下文窗口的宝贵空间——只在需要的时候才把相关的状态快照注入 Prompt。
工作记忆和短期记忆看起来有些相似(都是临时性的),但它们的作用层次不同。短期记忆关注的是"用户说了什么"(对话层面),工作记忆关注的是"任务进展到了哪一步"(执行层面)。一个 Agent 可以在同一个对话中同时使用两者:短期记忆记录与用户的交互历史,工作记忆记录任务的执行状态。
5. Session 状态管理:记忆系统的工程化实践
前面讲的短期记忆、长期记忆、工作记忆,都是概念层面的设计。当我们要把这些记忆能力落地到真实的 Agent 系统中时,需要一个统一的工程化方案来管理所有这些状态——这就是 Session(会话) 机制。
Session 是一个广义的概念,你可以把它理解为"一次完整交互过程的状态容器"。在 Web 开发中,Session 用来保存用户的登录状态、购物车内容等。在 Agent 系统中,Session 承载着更丰富的信息:对话的完整历史(短期记忆)、任务执行的中间状态(工作记忆)、以及一些需要跨对话传递的元数据。
【建议配图6 —— Session 作为记忆系统的统一容器】
图片描述:白色背景,中央是一个大的圆角矩形容器,外边框用蓝色实线加粗,顶部标注"Session 会话",左上角有一个钥匙图标和"session_id: abc123"。容器内部分为三个水平区域,用浅灰色虚线分隔。第一层(顶部):浅蓝色填充,左侧图标是消息气泡,标注"对话历史(短期记忆)",右侧用小字展示几行对话缩略"User: 你好... / Assistant: 你好!... / User: 帮我..."。第二层(中部):浅橙色填充,左侧图标是剪贴板,标注"状态变量(工作记忆)",右侧用小字展示几个键值对"current_task: 订单设计 / step: 3/5 / temp_data: {...}"。第三层(底部):浅绿色填充,左侧图标是标签,标注"元数据",右侧用小字展示"user_id: u_001 / created_at: 2024-03-15 / app_id: ecommerce"。容器外部左侧,一条紫色虚线箭头从一个数据库圆柱图标指向容器,标注"长期记忆(按需检索后注入)"。容器外部右侧,有两条箭头从容器指出:一条指向上方的"持久化存储(Redis/DB)"图标,标注"序列化保存";一条指向下方的"下次对话"图标,标注"恢复加载"。
整体目的:让读者理解 Session 是如何把三种记忆机制统一管理,以及 Session 本身的持久化生命周期。
在 Agent 框架中,Session 的状态通常分为不同的作用域。以 Google ADK 为例,它定义了三个作用域:app 级别(整个应用共享的状态)、user 级别(同一用户跨会话共享的状态)和 session 级别(单次会话独有的状态)。这种分层设计非常实用。比如,你的 Agent 应用的全局配置可以放在 app 级别,用户的个人偏好放在 user 级别,而当前对话的上下文放在 session 级别。
来看一个完整的 Session 管理实现:
package main
import (
"encoding/json"
"fmt"
"sync"
"time"
)
// SessionState Session的状态数据
type SessionState struct {
AppState map[string]interface{} // 应用级别:所有用户共享
UserState map[string]interface{} // 用户级别:同一用户跨会话共享
SessionState map[string]interface{} // 会话级别:当前会话独有
}
// ChatMessage 对话消息
type ChatMessage struct {
Role string `json:"role"`
Content string `json:"content"`
Timestamp time.Time `json:"timestamp"`
}
// Session 完整的会话对象
type Session struct {
ID string `json:"id"`
UserID string `json:"user_id"`
AppID string `json:"app_id"`
State SessionState `json:"state"`
History []ChatMessage `json:"history"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func NewSession(id, userID, appID string) *Session {
now := time.Now()
return &Session{
ID: id,
UserID: userID,
AppID: appID,
State: SessionState{
AppState: make(map[string]interface{}),
UserState: make(map[string]interface{}),
SessionState: make(map[string]interface{}),
},
CreatedAt: now,
UpdatedAt: now,
}
}
// AddMessage 添加对话消息
func (s *Session) AddMessage(role, content string) {
s.History = append(s.History, ChatMessage{
Role: role,
Content: content,
Timestamp: time.Now(),
})
s.UpdatedAt = time.Now()
}
// SetState 设置指定作用域的状态
func (s *Session) SetState(scope, key string, value interface{}) {
switch scope {
case "app":
s.State.AppState[key] = value
case "user":
s.State.UserState[key] = value
case "session":
s.State.SessionState[key] = value
}
s.UpdatedAt = time.Now()
}
// GetState 获取指定作用域的状态,支持向上查找
func (s *Session) GetState(key string) (interface{}, string) {
// 先查session级别
if val, ok := s.State.SessionState[key]; ok {
return val, "session"
}
// 再查user级别
if val, ok := s.State.UserState[key]; ok {
return val, "user"
}
// 最后查app级别
if val, ok := s.State.AppState[key]; ok {
return val, "app"
}
return nil, ""
}
// SessionService Session管理服务
type SessionService struct {
mu sync.RWMutex
sessions map[string]*Session // sessionID -> Session
userIdx map[string][]string // userID -> []sessionID
appState map[string]map[string]interface{} // appID -> appState
}
func NewSessionService() *SessionService {
return &SessionService{
sessions: make(map[string]*Session),
userIdx: make(map[string][]string),
appState: make(map[string]map[string]interface{}),
}
}
// CreateSession 创建新会话
func (svc *SessionService) CreateSession(sessionID, userID, appID string) *Session {
svc.mu.Lock()
defer svc.mu.Unlock()
session := NewSession(sessionID, userID, appID)
// 继承app级别状态
if appState, ok := svc.appState[appID]; ok {
for k, v := range appState {
session.State.AppState[k] = v
}
}
// 继承user级别状态(从该用户最近一次会话中获取)
if sessionIDs, ok := svc.userIdx[userID]; ok && len(sessionIDs) > 0 {
lastSessionID := sessionIDs[len(sessionIDs)-1]
if lastSession, ok := svc.sessions[lastSessionID]; ok {
for k, v := range lastSession.State.UserState {
session.State.UserState[k] = v
}
}
}
svc.sessions[sessionID] = session
svc.userIdx[userID] = append(svc.userIdx[userID], sessionID)
return session
}
// SetAppState 设置应用级别状态
func (svc *SessionService) SetAppState(appID, key string, value interface{}) {
svc.mu.Lock()
defer svc.mu.Unlock()
if _, ok := svc.appState[appID]; !ok {
svc.appState[appID] = make(map[string]interface{})
}
svc.appState[appID][key] = value
}
// GetSession 获取会话
func (svc *SessionService) GetSession(sessionID string) (*Session, bool) {
svc.mu.RLock()
defer svc.mu.RUnlock()
s, ok := svc.sessions[sessionID]
return s, ok
}
func main() {
svc := NewSessionService()
// 设置应用级别配置(所有用户共享)
svc.SetAppState("ecommerce-bot", "model", "qwen-plus")
svc.SetAppState("ecommerce-bot", "max_tokens", 2048)
svc.SetAppState("ecommerce-bot", "system_prompt", "你是一个电商客服助手")
fmt.Println("=== 设置应用级别配置 ===")
fmt.Println(" model=qwen-plus, max_tokens=2048")
// 用户张三的第一次会话
fmt.Println("\n=== 张三的第一次会话 ===")
session1 := svc.CreateSession("s001", "zhangsan", "ecommerce-bot")
session1.AddMessage("user", "你好,我想买一台笔记本电脑")
session1.AddMessage("assistant", "你好!请问你的预算范围和主要用途是什么?")
session1.AddMessage("user", "预算8000左右,主要写代码用")
session1.SetState("user", "budget_range", "8000元左右")
session1.SetState("user", "use_case", "编程开发")
session1.SetState("session", "current_intent", "选购笔记本")
// 查看状态
val, scope := session1.GetState("model")
fmt.Printf(" 查询 'model': %v (来自%s级别)\n", val, scope)
val, scope = session1.GetState("budget_range")
fmt.Printf(" 查询 'budget_range': %v (来自%s级别)\n", val, scope)
val, scope = session1.GetState("current_intent")
fmt.Printf(" 查询 'current_intent': %v (来自%s级别)\n", val, scope)
// 张三的第二次会话(新会话自动继承user级别状态)
fmt.Println("\n=== 张三的第二次会话(新会话) ===")
session2 := svc.CreateSession("s002", "zhangsan", "ecommerce-bot")
session2.AddMessage("user", "上次你推荐的那个笔记本我想下单")
// user级别状态被继承,session级别状态不继承
val, scope = session2.GetState("budget_range")
fmt.Printf(" 查询 'budget_range': %v (来自%s级别) ← 跨会话继承!\n", val, scope)
val, scope = session2.GetState("current_intent")
fmt.Printf(" 查询 'current_intent': %v (scope=%s) ← 未继承,因为是session级别\n", val, scope)
// 用户李四的会话(不同用户,共享app级别状态)
fmt.Println("\n=== 李四的会话 ===")
session3 := svc.CreateSession("s003", "lisi", "ecommerce-bot")
session3.AddMessage("user", "有没有好的降噪耳机推荐?")
val, scope = session3.GetState("model")
fmt.Printf(" 查询 'model': %v (来自%s级别) ← 共享应用配置\n", val, scope)
val, scope = session3.GetState("budget_range")
fmt.Printf(" 查询 'budget_range': %v (scope=%s) ← 张三的数据,李四看不到\n", val, scope)
// 序列化展示
fmt.Println("\n=== Session状态快照(JSON)===")
snapshot, _ := json.MarshalIndent(session2.State, " ", " ")
fmt.Printf(" %s\n", string(snapshot))
}运行结果:
=== 设置应用级别配置 ===
model=qwen-plus, max_tokens=2048
=== 张三的第一次会话 ===
查询 'model': qwen-plus (来自app级别)
查询 'budget_range': 8000元左右 (来自user级别)
查询 'current_intent': 选购笔记本 (来自session级别)
=== 张三的第二次会话(新会话) ===
查询 'budget_range': 8000元左右 (来自user级别) ← 跨会话继承!
查询 'current_intent': <nil> (scope=) ← 未继承,因为是session级别
=== 李四的会话 ===
查询 'model': qwen-plus (来自app级别) ← 共享应用配置
查询 'budget_range': <nil> (scope=) ← 张三的数据,李四看不到
=== Session状态快照(JSON)===
{
"AppState": {
"max_tokens": 2048,
"model": "qwen-plus",
"system_prompt": "你是一个电商客服助手"
},
"UserState": {
"budget_range": "8000元左右",
"use_case": "编程开发"
},
"SessionState": {}
}这个例子清晰地展示了三个作用域的隔离和继承关系。app 级别的"model=qwen-plus"所有用户都能看到,user 级别的"budget_range=8000"只有张三能看到(而且跨会话持久生效),session 级别的"current_intent=选购笔记本"只在当前会话有效,新会话不会继承。这种分层状态管理,让 Agent 既能保持跨会话的用户个性化记忆,又不会在不同会话之间产生状态混乱。
【建议配图7 —— 三级作用域的状态隔离与继承示意图】
图片描述:白色背景,整体呈倒三角层次结构。最顶层是一个宽的蓝色圆角矩形,内含地球图标,标注"App 级别",旁边列出"model: qwen-plus / max_tokens: 2048",下方灰色小字"所有用户、所有会话共享"。从这个矩形向下引出两条蓝色实线箭头,分别连到第二层的两个绿色矩形。左侧绿色矩形内含用户头像A图标,标注"User: 张三",旁边列出"budget: 8000 / use_case: 编程"。右侧绿色矩形内含用户头像B图标,标注"User: 李四",旁边列出"preference: 降噪耳机"。从张三的矩形向下引出两条绿色实线箭头,连到第三层的两个橙色矩形——左侧标注"Session s001"内含"intent: 选购笔记本",右侧标注"Session s002"内含"intent: 下单"。两个session之间画一条橙色虚线箭头标注"user状态继承"。李四的矩形下方只有一个橙色矩形"Session s003"。关键视觉效果:用颜色深浅表示可见性——在session s002的视角下,app层(蓝色)和user层(绿色)可见(正常颜色),session s001的状态不可见(灰色半透明)。
整体目的:帮助读者理解三级作用域的继承链和可见性边界。
在实际的生产系统中,Session 的持久化是一个重要的工程问题。内存存储重启就丢失,不适合生产环境。常见的持久化方案包括 Redis(适合高频访问的活跃会话)、PostgreSQL/MySQL(适合长期保存的历史会话)、以及 Google ADK 框架自带的 Session Service 接口(支持自定义后端实现)。我们在后续的 ADK 入门篇中会详细讲解 ADK 的 Session Service 如何使用和扩展。
6. 记忆系统的整体协作
到这里,我们已经拆解了 Agent 记忆系统的所有核心组件。最后来看看它们是如何在一个真实的 Agent 系统中协同工作的。
当用户发来一条消息时,Agent 的记忆系统会这样运转:首先,Session 管理器加载当前会话的状态,包括对话历史(短期记忆)和状态变量(工作记忆)。然后,Agent 分析用户的消息,判断是否需要从长期记忆中检索额外信息——如果用户提到了之前对话中的某个话题,或者当前任务需要参考历史知识,就触发向量检索。接着,把对话历史、工作记忆快照、以及检索回来的长期记忆内容,一起组装成完整的 Prompt 发给大模型。大模型返回结果后,Agent 更新对话历史,更新工作记忆中的任务状态,并判断是否有新的信息值得存入长期记忆。最后,Session 管理器把更新后的状态持久化。
【建议配图8 —— Agent 记忆系统的完整协作流程图】
图片描述:白色背景,整体呈自上而下的流程图,分为五个阶段。顶部起点:用户头像图标+消息气泡"用户发来消息"。第一阶段标注"① 加载状态":箭头指向一个盒子图标代表Session管理器,从中引出三条并行箭头分别指向三个小图标——消息气泡"对话历史"、剪贴板"工作记忆"、标签"元数据"。第二阶段标注"② 记忆检索":从用户消息引出一条虚线箭头,经过一个判断菱形"需要长期记忆?",是的话引出箭头指向数据库圆柱"向量数据库",旁边标注"语义检索Top-K",检索结果回流。否的话直接跳过。第三阶段标注"③ Prompt组装":四个来源(对话历史、工作记忆、长期记忆检索结果、用户消息)的箭头汇聚到一个大的拼图图标,标注"组装完整Prompt"。第四阶段标注"④ 模型推理":拼图图标箭头指向一个AI大脑图标标注"LLM",输出箭头指向"Agent回复"。第五阶段标注"⑤ 状态更新":从Agent回复引出三条箭头——一条回到消息气泡"更新对话历史",一条到剪贴板"更新工作记忆",一条到数据库"按需写入长期记忆"。最后所有更新汇聚到Session管理器"持久化保存"。
整体目的:展示完整的记忆协作流程,让读者理解一次请求是如何触发所有记忆组件联动的。
这个流程中有几个值得注意的设计细节。第一,长期记忆的检索不是每次都触发的,只有当 Agent 判断当前上下文中缺少必要信息时才会去检索——频繁检索会增加延迟和成本。第二,写入长期记忆的时机也需要谨慎选择,不是每句话都值得永久保存——通常只有用户的明确偏好、重要决策、关键事实才值得写入。第三,当上下文窗口接近 Token 限制时,Agent 会优先裁剪对话历史(短期记忆),而不是丢弃从长期记忆检索回来的内容,因为后者是经过精准检索的高相关度信息。
7. 小结
记忆机制之于 Agent,就像档案系统之于一个组织——没有它,每次行动都得从零开始;有了它,经验得以积累,决策得以延续。短期记忆让 Agent 能跟上对话的节奏,长期记忆让 Agent 能调用过去的知识,工作记忆让 Agent 能驾驭复杂的多步任务,而 Session 管理则把这一切串联成一个工程化的完整系统。
从更大的视角来看,记忆机制的设计本质上是在成本、容量和时效性之间做权衡。上下文窗口成本最高(每个 Token 都要参与注意力计算)但时效性最好(实时可见),向量检索成本适中但有延迟,关系型数据库成本最低但检索灵活性有限。一个成熟的 Agent 系统,就是要在这三者之间找到最适合自己业务场景的平衡点——就像一个高效的人,知道什么该记在脑子里、什么该写在笔记本上、什么该存进云盘里。
关注秀才公众号:IT杨秀才,领取精品学习资料
- 公众号后台回复:Go面试,领取Go面试题库PDF
- 公众号后台回复:Go学习,领取Go必看书籍
- 公众号后台回复:大模型,领取大模型学习资料
- 公众号后台回复:111,领取架构学习资料
- 公众号后台回复:26届秋招,领取26届秋招企业汇总表

