ग्राफ़ एजेंट

नेटिव ADK-Rust इंटीग्रेशन के साथ LangGraph-style ऑर्केस्ट्रेशन का उपयोग करके जटिल, स्टेटफुल वर्कफ़्लो बनाएं।

अवलोकन

GraphAgent आपको नोड्स और एजेस के साथ निर्देशित ग्राफ़ के रूप में वर्कफ़्लो को परिभाषित करने की अनुमति देता है, जो निम्नलिखित का समर्थन करता है:

  • AgentNode: LLM agents को कस्टम input/output mappers के साथ ग्राफ़ नोड्स के रूप में रैप करें
  • Cyclic Workflows: लूप्स और इटरेटिव रीज़निंग (ReAct pattern) के लिए नेटिव समर्थन
  • Conditional Routing: स्टेट के आधार पर डायनामिक एज राउटिंग
  • State Management: reducers (ओवरराइट, अपेंड, सम, कस्टम) के साथ टाइप्ड स्टेट
  • Checkpointing: फ़ॉल्ट टॉलरेंस और human-in-the-loop के लिए परसिस्टेंट स्टेट
  • Streaming: मल्टीपल स्ट्रीम मोड्स (values, updates, messages, debug)

adk-graph crate जटिल, स्टेटफुल agent workflows के निर्माण के लिए LangGraph-style वर्कफ़्लो ऑर्केस्ट्रेशन प्रदान करता है। यह ADK के agent system के साथ पूर्ण संगतता बनाए रखते हुए ADK-Rust ecosystem में ग्राफ़-आधारित वर्कफ़्लो क्षमताएं लाता है।

मुख्य लाभ:

  • विजुअल वर्कफ़्लो डिज़ाइन: सहज node-and-edge ग्राफ़ के रूप में जटिल लॉजिक को परिभाषित करें
  • पैरेलल एग्जीक्यूशन: बेहतर performance के लिए मल्टीपल नोड्स एक साथ चल सकते हैं
  • स्टेट परसिस्टेंस: फ़ॉल्ट टॉलरेंस और human-in-the-loop के लिए बिल्ट-इन checkpointing
  • LLM इंटीग्रेशन: ADK agents को ग्राफ़ नोड्स के रूप में रैप करने के लिए नेटिव समर्थन
  • फ्लेक्सिबल राउटिंग: स्टैटिक एजेस, कंडीशनल राउटिंग और डायनामिक निर्णय लेना

आप क्या बनाएंगे

इस गाइड में, आप एक टेक्स्ट प्रोसेसिंग पाइपलाइन बनाएंगे जो अनुवाद और सारांश को समानांतर रूप से चलाती है:

                        ┌─────────────────────┐
       User Input       │                     │
      ────────────────▶ │       START         │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │   TRANSLATOR     │            │   SUMMARIZER     │
        │                  │            │                  │
        │  🇫🇷 French       │            │  📝 One sentence │
        │     Translation  │            │     Summary      │
        └─────────┬────────┘            └─────────┬────────┘
                  │                               │
                  └───────────────┬───────────────┘
                                  │
                                  ▼
                        ┌─────────────────────┐
                        │      COMBINE        │
                        │                     │
                        │  📋 Merge Results   │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │        END          │
                        │                     │
                        │   ✅ Complete       │
                        └─────────────────────┘

मुख्य अवधारणाएँ:

  • Nodes - प्रोसेसिंग इकाइयाँ जो काम करती हैं (LLM agents, फ़ंक्शन, या कस्टम लॉजिक)
  • Edges - Nodes के बीच नियंत्रण प्रवाह (स्थिर कनेक्शन या सशर्त रूटिंग)
  • State - साझा डेटा जो ग्राफ़ के माध्यम से प्रवाहित होता है और Nodes के बीच बना रहता है
  • Parallel Execution - बेहतर प्रदर्शन के लिए कई Nodes एक साथ चल सकते हैं

मुख्य घटकों को समझना

🔧 Nodes: कार्यकर्ता Nodes वह जगह है जहाँ वास्तविक काम होता है। प्रत्येक Node कर सकता है:

  • AgentNode: प्राकृतिक भाषा को संसाधित करने के लिए एक LLM agent को रैप करें
  • Function Node: डेटा प्रोसेसिंग के लिए कस्टम Rust कोड निष्पादित करें
  • Built-in Nodes: काउंटर या वैलिडेटर जैसे पूर्वनिर्धारित लॉजिक का उपयोग करें

Nodes को एक असेंबली लाइन में विशेष कर्मचारियों के रूप में सोचें - प्रत्येक का एक विशिष्ट कार्य और विशेषज्ञता होती है।

🔀 Edges: प्रवाह नियंत्रण Edges यह निर्धारित करते हैं कि निष्पादन आपके ग्राफ़ के माध्यम से कैसे आगे बढ़ता है:

  • Static Edges: सीधे कनेक्शन (A → B → C)
  • Conditional Edges: State के आधार पर गतिशील रूटिंग (if sentiment == "positive" → positive_handler)
  • Parallel Edges: एक Node से कई पथ (START → [translator, summarizer])

