Agentes de Grafo

Construya flujos de trabajo complejos y con estado utilizando orquestación al estilo LangGraph con integración nativa de ADK-Rust.

Resumen

GraphAgent le permite definir flujos de trabajo como grafos dirigidos con nodos y aristas, lo que permite:

  • AgentNode: Envuelva los agentes LLM como nodos de grafo con mapeadores de entrada/salida personalizados
  • Flujos de trabajo cíclicos: Soporte nativo para bucles y razonamiento iterativo (ReAct pattern)
  • Enrutamiento condicional: Enrutamiento dinámico de aristas basado en el estado
  • Gestión de estado: Estado tipado con reductores (sobrescribir, añadir, sumar, personalizado)
  • Checkpointing: Estado persistente para tolerancia a fallos y asistencia humana
  • Streaming: Múltiples modos de flujo (valores, actualizaciones, mensajes, depuración)

El crate adk-graph proporciona orquestación de flujos de trabajo al estilo LangGraph para construir flujos de trabajo de agente complejos y con estado. Aporta capacidades de flujo de trabajo basadas en grafos al ecosistema ADK-Rust manteniendo una compatibilidad total con el sistema de agentes de ADK.

Beneficios clave:

  • Diseño visual de flujos de trabajo: Defina lógica compleja como grafos intuitivos de nodos y aristas
  • Ejecución paralela: Múltiples nodos pueden ejecutarse simultáneamente para un mejor rendimiento
  • Persistencia de estado: Checkpointing integrado para tolerancia a fallos y asistencia humana
  • Integración de LLM: Soporte nativo para envolver agentes ADK como nodos de grafo
  • Enrutamiento flexible: Aristas estáticas, enrutamiento condicional y toma de decisiones dinámica

Lo que construirás

En esta guía, crearás una Pipeline de Procesamiento de Texto que ejecuta la traducción y el resumen en paralelo:

                        ┌─────────────────────┐
       User Input       │                     │
      ────────────────▶ │       INICIO        │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │    TRADUCTOR     │            │   RESUMIDOR      │
        │                  │            │                  │
        │  🇫🇷 Traducción   │            │  📝 Resumen      │
        │     al francés   │            │     de una frase │
        └─────────┬────────┘            └─────────┬────────┘
                  │                               │
                  └───────────────┬───────────────┘
                                  │
                                  ▼
                        ┌─────────────────────┐
                        │      COMBINAR       │
                        │                     │
                        │  📋 Unir Resultados │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │       FINAL         │
                        │                     │
                        │   ✅ Completado     │
                        └─────────────────────┘

Conceptos Clave:

  • Nodes - Unidades de procesamiento que realizan trabajo (agentes LlmAgent, funciones o lógica personalizada)
  • Edges - Flujo de control entre nodes (conexiones estáticas o enrutamiento condicional)
  • State - Datos compartidos que fluyen a través del graph y persisten entre nodes
  • Parallel Execution - Múltiples nodes pueden ejecutarse simultáneamente para un mejor rendimiento

Comprendiendo los Componentes Principales

🔧 Nodes: Los Trabajadores Los Nodes son donde ocurre el trabajo real. Cada node puede:

  • AgentNode: Envolver un LlmAgent para procesar lenguaje natural
  • Function Node: Ejecutar código Rust personalizado para el procesamiento de datos
  • Built-in Nodes: Usar lógica predefinida como contadores o validadores

Piensa en los nodes como trabajadores especializados en una línea de montaje: cada uno tiene un trabajo y una experiencia específicos.

🔀 Edges: El Control de Flujo Los Edges determinan cómo se mueve la ejecución a través de tu graph:

  • Static Edges: Conexiones directas (A → B → C)
  • Conditional Edges: Enrutamiento dinámico basado en State (if sentiment == "positive" → positive_handler)
  • Parallel Edges: Múltiples rutas desde un node (START → [translator, summarizer])

Los Edges son como señales de tráfico y señales de carretera que dirigen el flujo de trabajo.

💾 State: La Memoria Compartida State es un almacén de clave-valor que todos los nodes pueden leer y escribir:

  • Input Data: Información inicial introducida en el graph
  • Intermediate Results: La salida de un node se convierte en la entrada para otro
  • Final Output: El resultado completado después de todo el procesamiento

