Skip to main content

Direct API Integration

Build custom agents using Glean's REST APIs with our official client libraries. This approach gives you full control over the agent logic while leveraging Glean's search and chat capabilities.

When to Use

  • Custom business logic requirements
  • Integration with existing systems
  • Fine-grained control over agent behavior
  • Multi-language support needs

Available Client Libraries

Key APIs for Agent Building

| API | Purpose | Use Case |
| --- | --- | --- |
| Agents API | Manage and execute pre-built agents | Use agents created in Agent Builder |
| Chat API | Conversational AI with context | Build chat-based agents |
| Search API | Enterprise search capabilities | Retrieve relevant documents |
| Summarization API | AI-powered content summarization | Process and digest information |

Example: Python Agent

import os
from glean.api_client import Glean, models

class CustomAgent:
    """Minimal Glean-backed agent: search for context, then answer via chat."""

    def __init__(self):
        # Credentials come from the environment; fail fast when either is missing.
        self.api_token = os.getenv("GLEAN_API_TOKEN")
        self.instance = os.getenv("GLEAN_INSTANCE")
        if not self.api_token or not self.instance:
            raise ValueError("GLEAN_API_TOKEN and GLEAN_INSTANCE must be set")

    def search_documents(self, query: str, page_size: int = 5):
        """Run an enterprise search for *query*; return [] on any failure."""
        with Glean(api_token=self.api_token, instance=self.instance) as glean:
            try:
                response = glean.client.search.search(
                    query=query, page_size=page_size
                )
            except Exception as e:
                # Best-effort: a search failure degrades to "no sources".
                print(f"Search error: {e}")
                return []
            return response.results if hasattr(response, 'results') else []

    def chat_with_context(self, query: str, search_results=None) -> str:
        """Ask the chat API, optionally prefixing the top search hits as context."""
        with Glean(api_token=self.api_token, instance=self.instance) as glean:
            try:
                # Fold up to three results into a bulleted context block.
                # NOTE(review): assumes each result is dict-like with
                # 'title'/'snippet' keys — confirm against the search API.
                context = ""
                if search_results:
                    bullets = []
                    for hit in search_results[:3]:
                        bullets.append(
                            f"- {hit.get('title', '')}: {hit.get('snippet', '')}"
                        )
                    context = "\n".join(bullets)

                # Wrap the user question with the context when we have one.
                if context:
                    message_text = f"Context:\n{context}\n\nQuestion: {query}"
                else:
                    message_text = query

                response = glean.client.chat.create(
                    messages=[
                        {
                            "fragments": [
                                models.ChatMessageFragment(text=message_text)
                            ]
                        }
                    ]
                )

                # The answer is the first non-empty text fragment of the
                # final message in the conversation.
                if response.messages:
                    tail = response.messages[-1]
                    if hasattr(tail, 'fragments') and tail.fragments:
                        for piece in tail.fragments:
                            if hasattr(piece, 'text') and piece.text:
                                return piece.text

                return "No response received"

            except Exception as e:
                return f"Chat error: {e}"

    def process_query(self, query: str) -> dict:
        """Search, then answer with the results as context.

        Returns a dict with 'answer', up to three 'sources', and the
        'total_sources' count.
        """
        hits = self.search_documents(query)
        reply = self.chat_with_context(query, hits)
        return {
            "answer": reply,
            "sources": hits[:3],
            "total_sources": len(hits),
        }

# Usage
agent = CustomAgent()
outcome = agent.process_query("What is our vacation policy?")
print(f"Answer: {outcome['answer']}")
print(f"Found {outcome['total_sources']} relevant documents")

Best Practices

Error Handling

from glean.api_client.errors import GleanError
import time