Edges ट्रैफिक सिग्नल और सड़क के संकेतों की तरह हैं जो काम के प्रवाह को निर्देशित करते हैं।

💾 State: साझा मेमोरी State एक की-वैल्यू स्टोर है जिसे सभी Nodes पढ़ और लिख सकते हैं:

  • Input Data: ग्राफ़ में डाली गई प्रारंभिक जानकारी
  • Intermediate Results: एक Node का आउटपुट दूसरे के लिए इनपुट बन जाता है
  • Final Output: सभी प्रोसेसिंग के बाद पूरा हुआ परिणाम

State एक साझा व्हाइटबोर्ड की तरह कार्य करता है जहाँ Nodes दूसरों के उपयोग के लिए जानकारी छोड़ सकते हैं।

⚡ Parallel Execution: गति को बढ़ावा जब एक Node से कई Edges निकलते हैं, तो वे लक्ष्य Nodes एक साथ चलते हैं:

  • तेज़ प्रोसेसिंग: स्वतंत्र कार्य एक ही समय में चलते हैं
  • संसाधन दक्षता: CPU और I/O का बेहतर उपयोग
  • Scalability: रेखीय मंदी के बिना अधिक जटिल वर्कफ़्लो को संभालना

यह ऐसा है जैसे एक साथ कई कर्मचारी किसी कार्य के विभिन्न हिस्सों को संभाल रहे हों, बजाय इसके कि वे कतार में प्रतीक्षा करें।


त्वरित आरंभ

1. अपना प्रोजेक्ट बनाएँ

cargo new graph_demo
cd graph_demo

Cargo.toml में निर्भरताएँ जोड़ें:

[dependencies]
adk-graph = { version = "0.2", features = ["sqlite"] }
adk-agent = "0.2"
adk-model = "0.2"
adk-core = "0.2"
tokio = { version = "1", features = ["full"] }
dotenvy = "0.15"
serde_json = "1.0"

अपनी API कुंजी के साथ .env बनाएँ:

echo 'GOOGLE_API_KEY=your-api-key' > .env

2. समानांतर प्रसंस्करण उदाहरण