State actúa como una pizarra compartida donde los nodes pueden dejar información para que otros la utilicen.

⚡ Parallel Execution: El Aumento de Velocidad Cuando múltiples edges salen de un node, esos nodes de destino se ejecutan simultáneamente:

  • Faster Processing: Las tareas independientes se ejecutan al mismo tiempo
  • Resource Efficiency: Mejor utilización de CPU y E/S
  • Scalability: Maneja flujos de trabajo más complejos sin una desaceleración lineal

Esto es como tener varios trabajadores abordando diferentes partes de un trabajo simultáneamente en lugar de esperar en la fila.


Inicio Rápido

1. Crea Tu Proyecto

cargo new graph_demo
cd graph_demo

Añade dependencias a Cargo.toml:

[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"

Crea .env con tu GOOGLE_API_KEY:

echo 'GOOGLE_API_KEY=your-api-key' > .env

2. Ejemplo de Procesamiento Paralelo

Aquí tienes un ejemplo completo y funcional que procesa texto en paralelo:

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    agent::GraphAgent,
    edge::{END, START},
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create specialized LLM agents
    let translator_agent = Arc::new(
        LlmAgentBuilder::new("translator")
            .description("Translates text to French")
            .model(model.clone())
            .instruction("Translate the input text to French. Only output the translation.")
            .build()?,
    );

    let summarizer_agent = Arc::new(
        LlmAgentBuilder::new("summarizer")
            .description("Summarizes text")
            .model(model.clone())
            .instruction("Summarize the input text in one sentence.")
            .build()?,
    );

    // Wrap agents as graph nodes with input/output mappers
    let translator_node = AgentNode::new(translator_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("translation".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    let summarizer_node = AgentNode::new(summarizer_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("summary".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    // Build the graph with parallel execution
    let agent = GraphAgent::builder("text_processor")
        .description("Processes text with translation and summarization in parallel")
        .channels(&["input", "translation", "summary", "result"])
        .node(translator_node)
        .node(summarizer_node)
        .node_fn("combine", |ctx| async move {
            let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
            let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");

            let result = format!(
                "=== Processing Complete ===\n\n\
                French Translation:\n{}\n\n\
                Summary:\n{}",
                translation, summary
            );

            Ok(NodeOutput::new().with_update("result", json!(result)))
        })
        // Parallel execution: both nodes start simultaneously
        .edge(START, "translator")
        .edge(START, "summarizer")
        .edge("translator", "combine")
        .edge("summarizer", "combine")
        .edge("combine", END)
        .build()?;

    // Execute the graph
    let mut input = State::new();
    input.insert("input".to_string(), json!("AI is transforming how we work and live."));

    let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
    println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

Salida de Ejemplo:

=== Processing Complete ===

French Translation:
L'IA transforme notre façon de travailler et de vivre.

Summary:
AI is revolutionizing work and daily life through technological transformation.

Cómo funciona la ejecución de grafos

La visión general

Los Agent de Graph se ejecutan en super-pasos: todos los nodos listos se ejecutan en paralelo, luego el grafo espera a que todos se completen antes del siguiente paso:

Step 1: START ──┬──▶ translator (running)
                └──▶ summarizer (running)
                
                ⏳ Esperar a que ambos se completen...
                
Step 2: translator ──┬──▶ combine (running)
        summarizer ──┘
        
                ⏳ Esperar a que combine se complete...
                
Step 3: combine ──▶ END ✅

Flujo de estado a través de los nodos

Cada nodo puede leer y escribir en el estado compartido:

┌─────────────────────────────────────────────────────────────────────┐
│ PASO 1: Estado inicial                                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: { "input": "AI is transforming how we work" }             │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────┐              ┌──────────────────┐           │
│   │   translator     │              │   summarizer     │           │
│   │  lee "input"     │              │  lee "input"     │           │
│   └──────────────────┘              └──────────────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASO 2: Después de la ejecución paralela                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: {                                                          │
│     "input": "AI is transforming how we work",                     │
│     "translation": "L'IA transforme notre façon de travailler",    │
│     "summary": "AI is revolutionizing work through technology"     │
│   }                                                                 │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────────────────────────┐                         │
│   │           combine                    │                         │
│   │  lee "translation" + "summary"       │                         │
│   │  escribe "result"                    │                         │
│   └──────────────────────────────────────┘                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASO 3: Estado final                                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: {                                                          │
│     "input": "AI is transforming how we work",                     │
│     "translation": "L'IA transforme notre façon de travailler",    │
│     "summary": "AI is revolutionizing work through technology",    │
│     "result": "=== Processing Complete ===\n\nFrench..."          │
│   }                                                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Qué lo hace funcionar

ComponenteRol
AgentNodeEnvuelve los LlmAgent con mapeadores de entrada/salida
input_mapperTransforma el estado → entrada de LlmAgent Content
output_mapperTransforma los eventos del LlmAgent → actualizaciones de estado
channelsDeclara los campos de estado que usará el grafo
edge()Define el flujo de ejecución entre nodos
ExecutionConfigProporciona el ID de hilo para el checkpointing

Enrutamiento Condicional con Clasificación LLM

Construye sistemas de enrutamiento inteligentes donde los LLM decidan la ruta de ejecución:

Visual: Enrutamiento Basado en el Sentimiento

                        ┌─────────────────────┐
     Comentarios del    │                     │
       Usuario        ────────────────▶ │    CLASIFICADOR     │
                        │  🧠 Analizar tono   │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┼───────────────┐
                   │               │               │
                   ▼               ▼               ▼
        ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
        │    POSITIVO      │ │    NEGATIVE      │ │     NEUTRO       │
        │                  │ │                  │ │                  │
        │  😊 ¡Gracias!    │ │  😔 Disculparse  │ │  😐 Preguntar más│
        │     Celebrar     │ │ Ayudar a solucionar│ │     preguntas    │
        └──────────────────┘ └──────────────────┘ └──────────────────┘

Código de Ejemplo Completo

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    edge::{END, Router, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create classifier agent
    let classifier_agent = Arc::new(
        LlmAgentBuilder::new("classifier")
            .description("Classifies text sentiment")
            .model(model.clone())
            .instruction(
                "You are a sentiment classifier. Analyze the input text and respond with \
                ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
            )
            .build()?,
    );

    // Create response agents for each sentiment
    let positive_agent = Arc::new(
        LlmAgentBuilder::new("positive")
            .description("Handles positive feedback")
            .model(model.clone())
            .instruction(
                "You are a customer success specialist. The customer has positive feedback. \
                Express gratitude, reinforce the positive experience, and suggest ways to \
                share their experience. Be warm and appreciative. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let negative_agent = Arc::new(
        LlmAgentBuilder::new("negative")
            .description("Handles negative feedback")
            .model(model.clone())
            .instruction(
                "You are a customer support specialist. The customer has a complaint. \
                Acknowledge their frustration, apologize sincerely, and offer help. \
                Be empathetic. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let neutral_agent = Arc::new(
        LlmAgentBuilder::new("neutral")
            .description("Handles neutral feedback")
            .model(model.clone())
            .instruction(
                "You are a customer service representative. The customer has neutral feedback. \
                Ask clarifying questions to better understand their needs. Be helpful and curious. \
                Keep response under 3 sentences.",
            )
            .build()?,
    );

    // Create AgentNodes with mappers
    let classifier_node = AgentNode::new(classifier_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("")
                        .to_lowercase()
                        .trim()
                        .to_string();
                    
                    let sentiment = if text.contains("positive") { "positive" }
                        else if text.contains("negative") { "negative" }
                        else { "neutral" };
                    
                    updates.insert("sentiment".to_string(), json!(sentiment));
                }
            }
            updates
        });

    // Response nodes (similar pattern for each)
    let positive_node = AgentNode::new(positive_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    updates.insert("response".to_string(), json!(text));
                }
            }
            updates
        });

    // Build graph with conditional routing
    let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
        .add_node(classifier_node)
        .add_node(positive_node)
        // ... add negative_node and neutral_node similarly
        .add_edge(START, "classifier")
        .add_conditional_edges(
            "classifier",
            Router::by_field("sentiment"),  // Route based on sentiment field
            [
                ("positive", "positive"),
                ("negative", "negative"),
                ("neutral", "neutral"),
            ],
        )
        .add_edge("positive", END)
        .add_edge("negative", END)
        .add_edge("neutral", END)
        .compile()?;

    // Test with different feedback
    let mut input = State::new();
    input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));

    let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
    println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

