# Agent Hooks
Hooks provide lifecycle integration points to react to events during request processing. Hooks can be defined in skills or as standalone functions.
Hooks are executed in priority order (lower numbers first) and receive the unified request context. Keep hooks small and deterministic; avoid blocking operations and always return the context.
## Hook Types
### Skill Hooks
Defined within skills using the @hook decorator:
from robutler.agents.skills import Skill
from robutler.agents.skills.decorators import hook
class MySkill(Skill):
    """Example skill that registers a single ``on_connection`` hook."""

    @hook("on_connection", priority=10)
    async def setup_request(self, context):
        """Store a custom value on the context when a request starts."""
        context["custom_data"] = "value"
        return context
### Standalone Hooks
Decorated functions that can be passed to agents:
import time

from robutler.agents import BaseAgent
from robutler.agents.skills.decorators import hook
@hook("on_message", priority=5)
async def log_messages(context):
    """Print the most recent message in the conversation."""
    latest = context.messages[-1]
    print(f"Message: {latest}")
    return context
@hook("on_connection")
async def setup_analytics(context):
    """Record the session start time on the context."""
    started_at = time.time()
    context["session_start"] = started_at
    return context
# Hand the standalone hooks to the agent at construction time
agent = BaseAgent(
    name="my-agent",
    model="openai/gpt-4o",
    hooks=[log_messages, setup_analytics],
)
## Available Hooks
### on_connection
Called when a new request connection is established.
Typical responsibilities:
- Authentication and identity extraction (e.g., `AuthSkill`)
- Payment token validation and minimum balance checks (e.g., `PaymentSkill`)
- Request-scoped initialization (timers, correlation IDs)
```python
@hook("on_connection")
async def on_connection(self, context):
    """Set up request-scoped state when a connection is established."""
    # Request attributes are read directly off the context
    user_id = context.peer_user_id
    is_streaming = context.stream

    # Stamp the start time as request-specific state
    context["request_start"] = time.time()
    return context
```

### on_message
Called for each message in the conversation.
Typical responsibilities:
- Lightweight analytics and message enrichment
- Intent detection, entity extraction
- Safety checks for input/output
@hook("on_message")
async def on_message(self, context):
    """Attach the analyzed intent to the context for user messages."""
    latest = context.messages[-1]
    if latest["role"] != "user":
        return context

    # Only user input is analyzed
    context["intent"] = self.analyze_intent(latest["content"])
    return context
### before_toolcall
Called before executing a tool.
Typical responsibilities:
- Security and scope checks
- Argument validation/normalization
- Rate limiting and auditing
@hook("before_toolcall", priority=1)
async def before_toolcall(self, context):
    """Rewrite a disallowed tool call to the ``tool_blocked`` placeholder."""
    requested = context["tool_call"]["function"]["name"]

    # Allowed calls pass through untouched
    if self.is_tool_allowed(requested, context.peer_user_id):
        return context

    # Otherwise swap in the safe alternative with empty arguments
    context["tool_call"]["function"]["name"] = "tool_blocked"
    context["tool_call"]["function"]["arguments"] = "{}"
    return context
### after_toolcall
Called after tool execution completes.
Typical responsibilities:
- Post-processing tool results
- Adding usage metadata for priced tools
- Observability metrics
@hook("after_toolcall")
async def after_toolcall(self, context):
    """Log tool usage and reformat results produced by the ``search`` tool."""
    result = context["tool_result"]
    name = context["tool_call"]["function"]["name"]

    # Usage logging happens for every tool
    await self.log_tool_usage(
        tool=name,
        result_size=len(result),
        user=context.peer_user_id,
    )

    # Only search results receive extra formatting
    if name == "search":
        context["tool_result"] = self.format_search_results(result)
    return context
### on_chunk
Called for each streaming chunk.
Typical responsibilities:
- Realtime content filtering
- Inline analytics/telemetry
@hook("on_chunk")
async def on_chunk(self, context):
    """Redact sensitive chunk content and count the chunks processed."""
    chunk = context["chunk"]
    text = context.get("content", "")

    if self.contains_sensitive_info(text):
        # Replace the delta text with a redaction marker
        context["chunk"]["choices"][0]["delta"]["content"] = "[REDACTED]"

    # Running per-request chunk counter, starting from zero
    seen = context.get("chunks_processed", 0)
    context["chunks_processed"] = seen + 1
    return context