यहां एक पूर्ण कार्यशील उदाहरण दिया गया है जो पाठ को समानांतर में संसाधित करता है:

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    agent::GraphAgent,
    edge::{END, START},
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create specialized LLM agents
    let translator_agent = Arc::new(
        LlmAgentBuilder::new("translator")
            .description("Translates text to French")
            .model(model.clone())
            .instruction("Translate the input text to French. Only output the translation.")
            .build()?,
    );

    let summarizer_agent = Arc::new(
        LlmAgentBuilder::new("summarizer")
            .description("Summarizes text")
            .model(model.clone())
            .instruction("Summarize the input text in one sentence.")
            .build()?,
    );

    // Wrap agents as graph nodes with input/output mappers
    let translator_node = AgentNode::new(translator_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("translation".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    let summarizer_node = AgentNode::new(summarizer_agent)
        .with_input_mapper(|state| {
            let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    if !text.is_empty() {
                        updates.insert("summary".to_string(), json!(text));
                    }
                }
            }
            updates
        });

    // Build the graph with parallel execution
    let agent = GraphAgent::builder("text_processor")
        .description("Processes text with translation and summarization in parallel")
        .channels(&["input", "translation", "summary", "result"])
        .node(translator_node)
        .node(summarizer_node)
        .node_fn("combine", |ctx| async move {
            let translation = ctx.get("translation").and_then(|v| v.as_str()).unwrap_or("N/A");
            let summary = ctx.get("summary").and_then(|v| v.as_str()).unwrap_or("N/A");

            let result = format!(
                "=== Processing Complete ===\n\n\
                French Translation:\n{}\n\n\
                Summary:\n{}",
                translation, summary
            );

            Ok(NodeOutput::new().with_update("result", json!(result)))
        })
        // Parallel execution: both nodes start simultaneously
        .edge(START, "translator")
        .edge(START, "summarizer")
        .edge("translator", "combine")
        .edge("summarizer", "combine")
        .edge("combine", END)
        .build()?;

    // Execute the graph
    let mut input = State::new();
    input.insert("input".to_string(), json!("AI is transforming how we work and live."));

    let result = agent.invoke(input, ExecutionConfig::new("thread-1")).await?;
    println!("{}", result.get("result").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

उदाहरण आउटपुट:

=== Processing Complete ===

French Translation:
L'IA transforme notre façon de travailler et de vivre.

Summary:
AI is revolutionizing work and daily life through technological transformation.

ग्राफ निष्पादन कैसे काम करता है

बड़ी तस्वीर

Graph agents सुपर-स्टेप्स में निष्पादित होते हैं - सभी तैयार नोड्स समानांतर में चलते हैं, फिर ग्राफ अगले स्टेप से पहले सभी के पूरा होने का इंतजार करता है:

Step 1: START ──┬──▶ translator (running)
                └──▶ summarizer (running)
                
                ⏳ Wait for both to complete...
                
Step 2: translator ──┬──▶ combine (running)
        summarizer ──┘
        
                ⏳ Wait for combine to complete...
                
Step 3: combine ──▶ END ✅

नोड्स के माध्यम से स्टेट फ्लो

प्रत्येक नोड साझा स्टेट से पढ़ और उसमें लिख सकता है:

┌─────────────────────────────────────────────────────────────────────┐
│ STEP 1: Initial state                                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: { "input": "AI is transforming how we work" }             │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────┐              ┌──────────────────┐           │
│   │   translator     │              │   summarizer     │           │
│   │  reads "input"   │              │  reads "input"   │           │
│   └──────────────────┘              └──────────────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 2: After parallel execution                                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: {                                                          │
│     "input": "AI is transforming how we work",                     │
│     "translation": "L'IA transforme notre façon de travailler",    │
│     "summary": "AI is revolutionizing work through technology"     │
│   }                                                                 │
│                                                                     │
│                              ↓                                      │
│                                                                     │
│   ┌──────────────────────────────────────┐                         │
│   │           combine                    │                         │
│   │  reads "translation" + "summary"     │                         │
│   │  writes "result"                     │                         │
│   └──────────────────────────────────────┘                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────────┐
│ STEP 3: Final state                                                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   State: {                                                          │
│     "input": "AI is transforming how we work",                     │
│     "translation": "L'IA transforme notre façon de travailler",    │
│     "summary": "AI is revolutionizing work through technology",    │
│     "result": "=== Processing Complete ===\n\nFrench..."          │
│   }                                                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

क्या इसे कार्यशील बनाता है

ComponentRole
AgentNodeLLM agents को input/output mappers के साथ रैप करता है
input_mapperस्टेट को agent input Content में बदलता है
output_mapperagent events को स्टेट अपडेट में बदलता है
channelsस्टेट फ़ील्ड घोषित करता है जिनका ग्राफ उपयोग करेगा
edge()नोड्स के बीच निष्पादन प्रवाह को परिभाषित करता है
ExecutionConfigचेकपॉइंटिंग के लिए थ्रेड ID प्रदान करता है

LLM वर्गीकरण के साथ सशर्त रूटिंग

स्मार्ट रूटिंग सिस्टम बनाएं जहाँ LLM निष्पादन पथ तय करते हैं:

विज़ुअल: भावना-आधारित रूटिंग

                        ┌─────────────────────┐
       उपयोगकर्ता प्रतिक्रिया    │                     │
      ────────────────▶ │    CLASSIFIER       │
                        │  🧠 स्वर का विश्लेषण करें    │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┼───────────────┐
                   │               │               │
                   ▼               ▼               ▼
        ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
        │   सकारात्मक       │ │    नकारात्मक     │ │    तटस्थ        │
        │                  │ │                  │ │                  │
        │  😊 धन्यवाद!     │ │  😔 क्षमा याचना   │ │  😐 और प्रश्न पूछें  │
        │     खुशियाँ मनाएँ │ │     ठीक करने में मदद करें │     प्रश्न      │
        └──────────────────┘ └──────────────────┘ └──────────────────┘

पूर्ण उदाहरण कोड

use adk_agent::LlmAgentBuilder;
use adk_graph::{
    edge::{END, Router, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig},
    state::State,
};
use adk_model::GeminiModel;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create classifier agent
    let classifier_agent = Arc::new(
        LlmAgentBuilder::new("classifier")
            .description("Classifies text sentiment")
            .model(model.clone())
            .instruction(
                "You are a sentiment classifier. Analyze the input text and respond with \
                ONLY one word: 'positive', 'negative', or 'neutral'. Nothing else.",
            )
            .build()?,
    );

    // Create response agents for each sentiment
    let positive_agent = Arc::new(
        LlmAgentBuilder::new("positive")
            .description("Handles positive feedback")
            .model(model.clone())
            .instruction(
                "You are a customer success specialist. The customer has positive feedback. \
                Express gratitude, reinforce the positive experience, and suggest ways to \
                share their experience. Be warm and appreciative. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let negative_agent = Arc::new(
        LlmAgentBuilder::new("negative")
            .description("Handles negative feedback")
            .model(model.clone())
            .instruction(
                "You are a customer support specialist. The customer has a complaint. \
                Acknowledge their frustration, apologize sincerely, and offer help. \
                Be empathetic. Keep response under 3 sentences.",
            )
            .build()?,
    );

    let neutral_agent = Arc::new(
        LlmAgentBuilder::new("neutral")
            .description("Handles neutral feedback")
            .model(model.clone())
            .instruction(
                "You are a customer service representative. The customer has neutral feedback. \
                Ask clarifying questions to better understand their needs. Be helpful and curious. \
                Keep response under 3 sentences.",
            )
            .build()?,
    );

    // Create AgentNodes with mappers
    let classifier_node = AgentNode::new(classifier_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("")
                        .to_lowercase()
                        .trim()
                        .to_string();
                    
                    let sentiment = if text.contains("positive") { "positive" }
                        else if text.contains("negative") { "negative" }
                        else { "neutral" };
                    
                    updates.insert("sentiment".to_string(), json!(sentiment));
                }
            }
            updates
        });

    // Response nodes (similar pattern for each)
    let positive_node = AgentNode::new(positive_agent)
        .with_input_mapper(|state| {
            let text = state.get("feedback").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(text)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            for event in events {
                if let Some(content) = event.content() {
                    let text: String = content.parts.iter()
                        .filter_map(|p| p.text())
                        .collect::<Vec<_>>()
                        .join("");
                    updates.insert("response".to_string(), json!(text));
                }
            }
            updates
        });

    // Build graph with conditional routing
    let graph = StateGraph::with_channels(&["feedback", "sentiment", "response"])
        .add_node(classifier_node)
        .add_node(positive_node)
        // ... add negative_node and neutral_node similarly
        .add_edge(START, "classifier")
        .add_conditional_edges(
            "classifier",
            Router::by_field("sentiment"),  // Route based on sentiment field
            [
                ("positive", "positive"),
                ("negative", "negative"),
                ("neutral", "neutral"),
            ],
        )
        .add_edge("positive", END)
        .add_edge("negative", END)
        .add_edge("neutral", END)
        .compile()?;

    // Test with different feedback
    let mut input = State::new();
    input.insert("feedback".to_string(), json!("Your product is amazing! I love it!"));

    let result = graph.invoke(input, ExecutionConfig::new("feedback-1")).await?;
    println!("Sentiment: {}", result.get("sentiment").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Response: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));

    Ok(())
}

