Agent-to-Agent (A2A) 协议

Agent-to-Agent (A2A) 协议使 Agent 能够跨网络边界进行通信和协作。adk-rust 提供对通过 A2A 暴露 Agent 和消费远程 A2A Agent 的全面支持。

概述

A2A 在以下情况下非常有用:

  • 与第三方 Agent 服务集成
  • 使用专业 Agent 构建微服务架构
  • 实现跨语言 Agent 通信
  • 强制 Agent 系统之间的正式契约

对于简单的内部组织,请使用本地 sub-agent 而非 A2A 以获得更好的性能。

Agent 卡片

每个 A2A Agent 都会暴露一个描述其能力的 Agent 卡片。该卡片会自动生成并服务于 /.well-known/agent.json

use adk_server::a2a::build_agent_card;

let agent_card = build_agent_card(&agent, "http://localhost:8080");

println!("Agent: {}", agent_card.name);
println!("Skills: {}", agent_card.skills.len());
println!("Streaming: {}", agent_card.capabilities.streaming);

Agent 卡片包括:

  • Agent 名称和描述
  • 用于通信的 Base URL
  • 功能(streaming、状态 history 等)
  • 从 Agent 及其 sub-agent 派生的 skills

通过 A2A 暴露 Agent

要暴露一个 Agent 以便其他 Agent 可以使用它,请创建一个带有 A2A endpoints 的 HTTP 服务器:

use adk_server::{create_app_with_a2a, ServerConfig};
use adk_agent::LlmAgentBuilder;
use adk_model::gemini::GeminiModel;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create your agent
    let model = GeminiModel::new(&api_key, "gemini-2.5-flash")?;
    let agent = LlmAgentBuilder::new("weather_agent")
        .description("Answers weather questions")
        .model(Arc::new(model))
        .build()?;

    // Create server config
    let config = ServerConfig::new(
        Arc::new(SingleAgentLoader::new(Arc::new(agent))),
        Arc::new(InMemorySessionService::new()),
    );

    // Create app with A2A support
    let app = create_app_with_a2a(config, Some("http://localhost:8080"));

    // Serve
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
    axum::serve(listener, app).await?;

    Ok(())
}

这暴露了:

  • GET /.well-known/agent.json - Agent 卡片
  • POST /a2a - 用于 A2A 协议的 JSON-RPC endpoint
  • POST /a2a/stream - SSE streaming endpoint

消费远程 Agent

使用 RemoteA2aAgent 与远程 A2A Agent 进行通信:

use adk_server::a2a::RemoteA2aAgent;

let remote_agent = RemoteA2aAgent::builder("prime_checker")
    .description("Checks if numbers are prime")
    .agent_url("http://localhost:8001")
    .build()?;

// Use as a sub-agent
let root_agent = LlmAgentBuilder::new("root")
    .model(Arc::new(model))
    .sub_agent(Arc::new(remote_agent))
    .build()?;

RemoteA2aAgent

  • 自动从远程 URL 获取 Agent 卡片
  • 将 ADK 事件转换为 A2A 协议消息,并反向转换
  • 处理 streaming 响应
  • 作为 sub-agent 无缝工作

A2A Client

为了与远程 Agent 直接通信,请使用 A2aClient

use adk_server::a2a::{A2aClient, Message, Part, Role};

// Create client from URL (fetches agent card)
let client = A2aClient::from_url("http://localhost:8080").await?;

// Build a message
let message = Message::builder()
    .role(Role::User)
    .parts(vec![Part::text("What's the weather?".to_string())])
    .message_id(uuid::Uuid::new_v4().to_string())
    .build();

// Send message (blocking)
let response = client.send_message(message.clone()).await?;

// Or send with streaming
let mut stream = client.send_streaming_message(message).await?;
while let Some(event) = stream.next().await {
    match event? {
        UpdateEvent::TaskArtifactUpdate(artifact) => {
            println!("Artifact: {:?}", artifact);
        }
        UpdateEvent::TaskStatusUpdate(status) => {
            println!("Status: {:?}", status.status.state);
        }
    }
}

JSON-RPC Protocol

adk-rust 使用 JSON-RPC 2.0 实现 A2A 协议。支持的方法有:

message/send

向 agent 发送消息:

{
  "jsonrpc": "2.0",
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "messageId": "msg-123",
      "parts": [{"text": "Hello!"}]
    }
  },
  "id": 1
}

响应包含一个带有状态和工件的 task 对象。

message/stream

message/send 相同,但返回 Server-Sent Events (SSE) 以进行流式响应。

tasks/cancel

取消正在运行的 task:

{
  "jsonrpc": "2.0",
  "method": "tasks/cancel",
  "params": {
    "taskId": "task-123"
  },
  "id": 1
}

Multi-Agent Example

结合本地和远程 agent:

// Local agent
let roll_agent = LlmAgentBuilder::new("roll_agent")
    .description("Rolls dice")
    .model(Arc::new(model.clone()))
    .tool(Arc::new(roll_die_tool))
    .build()?;

// Remote agent
let prime_agent = RemoteA2aAgent::builder("prime_agent")
    .description("Checks if numbers are prime")
    .agent_url("http://localhost:8001")
    .build()?;

// Root agent orchestrates both
let root_agent = LlmAgentBuilder::new("root_agent")
    .instruction("Delegate dice rolling to roll_agent and prime checking to prime_agent")
    .model(Arc::new(model))
    .sub_agent(Arc::new(roll_agent))
    .sub_agent(Arc::new(prime_agent))
    .build()?;

Error Handling

A2A 操作返回标准的 ADK 错误:

match client.send_message(message).await {
    Ok(response) => {
        if let Some(error) = response.error {
            eprintln!("RPC error: {} (code: {})", error.message, error.code);
        }
    }
    Err(e) => {
        eprintln!("Request failed: {}", e);
    }
}

常见错误代码:

  • -32600: 无效请求
  • -32601: 方法未找到
  • -32602: 无效参数
  • -32603: 内部错误

Best Practices

  1. 使用 agent 卡片:在通信前始终获取并验证 agent 卡片
  2. 处理流式传输:对长时间运行的操作使用流式传输
  3. 错误恢复:为网络故障实施重试逻辑
  4. 超时:为远程调用设置适当的超时
  5. 安全性:在生产环境中使用 HTTPS 并实施身份验证

Security Configuration

为生产部署配置 CORS、超时和安全头:

use adk_server::{ServerConfig, SecurityConfig};
use std::time::Duration;

// Production configuration
let config = ServerConfig::new(agent_loader, session_service)
    .with_allowed_origins(vec![
        "https://myapp.com".to_string(),
        "https://admin.myapp.com".to_string(),
    ])
    .with_request_timeout(Duration::from_secs(30))
    .with_max_body_size(10 * 1024 * 1024);  // 10MB

// Or use presets
let dev_config = ServerConfig::new(agent_loader, session_service)
    .with_security(SecurityConfig::development());  // Permissive for dev

let prod_config = ServerConfig::new(agent_loader, session_service)
    .with_security(SecurityConfig::production(allowed_origins));

安全功能包括:

  • CORS:可配置的允许来源(默认:开发环境宽松)
  • 请求超时:可配置的超时(默认:30 秒)
  • 正文大小限制:最大请求正文大小(默认:10MB)
  • 安全头:X-Content-Type-Options, X-Frame-Options, X-XSS-Protection
  • 错误净化:内部错误会被记录,但在生产环境中不会暴露给客户端

上一页: ← Server | 下一页: Evaluation →