Graph Agents

Construa fluxos de trabalho complexos e com estado usando orquestração estilo LangGraph com integração nativa ADK-Rust.

Overview

GraphAgent permite que você defina fluxos de trabalho como grafos direcionados com nodes e edges, suportando:

  • AgentNode: Empacote agentes LLM como nodes de grafo com mappers de entrada/saída personalizados
  • Cyclic Workflows: Suporte nativo para loops e raciocínio iterativo (padrão ReAct)
  • Conditional Routing: Roteamento de edge dinâmico baseado em estado
  • State Management: Estado tipado com reducers (sobrescrever, anexar, somar, personalizado)
  • Checkpointing: Estado persistente para tolerância a falhas e human-in-the-loop
  • Streaming: Múltiplos modos de stream (values, updates, messages, debug)

A crate adk-graph fornece orquestração de fluxo de trabalho estilo LangGraph para construir fluxos de trabalho de agentes complexos e com estado. Ela traz capacidades de fluxo de trabalho baseadas em grafo para o ecossistema ADK-Rust, mantendo compatibilidade total com o sistema de agentes do ADK.

Key Benefits:

  • Visual Workflow Design: Defina lógica complexa como grafos intuitivos de node e edge
  • Parallel Execution: Múltiplos nodes podem ser executados simultaneamente para melhor performance
  • State Persistence: Checkpointing integrado para tolerância a falhas e human-in-the-loop
  • LLM Integration: Suporte nativo para empacotar agentes ADK como nodes de grafo
  • Flexible Routing: Edges estáticos, roteamento condicional e tomada de decisão dinâmica

O Que Você Irá Construir

Neste guia, você criará um Pipeline de Processamento de Texto que executa tradução e sumarização em paralelo:

                        ┌─────────────────────┐
       User Input       │                     │
      ────────────────▶ │       START         │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │   TRANSLATOR     │            │   SUMMARIZER     │
        │                  │            │                  │
        │  🇫🇷 French       │            │  📝 One sentence │
        │     Translation  │            │     Summary      │
        └─────────┬────────┘            └─────────┬────────┘
                  │                               │
                  └───────────────┬───────────────┘
                                  │
                                  ▼
                        ┌─────────────────────┐
                        │      COMBINE        │
                        │                     │
                        │  📋 Merge Results   │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │        END          │
                        │                     │
                        │   ✅ Complete       │
                        └─────────────────────┘

Conceitos Chave:

  • Nodes - Unidades de processamento que realizam trabalho (agents LLM, funções ou lógica personalizada)
  • Edges - Fluxo de controle entre nodes (conexões estáticas ou roteamento condicional)
  • State - Dados compartilhados que fluem através do grafo e persistem entre nodes
  • Execução Paralela - Múltiplos nodes podem ser executados simultaneamente para melhor desempenho

Entendendo os Componentes Principais

🔧 Nodes: Os Trabalhadores Os Nodes são onde o trabalho real acontece. Cada node pode:

  • AgentNode: Envolve um agent LLM para processar linguagem natural
  • Function Node: Executa código Rust personalizado para processamento de dados
  • Built-in Nodes: Usam lógica predefinida como contadores ou validadores

Pense nos nodes como trabalhadores especializados em uma linha de montagem - cada um tem um trabalho e uma experiência específicos.

🔀 Edges: O Controle de Fluxo Os Edges determinam como a execução se move através do seu grafo:

  • Static Edges: Conexões diretas (A → B → C)
  • Conditional Edges: Roteamento dinâmico baseado em state (if sentiment == "positive" → positive_handler)
  • Parallel Edges: Múltiplos caminhos a partir de um node (START → [translator, summarizer])

Os Edges são como semáforos e sinais de trânsito que direcionam o fluxo de trabalho.

💾 State: A Memória Compartilhada State é um armazenamento de chave-valor que todos os nodes podem ler e escrever:

  • Dados de Entrada: Informações iniciais alimentadas no grafo
  • Resultados Intermediários: A saída de um node se torna a entrada para outro
  • Saída Final: O resultado completo após todo o processamento

State age como um quadro branco compartilhado onde os nodes podem deixar informações para que outros as usem.