उदाहरण प्रवाह:

इनपुट: "आपका उत्पाद अद्भुत है! मुझे यह बहुत पसंद है!"
       ↓
CLASSIFIER: "positive"
       ↓
Positive Agent: "अद्भुत प्रतिक्रिया के लिए आपका बहुत-बहुत धन्यवाद! 
                हमें खुशी है कि आपको हमारा उत्पाद पसंद आया। 
                क्या आप दूसरों की मदद के लिए एक समीक्षा लिखने पर विचार करेंगे?"

ReAct पैटर्न: तर्क + कार्य करना

ऐसे एजेंट बनाएं जो जटिल समस्याओं को हल करने के लिए बार-बार टूल का उपयोग कर सकें:

विजुअल: ReAct साइकिल

                        ┌─────────────────────┐
       User Question    │                     │
      ────────────────▶ │      REASONER       │
                        │  🧠 Think + Act     │
                        └──────────┬──────────┘
                                   │
                                   ▼
                        ┌─────────────────────┐
                        │   Has tool calls?   │
                        │                     │
                        └──────────┬──────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   │                               │
                   ▼                               ▼
        ┌──────────────────┐            ┌──────────────────┐
        │       YES        │            │        NO        │
        │                  │            │                  │
        │  🔄 Loop back    │            │  ✅ Final answer │
        │     to reasoner  │            │      END         │
        └─────────┬────────┘            └──────────────────┘
                  │
                  └─────────────────┐
                                    │
                                    ▼
                        ┌─────────────────────┐
                        │      REASONER       │
                        │  🧠 Think + Act     │
                        │   (next iteration)  │
                        └─────────────────────┘

पूर्ण ReAct उदाहरण

