访问控制
adk-auth 为 AI 代理提供企业级访问控制。
概述
adk-auth 为 ADK 代理提供基于角色的访问控制(RBAC),支持审计日志和 SSO。它能够实现对哪些用户可以访问哪些 Tool 的安全、细粒度控制。
架构
┌─────────────────────────────────────────────────────────────────┐
│ Agent 请求 │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SSO 令牌验证 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Google │ │ Azure AD │ │ OIDC 发现 │ │
│ │ 提供商 │ │ 提供商 │ │ (Okta, Auth0 等) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ │ JWKS 缓存 │ ← 自动刷新密钥 │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼ TokenClaims
┌─────────────────────────────────────────────────────────────────┐
│ 声明映射器 │
│ │
│ IdP Groups → adk-auth 角色 │
│ ───────────────────────────────────────── │
│ "AdminGroup" → "admin" │
│ "DataAnalysts" → "analyst" │
│ (默认) → "viewer" │
└─────────────────────────────────────────────────────────────────┘
│
▼ Roles
┌─────────────────────────────────────────────────────────────────┐
│ 访问控制 │
│ │
│ 角色: admin │
│ ├── 允许: AllTools │
│ └── 允许: AllAgents │
│ │
│ 角色: analyst │
│ ├── 允许: Tool("search") │
│ ├── 允许: Tool("summarize") │
│ └── 拒绝: Tool("code_exec") ← 拒绝优先 │
└─────────────────────────────────────────────────────────────────┘
│
▼ Check Result
┌─────────────────────────────────────────────────────────────────┐
│ 审计日志 │
│ │
│ {"user":"alice","resource":"search","outcome":"allowed"} │
│ {"user":"bob","resource":"exec","outcome":"denied"} │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 工具执行 │
│ (仅在授予访问权限时) │
└─────────────────────────────────────────────────────────────────┘
设计原则
1. 拒绝优先
当一个角色同时具有允许和拒绝规则时,拒绝总是优先:
let role = Role::new("limited")
.allow(Permission::AllTools) // 允许所有...
.deny(Permission::Tool("admin")); // ...除了 admin
// 结果:可以访问除了 "admin" 之外的任何 Tool
2. 多角色并集
拥有多个角色的用户将获得权限的并集,但任何角色的拒绝规则仍然适用:
let ac = AccessControl::builder()
.role(reader) // allow: search
.role(writer) // allow: write
.assign("alice", "reader")
.assign("alice", "writer")
.build()?;
// Alice 可以访问 "search" 和 "write"
3. 显式优于隐式
权限是显式的——默认情况下不授予任何访问权限:
let role = Role::new("empty");
// 这个角色不授予任何权限
ac.check("user", &Permission::Tool("anything")); // → 拒绝
4. 身份验证和授权分离
- 身份验证 (SSO):验证用户是谁(通过 JWT)
- 授权 (RBAC):确定他们可以访问什么
// 身份验证:验证 JWT,提取 claims
let claims = provider.validate(token).await?;
// 授权:检查特定权限
ac.check(&claims.sub, &Permission::Tool("search"))?;
// 组合:SsoAccessControl 同时执行这两项
sso.check_token(token, &permission).await?;
安装
[dependencies]
adk-auth = "0.2.0"
# 用于 SSO/OAuth 支持
adk-auth = { version = "0.2.0", features = ["sso"] }
核心组件
Permission
pub enum Permission {
Tool(String), // 按名称指定 Tool
AllTools, // 通配符:所有 Tool
Agent(String), // 按名称指定 Agent
AllAgents, // 通配符:所有 Agent
}
Role
let analyst = Role::new("analyst")
.allow(Permission::Tool("search".into()))
.allow(Permission::Tool("summarize".into()))
.deny(Permission::Tool("code_exec".into()));
AccessControl
let ac = AccessControl::builder()
.role(admin)
.role(analyst)
.assign("alice@company.com", "admin")
.assign("bob@company.com", "analyst")
.build()?;
// 检查权限
ac.check("bob@company.com", &Permission::Tool("search".into()))?;
ProtectedTool
用自动权限检查封装一个 Tool:
use adk_auth::ToolExt;
let protected = my_tool.with_access_control(Arc::new(ac));
// 执行时,在运行前检查权限
protected.execute(ctx, args).await?;
AuthMiddleware
批量保护多个 Tool:
let middleware = AuthMiddleware::new(ac);
let protected_tools = middleware.protect_all(tools);
SSO 集成
支持的提供商
| 提供商 | 构造函数 | 颁发者 |
|---|---|---|
GoogleProvider::new(client_id) | accounts.google.com | |
| Azure AD | AzureADProvider::new(tenant, client) | login.microsoftonline.com |
| Okta | OktaProvider::new(domain, client) | {domain}/oauth2/default |
| Auth0 | Auth0Provider::new(domain, audience) | {domain}/ |
| Generic | OidcProvider::from_discovery(issuer, client) | 任何 OIDC 提供商 |
TokenClaims
从已验证的 JWT 中提取的 Claims:
pub struct TokenClaims {
pub sub: String, // 主体(用户 ID)
pub email: Option<String>, // 电子邮件
pub name: Option<String>, // 显示名称
pub groups: Vec<String>, // IdP 群组
pub roles: Vec<String>, // IdP 角色
pub hd: Option<String>, // Google 托管域
pub tid: Option<String>, // Azure 租户 ID
// ... 更多标准 OIDC claims
}
ClaimsMapper
将 IdP 群组映射到 adk-auth 角色:
let mapper = ClaimsMapper::builder()
.map_group("AdminGroup", "admin")
.map_group("Users", "viewer")
.default_role("guest")
.user_id_from_email()
.build();
SsoAccessControl
在一个调用中结合 SSO 验证和 RBAC:
let sso = SsoAccessControl::builder()
.validator(GoogleProvider::new("client-id"))
.mapper(mapper)
.access_control(ac)
.audit_sink(audit)
.build()?;
// 验证令牌 + 检查权限 + 审计日志
let claims = sso.check_token(token, &Permission::Tool("search".into())).await?;
审计日志
FileAuditSink
let audit = FileAuditSink::new("/var/log/adk/audit.jsonl")?;
let middleware = AuthMiddleware::with_audit(ac, audit);
输出格式 (JSONL)
{"timestamp":"2025-01-01T10:30:00Z","user":"bob","session_id":"sess-123","event_type":"tool_access","resource":"search","outcome":"allowed"}
{"timestamp":"2025-01-01T10:30:01Z","user":"bob","session_id":"sess-123","event_type":"tool_access","resource":"code_exec","outcome":"denied"}
自定义审计 Sink
use adk_auth::{AuditSink, AuditEvent, AuthError};
use async_trait::async_trait;
pub struct DatabaseAuditSink { /* ... */ }
#[async_trait]
impl AuditSink for DatabaseAuditSink {
async fn log(&self, event: AuditEvent) -> Result<(), AuthError> {
// 插入到数据库
sqlx::query("INSERT INTO audit_log ...")
.bind(event.user)
.bind(event.resource)
.execute(&self.pool)
.await?;
Ok(())
}
}
示例
# 核心 RBAC
cargo run --example auth_basic # 基于角色的访问控制
cargo run --example auth_audit # 审计日志
# SSO (需要 --features sso)
cargo run --example auth_sso --features sso # 完整的 SSO 流程
cargo run --example auth_jwt --features sso # JWT 验证
cargo run --example auth_oidc --features sso # OIDC 发现
cargo run --example auth_google --features sso # Google 身份验证
安全最佳实践
| 实践 | 描述 |
|---|---|
| 默认拒绝 | 仅授予明确需要的权限 |
| 明确拒绝 | 为危险操作添加拒绝规则 |
| 审计所有内容 | 启用日志记录以符合规定 |
| 服务端验证 | 始终在服务器上验证 JWT |
| 使用 HTTPS | JWKS 端点需要安全连接 |
| 轮换密钥 | JWKS 缓存每小时自动刷新 |
| 限制令牌生命周期 | 使用短生命周期的访问令牌 |
错误处理
use adk_auth::{AccessDenied, AuthError};
use adk_auth::sso::TokenError;
// RBAC errors
match ac.check("user", &Permission::Tool("admin".into())) {
Ok(()) => { /* access granted */ }
Err(AccessDenied { user, permission }) => {
eprintln!("Denied: {} cannot access {}", user, permission);
}
}
// SSO errors
match provider.validate(token).await {
Ok(claims) => { /* token valid */ }
Err(TokenError::Expired) => { /* token expired */ }
Err(TokenError::InvalidSignature) => { /* signature invalid */ }
Err(TokenError::InvalidIssuer { expected, actual }) => { /* wrong issuer */ }
Err(e) => { /* other error */ }
}