⚡ Execução Paralela: O Aumento de Velocidade Quando múltiplos edges saem de um node, esses nodes de destino são executados simultaneamente:

  • Processamento Mais Rápido: Tarefas independentes são executadas ao mesmo tempo
  • Eficiência de Recursos: Melhor utilização da CPU e I/O
  • Escalabilidade: Lida com fluxos de trabalho mais complexos sem desaceleração linear

Isso é como ter múltiplos trabalhadores abordando diferentes partes de um trabalho simultaneamente em vez de esperar em fila.


Início Rápido

1. Crie Seu Projeto

cargo new graph_demo
cd graph_demo

Adicione as dependências ao Cargo.toml:

[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"

Crie o .env com sua chave de API:

echo 'GOOGLE_API_KEY=your-api-key' > .env

2. Exemplo de Processamento Paralelo

Aqui está um exemplo completo e funcional que processa texto em paralelo:

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    agent::GraphAgent,
    edge::{END, START},
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Crie agentes LlmAgent especializados
    let translator_agent = Arc::new(
        LlmAgentBuilder::new("translator")
            .description("Traduz texto para francês")
            .model(model.clone())
            .instruction("Traduza o texto de entrada para francês. Apenas mostre a tradução.")
            .build()?,
    );

    let summarizer_agent = Arc::new(
        LlmAgentBuilder::new("summarizer")
            .description("Resume texto")
            .model(model.clone())
            .instruction("Resuma o texto de entrada em uma frase.")
            .build()?,
    );

    // Empacote agentes como nós de grafo com mappers de entrada/saída
    let translator_node = AgentNode::new(translator_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("translation".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    let summarizer_node = AgentNode::new(summarizer_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("summary".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    // Construa o grafo com execução paralela
    let agent = GraphAgent::builder("text_processor")
        .description("Processa texto com tradução e sumarização em paralelo")
        .channels(&["input", "translation", "summary", "result"])
        .node(translator_node)
        .node(summarizer_node)
        .node_fn("combine", |ctx| async move {
            let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
            let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");

            let result = format!(
                "=== Processing Complete ===\n\n\
                French Translation:\n{}\n\n\
                Summary:\n{}",
                translation, summary
            );

            Ok(NodeOutput::new().with_update("result", json!(result)))
        })
        // Execução paralela: ambos os nós iniciam simultaneamente
        .edge(START, "translator")
        .edge(START, "summarizer")
        .edge("translator", "combine")
        .edge("summarizer", "combine")
        .edge("combine", END)
        .build()?;

    // Execute o grafo
    let mut input = State::new();
    input.insert("input".to_string(), json!("AI is transforming how we work and live."));

    let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
    println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

Saída de Exemplo:

=== Processing Complete ===

French Translation:
L'IA transforme notre façon de travailler et de vivre.

Summary:
AI is revolutionizing work and daily life through technological transformation.

Como a Execução de Grafo Funciona

A Visão Geral

Agentes de grafo são executados em super-passos - todos os nós prontos rodam em paralelo, então o grafo espera que todos concluam antes do próximo passo:

PASSO 1: INÍCIO ──┬──▶ translator (executando)
                 └──▶ summarizer (executando)
                 
                 ⏳ Esperar ambos concluírem...
                 
PASSO 2: translator ──┬──▶ combine (executando)
         summarizer ──┘
         
                 ⏳ Esperar 'combine' concluir...
                 
PASSO 3: combine ──▶ FIM ✅

Fluxo de Estado Através dos Nós

Cada nó pode ler e escrever no estado compartilhado:

┌─────────────────────────────────────────────────────────────────────┐
│ PASSO 1: Estado inicial                                             │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Estado: { "input": "A IA está transformando a forma como trabalhamos" } │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────┐              ┌──────────────────┐           │
│   │   translator     │              │   summarizer     │           │
│   │  lê "input"      │              │  lê "input"      │           │
│   └──────────────────┘              └──────────────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASSO 2: Após execução paralela                                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Estado: {                                                          │
│     "input": "A IA está transformando a forma como trabalhamos",   │
│     "translation": "A IA transforma a nossa forma de trabalhar",   │
│     "summary": "A IA está revolucionando o trabalho através da tecnologia" │
│   }                                                                 │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────────────────────────┐                         │
│   │           combine                    │                         │
│   │  lê "translation" + "summary"        │                         │
│   │  escreve "result"                    │                         │
│   └──────────────────────────────────────┘                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASSO 3: Estado final                                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Estado: {                                                          │
│     "input": "A IA está transformando a forma como trabalhamos",   │
│     "translation": "A IA transforma a nossa forma de trabalhar",   │
│     "summary": "A IA está revolucionando o trabalho através da tecnologia", │
│     "result": "=== Processamento Concluído ===\n\nFrancês..."     │
│   }                                                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

O Que o Faz Funcionar

ComponenteFunção
AgentNodeEncapsula agentes LLM com mapeadores de entrada/saída
input_mapperTransforma o estado → entrada do agente Content
output_mapperTransforma eventos do agente → atualizações de estado
channelsDeclara os campos de estado que o grafo usará
edge()Define o fluxo de execução entre os nós
ExecutionConfigFornece ID de thread para pontos de verificação

Roteamento Condicional com Classificação LLM

Crie sistemas de roteamento inteligentes onde LLMs decidem o caminho de execução:

Visual: Roteamento Baseado em Sentimento

                        ┌─────────────────────┐
       Feedback do Usuário│                     │
      ────────────────▶ │    CLASSIFICADOR    │
                        │  🧠 Analisar tom    │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┼───────────────┐
                   │               │               │
                   ▼               ▼               ▼
        ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
        │     POSITIVO     │ │     NEGATIVO     │ │      NEUTRO      │
        │                  │ │                  │ │                  │
        │  😊 Obrigado!    │ │  😔 Pedir desculpas│ │  😐 Fazer mais   │
        │    Comemorar     │ │   Ajudar a corrigir│ │     perguntas    │
        └──────────────────┘ └──────────────────┘ └──────────────────┘

Exemplo de Código Completo

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    edge::{END, Router, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Criar agente classificador
    let classifier_agent = Arc::new(
        LlmAgentBuilder::new("classifier")
            .description("Classifies text sentiment")
            .model(model.clone())
            .instruction(
                "You are a sentiment classifier. Analyze the input text and respond with \
                ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
            )
            .build()?,
    );

    // Criar agentes de resposta para cada sentimento
    let positive_agent = Arc::new(
        LlmAgentBuilder::new("positive")
            .description("Handles positive feedback")
            .model(model.clone())
            .instruction(
                "You are a customer success specialist. The customer has positive feedback. \
                Express gratitude, reinforce the positive experience, and suggest ways to \
                share their experience. Be warm and appreciative. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let negative_agent = Arc::new(
        LlmAgentBuilder::new("negative")
            .description("Handles negative feedback")
            .model(model.clone())
            .instruction(
                "You are a customer support specialist. The customer has a complaint. \
                Acknowledge their frustration, apologize sincerely, and offer help. \
                Be empathetic. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let neutral_agent = Arc::new(
        LlmAgentBuilder::new("neutral")
            .description("Handles neutral feedback")
            .model(model.clone())
            .instruction(
                "You are a customer service representative. The customer has neutral feedback. \
                Ask clarifying questions to better understand their needs. Be helpful and curious. \
                Keep response under 3 sentences.",
            )
            .build()?,
    );

    // Criar AgentNodes com mappers
    let classifier_node = AgentNode::new(classifier_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("")
                        .to_lowercase()
                        .trim()
                        .to_string();
                    
                    let sentiment = if text.contains("positive") { "positive" }
                        else if text.contains("negative") { "negative" }
                        else { "neutral" };
                    
                    updates.insert("sentiment".to_string(), json!(sentiment));
                }
            }
            updates
        });

    // Nós de resposta (padrão similar para cada)
    let positive_node = AgentNode::new(positive_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    updates.insert("response".to_string(), json!(text));
                }
            }
            updates
        });

    // Construir grafo com roteamento condicional
    let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
        .add_node(classifier_node)
        .add_node(positive_node)
        // ... add negative_node and neutral_node similarly
        .add_edge(START, "classifier")
        .add_conditional_edges(
            "classifier",
            Router::by_field("sentiment"),  // Roteia com base no campo de sentimento
            [
                ("positive", "positive"),
                ("negative", "negative"),
                ("neutral", "neutral"),
            ],
        )
        .add_edge("positive", END)
        .add_edge("negative", END)
        .add_edge("neutral", END)
        .compile()?;

    // Testar com diferentes feedbacks
    let mut input = State::new();
    input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));

    let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
    println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