use adk_agent::LlmAgentBuilder;
use adk_core::{Part, Tool};
use adk_graph::{
    edge::{END, START},
    graph::StateGraph,
    node::{AgentNode, ExecutionConfig, NodeOutput},
    state::State,
};
use adk_model::GeminiModel;
use adk_tool::FunctionTool;
use serde_json::json;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let api_key = std::env::var("GOOGLE_API_KEY")?;
    let model = Arc::new(GeminiModel::new(&api_key, "gemini-2.0-flash")?);

    // Create tools
    let weather_tool = Arc::new(FunctionTool::new(
        "get_weather",
        "Get the current weather for a location. Takes a 'location' parameter (city name).",
        |_ctx, args| async move {
            let location = args.get("location").and_then(|v| v.as_str()).unwrap_or("unknown");
            Ok(json!({
                "location": location,
                "temperature": "72°F",
                "condition": "Sunny",
                "humidity": "45%"
            }))
        },
    )) as Arc<dyn Tool>;

    let calculator_tool = Arc::new(FunctionTool::new(
        "calculator",
        "Perform mathematical calculations. Takes an 'expression' parameter (string).",
        |_ctx, args| async move {
            let expr = args.get("expression").and_then(|v| v.as_str()).unwrap_or("0");
            let result = match expr {
                "2 + 2" => "4",
                "10 * 5" => "50",
                "100 / 4" => "25",
                "15 - 7" => "8",
                _ => "Unable to evaluate",
            };
            Ok(json!({ "result": result, "expression": expr }))
        },
    )) as Arc<dyn Tool>;

    // Create reasoner agent with tools
    let reasoner_agent = Arc::new(
        LlmAgentBuilder::new("reasoner")
            .description("Reasoning agent with tools")
            .model(model.clone())
            .instruction(
                "You are a helpful assistant with access to tools. Use tools when needed to answer questions. \
                When you have enough information, provide a final answer without using more tools.",
            )
            .tool(weather_tool)
            .tool(calculator_tool)
            .build()?,
    );

    // Create reasoner node that detects tool usage
    let reasoner_node = AgentNode::new(reasoner_agent)
        .with_input_mapper(|state| {
            let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
            adk_core::Content::new("user").with_text(question)
        })
        .with_output_mapper(|events| {
            let mut updates = std::collections::HashMap::new();
            let mut has_tool_calls = false;
            let mut response = String::new();

            for event in events {
                if let Some(content) = event.content() {
                    for part in &content.parts {
                        match part {
                            Part::FunctionCall { .. } => {
                                has_tool_calls = true;
                            }
                            Part::Text { text } => {
                                response.push_str(text);
                            }
                            _ => {}
                        }
                    }
                }
            }

            updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
            updates.insert("response".to_string(), json!(response));
            updates
        });

    // Build ReAct graph with cycle
    let graph = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
        .add_node(reasoner_node)
        .add_node_fn("counter", |ctx| async move {
            let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
            Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
        })
        .add_edge(START, "counter")
        .add_edge("counter", "reasoner")
        .add_conditional_edges(
            "reasoner",
            |state| {
                let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
                let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

                // Safety limit
                if iteration >= 5 { return END.to_string(); }

                if has_tools {
                    "counter".to_string()  // Loop back for more reasoning
                } else {
                    END.to_string()  // Done - final answer
                }
            },
            [("counter", "counter"), (END, END)],
        )
        .compile()?
        .with_recursion_limit(10);

    // Test the ReAct agent
    let mut input = State::new();
    input.insert("question".to_string(), json!("What's the weather in Paris and what's 15 + 25?"));

    let result = graph.invoke(input, ExecutionConfig::new("react-1")).await?;
    println!("Final answer: {}", result.get("response").and_then(|v| v.as_str()).unwrap_or(""));
    println!("Iterations: {}", result.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0));

    Ok(())
}

उदाहरण प्रवाह:

Question: "What's the weather in Paris and what's 15 + 25?"

Iteration 1:
  Reasoner: "मुझे मौसम की जानकारी चाहिए और गणित करना है"
  → Calls get_weather(location="Paris") and calculator(expression="15 + 25")
  → has_tool_calls = true → वापस लूप करें

Iteration 2:
  Reasoner: "परिणामों के आधार पर: पेरिस में 72°F तापमान और धूप है, 15 + 25 = 40"
  → No tool calls → has_tool_calls = false → समाप्त

अंतिम उत्तर: "पेरिस में मौसम 72°F और धूप वाला है जिसमें 45% आर्द्रता है। 
              और 15 + 25 बराबर 40 है।"

AgentNode

किसी भी ADK Agent (आमतौर पर LlmAgent) को एक ग्राफ़ नोड के रूप में लपेटता है:

let node = AgentNode::new(llm_agent)
    .with_input_mapper(|state| {
        // ग्राफ़ स्थिति को एजेंट इनपुट Content में बदलें
        let text = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(text)
    })
    .with_output_mapper(|events| {
        // एजेंट इवेंट्स को स्थिति अपडेट में बदलें
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");
                updates.insert("output".to_string(), json!(text));
            }
        }
        updates
    });

Function Nodes

सरल async फ़ंक्शन जो स्थिति को संसाधित करते हैं:

.node_fn("process", |ctx| async move {
    let input = ctx.state.get("input").unwrap();
    let output = process_data(input).await?;
    Ok(NodeOutput::new().with_update("output", output))
})

एज प्रकार

स्थैतिक एज

नोड्स के बीच सीधा संबंध:

.edge(START, "first_node")
.edge("first_node", "second_node")
.edge("second_node", END)

सशर्त एज

स्टेट के आधार पर गतिशील रूटिंग:

.conditional_edge(
    "router",
    |state| {
        match state.get("next").and_then(|v| v.as_str()) {
            Some("research") => "research_node".to_string(),
            Some("write") => "write_node".to_string(),
            _ => END.to_string(),
        }
    },
    [
        ("research_node", "research_node"),
        ("write_node", "write_node"),
        (END, END),
    ],
)