def robust_agent_call(agent, query, max_retries=3):
    """Execute agent.process_query(query) with retry logic.

    Glean API errors are retried; HTTP 429 (rate limit) waits with
    exponential backoff before the next attempt. Returns the agent's
    result dict, or an error-shaped dict ({'answer', 'sources',
    'total_sources'}) once retries are exhausted.
    """
    for attempt in range(max_retries):
        try:
            return agent.process_query(query)
        except GleanError as e:
            if attempt == max_retries - 1:
                # Fix: previously a 429 on the final attempt slept and then
                # fell through to the generic failure message, losing the
                # error detail. Out of retries means report immediately.
                return {"answer": f"Error after {max_retries} attempts: {e}", "sources": [], "total_sources": 0}
            if hasattr(e, 'status_code') and e.status_code == 429:  # Rate limited
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"Attempt {attempt + 1} failed: {e}")
        except Exception as e:
            if attempt == max_retries - 1:
                return {"answer": f"Unexpected error: {e}", "sources": [], "total_sources": 0}
            # Fix: unexpected errors on non-final attempts were retried
            # silently; log them so failures are diagnosable.
            print(f"Attempt {attempt + 1} failed: {e}")

    # Only reachable when max_retries <= 0.
    return {"answer": "Failed to get response", "sources": [], "total_sources": 0}

Caching Results

import hashlib
import json
from datetime import datetime, timedelta

class CachedAgent(CustomAgent):
    """CustomAgent with an in-memory TTL cache keyed on (query, filters)."""

    def __init__(self, cache_ttl_minutes=30):
        super().__init__()
        self._cache = {}  # cache_key -> (result, inserted_at)
        self.cache_ttl = timedelta(minutes=cache_ttl_minutes)

    def _cache_key(self, query: str, filters: dict = None) -> str:
        """Generate a stable cache key from query and filters."""
        cache_data = {"query": query, "filters": filters or {}}
        cache_string = json.dumps(cache_data, sort_keys=True)
        # md5 is acceptable here: key derivation, not a security boundary.
        return hashlib.md5(cache_string.encode()).hexdigest()

    def _is_cache_valid(self, timestamp: datetime) -> bool:
        """Check whether an entry written at *timestamp* is still fresh."""
        return datetime.now() - timestamp < self.cache_ttl

    def process_query(self, query: str, filters: dict = None) -> dict:
        """Return a cached answer when fresh; otherwise query and cache.

        NOTE(review): *filters* only affects the cache key — it is never
        forwarded to the underlying query, so differing filters yield
        identical answers. Confirm intent or thread filters through.
        """
        cache_key = self._cache_key(query, filters)

        entry = self._cache.get(cache_key)
        if entry is not None:
            cached_result, timestamp = entry
            if self._is_cache_valid(timestamp):
                print("Returning cached result")
                return cached_result
            # Fix: stale entries were never removed, so the cache grew
            # without bound; evict on discovery.
            del self._cache[cache_key]

        result = super().process_query(query)
        self._cache[cache_key] = (result, datetime.now())
        return result

Rate Limiting

import time
from datetime import datetime, timedelta
from collections import deque

class RateLimitedAgent(CustomAgent):
    """CustomAgent that throttles itself to a requests-per-minute budget."""

    def __init__(self, requests_per_minute: int = 60):
        super().__init__()
        self.requests_per_minute = requests_per_minute
        self.request_times = deque()  # timestamps of requests in the last minute

    def _wait_if_needed(self):
        """Sleep just long enough to stay within the per-minute budget."""
        now = datetime.now()

        # Drop timestamps that have aged out of the 1-minute window.
        while self.request_times and now - self.request_times[0] > timedelta(minutes=1):
            self.request_times.popleft()

        # At capacity: wait until the oldest request falls out of the window.
        if len(self.request_times) >= self.requests_per_minute:
            sleep_time = 60 - (now - self.request_times[0]).total_seconds()
            if sleep_time > 0:
                print(f"Rate limit reached, waiting {sleep_time:.1f} seconds...")
                time.sleep(sleep_time)
            # Fix: previously the stale pre-sleep `now` was recorded and the
            # window was not re-pruned, so the sliding window drifted.
            now = datetime.now()
            while self.request_times and now - self.request_times[0] > timedelta(minutes=1):
                self.request_times.popleft()

        # Record this request.
        self.request_times.append(now)

    def process_query(self, query: str) -> dict:
        """Throttle, then delegate to CustomAgent.process_query."""
        self._wait_if_needed()
        return super().process_query(query)

Next Steps