### before_handoff
Called before handing off to another agent.
@hook("before_handoff")
async def before_handoff(self, context):
    """Attach handoff metadata, then reject handoffs that are not allowed.

    Raises:
        HandoffError: If ``self.can_handoff_to`` rejects the target agent.
    """
    target = context["handoff_agent"]

    # Metadata is stored before validation
    metadata = {
        "source_agent": context.agent_name,
        "timestamp": time.time(),
        "reason": context.get("handoff_reason"),
    }
    context["handoff_metadata"] = metadata

    if self.can_handoff_to(target):
        return context
    raise HandoffError(f"Cannot handoff to {target}")
### after_handoff
Called after handoff completes.
@hook("after_handoff")
async def after_handoff(self, context):
    """Log the target, outcome, and duration of a completed handoff."""
    outcome = context["handoff_result"]

    target = context["handoff_agent"]
    succeeded = outcome.get("success")
    # Elapsed time since the timestamp stored in the handoff metadata
    elapsed = time.time() - context["handoff_metadata"]["timestamp"]

    await self.log_handoff(target=target, success=succeeded, duration=elapsed)
    return context
### finalize_connection
Called when request processing completes.
@hook("finalize_connection")
async def finalize_connection(self, context):
    """Log final request metrics and clean up per-request resources.

    The duration is measured from the ``request_start`` value stored on the
    context. When that value is absent the duration is reported as ``0.0``.

    Args:
        context: The request context; ``completion_id`` is used as the
            request identifier.

    Returns:
        The same context object.
    """
    # Read the clock once: with two separate reads, a missing
    # "request_start" produced a slightly negative duration.
    now = time.time()
    duration = now - context.get("request_start", now)

    await self.log_request_complete(
        request_id=context.completion_id,
        duration=duration,
        tokens=context.get("usage", {}),
        chunks=context.get("chunks_processed", 0),
    )

    # Clean up resources tied to this request
    self.cleanup_request_resources(context.completion_id)
    return context
## Hook Priority
Hooks execute in priority order (lower numbers first):
class SecuritySkill(Skill):
    """Skill whose ``on_message`` hook uses priority 1."""

    @hook("on_message", priority=1)  # Runs first
    async def security_check(self, context):
        """Return the context unchanged."""
        return context
class LoggingSkill(Skill):
    """Skill whose ``on_message`` hook uses priority 10."""

    @hook("on_message", priority=10)  # Runs second
    async def log_message(self, context):
        """Return the context unchanged."""
        return context
class AnalyticsSkill(Skill):
    """Skill whose ``on_message`` hook uses priority 20."""

    @hook("on_message", priority=20)  # Runs third
    async def analyze_message(self, context):
        """Return the context unchanged."""
        return context
## Context Object
The context object provides access to:
# Schematic of the context: each value is a type placeholder, not real data
context = {
    # Request data
    "messages": List[Dict],            # Conversation messages
    "stream": bool,                    # Streaming enabled
    "peer_user_id": str,               # User identifier
    "completion_id": str,              # Request ID
    "model": str,                      # Model name

    # Agent data
    "agent_name": str,                 # Agent name
    "agent_skills": Dict[str, Skill],  # Active skills

    # Execution state
    "usage": Dict,                     # Token usage
    "tool_calls": List,                # Tool executions

    # Hook-specific data
    "tool_call": Dict,                 # Current tool (before/after_toolcall)
    "tool_result": str,                # Tool result (after_toolcall)
    "chunk": Dict,                     # Current chunk (on_chunk)
    "content": str,                    # Chunk content (on_chunk)

    # Custom data
    **custom_fields,                   # Any custom fields added by hooks
}
## Practical Examples
### Rate Limiting
class RateLimitSkill(Skill):
    """Limit each user to a fixed number of requests per time window.

    Counts are kept per user in ``request_counts``. Each user's window opens
    on their first request and the count resets once ``WINDOW_SECONDS`` have
    elapsed, so a user who hits the limit is unblocked when the window ends.
    """

    MAX_REQUESTS = 100     # Requests allowed per window
    WINDOW_SECONDS = 3600  # Window length: one hour

    def __init__(self, config=None):
        super().__init__(config)
        self.request_counts = {}   # user_id -> requests in the current window
        self._window_started = {}  # user_id -> monotonic start of that window

    @hook("on_connection", priority=1)
    async def check_rate_limit(self, context):
        """Count the request and reject it when the user is over the limit.

        Args:
            context: The request context; ``peer_user_id`` identifies the user.

        Returns:
            The same context object.

        Raises:
            RateLimitError: If the user already made ``MAX_REQUESTS`` requests
                in the current window.
        """
        user_id = context.peer_user_id
        # Monotonic clock: window arithmetic is unaffected by wall-clock changes
        now = time.monotonic()

        # Open a fresh window on first sight or once the old one has expired
        started = self._window_started.get(user_id)
        if started is None or now - started >= self.WINDOW_SECONDS:
            self._window_started[user_id] = now
            self.request_counts[user_id] = 0

        count = self.request_counts.get(user_id, 0)
        if count >= self.MAX_REQUESTS:
            raise RateLimitError("Rate limit exceeded")

        self.request_counts[user_id] = count + 1
        return context
