그래프 에이전트
네이티브 ADK-Rust 통합과 LangGraph 스타일 오케스트레이션을 사용하여 복잡하고 상태 저장 워크플로우를 구축하세요.
개요
GraphAgent는 노드와 엣지로 구성된 방향성 그래프로 워크플로우를 정의할 수 있게 해주며, 다음을 지원합니다:
- AgentNode: 커스텀 입/출력 매퍼를 사용하여 LLM 에이전트를 그래프 노드로 래핑합니다.
- 순환 워크플로우: 반복 및 반복적 추론(ReAct pattern)에 대한 네이티브 지원.
- 조건부 라우팅: 상태에 기반한 동적 엣지 라우팅.
- 상태 관리: 리듀서(덮어쓰기, 추가, 합계, 커스텀)가 있는 타입 지정 상태.
- 체크포인트: 내결함성과 Human-in-the-loop를 위한 영구 상태.
- 스트리밍: 다중 스트림 모드(값, 업데이트, 메시지, 디버그).
adk-graph 크레이트는 복잡하고 상태 저장 Agent 워크플로우를 구축하기 위한 LangGraph 스타일 워크플로우 오케스트레이션을 제공합니다. 이는 ADK의 Agent 시스템과 완벽한 호환성을 유지하면서 ADK-Rust 생태계에 그래프 기반 워크플로우 기능을 제공합니다.
주요 이점:
- 시각적 워크플로우 설계: 직관적인 노드 및 엣지 그래프로 복잡한 로직을 정의합니다.
- 병렬 실행: 여러 노드가 동시에 실행되어 성능을 향상시킬 수 있습니다.
- 상태 영속성: 내결함성과 Human-in-the-loop를 위한 내장 체크포인트 기능.
- LLM 통합: ADK Agent를 그래프 노드로 래핑하는 네이티브 지원.
- 유연한 라우팅: 정적 엣지, 조건부 라우팅 및 동적 의사 결정.
만들게 될 것
이 가이드에서는 번역과 요약을 병렬로 실행하는 텍스트 처리 파이프라인을 만들게 됩니다:
┌─────────────────────┐
User Input │ │
────────────────▶ │ START │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ TRANSLATOR │ │ SUMMARIZER │
│ │ │ │
│ 🇫🇷 French │ │ 📝 One sentence │
│ Translation │ │ Summary │
└─────────┬────────┘ └─────────┬────────┘
│ │
└───────────────┬───────────────┘
│
▼
┌─────────────────────┐
│ COMBINE │
│ │
│ 📋 Merge Results │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ END │
│ │
│ ✅ Complete │
└─────────────────────┘
핵심 개념:
- Nodes - 작업을 수행하는 처리 단위 (LLM 에이전트, 함수 또는 사용자 정의 로직)
- Edges - 노드 간의 제어 흐름 (정적 연결 또는 조건부 라우팅)
- State - 그래프를 통해 흐르고 노드 간에 유지되는 공유 데이터
- Parallel Execution - 더 나은 성능을 위해 여러 노드가 동시에 실행될 수 있음
핵심 구성 요소 이해하기
🔧 Nodes: 작업자 노드는 실제 작업이 일어나는 곳입니다. 각 노드는 다음을 수행할 수 있습니다:
- AgentNode: 자연어를 처리하기 위해 LLM 에이전트를 감쌉니다.
- Function Node: 데이터 처리를 위해 사용자 정의 Rust 코드를 실행합니다.
- Built-in Nodes: 카운터 또는 유효성 검사기 같은 사전 정의된 로직을 사용합니다.
노드를 조립 라인의 전문 작업자라고 생각해보세요. 각 작업자는 특정 작업과 전문성을 가집니다.
🔀 Edges: 흐름 제어 에지는 그래프에서 실행이 어떻게 이동하는지 결정합니다:
- Static Edges: 직접 연결 (
A → B → C) - Conditional Edges: 상태에 기반한 동적 라우팅 (
if sentiment == "positive" → positive_handler) - Parallel Edges: 한 노드에서 여러 경로 (
START → [translator, summarizer])
에지는 작업 흐름을 지시하는 교통 신호등 및 도로 표지판과 같습니다.
💾 State: 공유 메모리 State는 모든 노드가 읽고 쓸 수 있는 키-값 저장소입니다:
- Input Data: 그래프에 공급되는 초기 정보
- Intermediate Results: 한 노드의 출력이 다른 노드의 입력이 됨
- Final Output: 모든 처리 후 완료된 결과
State는 노드들이 다른 노드들이 사용할 정보를 남길 수 있는 공유 화이트보드 역할을 합니다.
⚡ Parallel Execution: 속도 향상 여러 에지가 한 노드에서 나갈 때, 해당 대상 노드들은 동시에 실행됩니다:
- Faster Processing: 독립적인 작업이 동시에 실행됨
- Resource Efficiency: CPU 및 I/O의 더 나은 활용
- Scalability: 선형적인 속도 저하 없이 더 복잡한 워크플로 처리
이것은 줄을 서서 기다리는 대신 여러 작업자가 작업의 여러 부분을 동시에 처리하는 것과 같습니다.
빠른 시작
1. 프로젝트 생성
cargo new graph_demo
cd graph_demo
Cargo.toml에 의존성을 추가합니다:
[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"
API 키를 사용하여 .env 파일을 생성합니다:
echo 'GOOGLE_API_KEY=your-api-key' > .env
2. 병렬 처리 예시
다음은 텍스트를 병렬로 처리하는 완전한 작동 예시입니다:
use adk_agent::LlmAgentBuilder;
use adk_graph::{
agent::GraphAgent,
edge::{END, START},
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create specialized LLM agents
let translator_agent = Arc::new(
LlmAgentBuilder::new("translator")
.description("Translates text to French")
.model(model.clone())
.instruction("Translate the input text to French. Only output the translation.")
.build()?,
);
let summarizer_agent = Arc::new(
LlmAgentBuilder::new("summarizer")
.description("Summarizes text")
.model(model.clone())
.instruction("Summarize the input text in one sentence.")
.build()?,
);
// Wrap agents as graph nodes with input/output mappers
let translator_node = AgentNode::new(translator_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("translation".to_string(), json!(text));
}
}
}
updates
});
let summarizer_node = AgentNode::new(summarizer_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("summary".to_string(), json!(text));
}
}
}
updates
});
// Build the graph with parallel execution
let agent = GraphAgent::builder("text_processor")
.description("Processes text with translation and summarization in parallel")
.channels(&["input", "translation", "summary", "result"])
.node(translator_node)
.node(summarizer_node)
.node_fn("combine", |ctx| async move {
let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");
let result = format!(
"=== Processing Complete ===\n\n\
French Translation:\n{}\n\n\
Summary:\n{}",
translation, summary
);
Ok(NodeOutput::new().with_update("result", json!(result)))
})
// Parallel execution: both nodes start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge("translator", "combine")
.edge("summarizer", "combine")
.edge("combine", END)
.build()?;
// Execute the graph
let mut input = State::new();
input.insert("input".to_string(), json!("AI is transforming how we work and live."));
let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
예시 출력:
=== Processing Complete ===
French Translation:
L'IA transforme notre façon de travailler et de vivre.
Summary:
AI is revolutionizing work and daily life through technological transformation.
Graph 실행 방식
큰 그림
GraphAgent는 슈퍼 스텝으로 실행됩니다. 모든 준비된 노드는 병렬로 실행되며, 다음 스텝으로 진행하기 전에 그래프는 모든 노드가 완료될 때까지 기다립니다:
Step 1: START ──┬──▶ translator (running)
└──▶ summarizer (running)
⏳ Wait for both to complete...
Step 2: translator ──┬──▶ combine (running)
summarizer ──┘
⏳ Wait for combine to complete...
Step 3: combine ──▶ END ✅
노드를 통한 상태 흐름
각 노드는 공유 상태에서 읽고 쓸 수 있습니다:
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 1: Initial state │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { "input": "AI is transforming how we work" } │
│ │
│ ↓ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ translator │ │ summarizer │ │
│ │ reads "input" │ │ reads "input" │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 2: After parallel execution │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology" │
│ } │
│ │
│ ↓ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ combine │ │
│ │ reads "translation" + "summary" │ │
│ │ writes "result" │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 3: Final state │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology", │
│ "result": "=== Processing Complete ===\n\nFrench..." │
│ } │
│ │
└─────────────────────────────────────────────────────────────────────┘
작동 원리
| 구성 요소 | 역할 |
|---|---|
AgentNode | LLM Agent를 input/output mappers로 래핑합니다. |
input_mapper | 상태를 Agent 입력 Content로 변환합니다. |
output_mapper | Agent 이벤트를 상태 업데이트로 변환합니다. |
channels | 그래프가 사용할 상태 필드를 선언합니다. |
edge() | 노드 간의 실행 흐름을 정의합니다. |
ExecutionConfig | 체크포인팅을 위한 스레드 ID를 제공합니다. |
LLM 분류를 사용한 조건부 라우팅
LLM이 실행 경로를 결정하는 스마트 라우팅 시스템을 구축합니다:
시각화: 감성 기반 라우팅
┌─────────────────────┐
사용자 피드백 │ │
────────────────▶ │ 분류기 │
│ 🧠 톤 분석 │
└──────────┬──────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 긍정적 │ │ 부정적 │ │ 중립적 │
│ │ │ │ │ │
│ 😊 감사합니다! │ │ 😔 사과하기 │ │ 😐 추가 질문하기 │
│ 축하하기 │ │ 문제 해결 돕기 │ │ 추가 질문하기 │
└──────────────────┘ └──────────────────┘ └──────────────────┘
전체 예시 코드
use adk_agent::LlmAgentBuilder;
use adk_graph::{
edge::{END, Router, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create classifier agent
let classifier_agent = Arc::new(
LlmAgentBuilder::new("classifier")
.description("Classifies text sentiment")
.model(model.clone())
.instruction(
"You are a sentiment classifier. Analyze the input text and respond with \
ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
)
.build()?,
);
// Create response agents for each sentiment
let positive_agent = Arc::new(
LlmAgentBuilder::new("positive")
.description("Handles positive feedback")
.model(model.clone())
.instruction(
"You are a customer success specialist. The customer has positive feedback. \
Express gratitude, reinforce the positive experience, and suggest ways to \
share their experience. Be warm and appreciative. Keep response under 3 sentences.",
)
.build()?,
);
let negative_agent = Arc::new(
LlmAgentBuilder::new("negative")
.description("Handles negative feedback")
.model(model.clone())
.instruction(
"You are a customer support specialist. The customer has a complaint. \
Acknowledge their frustration, apologize sincerely, and offer help. \
Be empathetic. Keep response under 3 sentences.",
)
.build()?,
);
let neutral_agent = Arc::new(
LlmAgentBuilder::new("neutral")
.description("Handles neutral feedback")
.model(model.clone())
.instruction(
"You are a customer service representative. The customer has neutral feedback. \
Ask clarifying questions to better understand their needs. Be helpful and curious. \
Keep response under 3 sentences.",
)
.build()?,
);
// Create AgentNodes with mappers
let classifier_node = AgentNode::new(classifier_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase()
.trim()
.to_string();
let sentiment = if text.contains("positive") { "positive" }
else if text.contains("negative") { "negative" }
else { "neutral" };
updates.insert("sentiment".to_string(), json!(sentiment));
}
}
updates
});
// Response nodes (similar pattern for each)
let positive_node = AgentNode::new(positive_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("response".to_string(), json!(text));
}
}
updates
});
// Build graph with conditional routing
let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
.add_node(classifier_node)
.add_node(positive_node)
// ... add negative_node and neutral_node similarly
.add_edge(START, "classifier")
.add_conditional_edges(
"classifier",
Router::by_field("sentiment"), // Route based on sentiment field
[
("positive", "positive"),
("negative", "negative"),
("neutral", "neutral"),
],
)
.add_edge("positive", END)
.add_edge("negative", END)
.add_edge("neutral", END)
.compile()?;
// Test with different feedback
let mut input = State::new();
input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));
let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
예시 흐름:
입력: "귀하의 제품은 정말 놀랍습니다! 마음에 듭니다!"
↓
분류기: "positive"
↓
긍정 에이전트: "멋진 피드백 정말 감사합니다!
저희 제품을 마음에 들어 해주셔서 기쁩니다.
다른 분들을 돕기 위해 리뷰를 남겨주시겠어요?"
ReAct 패턴: 추론 + 행동
복잡한 문제를 해결하기 위해 도구를 반복적으로 사용할 수 있는 에이전트를 구축합니다:
시각 자료: ReAct 사이클
┌─────────────────────┐
사용자 질문 │ │
────────────────▶ │ 추론자 │
│ 🧠 생각 + 행동 │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ 도구 호출이 있는가? │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ 예 │ │ 아니오 │
│ │ │ │
│ 🔄 추론자로 돌아가기 │ │ ✅ 최종 답변 종료 │
│ │ │ │
└─────────┬────────┘ └──────────────────┘
│
└─────────────────┐
│
▼
┌─────────────────────┐
│ 추론자 │
│ 🧠 생각 + 행동 │
│ (다음 반복) │
└─────────────────────┘
ReAct 전체 예시
use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
edge::{END, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create tools
let weather_tool = Arc::new(FunctionTool::new(
"get_weather",
"Get the current weather for a location. Takes a 'location' parameter (city name).",
|_ctx, args| async move {
let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
Ok(json!({
"location": location,
"temperature": "72°F",
"condition": "Sunny",
"humidity": "45%"
}))
},
)) as Arc<dyn Tool>;
let calculator_tool = Arc::new(FunctionTool::new(
"calculator",
"Perform mathematical calculations. Takes an 'expression' parameter (string).",
|_ctx, args| async move {
let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
let result = match expr {
"2 + 2" => "4",
"10 * 5" => "50",
"100 / 4" => "25",
"15 - 7" => "8",
_ => "Unable to evaluate",
};
Ok(json!({ "result": result, "expression": expr }))
},
)) as Arc<dyn Tool>;
// Create reasoner agent with tools
let reasoner_agent = Arc::new(
LlmAgentBuilder::new("reasoner")
.description("Reasoning agent with tools")
.model(model.clone())
.instruction(
"You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
When you have enough information, provide a final answer without using more tools.",
)
.tool(weather_tool)
.tool(calculator_tool)
.build()?,
);
// Create reasoner node that detects tool usage
let reasoner_node = AgentNode::new(reasoner_agent)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build ReAct graph with cycle
let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back for more reasoning
} else {
END.to_string() // Done - final answer
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
// Test the ReAct agent
let mut input = State::new();
input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));
let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));
Ok(())
}
예시 흐름:
질문: "What's the weather in Paris and what's 15 + 25?"
반복 1:
추론자: "날씨 정보와 계산이 필요합니다."
→ get_weather(location="Paris") 및 calculator(expression="15 + 25") 호출
→ has_tool_calls = true → 루프 재개
반복 2:
추론자: "결과에 따르면: 파리는 72°F이고 맑으며, 15 + 25 = 40입니다."
→ 도구 호출 없음 → has_tool_calls = false → END
최종 답변: "The weather in Paris is 72°F and sunny with 45% humidity.
And 15 + 25 equals 40."
AgentNode
모든 ADK Agent (일반적으로 LlmAgent)를 그래프 노드로 래핑합니다:
let node = AgentNode::new(llm_agent)
.with_input_mapper(|state| {
// 그래프 상태를 에이전트 입력 Content로 변환
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
// 에이전트 이벤트를 상태 업데이트로 변환
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("output".to_string(), json!(text));
}
}
updates
});
함수 노드
상태를 처리하는 간단한 async 함수:
.node_fn("process", |ctx| async move {
let input = ctx.state.get("input").unwrap();
let output = process_data(input).await?;
Ok(NodeOutput::new().with_update("output", output))
})
엣지 유형
정적 엣지
노드 간의 직접적인 연결:
.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)
조건부 엣지
상태에 기반한 동적 라우팅:
.conditional_edge(
"router",
|state| {
match state.get("next").and_then(|v| v.as_str()) {
Some("research") => "research_node".to_string(),
Some("write") => "write_node".to_string(),
_ => END.to_string(),
}
},
[
("research_node", "research_node"),
("write_node", "write_node"),
(END, END),
],
)
라우터 헬퍼
일반적인 패턴에 내장 라우터를 사용합니다:
use adk_graph::edge::Router;
// 상태 필드 값에 따라 라우팅
.conditional_edge("classifier", Router::by_field("sentiment"), [
("positive", "positive_handler"),
("negative", "negative_handler"),
("neutral", "neutral_handler"),
])
// 부울 필드에 따라 라우팅
.conditional_edge("check", Router::by_bool("approved"), [
("true", "execute"),
("false", "reject"),
])
// 반복 제한
.conditional_edge("loop", Router::max_iterations("count", 5), [
("continue", "process"),
("done", END),
])
병렬 실행
단일 노드에서 시작하는 여러 엣지가 병렬로 실행됩니다:
let agent = GraphAgent::builder("parallel_processor")
.channels(&["input", "translation", "summary", "analysis"])
.node(translator_node)
.node(summarizer_node)
.node(analyzer_node)
.node(combiner_node)
// 세 가지 모두 동시에 시작
.edge(START, "translator")
.edge(START, "summarizer")
.edge(START, "analyzer")
// 모든 작업이 완료될 때까지 기다린 후 결합
.edge("translator", "combiner")
.edge("summarizer", "combiner")
.edge("analyzer", "combiner")
.edge("combiner", END)
.build()?;
순환 그래프 (ReAct 패턴)
순환을 사용하여 반복적 추론 에이전트를 구축합니다:
use adk_core::Part;
// 도구를 사용하여 에이전트 생성
let reasoner = Arc::new(
LlmAgentBuilder::new("reasoner")
.model(model)
.instruction("Use tools to answer questions. Provide final answer when done.")
.tool(search_tool)
.tool(calculator_tool)
.build()?
);
let reasoner_node = AgentNode::new(reasoner)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { name, .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// 순환을 사용하여 그래프 구축
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// 안전 제한
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // 다시 루프
} else {
END.to_string() // 완료
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
멀티 에이전트 감독관
작업을 전문 에이전트에게 라우팅합니다:
// Create supervisor agent
let supervisor = Arc::new(
LlmAgentBuilder::new("supervisor")
.model(model.clone())
.instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
.build()?
);
let supervisor_node = AgentNode::new(supervisor)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase();
let next = if text.contains("researcher") { "researcher" }
else if text.contains("writer") { "writer" }
else if text.contains("coder") { "coder" }
else { "done" };
updates.insert("next_agent".to_string(), json!(next));
}
}
updates
});
// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
.add_node(supervisor_node)
.add_node(researcher_node)
.add_node(writer_node)
.add_node(coder_node)
.add_edge(START, "supervisor")
.add_conditional_edges(
"supervisor",
Router::by_field("next_agent"),
[
("researcher", "researcher"),
("writer", "writer"),
("coder", "coder"),
("done", END),
],
)
// Agents report back to supervisor
.add_edge("researcher", "supervisor")
.add_edge("writer", "supervisor")
.add_edge("coder", "supervisor")
.compile()?;
상태 관리
리듀서와 함께하는 상태 스키마
상태 업데이트가 병합되는 방식을 제어합니다:
let schema = StateSchema::builder()
.channel("current_step") // Overwrite (default)
.list_channel("messages") // Append to list
.channel_with_reducer("count", Reducer::Sum) // Sum values
.channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
// Custom merge logic
merge_json(old, new)
})))
.build();
let agent = GraphAgent::builder("stateful")
.state_schema(schema)
// ... nodes and edges
.build()?;
리듀서 타입
| Reducer | 동작 |
|---|---|
Overwrite | 이전 값을 새 값으로 교체 (기본값) |
Append | 목록에 추가 |
Sum | 숫자 값 더하기 |
Custom | 사용자 정의 병합 함수 |
체크포인트
결함 허용 및 Human-in-the-Loop를 위한 영구 상태를 활성화합니다:
인메모리 (개발용)
use adk_graph::checkpoint::MemoryCheckpointer;
let checkpointer = Arc::new(MemoryCheckpointer::new());
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer_arc(checkpointer.clone());
SQLite (프로덕션용)
use adk_graph::checkpoint::SqliteCheckpointer;
let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer(checkpointer);
체크포인트 기록 (타임 트래블)
// List all checkpoints for a thread
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}
// Load a specific checkpoint
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}
Human-in-the-Loop
동적 인터럽트를 사용하여 사람의 승인을 위한 실행을 일시 중지합니다.
use adk_graph::{error::GraphError, node::NodeOutput};
// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
// Extract risk level from LLM response
let risk = if text.to_lowercase().contains("risk: high") { "high" }
else if text.to_lowercase().contains("risk: medium") { "medium" }
else { "low" };
updates.insert("plan".to_string(), json!(text));
updates.insert("risk_level".to_string(), json!(risk));
}
}
updates
});
// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_node_fn("review", |ctx| async move {
let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
let approved = ctx.get("approved").and_then(|v| v.as_bool());
// Already approved - continue
if approved == Some(true) {
return Ok(NodeOutput::new());
}
// High/medium risk - interrupt for approval
if risk == "high" || risk == "medium" {
return Ok(NodeOutput::interrupt_with_data(
&format!("{} RISK: Human approval required", risk.to_uppercase()),
json!({
"plan": ctx.get("plan"),
"risk_level": risk,
"action": "Set 'approved' to true to continue"
})
));
}
// Low risk - auto-approve
Ok(NodeOutput::new().with_update("approved", json!(true)))
})
.add_edge(START, "planner")
.add_edge("planner", "review")
.add_edge("review", "executor")
.add_edge("executor", END)
.compile()?
.with_checkpointer_arc(checkpointer.clone());
// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;
match result {
Err(GraphError::Interrupted(interrupt)) => {
println!("*** EXECUTION PAUSED ***");
println!("Reason: {}", interrupt.interrupt);
println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));
// Human reviews and approves...
// Update state with approval
graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;
// Resume execution
let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
println!("Final result: {:?}", final_result.get("result"));
}
Ok(result) => {
println!("Completed without interrupt: {:?}", result);
}
Err(e) => {
println!("Error: {}", e);
}
}
Static Interrupts
필수 일시 중지 지점을 위해 interrupt_before 또는 interrupt_after를 사용합니다.
let graph = StateGraph::with_channels(&["task", "plan", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_edge(START, "planner")
.add_edge("planner", "executor")
.add_edge("executor", END)
.compile()?
.with_interrupt_before(&["executor"]); // Always pause before execution
스트리밍 실행
그래프가 실행되는 동안 이벤트를 스트리밍합니다:
use futures::StreamExt;
use adk_graph::stream::StreamMode;
let stream = agent.stream(input, config, StreamMode::Updates);
while let Some(event) = stream.next().await {
match event? {
StreamEvent::NodeStart(name) => println!("Starting: {}", name),
StreamEvent::Updates { node, updates } => {
println!("{} updated state: {:?}", node, updates);
}
StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
StreamEvent::Done(state) => println!("Final state: {:?}", state),
_ => {}
}
}
스트림 모드
| 모드 | 설명 |
|---|---|
Values | 각 노드 후에 전체 상태를 스트리밍합니다. |
Updates | 상태 변경만 스트리밍합니다. |
Messages | 메시지 유형 업데이트를 스트리밍합니다. |
Debug | 모든 내부 이벤트를 스트리밍합니다. |
ADK 통합
GraphAgent는 ADK Agent 트레이트를 구현하므로 다음 항목들과 함께 작동합니다:
- Runner: 표준 실행을 위해
adk-runner와 함께 사용 - Callbacks: before/after 콜백에 대한 완전한 지원
- Sessions: 대화 기록을 위해
adk-session과 함께 작동 - Streaming: ADK
EventStream을 반환합니다.
use adk_runner::Runner;
let graph_agent = GraphAgent::builder("workflow")
.before_agent_callback(|ctx| async {
println!("Starting graph execution for session: {}", ctx.session_id());
Ok(())
})
.after_agent_callback(|ctx, event| async {
if let Some(content) = event.content() {
println!("Graph completed with content");
}
Ok(())
})
// ... graph definition
.build()?;
// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration
예시
모든 예시는 AgentNode와 실제 LLM 통합을 사용합니다:
# Parallel LLM agents with before/after callbacks
cargo run --example graph_agent
# Sequential multi-agent pipeline (extractor → analyzer → formatter)
cargo run --example graph_workflow
# LLM-based sentiment classification and conditional routing
cargo run --example graph_conditional
# ReAct pattern with tools and cyclic execution
cargo run --example graph_react
# Multi-agent supervisor routing to specialists
cargo run --example graph_supervisor
# Human-in-the-loop with risk-based interrupts
cargo run --example graph_hitl
# Checkpointing and time travel debugging
cargo run --example graph_checkpoint
LangGraph와의 비교
| 기능 | LangGraph | adk-graph |
|---|---|---|
| State Management | TypedDict + Reducers | StateSchema + Reducers |
| Execution Model | Pregel super-steps | Pregel super-steps |
| Checkpointing | Memory, SQLite, Postgres | Memory, SQLite |
| Human-in-Loop | interrupt_before/after | interrupt_before/after + dynamic |
| Streaming | 5 modes | 5가지 모드 |
| Cycles | Native support | 네이티브 지원 |
| Type Safety | Python typing | Rust type system |
| LLM Integration | LangChain | AgentNode + ADK agents |
이전: ← 다중 에이전트 시스템 | 다음: 실시간 에이전트 →