グラフエージェント
ネイティブな ADK-Rust 統合により、LangGraph スタイルのオーケストレーションを使用して複雑なステートフルワークフローを構築します。
概要
GraphAgent を使用すると、ワークフローをノードとエッジを持つ有向グラフとして定義でき、以下をサポートします。
- AgentNode: LLM agent をカスタムの入出力マッパーを持つグラフノードとしてラップします
- 循環ワークフロー: ループと反復推論 (ReAct pattern) のネイティブサポート
- 条件付きルーティング: 状態に基づいた動的なエッジルーティング
- 状態管理: リデューサー (overwrite, append, sum, custom) を伴う型付きの状態
- Checkpointing: フォールトトレランスとヒューマン・イン・ザ・ループのための永続的な状態
- Streaming: 複数のストリームモード (values, updates, messages, debug)
adk-graph crate は、複雑なステートフル agent ワークフローを構築するための LangGraph スタイルのワークフローオーケストレーションを提供します。これは、ADK の agent system との完全な互換性を維持しながら、ADK-Rust エコシステムにグラフベースのワークフロー機能をもたらします。
主な利点:
- 視覚的なワークフロー設計: 複雑なロジックを直感的なノードとエッジのグラフとして定義します
- 並列実行: 複数のノードを同時に実行してパフォーマンスを向上させることができます
- 状態の永続化: フォールトトレランスとヒューマン・イン・ザ・ループのための組み込みの Checkpointing
- LLM 統合: ADK agent をグラフノードとしてラップするためのネイティブサポート
- 柔軟なルーティング: 静的なエッジ、条件付きルーティング、および動的な意思決定
作成するもの
このガイドでは、翻訳と要約を並行して実行するテキスト処理パイプラインを作成します。
┌─────────────────────┐
User Input │ │
────────────────▶ │ START │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ TRANSLATOR │ │ SUMMARIZER │
│ │ │ │
│ 🇫🇷 フランス語 │ │ 📝 1文要約 │
│ 翻訳 │ │ │
└─────────┬────────┘ └─────────┬────────┘
│ │
└───────────────┬───────────────┘
│
▼
┌─────────────────────┐
│ COMBINE │
│ │
│ 📋 結果のマージ │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ END │
│ │
│ ✅ 完了 │
└─────────────────────┘
主要な概念:
- Nodes - 作業を実行する処理単位(LlmAgent、FunctionTool、カスタムロジック)
- Edges - ノード間の制御フロー(静的な接続または条件付きルーティング)
- State - グラフを流れ、ノード間で永続化される共有データ
- Parallel Execution - 複数のノードが同時に実行され、パフォーマンスが向上
コアコンポーネントの理解
🔧 Nodes: 作業員 Nodes は実際の作業が行われる場所です。各ノードは以下を実行できます。
- AgentNode: 自然言語を処理するためにLlmAgentをラップします。
- Function Node: データ処理のためにカスタムRustコードを実行します。
- Built-in Nodes: カウンターやバリデーターのような事前定義されたロジックを使用します。
ノードは、組み立てラインの専門作業員のように考えてください。それぞれが特定の仕事と専門知識を持っています。
🔀 Edges: フロー制御 Edges は、グラフ内での実行の移動方法を決定します。
- Static Edges: 直接接続 (
A → B → C) - Conditional Edges: Stateに基づく動的ルーティング (
if sentiment == "positive" → positive_handler) - Parallel Edges: 1つのノードからの複数のパス (
START → [translator, summarizer])
Edges は、作業の流れを指示する信号機や道路標識のようなものです。
💾 State: 共有メモリ State は、すべてのノードが読み書きできるキーと値のストアです。
- 入力データ: グラフに供給される初期情報
- 中間結果: あるノードからの出力が別のノードへの入力になります。
- 最終出力: すべての処理が完了した後の最終結果
State は、ノードが他のノードが使用する情報を残せる共有ホワイトボードのように機能します。
⚡ Parallel Execution: スピードブースト 複数のEdgeがノードから出ると、それらのターゲットノードは同時に実行されます。
- 処理の高速化: 独立したタスクが同時に実行されます。
- リソース効率: CPUとI/Oのより良い利用。
- スケーラビリティ: 線形的な速度低下なしに、より複雑なワークフローを処理します。
これは、複数の作業員が列に並んで待つのではなく、同時に仕事の異なる部分に取り組むようなものです。
クイックスタート
1. プロジェクトを作成する
cargo new graph_demo
cd graph_demo
Cargo.tomlに依存関係を追加します。
[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"
あなたのAPI keyで.envを作成します。
echo 'GOOGLE_API_KEY=your-api-key' > .env
2. 並列処理の例
以下は、テキストを並列処理する完全な動作例です。
use adk_agent::LlmAgentBuilder;
use adk_graph::{
agent::GraphAgent,
edge::{END, START},
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create specialized LLM agents
let translator_agent = Arc::new(
LlmAgentBuilder::new("translator")
.description("Translates text to French")
.model(model.clone())
.instruction("Translate the input text to French. Only output the translation.")
.build()?,
);
let summarizer_agent = Arc::new(
LlmAgentBuilder::new("summarizer")
.description("Summarizes text")
.model(model.clone())
.instruction("Summarize the input text in one sentence.")
.build()?,
);
// Wrap agents as graph nodes with input/output mappers
let translator_node = AgentNode::new(translator_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("translation".to_string(), json!(text));
}
}
}
updates
});
let summarizer_node = AgentNode::new(summarizer_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("summary".to_string(), json!(text));
}
}
}
updates
});
// Build the graph with parallel execution
let agent = GraphAgent::builder("text_processor")
.description("Processes text with translation and summarization in parallel")
.channels(&["input", "translation", "summary", "result"])
.node(translator_node)
.node(summarizer_node)
.node_fn("combine", |ctx| async move {
let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");
let result = format!(
"=== Processing Complete ===\n\n\
French Translation:\n{}\n\n\
Summary:\n{}",
translation, summary
);
Ok(NodeOutput::new().with_update("result", json!(result)))
})
// Parallel execution: both nodes start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge("translator", "combine")
.edge("summarizer", "combine")
.edge("combine", END)
.build()?;
// Execute the graph
let mut input = State::new();
input.insert("input".to_string(), json!("AI is transforming how we work and live."));
let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
出力例:
=== Processing Complete ===
French Translation:
L'IA transforme notre façon de travailler et de vivre.
Summary:
AI is revolutionizing work and daily life through technological transformation.
グラフの実行方法
全体像
GraphAgent はスーパーステップで実行されます。準備ができたすべてのノードが並行して実行され、グラフはすべてのノードが完了するのを待ってから次のステップに進みます。
Step 1: START ──┬──▶ translator (running)
└──▶ summarizer (running)
⏳ Wait for both to complete...
Step 2: translator ──┬──▶ combine (running)
summarizer ──┘
⏳ Wait for combine to complete...
Step 3: combine ──▶ END ✅
ノードを介した状態の流れ
各ノードは共有状態から読み取り、共有状態に書き込むことができます。
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 1: Initial state │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { "input": "AI is transforming how we work" } │
│ │
│ ↓ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ translator │ │ summarizer │ │
│ │ reads "input" │ │ reads "input" │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 2: After parallel execution │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology" │
│ } │
│ │
│ ↓ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ combine │ │
│ │ reads "translation" + "summary" │ │
│ │ writes "result" │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 3: Final state │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology",
│ "result": "=== Processing Complete ===\n\nFrench..." │
│ } │
│ │
└─────────────────────────────────────────────────────────────────────┘
動作の仕組み
| コンポーネント | 役割 |
|---|---|
AgentNode | LLMエージェントをinput/outputマッパーでラップします |
input_mapper | 状態を Agent 入力 Content に変換します |
output_mapper | Agent イベントを状態更新に変換します |
channels | グラフが使用する状態フィールドを宣言します |
edge() | ノード間の実行フローを定義します |
ExecutionConfig | チェックポイント用のスレッド ID を提供します |
LLM分類による条件付きルーティング
LLMが実行パスを決定するスマートなルーティングシステムを構築します。
ビジュアル: 感情ベースのルーティング
┌─────────────────────┐
ユーザーフィードバック │ │
────────────────▶ │ 分類器 │
│ 🧠 トーンを分析 │
└──────────┬──────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 肯定的 │ │ 否定的 │ │ 中立的 │
│ │ │ │ │ │
│ 😊 ありがとうございます! │ │ 😔 お詫び │ │ 😐 追加の質問 │
│ 祝う │ │ 修正を支援 │ │ 追加の質問 │
└──────────────────┘ └──────────────────┘ └──────────────────┘
完全なコード例
use adk_agent::LlmAgentBuilder;
use adk_graph::{
edge::{END, Router, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// 分類器エージェントを作成
let classifier_agent = Arc::new(
LlmAgentBuilder::new("classifier")
.description("Classifies text sentiment")
.model(model.clone())
.instruction(
"You are a sentiment classifier. Analyze the input text and respond with \
ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
)
.build()?,
);
// 各感情に対応する応答エージェントを作成
let positive_agent = Arc::new(
LlmAgentBuilder::new("positive")
.description("Handles positive feedback")
.model(model.clone())
.instruction(
"You are a customer success specialist. The customer has positive feedback. \
Express gratitude, reinforce the positive experience, and suggest ways to \
share their experience. Be warm and appreciative. Keep response under 3 sentences.",
)
.build()?,
);
let negative_agent = Arc::new(
LlmAgentBuilder::new("negative")
.description("Handles negative feedback")
.model(model.clone())
.instruction(
"You are a customer support specialist. The customer has a complaint. \
Acknowledge their frustration, apologize sincerely, and offer help. \
Be empathetic. Keep response under 3 sentences.",
)
.build()?,
);
let neutral_agent = Arc::new(
LlmAgentBuilder::new("neutral")
.description("Handles neutral feedback")
.model(model.clone())
.instruction(
"You are a customer service representative. The customer has neutral feedback. \
Ask clarifying questions to better understand their needs. Be helpful and curious. \
Keep response under 3 sentences.",
)
.build()?,
);
// マッパーを持つAgentNodeを作成
let classifier_node = AgentNode::new(classifier_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase()
.trim()
.to_string();
let sentiment = if text.contains("positive") { "positive" }
else if text.contains("negative") { "negative" }
else { "neutral" };
updates.insert("sentiment".to_string(), json!(sentiment));
}
}
updates
});
// 応答ノード(それぞれ同様のパターン)
let positive_node = AgentNode::new(positive_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("response".to_string(), json!(text));
}
}
updates
});
// 条件付きルーティングでグラフを構築
let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
.add_node(classifier_node)
.add_node(positive_node)
// ... add negative_node and neutral_node similarly
.add_edge(START, "classifier")
.add_conditional_edges(
"classifier",
Router::by_field("sentiment"), // Route based on sentiment field
[
("positive", "positive"),
("negative", "negative"),
("neutral", "neutral"),
],
)
.add_edge("positive", END)
.add_edge("negative", END)
.add_edge("neutral", END)
.compile()?;
// 異なるフィードバックでテスト
let mut input = State::new();
input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));
let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
実行例:
入力: "Your product is amazing! I love it!"
↓
分類器: "positive"
↓
肯定的エージェント: "Thank you so much for the wonderful feedback!
We're thrilled you love our product.
Would you consider leaving a review to help others?"
ReActパターン: 推論 + 行動
複雑な問題を解決するために、ツールを繰り返し使用できるエージェントを構築します。
ビジュアル: ReActサイクル
┌─────────────────────┐
ユーザーからの質問 │ │
────────────────▶ │ 推論器 │
│ 🧠 思考 + 行動 │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ ツール呼び出しあり? │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ はい │ │ いいえ │
│ │ │ │
│ 🔄 推論器へ │ │ ✅ 最終回答 │
│ ループバック │ │ 終了 │
└─────────┬────────┘ └──────────────────┘
│
└─────────────────┐
│
▼
┌─────────────────────┐
│ 推論器 │
│ 🧠 思考 + 行動 │
│ (次のイテレーション) │
└─────────────────────┘
完全なReActの例
use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
edge::{END, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create tools
let weather_tool = Arc::new(FunctionTool::new(
"get_weather",
"Get the current weather for a location. Takes a 'location' parameter (city name).",
|_ctx, args| async move {
let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
Ok(json!({
"location": location,
"temperature": "72°F",
"condition": "Sunny",
"humidity": "45%"
}))
},
)) as Arc<dyn Tool>;
let calculator_tool = Arc::new(FunctionTool::new(
"calculator",
"Perform mathematical calculations. Takes an 'expression' parameter (string).",
|_ctx, args| async move {
let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
let result = match expr {
"2 + 2" => "4",
"10 * 5" => "50",
"100 / 4" => "25",
"15 - 7" => "8",
_ => "Unable to evaluate",
};
Ok(json!({ "result": result, "expression": expr }))
},
)) as Arc<dyn Tool>;
// Create reasoner agent with tools
let reasoner_agent = Arc::new(
LlmAgentBuilder::new("reasoner")
.description("Reasoning agent with tools")
.model(model.clone())
.instruction(
"You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
When you have enough information, provide a final answer without using more tools.",
)
.tool(weather_tool)
.tool(calculator_tool)
.build()?,
);
// Create reasoner node that detects tool usage
let reasoner_node = AgentNode::new(reasoner_agent)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build ReAct graph with cycle
let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back for more reasoning
} else {
END.to_string() // Done - final answer
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
// Test the ReAct agent
let mut input = State::new();
input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));
let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));
Ok(())
}
例の流れ:
質問: "パリの天気と15 + 25は何ですか?"
イテレーション 1:
推論器: "天気情報と計算が必要です"
→ get_weather(location="Paris") と calculator(expression="15 + 25") を呼び出す
→ has_tool_calls = true → ループバック
イテレーション 2:
推論器: "結果に基づくと: パリは72°Fで晴れ、15 + 25 = 40です"
→ ツール呼び出しなし → has_tool_calls = false → END
最終回答: "パリの天気は72°Fで晴れ、湿度は45%です。
そして、15 + 25 は 40 です。"
AgentNode
任意のADK Agent(通常はLlmAgent)をグラフノードとしてラップします。
let node = AgentNode::new(llm_agent)
.with_input_mapper(|state| {
// グラフの状態をエージェントの入力Contentに変換します
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
// エージェントのイベントを状態の更新に変換します
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("output".to_string(), json!(text));
}
}
updates
});
関数ノード
状態を処理するシンプルなasync関数:
.node_fn("process", |ctx| async move {
let input = ctx.state.get("input").unwrap();
let output = process_data(input).await?;
Ok(NodeOutput::new().with_update("output", output))
})
エッジの種類
静的エッジ
ノード間の直接的な接続:
.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)
条件付きエッジ
状態に基づく動的なルーティング:
.conditional_edge(
"router",
|state| {
match state.get("next").and_then(|v| v.as_str()) {
Some("research") => "research_node".to_string(),
Some("write") => "write_node".to_string(),
_ => END.to_string(),
}
},
[
("research_node", "research_node"),
("write_node", "write_node"),
(END, END),
],
)
ルーターヘルパー
一般的なパターンには組み込みのルーターを使用します:
use adk_graph::edge::Router;
// 状態フィールドの値に基づいてルーティング
.conditional_edge("classifier", Router::by_field("sentiment"), [
("positive", "positive_handler"),
("negative", "negative_handler"),
("neutral", "neutral_handler"),
])
// 真偽値フィールドに基づいてルーティング
.conditional_edge("check", Router::by_bool("approved"), [
("true", "execute"),
("false", "reject"),
])
// 反復回数を制限
.conditional_edge("loop", Router::max_iterations("count", 5), [
("continue", "process"),
("done", END),
])
並列実行
単一ノードからの複数のエッジは並列に実行されます:
let agent = GraphAgent::builder("parallel_processor")
.channels(&["input", "translation", "summary", "analysis"])
.node(translator_node)
.node(summarizer_node)
.node(analyzer_node)
.node(combiner_node)
// 3つすべてが同時に開始
.edge(START, "translator")
.edge(START, "summarizer")
.edge(START, "analyzer")
// すべてが完了するのを待ってから結合
.edge("translator", "combiner")
.edge("summarizer", "combiner")
.edge("analyzer", "combiner")
.edge("combiner", END)
.build()?;
循環グラフ (ReAct パターン)
サイクルを持つ反復推論エージェントを構築します。
use adk_core::Part;
// Create agent with tools
let reasoner = Arc::new(
LlmAgentBuilder::new("reasoner")
.model(model)
.instruction("Use tools to answer questions. Provide final answer when done.")
.tool(search_tool)
.tool(calculator_tool)
.build()?
);
let reasoner_node = AgentNode::new(reasoner)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { name, .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back
} else {
END.to_string() // Done
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
マルチエージェントスーパーバイザー
タスクを専門エージェントにルーティングします:
// Create supervisor agent
let supervisor = Arc::new(
LlmAgentBuilder::new("supervisor")
.model(model.clone())
.instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
.build()?
);
let supervisor_node = AgentNode::new(supervisor)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase();
let next = if text.contains("researcher") { "researcher" }
else if text.contains("writer") { "writer" }
else if text.contains("coder") { "coder" }
else { "done" };
updates.insert("next_agent".to_string(), json!(next));
}
}
updates
});
// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
.add_node(supervisor_node)
.add_node(researcher_node)
.add_node(writer_node)
.add_node(coder_node)
.add_edge(START, "supervisor")
.add_conditional_edges(
"supervisor",
Router::by_field("next_agent"),
[
("researcher", "researcher"),
("writer", "writer"),
("coder", "coder"),
("done", END),
],
)
// Agents report back to supervisor
.add_edge("researcher", "supervisor")
.add_edge("writer", "supervisor")
.add_edge("coder", "supervisor")
.compile()?;
状態管理
リデューサーによるState Schema
状態更新がどのようにマージされるかを制御します:
let schema = StateSchema::builder()
.channel("current_step") // 上書き (デフォルト)
.list_channel("messages") // リストに追加
.channel_with_reducer("count", Reducer::Sum) // 値を合計
.channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
// カスタムマージロジック
merge_json(old, new)
})))
.build();
let agent = GraphAgent::builder("stateful")
.state_schema(schema)
// ... nodes and edges
.build()?;
Reducer タイプ
| Reducer | 動作 |
|---|---|
Overwrite | 古い値を新しい値に置き換えます (デフォルト) |
Append | リストに追加します |
Sum | 数値を合計します |
Custom | カスタムマージ関数 |
チェックポイント
フォールトトレランスとヒューマン・イン・ザ・ループのために永続的な状態を有効にします:
インメモリ (開発用)
use adk_graph::checkpoint::MemoryCheckpointer;
let checkpointer = Arc::new(MemoryCheckpointer::new());
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer_arc(checkpointer.clone());
SQLite (本番用)
use adk_graph::checkpoint::SqliteCheckpointer;
let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer(checkpointer);
チェックポイント履歴 (タイムトラベル)
// スレッドのすべてのチェックポイントをリスト表示
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}
// 特定のチェックポイントをロード
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}
ヒューマン・イン・ザ・ループ
動的割り込みを使用して人間による承認のために実行を一時停止します。
use adk_graph::{error::GraphError, node::NodeOutput};
// Planner agentがリスクを評価します
let planner_node = AgentNode::new(planner_agent)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
// LLMの応答からリスクレベルを抽出します
let risk = if text.to_lowercase().contains("risk: high") { "high" }
else if text.to_lowercase().contains("risk: medium") { "medium" }
else { "low" };
updates.insert("plan".to_string(), json!(text));
updates.insert("risk_level".to_string(), json!(risk));
}
}
updates
});
// 動的割り込みを持つレビューノード
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_node_fn("review", |ctx| async move {
let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
let approved = ctx.get("approved").and_then(|v| v.as_bool());
// すでに承認済み - 続行します
if approved == Some(true) {
return Ok(NodeOutput::new());
}
// 高/中リスク - 承認のために割り込みます
if risk == "high" || risk == "medium" {
return Ok(NodeOutput::interrupt_with_data(
&format!("{} RISK: Human approval required", risk.to_uppercase()),
json!({
"plan": ctx.get("plan"),
"risk_level": risk,
"action": "Set 'approved' to true to continue"
})
));
}
// 低リスク - 自動承認します
Ok(NodeOutput::new().with_update("approved", json!(true)))
})
.add_edge(START, "planner")
.add_edge("planner", "review")
.add_edge("review", "executor")
.add_edge("executor", END)
.compile()?
.with_checkpointer_arc(checkpointer.clone());
// 実行 - 承認のために一時停止する場合があります
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;
match result {
Err(GraphError::Interrupted(interrupt)) => {
println!("*** EXECUTION PAUSED ***");
println!("Reason: {}", interrupt.interrupt);
println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));
// 人間がレビューし、承認します...
// 承認でstateを更新します
graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;
// 実行を再開します
let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
println!("Final result: {:?}", final_result.get("result"));
}
Ok(result) => {
println!("Completed without interrupt: {:?}", result);
}
Err(e) => {
println!("Error: {}", e);
}
}
静的割り込み
強制的な一時停止ポイントには、interrupt_beforeまたはinterrupt_afterを使用します。
let graph = StateGraph::with_channels(&["task", "plan", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_edge(START, "planner")
.add_edge("planner", "executor")
.add_edge("executor", END)
.compile()?
.with_interrupt_before(&["executor"]); // 常に実行前に一時停止します
ストリーミング実行
グラフの実行中にイベントをストリームします。
use futures::StreamExt;
use adk_graph::stream::StreamMode;
let stream = agent.stream(input, config, StreamMode::Updates);
while let Some(event) = stream.next().await {
match event? {
StreamEvent::NodeStart(name) => println!("Starting: {}", name),
StreamEvent::Updates { node, updates } => {
println!("{} updated state: {:?}", node, updates);
}
StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
StreamEvent::Done(state) => println!("Final state: {:?}", state),
_ => {}
}
}
ストリームモード
| モード | 説明 |
|---|---|
Values | 各ノードの実行後に完全な状態をストリームします |
Updates | 状態の変更のみをストリームします |
Messages | メッセージ型の更新をストリームします |
Debug | すべての内部イベントをストリームします |
ADK連携
GraphAgentはADK Agentトレイトを実装しているため、以下と連携できます。
- Runner:
adk-runnerと組み合わせて標準的な実行に使用します - Callbacks: before/afterコールバックを完全にサポートします
- Sessions:
adk-sessionと連携して会話履歴を扱います - Streaming: ADK
EventStreamを返します
use adk_runner::Runner;
let graph_agent = GraphAgent::builder("workflow")
.before_agent_callback(|ctx| async {
// セッション: {} のグラフ実行を開始します
println!("Starting graph execution for session: {}", ctx.session_id());
Ok(())
})
.after_agent_callback(|ctx, event| async {
if let Some(content) = event.content() {
// グラフがコンテンツで完了しました
println!("Graph completed with content");
}
Ok(())
})
// ... グラフ定義
.build()?;
// GraphAgentはAgentトレイトを実装しています - LauncherまたはRunnerと組み合わせて使用します
// Runnerの設定については adk-runner のREADMEを参照してください
例
すべての例は、AgentNodeとの実際のLLM連携を使用しています。
# before/afterコールバックを持つ並列LLMエージェント
cargo run --example graph_agent
# 順次マルチエージェントパイプライン(抽出器 → 分析器 → フォーマッタ)
cargo run --example graph_workflow
# LLMベースの感情分類と条件付きルーティング
cargo run --example graph_conditional
# ツールと循環実行を伴うReActパターン
cargo run --example graph_react
# スペシャリストへのマルチエージェントスーパーバイザールーティング
cargo run --example graph_supervisor
# リスクベースの割り込みを伴うヒューマン・イン・ザ・ループ
cargo run --example graph_hitl
# チェックポイントとタイムトラベルデバッグ
cargo run --example graph_checkpoint
LangGraphとの比較
| 機能 | LangGraph | adk-graph |
|---|---|---|
| 状態管理 | TypedDict + Reducers | StateSchema + Reducers |
| 実行モデル | Pregel super-steps | Pregel super-steps |
| チェックポイント | Memory, SQLite, Postgres | Memory, SQLite |
| ヒューマン・イン・ザ・ループ | interrupt_before/after | interrupt_before/after + dynamic |
| ストリーミング | 5つのモード | 5つのモード |
| サイクル | ネイティブサポート | ネイティブサポート |
| 型安全性 | Pythonの型付け | Rustの型システム |
| LLM連携 | LangChain | AgentNode + ADK agents |
前へ: ← マルチエージェントシステム | 次へ: リアルタイムエージェント →