राउटर हेल्पर

सामान्य पैटर्न के लिए बिल्ट-इन राउटर का उपयोग करें:

use adk_graph::edge::Router;

// Route based on a state field value
// एक स्टेट फ़ील्ड मान के आधार पर रूट करें
.conditional_edge("classifier", Router::by_field("sentiment"), [
    ("positive", "positive_handler"),
    ("negative", "negative_handler"),
    ("neutral", "neutral_handler"),
])

// Route based on boolean field
// बूलियन फ़ील्ड के आधार पर रूट करें
.conditional_edge("check", Router::by_bool("approved"), [
    ("true", "execute"),
    ("false", "reject"),
])

// Limit iterations
// पुनरावृति सीमित करें
.conditional_edge("loop", Router::max_iterations("count", 5), [
    ("continue", "process"),
    ("done", END),
])

समानांतर निष्पादन

एक सिंगल नोड से कई एज समानांतर में निष्पादित होते हैं:

let agent = GraphAgent::builder("parallel_processor")
    .channels(&["input", "translation", "summary", "analysis"])
    .node(translator_node)
    .node(summarizer_node)
    .node(analyzer_node)
    .node(combiner_node)
    // All three start simultaneously
    // तीनों एक साथ शुरू होते हैं
    .edge(START, "translator")
    .edge(START, "summarizer")
    .edge(START, "analyzer")
    // Wait for all to complete before combining
    // संयोजित करने से पहले सभी के पूरा होने की प्रतीक्षा करें
    .edge("translator", "combiner")
    .edge("summarizer", "combiner")
    .edge("analyzer", "combiner")
    .edge("combiner", END)
    .build()?;

चक्रीय ग्राफ़ (ReAct पैटर्न)

चक्रों के साथ पुनरावृत्त तर्क एजेंट बनाएं:

use adk_core::Part;

// Create agent with tools
let reasoner = Arc::new(
    LlmAgentBuilder::new("reasoner")
        .model(model)
        .instruction("Use tools to answer questions. Provide final answer when done.")
        .tool(search_tool)
        .tool(calculator_tool)
        .build()?
);

let reasoner_node = AgentNode::new(reasoner)
    .with_input_mapper(|state| {
        let question = state.get("question").and_then(|v| v.as_str()).unwrap_or("");
        adk_core::Content::new("user").with_text(question)
    })
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        let mut has_tool_calls = false;
        let mut response = String::new();

        for event in events {
            if let Some(content) = event.content() {
                for part in &content.parts {
                    match part {
                        Part::FunctionCall { name, .. } => {
                            has_tool_calls = true;
                        }
                        Part::Text { text } => {
                            response.push_str(text);
                        }
                        _ => {}
                    }
                }
            }
        }

        updates.insert("has_tool_calls".to_string(), json!(has_tool_calls));
        updates.insert("response".to_string(), json!(response));
        updates
    });

// Build graph with cycle
let react_agent = StateGraph::with_channels(&["question", "has_tool_calls", "response", "iteration"])
    .add_node(reasoner_node)
    .add_node_fn("counter", |ctx| async move {
        let i = ctx.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);
        Ok(NodeOutput::new().with_update("iteration", json!(i + 1)))
    })
    .add_edge(START, "counter")
    .add_edge("counter", "reasoner")
    .add_conditional_edges(
        "reasoner",
        |state| {
            let has_tools = state.get("has_tool_calls").and_then(|v| v.as_bool()).unwrap_or(false);
            let iteration = state.get("iteration").and_then(|v| v.as_i64()).unwrap_or(0);

            // Safety limit
            if iteration >= 5 { return END.to_string(); }

            if has_tools {
                "counter".to_string()  // Loop back
            } else {
                END.to_string()  // Done
            }
        },
        [("counter", "counter"), (END, END)],
    )
    .compile()?
    .with_recursion_limit(10);

मल्टी-एजेंट सुपरवाइजर

विशेषज्ञ एजेंटों को कार्य रूट करें:

// Create supervisor agent
let supervisor = Arc::new(
    LlmAgentBuilder::new("supervisor")
        .model(model.clone())
        .instruction("Route tasks to: researcher, writer, or coder. Reply with agent name only.")
        .build()?
);

let supervisor_node = AgentNode::new(supervisor)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("")
                    .to_lowercase();

                let next = if text.contains("researcher") { "researcher" }
                    else if text.contains("writer") { "writer" }
                    else if text.contains("coder") { "coder" }
                    else { "done" };

                updates.insert("next_agent".to_string(), json!(next));
            }
        }
        updates
    });

