事件

Event是adk-rust中对话历史的基本构建块。与Agent的每次交互——无论是用户消息、Agent响应还是Tool执行——都记录为EventEvent形成了一个不可变日志,捕获了Agent``Session的完整执行轨迹。

概述

Event系统有几个关键用途:

  • 对话历史Event形成Session中所有交互的时间顺序记录
  • 状态管理Event通过state_delta字段承载状态变化
  • Artifact 追踪Event通过artifact_delta字段记录Artifact操作
  • Agent 协调Event支持Agent的转移和升级
  • 调试和可观测性Event提供了Agent行为的完整审计跟踪

Event 结构

Event代表对话中的一次单一交互。adk-rust使用统一的Event类型,该类型嵌入了LlmResponse,与ADK-Go中使用的设计模式相匹配:

pub struct Event {
    pub id: String,                    // 唯一事件标识符 (UUID)
    pub timestamp: DateTime<Utc>,      // 事件发生时间
    pub invocation_id: String,         // 在单个调用中关联事件
    pub branch: String,                // 用于未来分支支持
    pub author: String,                // 创建此事件的作者(用户、Agent 名称、Tool 名称)
    pub llm_response: LlmResponse,     // 包含内容和 LLM 元数据
    pub actions: EventActions,         // 副作用和元数据
    pub long_running_tool_ids: Vec<String>,  // 长期运行的 Tool ID
}

LlmResponse结构包含:

pub struct LlmResponse {
    pub content: Option<Content>,      // 消息内容(文本、Part 等)
    pub usage_metadata: Option<UsageMetadata>,
    pub finish_reason: Option<FinishReason>,
    pub partial: bool,                 // 对于流式传输的部分响应为 true
    pub turn_complete: bool,           // 当轮次完成时为 true
    pub interrupted: bool,             // 如果生成被中断则为 true
    pub error_code: Option<String>,
    pub error_message: Option<String>,
}

Content通过event.llm_response.content一致地访问:

if let Some(content) = &event.llm_response.content {
    for part in &content.parts {
        if let Part::Text { text } = part {
            println!("{}", text);
        }
    }
}

关键字段

  • id:标识此特定事件的唯一 UUID。用于事件检索和排序。

  • timestampEvent创建时的 UTC 时间戳。EventSession中按时间顺序排列。

  • invocation_id:将属于相同Agent调用的Event分组。当Agent处理消息时,所有生成的EventAgent响应、Tool调用、子Agent调用)都共享相同的invocation_id

  • branch:保留用于未来的分支功能。目前未使用,但允许在未来版本中进行对话分支。

  • author:标识Event的创建者:

    • 用户消息:通常是“user”或用户标识符
    • Agent响应:Agent的名称
    • Tool执行:Tool的名称
    • 系统事件:“system”
  • llm_response:包含消息Content和 LLM 元数据。通过event.llm_response.content访问ContentContent类型可以包含文本、多模态Part(图像、音频)或结构化数据。某些Event(例如纯状态更新)可能具有content: None

EventActions

EventActions 结构包含与事件相关的元数据和副作用:

pub struct EventActions {
    pub state_delta: HashMap<String, Value>,    // 要应用的状态更改
    pub artifact_delta: HashMap<String, i64>,   // 工件版本更改
    pub skip_summarization: bool,               // 在摘要中跳过此事件
    pub transfer_to_agent: Option<String>,      // 将控制权转移给另一个 agent
    pub escalate: bool,                         // 升级到人工或 supervisor
}

state_delta

state_delta 字段包含表示会话状态更改的键值对。当一个事件被追加到会话中时,这些更改会被合并到会话的状态中。

状态键可以使用前缀来控制范围:

  • app:key - 应用程序范围的状态(所有用户共享)
  • user:key - 用户范围的状态(用户的所有会话共享)
  • temp:key - 临时状态(在调用之间清除)
  • 无前缀 - 会话范围的状态(默认)

示例:

let mut actions = EventActions::default();
actions.state_delta.insert("user_name".to_string(), json!("Alice"));
actions.state_delta.insert("temp:current_step".to_string(), json!(3));

artifact_delta

artifact_delta 字段跟踪对工件的更改。键是工件名称,值是版本号。这允许系统跟踪在事件期间创建或修改了哪些工件。

示例:

actions.artifact_delta.insert("report.pdf".to_string(), 1);
actions.artifact_delta.insert("chart.png".to_string(), 2);

skip_summarization

当为 true 时,此事件将从会话摘要中排除。这对于不应作为主对话流程一部分的内部事件、调试信息或冗长的 Tool 输出很有用。

transfer_to_agent

当设置为某个 agent 名称时,控制权将转移给该 agent。这使得多 agent 工作流成为可能,其中一个 agent 可以将任务移交给另一个 agent。目标 agent 必须配置为子 agent。

示例:

actions.transfer_to_agent = Some("specialist_agent".to_string());

escalate

当为 true 时,表示对话应升级给人工操作员或 supervisor agent。具体的升级行为取决于您应用程序的实现。

对话历史形成

事件通过在会话中按时间顺序累积来形成对话历史。当一个 agent 处理一个请求时:

  1. 用户消息事件:创建一个包含用户输入的新事件
  2. Agent 处理:agent 接收对话历史(所有之前的事件)
  3. Agent 响应事件:agent 的响应被记录为一个新事件
  4. Tool 执行事件:每个 Tool 调用都可能生成额外的事件
  5. 状态更新:来自所有事件的状态增量被合并到会话状态中

对话历史通过以下方式构建:

  • 按时间顺序检索会话中的所有事件
  • 将每个事件的内容转换为适合 LlmAgent 的格式
  • 包含来自累积状态增量的状态信息
  • 在适当时候过滤掉标记为 skip_summarization 的事件

事件流示例

Session Start
  ↓
[Event 1] User: "What's the weather in Tokyo?"
  ↓
[Event 2] Agent: "Let me check that for you."
  ↓
[Event 3] Tool (weather_api): {"temp": 22, "condition": "sunny"}
  ↓
[Event 4] Agent: "It's 22°C and sunny in Tokyo."
  ↓
Session State Updated

每个事件都建立在先前的事件之上,创建了一个完整的对话审计跟踪。

处理事件

从会话中访问事件

use adk_rust::session::{SessionService, GetRequest};

// 检索一个会话及其事件
let session = session_service.get(GetRequest {
    app_name: "my_app".to_string(),
    user_id: "user_123".to_string(),
    session_id: session_id.clone(),
    num_recent_events: None,  // 获取所有事件
    after: None,
}).await?;

// 访问事件
let events = session.events();
println!("Total events: {}", events.len());

// 遍历事件(注意:会话事件使用 llm_response.content)
for i in 0..events.len() {
    if let Some(event) = events.at(i) {
        println!("Event {}: {} by {} at {}",
            event.id,
            event.llm_response.content.as_ref().map(|_| "has content").unwrap_or("no content"),
            event.author,
            event.timestamp
        );
    }
}

检查事件详情

// 从会话中获取特定事件(使用 llm_response.content)
if let Some(event) = events.at(0) {
    // 检查作者
    println!("Author: {}", event.author);

    // 检查内容(会话事件使用 llm_response.content)
    if let Some(content) = &event.llm_response.content {
        for part in &content.parts {
            if let Part::Text { text } = part {
                println!("Text: {}", text);
            }
        }
    }

    // 检查状态变化
    if !event.actions.state_delta.is_empty() {
        println!("State changes:");
        for (key, value) in &event.actions.state_delta {
            println!("  {} = {}", key, value);
        }
    }

    // 检查 Agent 转移
    if let Some(target) = &event.actions.transfer_to_agent {
        println!("Transfers to: {}", target);
    }

    // 检查工件
    if !event.actions.artifact_delta.is_empty() {
        println!("Artifacts modified:");
        for (name, version) in &event.actions.artifact_delta {
            println!("  {} (v{})", name, version);
        }
    }
}

