그래프 에이전트

네이티브 ADK-Rust 통합과 LangGraph 스타일 오케스트레이션을 사용하여 복잡하고 상태 저장 워크플로우를 구축하세요.

개요

GraphAgent는 노드와 엣지로 구성된 방향성 그래프로 워크플로우를 정의할 수 있게 해주며, 다음을 지원합니다:

  • AgentNode: 커스텀 입/출력 매퍼를 사용하여 LLM 에이전트를 그래프 노드로 래핑합니다.
  • 순환 워크플로우: 반복 및 반복적 추론(ReAct pattern)에 대한 네이티브 지원.
  • 조건부 라우팅: 상태에 기반한 동적 엣지 라우팅.
  • 상태 관리: 리듀서(덮어쓰기, 추가, 합계, 커스텀)가 있는 타입 지정 상태.
  • 체크포인트: 내결함성과 Human-in-the-loop를 위한 영구 상태.
  • 스트리밍: 다중 스트림 모드(값, 업데이트, 메시지, 디버그).

adk-graph 크레이트는 복잡하고 상태 저장 Agent 워크플로우를 구축하기 위한 LangGraph 스타일 워크플로우 오케스트레이션을 제공합니다. 이는 ADK의 Agent 시스템과 완벽한 호환성을 유지하면서 ADK-Rust 생태계에 그래프 기반 워크플로우 기능을 제공합니다.

주요 이점:

  • 시각적 워크플로우 설계: 직관적인 노드 및 엣지 그래프로 복잡한 로직을 정의합니다.
  • 병렬 실행: 여러 노드가 동시에 실행되어 성능을 향상시킬 수 있습니다.
  • 상태 영속성: 내결함성과 Human-in-the-loop를 위한 내장 체크포인트 기능.
  • LLM 통합: ADK Agent를 그래프 노드로 래핑하는 네이티브 지원.
  • 유연한 라우팅: 정적 엣지, 조건부 라우팅 및 동적 의사 결정.

만들게 될 것

이 가이드에서는 번역과 요약을 병렬로 실행하는 텍스트 처리 파이프라인을 만들게 됩니다:

                        ┌─────────────────────┐
       User Input       │                     │
      ────────────────▶ │       START         │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │   TRANSLATOR     │            │   SUMMARIZER     │
        │                  │            │                  │
        │  🇫🇷 French       │            │  📝 One sentence │
        │     Translation  │            │     Summary      │
        └─────────┬────────┘            └─────────┬────────┘
                  │                               │
                  └───────────────┬───────────────┘
                                  │
                                  ▼
                        ┌─────────────────────┐
                        │      COMBINE        │
                        │                     │
                        │  📋 Merge Results   │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │        END          │
                        │                     │
                        │   ✅ Complete       │
                        └─────────────────────┘

핵심 개념:

  • Nodes - 작업을 수행하는 처리 단위 (LLM 에이전트, 함수 또는 사용자 정의 로직)
  • Edges - 노드 간의 제어 흐름 (정적 연결 또는 조건부 라우팅)
  • State - 그래프를 통해 흐르고 노드 간에 유지되는 공유 데이터
  • Parallel Execution - 더 나은 성능을 위해 여러 노드가 동시에 실행될 수 있음

핵심 구성 요소 이해하기

🔧 Nodes: 작업자 노드는 실제 작업이 일어나는 곳입니다. 각 노드는 다음을 수행할 수 있습니다:

  • AgentNode: 자연어를 처리하기 위해 LLM 에이전트를 감쌉니다.
  • Function Node: 데이터 처리를 위해 사용자 정의 Rust 코드를 실행합니다.
  • Built-in Nodes: 카운터 또는 유효성 검사기 같은 사전 정의된 로직을 사용합니다.

노드를 조립 라인의 전문 작업자라고 생각해보세요. 각 작업자는 특정 작업과 전문성을 가집니다.

🔀 Edges: 흐름 제어 에지는 그래프에서 실행이 어떻게 이동하는지 결정합니다:

  • Static Edges: 직접 연결 (A → B → C)
  • Conditional Edges: 상태에 기반한 동적 라우팅 (if sentiment == "positive" → positive_handler)
  • Parallel Edges: 한 노드에서 여러 경로 (START → [translator, summarizer])