Exemplo de Fluxo:

Entrada: "Your product is amazing! I love it!"
       ↓
Classificador: "positive"
       ↓
Agente Positivo: "Muito obrigado pelo feedback maravilhoso! 
                Estamos muito felizes que você ame nosso produto. 
                Você consideraria deixar uma avaliação para ajudar outras pessoas?"

Padrão ReAct: Raciocínio + Ação

Crie agentes que podem usar ferramentas iterativamente para resolver problemas complexos:

Visual: Ciclo ReAct

                        ┌─────────────────────┐
       User Question    │                     │
      ────────────────▶ │      REASONER       │
                        │  🧠 Think + Act     │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │   Has tool calls?   │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │       YES        │            │        NO        │
        │                  │            │                  │
        │  🔄 Loop back    │            │  ✅ Final answer │
        │     to reasoner  │            │      END         │
        └─────────┬────────┘            └──────────────────┘
                  │
                  └─────────────────┐
                                    │
                                    ▼
                        ┌─────────────────────┐
                        │      REASONER       │
                        │  🧠 Think + Act     │
                        │   (next iteration)  │
                        └─────────────────────┘

Exemplo Completo de ReAct

use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
    edge::{END, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create tools
    let weather_tool = Arc::new(FunctionTool::new(
        "get_weather",
        "Get the current weather for a location. Takes a 'location' parameter (city name).",
        |_ctx, args| async move {
            let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
            Ok(json!({
                "location": location,
                "temperature": "72°F",
                "condition": "Sunny",
                "humidity": "45%"
            }))
        },
    )) as Arc<dyn Tool>;

    let calculator_tool = Arc::new(FunctionTool::new(
        "calculator",
        "Perform mathematical calculations. Takes an 'expression' parameter (string).",
        |_ctx, args| async move {
            let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
            let result = match expr {
                "2 + 2" => "4",
                "10 * 5" => "50",
                "100 / 4" => "25",
                "15 - 7" => "8",
                _ => "Unable to evaluate",
            };
            Ok(json!({ "result": result, "expression": expr }))
        },
    )) as Arc<dyn Tool>;

    // Create reasoner agent with tools
    let reasoner_agent = Arc::new(
        LlmAgentBuilder::new("reasoner")
            .description("Reasoning agent with tools")
            .model(model.clone())
            .instruction(
                "You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
                When you have enough information, provide a final answer without using more tools.",
            )
            .tool(weather_tool)
            .tool(calculator_tool)
            .build()?,
    );

    // Create reasoner node that detects tool usage
    let reasoner_node = AgentNode::new(reasoner_agent)
        .with_input_mapper(|state| {
            let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(question)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            let mut has_tool_calls = false;
            let mut response = String::new();

            for event in events {
                if let Some(content) = event.content() {
                    for part in &content.parts {
                        match part {
                            Part::FunctionCall { .. } => {
                                has_tool_calls = true;
                            }
                            Part::Text { text } => {
                                response.push_str(text);
                            }
                            _ => {}
                        }
                    }
                }
            }

            updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
            updates.insert("response".to_string(), json!(response));
            updates
        });

    // Build ReAct graph with cycle
    let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
        .add_node(reasoner_node)
        .add_node_fn("counter", |ctx| async move {
            let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
            Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
        })
        .add_edge(START, "counter")
        .add_edge("counter", "reasoner")
        .add_conditional_edges(
            "reasoner",
            |state| {
                let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
                let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

                // Safety limit
                if iteration >= 5 { return END.to_string(); }

                if has_tools {
                    "counter".to_string()  // Loop back for more reasoning
                } else {
                    END.to_string()  // Done - final answer
                }
            },
            [("counter", "counter"), (END, END)],
        )
        .compile()?
        .with_recursion_limit(10);

    // Test the ReAct agent
    let mut input = State::new();
    input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));

    let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
    println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));

    Ok(())
}