限制事件历史记录

对于长时间的对话,您可能只想检索最近的事件:

// 只获取最近的 10 个事件
let session = session_service.get(GetRequest {
    app_name: "my_app".to_string(),
    user_id: "user_123".to_string(),
    session_id: session_id.clone(),
    num_recent_events: Some(10),
    after: None,
}).await?;

如何流转事件:生成与处理

理解事件的创建和处理方式有助于阐明框架如何管理操作和历史记录。

生成来源

事件在 Agent 执行生命周期的不同点创建:

  1. 用户输入Runner 将用户消息封装成一个 Event,其中 author = "user"
  2. Agent 响应Agent 生成 Event 对象(设置 author = agent.name())以传递响应
  3. LLM 输出:模型集成层将 LLM 输出(文本、函数调用)转换为 Event 对象
  4. Tool 结果Tool 执行后,框架会生成一个包含 Tool 响应的 Event

处理流程

当一个事件生成后,它遵循以下处理路径:

  1. 生成:事件由其来源(AgentTool 或用户输入处理器)创建并生成
  2. Runner 接收:执行 AgentRunner 接收事件
  3. SessionService 处理Runner 将事件发送到 SessionServiceSessionService 执行以下操作:
    • 应用增量:将 state_delta 合并到会话状态并更新工件记录
    • 确定元数据:如果不存在,则分配唯一的 id,设置 timestamp
    • 持久化到历史记录:将事件追加到 session.events
  4. 流输出Runner 将处理后的事件生成到调用应用程序

此流程确保状态更改和历史记录与通信内容一同被一致地记录。

// 概念性流程
User Input → Runner → Agent → LLM → Event Generated
                                         ↓
                                   SessionService
                                   - Apply state_delta
                                   - Record in history
                                         ↓
                                   Event Stream → Application

识别事件类型

当处理来自 Runner 的事件时,您需要识别正在处理的事件类型:

按 Author

author 字段告诉您谁创建了事件:

match event.author.as_str() {
    "user" => println!("用户输入"),
    agent_name => println!("来自 Agent 的响应: {}", agent_name),
}

按 Content 类型

检查 llm_response.content 字段以确定有效载荷类型:

if let Some(content) = &event.llm_response.content {
    // 检查文本内容
    let has_text = content.parts.iter().any(|part| {
        matches!(part, Part::Text { .. })
    });

    // 检查函数调用(Tool 请求)
    let has_function_call = content.parts.iter().any(|part| {
        matches!(part, Part::FunctionCall { .. })
    });

    // 检查函数响应(Tool 结果)
    let has_function_response = content.parts.iter().any(|part| {
        matches!(part, Part::FunctionResponse { .. })
    });

    if has_text {
        println!("文本消息");
    } else if has_function_call {
        println!("Tool 调用请求");
    } else if has_function_response {
        println!("Tool 结果");
    }
}

按 Actions

检查 actions 字段以获取控制信号和副作用:

// 状态变更
if !event.actions.state_delta.is_empty() {
    println!("事件包含状态变更");
}

// Agent 转移
if let Some(target) = &event.actions.transfer_to_agent {
    println!("转移到 Agent: {}", target);
}

// 升级信号
if event.actions.escalate {
    println!("请求升级");
}

// 跳过摘要
if event.actions.skip_summarization {
    println!("在摘要中跳过此事件");
}

处理事件流

当运行一个 agent 时,您会收到一个事件流。以下是有效处理它们的方法:

从 Runner 处理事件

use futures::StreamExt;

let mut stream = runner.run(
    "user_123".to_string(),
    "session_id".to_string(),
    user_input,
).await?;