에지는 작업 흐름을 지시하는 교통 신호등 및 도로 표지판과 같습니다.

💾 State: 공유 메모리 State는 모든 노드가 읽고 쓸 수 있는 키-값 저장소입니다:

  • Input Data: 그래프에 공급되는 초기 정보
  • Intermediate Results: 한 노드의 출력이 다른 노드의 입력이 됨
  • Final Output: 모든 처리 후 완료된 결과

State는 노드들이 다른 노드들이 사용할 정보를 남길 수 있는 공유 화이트보드 역할을 합니다.

⚡ Parallel Execution: 속도 향상 여러 에지가 한 노드에서 나갈 때, 해당 대상 노드들은 동시에 실행됩니다:

  • Faster Processing: 독립적인 작업이 동시에 실행됨
  • Resource Efficiency: CPU 및 I/O의 더 나은 활용
  • Scalability: 선형적인 속도 저하 없이 더 복잡한 워크플로 처리

이것은 줄을 서서 기다리는 대신 여러 작업자가 작업의 여러 부분을 동시에 처리하는 것과 같습니다.


빠른 시작

1. 프로젝트 생성

cargo new graph_demo
cd graph_demo

Cargo.toml에 의존성을 추가합니다:

[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"

API 키를 사용하여 .env 파일을 생성합니다:

echo 'GOOGLE_API_KEY=your-api-key' > .env

2. 병렬 처리 예시

다음은 텍스트를 병렬로 처리하는 완전한 작동 예시입니다:

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    agent::GraphAgent,
    edge::{END, START},
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create specialized LLM agents
    let translator_agent = Arc::new(
        LlmAgentBuilder::new("translator")
            .description("Translates text to French")
            .model(model.clone())
            .instruction("Translate the input text to French. Only output the translation.")
            .build()?,
    );

    let summarizer_agent = Arc::new(
        LlmAgentBuilder::new("summarizer")
            .description("Summarizes text")
            .model(model.clone())
            .instruction("Summarize the input text in one sentence.")
            .build()?,
    );

    // Wrap agents as graph nodes with input/output mappers
    let translator_node = AgentNode::new(translator_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("translation".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    let summarizer_node = AgentNode::new(summarizer_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("summary".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    // Build the graph with parallel execution
    let agent = GraphAgent::builder("text_processor")
        .description("Processes text with translation and summarization in parallel")
        .channels(&["input", "translation", "summary", "result"])
        .node(translator_node)
        .node(summarizer_node)
        .node_fn("combine", |ctx| async move {
            let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
            let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");

            let result = format!(
                "=== Processing Complete ===\n\n\
                French Translation:\n{}\n\n\
                Summary:\n{}",
                translation, summary
            );

            Ok(NodeOutput::new().with_update("result", json!(result)))
        })
        // Parallel execution: both nodes start simultaneously
        .edge(START, "translator")
        .edge(START, "summarizer")
        .edge("translator", "combine")
        .edge("summarizer", "combine")
        .edge("combine", END)
        .build()?;

    // Execute the graph
    let mut input = State::new();
    input.insert("input".to_string(), json!("AI is transforming how we work and live."));

    let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
    println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

예시 출력:

=== Processing Complete ===

French Translation:
L'IA transforme notre façon de travailler et de vivre.

Summary:
AI is revolutionizing work and daily life through technological transformation.

Graph 실행 방식

큰 그림

GraphAgent는 슈퍼 스텝으로 실행됩니다. 모든 준비된 노드는 병렬로 실행되며, 다음 스텝으로 진행하기 전에 그래프는 모든 노드가 완료될 때까지 기다립니다:

Step 1: START ──┬──▶ translator (running)
                └──▶ summarizer (running)
                
                ⏳ Wait for both to complete...
                
Step 2: translator ──┬──▶ combine (running)
        summarizer ──┘
        
                ⏳ Wait for combine to complete...
                
Step 3: combine ──▶ END ✅

노드를 통한 상태 흐름

각 노드는 공유 상태에서 읽고 쓸 수 있습니다:

┌─────────────────────────────────────────────────────────────────────┐
│ STEP 1: Initial state                                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: { "input": "AI is transforming how we work" }             │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────┐              ┌──────────────────┐           │
│   │   translator     │              │   summarizer     │           │
│   │  reads "input"   │              │  reads "input"   │           │
│   └──────────────────┘              └──────────────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 2: After parallel execution                                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: {                                                          │
│     "input": "AI is transforming how we work",                     │
│     "translation": "L'IA transforme notre façon de travailler",    │
│     "summary": "AI is revolutionizing work through technology"     │
│   }                                                                 │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────────────────────────┐                         │
│   │           combine                    │                         │
│   │  reads "translation" + "summary"     │                         │
│   │  writes "result"                     │                         │
│   └──────────────────────────────────────┘                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 3: Final state                                                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: {                                                          │
│     "input": "AI is transforming how we work",                     │
│     "translation": "L'IA transforme notre façon de travailler",    │
│     "summary": "AI is revolutionizing work through technology",    │
│     "result": "=== Processing Complete ===\n\nFrench..."          │
│   }                                                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

작동 원리

구성 요소역할
AgentNodeLLM Agent를 input/output mappers로 래핑합니다.
input_mapper상태를 Agent 입력 Content로 변환합니다.
output_mapperAgent 이벤트를 상태 업데이트로 변환합니다.
channels그래프가 사용할 상태 필드를 선언합니다.
edge()노드 간의 실행 흐름을 정의합니다.
ExecutionConfig체크포인팅을 위한 스레드 ID를 제공합니다.

LLM 분류를 사용한 조건부 라우팅

LLM이 실행 경로를 결정하는 스마트 라우팅 시스템을 구축합니다:

시각화: 감성 기반 라우팅

                        ┌─────────────────────┐
       사용자 피드백    │                     │
      ────────────────▶ │      분류기         │
                        │  🧠 톤 분석         │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┼───────────────┐
                   │               │               │
                   ▼               ▼               ▼
        ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
        │     긍정적       │ │     부정적       │ │     중립적       │
        │                  │ │                  │ │                  │
        │  😊 감사합니다!  │ │  😔 사과하기     │ │  😐 추가 질문하기 │
        │     축하하기     │ │   문제 해결 돕기 │ │  추가 질문하기    │
        └──────────────────┘ └──────────────────┘ └──────────────────┘

전체 예시 코드

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    edge::{END, Router, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create classifier agent
    let classifier_agent = Arc::new(
        LlmAgentBuilder::new("classifier")
            .description("Classifies text sentiment")
            .model(model.clone())
            .instruction(
                "You are a sentiment classifier. Analyze the input text and respond with \
                ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
            )
            .build()?,
    );

    // Create response agents for each sentiment
    let positive_agent = Arc::new(
        LlmAgentBuilder::new("positive")
            .description("Handles positive feedback")
            .model(model.clone())
            .instruction(
                "You are a customer success specialist. The customer has positive feedback. \
                Express gratitude, reinforce the positive experience, and suggest ways to \
                share their experience. Be warm and appreciative. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let negative_agent = Arc::new(
        LlmAgentBuilder::new("negative")
            .description("Handles negative feedback")
            .model(model.clone())
            .instruction(
                "You are a customer support specialist. The customer has a complaint. \
                Acknowledge their frustration, apologize sincerely, and offer help. \
                Be empathetic. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let neutral_agent = Arc::new(
        LlmAgentBuilder::new("neutral")
            .description("Handles neutral feedback")
            .model(model.clone())
            .instruction(
                "You are a customer service representative. The customer has neutral feedback. \
                Ask clarifying questions to better understand their needs. Be helpful and curious. \
                Keep response under 3 sentences.",
            )
            .build()?,
    );

    // Create AgentNodes with mappers
    let classifier_node = AgentNode::new(classifier_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("")
                        .to_lowercase()
                        .trim()
                        .to_string();
                    
                    let sentiment = if text.contains("positive") { "positive" }
                        else if text.contains("negative") { "negative" }
                        else { "neutral" };
                    
                    updates.insert("sentiment".to_string(), json!(sentiment));
                }
            }
            updates
        });

    // Response nodes (similar pattern for each)
    let positive_node = AgentNode::new(positive_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    updates.insert("response".to_string(), json!(text));
                }
            }
            updates
        });

    // Build graph with conditional routing
    let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
        .add_node(classifier_node)
        .add_node(positive_node)
        // ... add negative_node and neutral_node similarly
        .add_edge(START, "classifier")
        .add_conditional_edges(
            "classifier",
            Router::by_field("sentiment"),  // Route based on sentiment field
            [
                ("positive", "positive"),
                ("negative", "negative"),
                ("neutral", "neutral"),
            ],
        )
        .add_edge("positive", END)
        .add_edge("negative", END)
        .add_edge("neutral", END)
        .compile()?;

    // Test with different feedback
    let mut input = State::new();
    input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));

    let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
    println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