Fluxo de Exemplo:

Question: "Qual é o clima em Paris e quanto é 15 + 25?"

Iteração 1:
  Reasoner: "Preciso obter informações sobre o clima e fazer contas"
  → Chama get_weather(location="Paris") e calculator(expression="15 + 25")
  → has_tool_calls = true → Retorna ao início

Iteração 2:
  Reasoner: "Com base nos resultados: Paris está a 72°F e ensolarado, 15 + 25 = 40"
  → Nenhuma chamada de ferramenta → has_tool_calls = false → END

Resposta Final: "O clima em Paris está a 72°F e ensolarado com 45% de umidade. 
              E 15 + 25 é igual a 40."

AgentNode

Empacota qualquer Agent do ADK (tipicamente LlmAgent) como um nó de grafo:

let node = AgentNode::new(llm_agent)
    .with_input_mapper(|state| {
        // Transform graph state to agent input Content
        let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(text)
    })
    .with_output_mapper(|events| {
        // Transform agent events to state updates
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");
                updates.insert("output".to_string(), json!(text));
            }
        }
        updates
    });

Nós de Função

Funções assíncronas simples que processam o estado:

.node_fn("process", |ctx| async move {
    let input = ctx.state.get("input").unwrap();
    let output = process_data(input).await?;
    Ok(NodeOutput::new().with_update("output", output))
})

