وكلاء الرسم البياني (Graph Agents)
أنشئ مهام سير عمل معقدة وذات حالة باستخدام تنسيق على غرار LangGraph مع تكامل ADK-Rust الأصلي.
نظرة عامة
يتيح لك GraphAgent تعريف مهام سير العمل كرسوم بيانية موجهة تحتوي على عقد وحواف، ويدعم ما يلي:
- AgentNode: تغليف وكلاء LLM كعقد رسم بياني مع أدوات ربط مخصصة للمدخلات/المخرجات
- مهام سير العمل الدورية (Cyclic Workflows): دعم أصلي للحلقات والتفكير التكراري (نمط ReAct)
- التوجيه الشرطي (Conditional Routing): توجيه حافة ديناميكي بناءً على الحالة
- إدارة الحالة (State Management): حالة ذات نوع مع Reducers (استبدال، إلحاق، جمع، مخصص)
- Checkpointing: حالة مستمرة للتسامح مع الأخطاء وتفاعل الإنسان مع النظام
- Streaming: أوضاع بث متعددة (قيم، تحديثات، رسائل، تصحيح الأخطاء)
توفر حزمة adk-graph تنسيق مهام سير العمل على غرار LangGraph لبناء مهام سير عمل وكلاء معقدة وذات حالة. إنها تجلب إمكانيات مهام سير العمل المستندة إلى الرسوم البيانية إلى نظام ADK-Rust البيئي مع الحفاظ على التوافق الكامل مع نظام وكلاء ADK.
الفوائد الرئيسية:
- تصميم مرئي لمهام سير العمل (Visual Workflow Design): تحديد المنطق المعقد كرسوم بيانية بديهية للعقد والحواف
- التنفيذ المتوازي (Parallel Execution): يمكن تشغيل عقد متعددة في وقت واحد للحصول على أداء أفضل
- استمرارية الحالة (State Persistence): حفظ نقاط التحقق المدمج للتسامح مع الأخطاء وتفاعل الإنسان مع النظام
- تكامل LLM (LLM Integration): دعم أصلي لتغليف وكلاء ADK كعقد رسم بياني
- التوجيه المرن (Flexible Routing): حواف ثابتة، وتوجيه شرطي، واتخاذ قرارات ديناميكي
ما ستقوم ببنائه
في هذا الدليل، ستقوم بإنشاء مسار معالجة نصي يقوم بتشغيل الترجمة والتلخيص بالتوازي:
┌─────────────────────┐
User Input │ │
────────────────▶ │ START │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ TRANSLATOR │ │ SUMMARIZER │
│ │ │ │
│ 🇫🇷 French │ │ 📝 One sentence │
│ Translation │ │ Summary │
└─────────┬────────┘ └─────────┬────────┘
│ │
└───────────────┬───────────────┘
│
▼
┌─────────────────────┐
│ COMBINE │
│ │
│ 📋 Merge Results │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ END │
│ │
│ ✅ Complete │
└─────────────────────┘
المفاهيم الأساسية:
- Nodes - وحدات معالجة تؤدي العمل (LLM agents، functions، أو منطق مخصص)
- Edges - تدفق التحكم بين nodes (اتصالات ثابتة أو توجيه شرطي)
- State - بيانات مشتركة تتدفق عبر الرسم البياني وتستمر بين nodes
- Parallel Execution - يمكن لعدة nodes العمل في وقت واحد لتحسين الأداء
فهم المكونات الأساسية
🔧 Nodes: العمال Nodes هي حيث يحدث العمل الفعلي. يمكن لكل node أن:
- AgentNode: يلف LLM agent لمعالجة اللغة الطبيعية
- Function Node: ينفذ كود Rust مخصصًا لمعالجة البيانات
- Built-in Nodes: يستخدم منطقًا محددًا مسبقًا مثل العدادات أو أدوات التحقق
فكر في nodes كعمال متخصصين في خط تجميع - لكل منهم وظيفة وخبرة محددة.
🔀 Edges: التحكم في التدفق Edges تحدد كيفية انتقال التنفيذ عبر الرسم البياني الخاص بك:
- Static Edges: اتصالات مباشرة (
A → B → C) - Conditional Edges: توجيه ديناميكي بناءً على state (
if sentiment == "positive" → positive_handler) - Parallel Edges: مسارات متعددة من node واحد (
START → [translator, summarizer])
Edges تشبه إشارات المرور وعلامات الطريق التي توجه تدفق العمل.
💾 State: الذاكرة المشتركة State هو مخزن قيم-مفاتيح يمكن لجميع nodes القراءة منه والكتابة إليه:
- Input Data: المعلومات الأولية التي يتم تغذيتها في الرسم البياني
- Intermediate Results: المخرجات من node واحد تصبح مدخلات لـ node آخر
- Final Output: النتيجة المكتملة بعد كل المعالجة
يعمل State كلوح أبيض مشترك حيث يمكن لـ nodes ترك المعلومات ليستخدمها الآخرون.
⚡ Parallel Execution: تعزيز السرعة عندما تغادر عدة edges node واحدًا، فإن nodes المستهدفة تعمل بالتوازي:
- Faster Processing: المهام المستقلة تعمل في نفس الوقت
- Resource Efficiency: استخدام أفضل لوحدة المعالجة المركزية (CPU) والإدخال/الإخراج (I/O)
- Scalability: التعامل مع سير عمل أكثر تعقيدًا دون تباطؤ خطي
هذا يشبه وجود عدة عمال يتعاملون مع أجزاء مختلفة من المهمة في وقت واحد بدلاً من الانتظار في طابور.
بدء سريع
1. إنشاء مشروعك
cargo new graph_demo
cd graph_demo
أضف التبعيات إلى Cargo.toml:
[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"
أنشئ ملف .env باستخدام مفتاح API الخاص بك:
echo 'GOOGLE_API_KEY=your-api-key' > .env
2. مثال المعالجة المتوازية
إليك مثال عملي كامل يعالج النص بشكل متوازٍ:
use adk_agent::LlmAgentBuilder;
use adk_graph::{
agent::GraphAgent,
edge::{END, START},
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create specialized LLM agents
let translator_agent = Arc::new(
LlmAgentBuilder::new("translator")
.description("Translates text to French")
.model(model.clone())
.instruction("Translate the input text to French. Only output the translation.")
.build()?,
);
let summarizer_agent = Arc::new(
LlmAgentBuilder::new("summarizer")
.description("Summarizes text")
.model(model.clone())
.instruction("Summarize the input text in one sentence.")
.build()?,
);
// Wrap agents as graph nodes with input/output mappers
let translator_node = AgentNode::new(translator_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("translation".to_string(), json!(text));
}
}
}
updates
});
let summarizer_node = AgentNode::new(summarizer_agent)
.with_input_mapper(|state| {
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
if !text.is_empty() {
updates.insert("summary".to_string(), json!(text));
}
}
}
updates
});
// Build the graph with parallel execution
let agent = GraphAgent::builder("text_processor")
.description("Processes text with translation and summarization in parallel")
.channels(&["input", "translation", "summary", "result"])
.node(translator_node)
.node(summarizer_node)
.node_fn("combine", |ctx| async move {
let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");
let result = format!(
"=== Processing Complete ===\n\n\
French Translation:\n{}\n\n\
Summary:\n{}",
translation, summary
);
Ok(NodeOutput::new().with_update("result", json!(result)))
})
// Parallel execution: both nodes start simultaneously
.edge(START, "translator")
.edge(START, "summarizer")
.edge("translator", "combine")
.edge("summarizer", "combine")
.edge("combine", END)
.build()?;
// Execute the graph
let mut input = State::new();
input.insert("input".to_string(), json!("AI is transforming how we work and live."));
let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
مثال الإخراج:
=== Processing Complete ===
French Translation:
L'IA transforme notre façon de travailler و de vivre.
Summary:
AI is revolutionizing work and daily life through technological transformation.
كيف يعمل تنفيذ الرسم البياني
الصورة الكبيرة
تنفذ Graph agents في خطوات فائقة - جميع العقد الجاهزة تعمل بالتوازي، ثم ينتظر الرسم البياني حتى تكتمل جميعها قبل الخطوة التالية:
Step 1: START ──┬──▶ translator (running)
└──▶ summarizer (running)
⏳ انتظر حتى يكتمل كلاهما...
Step 2: translator ──┬──▶ combine (running)
summarizer ──┘
⏳ انتظر حتى يكتمل combine...
Step 3: combine ──▶ END ✅
تدفق الحالة عبر العقد
يمكن لكل عقدة القراءة من الحالة المشتركة والكتابة إليها:
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 1: الحالة الأولية │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { "input": "AI is transforming how we work" } │
│ │
│ ↓ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ translator │ │ summarizer │ │
│ │ يقرأ "input" │ │ يقرأ "input" │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 2: بعد التنفيذ المتوازي │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology" │
│ } │
│ │
│ ↓ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ combine │ │
│ │ يقرأ "translation" + "summary" │ │
│ │ يكتب "result" │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 3: الحالة النهائية │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ State: { │
│ "input": "AI is transforming how we work", │
│ "translation": "L'IA transforme notre façon de travailler", │
│ "summary": "AI is revolutionizing work through technology", │
│ "result": "=== Processing Complete ===\n\nFrench..." │
│ } │
│ │
└─────────────────────────────────────────────────────────────────────┘
ما الذي يجعلها تعمل
| Component | Role |
|---|---|
AgentNode | يغلف LLM agents باستخدام input/output mappers |
input_mapper | يحول الحالة (state) إلى مدخلات الوكيل (agent input) من نوع Content |
output_mapper | يحول أحداث الوكيل (agent events) إلى تحديثات الحالة (state updates) |
channels | يعلن عن حقول الحالة (state fields) التي سيستخدمها الرسم البياني |
edge() | يحدد تدفق التنفيذ بين العقد |
ExecutionConfig | يوفر معرف سلسلة المحادثات (thread ID) لنقاط التحقق (checkpointing) |
التوجيه الشرطي باستخدام تصنيف LLM
أنشئ أنظمة توجيه ذكية حيث تقرر نماذج اللغة الكبيرة (LLMs) مسار التنفيذ:
مرئي: التوجيه بناءً على المشاعر
┌─────────────────────┐
ملاحظات المستخدم │ │
────────────────▶ │ المصنف │
│ 🧠 تحليل النبرة │
└──────────┬──────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ إيجابي │ │ سلبي │ │ محايد │
│ │ │ │ │ │
│ 😊 شكراً لك! │ │ 😔 اعتذر │ │ 😐 اطرح المزيد │
│ احتفل │ │ ساعد في الإصلاح │ من الأسئلة │
└──────────────────┘ └──────────────────┘ └──────────────────┘
مثال كامل للرمز
use adk_agent::LlmAgentBuilder;
use adk_graph::{
edge::{END, Router, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig},
state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// Create classifier agent
let classifier_agent = Arc::new(
LlmAgentBuilder::new("classifier")
.description("Classifies text sentiment")
.model(model.clone())
.instruction(
"You are a sentiment classifier. Analyze the input text and respond with \
ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
)
.build()?,
);
// Create response agents for each sentiment
let positive_agent = Arc::new(
LlmAgentBuilder::new("positive")
.description("Handles positive feedback")
.model(model.clone())
.instruction(
"You are a customer success specialist. The customer has positive feedback. \
Express gratitude, reinforce the positive experience, and suggest ways to \
share their experience. Be warm and appreciative. Keep response under 3 sentences.",
)
.build()?,
);
let negative_agent = Arc::new(
LlmAgentBuilder::new("negative")
.description("Handles negative feedback")
.model(model.clone())
.instruction(
"You are a customer support specialist. The customer has a complaint. \
Acknowledge their frustration, apologize sincerely, and offer help. \
Be empathetic. Keep response under 3 sentences.",
)
.build()?,
);
let neutral_agent = Arc::new(
LlmAgentBuilder::new("neutral")
.description("Handles neutral feedback")
.model(model.clone())
.instruction(
"You are a customer service representative. The customer has neutral feedback. \
Ask clarifying questions to better understand their needs. Be helpful and curious. \
Keep response under 3 sentences.",
)
.build()?,
);
// Create AgentNodes with mappers
let classifier_node = AgentNode::new(classifier_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase()
.trim()
.to_string();
let sentiment = if text.contains("positive") { "positive" }
else if text.contains("negative") { "negative" }
else { "neutral" };
updates.insert("sentiment".to_string(), json!(sentiment));
}
}
updates
});
// Response nodes (similar pattern for each)
let positive_node = AgentNode::new(positive_agent)
.with_input_mapper(|state| {
let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("response".to_string(), json!(text));
}
}
updates
});
// Build graph with conditional routing
let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
.add_node(classifier_node)
.add_node(positive_node)
// ... add negative_node and neutral_node similarly
.add_edge(START, "classifier")
.add_conditional_edges(
"classifier",
Router::by_field("sentiment"), // Route based on sentiment field
[
("positive", "positive"),
("negative", "negative"),
("neutral", "neutral"),
],
)
.add_edge("positive", END)
.add_edge("negative", END)
.add_edge("neutral", END)
.compile()?;
// Test with different feedback
let mut input = State::new();
input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));
let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
Ok(())
}
سير العمل النموذجي:
الإدخال: "Your product is amazing! I love it!"
↓
المصنف: "positive"
↓
وكيل إيجابي: "Thank you so much for the wonderful feedback!
We're thrilled you love our product.
Would you consider leaving a review to help others?"
نمط ReAct: التفكير + العمل
قم بإنشاء وكلاء يمكنهم استخدام الأدوات بشكل متكرر لحل المشاكل المعقدة:
مرئي: دورة ReAct
┌─────────────────────┐
User Question │ │
────────────────▶ │ REASONER │
│ 🧠 Think + Act │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ Has tool calls? │
│ │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ YES │ │ NO │
│ │ │ │
│ 🔄 Loop back │ │ ✅ Final answer │
│ to reasoner │ │ END │
└─────────┬────────┘ └──────────────────┘
│
└─────────────────┐
│
▼
┌─────────────────────┐
│ REASONER │
│ 🧠 Think + Act │
│ (next iteration) │
└─────────────────────┘
مثال ReAct كامل
use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
edge::{END, START},
graph::StateGraph,
node::{AgentNode, ExecutionConfig, NodeOutput},
state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let api_key = std::env::var("GOOGLE_API_KEY")?;
let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);
// إنشاء الأدوات
let weather_tool = Arc::new(FunctionTool::new(
"get_weather",
"Get the current weather for a location. Takes a 'location' parameter (city name).",
|_ctx, args| async move {
let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
Ok(json!({
"location": location,
"temperature": "72°F",
"condition": "Sunny",
"humidity": "45%"
}))
},
)) as Arc<dyn Tool>;
let calculator_tool = Arc::new(FunctionTool::new(
"calculator",
"Perform mathematical calculations. Takes an 'expression' parameter (string).",
|_ctx, args| async move {
let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
let result = match expr {
"2 + 2" => "4",
"10 * 5" => "50",
"100 / 4" => "25",
"15 - 7" => "8",
_ => "Unable to evaluate",
};
Ok(json!({ "result": result, "expression": expr }))
},
)) as Arc<dyn Tool>;
// إنشاء وكيل التفكير باستخدام الأدوات
let reasoner_agent = Arc::new(
LlmAgentBuilder::new("reasoner")
.description("Reasoning agent with tools")
.model(model.clone())
.instruction(
"You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
When you have enough information, provide a final answer without using more tools.",
)
.tool(weather_tool)
.tool(calculator_tool)
.build()?,
);
// إنشاء عقدة التفكير التي تكتشف استخدام الأدوات
let reasoner_node = AgentNode::new(reasoner_agent)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// بناء الرسم البياني ReAct مع دورة
let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// حد أمان
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // العودة إلى التفكير
} else {
END.to_string() // انتهى - الإجابة النهائية
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
// اختبار وكيل ReAct
let mut input = State::new();
input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));
let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));
Ok(())
}
تدفق المثال:
Question: "What's the weather in Paris and what's 15 + 25?"
المرة الأولى:
Reasoner: "I need to get weather info and do math"
→ Calls get_weather(location="Paris") and calculator(expression="15 + 25")
→ has_tool_calls = true → العودة إلى التفكير
المرة الثانية:
Reasoner: "Based on the results: Paris is 72°F and sunny, 15 + 25 = 40"
→ No tool calls → has_tool_calls = false → END
Final Answer: "The weather in Paris is 72°F and sunny with 45% humidity.
And 15 + 25 equals 40."
AgentNode
يغلف أي Agent من ADK (عادةً LlmAgent) كعقدة رسم بياني:
let node = AgentNode::new(llm_agent)
.with_input_mapper(|state| {
// تحويل حالة الرسم البياني إلى مدخلات وكيل Content
let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(text)
})
.with_output_mapper(|events| {
// تحويل أحداث الوكيل إلى تحديثات الحالة
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
updates.insert("output".to_string(), json!(text));
}
}
updates
});
Function Nodes
دوال async بسيطة تعالج الحالة:
.node_fn("process", |ctx| async move {
let input = ctx.state.get("input").unwrap();
let output = process_data(input).await?;
Ok(NodeOutput::new().with_update("output", output))
})
أنواع الحواف
الحواف الثابتة
اتصالات مباشرة بين العقد:
.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)
الحواف الشرطية
التوجيه الديناميكي بناءً على الحالة:
.conditional_edge(
"router",
|state| {
match state.get("next").and_then(|v| v.as_str()) {
Some("research") => "research_node".to_string(),
Some("write") => "write_node".to_string(),
_ => END.to_string(),
}
},
[
("research_node", "research_node"),
("write_node", "write_node"),
(END, END),
],
)
مساعدات الموجه
استخدم الموجهات المضمنة للأنماط الشائعة:
use adk_graph::edge::Router;
// التوجيه بناءً على قيمة حقل الحالة
.conditional_edge("classifier", Router::by_field("sentiment"), [
("positive", "positive_handler"),
("negative", "negative_handler"),
("neutral", "neutral_handler"),
])
// التوجيه بناءً على حقل منطقي
.conditional_edge("check", Router::by_bool("approved"), [
("true", "execute"),
("false", "reject"),
])
// تحديد التكرارات
.conditional_edge("loop", Router::max_iterations("count", 5), [
("continue", "process"),
("done", END),
])
التنفيذ المتوازي
تُنفذ حواف متعددة من عقدة واحدة بالتوازي:
let agent = GraphAgent::builder("parallel_processor")
.channels(&["input", "translation", "summary", "analysis"])
.node(translator_node)
.node(summarizer_node)
.node(analyzer_node)
.node(combiner_node)
// تبدأ الثلاثة كلها في وقت واحد
.edge(START, "translator")
.edge(START, "summarizer")
.edge(START, "analyzer")
// انتظر حتى تكتمل جميعها قبل الدمج
.edge("translator", "combiner")
.edge("summarizer", "combiner")
.edge("analyzer", "combiner")
.edge("combiner", END)
.build()?;
الرسوم البيانية الدورية (نمط ReAct)
أنشئ وكلاء استدلال تكراريين باستخدام الدورات:
use adk_core::Part;
// Create agent with tools
let reasoner = Arc::new(
LlmAgentBuilder::new("reasoner")
.model(model)
.instruction("Use tools to answer questions. Provide final answer when done.")
.tool(search_tool)
.tool(calculator_tool)
.build()?
);
let reasoner_node = AgentNode::new(reasoner)
.with_input_mapper(|state| {
let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
adk_core::Content::new("user").with_text(question)
})
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
let mut has_tool_calls = false;
let mut response = String::new();
for event in events {
if let Some(content) = event.content() {
for part in &content.parts {
match part {
Part::FunctionCall { name, .. } => {
has_tool_calls = true;
}
Part::Text { text } => {
response.push_str(text);
}
_ => {}
}
}
}
}
updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
updates.insert("response".to_string(), json!(response));
updates
});
// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
.add_node(reasoner_node)
.add_node_fn("counter", |ctx| async move {
let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
})
.add_edge(START, "counter")
.add_edge("counter", "reasoner")
.add_conditional_edges(
"reasoner",
|state| {
let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
// Safety limit
if iteration >= 5 { return END.to_string(); }
if has_tools {
"counter".to_string() // Loop back
} else {
END.to_string() // Done
}
},
[("counter", "counter"), (END, END)],
)
.compile()?
.with_recursion_limit(10);
مشرف متعدد الوكلاء
توجيه المهام إلى وكلاء متخصصين:
// Create supervisor agent
let supervisor = Arc::new(
LlmAgentBuilder::new("supervisor")
.model(model.clone())
.instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
.build()?
);
let supervisor_node = AgentNode::new(supervisor)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("")
.to_lowercase();
let next = if text.contains("researcher") { "researcher" }
else if text.contains("writer") { "writer" }
else if text.contains("coder") { "coder" }
else { "done" };
updates.insert("next_agent".to_string(), json!(next));
}
}
updates
});
// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
.add_node(supervisor_node)
.add_node(researcher_node)
.add_node(writer_node)
.add_node(coder_node)
.add_edge(START, "supervisor")
.add_conditional_edges(
"supervisor",
Router::by_field("next_agent"),
[
("researcher", "researcher"),
("writer", "writer"),
("coder", "coder"),
("done", END),
],
)
// Agents report back to supervisor
.add_edge("researcher", "supervisor")
.add_edge("writer", "supervisor")
.add_edge("coder", "supervisor")
.compile()?;
إدارة الحالة
مخطط الحالة مع أجهزة التخفيض
التحكم في كيفية دمج تحديثات الحالة:
let schema = StateSchema::builder()
.channel("current_step") // Overwrite (default)
.list_channel("messages") // Append to list
.channel_with_reducer("count", Reducer::Sum) // Sum values
.channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
// Custom merge logic
merge_json(old, new)
})))
.build();
let agent = GraphAgent::builder("stateful")
.state_schema(schema)
// ... nodes and edges
.build()?;
أنواع Reducer
| Reducer | السلوك |
|---|---|
Overwrite | استبدال القيمة القديمة بالجديدة (افتراضي) |
Append | الإلحاق بالقائمة |
Sum | جمع القيم الرقمية |
Custom | دالة دمج مخصصة |
نقاط التحقق
تمكين الحالة المستمرة لتحمل الأخطاء والتدخل البشري:
في الذاكرة (للتطوير)
use adk_graph::checkpoint::MemoryCheckpointer;
let checkpointer = Arc::new(MemoryCheckpointer::new());
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer_arc(checkpointer.clone());
SQLite (للإنتاج)
use adk_graph::checkpoint::SqliteCheckpointer;
let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;
let graph = StateGraph::with_channels(&["task", "result"])
// ... nodes and edges
.compile()?
.with_checkpointer(checkpointer);
سجل نقاط التحقق (السفر عبر الزمن)
// List all checkpoints for a thread
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}
// Load a specific checkpoint
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}
التدخل البشري في الحلقة
إيقاف التنفيذ مؤقتًا للحصول على موافقة بشرية باستخدام المقاطعات الديناميكية:
use adk_graph::{error::GraphError, node::NodeOutput};
// Planner agent assesses risk
let planner_node = AgentNode::new(planner_agent)
.with_output_mapper(|events| {
let mut updates = std::collections::HashMap::new();
for event in events {
if let Some(content) = event.content() {
let text: String = content.parts.iter()
.filter_map(|p| p.text())
.collect::<Vec<_>>()
.join("");
// Extract risk level from LLM response
let risk = if text.to_lowercase().contains("risk: high") { "high" }
else if text.to_lowercase().contains("risk: medium") { "medium" }
else { "low" };
updates.insert("plan".to_string(), json!(text));
updates.insert("risk_level".to_string(), json!(risk));
}
}
updates
});
// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_node_fn("review", |ctx| async move {
let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
let approved = ctx.get("approved").and_then(|v| v.as_bool());
// Already approved - continue
if approved == Some(true) {
return Ok(NodeOutput::new());
}
// High/medium risk - interrupt for approval
if risk == "high" || risk == "medium" {
return Ok(NodeOutput::interrupt_with_data(
&format!("{} RISK: Human approval required", risk.to_uppercase()),
json!({
"plan": ctx.get("plan"),
"risk_level": risk,
"action": "Set 'approved' to true to continue"
})
));
}
// Low risk - auto-approve
Ok(NodeOutput::new().with_update("approved", json!(true)))
})
.add_edge(START, "planner")
.add_edge("planner", "review")
.add_edge("review", "executor")
.add_edge("executor", END)
.compile()?
.with_checkpointer_arc(checkpointer.clone());
// Execute - may pause for approval
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;
match result {
Err(GraphError::Interrupted(interrupt)) => {
println!("*** EXECUTION PAUSED ***");
println!("Reason: {}", interrupt.interrupt);
println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));
// Human reviews and approves...
// Update state with approval
graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;
// Resume execution
let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
println!("Final result: {:?}", final_result.get("result"));
}
Ok(result) => {
println!("Completed without interrupt: {:?}", result);
}
Err(e) => {
println!("Error: {}", e);
}
}
المقاطعات الثابتة
استخدم interrupt_before أو interrupt_after لتحديد نقاط توقف إلزامية:
let graph = StateGraph::with_channels(&["task", "plan", "result"])
.add_node(planner_node)
.add_node(executor_node)
.add_edge(START, "planner")
.add_edge("planner", "executor")
.add_edge("executor", END)
.compile()?
.with_interrupt_before(&["executor"]); // Always pause before execution
التنفيذ بالتدفق
تدفق الأحداث أثناء تنفيذ الرسم البياني:
use futures::StreamExt;
use adk_graph::stream::StreamMode;
let stream = agent.stream(input, config, StreamMode::Updates);
while let Some(event) = stream.next().await {
match event? {
StreamEvent::NodeStart(name) => println!("Starting: {}", name),
StreamEvent::Updates { node, updates } => {
println!("{} updated state: {:?}", node, updates);
}
StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
StreamEvent::Done(state) => println!("Final state: {:?}", state),
_ => {}
}
}
أوضاع التدفق
| الوضع | الوصف |
|---|---|
Values | تدفق الحالة الكاملة بعد كل عقدة |
Updates | تدفق تغييرات الحالة فقط |
Messages | تدفق التحديثات من نوع الرسالة |
Debug | تدفق جميع الأحداث الداخلية |
تكامل ADK
يقوم GraphAgent بتطبيق سمة Agent الخاصة بـ ADK، لذلك فهو يعمل مع:
- Runner: يُستخدم مع
adk-runnerللتنفيذ القياسي - Callbacks: دعم كامل لاستدعاءات ما قبل/بعد (callbacks)
- Sessions: يعمل مع
adk-sessionلسجل المحادثات - Streaming: يعيد ADK
EventStream
use adk_runner::Runner;
let graph_agent = GraphAgent::builder("workflow")
.before_agent_callback(|ctx| async {
println!("Starting graph execution for session: {}", ctx.session_id());
Ok(())
})
.after_agent_callback(|ctx, event| async {
if let Some(content) = event.content() {
println!("Graph completed with content");
}
Ok(())
})
// ... تعريف الرسم البياني
.build()?;
// GraphAgent يطبق سمة Agent - استخدمه مع Launcher أو Runner
// راجع ملف README الخاص بـ adk-runner لتكوين Runner
أمثلة
تستخدم جميع الأمثلة تكامل LLM حقيقيًا مع AgentNode:
# وكلاء LLM متوازيون مع استدعاءات ما قبل/بعد (callbacks)
cargo run --example graph_agent
# مسار متعدد الوكلاء تسلسلي (مستخرج → محلل → منسق)
cargo run --example graph_workflow
# تصنيف المشاعر وتوجيه شرطي يعتمد على LLM
cargo run --example graph_conditional
# نمط ReAct مع الأدوات والتنفيذ الدوري
cargo run --example graph_react
# توجيه مشرف متعدد الوكلاء إلى المتخصصين
cargo run --example graph_supervisor
# إنسان في الحلقة مع مقاطعات قائمة على المخاطر
cargo run --example graph_hitl
# نقطة التحقق (Checkpointing) وتصحيح الأخطاء عبر الزمن
cargo run --example graph_checkpoint
مقارنة مع LangGraph
| الميزة | LangGraph | adk-graph |
|---|---|---|
| إدارة الحالة | TypedDict + Reducers | StateSchema + Reducers |
| نموذج التنفيذ | Pregel super-steps | Pregel super-steps |
| حفظ الحالة (Checkpointing) | الذاكرة، SQLite، Postgres | الذاكرة، SQLite |
| إنسان في الحلقة | interrupt_before/after | interrupt_before/after + ديناميكي |
| التدفق | 5 أوضاع | 5 أوضاع |
| الدورات | دعم أصلي | دعم أصلي |
| أمان النوع | Python typing | Rust type system |
| تكامل LLM | LangChain | AgentNode + وكلاء ADK |
السابق: ← أنظمة الوكلاء المتعددين | التالي: وكلاء الوقت الفعلي →