Agents de graphe
Construisez des workflows complexes et avec état en utilisant l'orchestration de style LangGraph avec l'intégration native ADK-Rust.
Vue d'ensemble
GraphAgent vous permet de définir des workflows sous forme de graphes dirigés avec des nœuds et des arêtes, prenant en charge :
- AgentNode : Enveloppe les agents LLM comme des nœuds de graphe avec des mappers d'entrée/sortie personnalisés
- Cyclic Workflows : Prise en charge native des boucles et du raisonnement itératif (modèle ReAct)
- Conditional Routing : Routage dynamique des arêtes basé sur l'état
- State Management : État typé avec des réducteurs (écrasement, ajout, somme, personnalisé)
- Checkpointing : État persistant pour la tolérance aux pannes et l'intervention humaine
- Streaming : Plusieurs modes de streaming (valeurs, mises à jour, messages, débogage)
Le crate adk-graph fournit une orchestration de workflow de style LangGraph pour construire des workflows d'agent complexes et avec état. Il apporte des capacités de workflow basées sur des graphes à l'écosystème ADK-Rust tout en maintenant une compatibilité totale avec le système d'agents d'ADK.
Avantages Clés :
- Visual Workflow Design : Définissez une logique complexe sous forme de graphes intuitifs de nœuds et d'arêtes
- Parallel Execution : Plusieurs nœuds peuvent s'exécuter simultanément pour une meilleure performance
- State Persistence : Point de contrôle intégré pour la tolérance aux pannes et l'intervention humaine
- LLM Integration : Prise en charge native pour l'enveloppement des agents ADK en tant que nœuds de graphe
- Flexible Routing : Arêtes statiques, routage conditionnel et prise de décision dynamique
Ce que vous allez construire
Dans ce guide, vous allez créer un pipeline de traitement de texte qui exécute la traduction et la synthèse en parallèle :
┌─────────────────────┐
User Input │ │
────────────────▶ │ START │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ TRANSLATOR │ │ SUMMARIZER │
│ │ │ │
│ 🇫🇷 French │ │ 📝 One sentence │
│ Translation │ │ Summary │
└─────────┬────────┘ └─────────┬────────┘
│ │
└───────────────┬───────────────┘
│
▼
┌─────────────────────┐
│ COMBINE │
│ │
│ 📋 Merge Results │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ END │
│ │
│ ✅ Complete │
└─────────────────────┘
Concepts Clés :
- Nodes - Unités de traitement qui effectuent le travail (agents LLM, fonctions ou logique personnalisée)
- Edges - Flux de contrôle entre les nodes (connexions statiques ou routage conditionnel)
- State - Données partagées qui circulent dans le graphe et persistent entre les nodes
- Exécution Parallèle - Plusieurs nodes peuvent s'exécuter simultanément pour de meilleures performances
Comprendre les composants principaux
🔧 Nodes : Les Ouvriers Les nodes sont l'endroit où le travail réel se produit. Chaque node peut :
- AgentNode : Envelopper un agent LLM pour traiter le langage naturel
- Function Node : Exécuter du code Rust personnalisé pour le traitement des données
- Built-in Nodes : Utiliser une logique prédéfinie comme les compteurs ou les validateurs
Pensez aux nodes comme à des ouvriers spécialisés dans une chaîne de montage - chacun a un travail et une expertise spécifiques.
🔀 Edges : Le Contrôle de Flux Les edges déterminent comment l'exécution se déplace à travers votre graphe :
- Static Edges : Connexions directes (
A → B → C) - Conditional Edges : Routage dynamique basé sur l'état (
if sentiment == "positive" → positive_handler) - Parallel Edges : Plusieurs chemins à partir d'un node (
START → [translator, summarizer])
Les edges sont comme des feux de signalisation et des panneaux routiers qui dirigent le flux de travail.
💾 State : La Mémoire Partagée Le State est un magasin clé-valeur que tous les nodes peuvent lire et écrire :
- Input Data : Informations initiales alimentant le graphe
- Intermediate Results : La sortie d'un node devient l'entrée d'un autre
- Final Output : Le résultat final après tout le traitement
Le State agit comme un tableau blanc partagé où les nodes peuvent laisser des informations que d'autres peuvent utiliser.
⚡ Exécution Parallèle : L'Accélérateur de Vitesse Lorsque plusieurs edges quittent un node, ces nodes cibles s'exécutent simultanément :
- Traitement plus rapide : Les tâches indépendantes s'exécutent en même temps
- Efficacité des ressources : Meilleure utilisation du CPU et des E/S
- Évolutivité : Gérer des flux de travail plus complexes sans ralentissement linéaire
C'est comme avoir plusieurs ouvriers qui s'attaquent simultanément à différentes parties d'un travail au lieu de faire la queue.
Démarrage Rapide
1. Créer Votre Projet
cargo new graph_demo
cd graph_demo
Ajoutez les dépendances à Cargo.toml :
[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"
Créez .env avec votre clé API :
echo 'GOOGLE_API_KEY=your-api-key' > .env
2. Exemple de Traitement Parallèle
Voici un exemple complet et fonctionnel qui traite du texte en parallèle :
use adk_agent::LlmAgentBuilder;
use adk_graph::{
agent::GraphAgent,
edge::{END, START},
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create specialized LLM agents
let translator_agent = Arc::new(
LlmAgentBuilder::new("translator")
.description("Translates text to French")
.model(model.clone())
.instruction("Translate the input text to French. Only output the translation.")
.build()?,
);
let summarizer_agent = Arc::new(
LlmAgentBuilder::new("summarizer")
.description("Summarizes text")
.model(model.clone())
.instruction("Summarize the input text in one sentence.")
.build()?,
);
// Wrap agents as graph nodes with input/output mappers
let translator_node = AgentNode::new(translator_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("translation".to_string(), json!(text));
}
}
}
updates
});
let summarizer_node = AgentNode::new(summarizer_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("summary".to_string(), json!(text));
}
}
}
updates
});
// Build the graph with parallel execution
let agent = GraphAgent::builder("text_processor")
.description("Processes text with translation and summarization in parallel")
.channels(&["input", "translation", "summary", "result"])
.node(translator_node)
.node(summarizer_node)
.node_fn("combine", |ctx| async move {
let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");
let result = format!(
"=== Processing Complete ===\n\n\
French Translation:\n{}\n\n\
Summary:\n{}",
translation, summary
);
Ok(NodeOutput::new().with_update("result", json!(result)))
})
// Parallel execution: both nodes start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge("translator", "combine")
.edge("summarizer", "combine")
.edge("combine", END)
.build()?;
// Execute the graph
let mut input = State::new();
input.insert("input".to_string(), json!("AI is transforming how we work and live."));
let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Exemple de Sortie :
=== Processing Complete ===
French Translation:
L'IA transforme notre façon de travailler et de vivre.
Summary:
AI is revolutionizing work and daily life through technological transformation.
Comment l'exécution de graphes fonctionne
Vue d'ensemble
Les agents de graphe s'exécutent en super-étapes - tous les nœuds prêts s'exécutent en parallèle, puis le graphe attend que tous soient terminés avant l'étape suivante :
Step 1: START ──┬──▶ translator (running)
└──▶ summarizer (running)
⏳ Attendre que les deux soient terminés...
Step 2: translator ──┬──▶ combine (running)
summarizer ──┘
⏳ Attendre que combine soit terminé...
Step 3: combine ──▶ END ✅
Flux d'état à travers les nœuds
Chaque nœud peut lire et écrire dans l'état partagé :
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 1: État initial │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { "input": "AI is transforming how we work" } │
│ │
│ ↓ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ translator │ │ summarizer │ │
│ │ lit "input" │ │ lit "input" │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 2: Après exécution parallèle │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology" │
│ } │
│ │
│ ↓ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ combine │ │
│ │ lit "translation" + "summary" │ │
│ │ écrit "result" │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 3: État final │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology", │
│ "result": "=== Processing Complete ===\n\nFrench..." │
│ } │
│ │
└─────────────────────────────────────────────────────────────────────┘
Ce qui fait que ça marche
| Composant | Rôle |
|---|---|
AgentNode | Encapsule les agents LLM avec des mappers d'entrée/sortie |
input_mapper | Transforme l'état → entrée agent Content |
output_mapper | Transforme les événements d'agent → mises à jour de l'état |
channels | Déclare les champs d'état que le graphe utilisera |
edge() | Définit le flux d'exécution entre les nœuds |
ExecutionConfig | Fournit l'ID de thread pour la sauvegarde des points de contrôle |
Routage Conditionnel avec Classification par LLM
Construisez des systèmes de routage intelligents où les LLM décident du chemin d'exécution :
Visuel : Routage Basé sur le Sentiment
┌─────────────────────┐
User Feedback │ │
────────────────▶ │ CLASSIFIER │
│ 🧠 Analyze tone │
└──────────┬──────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ POSITIVE │ │ NEGATIVE │ │ NEUTRAL │
│ │ │ │ │ │
│ 😊 Thank you! │ │ 😔 Apologize │ │ 😐 Ask more │
│ Celebrate │ │ Help fix │ │ questions │
└──────────────────┘ └──────────────────┘ └──────────────────┘
Exemple de Code Complet
use adk_agent::LlmAgentBuilder;
use adk_graph::{
edge::{END, Router, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create classifier agent
let classifier_agent = Arc::new(
LlmAgentBuilder::new("classifier")
.description("Classifies text sentiment")
.model(model.clone())
.instruction(
"You are a sentiment classifier. Analyze the input text and respond with \
ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
)
.build()?,
);
// Create response agents for each sentiment
let positive_agent = Arc::new(
LlmAgentBuilder::new("positive")
.description("Handles positive feedback")
.model(model.clone())
.instruction(
"You are a customer success specialist. The customer has positive feedback. \
Express gratitude, reinforce the positive experience, and suggest ways to \
share their experience. Be warm and appreciative. Keep response under 3 sentences.",
)
.build()?,
);
let negative_agent = Arc::new(
LlmAgentBuilder::new("negative")
.description("Handles negative feedback")
.model(model.clone())
.instruction(
"You are a customer support specialist. The customer has a complaint. \
Acknowledge their frustration, apologize sincerely, and offer help. \
Be empathetic. Keep response under 3 sentences.",
)
.build()?,
);
let neutral_agent = Arc::new(
LlmAgentBuilder::new("neutral")
.description("Handles neutral feedback")
.model(model.clone())
.instruction(
"You are a customer service representative. The customer has neutral feedback. \
Ask clarifying questions to better understand their needs. Be helpful and curious. \
Keep response under 3 sentences.",
)
.build()?,
);
// Create AgentNodes with mappers
let classifier_node = AgentNode::new(classifier_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase()
.trim()
.to_string();
let sentiment = if text.contains("positive") { "positive" }
else if text.contains("negative") { "negative" }
else { "neutral" };
updates.insert("sentiment".to_string(), json!(sentiment));
}
}
updates
});
// Response nodes (similar pattern for each)
let positive_node = AgentNode::new(positive_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("response".to_string(), json!(text));
}
}
updates
});
// Build graph with conditional routing
let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
.add_node(classifier_node)
.add_node(positive_node)
// ... add negative_node and neutral_node similarly
.add_edge(START, "classifier")
.add_conditional_edges(
"classifier",
Router::by_field("sentiment"), // Route based on sentiment field
[
("positive", "positive"),
("negative", "negative"),
("neutral", "neutral"),
],
)
.add_edge("positive", END)
.add_edge("negative", END)
.add_edge("neutral", END)
.compile()?;
// Test with different feedback
let mut input = State::new();
input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));
let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Exemple de Flux :
Input: "Your product is amazing! I love it!"
↓
Classifier: "positive"
↓
Positive Agent: "Thank you so much for the wonderful feedback!
We're thrilled you love our product.
Would you consider leaving a review to help others?"
Modèle ReAct : Raisonnement + Action
Créez des agents capables d'utiliser des outils de manière itérative pour résoudre des problèmes complexes :
Visuel : Cycle ReAct
┌─────────────────────┐
User Question │ │
────────────────▶ │ REASONER │
│ 🧠 Think + Act │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ Has tool calls? │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ YES │ │ NO │
│ │ │ │
│ 🔄 Loop back │ │ ✅ Final answer │
│ to reasoner │ │ END │
└─────────┬────────┘ └──────────────────┘
│
└─────────────────┐
│
▼
┌─────────────────────┐
│ REASONER │
│ 🧠 Think + Act │
│ (next iteration) │
└─────────────────────┘
Exemple ReAct Complet
use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
edge::{END, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Créer des outils
let weather_tool = Arc::new(FunctionTool::new(
"get_weather",
"Get the current weather for a location. Takes a 'location' parameter (city name).",
|_ctx, args| async move {
let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
Ok(json!({
"location": location,
"temperature": "72°F",
"condition": "Sunny",
"humidity": "45%"
}))
},
)) as Arc<dyn Tool>;
let calculator_tool = Arc::new(FunctionTool::new(
"calculator",
"Perform mathematical calculations. Takes an 'expression' parameter (string).",
|_ctx, args| async move {
let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
let result = match expr {
"2 + 2" => "4",
"10 * 5" => "50",
"100 / 4" => "25",
"15 - 7" => "8",
_ => "Unable to evaluate",
};
Ok(json!({ "result": result, "expression": expr }))
},
)) as Arc<dyn Tool>;
// Créer un agent de raisonnement avec des outils
let reasoner_agent = Arc::new(
LlmAgentBuilder::new("reasoner")
.description("Reasoning agent with tools")
.model(model.clone())
.instruction(
"You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
When you have enough information, provide a final answer without using more tools.",
)
.tool(weather_tool)
.tool(calculator_tool)
.build()?,
);
// Créer un nœud de raisonnement qui détecte l'utilisation d'outils
let reasoner_node = AgentNode::new(reasoner_agent)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Construire un graphe ReAct avec cycle
let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Limite de sécurité
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Retourner à la boucle pour plus de raisonnement
} else {
END.to_string() // Terminé - réponse finale
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
// Tester l'agent ReAct
let mut input = State::new();
input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));
let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));
Ok(())
}
Flux d'exemple :
Question: "What's the weather in Paris and what's 15 + 25?"
Iteration 1:
Reasoner: "I need to get weather info and do math"
→ Calls get_weather(location="Paris") and calculator(expression="15 + 25")
→ has_tool_calls = true → Loop back
Iteration 2:
Reasoner: "Based on the results: Paris is 72°F and sunny, 15 + 25 = 40"
→ No tool calls → has_tool_calls = false → END
Final Answer: "The weather in Paris is 72°F and sunny with 45% humidity.
And 15 + 25 equals 40."
AgentNode
Enveloppe tout Agent ADK (typiquement LlmAgent) en tant que nœud de graphe :
let node = AgentNode::new(llm_agent)
.with_input_mapper(|state| {
// Transformer l'état du graphe en Content d'entrée de l'agent
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
// Transformer les événements de l'agent en mises à jour d'état
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("output".to_string(), json!(text));
}
}
updates
});
Nœuds de Fonction
Fonctions async simples qui traitent l'état :
.node_fn("process", |ctx| async move {
let input = ctx.state.get("input").unwrap();
let output = process_data(input).await?;
Ok(NodeOutput::new().with_update("output", output))
})
Types d'arêtes
Arêtes statiques
Connexions directes entre les nœuds :
.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)
Arêtes conditionnelles
Routage dynamique basé sur l'état :
.conditional_edge(
"router",
|state| {
match state.get("next").and_then(|v| v.as_str()) {
Some("research") => "research_node".to_string(),
Some("write") => "write_node".to_string(),
_ => END.to_string(),
}
},
[
("research_node", "research_node"),
("write_node", "write_node"),
(END, END),
],
)
Assistants de routeur
Utilisez les routeurs intégrés pour les motifs courants :
use adk_graph::edge::Router;
// Routage basé sur la valeur d'un champ d'état
.conditional_edge("classifier", Router::by_field("sentiment"), [
("positive", "positive_handler"),
("negative", "negative_handler"),
("neutral", "neutral_handler"),
])
// Routage basé sur un champ booléen
.conditional_edge("check", Router::by_bool("approved"), [
("true", "execute"),
("false", "reject"),
])
// Limiter les itérations
.conditional_edge("loop", Router::max_iterations("count", 5), [
("continue", "process"),
("done", END),
])
Exécution parallèle
Plusieurs arêtes partant d'un même nœud s'exécutent en parallèle :
let agent = GraphAgent::builder("parallel_processor")
.channels(&["input", "translation", "summary", "analysis"])
.node(translator_node)
.node(summarizer_node)
.node(analyzer_node)
.node(combiner_node)
// Les trois démarrent simultanément
.edge(START, "translator")
.edge(START, "summarizer")
.edge(START, "analyzer")
// Attendre que tous soient terminés avant de combiner
.edge("translator", "combiner")
.edge("summarizer", "combiner")
.edge("analyzer", "combiner")
.edge("combiner", END)
.build()?;
Graphes Cycliques (Modèle ReAct)
Créez des agents de raisonnement itératif avec des cycles :
use adk_core::Part;
// Create agent with tools
let reasoner = Arc::new(
LlmAgentBuilder::new("reasoner")
.model(model)
.instruction("Use tools to answer questions. Provide final answer when done.")
.tool(search_tool)
.tool(calculator_tool)
.build()?
);
let reasoner_node = AgentNode::new(reasoner)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { name, .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back
} else {
END.to_string() // Done
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
Superviseur Multi-Agent
Achemine les tâches vers des agents spécialistes :
// Create supervisor agent
let supervisor = Arc::new(
LlmAgentBuilder::new("supervisor")
.model(model.clone())
.instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
.build()?
);
let supervisor_node = AgentNode::new(supervisor)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase();
let next = if text.contains("researcher") { "researcher" }
else if text.contains("writer") { "writer" }
else if text.contains("coder") { "coder" }
else { "done" };
updates.insert("next_agent".to_string(), json!(next));
}
}
updates
});
// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
.add_node(supervisor_node)
.add_node(researcher_node)
.add_node(writer_node)
.add_node(coder_node)
.add_edge(START, "supervisor")
.add_conditional_edges(
"supervisor",
Router::by_field("next_agent"),
[
("researcher", "researcher"),
("writer", "writer"),
("coder", "coder"),
("done", END),
],
)
// Agents report back to supervisor
.add_edge("researcher", "supervisor")
.add_edge("writer", "supervisor")
.add_edge("coder", "supervisor")
.compile()?;
Gestion de l'état
Schéma d'état avec des Reducers
Contrôle la manière dont les mises à jour d'état sont fusionnées :
let schema = StateSchema::builder()
.channel("current_step") // Overwrite (default)
.list_channel("messages") // Append to list
.channel_with_reducer("count", Reducer::Sum) // Sum values
.channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
// Custom merge logic
merge_json(old, new)
})))
.build();
let agent = GraphAgent::builder("stateful")
.state_schema(schema)
// ... nodes and edges
.build()?;
Types de Reducers
| Reducer | Comportement |
|---|---|
Overwrite | Remplace l'ancienne valeur par la nouvelle (par défaut) |
Append | Ajoute à la liste |
Sum | Ajoute les valeurs numériques |
Custom | Fonction de fusion personnalisée |
Sauvegarde (Checkpointing)
Active l'état persistant pour la tolérance aux pannes et l'intervention humaine :
En mémoire (Développement)
use adk_graph::checkpoint::MemoryCheckpointer;
let checkpointer = Arc::new(MemoryCheckpointer::new());
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer_arc(checkpointer.clone());
SQLite (Production)
use adk_graph::checkpoint::SqliteCheckpointer;
let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer(checkpointer);
Historique des sauvegardes (Voyage dans le temps)
// List all checkpoints for a thread
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}
// Load a specific checkpoint
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}
Intervention humaine
Mettre l'exécution en pause pour une approbation humaine à l'aide d'interruptions dynamiques :
use adk_graph::{error::GraphError, node::NodeOutput};
// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
// Extract risk level from LLM response
let risk = if text.to_lowercase().contains("risk: high") { "high" }
else if text.to_lowercase().contains("risk: medium") { "medium" }
else { "low" };
updates.insert("plan".to_string(), json!(text));
updates.insert("risk_level".to_string(), json!(risk));
}
}
updates
});
// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_node_fn("review", |ctx| async move {
let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
let approved = ctx.get("approved").and_then(|v| v.as_bool());
// Already approved - continue
if approved == Some(true) {
return Ok(NodeOutput::new());
}
// High/medium risk - interrupt for approval
if risk == "high" || risk == "medium" {
return Ok(NodeOutput::interrupt_with_data(
&format!("{} RISK: Human approval required", risk.to_uppercase()),
json!({
"plan": ctx.get("plan"),
"risk_level": risk,
"action": "Set 'approved' to true to continue"
})
));
}
// Low risk - auto-approve
Ok(NodeOutput::new().with_update("approved", json!(true)))
})
.add_edge(START, "planner")
.add_edge("planner", "review")
.add_edge("review", "executor")
.add_edge("executor", END)
.compile()?
.with_checkpointer_arc(checkpointer.clone());
// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;
match result {
Err(GraphError::Interrupted(interrupt)) => {
println!("*** EXECUTION PAUSED ***");
println!("Reason: {}", interrupt.interrupt);
println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));
// Human reviews and approves...
// Update state with approval
graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;
// Resume execution
let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
println!("Final result: {:?}", final_result.get("result"));
}
Ok(result) => {
println!("Completed without interrupt: {:?}", result);
}
Err(e) => {
println!("Error: {}", e);
}
}
Interruptions statiques
Utilisez interrupt_before ou interrupt_after pour les points de pause obligatoires :
let graph = StateGraph::with_channels(&["task", "plan", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_edge(START, "planner")
.add_edge("planner", "executor")
.add_edge("executor", END)
.compile()?
.with_interrupt_before(&["executor"]); // Always pause before execution
Exécution en streaming
Diffusez les événements au fur et à mesure de l'exécution du graphe :
use futures::StreamExt;
use adk_graph::stream::StreamMode;
let stream = agent.stream(input, config, StreamMode::Updates);
while let Some(event) = stream.next().await {
match event? {
StreamEvent::NodeStart(name) => println!("Starting: {}", name),
StreamEvent::Updates { node, updates } => {
println!("{} updated state: {:?}", node, updates);
}
StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
StreamEvent::Done(state) => println!("Final state: {:?}", state),
_ => {}
}
}
Modes de streaming
| Mode | Description |
|---|---|
Values | Diffuse l'état complet après chaque nœud |
Updates | Diffuse uniquement les changements d'état |
Messages | Diffuse les mises à jour de type message |
Debug | Diffuse tous les événements internes |
Intégration ADK
GraphAgent implémente le trait ADK Agent, il fonctionne donc avec :
- Runner : Utilisation avec
adk-runnerpour une exécution standard - Callbacks : Support complet des callbacks avant/après
- Sessions : Fonctionne avec
adk-sessionpour l'historique des conversations - Streaming : Renvoie un
EventStreamADK
use adk_runner::Runner;
let graph_agent = GraphAgent::builder("workflow")
.before_agent_callback(|ctx| async {
println!("Starting graph execution for session: {}", ctx.session_id());
Ok(())
})
.after_agent_callback(|ctx, event| async {
if let Some(content) = event.content() {
println!("Graph completed with content");
}
Ok(())
})
// ... graph definition
.build()?;
// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration
Exemples
Tous les exemples utilisent une intégration réelle de LLM avec AgentNode :
# Parallel LLM agents with before/after callbacks
cargo run --example graph_agent
# Sequential multi-agent pipeline (extractor → analyzer → formatter)
cargo run --example graph_workflow
# LLM-based sentiment classification and conditional routing
cargo run --example graph_conditional
# ReAct pattern with tools and cyclic execution
cargo run --example graph_react
# Multi-agent supervisor routing to specialists
cargo run --example graph_supervisor
# Human-in-the-loop with risk-based interrupts
cargo run --example graph_hitl
# Checkpointing and time travel debugging
cargo run --example graph_checkpoint
Comparaison avec LangGraph
| Fonctionnalité | LangGraph | adk-graph |
|---|---|---|
| Gestion d'état | TypedDict + Reducers | StateSchema + Reducers |
| Modèle d'exécution | Pregel super-steps | Pregel super-steps |
| Checkpointing | Memory, SQLite, Postgres | Memory, SQLite |
| Humain dans la boucle | interrupt_before/after | interrupt_before/after + dynamic |
| Streaming | 5 modes | 5 modes |
| Cycles | Support natif | Support natif |
| Sûreté des types | Python typing | Rust type system |
| Intégration LLM | LangChain | AgentNode + ADK agents |
Précédent: ← Systèmes Multi-Agents | Suivant: Agents en Temps Réel →