while let Some(event_result) = stream.next().await {
    match event_result {
        Ok(event) => {
            // Process the event
            println!("Event from: {}", event.author);

            // Extract text content
            if let Some(content) = &event.llm_response.content {
                for part in &content.parts {
                    if let Part::Text { text } = part {
                        print!("{}", text);
                    }
                }
            }

            // Check for state changes
            if !event.actions.state_delta.is_empty() {
                println!("\nState updated: {:?}", event.actions.state_delta);
            }
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}

提取函数调用

当 LLM 请求一个 tool 时,事件中包含函数调用信息:

if let Some(content) = &event.llm_response.content {
    for part in &content.parts {
        if let Part::FunctionCall { name, args } = part {
            println!("Tool requested: {}", name);
            println!("Arguments: {}", args);

            // 您的应用程序可以在此处根据 tool 名称和参数分派 tool 执行
        }
    }
}

提取函数响应

tool 执行后,结果会在函数响应中返回:

if let Some(content) = &event.llm_response.content {
    for part in &content.parts {
        if let Part::FunctionResponse { function_response, .. } = part {
            println!("Tool result from: {}", function_response.name);
            println!("Response: {}", function_response.response);

            // 处理 tool 结果
            // LLM 将使用此结果继续对话
        }
    }
}

常见事件模式

以下是您会遇到的典型事件序列:

简单文本交换

[事件 1] author="user", content=Text("你好")
[事件 2] author="assistant", content=Text("你好!有什么我可以帮忙的吗?")

Tool 使用流程

[事件 1] author="user", content=Text("天气怎么样?")
[事件 2] author="assistant", content=FunctionCall(name="get_weather", args={...})
[事件 3] author="assistant", content=FunctionResponse(name="get_weather", response={...})
[事件 4] author="assistant", content=Text("晴天,72°F")

状态更新

[事件 1] author="assistant", content=Text("我已保存您的偏好设置")
           actions.state_delta={"user_theme": "dark"}

Agent 转移

[事件 1] author="router", content=Text("正在转接给专家")
           actions.transfer_to_agent=Some("specialist_agent")
[事件 2] author="specialist_agent", content=Text("我能帮您解决")

事件元数据和标识符

Event ID

每个事件都有一个唯一的 id (UUID) 用于精确标识:

println!("Event ID: {}", event.id);

Invocation ID

invocation_id 将从单个用户请求到最终响应的所有事件分组在一起:

// 同一次交互中的所有事件共享相同的 invocation_id
println!("Invocation: {}", event.invocation_id);

// 将其用于日志和追踪
log::info!("Processing event {} in invocation {}", event.id, event.invocation_id);

Timestamp

事件会加上时间戳,以便按时间顺序排列:

println!("Event occurred at: {}", event.timestamp.format("%Y-%m-%d %H:%M:%S"));

最佳实践

  1. 事件不可变性: 事件创建后不应被修改。它们构成一个不可变的审计日志。

  2. 状态管理: 使用 state_delta 进行所有状态更改,而不是直接修改状态。这确保了更改在事件日志中被跟踪。

  3. 有意义的作者: 设置清晰、描述性的作者名称,使事件日志更易于理解。

  4. 选择性摘要: 对冗长或内部事件使用 skip_summarization,以避免它们混淆会话历史记录。

  5. 调用分组: 在单个 agent 调用期间生成的所有事件保持相同的 invocation_id,以维持逻辑分组。

  6. Artifact 跟踪: 在创建或修改 artifacts 时始终更新 artifact_delta 以保持一致性。

  7. 流处理: 在处理事件流时始终处理错误。事件可能由于 LLM 错误、tool 失败或网络问题而失败。

  8. 内容检查: 在访问部分内容之前,始终检查 llm_response.content 是否为 Some。某些事件(例如纯状态更新)可能没有内容。

  9. 模式匹配: 使用 Rust 的模式匹配来优雅地处理不同的事件类型和内容部分。

  10. 日志记录: 使用 invocation_id 关联单个用户交互中的所有事件,以便进行调试和可观测性。

相关文档


上一个: ← Artifacts | 下一个: Telemetry →