Flujo de Ejemplo:

Input: "Your product is amazing! I love it!"
       ↓
Classifier: "positive"
       ↓
Agente Positivo: "¡Muchas gracias por los maravillosos comentarios!
                Estamos encantados de que te encante nuestro producto.
                ¿Considerarías dejar una reseña para ayudar a otros?"

Patrón ReAct: Razonamiento + Actuación

Construye agents que pueden usar tools de forma iterativa para resolver problemas complejos:

Visual: Ciclo ReAct

                        ┌─────────────────────┐
       Pregunta del Usuario   │                     │
      ────────────────▶ │      REASONER       │
                        │  🧠 Piensa + Actúa   │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │   ¿Hay llamadas a tool?   │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │       SÍ         │            │        NO        │
        │                  │            │                  │
        │  🔄 Vuelve al reasoner │            │  ✅ Respuesta final FIN │
        │                  │            │                     │
        └─────────┬────────┘            └──────────────────┘
                  │
                  └─────────────────┐
                                    │
                                    ▼
                        ┌─────────────────────┐
                        │      REASONER       │
                        │  🧠 Piensa + Actúa   │
                        │   (siguiente iteración)  │
                        └─────────────────────┘

Ejemplo Completo de ReAct

use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
    edge::{END, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create tools
    let weather_tool = Arc::new(FunctionTool::new(
        "get_weather",
        "Get the current weather for a location. Takes a 'location' parameter (city name).",
        |_ctx, args| async move {
            let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
            Ok(json!({
                "location": location,
                "temperature": "72°F",
                "condition": "Sunny",
                "humidity": "45%"
            }))
        },
    )) as Arc<dyn Tool>;

    let calculator_tool = Arc::new(FunctionTool::new(
        "calculator",
        "Perform mathematical calculations. Takes an 'expression' parameter (string).",
        |_ctx, args| async move {
            let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
            let result = match expr {
                "2 + 2" => "4",
                "10 * 5" => "50",
                "100 / 4" => "25",
                "15 - 7" => "8",
                _ => "Unable to evaluate",
            };
            Ok(json!({ "result": result, "expression": expr }))
        },
    )) as Arc<dyn Tool>;

    // Create reasoner agent with tools
    let reasoner_agent = Arc::new(
        LlmAgentBuilder::new("reasoner")
            .description("Reasoning agent with tools")
            .model(model.clone())
            .instruction(
                "You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
                When you have enough information, provide a final answer without using more tools.",
            )
            .tool(weather_tool)
            .tool(calculator_tool)
            .build()?,
    );

    // Create reasoner node that detects tool usage
    let reasoner_node = AgentNode::new(reasoner_agent)
        .with_input_mapper(|state| {
            let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(question)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            let mut has_tool_calls = false;
            let mut response = String::new();

            for event in events {
                if let Some(content) = event.content() {
                    for part in &content.parts {
                        match part {
                            Part::FunctionCall { .. } => {
                                has_tool_calls = true;
                            }
                            Part::Text { text } => {
                                response.push_str(text);
                            }
                            _ => {}
                        }
                    }
                }
            }

            updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
            updates.insert("response".to_string(), json!(response));
            updates
        });

    // Build ReAct graph with cycle
    let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
        .add_node(reasoner_node)
        .add_node_fn("counter", |ctx| async move {
            let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
            Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
        })
        .add_edge(START, "counter")
        .add_edge("counter", "reasoner")
        .add_conditional_edges(
            "reasoner",
            |state| {
                let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
                let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

                // Safety limit
                if iteration >= 5 { return END.to_string(); }

                if has_tools {
                    "counter".to_string()  // Loop back for more reasoning
                } else {
                    END.to_string()  // Done - final answer
                }
            },
            [("counter", "counter"), (END, END)],
        )
        .compile()?
        .with_recursion_limit(10);

    // Test the ReAct agent
    let mut input = State::new();
    input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));

    let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
    println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));

    Ok(())
}

