Graph Agents
Komplexe, zustandsbehaftete Workflows mit LangGraph-style Orchestrierung und nativer ADK-Rust Integration erstellen.
Γbersicht
GraphAgent ermΓΆglicht es Ihnen, Workflows als gerichtete Graphen mit Knoten und Kanten zu definieren, die Folgendes unterstΓΌtzen:
- AgentNode: LLM agents als Graphenknoten mit benutzerdefinierten Eingabe-/Ausgabe-Mappern kapseln
- Zyklische Workflows: Native UnterstΓΌtzung fΓΌr Schleifen und iteratives Reasoning (ReAct pattern)
- Bedingtes Routing: Dynamisches Kanten-Routing basierend auf dem Zustand
- Zustandsverwaltung: Typisierter Zustand mit Reducern (overwrite, append, sum, custom)
- Checkpointing: Persistenter Zustand fΓΌr Fehlertoleranz und human-in-the-loop
- Streaming: Mehrere Stream-Modi (values, updates, messages, debug)
Das adk-graph crate bietet LangGraph-style Workflow-Orchestrierung fΓΌr den Aufbau komplexer, zustandsbehafteter Agent-Workflows. Es bringt graph-basierte Workflow-FΓ€higkeiten in das ADK-Rust ecosystem und behΓ€lt dabei die volle KompatibilitΓ€t mit ADK's agent system bei.
Wichtige Vorteile:
- Visuelles Workflow-Design: Komplexe Logik als intuitive Knoten- und Kanten-Graphen definieren
- Parallele AusfΓΌhrung: Mehrere Knoten kΓΆnnen gleichzeitig laufen fΓΌr bessere Performance
- Zustands-Persistenz: Eingebautes Checkpointing fΓΌr Fehlertoleranz und human-in-the-loop
- LLM Integration: Native UnterstΓΌtzung zum Kapseln von ADK agents als Graphenknoten
- Flexibles Routing: Statische Kanten, bedingtes Routing und dynamische Entscheidungsfindung
Was Sie bauen werden
In diesem Leitfaden erstellen Sie eine Textverarbeitungs-Pipeline, die Γbersetzung und Zusammenfassung parallel ausfΓΌhrt:
βββββββββββββββββββββββ
User Input β β
βββββββββββββββββΆ β START β
β β
ββββββββββββ¬βββββββββββ
β
βββββββββββββββββ΄ββββββββββββββββ
β β
βΌ βΌ
ββββββββββββββββββββ ββββββββββββββββββββ
β TRANSLATOR β β SUMMARIZER β
β β β β
β π«π· French β β π One sentence β
β Translation β β Summary β
βββββββββββ¬βββββββββ βββββββββββ¬βββββββββ
β β
βββββββββββββββββ¬ββββββββββββββββ
β
βΌ
βββββββββββββββββββββββ
β COMBINE β
β β
β π Merge Results β
ββββββββββββ¬βββββββββββ
β
βΌ
βββββββββββββββββββββββ
β END β
β β
β β
Complete β
βββββββββββββββββββββββ
SchlΓΌsselkonzepte:
- Nodes β Verarbeitungseinheiten, die Aufgaben ausfΓΌhren (LLM agents, functions oder benutzerdefinierte Logik)
- Edges β Kontrollfluss zwischen Nodes (statische Verbindungen oder bedingtes Routing)
- State β Gemeinsame Daten, die durch den Graphen flieΓen und zwischen Nodes bestehen bleiben
- Parallel Execution β Mehrere Nodes kΓΆnnen gleichzeitig fΓΌr bessere Leistung ausgefΓΌhrt werden
Die Kernkomponenten verstehen
π§ Nodes: Die Arbeiter Nodes sind der Ort, an dem die eigentliche Arbeit stattfindet. Jeder Node kann:
- AgentNode: Einen LLM agent umschlieΓen, um natΓΌrliche Sprache zu verarbeiten
- Function Node: Benutzerdefinierten Rust code zur Datenverarbeitung ausfΓΌhren
- Built-in Nodes: Vordefinierte Logik wie ZΓ€hler oder Validatoren verwenden
Stellen Sie sich Nodes als spezialisierte Arbeiter in einer Montagelinie vor β jeder hat eine spezifische Aufgabe und Expertise.
π Edges: Die Ablaufsteuerung Edges bestimmen, wie die AusfΓΌhrung durch Ihren Graphen verlΓ€uft:
- Static Edges: Direkte Verbindungen (
A β B β C) - Conditional Edges: Dynamisches Routing basierend auf dem State (
if sentiment == "positive" β positive_handler) - Parallel Edges: Mehrere Pfade von einem Node (
START β [translator, summarizer])
Edges sind wie Ampeln und Verkehrszeichen, die den Arbeitsfluss lenken.
πΎ State: Der geteilte Speicher State ist ein SchlΓΌssel-Wert-Speicher, aus dem alle Nodes lesen und in den sie schreiben kΓΆnnen:
- Eingabedaten: Anfangsinformationen, die in den Graphen eingespeist werden
- Zwischenergebnisse: Die Ausgabe eines Nodes wird zur Eingabe fΓΌr einen anderen
- Endergebnis: Das vollstΓ€ndige Ergebnis nach der gesamten Verarbeitung
State fungiert wie ein gemeinsames Whiteboard, auf dem Nodes Informationen fΓΌr andere zur Verwendung hinterlassen kΓΆnnen.
β‘ Parallel Execution: Der Geschwindigkeitsschub Wenn mehrere Edges einen Node verlassen, laufen die Ziel-Nodes gleichzeitig:
- Schnellere Verarbeitung: UnabhΓ€ngige Aufgaben werden gleichzeitig ausgefΓΌhrt
- Ressourceneffizienz: Bessere Auslastung von CPU und E/A
- Skalierbarkeit: BewΓ€ltigung komplexerer Workflows ohne lineare Verlangsamung
Das ist, als wΓΌrden mehrere Arbeiter gleichzeitig verschiedene Teile einer Aufgabe erledigen, anstatt in der Schlange zu warten.
Schnellstart
1. Ihr Projekt erstellen
cargo new graph_demo
cd graph_demo
AbhΓ€ngigkeiten zu Cargo.toml hinzufΓΌgen:
[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"
.env mit Ihrem API-SchlΓΌssel erstellen:
echo 'GOOGLE_API_KEY=your-api-key' > .env
2. Beispiel fΓΌr parallele Verarbeitung
Hier ist ein vollstΓ€ndiges, funktionierendes Beispiel, das Text parallel verarbeitet:
use adk_agent::LlmAgentBuilder;
use adk_graph::{
agent::GraphAgent,
edge::{END, START},
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create specialized LLM agents
let translator_agent = Arc::new(
LlmAgentBuilder::new("translator")
.description("Translates text to French")
.model(model.clone())
.instruction("Translate the input text to French. Only output the translation.")
.build()?,
);
let summarizer_agent = Arc::new(
LlmAgentBuilder::new("summarizer")
.description("Summarizes text")
.model(model.clone())
.instruction("Summarize the input text in one sentence.")
.build()?,
);
// Wrap agents as graph nodes with input/output mappers
let translator_node = AgentNode::new(translator_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("translation".to_string(), json!(text));
}
}
}
updates
});
let summarizer_node = AgentNode::new(summarizer_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("summary".to_string(), json!(text));
}
}
}
updates
});
// Build the graph with parallel execution
let agent = GraphAgent::builder("text_processor")
.description("Processes text with translation and summarization in parallel")
.channels(&["input", "translation", "summary", "result"])
.node(translator_node)
.node(summarizer_node)
.node_fn("combine", |ctx| async move {
let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");
let result = format!(
"=== Processing Complete ===\n\n\
French Translation:\n{}\n\n\
Summary:\n{}",
translation, summary
);
Ok(NodeOutput::new().with_update("result", json!(result)))
})
// Parallel execution: both nodes start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge("translator", "combine")
.edge("summarizer", "combine")
.edge("combine", END)
.build()?;
// Execute the graph
let mut input = State::new();
input.insert("input".to_string(), json!("AI is transforming how we work and live."));
let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Beispielausgabe:
=== Processing Complete ===
French Translation:
L'IA transforme notre faΓ§on de travailler et de vivre.
Summary:
AI is revolutionizing work and daily life through technological transformation.
Wie die Graph-AusfΓΌhrung funktioniert
Das Gesamtbild
Graph agents werden in Super-Schritten ausgefΓΌhrt β alle bereiten Knoten laufen parallel, dann wartet der Graph, bis alle abgeschlossen sind, bevor der nΓ€chste Schritt erfolgt:
Step 1: START βββ¬βββΆ translator (running)
ββββΆ summarizer (running)
β³ Wait for both to complete...
Step 2: translator βββ¬βββΆ combine (running)
summarizer βββ
β³ Wait for combine to complete...
Step 3: combine βββΆ END β
Zustandsfluss durch Knoten
Jeder Knoten kann den gemeinsamen Zustand lesen und schreiben:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SCHRITT 1: Initialer Zustand β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β State: { "input": "AI is transforming how we work" } β
β β
β β β
β β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β translator β β summarizer β β
β β liest "input" β β liest "input" β β
β ββββββββββββββββββββ ββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SCHRITT 2: Nach paralleler AusfΓΌhrung β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β State: { β
β "input": "AI is transforming how we work", β
β "translation": "L'IA transforme notre faΓ§on de travailler", β
β "summary": "AI is revolutionizing work through technology" β
β } β
β β
β β β
β β
β ββββββββββββββββββββββββββββββββββββββββ β
β β combine β β
β β liest "translation" + "summary" β β
β β schreibt "result" β β
β ββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SCHRITT 3: EndgΓΌltiger Zustand β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β State: { β
β "input": "AI is transforming how we work", β
β "translation": "L'IA transforme notre faΓ§on de travailler", β
β "summary": "AI is revolutionizing work through technology", β
β "result": "=== Processing Complete ===\n\nFrench..." β
β } β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Was es zum Laufen bringt
| Komponente | Rolle |
|---|---|
AgentNode | UmschlieΓt LLM agents mit input/output mappers |
input_mapper | Transformiert den Zustand β agent input Content |
output_mapper | Transformiert agent events β Zustandsaktualisierungen |
channels | Deklariert die Zustandsfelder, die der Graph verwenden wird |
edge() | Definiert den AusfΓΌhrungsfluss zwischen Knoten |
ExecutionConfig | Bietet eine thread ID fΓΌr die Checkpoint-Erstellung |
Bedingtes Routing mit LLM-Klassifizierung
Erstellen Sie intelligente Routing-Systeme, bei denen LLMs den AusfΓΌhrungspfad bestimmen:
Visuell: Stimmungsbasiertes Routing
βββββββββββββββββββββββ
Benutzer-Feedback β β
βββββββββββββββββΆ β KLASSIFIZIERER β
β π§ Ton analysieren β
ββββββββββββ¬βββββββββββ
β
βββββββββββββββββΌββββββββββββββββ
β β β
βΌ βΌ βΌ
ββββββββββββββββββββ ββββββββββββββββββββ ββββββββββββββββββββ
β POSITIV β β NEGATIV β β NEUTRAL β
β β β β β β
β π Danke schΓΆn! β β π Entschuldigen β β π Mehr fragen β
β Feiern β β Helfen beheben β β Fragen β
ββββββββββββββββββββ ββββββββββββββββββββ ββββββββββββββββββββ
VollstΓ€ndiger Beispielcode
use adk_agent::LlmAgentBuilder;
use adk_graph::{
edge::{END, Router, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Klassifizierer-Agent erstellen
let classifier_agent = Arc::new(
LlmAgentBuilder::new("classifier")
.description("Klassifiziert die Textstimmung")
.model(model.clone())
.instruction(
"Sie sind ein Stimmungs-Klassifizierer. Analysieren Sie den Eingabetext und antworten Sie mit \
NUR einem Wort: 'positive', 'negative' oder 'neutral'. Nichts sonst.",
)
.build()?,
);
// Antwort-Agents fΓΌr jede Stimmung erstellen
let positive_agent = Arc::new(
LlmAgentBuilder::new("positive")
.description("Behandelt positives Feedback")
.model(model.clone())
.instruction(
"Sie sind ein Kundenerfolgs-Spezialist. Der Kunde hat positives Feedback gegeben. \
DrΓΌcken Sie Ihre Dankbarkeit aus, verstΓ€rken Sie das positive Erlebnis und schlagen Sie MΓΆglichkeiten vor, die Erfahrung zu teilen. Seien Sie herzlich und dankbar. Halten Sie die Antwort unter 3 SΓ€tzen.",
)
.build()?,
);
let negative_agent = Arc::new(
LlmAgentBuilder::new("negative")
.description("Behandelt negatives Feedback")
.model(model.clone())
.instruction(
"Sie sind ein Kundensupport-Spezialist. Der Kunde hat eine Beschwerde. \
Erkennen Sie die Frustration an, entschuldigen Sie sich aufrichtig und bieten Sie Hilfe an. \
Seien Sie empathisch. Halten Sie die Antwort unter 3 SΓ€tzen.",
)
.build()?,
);
let neutral_agent = Arc::new(
LlmAgentBuilder::new("neutral")
.description("Behandelt neutrales Feedback")
.model(model.clone())
.instruction(
"Sie sind ein Kundendienstmitarbeiter. Der Kunde hat neutrales Feedback gegeben. \
Stellen Sie klΓ€rende Fragen, um die BedΓΌrfnisse besser zu verstehen. Seien Sie hilfsbereit und neugierig. \
Halten Sie die Antwort unter 3 SΓ€tzen.",
)
.build()?,
);
// AgentNodes mit Mappern erstellen
let classifier_node = AgentNode::new(classifier_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase()
.trim()
.to_string();
let sentiment = if text.contains("positive") { "positive" }
else if text.contains("negative") { "negative" }
else { "neutral" };
updates.insert("sentiment".to_string(), json!(sentiment));
}
}
updates
});
// Antwortknoten (Γ€hnliches Muster fΓΌr jeden)
let positive_node = AgentNode::new(positive_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("response".to_string(), json!(text));
}
}
updates
});
// Graph mit bedingtem Routing erstellen
let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
.add_node(classifier_node)
.add_node(positive_node)
// ... negative_node und neutral_node Γ€hnlich hinzufΓΌgen
.add_edge(START, "classifier")
.add_conditional_edges(
"classifier",
Router::by_field("sentiment"), // Routing basierend auf dem Stimmungsfeld
[
("positive", "positive"),
("negative", "negative"),
("neutral", "neutral"),
],
)
.add_edge("positive", END)
.add_edge("negative", END)
.add_edge("neutral", END)
.compile()?;
// Mit unterschiedlichem Feedback testen
let mut input = State::new();
input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));
let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
println!("Stimmung: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
println!("Antwort: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
Beispielablauf:
Input: "Ihr Produkt ist fantastisch! Ich liebe es!"
β
Classifier: "positive"
β
Positive Agent: "Vielen Dank fΓΌr das wunderbare Feedback!
Wir freuen uns sehr, dass Ihnen unser Produkt gefΓ€llt.
WΓΌrden Sie in ErwΓ€gung ziehen, eine Bewertung zu hinterlassen, um anderen zu helfen?"
ReAct-Muster: Reasoning + Acting
Erstellen Sie Agents, die Tools iterativ nutzen kΓΆnnen, um komplexe Probleme zu lΓΆsen:
Visualisierung: ReAct-Zyklus
βββββββββββββββββββββββ
User Question β β
βββββββββββββββββΆ β REASONER β
β π§ Think + Act β
ββββββββββββ¬βββββββββββ
β
βΌ
βββββββββββββββββββββββ
β Has tool calls? β
β β
ββββββββββββ¬βββββββββββ
β
βββββββββββββββββ΄ββββββββββββββββ
β β
βΌ βΌ
ββββββββββββββββββββ ββββββββββββββββββββ
β YES β β NO β
β β β β
β π Loop back β β β
Final answer β
β to reasoner β β END β
βββββββββββ¬βββββββββ ββββββββββββββββββββ
β
βββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββ
β REASONER β
β π§ Think + Act β
β (next iteration) β
βββββββββββββββββββββββ
VollstΓ€ndiges ReAct-Beispiel
use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
edge::{END, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create tools
let weather_tool = Arc::new(FunctionTool::new(
"get_weather",
"Get the current weather for a location. Takes a 'location' parameter (city name).",
|_ctx, args| async move {
let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
Ok(json!({
"location": location,
"temperature": "72Β°F",
"condition": "Sunny",
"humidity": "45%"
}))
},
)) as Arc<dyn Tool>;
let calculator_tool = Arc::new(FunctionTool::new(
"calculator",
"Perform mathematical calculations. Takes an 'expression' parameter (string).",
|_ctx, args| async move {
let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
let result = match expr {
"2 + 2" => "4",
"10 * 5" => "50",
"100 / 4" => "25",
"15 - 7" => "8",
_ => "Unable to evaluate",
};
Ok(json!({ "result": result, "expression": expr }))
},
)) as Arc<dyn Tool>;
// Create reasoner agent with tools
let reasoner_agent = Arc::new(
LlmAgentBuilder::new("reasoner")
.description("Reasoning agent with tools")
.model(model.clone())
.instruction(
"You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
When you have enough information, provide a final answer without using more tools.",
)
.tool(weather_tool)
.tool(calculator_tool)
.build()?,
);
// Create reasoner node that detects tool usage
let reasoner_node = AgentNode::new(reasoner_agent)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build ReAct graph with cycle
let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back for more reasoning
} else {
END.to_string() // Done - final answer
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
// Test the ReAct agent
let mut input = State::new();
input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));
let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));
Ok(())
}
Beispielablauf:
Question: "What's the weather in Paris and what's 15 + 25?"
Iteration 1:
Reasoner: "I need to get weather info and do math"
β Calls get_weather(location="Paris") and calculator(expression="15 + 25")
β has_tool_calls = true β Loop back
Iteration 2:
Reasoner: "Based on the results: Paris is 72Β°F and sunny, 15 + 25 = 40"
β No tool calls β has_tool_calls = false β END
Final Answer: "The weather in Paris is 72Β°F and sunny with 45% humidity.
And 15 + 25 equals 40."
AgentNode
UmschlieΓt jeden ADK Agent (typischerweise LlmAgent) als Graphknoten:
let node = AgentNode::new(llm_agent)
.with_input_mapper(|state| {
// Transform graph state to agent input Content
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
// Transform agent events to state updates
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("output".to_string(), json!(text));
}
}
updates
});
Funktionsknoten
Einfache async-Funktionen, die den state verarbeiten:
.node_fn("process", |ctx| async move {
let input = ctx.state.get("input").unwrap();
let output = process_data(input).await?;
Ok(NodeOutput::new().with_update("output", output))
})
Kantentypen
Statische Kanten
Direkte Verbindungen zwischen Knoten:
.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)
Bedingte Kanten
Dynamische Weiterleitung basierend auf dem Zustand:
.conditional_edge(
"router",
|state| {
match state.get("next").and_then(|v| v.as_str()) {
Some("research") => "research_node".to_string(),
Some("write") => "write_node".to_string(),
_ => END.to_string(),
}
},
[
("research_node", "research_node"),
("write_node", "write_node"),
(END, END),
],
)
Router-Helfer
Verwenden Sie eingebaute Router fΓΌr gΓ€ngige Muster:
use adk_graph::edge::Router;
// Weiterleiten basierend auf einem Zustandswertfeld
.conditional_edge("classifier", Router::by_field("sentiment"), [
("positive", "positive_handler"),
("negative", "negative_handler"),
("neutral", "neutral_handler"),
])
// Weiterleiten basierend auf einem Booleschen Feld
.conditional_edge("check", Router::by_bool("approved"), [
("true", "execute"),
("false", "reject"),
])
// Iterationen begrenzen
.conditional_edge("loop", Router::max_iterations("count", 5), [
("continue", "process"),
("done", END),
])
Parallele AusfΓΌhrung
Mehrere Kanten von einem einzelnen Knoten werden parallel ausgefΓΌhrt:
let agent = GraphAgent::builder("parallel_processor")
.channels(&["input", "translation", "summary", "analysis"])
.node(translator_node)
.node(summarizer_node)
.node(analyzer_node)
.node(combiner_node)
// Alle drei starten gleichzeitig
.edge(START, "translator")
.edge(START, "summarizer")
.edge(START, "analyzer")
// Warten, bis alle abgeschlossen sind, bevor sie kombiniert werden
.edge("translator", "combiner")
.edge("summarizer", "combiner")
.edge("analyzer", "combiner")
.edge("combiner", END)
.build()?;
Zykliche Graphen (ReAct-Muster)
Erstellen Sie iterative Reasoning Agents mit Zyklen:
use adk_core::Part;
// Create agent with tools
let reasoner = Arc::new(
LlmAgentBuilder::new("reasoner")
.model(model)
.instruction("Use tools to answer questions. Provide final answer when done.")
.tool(search_tool)
.tool(calculator_tool)
.build()?
);
let reasoner_node = AgentNode::new(reasoner)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { name, .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back
} else {
END.to_string() // Done
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
Multi-Agenten-Supervisor
Leite Aufgaben an spezialisierte Agenten weiter:
// Create supervisor agent
let supervisor = Arc::new(
LlmAgentBuilder::new("supervisor")
.model(model.clone())
.instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
.build()?
);
let supervisor_node = AgentNode::new(supervisor)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase();
let next = if text.contains("researcher") { "researcher" }
else if text.contains("writer") { "writer" }
else if text.contains("coder") { "coder" }
else { "done" };
updates.insert("next_agent".to_string(), json!(next));
}
}
updates
});
// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
.add_node(supervisor_node)
.add_node(researcher_node)
.add_node(writer_node)
.add_node(coder_node)
.add_edge(START, "supervisor")
.add_conditional_edges(
"supervisor",
Router::by_field("next_agent"),
[
("researcher", "researcher"),
("writer", "writer"),
("coder", "coder"),
("done", END),
],
)
// Agents report back to supervisor
.add_edge("researcher", "supervisor")
.add_edge("writer", "supervisor")
.add_edge("coder", "supervisor")
.compile()?;
Zustandsverwaltung
Zustandsschema mit Reducern
Kontrolliere, wie Zustandsaktualisierungen zusammengefΓΌhrt werden:
let schema = StateSchema::builder()
.channel("current_step") // Γberschreiben (Standard)
.list_channel("messages") // An Liste anhΓ€ngen
.channel_with_reducer("count", Reducer::Sum) // Werte summieren
.channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
// Benutzerdefinierte ZusammenfΓΌhrungslogik
merge_json(old, new)
})))
.build();
let agent = GraphAgent::builder("stateful")
.state_schema(schema)
// ... nodes and edges
.build()?;
Reducer-Typen
| Reducer | Verhalten |
|---|---|
Overwrite | Ersetzt alten Wert durch neuen (Standard) |
Append | FΓΌgt an Liste an |
Sum | Addiert numerische Werte |
Custom | Benutzerdefinierte ZusammenfΓΌhrungsfunktion |
Checkpointing
ErmΓΆglicht persistenten Zustand fΓΌr Fehlertoleranz und Human-in-the-Loop:
Im Speicher (Entwicklung)
use adk_graph::checkpoint::MemoryCheckpointer;
let checkpointer = Arc::new(MemoryCheckpointer::new());
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer_arc(checkpointer.clone());
SQLite (Produktion)
use adk_graph::checkpoint::SqliteCheckpointer;
let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer(checkpointer);
Checkpoint-Verlauf (Zeitθ·)
// Liste alle Checkpoints fΓΌr einen Thread auf
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}
// Lade einen spezifischen Checkpoint
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}
Human-in-the-Loop
Pausieren Sie die AusfΓΌhrung fΓΌr die Genehmigung durch einen Menschen mithilfe dynamischer Unterbrechungen:
use adk_graph::{error::GraphError, node::NodeOutput};
// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
// Extract risk level from LLM response
let risk = if text.to_lowercase().contains("risk: high") { "high" }
else if text.to_lowercase().contains("risk: medium") { "medium" }
else { "low" };
updates.insert("plan".to_string(), json!(text));
updates.insert("risk_level".to_string(), json!(risk));
}
}
updates
});
// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_node_fn("review", |ctx| async move {
let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
let approved = ctx.get("approved").and_then(|v| v.as_bool());
// Already approved - continue
if approved == Some(true) {
return Ok(NodeOutput::new());
}
// High/medium risk - interrupt for approval
if risk == "high" || risk == "medium" {
return Ok(NodeOutput::interrupt_with_data(
&format!("{} RISK: Human approval required", risk.to_uppercase()),
json!({
"plan": ctx.get("plan"),
"risk_level": risk,
"action": "Set 'approved' to true to continue"
})
));
}
// Low risk - auto-approve
Ok(NodeOutput::new().with_update("approved", json!(true)))
})
.add_edge(START, "planner")
.add_edge("planner", "review")
.add_edge("review", "executor")
.add_edge("executor", END)
.compile()?
.with_checkpointer_arc(checkpointer.clone());
// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;
match result {
Err(GraphError::Interrupted(interrupt)) => {
println!("*** EXECUTION PAUSED ***");
println!("Reason: {}", interrupt.interrupt);
println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));
// Human reviews and approves...
// Update state with approval
graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;
// Resume execution
let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
println!("Final result: {:?}", final_result.get("result"));
}
Ok(result) => {
println!("Completed without interrupt: {:?}", result);
}
Err(e) => {
println!("Error: {}", e);
}
}
Static Interrupts
Verwenden Sie interrupt_before oder interrupt_after fΓΌr obligatorische Pausenpunkte:
let graph = StateGraph::with_channels(&["task", "plan", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_edge(START, "planner")
.add_edge("planner", "executor")
.add_edge("executor", END)
.compile()?
.with_interrupt_before(&["executor"]); // Always pause before execution
Streaming-AusfΓΌhrung
Ereignisse streamen, wΓ€hrend der Graph ausgefΓΌhrt wird:
use futures::StreamExt;
use adk_graph::stream::StreamMode;
let stream = agent.stream(input, config, StreamMode::Updates);
while let Some(event) = stream.next().await {
match event? {
StreamEvent::NodeStart(name) => println!("Starting: {}", name),
StreamEvent::Updates { node, updates } => {
println!("{} updated state: {:?}", node, updates);
}
StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
StreamEvent::Done(state) => println!("Final state: {:?}", state),
_ => {}
}
}
Stream-Modi
| Modus | Beschreibung |
|---|---|
Values | Streamt den vollstΓ€ndigen Zustand nach jedem Knoten |
Updates | Streamt nur ZustandsΓ€nderungen |
Messages | Streamt Updates vom Nachrichtentyp |
Debug | Streamt alle internen Ereignisse |
ADK-Integration
GraphAgent implementiert das ADK Agent trait, sodass es mit Folgendem funktioniert:
- Runner: Verwendung mit
adk-runnerfΓΌr die StandardausfΓΌhrung - Callbacks: Volle UnterstΓΌtzung fΓΌr before/after-Callbacks
- Sessions: Funktioniert mit
adk-sessionfΓΌr den Konversationsverlauf - Streaming: Gibt ADK
EventStreamzurΓΌck
use adk_runner::Runner;
let graph_agent = GraphAgent::builder("workflow")
.before_agent_callback(|ctx| async {
println!("Starting graph execution for session: {}", ctx.session_id());
Ok(())
})
.after_agent_callback(|ctx, event| async {
if let Some(content) = event.content() {
println!("Graph completed with content");
}
Ok(())
})
// ... graph definition
.build()?;
// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration
Beispiele
Alle Beispiele verwenden eine echte LLM-Integration mit AgentNode:
# Parallel LLM agents with before/after callbacks
cargo run --example graph_agent
# Sequential multi-agent pipeline (extractor β analyzer β formatter)
cargo run --example graph_workflow
# LLM-based sentiment classification and conditional routing
cargo run --example graph_conditional
# ReAct pattern with tools and cyclic execution
cargo run --example graph_react
# Multi-agent supervisor routing to specialists
cargo run --example graph_supervisor
# Human-in-the-loop with risk-based interrupts
cargo run --example graph_hitl
# Checkpointing and time travel debugging
cargo run --example graph_checkpoint
Vergleich mit LangGraph
| Funktion | LangGraph | adk-graph |
|---|---|---|
| Zustandsverwaltung | TypedDict + Reducers | StateSchema + Reducers |
| AusfΓΌhrungsmodell | Pregel super-steps | Pregel super-steps |
| Checkpointing | Memory, SQLite, Postgres | Memory, SQLite |
| Mensch-in-der-Schleife | interrupt_before/after | interrupt_before/after + dynamic |
| Streaming | 5 Modi | 5 Modi |
| Zyklen | Native UnterstΓΌtzung | Native UnterstΓΌtzung |
| Typsicherheit | Python typing | Rust type system |
| LLM Integration | LangChain | AgentNode + ADK agents |
ZurΓΌck: β Multi-Agenten-Systeme | Weiter: Echtzeit-Agents β