예시 흐름:

입력: "귀하의 제품은 정말 놀랍습니다! 마음에 듭니다!"
       ↓
분류기: "positive"
       ↓
긍정 에이전트: "멋진 피드백 정말 감사합니다! 
                저희 제품을 마음에 들어 해주셔서 기쁩니다. 
                다른 분들을 돕기 위해 리뷰를 남겨주시겠어요?"

ReAct 패턴: 추론 + 행동

복잡한 문제를 해결하기 위해 도구를 반복적으로 사용할 수 있는 에이전트를 구축합니다:

시각 자료: ReAct 사이클

                        ┌─────────────────────┐
       사용자 질문    │                     │
      ────────────────▶ │       추론자        │
                        │  🧠 생각 + 행동     │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │   도구 호출이 있는가?   │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │        예        │            │       아니오      │
        │                  │            │                  │
        │ 🔄 추론자로 돌아가기 │            │ ✅ 최종 답변 종료 │
        │                  │            │                  │
        └─────────┬────────┘            └──────────────────┘
                  │
                  └─────────────────┐
                                    │
                                    ▼
                        ┌─────────────────────┐
                        │       추론자        │
                        │  🧠 생각 + 행동     │
                        │    (다음 반복)      │
                        └─────────────────────┘

ReAct 전체 예시