// Build supervisor graph
let graph = StateGraph::with_channels(&["task", "next_agent", "research", "content", "code"])
    .add_node(supervisor_node)
    .add_node(researcher_node)
    .add_node(writer_node)
    .add_node(coder_node)
    .add_edge(START, "supervisor")
    .add_conditional_edges(
        "supervisor",
        Router::by_field("next_agent"),
        [
            ("researcher", "researcher"),
            ("writer", "writer"),
            ("coder", "coder"),
            ("done", END),
        ],
    )
    // Agents report back to supervisor
    .add_edge("researcher", "supervisor")
    .add_edge("writer", "supervisor")
    .add_edge("coder", "supervisor")
    .compile()?;

स्टेट मैनेजमेंट

रिड्यूसर के साथ स्टेट स्कीमा

नियंत्रित करें कि स्टेट अपडेट कैसे मर्ज किए जाते हैं:

let schema = StateSchema::builder()
    .channel("current_step")                    // Overwrite (default)
    .list_channel("messages")                   // Append to list
    .channel_with_reducer("count", Reducer::Sum) // Sum values
    .channel_with_reducer("data", Reducer::Custom(Arc::new(|old, new| {
        // Custom merge logic
        merge_json(old, new)
    })))
    .build();

let agent = GraphAgent::builder("stateful")
    .state_schema(schema)
    // ... nodes and edges
    .build()?;

रिड्यूसर प्रकार

रिड्यूसरव्यवहार
Overwriteपुराने मान को नए मान से बदलें (डिफ़ॉल्ट)
Appendसूची में जोड़ें
Sumसंख्यात्मक मानों को जोड़ें
Customकस्टम मर्ज फ़ंक्शन

चेकपॉइंटिंग

फॉल्ट टॉलरेंस और मानव-इन-द-लूप के लिए स्थायी स्थिति सक्षम करें:

इन-मेमोरी (विकास)

use adk_graph::checkpoint::MemoryCheckpointer;

let checkpointer = Arc::new(MemoryCheckpointer::new());

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodes and edges
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

SQLite (उत्पादन)

use adk_graph::checkpoint::SqliteCheckpointer;

let checkpointer = SqliteCheckpointer::new("checkpoints.db").await?;

let graph = StateGraph::with_channels(&["task", "result"])
    // ... nodes and edges
    .compile()?
    .with_checkpointer(checkpointer);

चेकपॉइंट हिस्ट्री (टाइम ट्रैवल)

// List all checkpoints for a thread
let checkpoints = checkpointer.list("thread-id").await?;
for cp in checkpoints {
    println!("Step {}: {:?}", cp.step, cp.state.get("status"));
}

// Load a specific checkpoint
if let Some(checkpoint) = checkpointer.load_by_id(&checkpoint_id).await? {
    println!("State at step {}: {:?}", checkpoint.step, checkpoint.state);
}

मानव-इन-द-लूप

डायनामिक इंटरप्ट का उपयोग करके मानव अनुमोदन के लिए निष्पादन को रोकें:

use adk_graph::{error::GraphError, node::NodeOutput};

// Planner Agent जोखिम का आकलन करता है
let planner_node = AgentNode::new(planner_agent)
    .with_output_mapper(|events| {
        let mut updates = std::collections::HashMap::new();
        for event in events {
            if let Some(content) = event.content() {
                let text: String = content.parts.iter()
                    .filter_map(|p| p.text())
                    .collect::<Vec<_>>()
                    .join("");

                // LLM प्रतिक्रिया से जोखिम स्तर निकालें
                let risk = if text.to_lowercase().contains("risk: high") { "high" }
                    else if text.to_lowercase().contains("risk: medium") { "medium" }
                    else { "low" };

                updates.insert("plan".to_string(), json!(text));
                updates.insert("risk_level".to_string(), json!(risk));
            }
        }
        updates
    });