Tipos de Aresta

Arestas Estáticas

Conexões diretas entre nós:

.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)

Arestas Condicionais

Roteamento dinâmico baseado no estado:

.conditional_edge(
    "router",
    |state| {
        match state.get("next").and_then(|v| v.as_str()) {
            Some("research") => "research_node".to_string(),
            Some("write") => "write_node".to_string(),
            _ => END.to_string(),
        }
    },
    [
        ("research_node", "research_node"),
        ("write_node", "write_node"),
        (END, END),
    ],
)

Auxiliares de Roteador

Use roteadores embutidos para padrões comuns:

use adk_graph::edge::Router;

// Route based on a state field value
.conditional_edge("classifier", Router::by_field("sentiment"), [
    ("positive", "positive_handler"),
    ("negative", "negative_handler"),
    ("neutral", "neutral_handler"),
])

// Route based on boolean field
.conditional_edge("check", Router::by_bool("approved"), [
    ("true", "execute"),
    ("false", "reject"),
])

// Limit iterations
.conditional_edge("loop", Router::max_iterations("count", 5), [
    ("continue", "process"),
    ("done", END),
])

Execução Paralela

Múltiplas arestas de um único nó são executadas em paralelo:

let agent = GraphAgent::builder("parallel_processor")
    .channels(&["input", "translation", "summary", "analysis"])
    .node(translator_node)
    .node(summarizer_node)
    .node(analyzer_node)
    .node(combiner_node)
    // All three start simultaneously
    .edge(START, "translator")
    .edge(START, "summarizer")
    .edge(START, "analyzer")
    // Wait for all to complete before combining
    .edge("translator", "combiner")
    .edge("summarizer", "combiner")
    .edge("analyzer", "combiner")
    .edge("combiner", END)
    .build()?;

Grafos Cíclicos (Padrão ReAct)

Construa agentes de raciocínio iterativo com ciclos:

use adk_core::Part;

// Create agent with tools
let reasoner = Arc::new(
    LlmAgentBuilder::new("reasoner")
        .model(model)
        .instruction("Use tools to answer questions. Provide final answer when done.")
        .tool(search_tool)
        .tool(calculator_tool)
        .build()?
);

let reasoner_node = AgentNode::new(reasoner)
    .with_input_mapper(|state| {
        let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(question)
    })
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        let mut has_tool_calls = false;
        let mut response = String::new();

        for event in events {
            if let Some(content) = event.content() {
                for part in &content.parts {
                    match part {
                        Part::FunctionCall { name, .. } => {
                            has_tool_calls = true;
                        }
                        Part::Text { text } => {
                            response.push_str(text);
                        }
                        _ => {}
                    }
                }
            }
        }

        updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
        updates.insert("response".to_string(), json!(response));
        updates
    });

// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
    .add_node(reasoner_node)
    .add_node_fn("counter", |ctx| async move {
        let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
        Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
    })
    .add_edge(START, "counter")
    .add_edge("counter", "reasoner")
    .add_conditional_edges(
        "reasoner",
        |state| {
            let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
            let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

            // Safety limit
            if iteration >= 5 { return END.to_string(); }

            if has_tools {
                "counter".to_string()  // Loop back
            } else {
                END.to_string()  // Done
            }
        },
        [("counter", "counter"), (END, END)],
    )
    .compile()?
    .with_recursion_limit(10);

Supervisor Multiagente

Encaminha tarefas para agentes especialistas:

// Create supervisor agent
let supervisor = Arc::new(
    LlmAgentBuilder::new("supervisor")
        .model(model.clone())
        .instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
        .build()?
);

let supervisor_node = AgentNode::new(supervisor)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("")
                    .to_lowercase();

                let next = if text.contains("researcher") { "researcher" }
                    else if text.contains("writer") { "writer" }
                    else if text.contains("coder") { "coder" }
                    else { "done" };

                updates.insert("next_agent".to_string(), json!(next));
            }
        }
        updates
    });

// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
    .add_node(supervisor_node)
    .add_node(researcher_node)
    .add_node(writer_node)
    .add_node(coder_node)
    .add_edge(START, "supervisor")
    .add_conditional_edges(
        "supervisor",
        Router::by_field("next_agent"),
        [
            ("researcher", "researcher"),
            ("writer", "writer"),
            ("coder", "coder"),
            ("done", END),
        ],
    )
    // Agents report back to supervisor
    .add_edge("researcher", "supervisor")
    .add_edge("writer", "supervisor")
    .add_edge("coder", "supervisor")
    .compile()?;

Gerenciamento de Estado

Esquema de Estado com Reducers

Controla como as atualizações de estado são mescladas:

let schema = StateSchema::builder()
    .channel("current_step")                    // Overwrite (padrão)
    .list_channel("messages")                   // Adicionar à lista
    .channel_with_reducer("count", Reducer::Sum) // Somar valores
    .channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
        // Lógica de mesclagem customizada
        merge_json(old, new)
    })))
    .build();

let agent = GraphAgent::builder("stateful")
    .state_schema(schema)
    // ... nodes and edges
    .build()?;

Tipos de Reducer

ReducerComportamento
OverwriteSubstituir valor antigo por novo (padrão)
AppendAnexar à lista
SumAdicionar valores numéricos
CustomFunção de mesclagem customizada

Checkpointing

Habilita estado persistente para tolerância a falhas e human-in-the-loop:

Em Memória (Desenvolvimento)

use adk_graph::checkpoint::MemoryCheckpointer;

let checkpointer = Arc::new(MemoryCheckpointer::new());

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodes and edges
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

SQLite (Produção)

use adk_graph::checkpoint::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodes and edges
    .compile()?
    .with_checkpointer(checkpointer);

Histórico de Checkpoint (Viagem no Tempo)

// Listar todos os checkpoints para uma thread
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
    println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}

// Carregar um checkpoint específico
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
    println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}

Intervenção Humana (Human-in-the-Loop)

Pause a execução para aprovação humana usando interrupções dinâmicas:

use adk_graph::{error::GraphError, node::NodeOutput};

// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");

                // Extract risk level from LLM response
                let risk = if text.to_lowercase().contains("risk: high") { "high" }
                    else if text.to_lowercase().contains("risk: medium") { "medium" }
                    else { "low" };

                updates.insert("plan".to_string(), json!(text));
                updates.insert("risk_level".to_string(), json!(risk));
            }
        }
        updates
    });

// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_node_fn("review", |ctx| async move {
        let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
        let approved = ctx.get("approved").and_then(|v| v.as_bool());

        // Already approved - continue
        if approved == Some(true) {
            return Ok(NodeOutput::new());
        }

        // High/medium risk - interrupt for approval
        if risk == "high" || risk == "medium" {
            return Ok(NodeOutput::interrupt_with_data(
                &format!("{} RISK: Human approval required", risk.to_uppercase()),
                json!({
                    "plan": ctx.get("plan"),
                    "risk_level": risk,
                    "action": "Set 'approved' to true to continue"
                })
            ));
        }

        // Low risk - auto-approve
        Ok(NodeOutput::new().with_update("approved", json!(true)))
    })
    .add_edge(START, "planner")
    .add_edge("planner", "review")
    .add_edge("review", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;

match result {
    Err(GraphError::Interrupted(interrupt)) => {
        println!("*** EXECUTION PAUSED ***");
        println!("Reason: {}", interrupt.interrupt);
        println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));

        // Human reviews and approves...

        // Update state with approval
        graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;

        // Resume execution
        let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
        println!("Final result: {:?}", final_result.get("result"));
    }
    Ok(result) => {
        println!("Completed without interrupt: {:?}", result);
    }
    Err(e) => {
        println!("Error: {}", e);
    }
}

Interrupções Estáticas

Use interrupt_before ou interrupt_after para pontos de pausa obrigatórios:

let graph = StateGraph::with_channels(&["task", "plan", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_edge(START, "planner")
    .add_edge("planner", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_interrupt_before(&["executor"]);  // Always pause before execution

Execução em Streaming

Transmita eventos conforme o grafo é executado:

use futures::StreamExt;
use adk_graph::stream::StreamMode;

let stream = agent.stream(input, config, StreamMode::Updates);

while let Some(event) = stream.next().await {
    match event? {
        StreamEvent::NodeStart(name) => println!("Starting: {}", name),
        StreamEvent::Updates { node, updates } => {
            println!("{} updated state: {:?}", node, updates);
        }
        StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
        StreamEvent::Done(state) => println!("Final state: {:?}", state),
        _ => {}
    }
}

Modos de Streaming

ModoDescrição
ValuesTransmite o estado completo após cada nó
UpdatesTransmite apenas as mudanças de estado
MessagesTransmite atualizações do tipo mensagem
DebugTransmite todos os eventos internos

Integração ADK

GraphAgent implementa a trait ADK Agent, então funciona com:

  • Runner: Use com adk-runner para execução padrão
  • Callbacks: Suporte completo para callbacks before/after
  • Sessions: Funciona com adk-session para histórico de conversas
  • Streaming: Retorna ADK EventStream
use adk_runner::Runner;

let graph_agent = GraphAgent::builder("workflow")
    .before_agent_callback(|ctx| async {
        println!("Starting graph execution for session: {}", ctx.session_id());
        Ok(())
    })
    .after_agent_callback(|ctx, event| async {
        if let Some(content) = event.content() {
            println!("Graph completed with content");
        }
        Ok(())
    })
    // ... graph definition
    .build()?;

// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration

Exemplos

Todos os exemplos usam integração real de LLM com AgentNode:

# Agentes LLM paralelos com callbacks before/after
cargo run --example graph_agent

# Pipeline multiagente sequencial (extrator → analisador → formatador)
cargo run --example graph_workflow

# Classificação de sentimento baseada em LLM e roteamento condicional
cargo run --example graph_conditional

# Padrão ReAct com tools e execução cíclica
cargo run --example graph_react

# Supervisor multiagente roteando para especialistas
cargo run --example graph_supervisor

# Humano-no-loop com interrupções baseadas em risco
cargo run --example graph_hitl

# Checkpointing e depuração com "viagem no tempo"
cargo run --example graph_checkpoint

Comparação com LangGraph

FuncionalidadeLangGraphadk-graph
Gerenciamento de EstadoTypedDict + ReducersStateSchema + Reducers
Modelo de ExecuçãoPregel super-stepsPregel super-steps
CheckpointingMemória, SQLite, PostgresMemória, SQLite
Humano-no-Loopinterrupt_before/afterinterrupt_before/after + dinâmico
Streaming5 modos5 modos
CiclosSuporte nativoSuporte nativo
Segurança de TipoTipagem PythonSistema de tipos Rust
Integração LLMLangChainAgentNode + ADK agents

Anterior: ← Sistemas Multiagente | Próximo: Agentes em Tempo Real →