Overview
Proper query optimization is essential for building high-performance healthcare applications with the OMOPHub API. This guide covers best practices, caching strategies, and advanced techniques for achieving sub-200ms response times on most vocabulary operations, including complex ones.
Performance Target: 95th percentile response times under 200ms for all optimized queries
Response Time Breakdown
Understanding where time is spent in API requests helps identify optimization opportunities:
Total Response Time = Network Latency (10-50ms) + API Processing (20-100ms) + Database Query (50-150ms) + Serialization (5-20ms)
| Query Type | Target Time | Optimization Level |
|---|---|---|
| Concept Lookup | < 50ms | Basic |
| Simple Search | < 100ms | Standard |
| Advanced Search | < 200ms | Optimized |
| Complex Hierarchy | < 300ms | Advanced |
| Batch Operations | < 500ms | Enterprise |
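To confirm you are meeting these targets, measure latencies on your side and compute the 95th percentile; a minimal client-side sketch using only the standard library (the timed call is just a representative query):
import time

def p95(samples_ms):
    """Return the 95th-percentile value from a list of response times in milliseconds."""
    ordered = sorted(samples_ms)
    index = max(0, int(round(0.95 * len(ordered))) - 1)
    return ordered[index]

timings = []
for _ in range(100):
    start = time.perf_counter()
    client.get_concept(201826)  # any representative query for your workload
    timings.append((time.perf_counter() - start) * 1000)

print(f"p95 latency: {p95(timings):.1f}ms")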
Basic Optimization Techniques
1. Use Specific Vocabularies
❌ Inefficient - searches all vocabularies:
# Slow: searches across 100+ vocabularies
results = client.search_concepts({
"query": "diabetes",
"limit": 10
})
✅ Efficient - targeted vocabulary search:
# Fast: searches only relevant vocabularies
results = client.search_concepts({
"query": "diabetes",
"vocabularies": ["SNOMED", "ICD10CM"],
"limit": 10
})
2. Filter by Domain Early
❌ Inefficient:
# Searches all domains, then filters in application
results = client.search_concepts({
"query": "insulin",
"limit": 100
})
drug_concepts = [c for c in results["concepts"] if c["domain_id"] == "Drug"]
✅ Efficient:
# Filters at database level
results = client.search_concepts({
"query": "insulin",
"domains": ["Drug"],
"limit": 10
})
3. Use Appropriate Page Sizes
# Optimal page sizes for different use cases
OPTIMAL_LIMITS = {
"autocomplete": 10,
"search_results": 20,
"bulk_processing": 100,
"data_export": 1000
}
# Example: Autocomplete
suggestions = client.get_search_autocomplete({
"query": user_input,
"limit": OPTIMAL_LIMITS["autocomplete"]
})
Advanced Search Optimization
1. Leverage Search Facets
Use facets to understand result distribution before making expensive queries:
class OptimizedSearchService:
def __init__(self, client):
self.client = client
self.facet_cache = {}
def get_search_strategy(self, query: str) -> Dict[str, Any]:
"""Determine optimal search strategy based on facets"""
# Get facets to understand data distribution
facets = self.client.get_search_facets({
"query": query,
"vocabularies": ["SNOMED", "ICD10CM", "RxNorm"]
})
# Choose vocabularies with most relevant results
vocab_counts = {f["vocabulary_id"]: f["count"]
for f in facets["vocabularies"]}
# Select top 3 vocabularies by result count
optimal_vocabs = sorted(vocab_counts.items(),
key=lambda x: x[1], reverse=True)[:3]
return {
"vocabularies": [v[0] for v in optimal_vocabs],
"expected_results": sum(v[1] for v in optimal_vocabs),
"search_complexity": "low" if len(optimal_vocabs) <= 2 else "medium"
}
def optimized_search(self, query: str, **kwargs) -> Dict[str, Any]:
"""Perform optimized search with strategy selection"""
strategy = self.get_search_strategy(query)
        search_params = {
            **kwargs,
            "query": query,
            "vocabularies": strategy["vocabularies"],
            "limit": min(kwargs.get("limit", 20), 50),  # Cap at 50 for performance
        }
return self.client.advanced_search_concepts(search_params)
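A short usage sketch for the service above (the client construction is illustrative; reuse whatever authenticated OMOPHub client your application already has):
# Assumes an authenticated OMOPHub client is available as `client`
service = OptimizedSearchService(client)

strategy = service.get_search_strategy("type 2 diabetes")
print(f"Searching {strategy['vocabularies']} (complexity: {strategy['search_complexity']})")

results = service.optimized_search("type 2 diabetes", limit=20)
for concept in results["concepts"]:
    print(concept["concept_id"], concept["concept_name"])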
2. Implement Progressive Search
For complex queries, implement progressive search that starts narrow and expands:
async def progressive_search(client, query: str, target_results: int = 20):
"""Progressive search that expands scope until target results found"""
search_levels = [
# Level 1: Most specific
{
"vocabularies": ["SNOMED"],
"domains": ["Condition"],
"standard_concepts_only": True
},
# Level 2: Broader vocabularies
{
"vocabularies": ["SNOMED", "ICD10CM"],
"domains": ["Condition", "Procedure"],
"standard_concepts_only": True
},
# Level 3: All standard concepts
{
"vocabularies": ["SNOMED", "ICD10CM", "HCPCS"],
"standard_concepts_only": True
},
# Level 4: Include non-standard
{
"vocabularies": ["SNOMED", "ICD10CM", "HCPCS", "HCPCS"],
"include_invalid": False
}
]
all_results = []
for level, params in enumerate(search_levels):
search_params = {
"query": query,
"limit": target_results,
**params
}
try:
results = await client.advanced_search_concepts(search_params)
all_results.extend(results["concepts"])
# Stop if we have enough results
if len(all_results) >= target_results:
break
except Exception as e:
print(f"Search level {level} failed: {e}")
continue
# Deduplicate and return top results
unique_results = []
seen_ids = set()
for concept in all_results:
if concept["concept_id"] not in seen_ids:
unique_results.append(concept)
seen_ids.add(concept["concept_id"])
if len(unique_results) >= target_results:
break
return unique_results
Usage Example:
# Example: calling progressive_search from an async context
import asyncio

async def main():
    try:
        results = await progressive_search(client, "diabetes mellitus", target_results=20)
        print(f"Found {len(results)} concepts:")
        for i, concept in enumerate(results, start=1):
            print(f"{i}. {concept['concept_name']} ({concept['concept_id']})")
        return results
    except Exception as e:
        print(f"Progressive search failed: {e}")
        return []

asyncio.run(main())
Hierarchy Optimization
1. Limit Traversal Depth
❌ Inefficient - unlimited depth:
# Can traverse hundreds of levels
ancestors = client.get_concept_ancestors(320128)
✅ Efficient - limited depth:
# Limits to clinically relevant depth
ancestors = client.get_concept_ancestors(
320128,
max_levels=5, # Usually sufficient for clinical hierarchies
vocabulary_ids=["SNOMED"] # Focus on specific vocabulary
)
2. Use Batch Operations
❌ Inefficient - sequential requests:
# Makes N separate API calls
concept_ids = [201826, 320128, 432867, 4329847]
all_ancestors = []
for concept_id in concept_ids:
ancestors = client.get_concept_ancestors(concept_id)
all_ancestors.append(ancestors)
✅ Efficient - batch request:
# Single API call for multiple concepts
batch_request = {
"queries": [
{
"query_id": f"ancestors_{cid}",
"concept_id": cid,
"operation": "ancestors",
"max_levels": 5
}
for cid in concept_ids
]
}
batch_results = client.batch_hierarchy_queries(batch_request)
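Consuming the batch response is then a single pass. This sketch assumes each entry in the response echoes the query_id it belongs to; the key names are illustrative, so adjust them to the actual batch response shape:
# Hypothetical response handling: "results", "query_id", and "ancestors" keys are assumed
ancestors_by_concept = {}
for entry in batch_results.get("results", []):
    query_id = entry.get("query_id", "")           # e.g. "ancestors_201826"
    concept_id = int(query_id.rsplit("_", 1)[-1])  # recover the concept ID from the query_id
    ancestors_by_concept[concept_id] = entry.get("ancestors", [])

for concept_id, ancestors in ancestors_by_concept.items():
    print(f"{concept_id}: {len(ancestors)} ancestors")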
3. Optimize Path Finding
For concept path queries, use bidirectional search strategy:
def optimized_concept_path(client, source_id: int, target_id: int) -> Dict[str, Any]:
"""Find path using bidirectional search strategy"""
# Start with short path search
try:
path = client.get_concept_path(
source_id,
target_id,
max_depth=3, # Start with shallow search
shortest_path_only=True
)
if path["paths"]:
return path
except Exception:
pass
# If short path not found, try deeper search
try:
path = client.get_concept_path(
source_id,
target_id,
max_depth=6, # Deeper search
shortest_path_only=True
)
return path
except Exception as e:
# Fallback: check if concepts are in same vocabulary
source = client.get_concept(source_id)
target = client.get_concept(target_id)
if source["vocabulary_id"] != target["vocabulary_id"]:
return {
"error": "No path found - concepts in different vocabularies",
"suggestion": "Try using concept mapping instead"
}
raise e
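For example (concept IDs taken from the earlier examples; the success branch relies on the "paths" key returned by the function above):
path_result = optimized_concept_path(client, 201826, 320128)

if path_result.get("error"):
    print(path_result["error"], "-", path_result["suggestion"])
else:
    print(f"Found {len(path_result['paths'])} path(s) within the depth limit")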
Caching Strategies
1. Multi-Level Caching
Implement caching at multiple levels for optimal performance:
import redis
import json
from functools import wraps
from typing import Any, Callable, Dict, List, Optional
class PerformanceOptimizedClient:
def __init__(self, api_key: str, redis_url: str = "redis://localhost:6379"):
self.client = OMOPHubClient(api_key=api_key)
self.redis = redis.from_url(redis_url)
self.memory_cache = {} # In-memory cache for hot data
# Cache TTL settings (in seconds)
self.cache_ttl = {
"concept": 86400, # 24 hours - concepts rarely change
"vocabulary": 3600, # 1 hour - vocabulary metadata
"search": 1800, # 30 minutes - search results
"hierarchy": 7200, # 2 hours - hierarchy data
"relationships": 3600 # 1 hour - relationship data
}
def cache_key(self, prefix: str, *args, **kwargs) -> str:
"""Generate consistent cache key"""
key_parts = [prefix] + [str(arg) for arg in args]
if kwargs:
key_parts.append(json.dumps(sorted(kwargs.items())))
return ":".join(key_parts)
    # Defined without `self` so it can be applied as a decorator inside this class body
    def cached_request(cache_type: str):
        """Decorator factory for caching API requests"""
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(self, *args, **kwargs):
                # Generate cache key
                cache_key = self.cache_key(f"omophub:{cache_type}", *args, **kwargs)
                # Check memory cache first (fastest)
                if cache_key in self.memory_cache:
                    return self.memory_cache[cache_key]
                # Check Redis cache (fast)
                cached_result = self.redis.get(cache_key)
                if cached_result:
                    result = json.loads(cached_result)
                    # Store in memory cache for next time
                    self.memory_cache[cache_key] = result
                    return result
                # Call actual API (slowest)
                result = func(self, *args, **kwargs)
                # Cache the result
                ttl = self.cache_ttl.get(cache_type, 3600)
                self.redis.setex(cache_key, ttl, json.dumps(result))
                self.memory_cache[cache_key] = result
                return result
            return wrapper
        return decorator
@cached_request("concept")
def get_concept(self, concept_id: int) -> Dict[str, Any]:
"""Cached concept lookup"""
return self.client.get_concept(concept_id)
@cached_request("search")
def search_concepts(self, **params) -> Dict[str, Any]:
"""Cached concept search"""
return self.client.search_concepts(params)
@cached_request("hierarchy")
def get_concept_ancestors(self, concept_id: int, **params) -> Dict[str, Any]:
"""Cached ancestor lookup"""
return self.client.get_concept_ancestors(concept_id, **params)
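A quick way to see the effect of the cache layers (Redis must be reachable at the configured URL; the API key is a placeholder):
import time

client = PerformanceOptimizedClient(api_key="your_api_key")

start = time.perf_counter()
client.get_concept(201826)   # first call goes to the API
cold_ms = (time.perf_counter() - start) * 1000

start = time.perf_counter()
client.get_concept(201826)   # second call is served from the in-memory cache
warm_ms = (time.perf_counter() - start) * 1000

print(f"cold: {cold_ms:.1f}ms, warm: {warm_ms:.2f}ms")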
2. Smart Cache Invalidation
class SmartCacheManager:
    def __init__(self, redis_client, api_client):
        self.redis = redis_client
        self.client = api_client  # needed by warm_cache below
def invalidate_concept_cache(self, concept_id: int):
"""Invalidate all cache entries related to a concept"""
patterns = [
f"omophub:concept:{concept_id}*",
f"omophub:hierarchy:*{concept_id}*",
f"omophub:relationships:{concept_id}*",
f"omophub:search:*" # Search cache may contain this concept
]
for pattern in patterns:
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
def warm_cache(self, frequently_used_concepts: List[int]):
"""Pre-warm cache with frequently used concepts"""
for concept_id in frequently_used_concepts:
try:
# Pre-load concept data
self.client.get_concept(concept_id)
# Pre-load common hierarchy queries
self.client.get_concept_ancestors(concept_id, max_levels=3)
self.client.get_concept_descendants(concept_id, max_levels=2)
except Exception as e:
print(f"Failed to warm cache for concept {concept_id}: {e}")
Database Query Optimization
1. Index-Aware Queries
Structure queries to leverage database indexes:
def index_optimized_search(client, query_params: Dict[str, Any]) -> Dict[str, Any]:
"""Structure search to use optimal database indexes"""
# Order filters by selectivity (most selective first)
optimized_params = {}
# 1. Concept ID filters (most selective)
if "concept_ids" in query_params:
optimized_params["concept_ids"] = query_params["concept_ids"]
# 2. Vocabulary filters (highly selective)
if "vocabularies" in query_params:
optimized_params["vocabularies"] = query_params["vocabularies"]
# 3. Domain filters (moderately selective)
if "domains" in query_params:
optimized_params["domains"] = query_params["domains"]
# 4. Standard concept filter (uses index)
if "standard_concepts_only" in query_params:
optimized_params["standard_concepts_only"] = query_params["standard_concepts_only"]
# 5. Text search (least selective, but uses full-text index)
if "query" in query_params:
optimized_params["query"] = query_params["query"]
# 6. Date filters (if needed)
if "date_range" in query_params:
optimized_params["date_range"] = query_params["date_range"]
return client.advanced_search_concepts(optimized_params)
2. Minimize Data Transfer
Keep payloads lean: use lightweight summaries for list views and fetch full concept details only for the concepts that actually need them:
def lightweight_search(client, query: str, **kwargs) -> List[Dict[str, Any]]:
"""Search with minimal data transfer"""
# Get basic results first
results = client.search_concepts({
"query": query,
"limit": kwargs.get("limit", 20),
**kwargs
})
# Return lightweight concept data
lightweight_concepts = []
for concept in results["concepts"]:
lightweight_concepts.append({
"concept_id": concept["concept_id"],
"concept_name": concept["concept_name"],
"concept_code": concept["concept_code"],
"vocabulary_id": concept["vocabulary_id"],
"domain_id": concept["domain_id"]
# Exclude heavy fields like synonyms, descriptions, etc.
})
return lightweight_concepts
def get_full_concept_details(client, concept_ids: List[int]) -> List[Dict[str, Any]]:
"""Get full details only when needed"""
# Use batch API for efficiency
batch_request = {
"concept_ids": concept_ids
}
return client.batch_get_concepts(batch_request)
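These two helpers are meant to be combined: list views stay light, and the heavier batch lookup runs only for the concepts a user actually opens. For example:
# Show a lightweight result list first
summaries = lightweight_search(client, "metformin", limit=20)

# Fetch full details only for the handful of concepts that need them
selected_ids = [c["concept_id"] for c in summaries[:5]]
full_details = get_full_concept_details(client, selected_ids)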
Connection and Network Optimization
1. Connection Pooling
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class HighPerformanceClient(OMOPHubClient):
def __init__(self, api_key: str, **kwargs):
super().__init__(api_key, **kwargs)
# Configure connection pooling
retry_strategy = Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(
pool_connections=20, # Number of connection pools
pool_maxsize=50, # Max connections per pool
max_retries=retry_strategy,
pool_block=False # Don't block on pool exhaustion
)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# Configure keep-alive
self.session.headers.update({
'Connection': 'keep-alive',
'Keep-Alive': 'timeout=30, max=100'
})
2. Request Compression
import gzip
import json
import requests
def compressed_request(url, data):
"""
Send compressed request with proper headers and body compression
"""
# Compress the request body
compressed_data = gzip.compress(json.dumps(data).encode('utf-8'))
# Set proper headers for compression
headers = {
'Content-Type': 'application/json',
'Content-Encoding': 'gzip', # Indicates compressed request body
'Accept-Encoding': 'gzip, deflate' # Accepts compressed responses
}
return requests.post(url, data=compressed_data, headers=headers)
# Example usage with OMOPHub API
def bulk_search_compressed(client, search_queries):
"""
Perform bulk search with request compression for large payloads
"""
bulk_request_data = {
"searches": search_queries,
"options": {
"include_facets": True,
"include_relationships": False
}
}
    # Use compression for large bulk requests
    # Note: this bypasses the client's session, so add your API authentication headers as well
    api_url = f"{client.base_url}/concepts/search/bulk"
response = compressed_request(api_url, bulk_request_data)
return response.json()
Performance Monitoring
1. Request Timing
import time
import logging
from contextlib import contextmanager

logger = logging.getLogger(__name__)
@contextmanager
def timing_context(operation_name: str):
"""Context manager for timing operations"""
start_time = time.time()
try:
yield
finally:
elapsed = (time.time() - start_time) * 1000 # Convert to milliseconds
print(f"{operation_name}: {elapsed:.2f}ms")
# Log slow queries
if elapsed > 200: # Threshold for slow queries
logger.warning(f"Slow query detected: {operation_name} took {elapsed:.2f}ms")
# Usage
with timing_context("Concept Search"):
results = client.search_concepts({"query": "diabetes"})
with timing_context("Hierarchy Traversal"):
ancestors = client.get_concept_ancestors(201826, max_levels=5)
2. Performance Metrics Tracking
class PerformanceTracker:
def __init__(self):
self.metrics = {
"request_count": 0,
"total_time": 0,
"slow_queries": 0,
"cache_hits": 0,
"cache_misses": 0
}
def track_request(self, operation: str, duration_ms: float, cache_hit: bool = False):
"""Track request performance metrics"""
self.metrics["request_count"] += 1
self.metrics["total_time"] += duration_ms
if duration_ms > 200:
self.metrics["slow_queries"] += 1
if cache_hit:
self.metrics["cache_hits"] += 1
else:
self.metrics["cache_misses"] += 1
def get_performance_report(self) -> Dict[str, Any]:
"""Generate performance report"""
if self.metrics["request_count"] == 0:
return {"message": "No requests tracked"}
total_cache_requests = self.metrics["cache_hits"] + self.metrics["cache_misses"]
cache_hit_rate = (self.metrics["cache_hits"] / total_cache_requests * 100) if total_cache_requests > 0 else 0
return {
"total_requests": self.metrics["request_count"],
"average_response_time": self.metrics["total_time"] / self.metrics["request_count"],
"slow_query_rate": (self.metrics["slow_queries"] / self.metrics["request_count"] * 100),
"cache_hit_rate": cache_hit_rate,
"recommendations": self._generate_recommendations()
}
def _generate_recommendations(self) -> List[str]:
"""Generate performance recommendations"""
recommendations = []
        avg_time = self.metrics["total_time"] / self.metrics["request_count"]
        slow_rate = self.metrics["slow_queries"] / self.metrics["request_count"] * 100
        total_cache = self.metrics["cache_hits"] + self.metrics["cache_misses"]
        cache_hit_rate = (self.metrics["cache_hits"] / total_cache * 100) if total_cache > 0 else None
        if avg_time > 150:
            recommendations.append("Consider using more specific vocabulary filters")
        if slow_rate > 10:
            recommendations.append("High rate of slow queries - review query complexity")
        if cache_hit_rate is not None and cache_hit_rate < 60:
            recommendations.append("Low cache hit rate - consider cache warming strategies")
return recommendations
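The tracker pairs naturally with the timing shown above; a minimal sketch of recording one request and printing the report:
import time

tracker = PerformanceTracker()

start = time.perf_counter()
results = client.search_concepts({"query": "diabetes", "vocabularies": ["SNOMED"]})
elapsed_ms = (time.perf_counter() - start) * 1000
tracker.track_request("concept_search", elapsed_ms, cache_hit=False)

print(tracker.get_performance_report())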
Best Practices Summary
Do’s
- Filter Early: Use vocabulary, domain, and concept class filters
- Limit Results: Use appropriate page sizes for your use case
- Cache Aggressively: Implement multi-level caching
- Batch Operations: Use batch APIs for multiple concepts
- Monitor Performance: Track response times and cache hit rates
- Use Specific Queries: Avoid broad, unfocused searches
Don’ts
- Unlimited Searches: Always set reasonable limits
- Sequential Requests: Use batch APIs instead
- Deep Hierarchies: Limit traversal depth appropriately
- Ignore Caching: Don’t make the same request multiple times
- Large Transfers: Don’t request unnecessary data
- Synchronous Processing: Use async/await for multiple requests
Next Steps