// Review node with dynamic interrupt
let graph = StateGraph::with_channels(&["task", "plan", "risk_level", "approved", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_node_fn("review", |ctx| async move {
        let risk = ctx.get("risk_level").and_then(|v| v.as_str()).unwrap_or("low");
        let approved = ctx.get("approved").and_then(|v| v.as_bool());

        // पहले से अनुमोदित - जारी रखें
        if approved == Some(true) {
            return Ok(NodeOutput::new());
        }

        // उच्च/मध्यम जोखिम - अनुमोदन के लिए इंटरप्ट करें
        if risk == "high" || risk == "medium" {
            return Ok(NodeOutput::interrupt_with_data(
                &format!("{} RISK: Human approval required", risk.to_uppercase()),
                json!({
                    "plan": ctx.get("plan"),
                    "risk_level": risk,
                    "action": "Set 'approved' to true to continue"
                })
            ));
        }

        // कम जोखिम - स्वतः-अनुमोदित करें
        Ok(NodeOutput::new().with_update("approved", json!(true)))
    })
    .add_edge(START, "planner")
    .add_edge("planner", "review")
    .add_edge("review", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_checkpointer_arc(checkpointer.clone());

// निष्पादित करें - अनुमोदन के लिए रुक सकता है
let thread_id = "task-001";
let result = graph.invoke(input, ExecutionConfig::new(thread_id)).await;

match result {
    Err(GraphError::Interrupted(interrupt)) => {
        println!("*** EXECUTION PAUSED ***");
        println!("Reason: {}", interrupt.interrupt);
        println!("Plan awaiting approval: {:?}", interrupt.state.get("plan"));

        // मानव समीक्षा करता है और अनुमोदित करता है...

        // अनुमोदन के साथ स्थिति अपडेट करें
        graph.update_state(thread_id, [("approved".to_string(), json!(true))]).await?;

        // निष्पादन फिर से शुरू करें
        let final_result = graph.invoke(State::new(), ExecutionConfig::new(thread_id)).await?;
        println!("Final result: {:?}", final_result.get("result"));
    }
    Ok(result) => {
        println!("Completed without interrupt: {:?}", result);
    }
    Err(e) => {
        println!("Error: {}", e);
    }
}

स्थैतिक इंटरप्ट

अनिवार्य ठहराव बिंदुओं के लिए interrupt_before या interrupt_after का उपयोग करें:

let graph = StateGraph::with_channels(&["task", "plan", "result"])
    .add_node(planner_node)
    .add_node(executor_node)
    .add_edge(START, "planner")
    .add_edge("planner", "executor")
    .add_edge("executor", END)
    .compile()?
    .with_interrupt_before(&["executor"]);  // निष्पादन से पहले हमेशा रोकें

स्ट्रीमिंग निष्पादन

ग्राफ के निष्पादन के दौरान इवेंट्स को स्ट्रीम करें:

use futures::StreamExt;
use adk_graph::stream::StreamMode;

let stream = agent.stream(input, config, StreamMode::Updates);

while let Some(event) = stream.next().await {
    match event? {
        StreamEvent::NodeStart(name) => println!("Starting: {}", name),
        StreamEvent::Updates { node, updates } => {
            println!("{} updated state: {:?}", node, updates);
        }
        StreamEvent::NodeEnd(name) => println!("Completed: {}", name),
        StreamEvent::Done(state) => println!("Final state: {:?}", state),
        _ => {}
    }
}

स्ट्रीम मोड

Modeविवरण
Valuesप्रत्येक नोड के बाद पूर्ण स्थिति स्ट्रीम करें
Updatesकेवल स्थिति परिवर्तनों को स्ट्रीम करें
Messagesसंदेश-प्रकार के अपडेट स्ट्रीम करें
Debugसभी आंतरिक इवेंट्स स्ट्रीम करें

ADK एकीकरण

GraphAgent ADK Agent ट्रेट को लागू करता है, इसलिए यह इसके साथ काम करता है:

  • Runner: मानक निष्पादन के लिए adk-runner के साथ उपयोग करें
  • Callbacks: before/after कॉलबैक के लिए पूर्ण समर्थन
  • Sessions: बातचीत के इतिहास के लिए adk-session के साथ काम करता है
  • Streaming: ADK EventStream लौटाता है
use adk_runner::Runner;

let graph_agent = GraphAgent::builder("workflow")
    .before_agent_callback(|ctx| async {
        println!("Starting graph execution for session: {}", ctx.session_id());
        Ok(())
    })
    .after_agent_callback(|ctx, event| async {
        if let Some(content) = event.content() {
            println!("Graph completed with content");
        }
        Ok(())
    })
    // ... graph definition
    .build()?;

// GraphAgent implements Agent trait - use with Launcher or Runner
// See adk-runner README for Runner configuration

उदाहरण

सभी उदाहरणों में AgentNode के साथ वास्तविक LLM एकीकरण का उपयोग किया गया है:

# Parallel LLM agents with before/after callbacks
cargo run --example graph_agent

# Sequential multi-agent pipeline (extractor → analyzer → formatter)
cargo run --example graph_workflow

# LLM-based sentiment classification and conditional routing
cargo run --example graph_conditional

# ReAct pattern with tools and cyclic execution
cargo run --example graph_react

# Multi-agent supervisor routing to specialists
cargo run --example graph_supervisor

# Human-in-the-loop with risk-based interrupts
cargo run --example graph_hitl

# Checkpointing and time travel debugging
cargo run --example graph_checkpoint

LangGraph के साथ तुलना

विशेषताLangGraphadk-graph
स्थिति प्रबंधनTypedDict + ReducersStateSchema + Reducers
निष्पादन मॉडलPregel super-stepsPregel super-steps
चेकपॉइंटिंगMemory, SQLite, PostgresMemory, SQLite
ह्यूमन-इन-लूपinterrupt_before/afterinterrupt_before/after + dynamic
स्ट्रीमिंग5 modes5 modes
चक्रदेशी समर्थनदेशी समर्थन
टाइप सुरक्षाPython typingRust type system
LLM एकीकरणLangChainAgentNode + ADK agents

पिछला: ← मल्टी-एजेंट सिस्टम | अगला: रीयलटाइम एजेंट →