Overview

Proper query optimization is essential for building high-performance healthcare applications with the OMOPHub API. This guide covers best practices, caching strategies, and advanced techniques to achieve sub-200ms response times even with complex vocabulary operations.
Performance Target: 95th percentile response times under 200ms for all optimized queries

Query Performance Fundamentals

Response Time Breakdown

Understanding where time is spent in API requests helps identify optimization opportunities:
Total Response Time = Network Latency + API Processing + Database Query + Serialization
                      (10-50ms)     + (20-100ms)      + (50-150ms)     + (5-20ms)
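
A rough client-side split can be measured directly. The snippet below is only a sketch: the endpoint path and the X-Process-Time response header are illustrative assumptions, so substitute whatever server-timing information the API actually exposes:
import requests

# Sketch: measure end-to-end time and (if reported) the server-side share.
# The URL path and the X-Process-Time header are assumptions for illustration.
response = requests.get(
    f"{client.base_url}/concepts/201826",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
)
total_ms = response.elapsed.total_seconds() * 1000             # network + processing + serialization
server_ms = float(response.headers.get("X-Process-Time", 0))   # server-side portion, if reported
print(f"total: {total_ms:.1f}ms, server: {server_ms:.1f}ms, network ~ {total_ms - server_ms:.1f}ms")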

Performance Tiers

Query Type             Target Time    Optimization Level
Concept Lookup         < 50ms         Basic
Simple Search          < 100ms        Standard
Advanced Search        < 200ms        Optimized
Complex Hierarchy      < 300ms        Advanced
Batch Operations       < 500ms        Enterprise
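
When instrumenting your own code, these targets can be encoded as constants so monitoring can flag queries that miss their tier (a small sketch derived from the table above):
# Target response times (ms) taken from the performance tiers above
PERFORMANCE_TARGETS_MS = {
    "concept_lookup": 50,
    "simple_search": 100,
    "advanced_search": 200,
    "complex_hierarchy": 300,
    "batch_operations": 500,
}

def within_target(query_type: str, duration_ms: float) -> bool:
    """Return True if a measured duration meets its tier's target"""
    return duration_ms <= PERFORMANCE_TARGETS_MS.get(query_type, 200)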

Basic Optimization Techniques

1. Use Specific Vocabularies

❌ Inefficient - searches all vocabularies:
# Slow: searches across 100+ vocabularies
results = client.search_concepts({
    "query": "diabetes",
    "limit": 10
})
✅ Efficient - targeted vocabulary search:
# Fast: searches only relevant vocabularies
results = client.search_concepts({
    "query": "diabetes", 
    "vocabularies": ["SNOMED", "ICD10CM"],
    "limit": 10
})

2. Filter by Domain Early

❌ Inefficient:
# Searches all domains, then filters in application
results = client.search_concepts({
    "query": "insulin",
    "limit": 100
})
drug_concepts = [c for c in results.concepts if c.domain_id == "Drug"]
✅ Efficient:
# Filters at database level
results = client.search_concepts({
    "query": "insulin",
    "domains": ["Drug"],
    "limit": 10
})

3. Use Appropriate Page Sizes

# Optimal page sizes for different use cases
OPTIMAL_LIMITS = {
    "autocomplete": 10,
    "search_results": 20,
    "bulk_processing": 100,
    "data_export": 1000
}

# Example: Autocomplete
suggestions = client.get_search_autocomplete({
    "query": user_input,
    "limit": OPTIMAL_LIMITS["autocomplete"]
})

Advanced Search Optimization

1. Leverage Search Facets

Use facets to understand result distribution before making expensive queries:
class OptimizedSearchService:
    def __init__(self, client):
        self.client = client
        self.facet_cache = {}
    
    def get_search_strategy(self, query: str) -> Dict[str, Any]:
        """Determine optimal search strategy based on facets"""
        
        # Get facets to understand data distribution
        facets = self.client.get_search_facets({
            "query": query,
            "vocabularies": ["SNOMED", "ICD10CM", "RxNorm"]
        })
        
        # Choose vocabularies with most relevant results
        vocab_counts = {f["vocabulary_id"]: f["count"] 
                       for f in facets["vocabularies"]}
        
        # Select top 3 vocabularies by result count
        optimal_vocabs = sorted(vocab_counts.items(), 
                              key=lambda x: x[1], reverse=True)[:3]
        
        return {
            "vocabularies": [v[0] for v in optimal_vocabs],
            "expected_results": sum(v[1] for v in optimal_vocabs),
            "search_complexity": "low" if len(optimal_vocabs) <= 2 else "medium"
        }
    
    def optimized_search(self, query: str, **kwargs) -> Dict[str, Any]:
        """Perform optimized search with strategy selection"""
        
        strategy = self.get_search_strategy(query)
        
        search_params = {
            **kwargs,
            "query": query,
            "vocabularies": strategy["vocabularies"],
            "limit": min(kwargs.get("limit", 20), 50)  # Cap at 50 for performance
        }
        
        return self.client.advanced_search_concepts(search_params)
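
For example, the facet lookup runs first and the expensive search is then scoped to the top vocabularies:
# Example usage of the facet-driven service defined above
service = OptimizedSearchService(client)

strategy = service.get_search_strategy("chest pain")
print(f"Searching {strategy['vocabularies']} (~{strategy['expected_results']} expected results)")

results = service.optimized_search("chest pain", limit=20)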

2. Implement Progressive Search

For complex queries, implement a progressive search that starts narrow and expands scope until enough results are found:
async def progressive_search(client, query: str, target_results: int = 20):
    """Progressive search that expands scope until target results found"""
    
    search_levels = [
        # Level 1: Most specific
        {
            "vocabularies": ["SNOMED"],
            "domains": ["Condition"],
            "standard_concepts_only": True
        },
        # Level 2: Broader vocabularies
        {
            "vocabularies": ["SNOMED", "ICD10CM"],
            "domains": ["Condition", "Procedure"],
            "standard_concepts_only": True
        },
        # Level 3: All standard concepts
        {
            "vocabularies": ["SNOMED", "ICD10CM", "HCPCS"],
            "standard_concepts_only": True
        },
        # Level 4: Include non-standard concepts
        {
            "vocabularies": ["SNOMED", "ICD10CM", "HCPCS"],
            "standard_concepts_only": False,
            "include_invalid": False
        }
    ]
    
    all_results = []
    
    for level, params in enumerate(search_levels):
        search_params = {
            "query": query,
            "limit": target_results,
            **params
        }
        
        try:
            results = await client.advanced_search_concepts(search_params)
            all_results.extend(results["concepts"])
            
            # Stop if we have enough results
            if len(all_results) >= target_results:
                break
                
        except Exception as e:
            print(f"Search level {level} failed: {e}")
            continue
    
    # Deduplicate and return top results
    unique_results = []
    seen_ids = set()
    
    for concept in all_results:
        if concept["concept_id"] not in seen_ids:
            unique_results.append(concept)
            seen_ids.add(concept["concept_id"])
            
        if len(unique_results) >= target_results:
            break
    
    return unique_results
Usage Example:
import asyncio

async def search_for_conditions():
    """Run the progressive search defined above and print the results"""
    try:
        # Call progressive_search with await
        results = await progressive_search(client, "diabetes mellitus", target_results=20)

        print(f"Found {len(results)} concepts:")
        for index, concept in enumerate(results, start=1):
            print(f"{index}. {concept['concept_name']} ({concept['concept_id']})")

        return results

    except Exception as e:
        print(f"Progressive search failed: {e}")
        return []

# Run the search
results = asyncio.run(search_for_conditions())
if results:
    print("Search completed successfully")
else:
    print("No results found or search failed")

Hierarchy Optimization

1. Limit Traversal Depth

❌ Inefficient - unlimited depth:
# Can traverse hundreds of levels
ancestors = client.get_concept_ancestors(320128)
✅ Efficient - limited depth:
# Limits to clinically relevant depth
ancestors = client.get_concept_ancestors(
    320128,
    max_levels=5,  # Usually sufficient for clinical hierarchies
    vocabulary_ids=["SNOMED"]  # Focus on specific vocabulary
)

2. Use Batch Operations

❌ Inefficient - sequential requests:
# Makes N separate API calls
concept_ids = [201826, 320128, 432867, 4329847]
all_ancestors = []

for concept_id in concept_ids:
    ancestors = client.get_concept_ancestors(concept_id)
    all_ancestors.append(ancestors)
✅ Efficient - batch request:
# Single API call for multiple concepts
batch_request = {
    "queries": [
        {
            "query_id": f"ancestors_{cid}",
            "concept_id": cid,
            "operation": "ancestors",
            "max_levels": 5
        }
        for cid in concept_ids
    ]
}

batch_results = client.batch_hierarchy_queries(batch_request)

3. Optimize Path Finding

For concept path queries, use bidirectional search strategy:
def optimized_concept_path(client, source_id: int, target_id: int) -> Dict[str, Any]:
    """Find path using bidirectional search strategy"""
    
    # Start with short path search
    try:
        path = client.get_concept_path(
            source_id, 
            target_id,
            max_depth=3,  # Start with shallow search
            shortest_path_only=True
        )
        
        if path["paths"]:
            return path
            
    except Exception:
        pass
    
    # If short path not found, try deeper search
    try:
        path = client.get_concept_path(
            source_id,
            target_id, 
            max_depth=6,  # Deeper search
            shortest_path_only=True
        )
        
        return path
        
    except Exception as e:
        # Fallback: check if concepts are in same vocabulary
        source = client.get_concept(source_id)
        target = client.get_concept(target_id)
        
        if source["vocabulary_id"] != target["vocabulary_id"]:
            return {
                "error": "No path found - concepts in different vocabularies",
                "suggestion": "Try using concept mapping instead"
            }
        
        raise e
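
For example, reusing concept IDs from the batch example above:
# Example usage of the path-finding helper defined above
path_info = optimized_concept_path(client, source_id=201826, target_id=320128)

if "error" in path_info:
    print(f"{path_info['error']} - {path_info['suggestion']}")
else:
    print(f"Found {len(path_info['paths'])} path(s) between the concepts")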

Caching Strategies

1. Multi-Level Caching

Implement caching at multiple levels for optimal performance:
import json
import redis
from functools import wraps
from typing import Any, Callable, Dict, List

def cached_request(cache_type: str):
    """Decorator for caching PerformanceOptimizedClient API requests"""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Generate cache key
            cache_key = self.cache_key(f"omophub:{cache_type}", *args, **kwargs)

            # Check memory cache first (fastest)
            if cache_key in self.memory_cache:
                return self.memory_cache[cache_key]

            # Check Redis cache (fast)
            cached_result = self.redis.get(cache_key)
            if cached_result:
                result = json.loads(cached_result)
                # Store in memory cache for next time
                self.memory_cache[cache_key] = result
                return result

            # Call actual API (slowest)
            result = func(self, *args, **kwargs)

            # Cache the result
            ttl = self.cache_ttl.get(cache_type, 3600)
            self.redis.setex(cache_key, ttl, json.dumps(result))
            self.memory_cache[cache_key] = result

            return result

        return wrapper
    return decorator

class PerformanceOptimizedClient:
    def __init__(self, api_key: str, redis_url: str = "redis://localhost:6379"):
        self.client = OMOPHubClient(api_key=api_key)
        self.redis = redis.from_url(redis_url)
        self.memory_cache = {}  # In-memory cache for hot data

        # Cache TTL settings (in seconds)
        self.cache_ttl = {
            "concept": 86400,      # 24 hours - concepts rarely change
            "vocabulary": 3600,    # 1 hour - vocabulary metadata
            "search": 1800,        # 30 minutes - search results
            "hierarchy": 7200,     # 2 hours - hierarchy data
            "relationships": 3600  # 1 hour - relationship data
        }

    def cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate consistent cache key"""
        key_parts = [prefix] + [str(arg) for arg in args]
        if kwargs:
            key_parts.append(json.dumps(sorted(kwargs.items())))
        return ":".join(key_parts)

    @cached_request("concept")
    def get_concept(self, concept_id: int) -> Dict[str, Any]:
        """Cached concept lookup"""
        return self.client.get_concept(concept_id)

    @cached_request("search")
    def search_concepts(self, **params) -> Dict[str, Any]:
        """Cached concept search"""
        return self.client.search_concepts(params)

    @cached_request("hierarchy")
    def get_concept_ancestors(self, concept_id: int, **params) -> Dict[str, Any]:
        """Cached ancestor lookup"""
        return self.client.get_concept_ancestors(concept_id, **params)
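
The wrapper behaves like the underlying client, so repeated lookups are served from cache, for example:
# Example: the cached client is a drop-in wrapper around the raw client
perf_client = PerformanceOptimizedClient(api_key="YOUR_API_KEY")

concept = perf_client.get_concept(201826)        # first call goes to the API and is cached
concept_again = perf_client.get_concept(201826)  # served from the in-memory cache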

2. Smart Cache Invalidation

class SmartCacheManager:
    def __init__(self, redis_client, api_client):
        self.redis = redis_client
        self.client = api_client
        
    def invalidate_concept_cache(self, concept_id: int):
        """Invalidate all cache entries related to a concept"""
        patterns = [
            f"omophub:concept:{concept_id}*",
            f"omophub:hierarchy:*{concept_id}*",
            f"omophub:relationships:{concept_id}*",
            f"omophub:search:*"  # Search cache may contain this concept
        ]
        
        for pattern in patterns:
            keys = self.redis.keys(pattern)
            if keys:
                self.redis.delete(*keys)
    
    def warm_cache(self, frequently_used_concepts: List[int]):
        """Pre-warm cache with frequently used concepts"""
        for concept_id in frequently_used_concepts:
            try:
                # Pre-load concept data
                self.client.get_concept(concept_id)
                
                # Pre-load common hierarchy queries
                self.client.get_concept_ancestors(concept_id, max_levels=3)
                self.client.get_concept_descendants(concept_id, max_levels=2)
                
            except Exception as e:
                print(f"Failed to warm cache for concept {concept_id}: {e}")

Database Query Optimization

1. Index-Aware Queries

Structure queries to leverage database indexes:
def index_optimized_search(client, query_params: Dict[str, Any]) -> Dict[str, Any]:
    """Structure search to use optimal database indexes"""
    
    # Order filters by selectivity (most selective first)
    optimized_params = {}
    
    # 1. Concept ID filters (most selective)
    if "concept_ids" in query_params:
        optimized_params["concept_ids"] = query_params["concept_ids"]
    
    # 2. Vocabulary filters (highly selective)
    if "vocabularies" in query_params:
        optimized_params["vocabularies"] = query_params["vocabularies"]
    
    # 3. Domain filters (moderately selective)  
    if "domains" in query_params:
        optimized_params["domains"] = query_params["domains"]
    
    # 4. Standard concept filter (uses index)
    if "standard_concepts_only" in query_params:
        optimized_params["standard_concepts_only"] = query_params["standard_concepts_only"]
    
    # 5. Text search (least selective, but uses full-text index)
    if "query" in query_params:
        optimized_params["query"] = query_params["query"]
    
    # 6. Date filters (if needed)
    if "date_range" in query_params:
        optimized_params["date_range"] = query_params["date_range"]
    
    return client.advanced_search_concepts(optimized_params)

2. Minimize Data Transfer

Request only needed fields to reduce network transfer:
def lightweight_search(client, query: str, **kwargs) -> List[Dict[str, Any]]:
    """Search with minimal data transfer"""
    
    # Get basic results first
    results = client.search_concepts({
        "query": query,
        "limit": kwargs.get("limit", 20),
        **kwargs
    })
    
    # Return lightweight concept data
    lightweight_concepts = []
    for concept in results["concepts"]:
        lightweight_concepts.append({
            "concept_id": concept["concept_id"],
            "concept_name": concept["concept_name"],
            "concept_code": concept["concept_code"],
            "vocabulary_id": concept["vocabulary_id"],
            "domain_id": concept["domain_id"]
            # Exclude heavy fields like synonyms, descriptions, etc.
        })
    
    return lightweight_concepts

def get_full_concept_details(client, concept_ids: List[int]) -> List[Dict[str, Any]]:
    """Get full details only when needed"""
    
    # Use batch API for efficiency
    batch_request = {
        "concept_ids": concept_ids
    }
    
    return client.batch_get_concepts(batch_request)
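
A typical flow uses the lightweight search for display and fetches full records only for the concepts the user actually opens, for example:
# Two-phase pattern: lightweight search first, full details only for selected concepts
hits = lightweight_search(client, "metformin", domains=["Drug"], limit=20)

selected_ids = [c["concept_id"] for c in hits[:5]]
details = get_full_concept_details(client, selected_ids)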

Connection and Network Optimization

1. Connection Pooling

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class HighPerformanceClient(OMOPHubClient):
    def __init__(self, api_key: str, **kwargs):
        super().__init__(api_key, **kwargs)
        
        # Configure connection pooling
        retry_strategy = Retry(
            total=3,
            backoff_factor=0.3,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        
        adapter = HTTPAdapter(
            pool_connections=20,      # Number of connection pools
            pool_maxsize=50,          # Max connections per pool
            max_retries=retry_strategy,
            pool_block=False          # Don't block on pool exhaustion
        )
        
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        
        # Configure keep-alive
        self.session.headers.update({
            'Connection': 'keep-alive',
            'Keep-Alive': 'timeout=30, max=100'
        })
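
Once configured, the pooled session is reused transparently by the client's normal methods, for example:
# Usage: the pooled client is a drop-in replacement for the standard client
client = HighPerformanceClient(api_key="YOUR_API_KEY")

# Sequential requests now reuse pooled keep-alive connections
for concept_id in [201826, 320128, 432867]:
    client.get_concept(concept_id)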

2. Request Compression

import gzip
import json
import requests

def compressed_request(url, data):
    """
    Send compressed request with proper headers and body compression
    """
    # Compress the request body
    compressed_data = gzip.compress(json.dumps(data).encode('utf-8'))
    
    # Set proper headers for compression
    headers = {
        'Content-Type': 'application/json',
        'Content-Encoding': 'gzip',         # Indicates compressed request body
        'Accept-Encoding': 'gzip, deflate'  # Accepts compressed responses
    }
    
    return requests.post(url, data=compressed_data, headers=headers)

# Example usage with OMOPHub API
def bulk_search_compressed(client, search_queries):
    """
    Perform bulk search with request compression for large payloads
    """
    bulk_request_data = {
        "searches": search_queries,
        "options": {
            "include_facets": True,
            "include_relationships": False
        }
    }
    
    # Use compression for large bulk requests
    api_url = f"{client.base_url}/concepts/search/bulk"
    response = compressed_request(api_url, bulk_request_data)
    
    return response.json()

Performance Monitoring

1. Request Timing

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def timing_context(operation_name: str):
    """Context manager for timing operations"""
    start_time = time.time()
    try:
        yield
    finally:
        elapsed = (time.time() - start_time) * 1000  # Convert to milliseconds
        print(f"{operation_name}: {elapsed:.2f}ms")
        
        # Log slow queries
        if elapsed > 200:  # Threshold for slow queries
            logger.warning(f"Slow query detected: {operation_name} took {elapsed:.2f}ms")

# Usage
with timing_context("Concept Search"):
    results = client.search_concepts({"query": "diabetes"})

with timing_context("Hierarchy Traversal"):
    ancestors = client.get_concept_ancestors(201826, max_levels=5)

2. Performance Metrics Collection

class PerformanceTracker:
    def __init__(self):
        self.metrics = {
            "request_count": 0,
            "total_time": 0,
            "slow_queries": 0,
            "cache_hits": 0,
            "cache_misses": 0
        }
    
    def track_request(self, operation: str, duration_ms: float, cache_hit: bool = False):
        """Track request performance metrics"""
        self.metrics["request_count"] += 1
        self.metrics["total_time"] += duration_ms
        
        if duration_ms > 200:
            self.metrics["slow_queries"] += 1
        
        if cache_hit:
            self.metrics["cache_hits"] += 1
        else:
            self.metrics["cache_misses"] += 1
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Generate performance report"""
        if self.metrics["request_count"] == 0:
            return {"message": "No requests tracked"}
        
        total_cache_requests = self.metrics["cache_hits"] + self.metrics["cache_misses"]
        cache_hit_rate = (self.metrics["cache_hits"] / total_cache_requests * 100) if total_cache_requests > 0 else 0
        
        return {
            "total_requests": self.metrics["request_count"],
            "average_response_time": self.metrics["total_time"] / self.metrics["request_count"],
            "slow_query_rate": (self.metrics["slow_queries"] / self.metrics["request_count"] * 100),
            "cache_hit_rate": cache_hit_rate,
            "recommendations": self._generate_recommendations()
        }
    
    def _generate_recommendations(self) -> List[str]:
        """Generate performance recommendations"""
        recommendations = []
        
        avg_time = self.metrics["total_time"] / self.metrics["request_count"]
        slow_rate = self.metrics["slow_queries"] / self.metrics["request_count"] * 100
        cache_hit_rate = self.metrics["cache_hits"] / (self.metrics["cache_hits"] + self.metrics["cache_misses"]) * 100
        
        if avg_time > 150:
            recommendations.append("Consider using more specific vocabulary filters")
        
        if slow_rate > 10:
            recommendations.append("High rate of slow queries - review query complexity")
        
        if cache_hit_rate < 60:
            recommendations.append("Low cache hit rate - consider cache warming strategies")
        
        return recommendations
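
In practice, you record each request's duration as it completes and periodically review the report, for example:
# Example: combine manual timing with the tracker defined above
tracker = PerformanceTracker()

start = time.time()
results = client.search_concepts({"query": "diabetes", "vocabularies": ["SNOMED"]})
tracker.track_request("concept_search", (time.time() - start) * 1000, cache_hit=False)

print(tracker.get_performance_report())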

Best Practices Summary

Do’s

  1. Filter Early: Use vocabulary, domain, and concept class filters
  2. Limit Results: Use appropriate page sizes for your use case
  3. Cache Aggressively: Implement multi-level caching
  4. Batch Operations: Use batch APIs for multiple concepts
  5. Monitor Performance: Track response times and cache hit rates
  6. Use Specific Queries: Avoid broad, unfocused searches

Don’ts

  1. Unlimited Searches: Always set reasonable limits
  2. Sequential Requests: Use batch APIs instead
  3. Deep Hierarchies: Limit traversal depth appropriately
  4. Ignore Caching: Don’t make the same request multiple times
  5. Large Transfers: Don’t request unnecessary data
  6. Synchronous Processing: Use async/await for multiple requests (see the sketch below)
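
For the last point, a minimal sketch: it assumes the SDK's blocking calls are wrapped with asyncio.to_thread; if the SDK offers native async methods, await those directly instead:
import asyncio

async def fetch_concepts_concurrently(client, concept_ids):
    # Run the blocking get_concept calls in worker threads so they overlap
    tasks = [asyncio.to_thread(client.get_concept, cid) for cid in concept_ids]
    return await asyncio.gather(*tasks, return_exceptions=True)

# concepts = asyncio.run(fetch_concepts_concurrently(client, [201826, 320128, 432867]))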

Performance Checklist

  • Vocabulary filters applied where possible
  • Domain filters used for specific use cases
  • Appropriate page sizes configured
  • Caching implemented at multiple levels
  • Batch operations used for multiple concepts
  • Request timeout configured
  • Connection pooling enabled
  • Performance monitoring in place
  • Error handling and retries implemented
  • Request compression enabled

Next Steps