Flujo de Ejemplo:

Question: "What's the weather in Paris and what's 15 + 25?"

Iteración 1:
  Reasoner: "Necesito obtener información del clima y hacer cálculos matemáticos"
  → Llama a get_weather(location="Paris") y calculator(expression="15 + 25")
  → has_tool_calls = true → Vuelve al bucle

Iteración 2:
  Reasoner: "Basado en los resultados: París está a 72°F y soleado, 15 + 25 = 40"
  → No hay llamadas a tool → has_tool_calls = false → END

Respuesta Final: "El clima en París es de 72°F y soleado con 45% de humedad. 
              Y 15 + 25 es igual a 40."

AgentNode

Envuelve cualquier Agent de ADK (típicamente LlmAgent) como un nodo de grafo:

let node = AgentNode::new(llm_agent)
    .with_input_mapper(|state| {
        // Transform graph state to agent input Content
        let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(text)
    })
    .with_output_mapper(|events| {
        // Transform agent events to state updates
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");
                updates.insert("output".to_string(), json!(text));
            }
        }
        updates
    });

Nodos de Function

Simples funciones async que procesan el state:

.node_fn("process", |ctx| async move {
    let input = ctx.state.get("input").unwrap();
    let output = process_data(input).await?;
    Ok(NodeOutput::new().with_update("output", output))
})

