Graph Agents
Construa fluxos de trabalho complexos e com estado usando orquestração estilo LangGraph com integração nativa ADK-Rust.
Overview
GraphAgent permite que você defina fluxos de trabalho como grafos direcionados com nodes e edges, suportando:
- AgentNode: Empacote agentes LLM como nodes de grafo com mappers de entrada/saída personalizados
- Cyclic Workflows: Suporte nativo para loops e raciocínio iterativo (padrão ReAct)
- Conditional Routing: Roteamento de edge dinâmico baseado em estado
- State Management: Estado tipado com reducers (sobrescrever, anexar, somar, personalizado)
- Checkpointing: Estado persistente para tolerância a falhas e human-in-the-loop
- Streaming: Múltiplos modos de stream (values, updates, messages, debug)
A crate adk-graph fornece orquestração de fluxo de trabalho estilo LangGraph para construir fluxos de trabalho de agentes complexos e com estado. Ela traz capacidades de fluxo de trabalho baseadas em grafo para o ecossistema ADK-Rust, mantendo compatibilidade total com o sistema de agentes do ADK.
Key Benefits:
- Visual Workflow Design: Defina lógica complexa como grafos intuitivos de node e edge
- Parallel Execution: Múltiplos nodes podem ser executados simultaneamente para melhor performance
- State Persistence: Checkpointing integrado para tolerância a falhas e human-in-the-loop
- LLM Integration: Suporte nativo para empacotar agentes ADK como nodes de grafo
- Flexible Routing: Edges estáticos, roteamento condicional e tomada de decisão dinâmica
O Que Você Irá Construir
Neste guia, você criará um Pipeline de Processamento de Texto que executa tradução e sumarização em paralelo:
┌─────────────────────┐
User Input │ │
────────────────▶ │ START │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ TRANSLATOR │ │ SUMMARIZER │
│ │ │ │
│ 🇫🇷 French │ │ 📝 One sentence │
│ Translation │ │ Summary │
└─────────┬────────┘ └─────────┬────────┘
│ │
└───────────────┬───────────────┘
│
▼
┌─────────────────────┐
│ COMBINE │
│ │
│ 📋 Merge Results │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ END │
│ │
│ ✅ Complete │
└─────────────────────┘
Conceitos Chave:
- Nodes - Unidades de processamento que realizam trabalho (agents LLM, funções ou lógica personalizada)
- Edges - Fluxo de controle entre nodes (conexões estáticas ou roteamento condicional)
- State - Dados compartilhados que fluem através do grafo e persistem entre nodes
- Execução Paralela - Múltiplos nodes podem ser executados simultaneamente para melhor desempenho
Entendendo os Componentes Principais
🔧 Nodes: Os Trabalhadores Os Nodes são onde o trabalho real acontece. Cada node pode:
- AgentNode: Envolve um agent LLM para processar linguagem natural
- Function Node: Executa código Rust personalizado para processamento de dados
- Built-in Nodes: Usam lógica predefinida como contadores ou validadores
Pense nos nodes como trabalhadores especializados em uma linha de montagem - cada um tem um trabalho e uma experiência específicos.
🔀 Edges: O Controle de Fluxo Os Edges determinam como a execução se move através do seu grafo:
- Static Edges: Conexões diretas (
A → B → C) - Conditional Edges: Roteamento dinâmico baseado em state (
if sentiment == "positive" → positive_handler) - Parallel Edges: Múltiplos caminhos a partir de um node (
START → [translator, summarizer])
Os Edges são como semáforos e sinais de trânsito que direcionam o fluxo de trabalho.
💾 State: A Memória Compartilhada State é um armazenamento de chave-valor que todos os nodes podem ler e escrever:
- Dados de Entrada: Informações iniciais alimentadas no grafo
- Resultados Intermediários: A saída de um node se torna a entrada para outro
- Saída Final: O resultado completo após todo o processamento
State age como um quadro branco compartilhado onde os nodes podem deixar informações para que outros as usem.
⚡ Execução Paralela: O Aumento de Velocidade Quando múltiplos edges saem de um node, esses nodes de destino são executados simultaneamente:
- Processamento Mais Rápido: Tarefas independentes são executadas ao mesmo tempo
- Eficiência de Recursos: Melhor utilização da CPU e I/O
- Escalabilidade: Lida com fluxos de trabalho mais complexos sem desaceleração linear
Isso é como ter múltiplos trabalhadores abordando diferentes partes de um trabalho simultaneamente em vez de esperar em fila.
Início Rápido
1. Crie Seu Projeto
cargo new graph_demo
cd graph_demo
Adicione as dependências ao Cargo.toml:
[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"
Crie o .env com sua chave de API:
echo 'GOOGLE_API_KEY=your-api-key' > .env
2. Exemplo de Processamento Paralelo
Aqui está um exemplo completo e funcional que processa texto em paralelo:
use adk_agent::LlmAgentBuilder;
use adk_graph::{
agent::GraphAgent,
edge::{END, START},
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Crie agentes LlmAgent especializados
let translator_agent = Arc::new(
LlmAgentBuilder::new("translator")
.description("Traduz texto para francês")
.model(model.clone())
.instruction("Traduza o texto de entrada para francês. Apenas mostre a tradução.")
.build()?,
);
let summarizer_agent = Arc::new(
LlmAgentBuilder::new("summarizer")
.description("Resume texto")
.model(model.clone())
.instruction("Resuma o texto de entrada em uma frase.")
.build()?,
);
// Empacote agentes como nós de grafo com mappers de entrada/saída
let translator_node = AgentNode::new(translator_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("translation".to_string(), json!(text));
}
}
}
updates
});
let summarizer_node = AgentNode::new(summarizer_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("summary".to_string(), json!(text));
}
}
}
updates
});
// Construa o grafo com execução paralela
let agent = GraphAgent::builder("text_processor")
.description("Processa texto com tradução e sumarização em paralelo")
.channels(&["input", "translation", "summary", "result"])
.node(translator_node)
.node(summarizer_node)
.node_fn("combine", |ctx| async move {
let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");
let result = format!(
"=== Processing Complete ===\n\n\
French Translation:\n{}\n\n\
Summary:\n{}",
translation, summary
);
Ok(NodeOutput::new().with_update("result", json!(result)))
})
// Execução paralela: ambos os nós iniciam simultaneamente
.edge(START, "translator")
.edge(START, "summarizer")
.edge("translator", "combine")
.edge("summarizer", "combine")
.edge("combine", END)
.build()?;
// Execute o grafo
let mut input = State::new();
input.insert("input".to_string(), json!("AI is transforming how we work and live."));
let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Saída de Exemplo:
=== Processing Complete ===
French Translation:
L'IA transforme notre façon de travailler et de vivre.
Summary:
AI is revolutionizing work and daily life through technological transformation.
Como a Execução de Grafo Funciona
A Visão Geral
Agentes de grafo são executados em super-passos - todos os nós prontos rodam em paralelo, então o grafo espera que todos concluam antes do próximo passo:
PASSO 1: INÍCIO ──┬──▶ translator (executando)
└──▶ summarizer (executando)
⏳ Esperar ambos concluírem...
PASSO 2: translator ──┬──▶ combine (executando)
summarizer ──┘
⏳ Esperar 'combine' concluir...
PASSO 3: combine ──▶ FIM ✅
Fluxo de Estado Através dos Nós
Cada nó pode ler e escrever no estado compartilhado:
┌─────────────────────────────────────────────────────────────────────┐
│ PASSO 1: Estado inicial │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Estado: { "input": "A IA está transformando a forma como trabalhamos" } │
│ │
│ ↓ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ translator │ │ summarizer │ │
│ │ lê "input" │ │ lê "input" │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASSO 2: Após execução paralela │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Estado: { │
│ "input": "A IA está transformando a forma como trabalhamos", │
│ "translation": "A IA transforma a nossa forma de trabalhar", │
│ "summary": "A IA está revolucionando o trabalho através da tecnologia" │
│ } │
│ │
│ ↓ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ combine │ │
│ │ lê "translation" + "summary" │ │
│ │ escreve "result" │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ PASSO 3: Estado final │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Estado: { │
│ "input": "A IA está transformando a forma como trabalhamos", │
│ "translation": "A IA transforma a nossa forma de trabalhar", │
│ "summary": "A IA está revolucionando o trabalho através da tecnologia", │
│ "result": "=== Processamento Concluído ===\n\nFrancês..." │
│ } │
│ │
└─────────────────────────────────────────────────────────────────────┘
O Que o Faz Funcionar
| Componente | Função |
|---|---|
AgentNode | Encapsula agentes LLM com mapeadores de entrada/saída |
input_mapper | Transforma o estado → entrada do agente Content |
output_mapper | Transforma eventos do agente → atualizações de estado |
channels | Declara os campos de estado que o grafo usará |
edge() | Define o fluxo de execução entre os nós |
ExecutionConfig | Fornece ID de thread para pontos de verificação |
Roteamento Condicional com Classificação LLM
Crie sistemas de roteamento inteligentes onde LLMs decidem o caminho de execução:
Visual: Roteamento Baseado em Sentimento
┌─────────────────────┐
Feedback do Usuário│ │
────────────────▶ │ CLASSIFICADOR │
│ 🧠 Analisar tom │
└──────────┬──────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ POSITIVO │ │ NEGATIVO │ │ NEUTRO │
│ │ │ │ │ │
│ 😊 Obrigado! │ │ 😔 Pedir desculpas│ │ 😐 Fazer mais │
│ Comemorar │ │ Ajudar a corrigir│ │ perguntas │
└──────────────────┘ └──────────────────┘ └──────────────────┘
Exemplo de Código Completo
use adk_agent::LlmAgentBuilder;
use adk_graph::{
edge::{END, Router, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Criar agente classificador
let classifier_agent = Arc::new(
LlmAgentBuilder::new("classifier")
.description("Classifies text sentiment")
.model(model.clone())
.instruction(
"You are a sentiment classifier. Analyze the input text and respond with \
ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
)
.build()?,
);
// Criar agentes de resposta para cada sentimento
let positive_agent = Arc::new(
LlmAgentBuilder::new("positive")
.description("Handles positive feedback")
.model(model.clone())
.instruction(
"You are a customer success specialist. The customer has positive feedback. \
Express gratitude, reinforce the positive experience, and suggest ways to \
share their experience. Be warm and appreciative. Keep response under 3 sentences.",
)
.build()?,
);
let negative_agent = Arc::new(
LlmAgentBuilder::new("negative")
.description("Handles negative feedback")
.model(model.clone())
.instruction(
"You are a customer support specialist. The customer has a complaint. \
Acknowledge their frustration, apologize sincerely, and offer help. \
Be empathetic. Keep response under 3 sentences.",
)
.build()?,
);
let neutral_agent = Arc::new(
LlmAgentBuilder::new("neutral")
.description("Handles neutral feedback")
.model(model.clone())
.instruction(
"You are a customer service representative. The customer has neutral feedback. \
Ask clarifying questions to better understand their needs. Be helpful and curious. \
Keep response under 3 sentences.",
)
.build()?,
);
// Criar AgentNodes com mappers
let classifier_node = AgentNode::new(classifier_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase()
.trim()
.to_string();
let sentiment = if text.contains("positive") { "positive" }
else if text.contains("negative") { "negative" }
else { "neutral" };
updates.insert("sentiment".to_string(), json!(sentiment));
}
}
updates
});
// Nós de resposta (padrão similar para cada)
let positive_node = AgentNode::new(positive_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("response".to_string(), json!(text));
}
}
updates
});
// Construir grafo com roteamento condicional
let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
.add_node(classifier_node)
.add_node(positive_node)
// ... add negative_node and neutral_node similarly
.add_edge(START, "classifier")
.add_conditional_edges(
"classifier",
Router::by_field("sentiment"), // Roteia com base no campo de sentimento
[
("positive", "positive"),
("negative", "negative"),
("neutral", "neutral"),
],
)
.add_edge("positive", END)
.add_edge("negative", END)
.add_edge("neutral", END)
.compile()?;
// Testar com diferentes feedbacks
let mut input = State::new();
input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));
let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Exemplo de Fluxo:
Entrada: "Your product is amazing! I love it!"
↓
Classificador: "positive"
↓
Agente Positivo: "Muito obrigado pelo feedback maravilhoso!
Estamos muito felizes que você ame nosso produto.
Você consideraria deixar uma avaliação para ajudar outras pessoas?"
Padrão ReAct: Raciocínio + Ação
Crie agentes que podem usar ferramentas iterativamente para resolver problemas complexos:
Visual: Ciclo ReAct
┌─────────────────────┐
User Question │ │
────────────────▶ │ REASONER │
│ 🧠 Think + Act │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ Has tool calls? │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ YES │ │ NO │
│ │ │ │
│ 🔄 Loop back │ │ ✅ Final answer │
│ to reasoner │ │ END │
└─────────┬────────┘ └──────────────────┘
│
└─────────────────┐
│
▼
┌─────────────────────┐
│ REASONER │
│ 🧠 Think + Act │
│ (next iteration) │
└─────────────────────┘
Exemplo Completo de ReAct
use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
edge::{END, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create tools
let weather_tool = Arc::new(FunctionTool::new(
"get_weather",
"Get the current weather for a location. Takes a 'location' parameter (city name).",
|_ctx, args| async move {
let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
Ok(json!({
"location": location,
"temperature": "72°F",
"condition": "Sunny",
"humidity": "45%"
}))
},
)) as Arc<dyn Tool>;
let calculator_tool = Arc::new(FunctionTool::new(
"calculator",
"Perform mathematical calculations. Takes an 'expression' parameter (string).",
|_ctx, args| async move {
let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
let result = match expr {
"2 + 2" => "4",
"10 * 5" => "50",
"100 / 4" => "25",
"15 - 7" => "8",
_ => "Unable to evaluate",
};
Ok(json!({ "result": result, "expression": expr }))
},
)) as Arc<dyn Tool>;
// Create reasoner agent with tools
let reasoner_agent = Arc::new(
LlmAgentBuilder::new("reasoner")
.description("Reasoning agent with tools")
.model(model.clone())
.instruction(
"You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
When you have enough information, provide a final answer without using more tools.",
)
.tool(weather_tool)
.tool(calculator_tool)
.build()?,
);
// Create reasoner node that detects tool usage
let reasoner_node = AgentNode::new(reasoner_agent)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build ReAct graph with cycle
let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back for more reasoning
} else {
END.to_string() // Done - final answer
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
// Test the ReAct agent
let mut input = State::new();
input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));
let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));
Ok(())
}
Fluxo de Exemplo:
Question: "Qual é o clima em Paris e quanto é 15 + 25?"
Iteração 1:
Reasoner: "Preciso obter informações sobre o clima e fazer contas"
→ Chama get_weather(location="Paris") e calculator(expression="15 + 25")
→ has_tool_calls = true → Retorna ao início
Iteração 2:
Reasoner: "Com base nos resultados: Paris está a 72°F e ensolarado, 15 + 25 = 40"
→ Nenhuma chamada de ferramenta → has_tool_calls = false → END
Resposta Final: "O clima em Paris está a 72°F e ensolarado com 45% de umidade.
E 15 + 25 é igual a 40."
AgentNode
Empacota qualquer Agent do ADK (tipicamente LlmAgent) como um nó de grafo:
let node = AgentNode::new(llm_agent)
.with_input_mapper(|state| {
// Transform graph state to agent input Content
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
// Transform agent events to state updates
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("output".to_string(), json!(text));
}
}
updates
});
Nós de Função
Funções assíncronas simples que processam o estado:
.node_fn("process", |ctx| async move {
let input = ctx.state.get("input").unwrap();
let output = process_data(input).await?;
Ok(NodeOutput::new().with_update("output", output))
})
Tipos de Aresta
Arestas Estáticas
Conexões diretas entre nós:
.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)
Arestas Condicionais
Roteamento dinâmico baseado no estado:
.conditional_edge(
"router",
|state| {
match state.get("next").and_then(|v| v.as_str()) {
Some("research") => "research_node".to_string(),
Some("write") => "write_node".to_string(),
_ => END.to_string(),
}
},
[
("research_node", "research_node"),
("write_node", "write_node"),
(END, END),
],
)
Auxiliares de Roteador
Use roteadores embutidos para padrões comuns:
use adk_graph::edge::Router;
// Route based on a state field value
.conditional_edge("classifier", Router::by_field("sentiment"), [
("positive", "positive_handler"),
("negative", "negative_handler"),
("neutral", "neutral_handler"),
])
// Route based on boolean field
.conditional_edge("check", Router::by_bool("approved"), [
("true", "execute"),
("false", "reject"),
])
// Limit iterations
.conditional_edge("loop", Router::max_iterations("count", 5), [
("continue", "process"),
("done", END),
])
Execução Paralela
Múltiplas arestas de um único nó são executadas em paralelo:
let agent = GraphAgent::builder("parallel_processor")
.channels(&["input", "translation", "summary", "analysis"])
.node(translator_node)
.node(summarizer_node)
.node(analyzer_node)
.node(combiner_node)
// All three start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge(START, "analyzer")
// Wait for all to complete before combining
.edge("translator", "combiner")
.edge("summarizer", "combiner")
.edge("analyzer", "combiner")
.edge("combiner", END)
.build()?;
Grafos Cíclicos (Padrão ReAct)
Construa agentes de raciocínio iterativo com ciclos:
use adk_core::Part;
// Create agent with tools
let reasoner = Arc::new(
LlmAgentBuilder::new("reasoner")
.model(model)
.instruction("Use tools to answer questions. Provide final answer when done.")
.tool(search_tool)
.tool(calculator_tool)
.build()?
);
let reasoner_node = AgentNode::new(reasoner)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { name, .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back
} else {
END.to_string() // Done
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
Supervisor Multiagente
Encaminha tarefas para agentes especialistas:
// Create supervisor agent
let supervisor = Arc::new(
LlmAgentBuilder::new("supervisor")
.model(model.clone())
.instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
.build()?
);
let supervisor_node = AgentNode::new(supervisor)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase();
let next = if text.contains("researcher") { "researcher" }
else if text.contains("writer") { "writer" }
else if text.contains("coder") { "coder" }
else { "done" };
updates.insert("next_agent".to_string(), json!(next));
}
}
updates
});
// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
.add_node(supervisor_node)
.add_node(researcher_node)
.add_node(writer_node)
.add_node(coder_node)
.add_edge(START, "supervisor")
.add_conditional_edges(
"supervisor",
Router::by_field("next_agent"),
[
("researcher", "researcher"),
("writer", "writer"),
("coder", "coder"),
("done", END),
],
)
// Agents report back to supervisor
.add_edge("researcher", "supervisor")
.add_edge("writer", "supervisor")
.add_edge("coder", "supervisor")
.compile()?;
Gerenciamento de Estado
Esquema de Estado com Reducers
Controla como as atualizações de estado são mescladas:
let schema = StateSchema::builder()
.channel("current_step") // Overwrite (padrão)
.list_channel("messages") // Adicionar à lista
.channel_with_reducer("count", Reducer::Sum) // Somar valores
.channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
// Lógica de mesclagem customizada
merge_json(old, new)
})))
.build();
let agent = GraphAgent::builder("stateful")
.state_schema(schema)
// ... nodes and edges
.build()?;
Tipos de Reducer
| Reducer | Comportamento |
|---|---|
Overwrite | Substituir valor antigo por novo (padrão) |
Append | Anexar à lista |
Sum | Adicionar valores numéricos |
Custom | Função de mesclagem customizada |
Checkpointing
Habilita estado persistente para tolerância a falhas e human-in-the-loop:
Em Memória (Desenvolvimento)
use adk_graph::checkpoint::MemoryCheckpointer;
let checkpointer = Arc::new(MemoryCheckpointer::new());
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer_arc(checkpointer.clone());
SQLite (Produção)
use adk_graph::checkpoint::SqliteCheckpointer;
let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer(checkpointer);
Histórico de Checkpoint (Viagem no Tempo)
// Listar todos os checkpoints para uma thread
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}
// Carregar um checkpoint específico
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}
Intervenção Humana (Human-in-the-Loop)
Pause a execução para aprovação humana usando interrupções dinâmicas:
use adk_graph::{error::GraphError, node::NodeOutput};
// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
// Extract risk level from LLM response
let risk = if text.to_lowercase().contains("risk: high") { "high" }
else if text.to_lowercase().contains("risk: medium") { "medium" }
else { "low" };
updates.insert("plan".to_string(), json!(text));
updates.insert("risk_level".to_string(), json!(risk));
}
}
updates
});
// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_node_fn("review", |ctx| async move {
let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
let approved = ctx.get("approved").and_then(|v| v.as_bool());
// Already approved - continue
if approved == Some(true) {
return Ok(NodeOutput::new());
}
// High/medium risk - interrupt for approval
if risk == "high" || risk == "medium" {
return Ok(NodeOutput::interrupt_with_data(
&format!("{} RISK: Human approval required", risk.to_uppercase()),
json!({
"plan": ctx.get("plan"),
"risk_level": risk,
"action": "Set 'approved' to true to continue"
})
));
}
// Low risk - auto-approve
Ok(NodeOutput::new().with_update("approved", json!(true)))
})
.add_edge(START, "planner")
.add_edge("planner", "review")
.add_edge("review", "executor")
.add_edge("executor", END)
.compile()?
.with_checkpointer_arc(checkpointer.clone());
// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;
match result {
Err(GraphError::Interrupted(interrupt)) => {
println!("*** EXECUTION PAUSED ***");
println!("Reason: {}", interrupt.interrupt);
println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));
// Human reviews and approves...
// Update state with approval
graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;
// Resume execution
let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
println!("Final result: {:?}", final_result.get("result"));
}
Ok(result) => {
println!("Completed without interrupt: {:?}", result);
}
Err(e) => {
println!("Error: {}", e);
}
}
Interrupções Estáticas
Use interrupt_before ou interrupt_after para pontos de pausa obrigatórios:
let graph = StateGraph::with_channels(&["task", "plan", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_edge(START, "planner")
.add_edge("planner", "executor")
.add_edge("executor", END)
.compile()?
.with_interrupt_before(&["executor"]); // Always pause before execution
Execução em Streaming
Transmita eventos conforme o grafo é executado:
use futures::StreamExt;
use adk_graph::stream::StreamMode;
let stream = agent.stream(input, config, StreamMode::Updates);
while let Some(event) = stream.next().await {
match event? {
StreamEvent::NodeStart(name) => println!("Starting: {}", name),
StreamEvent::Updates { node, updates } => {
println!("{} updated state: {:?}", node, updates);
}
StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
StreamEvent::Done(state) => println!("Final state: {:?}", state),
_ => {}
}
}
Modos de Streaming
| Modo | Descrição |
|---|---|
Values | Transmite o estado completo após cada nó |
Updates | Transmite apenas as mudanças de estado |
Messages | Transmite atualizações do tipo mensagem |
Debug | Transmite todos os eventos internos |
Integração ADK
GraphAgent implementa a trait ADK Agent, então funciona com:
- Runner: Use com
adk-runnerpara execução padrão - Callbacks: Suporte completo para callbacks before/after
- Sessions: Funciona com
adk-sessionpara histórico de conversas - Streaming: Retorna ADK
EventStream
use adk_runner::Runner;
let graph_agent = GraphAgent::builder("workflow")
.before_agent_callback(|ctx| async {
println!("Starting graph execution for session: {}", ctx.session_id());
Ok(())
})
.after_agent_callback(|ctx, event| async {
if let Some(content) = event.content() {
println!("Graph completed with content");
}
Ok(())
})
// ... graph definition
.build()?;
// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration
Exemplos
Todos os exemplos usam integração real de LLM com AgentNode:
# Agentes LLM paralelos com callbacks before/after
cargo run --example graph_agent
# Pipeline multiagente sequencial (extrator → analisador → formatador)
cargo run --example graph_workflow
# Classificação de sentimento baseada em LLM e roteamento condicional
cargo run --example graph_conditional
# Padrão ReAct com tools e execução cíclica
cargo run --example graph_react
# Supervisor multiagente roteando para especialistas
cargo run --example graph_supervisor
# Humano-no-loop com interrupções baseadas em risco
cargo run --example graph_hitl
# Checkpointing e depuração com "viagem no tempo"
cargo run --example graph_checkpoint
Comparação com LangGraph
| Funcionalidade | LangGraph | adk-graph |
|---|---|---|
| Gerenciamento de Estado | TypedDict + Reducers | StateSchema + Reducers |
| Modelo de Execução | Pregel super-steps | Pregel super-steps |
| Checkpointing | Memória, SQLite, Postgres | Memória, SQLite |
| Humano-no-Loop | interrupt_before/after | interrupt_before/after + dinâmico |
| Streaming | 5 modos | 5 modos |
| Ciclos | Suporte nativo | Suporte nativo |
| Segurança de Tipo | Tipagem Python | Sistema de tipos Rust |
| Integração LLM | LangChain | AgentNode + ADK agents |
Anterior: ← Sistemas Multiagente | Próximo: Agentes em Tempo Real →