use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
    edge::{END, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create tools
    let weather_tool = Arc::new(FunctionTool::new(
        "get_weather",
        "Get the current weather for a location. Takes a 'location' parameter (city name).",
        |_ctx, args| async move {
            let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
            Ok(json!({
                "location": location,
                "temperature": "72°F",
                "condition": "Sunny",
                "humidity": "45%"
            }))
        },
    )) as Arc<dyn Tool>;

    let calculator_tool = Arc::new(FunctionTool::new(
        "calculator",
        "Perform mathematical calculations. Takes an 'expression' parameter (string).",
        |_ctx, args| async move {
            let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
            let result = match expr {
                "2 + 2" => "4",
                "10 * 5" => "50",
                "100 / 4" => "25",
                "15 - 7" => "8",
                _ => "Unable to evaluate",
            };
            Ok(json!({ "result": result, "expression": expr }))
        },
    )) as Arc<dyn Tool>;

    // Create reasoner agent with tools
    let reasoner_agent = Arc::new(
        LlmAgentBuilder::new("reasoner")
            .description("Reasoning agent with tools")
            .model(model.clone())
            .instruction(
                "You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
                When you have enough information, provide a final answer without using more tools.",
            )
            .tool(weather_tool)
            .tool(calculator_tool)
            .build()?,
    );

    // Create reasoner node that detects tool usage
    let reasoner_node = AgentNode::new(reasoner_agent)
        .with_input_mapper(|state| {
            let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(question)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            let mut has_tool_calls = false;
            let mut response = String::new();

            for event in events {
                if let Some(content) = event.content() {
                    for part in &content.parts {
                        match part {
                            Part::FunctionCall { .. } => {
                                has_tool_calls = true;
                            }
                            Part::Text { text } => {
                                response.push_str(text);
                            }
                            _ => {}
                        }
                    }
                }
            }

            updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
            updates.insert("response".to_string(), json!(response));
            updates
        });

    // Build ReAct graph with cycle
    let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
        .add_node(reasoner_node)
        .add_node_fn("counter", |ctx| async move {
            let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
            Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
        })
        .add_edge(START, "counter")
        .add_edge("counter", "reasoner")
        .add_conditional_edges(
            "reasoner",
            |state| {
                let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
                let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

                // Safety limit
                if iteration >= 5 { return END.to_string(); }

                if has_tools {
                    "counter".to_string()  // Loop back for more reasoning
                } else {
                    END.to_string()  // Done - final answer
                }
            },
            [("counter", "counter"), (END, END)],
        )
        .compile()?
        .with_recursion_limit(10);

    // Test the ReAct agent
    let mut input = State::new();
    input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));

    let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
    println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));

    Ok(())
}