Tipos de Aristas

Aristas Estáticas

Conexiones directas entre nodos:

.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)

Aristas Condicionales

Enrutamiento dinámico basado en el estado:

.conditional_edge(
    "router",
    |state| {
        match state.get("next").and_then(|v| v.as_str()) {
            Some("research") => "research_node".to_string(),
            Some("write") => "write_node".to_string(),
            _ => END.to_string(),
        }
    },
    [
        ("research_node", "research_node"),
        ("write_node", "write_node"),
        (END, END),
    ],
)

Ayudantes de Router

Utilice routers incorporados para patrones comunes:

use adk_graph::edge::Router;

// Route based on a state field value
.conditional_edge("classifier", Router::by_field("sentiment"), [
    ("positive", "positive_handler"),
    ("negative", "negative_handler"),
    ("neutral", "neutral_handler"),
])

// Route based on boolean field
.conditional_edge("check", Router::by_bool("approved"), [
    ("true", "execute"),
    ("false", "reject"),
])

// Limit iterations
.conditional_edge("loop", Router::max_iterations("count", 5), [
    ("continue", "process"),
    ("done", END),
])

Ejecución Paralela

Múltiples aristas desde un único nodo se ejecutan en paralelo:

let agent = GraphAgent::builder("parallel_processor")
    .channels(&["input", "translation", "summary", "analysis"])
    .node(translator_node)
    .node(summarizer_node)
    .node(analyzer_node)
    .node(combiner_node)
    // All three start simultaneously
    .edge(START, "translator")
    .edge(START, "summarizer")
    .edge(START, "analyzer")
    // Wait for all to complete before combining
    .edge("translator", "combiner")
    .edge("summarizer", "combiner")
    .edge("analyzer", "combiner")
    .edge("combiner", END)
    .build()?;

Gráficos Cíclicos (Patrón ReAct)

Construye agentes de razonamiento iterativo con ciclos:

use adk_core::Part;

// Create agent with tools
let reasoner = Arc::new(
    LlmAgentBuilder::new("reasoner")
        .model(model)
        .instruction("Use tools to answer questions. Provide final answer when done.")
        .tool(search_tool)
        .tool(calculator_tool)
        .build()?
);

let reasoner_node = AgentNode::new(reasoner)
    .with_input_mapper(|state| {
        let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(question)
    })
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        let mut has_tool_calls = false;
        let mut response = String::new();

        for event in events {
            if let Some(content) = event.content() {
                for part in &content.parts {
                    match part {
                        Part::FunctionCall { name, .. } => {
                            has_tool_calls = true;
                        }
                        Part::Text { text } => {
                            response.push_str(text);
                        }
                        _ => {}
                    }
                }
            }
        }

        updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
        updates.insert("response".to_string(), json!(response));
        updates
    });

// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
    .add_node(reasoner_node)
    .add_node_fn("counter", |ctx| async move {
        let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
        Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
    })
    .add_edge(START, "counter")
    .add_edge("counter", "reasoner")
    .add_conditional_edges(
        "reasoner",
        |state| {
            let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
            let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

            // Safety limit
            if iteration >= 5 { return END.to_string(); }

            if has_tools {
                "counter".to_string()  // Loop back
            } else {
                END.to_string()  // Done
            }
        },
        [("counter", "counter"), (END, END)],
    )
    .compile()?
    .with_recursion_limit(10);

Supervisor Multi-Agente

Dirige tareas a agentes especialistas:

// Crear agente supervisor
let supervisor = Arc::new(
    LlmAgentBuilder::new("supervisor")
        .model(model.clone())
        .instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
        .build()?
);

let supervisor_node = AgentNode::new(supervisor)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("")
                    .to_lowercase();

                let next = if text.contains("researcher") { "researcher" }
                    else if text.contains("writer") { "writer" }
                    else if text.contains("coder") { "coder" }
                    else { "done" };

                updates.insert("next_agent".to_string(), json!(next));
            }
        }
        updates
    });

// Construir grafo supervisor
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
    .add_node(supervisor_node)
    .add_node(researcher_node)
    .add_node(writer_node)
    .add_node(coder_node)
    .add_edge(START, "supervisor")
    .add_conditional_edges(
        "supervisor",
        Router::by_field("next_agent"),
        [
            ("researcher", "researcher"),
            ("writer", "writer"),
            ("coder", "coder"),
            ("done", END),
        ],
    )
    // Los agentes reportan al supervisor
    .add_edge("researcher", "supervisor")
    .add_edge("writer", "supervisor")
    .add_edge("coder", "supervisor")
    .compile()?;

Gestión de Estado

Esquema de Estado con Reductores

Controla cómo se fusionan las actualizaciones de estado:

let schema = StateSchema::builder()
    .channel("current_step")                    // Sobreescribir (por defecto)
    .list_channel("messages")                   // Añadir a la lista
    .channel_with_reducer("count", Reducer::Sum) // Sumar valores
    .channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
        // Lógica de fusión personalizada
        merge_json(old, new)
    })))
    .build();

let agent = GraphAgent::builder("stateful")
    .state_schema(schema)
    // ... nodos y aristas
    .build()?;

Tipos de Reductor

ReductorComportamiento
OverwriteReemplaza el valor antiguo con el nuevo (por defecto)
AppendAñade a la lista
SumSuma valores numéricos
CustomFunción de fusión personalizada

Puntos de Control

Habilita el estado persistente para la tolerancia a fallos y la intervención humana:

En Memoria (Desarrollo)

use adk_graph::checkpoint::MemoryCheckpointer;

let checkpointer = Arc::new(MemoryCheckpointer::new());

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodos y aristas
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

SQLite (Producción)

use adk_graph::checkpoint::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodos y aristas
    .compile()?
    .with_checkpointer(checkpointer);

Historial de Puntos de Control (Viaje en el Tiempo)

// Listar todos los puntos de control para un hilo
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
    println!("Paso {}: {:?}", cp.step, cp.state.get("status"));
}

// Cargar un punto de control específico
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
    println!("Estado en el paso {}: {:?}", checkpoint.step, checkpoint.state);
}

Human-in-the-Loop

Pausa la ejecución para la aprobación humana utilizando interrupciones dinámicas:

use adk_graph::{error::GraphError, node::NodeOutput};

// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");

                // Extract risk level from LLM response
                let risk = if text.to_lowercase().contains("risk: high") { "high" }
                    else if text.to_lowercase().contains("risk: medium") { "medium" }
                    else { "low" };

                updates.insert("plan".to_string(), json!(text));
                updates.insert("risk_level".to_string(), json!(risk));
            }
        }
        updates
    });

// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_node_fn("review", |ctx| async move {
        let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
        let approved = ctx.get("approved").and_then(|v| v.as_bool());

        // Already approved - continue
        if approved == Some(true) {
            return Ok(NodeOutput::new());
        }

        // High/medium risk - interrupt for approval
        if risk == "high" || risk == "medium" {
            return Ok(NodeOutput::interrupt_with_data(
                &format!("{} RISK: Human approval required", risk.to_uppercase()),
                json!({
                    "plan": ctx.get("plan"),
                    "risk_level": risk,
                    "action": "Set 'approved' to true to continue"
                })
            ));
        }

        // Low risk - auto-approve
        Ok(NodeOutput::new().with_update("approved", json!(true)))
    })
    .add_edge(START, "planner")
    .add_edge("planner", "review")
    .add_edge("review", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;

match result {
    Err(GraphError::Interrupted(interrupt)) => {
        println!("*** EXECUTION PAUSED ***");
        println!("Reason: {}", interrupt.interrupt);
        println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));

        // Human reviews and approves...

        // Update state with approval
        graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;

        // Resume execution
        let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
        println!("Final result: {:?}", final_result.get("result"));
    }
    Ok(result) => {
        println!("Completed without interrupt: {:?}", result);
    }
    Err(e) => {
        println!("Error: {}", e);
    }
}

Interrupciones Estáticas

Usa interrupt_before o interrupt_after para puntos de pausa obligatorios:

let graph = StateGraph::with_channels(&["task", "plan", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_edge(START, "planner")
    .add_edge("planner", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_interrupt_before(&["executor"]);  // Always pause before execution

Ejecución en Streaming

Transmite eventos a medida que el grafo se ejecuta:

use futures::StreamExt;
use adk_graph::stream::StreamMode;

let stream = agent.stream(input, config, StreamMode::Updates);

while let Some(event) = stream.next().await {
    match event? {
        StreamEvent::NodeStart(name) => println!("Starting: {}", name),
        StreamEvent::Updates { node, updates } => {
            println!("{} updated state: {:?}", node, updates);
        }
        StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
        StreamEvent::Done(state) => println!("Final state: {:?}", state),
        _ => {}
    }
}

Modos de Streaming

ModoDescripción
ValuesTransmite el estado completo después de cada nodo
UpdatesTransmite solo los cambios de estado
MessagesTransmite actualizaciones de tipo mensaje
DebugTransmite todos los eventos internos

Integración ADK

GraphAgent implementa el trait ADK Agent, por lo que funciona con:

  • Runner: Usar con adk-runner para ejecución estándar
  • Callbacks: Soporte completo para callbacks de antes/después
  • Sessions: Funciona con adk-session para el historial de conversación
  • Streaming: Devuelve un ADK EventStream
use adk_runner::Runner;

let graph_agent = GraphAgent::builder("workflow")
    .before_agent_callback(|ctx| async {
        println!("Starting graph execution for session: {}", ctx.session_id());
        Ok(())
    })
    .after_agent_callback(|ctx, event| async {
        if let Some(content) = event.content() {
            println!("Graph completed with content");
        }
        Ok(())
    })
    // ... graph definition
    .build()?;

// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration

Ejemplos

Todos los ejemplos utilizan integración real de LLM con AgentNode:

# Agentes LLM paralelos con callbacks de antes/después
cargo run --example graph_agent

# Pipeline secuencial multi-agente (extractor → analizador → formateador)
cargo run --example graph_workflow

# Clasificación de sentimiento basada en LLM y enrutamiento condicional
cargo run --example graph_conditional

# Patrón ReAct con tools y ejecución cíclica
cargo run --example graph_react

# Supervisor multi-agente enrutando a especialistas
cargo run --example graph_supervisor

# Humano en el bucle con interrupciones basadas en riesgo
cargo run --example graph_hitl

# Checkpointing y depuración con viaje en el tiempo
cargo run --example graph_checkpoint

Comparación con LangGraph

CaracterísticaLangGraphadk-graph
Gestión de EstadoTypedDict + ReducersStateSchema + Reducers
Modelo de EjecuciónPregel super-stepsPregel super-steps
CheckpointingMemory, SQLite, PostgresMemory, SQLite
Humano en el Bucleinterrupt_before/afterinterrupt_before/after + dynamic
Streaming5 modos5 modos
CiclosSoporte nativoSoporte nativo
Seguridad de TiposPython typingSistema de tipos de Rust
Integración de LLMLangChainAgentNode + ADK agents

Anterior: ← Sistemas Multi-Agente | Siguiente: Agentes en Tiempo Real →