Overview

Building comprehensive concept sets is fundamental to observational health research. A concept set defines the clinical codes that identify patients with a specific condition, treatment, or outcome. This guide demonstrates how to use the OMOPHub Recommended Concepts API—powered by OHDSI PHOEBE methodology—to automatically expand and refine concept sets for phenotype development.
Use Case: Automatically discover related medical concepts to build comprehensive, network-ready concept sets that identify more patients while maintaining clinical precision.

Business Problem

Healthcare researchers and data scientists face significant challenges when building concept sets for observational studies:
  • Incomplete Coverage: Manual code selection typically misses 15-30% of clinically relevant codes, resulting in missed patients
  • Heterogeneous Data Sources: Different institutions use different coding granularity—US data sources tend to use specific codes while international sources often use broader terms
  • Network Study Failures: Concept sets developed on one data source frequently underperform on others due to vocabulary differences
  • Time-Consuming Expert Review: Building comprehensive concept sets traditionally requires weeks of clinical informaticist time
  • Late Patient Identification: Using only narrow, specific codes captures patients later in their disease course, missing early presentations
  • AI/LLM Mapping Gaps: Automated mapping tools achieve only 70-90% accuracy, requiring extensive validation
Research shows that comprehensive concept sets built with data-driven recommendations can identify 5x more patients while maintaining equivalent or higher positive predictive value, and can capture patients up to 2 years earlier in their disease course.

Implementation Guide

Step 1: Set Up OMOPHub Client

from omophub import OMOPHubClient
from typing import List, Dict, Any, Optional
from collections import defaultdict
from datetime import datetime


class PhenotypeExpansionService:
    """Service for building comprehensive concept sets using PHOEBE recommendations."""

    def __init__(self, api_key: str):
        self.client = OMOPHubClient(api_key=api_key)

    def get_starting_concept(self, clinical_term: str) -> Dict[str, Any]:
        """Find the best starting concept for a clinical idea.

        Args:
            clinical_term: Clinical term to search (e.g., "type 2 diabetes mellitus")

        Returns:
            Best matching standard concept with metadata
        """
        # Search for the clinical term
        results = self.client.search_concepts({
            "query": clinical_term,
            "standard_concepts_only": True,
            "limit": 10
        })

        if not results.get("concepts"):
            raise ValueError(f"No concepts found for: {clinical_term}")

        # Return the top match (highest relevance)
        return results["concepts"][0]

Step 2: Get Recommendations for Concepts

    def get_recommendations(
        self,
        concept_ids: List[int],
        domains: Optional[List[str]] = None,
        vocabularies: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Get recommended concepts for one or more starting concepts.

        Args:
            concept_ids: List of concept IDs to get recommendations for
            domains: Optional filter by domain (Condition, Drug, Measurement, etc.)
            vocabularies: Optional filter by vocabulary (SNOMED, ICD10CM, etc.)

        Returns:
            Dictionary with recommendations grouped by input concept
        """
        # Call the recommended concepts endpoint
        recommendations = self.client.get_recommended_concepts(
            concept_ids=concept_ids
        )

        # Apply filters if specified
        if domains or vocabularies:
            recommendations = self._filter_recommendations(
                recommendations,
                domains=domains,
                vocabularies=vocabularies
            )

        return recommendations

    def _filter_recommendations(
        self,
        recommendations: Dict[str, Any],
        domains: Optional[List[str]] = None,
        vocabularies: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Filter recommendations by domain and/or vocabulary."""
        filtered = {}

        for concept_id, recs in recommendations.get("data", {}).items():
            filtered_recs = []
            for rec in recs:
                # Check domain filter
                if domains and rec.get("domain_id") not in domains:
                    continue
                # Check vocabulary filter
                if vocabularies and rec.get("vocabulary_id") not in vocabularies:
                    continue
                filtered_recs.append(rec)

            if filtered_recs:
                filtered[concept_id] = filtered_recs

        return {"data": filtered, "meta": recommendations.get("meta", {})}

Step 3: Categorize Recommendations by Relationship Type

Relationship types indicate how the concept was recommended:
  • Ontology: Hierarchical relationships (parent, child, sibling)
  • Lexical: Similar names/synonyms
  • Patient Context: Concepts that co-occur in patient records
  • Clinical: Clinical relationships (findings, treatments)

    def categorize_by_relationship(
        self,
        recommendations: List[Dict[str, Any]]
    ) -> Dict[str, List[Dict[str, Any]]]:
        """Group recommendations by their relationship type.

        Args:
            recommendations: List of recommended concepts

        Returns:
            Dictionary with recommendations grouped by relationship category
        """
        categories = {
            "ontology": [],      # Hierarchical relationships
            "lexical": [],       # Name/synonym matches
            "patient_context": [], # Co-occurrence based
            "clinical": []       # Clinical relationships (findings, treatments)
        }

        # Relationship type mappings
        ontology_rels = {
            "Is a", "Subsumes", "Has parent", "Has child",
            "Has ancestor", "Has descendant"
        }

        lexical_rels = {
            "Has lexical match", "Mapped from", "Maps to",
            "Has synonym", "Source - RxNorm eq"
        }

        clinical_rels = {
            "Has finding", "Associated finding", "Has procedure",
            "Has treatment", "Has manifestation", "Due to",
            "Caused by", "Associated with"
        }

        for rec in recommendations:
            rel_type = rec.get("relationship_id", "")

            if rel_type in ontology_rels:
                categories["ontology"].append(rec)
            elif rel_type in lexical_rels:
                categories["lexical"].append(rec)
            elif rel_type in clinical_rels:
                categories["clinical"].append(rec)
            else:
                # Default: likely patient context or other
                categories["patient_context"].append(rec)

        return categories

Step 4: Build Expanded Concept Set Iteratively

    def expand_concept_set(
        self,
        seed_concepts: List[int],
        target_domains: Optional[List[str]] = None,
        max_iterations: int = 3,
        min_relevance_score: float = 0.5
    ) -> Dict[str, Any]:
        """Iteratively expand a concept set using recommendations.

        Args:
            seed_concepts: Initial concept IDs to start with
            target_domains: Domains to include (None = all domains)
            max_iterations: Maximum expansion iterations
            min_relevance_score: Minimum score to include a recommendation

        Returns:
            Expanded concept set with metadata
        """
        # Track all concepts
        included_concepts = set(seed_concepts)
        expansion_history = []

        # Get initial concept details
        concept_details = {}
        for concept_id in seed_concepts:
            details = self.client.get_concept(concept_id)
            if details.get("data"):
                concept_details[concept_id] = details["data"]

        current_frontier = list(seed_concepts)

        for iteration in range(max_iterations):
            if not current_frontier:
                break

            # Get recommendations for current frontier
            recommendations = self.get_recommendations(
                concept_ids=current_frontier,
                domains=target_domains
            )

            new_concepts = []
            iteration_additions = []

            for source_id, recs in recommendations.get("data", {}).items():
                for rec in recs:
                    concept_id = rec.get("concept_id")

                    # Skip if already included
                    if concept_id in included_concepts:
                        continue

                    # Skip non-standard concepts
                    if rec.get("standard_concept") != "S":
                        continue

                    # Apply the relevance threshold when the API returns a
                    # score (field name assumed; adjust to the response schema)
                    score = rec.get("relevance_score")
                    if score is not None and score < min_relevance_score:
                        continue

                    # Add to expanded set
                    included_concepts.add(concept_id)
                    new_concepts.append(concept_id)
                    concept_details[concept_id] = rec

                    iteration_additions.append({
                        "concept_id": concept_id,
                        "concept_name": rec.get("concept_name"),
                        "source_concept_id": int(source_id),
                        "relationship": rec.get("relationship_id"),
                        "iteration": iteration + 1
                    })

            expansion_history.append({
                "iteration": iteration + 1,
                "frontier_size": len(current_frontier),
                "new_concepts_added": len(new_concepts),
                "additions": iteration_additions
            })

            # New frontier is the newly added concepts
            current_frontier = new_concepts

        return {
            "concept_set": {
                "concept_ids": list(included_concepts),
                "concepts": concept_details
            },
            "expansion_summary": {
                "seed_count": len(seed_concepts),
                "final_count": len(included_concepts),
                "expansion_ratio": len(included_concepts) / len(seed_concepts),
                "iterations": len(expansion_history)
            },
            "history": expansion_history
        }

Step 5: Analyze Cross-Vocabulary Coverage

Cross-vocabulary coverage is critical for network studies to ensure concept sets work across international data sources with different coding practices.

    def analyze_vocabulary_coverage(
        self,
        concept_set: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Analyze vocabulary coverage of a concept set.

        Important for network studies to ensure concept sets
        work across international data sources.

        Args:
            concept_set: Expanded concept set from expand_concept_set()

        Returns:
            Coverage analysis by vocabulary and domain
        """
        concepts = concept_set.get("concept_set", {}).get("concepts", {})

        # Count by vocabulary
        vocab_counts = defaultdict(int)
        domain_counts = defaultdict(int)
        vocab_domain_matrix = defaultdict(lambda: defaultdict(int))

        for concept_id, details in concepts.items():
            vocab = details.get("vocabulary_id", "Unknown")
            domain = details.get("domain_id", "Unknown")

            vocab_counts[vocab] += 1
            domain_counts[domain] += 1
            vocab_domain_matrix[vocab][domain] += 1

        # Identify coverage gaps
        expected_vocabs = {
            "Condition": ["SNOMED", "ICD10CM", "ICD10", "Read", "ICD9CM"],
            "Drug": ["RxNorm", "RxNorm Extension", "ATC", "NDC"],
            "Measurement": ["LOINC", "SNOMED"],
            "Procedure": ["SNOMED", "HCPCS", "ICD10PCS"]
        }

        coverage_gaps = {}
        for domain, expected in expected_vocabs.items():
            if domain_counts.get(domain, 0) > 0:
                missing = [v for v in expected
                          if vocab_domain_matrix.get(v, {}).get(domain, 0) == 0]
                if missing:
                    coverage_gaps[domain] = missing

        return {
            "vocabulary_distribution": dict(vocab_counts),
            "domain_distribution": dict(domain_counts),
            "vocabulary_by_domain": {k: dict(v) for k, v in vocab_domain_matrix.items()},
            "coverage_gaps": coverage_gaps,
            "network_ready": len(coverage_gaps) == 0
        }

Example Implementation

Building a Type 2 Diabetes Phenotype

# Initialize the service
expansion_service = PhenotypeExpansionService("your_api_key")

# Step 1: Find the starting concept
starting_concept = expansion_service.get_starting_concept("type 2 diabetes mellitus")
print(f"Starting concept: {starting_concept['concept_name']} ({starting_concept['concept_id']})")
# Output: Starting concept: Type 2 diabetes mellitus (201826)

# Step 2: Get initial recommendations
recommendations = expansion_service.get_recommendations(
    concept_ids=[starting_concept["concept_id"]]
)

# Step 3: Review recommendations by category
categorized = expansion_service.categorize_by_relationship(
    recommendations["data"][str(starting_concept["concept_id"])]
)

print("\n=== RECOMMENDATIONS BY CATEGORY ===")
for category, recs in categorized.items():
    print(f"\n{category.upper()} ({len(recs)} concepts):")
    for rec in recs[:5]:  # Show first 5
        print(f"  - {rec['concept_name']} ({rec['vocabulary_id']})")

Expected Output

Starting concept: Type 2 diabetes mellitus (201826)

=== RECOMMENDATIONS BY CATEGORY ===

CLINICAL (12 concepts):
  - Hyperglycemia (SNOMED)
  - Diabetic retinopathy (SNOMED)
  - Hemoglobin A1c measurement (LOINC)
  - Diabetic nephropathy (SNOMED)
  - Peripheral neuropathy due to diabetes mellitus (SNOMED)

ONTOLOGY (8 concepts):
  - Diabetes mellitus (SNOMED)
  - Type 2 diabetes mellitus without complication (SNOMED)
  - Type 2 diabetes mellitus with kidney complications (SNOMED)

LEXICAL (15 concepts):
  - Type II diabetes mellitus (ICD10CM)
  - Non-insulin-dependent diabetes mellitus (Read)
  - Adult-onset diabetes (SNOMED)

PATIENT_CONTEXT (6 concepts):
  - Metformin (RxNorm)
  - Glucose measurement (LOINC)
  - Diabetic foot examination (SNOMED)

Full Expansion Workflow

# Step 4: Expand the concept set iteratively
expanded_set = expansion_service.expand_concept_set(
    seed_concepts=[201826],  # Type 2 diabetes mellitus
    target_domains=["Condition"],  # Focus on conditions
    max_iterations=2
)

print("\n=== EXPANSION SUMMARY ===")
summary = expanded_set["expansion_summary"]
print(f"Seed concepts: {summary['seed_count']}")
print(f"Final concepts: {summary['final_count']}")
print(f"Expansion ratio: {summary['expansion_ratio']:.1f}x")

# Step 5: Analyze vocabulary coverage
coverage = expansion_service.analyze_vocabulary_coverage(expanded_set)

print("\n=== VOCABULARY COVERAGE ===")
for vocab, count in coverage["vocabulary_distribution"].items():
    print(f"  {vocab}: {count} concepts")

if coverage["coverage_gaps"]:
    print("\n  Coverage Gaps Detected:")
    for domain, missing_vocabs in coverage["coverage_gaps"].items():
        print(f"  {domain}: Missing {', '.join(missing_vocabs)}")
else:
    print("\n Concept set is network-ready!")

Expected Output

=== EXPANSION SUMMARY ===
Seed concepts: 1
Final concepts: 47
Expansion ratio: 47.0x

=== VOCABULARY COVERAGE ===
  SNOMED: 28 concepts
  ICD10CM: 12 concepts
  ICD10: 4 concepts
  Read: 3 concepts

Concept set is network-ready!

Integration Patterns

1. ATLAS/OHDSI Workflow Integration

Pre-populate concept recommendations before importing to ATLAS:

class ATLASConceptSetBuilder:
    """Build ATLAS-compatible concept set expressions."""

    def __init__(self, omophub_client):
        self.client = omophub_client
        self.expansion_service = PhenotypeExpansionService(
            omophub_client.config.api_key
        )

    def build_atlas_expression(
        self,
        clinical_term: str,
        include_descendants: bool = True
    ) -> Dict[str, Any]:
        """Build an ATLAS-compatible concept set expression.

        Args:
            clinical_term: Clinical term to build concept set for
            include_descendants: Whether to include descendants

        Returns:
            ATLAS concept set expression JSON
        """
        # Get expanded concept set
        starting = self.expansion_service.get_starting_concept(clinical_term)
        expanded = self.expansion_service.expand_concept_set(
            seed_concepts=[starting["concept_id"]],
            max_iterations=2
        )

        # Build ATLAS expression format
        items = []
        for concept_id, details in expanded["concept_set"]["concepts"].items():
            items.append({
                "concept": {
                    "CONCEPT_ID": int(concept_id),
                    "CONCEPT_NAME": details.get("concept_name"),
                    "STANDARD_CONCEPT": details.get("standard_concept"),
                    "STANDARD_CONCEPT_CAPTION": "Standard",
                    "INVALID_REASON": details.get("invalid_reason"),
                    "CONCEPT_CODE": details.get("concept_code"),
                    "DOMAIN_ID": details.get("domain_id"),
                    "VOCABULARY_ID": details.get("vocabulary_id"),
                    "CONCEPT_CLASS_ID": details.get("concept_class_id")
                },
                "isExcluded": False,
                "includeDescendants": include_descendants,
                "includeMapped": True
            })

        return {
            "items": items,
            "name": f"{clinical_term} - Expanded",
            "description": f"Auto-generated from PHOEBE recommendations. "
                          f"Expansion ratio: {expanded['expansion_summary']['expansion_ratio']:.1f}x"
        }

    def export_for_atlas(self, expression: Dict[str, Any], filepath: str):
        """Export concept set expression as JSON for ATLAS import."""
        import json
        with open(filepath, 'w') as f:
            json.dump(expression, f, indent=2)
        print(f"Exported to {filepath}")

2. LLM/AI Mapping Validation Pipeline

Use recommendations to validate AI-generated concept mappings:

class LLMMappingValidator:
    """Validate LLM-generated concept mappings using PHOEBE recommendations."""

    def __init__(self, omophub_client):
        self.client = omophub_client

    def validate_llm_mapping(
        self,
        source_term: str,
        llm_mapped_concept_id: int,
        confidence_threshold: float = 0.7
    ) -> Dict[str, Any]:
        """Validate an LLM-generated mapping against PHOEBE recommendations.

        Args:
            source_term: Original term that was mapped
            llm_mapped_concept_id: Concept ID suggested by LLM
            confidence_threshold: Minimum confidence for validation

        Returns:
            Validation result with confidence score and alternatives
        """
        # Get the LLM's mapped concept details
        llm_concept = self.client.get_concept(llm_mapped_concept_id)

        # Search for the source term to find expected concepts
        search_results = self.client.search_concepts({
            "query": source_term,
            "standard_concepts_only": True,
            "limit": 20
        })

        # Get recommendations for top search results
        top_concept_ids = [c["concept_id"] for c in search_results.get("concepts", [])[:5]]

        if not top_concept_ids:
            return {
                "valid": False,
                "confidence": 0.0,
                "reason": "No matching concepts found for source term",
                "alternatives": []
            }

        recommendations = self.client.get_recommended_concepts(
            concept_ids=top_concept_ids
        )

        # Check if LLM mapping appears in recommendations
        all_recommended_ids = set()
        for concept_id, recs in recommendations.get("data", {}).items():
            all_recommended_ids.add(int(concept_id))
            for rec in recs:
                all_recommended_ids.add(rec.get("concept_id"))

        # Also include direct search results
        search_concept_ids = {c["concept_id"] for c in search_results.get("concepts", [])}

        # Calculate validation confidence
        in_search = llm_mapped_concept_id in search_concept_ids
        in_recommendations = llm_mapped_concept_id in all_recommended_ids

        if in_search:
            # Direct match in search results
            confidence = 0.95
            validation_path = "direct_search_match"
        elif in_recommendations:
            # Found in recommendations
            confidence = 0.85
            validation_path = "recommendation_match"
        else:
            # Not found - check semantic similarity
            confidence = 0.3
            validation_path = "no_match"

        # Get alternative suggestions
        alternatives = []
        if confidence < confidence_threshold:
            for concept in search_results.get("concepts", [])[:5]:
                if concept["concept_id"] != llm_mapped_concept_id:
                    alternatives.append({
                        "concept_id": concept["concept_id"],
                        "concept_name": concept["concept_name"],
                        "vocabulary_id": concept["vocabulary_id"]
                    })

        return {
            "valid": confidence >= confidence_threshold,
            "confidence": confidence,
            "validation_path": validation_path,
            "llm_mapping": {
                "concept_id": llm_mapped_concept_id,
                "concept_name": llm_concept.get("data", {}).get("concept_name")
            },
            "alternatives": alternatives,
            "recommendation": "accept" if confidence >= confidence_threshold else "review"
        }

    def batch_validate(
        self,
        mappings: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Validate multiple LLM mappings in batch.

        Args:
            mappings: List of {"source_term": str, "concept_id": int}

        Returns:
            Batch validation results with summary statistics
        """
        results = []

        for mapping in mappings:
            result = self.validate_llm_mapping(
                source_term=mapping["source_term"],
                llm_mapped_concept_id=mapping["concept_id"]
            )
            results.append({
                "source_term": mapping["source_term"],
                "mapped_concept_id": mapping["concept_id"],
                **result
            })

        # Calculate summary statistics
        valid_count = sum(1 for r in results if r["valid"])

        return {
            "results": results,
            "summary": {
                "total": len(results),
                "valid": valid_count,
                "needs_review": len(results) - valid_count,
                "accuracy_rate": valid_count / len(results) if results else 0
            }
        }
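
For example, validating a small batch of LLM-proposed mappings might look like this (the mappings are illustrative; the concept IDs come from earlier examples):

validator = LLMMappingValidator(client)

batch_results = validator.batch_validate([
    {"source_term": "type 2 diabetes", "concept_id": 201826},
    {"source_term": "essential hypertension", "concept_id": 320128}
])

summary = batch_results["summary"]
print(f"Validated {summary['total']} mappings: "
      f"{summary['valid']} accepted, {summary['needs_review']} flagged for review")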

3. Network Study Harmonization

Ensure concept sets work across international data sources:

class NetworkStudyHarmonizer:
    """Harmonize concept sets for multi-site network studies."""

    # Vocabulary priorities by region
    REGIONAL_VOCABS = {
        "US": ["ICD10CM", "ICD9CM", "HCPCS", "NDC"],
        "EU": ["ICD10", "ATC", "Read"],
        "UK": ["Read", "ICD10", "SNOMED"],
        "Asia": ["ICD10", "KCD", "SNOMED"],
        "International": ["SNOMED", "LOINC", "RxNorm", "ATC"]
    }

    def __init__(self, omophub_client):
        self.client = omophub_client
        self.expansion_service = PhenotypeExpansionService(
            omophub_client.config.api_key
        )

    def harmonize_for_network(
        self,
        concept_ids: List[int],
        target_regions: List[str] = None
    ) -> Dict[str, Any]:
        """Expand concept set to cover target regional vocabularies.

        Args:
            concept_ids: Starting concept IDs
            target_regions: Regions to target (default: all)

        Returns:
            Harmonized concept set with regional coverage analysis
        """
        if target_regions is None:
            target_regions = ["US", "EU", "UK", "International"]

        # Get all target vocabularies
        target_vocabs = set()
        for region in target_regions:
            target_vocabs.update(self.REGIONAL_VOCABS.get(region, []))

        # Expand concept set
        expanded = self.expansion_service.expand_concept_set(
            seed_concepts=concept_ids,
            max_iterations=3
        )

        # Analyze coverage by region
        concepts = expanded["concept_set"]["concepts"]
        regional_coverage = {}

        for region in target_regions:
            region_vocabs = self.REGIONAL_VOCABS[region]
            covered = []
            missing = []

            for vocab in region_vocabs:
                vocab_concepts = [
                    c for c in concepts.values()
                    if c.get("vocabulary_id") == vocab
                ]
                if vocab_concepts:
                    covered.append(vocab)
                else:
                    missing.append(vocab)

            regional_coverage[region] = {
                "covered_vocabularies": covered,
                "missing_vocabularies": missing,
                "coverage_score": len(covered) / len(region_vocabs) if region_vocabs else 1.0
            }

        # Overall network readiness
        min_coverage = min(r["coverage_score"] for r in regional_coverage.values())

        return {
            "concept_set": expanded["concept_set"],
            "regional_coverage": regional_coverage,
            "network_readiness": {
                "minimum_regional_coverage": min_coverage,
                "ready_for_network": min_coverage >= 0.7,
                "recommendation": self._get_harmonization_recommendation(regional_coverage)
            }
        }

    def _get_harmonization_recommendation(
        self,
        regional_coverage: Dict[str, Any]
    ) -> str:
        """Generate recommendation based on coverage analysis."""
        gaps = []
        for region, coverage in regional_coverage.items():
            if coverage["missing_vocabularies"]:
                gaps.append(f"{region}: missing {', '.join(coverage['missing_vocabularies'])}")

        if not gaps:
            return "Concept set has comprehensive coverage across all target regions."
        else:
            return f"Consider adding mappings for: {'; '.join(gaps)}"

Performance Optimization

Batch Processing Multiple Concept Sets



class BatchPhenotypeProcessor:
    """Process multiple phenotypes efficiently in batch."""

    def __init__(self, omophub_client, max_workers: int = 5):
        self.client = omophub_client
        self.max_workers = max_workers  # reserved for future parallel fetching
        self.expansion_service = PhenotypeExpansionService(
            omophub_client.config.api_key
        )

    def process_phenotype_batch(
        self,
        phenotype_definitions: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Process multiple phenotypes in parallel.

        Args:
            phenotype_definitions: List of {"name": str, "seed_concepts": List[int]}

        Returns:
            List of expanded phenotype definitions
        """
        # First, collect all unique concept IDs
        all_concept_ids = set()
        for phenotype in phenotype_definitions:
            all_concept_ids.update(phenotype.get("seed_concepts", []))

        # Batch fetch all recommendations at once
        all_recommendations = self.client.get_recommended_concepts(
            concept_ids=list(all_concept_ids)
        )

        # Process each phenotype using cached recommendations
        results = []
        for phenotype in phenotype_definitions:
            expanded = self._expand_with_cache(
                phenotype=phenotype,
                recommendations_cache=all_recommendations
            )
            results.append(expanded)

        return results

    def _expand_with_cache(
        self,
        phenotype: Dict[str, Any],
        recommendations_cache: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Expand a phenotype using cached recommendations."""
        seed_concepts = phenotype.get("seed_concepts", [])
        included = set(seed_concepts)

        for concept_id in seed_concepts:
            recs = recommendations_cache.get("data", {}).get(str(concept_id), [])
            for rec in recs:
                if rec.get("standard_concept") == "S":
                    included.add(rec.get("concept_id"))

        return {
            "name": phenotype.get("name"),
            "seed_concepts": seed_concepts,
            "expanded_concepts": list(included),
            "expansion_ratio": len(included) / len(seed_concepts) if seed_concepts else 0
        }


# Usage Example
processor = BatchPhenotypeProcessor(client)

phenotypes = [
    {"name": "Type 2 Diabetes", "seed_concepts": [201826]},
    {"name": "Hypertension", "seed_concepts": [320128]},
    {"name": "Heart Failure", "seed_concepts": [316139]},
    {"name": "COPD", "seed_concepts": [255573]},
    {"name": "Asthma", "seed_concepts": [317009]}
]

results = processor.process_phenotype_batch(phenotypes)

for result in results:
    print(f"{result['name']}: {len(result['seed_concepts'])}{len(result['expanded_concepts'])} concepts")

Caching Strategy

import redis
import json
import hashlib
from typing import Optional


class CachedExpansionService(PhenotypeExpansionService):
    """Expansion service with Redis caching for improved performance."""

    def __init__(
        self,
        api_key: str,
        redis_url: str = "redis://localhost:6379",
        cache_ttl: int = 86400  # 24 hours
    ):
        super().__init__(api_key)
        self.redis = redis.from_url(redis_url)
        self.cache_ttl = cache_ttl

    def _cache_key(self, prefix: str, *args) -> str:
        """Generate cache key from arguments."""
        key_data = json.dumps(args, sort_keys=True)
        hash_val = hashlib.md5(key_data.encode()).hexdigest()
        return f"phenotype:{prefix}:{hash_val}"

    def get_recommendations(
        self,
        concept_ids: List[int],
        domains: Optional[List[str]] = None,
        vocabularies: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Get recommendations with caching."""
        cache_key = self._cache_key("recs", concept_ids, domains, vocabularies)

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fetch from API
        result = super().get_recommendations(concept_ids, domains, vocabularies)

        # Cache result
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(result, default=str)
        )

        return result

    def invalidate_cache(self, pattern: str = "phenotype:*"):
        """Invalidate cached recommendations."""
        keys = self.redis.keys(pattern)
        if keys:
            self.redis.delete(*keys)
            print(f"Invalidated {len(keys)} cache entries")

Quality Assurance

Concept Set Validation

class ConceptSetValidator:
    """Validate expanded concept sets for quality and completeness."""

    def __init__(self, omophub_client):
        self.client = omophub_client

    def validate_concept_set(
        self,
        concept_ids: List[int],
        expected_domain: Optional[str] = None
    ) -> Dict[str, Any]:
        """Comprehensive validation of a concept set.

        Args:
            concept_ids: Concept IDs to validate
            expected_domain: Expected primary domain (Condition, Drug, etc.)

        Returns:
            Validation results with issues and recommendations
        """
        issues = []
        warnings = []

        # Fetch all concept details
        concepts = {}
        for concept_id in concept_ids:
            result = self.client.get_concept(concept_id)
            if result.get("data"):
                concepts[concept_id] = result["data"]

        # Check 1: All concepts exist and are valid
        missing = set(concept_ids) - set(concepts.keys())
        if missing:
            issues.append({
                "type": "missing_concepts",
                "severity": "error",
                "message": f"{len(missing)} concept IDs not found",
                "details": list(missing)
            })

        # Check 2: All concepts are standard
        non_standard = [
            cid for cid, c in concepts.items()
            if c.get("standard_concept") != "S"
        ]
        if non_standard:
            issues.append({
                "type": "non_standard_concepts",
                "severity": "warning",
                "message": f"{len(non_standard)} non-standard concepts included",
                "details": non_standard
            })

        # Check 3: Domain consistency
        if expected_domain:
            wrong_domain = [
                cid for cid, c in concepts.items()
                if c.get("domain_id") != expected_domain
            ]
            if wrong_domain:
                warnings.append({
                    "type": "domain_mismatch",
                    "severity": "warning",
                    "message": f"{len(wrong_domain)} concepts outside expected domain '{expected_domain}'",
                    "details": wrong_domain
                })

        # Check 4: Invalid/deprecated concepts
        invalid = [
            cid for cid, c in concepts.items()
            if c.get("invalid_reason") is not None
        ]
        if invalid:
            issues.append({
                "type": "invalid_concepts",
                "severity": "error",
                "message": f"{len(invalid)} concepts are invalid/deprecated",
                "details": invalid
            })

        # Check 5: Vocabulary diversity
        vocab_counts = {}
        for c in concepts.values():
            vocab = c.get("vocabulary_id", "Unknown")
            vocab_counts[vocab] = vocab_counts.get(vocab, 0) + 1

        if len(vocab_counts) == 1:
            warnings.append({
                "type": "single_vocabulary",
                "severity": "info",
                "message": "Concept set uses only one vocabulary - may have limited network coverage",
                "details": list(vocab_counts.keys())
            })

        # Calculate validation score
        error_count = sum(1 for i in issues if i["severity"] == "error")
        warning_count = sum(1 for i in issues + warnings if i["severity"] == "warning")

        score = 1.0
        score -= error_count * 0.2
        score -= warning_count * 0.05
        score = max(0, score)

        return {
            "valid": error_count == 0,
            "score": score,
            "concept_count": len(concepts),
            "vocabulary_distribution": vocab_counts,
            "issues": issues,
            "warnings": warnings,
            "recommendation": self._get_validation_recommendation(issues, warnings)
        }

    def _get_validation_recommendation(
        self,
        issues: List[Dict],
        warnings: List[Dict]
    ) -> str:
        """Generate recommendation based on validation results."""
        if not issues and not warnings:
            return "Concept set passed all validation checks."

        recommendations = []

        for issue in issues:
            if issue["type"] == "non_standard_concepts":
                recommendations.append(
                    "Replace non-standard concepts with their standard equivalents"
                )
            elif issue["type"] == "invalid_concepts":
                recommendations.append(
                    "Remove or replace deprecated concepts with current versions"
                )

        for warning in warnings:
            if warning["type"] == "single_vocabulary":
                recommendations.append(
                    "Consider expanding to include mappings from other vocabularies"
                )

        return "; ".join(recommendations) if recommendations else "Review flagged items."

Best Practices

1. Start with Standard Concepts

Always begin phenotype development with standard SNOMED concepts:

# Good: Start with standard concept
starting_concept = client.search_concepts({
    "query": "type 2 diabetes mellitus",
    "vocabularies": ["SNOMED"],
    "standard_concepts_only": True
})

# Avoid: Starting with source vocabulary codes
# ICD codes should be discovered through expansion, not used as seeds

2. Iterative Refinement Workflow

def iterative_phenotype_development(
    client,
    clinical_term: str,
    max_rounds: int = 3
) -> Dict[str, Any]:
    """
    Recommended workflow for developing production phenotypes.

    1. Start with core concept
    2. Get recommendations
    3. Clinical expert review
    4. Add approved concepts
    5. Repeat until comprehensive
    """
    expansion_service = PhenotypeExpansionService(client.config.api_key)

    # Round 1: Get starting point
    print(f"=== ROUND 1: Finding starting concept for '{clinical_term}' ===")
    starting = expansion_service.get_starting_concept(clinical_term)

    current_concepts = [starting["concept_id"]]
    all_reviewed = set(current_concepts)

    round_num = 1  # retained for the return value if no expansion round runs
    for round_num in range(2, max_rounds + 1):
        print(f"\n=== ROUND {round_num}: Expanding concept set ===")

        # Get recommendations for current set
        recommendations = expansion_service.get_recommendations(current_concepts)

        # Categorize for review
        new_candidates = []
        for concept_id, recs in recommendations.get("data", {}).items():
            for rec in recs:
                rec_id = rec.get("concept_id")
                if rec_id not in all_reviewed:
                    new_candidates.append(rec)
                    all_reviewed.add(rec_id)

        if not new_candidates:
            print("No new recommendations - concept set is complete")
            break

        # In production, this would be clinical expert review
        # Here we auto-accept standard concepts
        accepted = [
            c for c in new_candidates
            if c.get("standard_concept") == "S"
        ]

        print(f"  Reviewed: {len(new_candidates)} candidates")
        print(f"  Accepted: {len(accepted)} concepts")

        current_concepts = [c["concept_id"] for c in accepted]

    return {
        "final_concept_ids": list(all_reviewed),
        "total_concepts": len(all_reviewed),
        "rounds_completed": round_num
    }
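
A minimal invocation (the clinical term is illustrative; in production, insert clinical expert review between rounds):

result = iterative_phenotype_development(client, "chronic kidney disease")
print(f"Final set: {result['total_concepts']} concepts "
      f"after {result['rounds_completed']} rounds")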

3. Network Study Considerations

# When building phenotypes for network studies:

# 1. Always check vocabulary coverage
coverage = expansion_service.analyze_vocabulary_coverage(expanded_set)
if not coverage["network_ready"]:
    print("Warning: Concept set may not perform well across all sites")
    print(f"Coverage gaps: {coverage['coverage_gaps']}")

# 2. Include both specific and broad codes
# Specific codes capture definitive cases
# Broad codes capture cases coded with less specificity

# 3. Consider regional coding practices
# US tends to use specific ICD10CM codes
# EU/international often use broader ICD10 codes

# 4. Test on multiple data sources before deployment
# Use Cohort Diagnostics to compare cohort characteristics

4. Documentation and Provenance

def document_concept_set(
    concept_set: Dict[str, Any],
    author: str,
    purpose: str
) -> Dict[str, Any]:
    """Create comprehensive documentation for a concept set."""
    return {
        "metadata": {
            "name": concept_set.get("name"),
            "author": author,
            "created_date": datetime.now().isoformat(),
            "purpose": purpose,
            "generation_method": "PHOEBE-based expansion via OMOPHub API"
        },
        "concept_set": concept_set,
        "provenance": {
            "seed_concepts": concept_set.get("seed_concepts", []),
            "expansion_iterations": concept_set.get("iterations", 0),
            "vocabularies_included": list(
                concept_set.get("vocabulary_distribution", {}).keys()
            ),
            "validation_score": concept_set.get("validation_score")
        },
        "usage_notes": [
            "This concept set was generated using data-driven recommendations",
            "Clinical review is recommended before use in regulatory submissions",
            "Vocabulary coverage has been optimized for network studies"
        ]
    }
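
A sketch of documenting an expanded set (all field values below are illustrative):

import json

documentation = document_concept_set(
    concept_set={
        "name": "Type 2 Diabetes - Expanded",
        "seed_concepts": [201826],
        "iterations": 2,
        "vocabulary_distribution": {"SNOMED": 28, "ICD10CM": 12},
        "validation_score": 0.95
    },
    author="Clinical Informatics Team",
    purpose="Network study phenotype for type 2 diabetes"
)

print(json.dumps(documentation["metadata"], indent=2))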