예시 흐름:

질문: "What's the weather in Paris and what's 15 + 25?"

반복 1:
  추론자: "날씨 정보와 계산이 필요합니다."
  → get_weather(location="Paris") 및 calculator(expression="15 + 25") 호출
  → has_tool_calls = true → 루프 재개

반복 2:
  추론자: "결과에 따르면: 파리는 72°F이고 맑으며, 15 + 25 = 40입니다."
  → 도구 호출 없음 → has_tool_calls = false → END

최종 답변: "The weather in Paris is 72°F and sunny with 45% humidity.
              And 15 + 25 equals 40."

AgentNode

모든 ADK Agent (일반적으로 LlmAgent)를 그래프 노드로 래핑합니다:

let node = AgentNode::new(llm_agent)
    .with_input_mapper(|state| {
        // 그래프 상태를 에이전트 입력 Content로 변환
        let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(text)
    })
    .with_output_mapper(|events| {
        // 에이전트 이벤트를 상태 업데이트로 변환
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");
                updates.insert("output".to_string(), json!(text));
            }
        }
        updates
    });

함수 노드

상태를 처리하는 간단한 async 함수:

.node_fn("process", |ctx| async move {
    let input = ctx.state.get("input").unwrap();
    let output = process_data(input).await?;
    Ok(NodeOutput::new().with_update("output", output))
})

엣지 유형

정적 엣지

노드 간의 직접적인 연결:

.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)

조건부 엣지

상태에 기반한 동적 라우팅:

.conditional_edge(
    "router",
    |state| {
        match state.get("next").and_then(|v| v.as_str()) {
            Some("research") => "research_node".to_string(),
            Some("write") => "write_node".to_string(),
            _ => END.to_string(),
        }
    },
    [
        ("research_node", "research_node"),
        ("write_node", "write_node"),
        (END, END),
    ],
)

라우터 헬퍼

일반적인 패턴에 내장 라우터를 사용합니다:

use adk_graph::edge::Router;

// 상태 필드 값에 따라 라우팅
.conditional_edge("classifier", Router::by_field("sentiment"), [
    ("positive", "positive_handler"),
    ("negative", "negative_handler"),
    ("neutral", "neutral_handler"),
])

// 부울 필드에 따라 라우팅
.conditional_edge("check", Router::by_bool("approved"), [
    ("true", "execute"),
    ("false", "reject"),
])

// 반복 제한
.conditional_edge("loop", Router::max_iterations("count", 5), [
    ("continue", "process"),
    ("done", END),
])

병렬 실행

단일 노드에서 시작하는 여러 엣지가 병렬로 실행됩니다:

let agent = GraphAgent::builder("parallel_processor")
    .channels(&["input", "translation", "summary", "analysis"])
    .node(translator_node)
    .node(summarizer_node)
    .node(analyzer_node)
    .node(combiner_node)
    // 세 가지 모두 동시에 시작
    .edge(START, "translator")
    .edge(START, "summarizer")
    .edge(START, "analyzer")
    // 모든 작업이 완료될 때까지 기다린 후 결합
    .edge("translator", "combiner")
    .edge("summarizer", "combiner")
    .edge("analyzer", "combiner")
    .edge("combiner", END)
    .build()?;

순환 그래프 (ReAct 패턴)

순환을 사용하여 반복적 추론 에이전트를 구축합니다:

use adk_core::Part;

// 도구를 사용하여 에이전트 생성
let reasoner = Arc::new(
    LlmAgentBuilder::new("reasoner")
        .model(model)
        .instruction("Use tools to answer questions. Provide final answer when done.")
        .tool(search_tool)
        .tool(calculator_tool)
        .build()?
);

let reasoner_node = AgentNode::new(reasoner)
    .with_input_mapper(|state| {
        let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(question)
    })
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        let mut has_tool_calls = false;
        let mut response = String::new();

        for event in events {
            if let Some(content) = event.content() {
                for part in &content.parts {
                    match part {
                        Part::FunctionCall { name, .. } => {
                            has_tool_calls = true;
                        }
                        Part::Text { text } => {
                            response.push_str(text);
                        }
                        _ => {}
                    }
                }
            }
        }

        updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
        updates.insert("response".to_string(), json!(response));
        updates
    });

