Agentes de Grafo
Construya flujos de trabajo complejos y con estado utilizando orquestación al estilo LangGraph con integración nativa de ADK-Rust.
Resumen
GraphAgent le permite definir flujos de trabajo como grafos dirigidos con nodos y aristas, lo que permite:
- AgentNode: Envuelva los agentes LLM como nodos de grafo con mapeadores de entrada/salida personalizados
- Flujos de trabajo cíclicos: Soporte nativo para bucles y razonamiento iterativo (ReAct pattern)
- Enrutamiento condicional: Enrutamiento dinámico de aristas basado en el estado
- Gestión de estado: Estado tipado con reductores (sobrescribir, añadir, sumar, personalizado)
- Checkpointing: Estado persistente para tolerancia a fallos y asistencia humana
- Streaming: Múltiples modos de flujo (valores, actualizaciones, mensajes, depuración)
El crate adk-graph proporciona orquestación de flujos de trabajo al estilo LangGraph para construir flujos de trabajo de agente complejos y con estado. Aporta capacidades de flujo de trabajo basadas en grafos al ecosistema ADK-Rust manteniendo una compatibilidad total con el sistema de agentes de ADK.
Beneficios clave:
- Diseño visual de flujos de trabajo: Defina lógica compleja como grafos intuitivos de nodos y aristas
- Ejecución paralela: Múltiples nodos pueden ejecutarse simultáneamente para un mejor rendimiento
- Persistencia de estado: Checkpointing integrado para tolerancia a fallos y asistencia humana
- Integración de LLM: Soporte nativo para envolver agentes ADK como nodos de grafo
- Enrutamiento flexible: Aristas estáticas, enrutamiento condicional y toma de decisiones dinámica
Lo que construirás
En esta guía, crearás una Pipeline de Procesamiento de Texto que ejecuta la traducción y el resumen en paralelo:
┌─────────────────────┐
User Input │ │
────────────────▶ │ INICIO │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ TRADUCTOR │ │ RESUMIDOR │
│ │ │ │
│ 🇫🇷 Traducción │ │ 📝 Resumen │
│ al francés │ │ de una frase │
└─────────┬────────┘ └─────────┬────────┘
│ │
└───────────────┬───────────────┘
│
▼
┌─────────────────────┐
│ COMBINAR │
│ │
│ 📋 Unir Resultados │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ FINAL │
│ │
│ ✅ Completado │
└─────────────────────┘
Conceptos Clave:
- Nodes - Unidades de procesamiento que realizan trabajo (agentes LlmAgent, funciones o lógica personalizada)
- Edges - Flujo de control entre nodes (conexiones estáticas o enrutamiento condicional)
- State - Datos compartidos que fluyen a través del graph y persisten entre nodes
- Parallel Execution - Múltiples nodes pueden ejecutarse simultáneamente para un mejor rendimiento
Comprendiendo los Componentes Principales
🔧 Nodes: Los Trabajadores Los Nodes son donde ocurre el trabajo real. Cada node puede:
- AgentNode: Envolver un LlmAgent para procesar lenguaje natural
- Function Node: Ejecutar código Rust personalizado para el procesamiento de datos
- Built-in Nodes: Usar lógica predefinida como contadores o validadores
Piensa en los nodes como trabajadores especializados en una línea de montaje: cada uno tiene un trabajo y una experiencia específicos.
🔀 Edges: El Control de Flujo Los Edges determinan cómo se mueve la ejecución a través de tu graph:
- Static Edges: Conexiones directas (
A → B → C) - Conditional Edges: Enrutamiento dinámico basado en State (
if sentiment == "positive" → positive_handler) - Parallel Edges: Múltiples rutas desde un node (
START → [translator, summarizer])
Los Edges son como señales de tráfico y señales de carretera que dirigen el flujo de trabajo.
💾 State: La Memoria Compartida State es un almacén de clave-valor que todos los nodes pueden leer y escribir:
- Input Data: Información inicial introducida en el graph
- Intermediate Results: La salida de un node se convierte en la entrada para otro
- Final Output: El resultado completado después de todo el procesamiento
State actúa como una pizarra compartida donde los nodes pueden dejar información para que otros la utilicen.
⚡ Parallel Execution: El Aumento de Velocidad Cuando múltiples edges salen de un node, esos nodes de destino se ejecutan simultáneamente:
- Faster Processing: Las tareas independientes se ejecutan al mismo tiempo
- Resource Efficiency: Mejor utilización de CPU y E/S
- Scalability: Maneja flujos de trabajo más complejos sin una desaceleración lineal
Esto es como tener varios trabajadores abordando diferentes partes de un trabajo simultáneamente en lugar de esperar en la fila.
Inicio Rápido
1. Crea Tu Proyecto
cargo new graph_demo
cd graph_demo
Añade dependencias a Cargo.toml:
[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"
Crea .env con tu GOOGLE_API_KEY:
echo 'GOOGLE_API_KEY=your-api-key' > .env
2. Ejemplo de Procesamiento Paralelo
Aquí tienes un ejemplo completo y funcional que procesa texto en paralelo:
use adk_agent::LlmAgentBuilder;
use adk_graph::{
agent::GraphAgent,
edge::{END, START},
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create specialized LLM agents
let translator_agent = Arc::new(
LlmAgentBuilder::new("translator")
.description("Translates text to French")
.model(model.clone())
.instruction("Translate the input text to French. Only output the translation.")
.build()?,
);
let summarizer_agent = Arc::new(
LlmAgentBuilder::new("summarizer")
.description("Summarizes text")
.model(model.clone())
.instruction("Summarize the input text in one sentence.")
.build()?,
);
// Wrap agents as graph nodes with input/output mappers
let translator_node = AgentNode::new(translator_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("translation".to_string(), json!(text));
}
}
}
updates
});
let summarizer_node = AgentNode::new(summarizer_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("summary".to_string(), json!(text));
}
}
}
updates
});
// Build the graph with parallel execution
let agent = GraphAgent::builder("text_processor")
.description("Processes text with translation and summarization in parallel")
.channels(&["input", "translation", "summary", "result"])
.node(translator_node)
.node(summarizer_node)
.node_fn("combine", |ctx| async move {
let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");
let result = format!(
"=== Processing Complete ===\n\n\
French Translation:\n{}\n\n\
Summary:\n{}",
translation, summary
);
Ok(NodeOutput::new().with_update("result", json!(result)))
})
// Parallel execution: both nodes start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge("translator", "combine")
.edge("summarizer", "combine")
.edge("combine", END)
.build()?;
// Execute the graph
let mut input = State::new();
input.insert("input".to_string(), json!("AI is transforming how we work and live."));
let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Salida de Ejemplo:
=== Processing Complete ===
French Translation:
L'IA transforme notre façon de travailler et de vivre.
Summary:
AI is revolutionizing work and daily life through technological transformation.
Cómo funciona la ejecución de grafos
La visión general
Los Agent de Graph se ejecutan en super-pasos: todos los nodos listos se ejecutan en paralelo, luego el grafo espera a que todos se completen antes del siguiente paso:
Step 1: START ──┬──▶ translator (running)
└──▶ summarizer (running)
⏳ Esperar a que ambos se completen...
Step 2: translator ──┬──▶ combine (running)
summarizer ──┘
⏳ Esperar a que combine se complete...
Step 3: combine ──▶ END ✅
Flujo de estado a través de los nodos
Cada nodo puede leer y escribir en el estado compartido:
┌─────────────────────────────────────────────────────────────────────┐
│ PASO 1: Estado inicial │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { "input": "AI is transforming how we work" } │
│ │
│ ↓ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ translator │ │ summarizer │ │
│ │ lee "input" │ │ lee "input" │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASO 2: Después de la ejecución paralela │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology" │
│ } │
│ │
│ ↓ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ combine │ │
│ │ lee "translation" + "summary" │ │
│ │ escribe "result" │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASO 3: Estado final │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology", │
│ "result": "=== Processing Complete ===\n\nFrench..." │
│ } │
│ │
└─────────────────────────────────────────────────────────────────────┘
Qué lo hace funcionar
| Componente | Rol |
|---|---|
AgentNode | Envuelve los LlmAgent con mapeadores de entrada/salida |
input_mapper | Transforma el estado → entrada de LlmAgent Content |
output_mapper | Transforma los eventos del LlmAgent → actualizaciones de estado |
channels | Declara los campos de estado que usará el grafo |
edge() | Define el flujo de ejecución entre nodos |
ExecutionConfig | Proporciona el ID de hilo para el checkpointing |
Enrutamiento Condicional con Clasificación LLM
Construye sistemas de enrutamiento inteligentes donde los LLM decidan la ruta de ejecución:
Visual: Enrutamiento Basado en el Sentimiento
┌─────────────────────┐
Comentarios del │ │
Usuario ────────────────▶ │ CLASIFICADOR │
│ 🧠 Analizar tono │
└──────────┬──────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ POSITIVO │ │ NEGATIVE │ │ NEUTRO │
│ │ │ │ │ │
│ 😊 ¡Gracias! │ │ 😔 Disculparse │ │ 😐 Preguntar más│
│ Celebrar │ │ Ayudar a solucionar│ │ preguntas │
└──────────────────┘ └──────────────────┘ └──────────────────┘
Código de Ejemplo Completo
use adk_agent::LlmAgentBuilder;
use adk_graph::{
edge::{END, Router, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create classifier agent
let classifier_agent = Arc::new(
LlmAgentBuilder::new("classifier")
.description("Classifies text sentiment")
.model(model.clone())
.instruction(
"You are a sentiment classifier. Analyze the input text and respond with \
ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
)
.build()?,
);
// Create response agents for each sentiment
let positive_agent = Arc::new(
LlmAgentBuilder::new("positive")
.description("Handles positive feedback")
.model(model.clone())
.instruction(
"You are a customer success specialist. The customer has positive feedback. \
Express gratitude, reinforce the positive experience, and suggest ways to \
share their experience. Be warm and appreciative. Keep response under 3 sentences.",
)
.build()?,
);
let negative_agent = Arc::new(
LlmAgentBuilder::new("negative")
.description("Handles negative feedback")
.model(model.clone())
.instruction(
"You are a customer support specialist. The customer has a complaint. \
Acknowledge their frustration, apologize sincerely, and offer help. \
Be empathetic. Keep response under 3 sentences.",
)
.build()?,
);
let neutral_agent = Arc::new(
LlmAgentBuilder::new("neutral")
.description("Handles neutral feedback")
.model(model.clone())
.instruction(
"You are a customer service representative. The customer has neutral feedback. \
Ask clarifying questions to better understand their needs. Be helpful and curious. \
Keep response under 3 sentences.",
)
.build()?,
);
// Create AgentNodes with mappers
let classifier_node = AgentNode::new(classifier_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase()
.trim()
.to_string();
let sentiment = if text.contains("positive") { "positive" }
else if text.contains("negative") { "negative" }
else { "neutral" };
updates.insert("sentiment".to_string(), json!(sentiment));
}
}
updates
});
// Response nodes (similar pattern for each)
let positive_node = AgentNode::new(positive_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("response".to_string(), json!(text));
}
}
updates
});
// Build graph with conditional routing
let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
.add_node(classifier_node)
.add_node(positive_node)
// ... add negative_node and neutral_node similarly
.add_edge(START, "classifier")
.add_conditional_edges(
"classifier",
Router::by_field("sentiment"), // Route based on sentiment field
[
("positive", "positive"),
("negative", "negative"),
("neutral", "neutral"),
],
)
.add_edge("positive", END)
.add_edge("negative", END)
.add_edge("neutral", END)
.compile()?;
// Test with different feedback
let mut input = State::new();
input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));
let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Flujo de Ejemplo:
Input: "Your product is amazing! I love it!"
↓
Classifier: "positive"
↓
Agente Positivo: "¡Muchas gracias por los maravillosos comentarios!
Estamos encantados de que te encante nuestro producto.
¿Considerarías dejar una reseña para ayudar a otros?"
Patrón ReAct: Razonamiento + Actuación
Construye agents que pueden usar tools de forma iterativa para resolver problemas complejos:
Visual: Ciclo ReAct
┌─────────────────────┐
Pregunta del Usuario │ │
────────────────▶ │ REASONER │
│ 🧠 Piensa + Actúa │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ ¿Hay llamadas a tool? │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ SÍ │ │ NO │
│ │ │ │
│ 🔄 Vuelve al reasoner │ │ ✅ Respuesta final FIN │
│ │ │ │
└─────────┬────────┘ └──────────────────┘
│
└─────────────────┐
│
▼
┌─────────────────────┐
│ REASONER │
│ 🧠 Piensa + Actúa │
│ (siguiente iteración) │
└─────────────────────┘
Ejemplo Completo de ReAct
use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
edge::{END, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create tools
let weather_tool = Arc::new(FunctionTool::new(
"get_weather",
"Get the current weather for a location. Takes a 'location' parameter (city name).",
|_ctx, args| async move {
let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
Ok(json!({
"location": location,
"temperature": "72°F",
"condition": "Sunny",
"humidity": "45%"
}))
},
)) as Arc<dyn Tool>;
let calculator_tool = Arc::new(FunctionTool::new(
"calculator",
"Perform mathematical calculations. Takes an 'expression' parameter (string).",
|_ctx, args| async move {
let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
let result = match expr {
"2 + 2" => "4",
"10 * 5" => "50",
"100 / 4" => "25",
"15 - 7" => "8",
_ => "Unable to evaluate",
};
Ok(json!({ "result": result, "expression": expr }))
},
)) as Arc<dyn Tool>;
// Create reasoner agent with tools
let reasoner_agent = Arc::new(
LlmAgentBuilder::new("reasoner")
.description("Reasoning agent with tools")
.model(model.clone())
.instruction(
"You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
When you have enough information, provide a final answer without using more tools.",
)
.tool(weather_tool)
.tool(calculator_tool)
.build()?,
);
// Create reasoner node that detects tool usage
let reasoner_node = AgentNode::new(reasoner_agent)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build ReAct graph with cycle
let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back for more reasoning
} else {
END.to_string() // Done - final answer
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
// Test the ReAct agent
let mut input = State::new();
input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));
let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));
Ok(())
}
Flujo de Ejemplo:
Question: "What's the weather in Paris and what's 15 + 25?"
Iteración 1:
Reasoner: "Necesito obtener información del clima y hacer cálculos matemáticos"
→ Llama a get_weather(location="Paris") y calculator(expression="15 + 25")
→ has_tool_calls = true → Vuelve al bucle
Iteración 2:
Reasoner: "Basado en los resultados: París está a 72°F y soleado, 15 + 25 = 40"
→ No hay llamadas a tool → has_tool_calls = false → END
Respuesta Final: "El clima en París es de 72°F y soleado con 45% de humedad.
Y 15 + 25 es igual a 40."
AgentNode
Envuelve cualquier Agent de ADK (típicamente LlmAgent) como un nodo de grafo:
let node = AgentNode::new(llm_agent)
.with_input_mapper(|state| {
// Transform graph state to agent input Content
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
// Transform agent events to state updates
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("output".to_string(), json!(text));
}
}
updates
});
Nodos de Function
Simples funciones async que procesan el state:
.node_fn("process", |ctx| async move {
let input = ctx.state.get("input").unwrap();
let output = process_data(input).await?;
Ok(NodeOutput::new().with_update("output", output))
})
Tipos de Aristas
Aristas Estáticas
Conexiones directas entre nodos:
.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)
Aristas Condicionales
Enrutamiento dinámico basado en el estado:
.conditional_edge(
"router",
|state| {
match state.get("next").and_then(|v| v.as_str()) {
Some("research") => "research_node".to_string(),
Some("write") => "write_node".to_string(),
_ => END.to_string(),
}
},
[
("research_node", "research_node"),
("write_node", "write_node"),
(END, END),
],
)
Ayudantes de Router
Utilice routers incorporados para patrones comunes:
use adk_graph::edge::Router;
// Route based on a state field value
.conditional_edge("classifier", Router::by_field("sentiment"), [
("positive", "positive_handler"),
("negative", "negative_handler"),
("neutral", "neutral_handler"),
])
// Route based on boolean field
.conditional_edge("check", Router::by_bool("approved"), [
("true", "execute"),
("false", "reject"),
])
// Limit iterations
.conditional_edge("loop", Router::max_iterations("count", 5), [
("continue", "process"),
("done", END),
])
Ejecución Paralela
Múltiples aristas desde un único nodo se ejecutan en paralelo:
let agent = GraphAgent::builder("parallel_processor")
.channels(&["input", "translation", "summary", "analysis"])
.node(translator_node)
.node(summarizer_node)
.node(analyzer_node)
.node(combiner_node)
// All three start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge(START, "analyzer")
// Wait for all to complete before combining
.edge("translator", "combiner")
.edge("summarizer", "combiner")
.edge("analyzer", "combiner")
.edge("combiner", END)
.build()?;
Gráficos Cíclicos (Patrón ReAct)
Construye agentes de razonamiento iterativo con ciclos:
use adk_core::Part;
// Create agent with tools
let reasoner = Arc::new(
LlmAgentBuilder::new("reasoner")
.model(model)
.instruction("Use tools to answer questions. Provide final answer when done.")
.tool(search_tool)
.tool(calculator_tool)
.build()?
);
let reasoner_node = AgentNode::new(reasoner)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { name, .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back
} else {
END.to_string() // Done
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
Supervisor Multi-Agente
Dirige tareas a agentes especialistas:
// Crear agente supervisor
let supervisor = Arc::new(
LlmAgentBuilder::new("supervisor")
.model(model.clone())
.instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
.build()?
);
let supervisor_node = AgentNode::new(supervisor)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase();
let next = if text.contains("researcher") { "researcher" }
else if text.contains("writer") { "writer" }
else if text.contains("coder") { "coder" }
else { "done" };
updates.insert("next_agent".to_string(), json!(next));
}
}
updates
});
// Construir grafo supervisor
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
.add_node(supervisor_node)
.add_node(researcher_node)
.add_node(writer_node)
.add_node(coder_node)
.add_edge(START, "supervisor")
.add_conditional_edges(
"supervisor",
Router::by_field("next_agent"),
[
("researcher", "researcher"),
("writer", "writer"),
("coder", "coder"),
("done", END),
],
)
// Los agentes reportan al supervisor
.add_edge("researcher", "supervisor")
.add_edge("writer", "supervisor")
.add_edge("coder", "supervisor")
.compile()?;
Gestión de Estado
Esquema de Estado con Reductores
Controla cómo se fusionan las actualizaciones de estado:
let schema = StateSchema::builder()
.channel("current_step") // Sobreescribir (por defecto)
.list_channel("messages") // Añadir a la lista
.channel_with_reducer("count", Reducer::Sum) // Sumar valores
.channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
// Lógica de fusión personalizada
merge_json(old, new)
})))
.build();
let agent = GraphAgent::builder("stateful")
.state_schema(schema)
// ... nodos y aristas
.build()?;
Tipos de Reductor
| Reductor | Comportamiento |
|---|---|
Overwrite | Reemplaza el valor antiguo con el nuevo (por defecto) |
Append | Añade a la lista |
Sum | Suma valores numéricos |
Custom | Función de fusión personalizada |
Puntos de Control
Habilita el estado persistente para la tolerancia a fallos y la intervención humana:
En Memoria (Desarrollo)
use adk_graph::checkpoint::MemoryCheckpointer;
let checkpointer = Arc::new(MemoryCheckpointer::new());
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodos y aristas
.compile()?
.with_checkpointer_arc(checkpointer.clone());
SQLite (Producción)
use adk_graph::checkpoint::SqliteCheckpointer;
let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodos y aristas
.compile()?
.with_checkpointer(checkpointer);
Historial de Puntos de Control (Viaje en el Tiempo)
// Listar todos los puntos de control para un hilo
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
println!("Paso {}: {:?}", cp.step, cp.state.get("status"));
}
// Cargar un punto de control específico
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
println!("Estado en el paso {}: {:?}", checkpoint.step, checkpoint.state);
}
Human-in-the-Loop
Pausa la ejecución para la aprobación humana utilizando interrupciones dinámicas:
use adk_graph::{error::GraphError, node::NodeOutput};
// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
// Extract risk level from LLM response
let risk = if text.to_lowercase().contains("risk: high") { "high" }
else if text.to_lowercase().contains("risk: medium") { "medium" }
else { "low" };
updates.insert("plan".to_string(), json!(text));
updates.insert("risk_level".to_string(), json!(risk));
}
}
updates
});
// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_node_fn("review", |ctx| async move {
let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
let approved = ctx.get("approved").and_then(|v| v.as_bool());
// Already approved - continue
if approved == Some(true) {
return Ok(NodeOutput::new());
}
// High/medium risk - interrupt for approval
if risk == "high" || risk == "medium" {
return Ok(NodeOutput::interrupt_with_data(
&format!("{} RISK: Human approval required", risk.to_uppercase()),
json!({
"plan": ctx.get("plan"),
"risk_level": risk,
"action": "Set 'approved' to true to continue"
})
));
}
// Low risk - auto-approve
Ok(NodeOutput::new().with_update("approved", json!(true)))
})
.add_edge(START, "planner")
.add_edge("planner", "review")
.add_edge("review", "executor")
.add_edge("executor", END)
.compile()?
.with_checkpointer_arc(checkpointer.clone());
// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;
match result {
Err(GraphError::Interrupted(interrupt)) => {
println!("*** EXECUTION PAUSED ***");
println!("Reason: {}", interrupt.interrupt);
println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));
// Human reviews and approves...
// Update state with approval
graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;
// Resume execution
let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
println!("Final result: {:?}", final_result.get("result"));
}
Ok(result) => {
println!("Completed without interrupt: {:?}", result);
}
Err(e) => {
println!("Error: {}", e);
}
}
Interrupciones Estáticas
Usa interrupt_before o interrupt_after para puntos de pausa obligatorios:
let graph = StateGraph::with_channels(&["task", "plan", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_edge(START, "planner")
.add_edge("planner", "executor")
.add_edge("executor", END)
.compile()?
.with_interrupt_before(&["executor"]); // Always pause before execution
Ejecución en Streaming
Transmite eventos a medida que el grafo se ejecuta:
use futures::StreamExt;
use adk_graph::stream::StreamMode;
let stream = agent.stream(input, config, StreamMode::Updates);
while let Some(event) = stream.next().await {
match event? {
StreamEvent::NodeStart(name) => println!("Starting: {}", name),
StreamEvent::Updates { node, updates } => {
println!("{} updated state: {:?}", node, updates);
}
StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
StreamEvent::Done(state) => println!("Final state: {:?}", state),
_ => {}
}
}
Modos de Streaming
| Modo | Descripción |
|---|---|
Values | Transmite el estado completo después de cada nodo |
Updates | Transmite solo los cambios de estado |
Messages | Transmite actualizaciones de tipo mensaje |
Debug | Transmite todos los eventos internos |
Integración ADK
GraphAgent implementa el trait ADK Agent, por lo que funciona con:
- Runner: Usar con
adk-runnerpara ejecución estándar - Callbacks: Soporte completo para callbacks de antes/después
- Sessions: Funciona con
adk-sessionpara el historial de conversación - Streaming: Devuelve un ADK
EventStream
use adk_runner::Runner;
let graph_agent = GraphAgent::builder("workflow")
.before_agent_callback(|ctx| async {
println!("Starting graph execution for session: {}", ctx.session_id());
Ok(())
})
.after_agent_callback(|ctx, event| async {
if let Some(content) = event.content() {
println!("Graph completed with content");
}
Ok(())
})
// ... graph definition
.build()?;
// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration
Ejemplos
Todos los ejemplos utilizan integración real de LLM con AgentNode:
# Agentes LLM paralelos con callbacks de antes/después
cargo run --example graph_agent
# Pipeline secuencial multi-agente (extractor → analizador → formateador)
cargo run --example graph_workflow
# Clasificación de sentimiento basada en LLM y enrutamiento condicional
cargo run --example graph_conditional
# Patrón ReAct con tools y ejecución cíclica
cargo run --example graph_react
# Supervisor multi-agente enrutando a especialistas
cargo run --example graph_supervisor
# Humano en el bucle con interrupciones basadas en riesgo
cargo run --example graph_hitl
# Checkpointing y depuración con viaje en el tiempo
cargo run --example graph_checkpoint
Comparación con LangGraph
| Característica | LangGraph | adk-graph |
|---|---|---|
| Gestión de Estado | TypedDict + Reducers | StateSchema + Reducers |
| Modelo de Ejecución | Pregel super-steps | Pregel super-steps |
| Checkpointing | Memory, SQLite, Postgres | Memory, SQLite |
| Humano en el Bucle | interrupt_before/after | interrupt_before/after + dynamic |
| Streaming | 5 modos | 5 modos |
| Ciclos | Soporte nativo | Soporte nativo |
| Seguridad de Tipos | Python typing | Sistema de tipos de Rust |
| Integración de LLM | LangChain | AgentNode + ADK agents |
Anterior: ← Sistemas Multi-Agente | Siguiente: Agentes en Tiempo Real →