Direct API Integration
Build custom agents using Glean's REST APIs with our official client libraries. This approach gives you full control over the agent logic while leveraging Glean's search and chat capabilities.
When to Use
- Custom business logic requirements
- Integration with existing systems
- Fine-grained control over agent behavior
- Multi-language support needs
Available Client Libraries
Python SDK
pip install glean-api-client
TypeScript SDK
npm install @gleanwork/api-client
Go SDK
go get github.com/gleanwork/api-client-go
Java SDK
<dependency>
<groupId>com.glean.api-client</groupId>
<artifactId>glean-api-client</artifactId>
<version>0.x.x</version>
</dependency>
Key APIs for Agent Building
API | Purpose | Use Case |
---|---|---|
Agents API | Manage and execute pre-built agents | Use agents created in Agent Builder |
Chat API | Conversational AI with context | Build chat-based agents |
Search API | Enterprise search capabilities | Retrieve relevant documents |
Summarization API | AI-powered content summarization | Process and digest information |
Example: Python Agent
- Basic Agent
- Advanced Agent
import os
from glean.api_client import Glean, models
class CustomAgent:
def __init__(self):
self.api_token = os.getenv("GLEAN_API_TOKEN")
self.instance = os.getenv("GLEAN_INSTANCE")
if not self.api_token or not self.instance:
raise ValueError("GLEAN_API_TOKEN and GLEAN_INSTANCE must be set")
def search_documents(self, query: str, page_size: int = 5):
"""Search for relevant documents"""
with Glean(api_token=self.api_token, instance=self.instance) as glean:
try:
response = glean.client.search.search(
query=query,
page_size=page_size
)
return response.results if hasattr(response, 'results') else []
except Exception as e:
print(f"Search error: {e}")
return []
def chat_with_context(self, query: str, search_results=None) -> str:
"""Generate response using chat API"""
with Glean(api_token=self.api_token, instance=self.instance) as glean:
try:
# Prepare context from search results
context = ""
if search_results:
context = "\n".join([
f"- {result.get('title', '')}: {result.get('snippet', '')}"
for result in search_results[:3]
])
# Prepare the message with context
message_text = query
if context:
message_text = f"Context:\n{context}\n\nQuestion: {query}"
response = glean.client.chat.create(
messages=[
{
"fragments": [
models.ChatMessageFragment(text=message_text)
]
}
]
)
# Extract response text
if response.messages:
last_message = response.messages[-1]
if hasattr(last_message, 'fragments') and last_message.fragments:
for fragment in last_message.fragments:
if hasattr(fragment, 'text') and fragment.text:
return fragment.text
return "No response received"
except Exception as e:
return f"Chat error: {e}"
def process_query(self, query: str) -> dict:
"""Process a query by searching and then chatting"""
# First search for relevant documents
search_results = self.search_documents(query)
# Then use chat to answer based on search results
answer = self.chat_with_context(query, search_results)
return {
"answer": answer,
"sources": search_results[:3],
"total_sources": len(search_results)
}
# Usage
agent = CustomAgent()
result = agent.process_query("What is our vacation policy?")
print(f"Answer: {result['answer']}")
print(f"Found {result['total_sources']} relevant documents")
import os
from glean.api_client import Glean, models
from typing import List, Dict, Optional
class AdvancedAgent:
def __init__(self):
self.api_token = os.getenv("GLEAN_API_TOKEN")
self.instance = os.getenv("GLEAN_INSTANCE")
self.conversation_history = []
if not self.api_token or not self.instance:
raise ValueError("GLEAN_API_TOKEN and GLEAN_INSTANCE must be set")
def search_with_filters(self, query: str, filters: Optional[Dict] = None, page_size: int = 10) -> List[Dict]:
"""Search with optional filters"""
with Glean(api_token=self.api_token, instance=self.instance) as glean:
try:
search_params = {
"query": query,
"page_size": page_size
}
if filters:
search_params["request_options"] = {"facet_filters": filters}
response = glean.client.search.search(**search_params)
return response.results if hasattr(response, 'results') else []
except Exception as e:
print(f"Search error: {e}")
return []
def chat_with_context(self, query: str, context_docs: Optional[List[Dict]] = None, save_to_history: bool = True) -> str:
"""Chat with document context and conversation history"""
with Glean(api_token=self.api_token, instance=self.instance) as glean:
try:
# Build message with context
message_parts = []
if context_docs:
context_text = "\n".join([
f"- {doc.get('title', 'Unknown')}: {doc.get('snippet', '')}"
for doc in context_docs[:3]
if doc.get('snippet')
])
if context_text:
message_parts.append(f"Relevant context:\n{context_text}")
message_parts.append(f"Question: {query}")
full_message = "\n\n".join(message_parts)
# Include conversation history if available
messages = []
for hist_msg in self.conversation_history[-4:]: # Last 4 messages for context
messages.append(hist_msg)
# Add current message
current_message = {
"fragments": [models.ChatMessageFragment(text=full_message)]
}
messages.append(current_message)
response = glean.client.chat.create(messages=messages)
# Extract response
answer = "No response received"
if response.messages:
last_message = response.messages[-1]
if hasattr(last_message, 'fragments') and last_message.fragments:
for fragment in last_message.fragments:
if hasattr(fragment, 'text') and fragment.text:
answer = fragment.text
break
# Save to conversation history
if save_to_history:
self.conversation_history.append(current_message)
self.conversation_history.append({
"fragments": [models.ChatMessageFragment(text=answer)]
})
# Keep only last 10 messages
if len(self.conversation_history) > 10:
self.conversation_history = self.conversation_history[-10:]
return answer
except Exception as e:
return f"Chat error: {e}"
def summarize_documents(self, documents: List[Dict]) -> str:
"""Summarize a list of documents"""
with Glean(api_token=self.api_token, instance=self.instance) as glean:
try:
doc_texts = []
for doc in documents[:5]: # Limit to 5 docs
title = doc.get('title', 'Unknown Document')
snippet = doc.get('snippet', '')
if snippet:
doc_texts.append(f"{title}: {snippet}")
if not doc_texts:
return "No documents to summarize"
content_to_summarize = "\n\n".join(doc_texts)
response = glean.client.summarize.summarize(
text=content_to_summarize,
summary_type="brief"
)
return response.summary if hasattr(response, 'summary') else "Could not generate summary"
except Exception as e:
return f"Summarization error: {e}"
def process_complex_query(self, query: str, department: Optional[str] = None, include_summary: bool = False) -> Dict:
"""Process query with department filtering and optional summarization"""
filters = {}
if department:
filters["department"] = [department]
# Search for documents
search_results = self.search_with_filters(query, filters)
# Generate answer with context
answer = self.chat_with_context(query, search_results)
result = {
"answer": answer,
"sources": search_results[:5],
"total_sources": len(search_results),
"department_filter": department
}
# Add summary if requested
if include_summary and search_results:
result["summary"] = self.summarize_documents(search_results)
return result
def reset_conversation(self):
"""Clear conversation history"""
self.conversation_history = []
# Usage
agent = AdvancedAgent()
# Basic query
result = agent.process_complex_query(
"What are the remote work guidelines?",
department="HR",
include_summary=True
)
print(f"Answer: {result['answer']}")
print(f"Summary: {result.get('summary', 'No summary available')}")
print(f"Found {result['total_sources']} sources")
# Follow-up question (uses conversation history)
followup = agent.process_complex_query("Are there any exceptions to these guidelines?")
print(f"Follow-up answer: {followup['answer']}")
Best Practices
Error Handling
from glean.api_client.errors import GleanError
import time
def robust_agent_call(agent, query, max_retries=3):
"""Execute agent call with retry logic"""
for attempt in range(max_retries):
try:
return agent.process_query(query)
except GleanError as e:
if hasattr(e, 'status_code') and e.status_code == 429: # Rate limited
wait_time = 2 ** attempt # Exponential backoff
print(f"Rate limited, waiting {wait_time} seconds...")
time.sleep(wait_time)
elif attempt == max_retries - 1:
return {"answer": f"Error after {max_retries} attempts: {e}", "sources": [], "total_sources": 0}
else:
print(f"Attempt {attempt + 1} failed: {e}")
except Exception as e:
if attempt == max_retries - 1:
return {"answer": f"Unexpected error: {e}", "sources": [], "total_sources": 0}
return {"answer": "Failed to get response", "sources": [], "total_sources": 0}
Caching Results
import hashlib
import json
from datetime import datetime, timedelta
class CachedAgent(CustomAgent):
def __init__(self, cache_ttl_minutes=30):
super().__init__()
self._cache = {}
self.cache_ttl = timedelta(minutes=cache_ttl_minutes)
def _cache_key(self, query: str, filters: dict = None) -> str:
"""Generate cache key from query and filters"""
cache_data = {"query": query, "filters": filters or {}}
cache_string = json.dumps(cache_data, sort_keys=True)
return hashlib.md5(cache_string.encode()).hexdigest()
def _is_cache_valid(self, timestamp: datetime) -> bool:
"""Check if cache entry is still valid"""
return datetime.now() - timestamp < self.cache_ttl
def process_query(self, query: str, filters: dict = None) -> dict:
cache_key = self._cache_key(query, filters)
# Check cache
if cache_key in self._cache:
cached_result, timestamp = self._cache[cache_key]
if self._is_cache_valid(timestamp):
print("Returning cached result")
return cached_result
# Process query
result = super().process_query(query)
# Cache result
self._cache[cache_key] = (result, datetime.now())
return result
Rate Limiting
import time
from datetime import datetime, timedelta
from collections import deque
class RateLimitedAgent(CustomAgent):
def __init__(self, requests_per_minute: int = 60):
super().__init__()
self.requests_per_minute = requests_per_minute
self.request_times = deque()
def _wait_if_needed(self):
"""Implement rate limiting"""
now = datetime.now()
# Remove requests older than 1 minute
while self.request_times and now - self.request_times[0] > timedelta(minutes=1):
self.request_times.popleft()
# Check if we need to wait
if len(self.request_times) >= self.requests_per_minute:
sleep_time = 60 - (now - self.request_times[0]).total_seconds()
if sleep_time > 0:
print(f"Rate limit reached, waiting {sleep_time:.1f} seconds...")
time.sleep(sleep_time)
# Record this request
self.request_times.append(now)
def process_query(self, query: str) -> dict:
self._wait_if_needed()
return super().process_query(query)
Next Steps
- Explore the Chat Guide for building conversational agents
- Check out Search Examples for advanced search capabilities
- See Agent Examples for real-world implementations
- Review the API Reference for complete API documentation