### Content Moderation
class ModerationSkill(Skill):
    """Skill that filters inappropriate user input and streamed output."""

    @hook("on_message", priority=5)
    async def moderate_input(self, context):
        """Replace an inappropriate user message with a fixed notice."""
        latest = context.messages[-1]
        if latest["role"] != "user":
            return context

        if self.is_inappropriate(latest["content"]):
            # Overwrite the offending text in place
            context.messages[-1]["content"] = "I cannot process inappropriate content."
        return context

    @hook("on_chunk", priority=5)
    async def moderate_output(self, context):
        """Blank out the current chunk when its content is inappropriate."""
        text = context.get("content", "")
        if not self.is_inappropriate(text):
            return context

        # Empty the delta for the flagged chunk
        context["chunk"]["choices"][0]["delta"]["content"] = ""
        return context
### Analytics Collection
class AnalyticsSkill(Skill):
    """Collect per-request analytics events and send them at finalization.

    All state is kept on the request context: the ``"analytics"`` entry holds
    the start time and the event list, and ``"tool_start_time"`` holds the
    start of the tool call in progress. Event hooks tolerate a missing
    ``"analytics"`` entry instead of raising ``KeyError``.
    """

    @hook("on_connection")
    async def start_analytics(self, context):
        """Create the analytics entry with a start time and no events."""
        context["analytics"] = {
            "start_time": time.time(),
            "events": [],
        }
        return context

    @hook("on_message")
    async def track_message(self, context):
        """Record a ``message`` event carrying the latest message's role."""
        self._record_event(context, {
            "type": "message",
            "role": context.messages[-1]["role"],
            "timestamp": time.time(),
        })
        return context

    @hook("before_toolcall")
    async def track_tool_start(self, context):
        """Remember when the current tool call started."""
        context["tool_start_time"] = time.time()
        return context

    @hook("after_toolcall")
    async def track_tool_end(self, context):
        """Record a ``tool`` event with the tool name and its duration."""
        # Read the clock once so a missing start time gives exactly 0.0
        # instead of a slightly negative duration.
        now = time.time()
        duration = now - context.get("tool_start_time", now)
        self._record_event(context, {
            "type": "tool",
            "name": context["tool_call"]["function"]["name"],
            "duration": duration,
            "timestamp": now,
        })
        return context

    @hook("finalize_connection")
    async def send_analytics(self, context):
        """Add the total duration and send the analytics payload."""
        analytics = context.get("analytics", {})
        # Single clock read, for the same reason as in track_tool_end
        now = time.time()
        analytics["total_duration"] = now - analytics.get("start_time", now)
        await self.send_to_analytics_service(analytics)
        return context

    @staticmethod
    def _record_event(context, event):
        """Append ``event`` to the analytics events, creating them if absent."""
        analytics = context.get("analytics")
        if analytics is None:
            # No analytics entry on the context yet: start an empty one
            analytics = {"events": []}
            context["analytics"] = analytics
        analytics.setdefault("events", []).append(event)
## Best Practices
- **Always Return Context** - Hooks must return the context object
- **Use Priorities Wisely** - Order matters for dependent operations
- **Handle Errors Gracefully** - Don't break the request flow
- **Keep Hooks Lightweight** - Avoid heavy processing
- **Use Context for State** - Don't use instance variables for request state