// 순환을 사용하여 그래프 구축
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
    .add_node(reasoner_node)
    .add_node_fn("counter", |ctx| async move {
        let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
        Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
    })
    .add_edge(START, "counter")
    .add_edge("counter", "reasoner")
    .add_conditional_edges(
        "reasoner",
        |state| {
            let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
            let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

            // 안전 제한
            if iteration >= 5 { return END.to_string(); }

            if has_tools {
                "counter".to_string()  // 다시 루프
            } else {
                END.to_string()  // 완료
            }
        },
        [("counter", "counter"), (END, END)],
    )
    .compile()?
    .with_recursion_limit(10);

멀티 에이전트 감독관

작업을 전문 에이전트에게 라우팅합니다:

// Create supervisor agent
let supervisor = Arc::new(
    LlmAgentBuilder::new("supervisor")
        .model(model.clone())
        .instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
        .build()?
);

let supervisor_node = AgentNode::new(supervisor)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("")
                    .to_lowercase();

                let next = if text.contains("researcher") { "researcher" }
                    else if text.contains("writer") { "writer" }
                    else if text.contains("coder") { "coder" }
                    else { "done" };

                updates.insert("next_agent".to_string(), json!(next));
            }
        }
        updates
    });

// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
    .add_node(supervisor_node)
    .add_node(researcher_node)
    .add_node(writer_node)
    .add_node(coder_node)
    .add_edge(START, "supervisor")
    .add_conditional_edges(
        "supervisor",
        Router::by_field("next_agent"),
        [
            ("researcher", "researcher"),
            ("writer", "writer"),
            ("coder", "coder"),
            ("done", END),
        ],
    )
    // Agents report back to supervisor
    .add_edge("researcher", "supervisor")
    .add_edge("writer", "supervisor")
    .add_edge("coder", "supervisor")
    .compile()?;

상태 관리

리듀서와 함께하는 상태 스키마

상태 업데이트가 병합되는 방식을 제어합니다:

let schema = StateSchema::builder()
    .channel("current_step")                    // Overwrite (default)
    .list_channel("messages")                   // Append to list
    .channel_with_reducer("count", Reducer::Sum) // Sum values
    .channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
        // Custom merge logic
        merge_json(old, new)
    })))
    .build();

let agent = GraphAgent::builder("stateful")
    .state_schema(schema)
    // ... nodes and edges
    .build()?;

리듀서 타입

Reducer동작
Overwrite이전 값을 새 값으로 교체 (기본값)
Append목록에 추가
Sum숫자 값 더하기
Custom사용자 정의 병합 함수

체크포인트

결함 허용 및 Human-in-the-Loop를 위한 영구 상태를 활성화합니다:

인메모리 (개발용)

use adk_graph::checkpoint::MemoryCheckpointer;

let checkpointer = Arc::new(MemoryCheckpointer::new());

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodes and edges
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

SQLite (프로덕션용)

use adk_graph::checkpoint::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodes and edges
    .compile()?
    .with_checkpointer(checkpointer);

체크포인트 기록 (타임 트래블)

// List all checkpoints for a thread
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
    println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}

// Load a specific checkpoint
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
    println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}

Human-in-the-Loop

동적 인터럽트를 사용하여 사람의 승인을 위한 실행을 일시 중지합니다.

use adk_graph::{error::GraphError, node::NodeOutput};

// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");

                // Extract risk level from LLM response
                let risk = if text.to_lowercase().contains("risk: high") { "high" }
                    else if text.to_lowercase().contains("risk: medium") { "medium" }
                    else { "low" };

                updates.insert("plan".to_string(), json!(text));
                updates.insert("risk_level".to_string(), json!(risk));
            }
        }
        updates
    });

// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_node_fn("review", |ctx| async move {
        let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
        let approved = ctx.get("approved").and_then(|v| v.as_bool());

        // Already approved - continue
        if approved == Some(true) {
            return Ok(NodeOutput::new());
        }

        // High/medium risk - interrupt for approval
        if risk == "high" || risk == "medium" {
            return Ok(NodeOutput::interrupt_with_data(
                &format!("{} RISK: Human approval required", risk.to_uppercase()),
                json!({
                    "plan": ctx.get("plan"),
                    "risk_level": risk,
                    "action": "Set 'approved' to true to continue"
                })
            ));
        }

        // Low risk - auto-approve
        Ok(NodeOutput::new().with_update("approved", json!(true)))
    })
    .add_edge(START, "planner")
    .add_edge("planner", "review")
    .add_edge("review", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;

match result {
    Err(GraphError::Interrupted(interrupt)) => {
        println!("*** EXECUTION PAUSED ***");
        println!("Reason: {}", interrupt.interrupt);
        println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));

        // Human reviews and approves...

        // Update state with approval
        graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;

        // Resume execution
        let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
        println!("Final result: {:?}", final_result.get("result"));
    }
    Ok(result) => {
        println!("Completed without interrupt: {:?}", result);
    }
    Err(e) => {
        println!("Error: {}", e);
    }
}

Static Interrupts

필수 일시 중지 지점을 위해 interrupt_before 또는 interrupt_after를 사용합니다.

let graph = StateGraph::with_channels(&["task", "plan", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_edge(START, "planner")
    .add_edge("planner", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_interrupt_before(&["executor"]);  // Always pause before execution

스트리밍 실행

그래프가 실행되는 동안 이벤트를 스트리밍합니다:

use futures::StreamExt;
use adk_graph::stream::StreamMode;

let stream = agent.stream(input, config, StreamMode::Updates);

while let Some(event) = stream.next().await {
    match event? {
        StreamEvent::NodeStart(name) => println!("Starting: {}", name),
        StreamEvent::Updates { node, updates } => {
            println!("{} updated state: {:?}", node, updates);
        }
        StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
        StreamEvent::Done(state) => println!("Final state: {:?}", state),
        _ => {}
    }
}

스트림 모드

모드설명
Values각 노드 후에 전체 상태를 스트리밍합니다.
Updates상태 변경만 스트리밍합니다.
Messages메시지 유형 업데이트를 스트리밍합니다.
Debug모든 내부 이벤트를 스트리밍합니다.

ADK 통합

GraphAgent는 ADK Agent 트레이트를 구현하므로 다음 항목들과 함께 작동합니다:

  • Runner: 표준 실행을 위해 adk-runner와 함께 사용
  • Callbacks: before/after 콜백에 대한 완전한 지원
  • Sessions: 대화 기록을 위해 adk-session과 함께 작동
  • Streaming: ADK EventStream을 반환합니다.
use adk_runner::Runner;

let graph_agent = GraphAgent::builder("workflow")
    .before_agent_callback(|ctx| async {
        println!("Starting graph execution for session: {}", ctx.session_id());
        Ok(())
    })
    .after_agent_callback(|ctx, event| async {
        if let Some(content) = event.content() {
            println!("Graph completed with content");
        }
        Ok(())
    })
    // ... graph definition
    .build()?;

// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration

예시

모든 예시는 AgentNode와 실제 LLM 통합을 사용합니다:

# Parallel LLM agents with before/after callbacks
cargo run --example graph_agent

# Sequential multi-agent pipeline (extractor → analyzer → formatter)
cargo run --example graph_workflow

# LLM-based sentiment classification and conditional routing
cargo run --example graph_conditional

# ReAct pattern with tools and cyclic execution
cargo run --example graph_react

# Multi-agent supervisor routing to specialists
cargo run --example graph_supervisor

# Human-in-the-loop with risk-based interrupts
cargo run --example graph_hitl

# Checkpointing and time travel debugging
cargo run --example graph_checkpoint

LangGraph와의 비교

기능LangGraphadk-graph
State ManagementTypedDict + ReducersStateSchema + Reducers
Execution ModelPregel super-stepsPregel super-steps
CheckpointingMemory, SQLite, PostgresMemory, SQLite
Human-in-Loopinterrupt_before/afterinterrupt_before/after + dynamic
Streaming5 modes5가지 모드
CyclesNative support네이티브 지원
Type SafetyPython typingRust type system
LLM IntegrationLangChainAgentNode + ADK agents

이전: ← 다중 에이전트 시스템 | 다음: 실시간 에이전트 →