Overview
Building comprehensive concept sets is fundamental to observational health research. A concept set defines the clinical codes that identify patients with a specific condition, treatment, or outcome. This guide demonstrates how to use the OMOPHub Recommended Concepts API—powered by OHDSI PHOEBE methodology—to automatically expand and refine concept sets for phenotype development.

Use Case: Automatically discover related medical concepts to build comprehensive, network-ready concept sets that identify more patients while maintaining clinical precision.
Business Problem
Healthcare researchers and data scientists face significant challenges when building concept sets for observational studies:
- Incomplete Coverage: Manual code selection typically misses 15-30% of clinically relevant codes, resulting in missed patients
- Heterogeneous Data Sources: Different institutions use different coding granularity—US data sources tend to use specific codes while international sources often use broader terms
- Network Study Failures: Concept sets developed on one data source frequently underperform on others due to vocabulary differences
- Time-Consuming Expert Review: Building comprehensive concept sets traditionally requires weeks of clinical informaticist time
- Late Patient Identification: Using only narrow, specific codes captures patients later in their disease course, missing early presentations
- AI/LLM Mapping Gaps: Automated mapping tools achieve only 70-90% accuracy, requiring extensive validation
Solution Architecture
The workflow below moves from a seed standard concept, through PHOEBE-based recommendations, to an iteratively expanded concept set that is validated for cross-vocabulary coverage before deployment in network studies.
Implementation Guide
Step 1: Set Up OMOPHub Client
from omophub import OMOPHubClient
from typing import List, Dict, Any, Optional
from collections import defaultdict
from datetime import datetime
class PhenotypeExpansionService:
"""Service for building comprehensive concept sets using PHOEBE recommendations."""
def __init__(self, api_key: str):
self.client = OMOPHubClient(api_key=api_key)
def get_starting_concept(self, clinical_term: str) -> Dict[str, Any]:
"""Find the best starting concept for a clinical idea.
Args:
clinical_term: Clinical term to search (e.g., "type 2 diabetes mellitus")
Returns:
Best matching standard concept with metadata
"""
# Search for the clinical term
results = self.client.search_concepts({
"query": clinical_term,
"standard_concepts_only": True,
"limit": 10
})
if not results.get("concepts"):
raise ValueError(f"No concepts found for: {clinical_term}")
# Return the top match (highest relevance)
return results["concepts"][0]
Step 2: Get Recommendations for Concepts
def get_recommendations(
self,
concept_ids: List[int],
domains: Optional[List[str]] = None,
vocabularies: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Get recommended concepts for one or more starting concepts.
Args:
concept_ids: List of concept IDs to get recommendations for
domains: Optional filter by domain (Condition, Drug, Measurement, etc.)
vocabularies: Optional filter by vocabulary (SNOMED, ICD10CM, etc.)
Returns:
Dictionary with recommendations grouped by input concept
"""
# Call the recommended concepts endpoint
recommendations = self.client.get_recommended_concepts(
concept_ids=concept_ids
)
# Apply filters if specified
if domains or vocabularies:
recommendations = self._filter_recommendations(
recommendations,
domains=domains,
vocabularies=vocabularies
)
return recommendations
def _filter_recommendations(
self,
recommendations: Dict[str, Any],
domains: Optional[List[str]] = None,
vocabularies: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Filter recommendations by domain and/or vocabulary."""
filtered = {}
for concept_id, recs in recommendations.get("data", {}).items():
filtered_recs = []
for rec in recs:
# Check domain filter
if domains and rec.get("domain_id") not in domains:
continue
# Check vocabulary filter
if vocabularies and rec.get("vocabulary_id") not in vocabularies:
continue
filtered_recs.append(rec)
if filtered_recs:
filtered[concept_id] = filtered_recs
return {"data": filtered, "meta": recommendations.get("meta", {})}
Step 3: Categorize Recommendations by Relationship Type
Relationship types indicate how the concept was recommended:
- Ontology: Hierarchical relationships (parent, child, sibling)
- Lexical: Similar names/synonyms
- Patient Context: Concepts that co-occur in patient records
- Clinical: Clinical relationships (findings, treatments)
def categorize_by_relationship(
self,
recommendations: List[Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
"""Group recommendations by their relationship type.
Args:
recommendations: List of recommended concepts
Returns:
Dictionary with recommendations grouped by relationship category
"""
categories = {
"ontology": [], # Hierarchical relationships
"lexical": [], # Name/synonym matches
"patient_context": [], # Co-occurrence based
"clinical": [] # Clinical relationships (findings, treatments)
}
# Relationship type mappings
ontology_rels = {
"Is a", "Subsumes", "Has parent", "Has child",
"Has ancestor", "Has descendant"
}
lexical_rels = {
"Has lexical match", "Mapped from", "Maps to",
"Has synonym", "Source - RxNorm eq"
}
clinical_rels = {
"Has finding", "Associated finding", "Has procedure",
"Has treatment", "Has manifestation", "Due to",
"Caused by", "Associated with"
}
for rec in recommendations:
rel_type = rec.get("relationship_id", "")
if rel_type in ontology_rels:
categories["ontology"].append(rec)
elif rel_type in lexical_rels:
categories["lexical"].append(rec)
elif rel_type in clinical_rels:
categories["clinical"].append(rec)
else:
# Default: likely patient context or other
categories["patient_context"].append(rec)
return categories
Step 4: Build Expanded Concept Set Iteratively
def expand_concept_set(
self,
seed_concepts: List[int],
target_domains: Optional[List[str]] = None,
max_iterations: int = 3,
min_relevance_score: float = 0.5
) -> Dict[str, Any]:
"""Iteratively expand a concept set using recommendations.
Args:
seed_concepts: Initial concept IDs to start with
target_domains: Domains to include (None = all domains)
max_iterations: Maximum expansion iterations
min_relevance_score: Minimum score to include a recommendation
Returns:
Expanded concept set with metadata
"""
# Track all concepts
included_concepts = set(seed_concepts)
expansion_history = []
# Get initial concept details
concept_details = {}
for concept_id in seed_concepts:
details = self.client.get_concept(concept_id)
if details.get("data"):
concept_details[concept_id] = details["data"]
current_frontier = list(seed_concepts)
for iteration in range(max_iterations):
if not current_frontier:
break
# Get recommendations for current frontier
recommendations = self.get_recommendations(
concept_ids=current_frontier,
domains=target_domains
)
new_concepts = []
iteration_additions = []
for source_id, recs in recommendations.get("data", {}).items():
for rec in recs:
concept_id = rec.get("concept_id")
# Skip if already included
if concept_id in included_concepts:
continue
# Skip non-standard concepts
if rec.get("standard_concept") != "S":
    continue
# Skip recommendations below the relevance threshold
# (the relevance score field name is an assumption; adjust to the actual response schema)
if rec.get("relevance_score", 1.0) < min_relevance_score:
    continue
# Add to expanded set
included_concepts.add(concept_id)
new_concepts.append(concept_id)
concept_details[concept_id] = rec
iteration_additions.append({
"concept_id": concept_id,
"concept_name": rec.get("concept_name"),
"source_concept_id": int(source_id),
"relationship": rec.get("relationship_id"),
"iteration": iteration + 1
})
expansion_history.append({
"iteration": iteration + 1,
"frontier_size": len(current_frontier),
"new_concepts_added": len(new_concepts),
"additions": iteration_additions
})
# New frontier is the newly added concepts
current_frontier = new_concepts
return {
"concept_set": {
"concept_ids": list(included_concepts),
"concepts": concept_details
},
"expansion_summary": {
"seed_count": len(seed_concepts),
"final_count": len(included_concepts),
"expansion_ratio": len(included_concepts) / len(seed_concepts),
"iterations": len(expansion_history)
},
"history": expansion_history
}
Step 5: Analyze Cross-Vocabulary Coverage
Cross-vocabulary coverage is critical for network studies to ensure concept sets work across international data sources with different coding practices.
def analyze_vocabulary_coverage(
self,
concept_set: Dict[str, Any]
) -> Dict[str, Any]:
"""Analyze vocabulary coverage of a concept set.
Important for network studies to ensure concept sets
work across international data sources.
Args:
concept_set: Expanded concept set from expand_concept_set()
Returns:
Coverage analysis by vocabulary and domain
"""
concepts = concept_set.get("concept_set", {}).get("concepts", {})
# Count by vocabulary
vocab_counts = defaultdict(int)
domain_counts = defaultdict(int)
vocab_domain_matrix = defaultdict(lambda: defaultdict(int))
for concept_id, details in concepts.items():
vocab = details.get("vocabulary_id", "Unknown")
domain = details.get("domain_id", "Unknown")
vocab_counts[vocab] += 1
domain_counts[domain] += 1
vocab_domain_matrix[vocab][domain] += 1
# Identify coverage gaps
expected_vocabs = {
"Condition": ["SNOMED", "ICD10CM", "ICD10", "Read", "ICD9CM"],
"Drug": ["RxNorm", "RxNorm Extension", "ATC", "NDC"],
"Measurement": ["LOINC", "SNOMED"],
"Procedure": ["SNOMED", "HCPCS", "ICD10PCS"]
}
coverage_gaps = {}
for domain, expected in expected_vocabs.items():
if domain_counts.get(domain, 0) > 0:
missing = [v for v in expected
if vocab_domain_matrix.get(v, {}).get(domain, 0) == 0]
if missing:
coverage_gaps[domain] = missing
return {
"vocabulary_distribution": dict(vocab_counts),
"domain_distribution": dict(domain_counts),
"vocabulary_by_domain": {k: dict(v) for k, v in vocab_domain_matrix.items()},
"coverage_gaps": coverage_gaps,
"network_ready": len(coverage_gaps) == 0
}
Example Implementation
Building a Type 2 Diabetes Phenotype
# Initialize the service
expansion_service = PhenotypeExpansionService("your_api_key")
# Step 1: Find the starting concept
starting_concept = expansion_service.get_starting_concept("type 2 diabetes mellitus")
print(f"Starting concept: {starting_concept['concept_name']} ({starting_concept['concept_id']})")
# Output: Starting concept: Type 2 diabetes mellitus (201826)
# Step 2: Get initial recommendations
recommendations = expansion_service.get_recommendations(
concept_ids=[starting_concept["concept_id"]]
)
# Step 3: Review recommendations by category
categorized = expansion_service.categorize_by_relationship(
recommendations["data"][str(starting_concept["concept_id"])]
)
print("\n=== RECOMMENDATIONS BY CATEGORY ===")
for category, recs in categorized.items():
print(f"\n{category.upper()} ({len(recs)} concepts):")
for rec in recs[:5]: # Show first 5
print(f" - {rec['concept_name']} ({rec['vocabulary_id']})")
Expected Output
Starting concept: Type 2 diabetes mellitus (201826)
=== RECOMMENDATIONS BY CATEGORY ===
CLINICAL (12 concepts):
- Hyperglycemia (SNOMED)
- Diabetic retinopathy (SNOMED)
- Hemoglobin A1c measurement (LOINC)
- Diabetic nephropathy (SNOMED)
- Peripheral neuropathy due to diabetes mellitus (SNOMED)
ONTOLOGY (8 concepts):
- Diabetes mellitus (SNOMED)
- Type 2 diabetes mellitus without complication (SNOMED)
- Type 2 diabetes mellitus with kidney complications (SNOMED)
LEXICAL (15 concepts):
- Type II diabetes mellitus (ICD10CM)
- Non-insulin-dependent diabetes mellitus (Read)
- Adult-onset diabetes (SNOMED)
PATIENT_CONTEXT (6 concepts):
- Metformin (RxNorm)
- Glucose measurement (LOINC)
- Diabetic foot examination (SNOMED)
Full Expansion Workflow
# Step 4: Expand the concept set iteratively
expanded_set = expansion_service.expand_concept_set(
seed_concepts=[201826], # Type 2 diabetes mellitus
target_domains=["Condition"], # Focus on conditions
max_iterations=2
)
print("\n=== EXPANSION SUMMARY ===")
summary = expanded_set["expansion_summary"]
print(f"Seed concepts: {summary['seed_count']}")
print(f"Final concepts: {summary['final_count']}")
print(f"Expansion ratio: {summary['expansion_ratio']:.1f}x")
# Step 5: Analyze vocabulary coverage
coverage = expansion_service.analyze_vocabulary_coverage(expanded_set)
print("\n=== VOCABULARY COVERAGE ===")
for vocab, count in coverage["vocabulary_distribution"].items():
print(f" {vocab}: {count} concepts")
if coverage["coverage_gaps"]:
print("\n Coverage Gaps Detected:")
for domain, missing_vocabs in coverage["coverage_gaps"].items():
print(f" {domain}: Missing {', '.join(missing_vocabs)}")
else:
print("\n Concept set is network-ready!")
Expected Output
=== EXPANSION SUMMARY ===
Seed concepts: 1
Final concepts: 47
Expansion ratio: 47.0x
=== VOCABULARY COVERAGE ===
SNOMED: 28 concepts
ICD10CM: 12 concepts
ICD10: 4 concepts
Read: 3 concepts
Concept set is network-ready!
Integration Patterns
1. ATLAS/OHDSI Workflow Integration
Pre-populate concept recommendations before importing to ATLAS:
class ATLASConceptSetBuilder:
"""Build ATLAS-compatible concept set expressions."""
def __init__(self, omophub_client):
self.client = omophub_client
self.expansion_service = PhenotypeExpansionService(
omophub_client.config.api_key
)
def build_atlas_expression(
self,
clinical_term: str,
include_descendants: bool = True
) -> Dict[str, Any]:
"""Build an ATLAS-compatible concept set expression.
Args:
clinical_term: Clinical term to build concept set for
include_descendants: Whether to include descendants
Returns:
ATLAS concept set expression JSON
"""
# Get expanded concept set
starting = self.expansion_service.get_starting_concept(clinical_term)
expanded = self.expansion_service.expand_concept_set(
seed_concepts=[starting["concept_id"]],
max_iterations=2
)
# Build ATLAS expression format
items = []
for concept_id, details in expanded["concept_set"]["concepts"].items():
items.append({
"concept": {
"CONCEPT_ID": int(concept_id),
"CONCEPT_NAME": details.get("concept_name"),
"STANDARD_CONCEPT": details.get("standard_concept"),
"STANDARD_CONCEPT_CAPTION": "Standard",
"INVALID_REASON": details.get("invalid_reason"),
"CONCEPT_CODE": details.get("concept_code"),
"DOMAIN_ID": details.get("domain_id"),
"VOCABULARY_ID": details.get("vocabulary_id"),
"CONCEPT_CLASS_ID": details.get("concept_class_id")
},
"isExcluded": False,
"includeDescendants": include_descendants,
"includeMapped": True
})
return {
"items": items,
"name": f"{clinical_term} - Expanded",
"description": f"Auto-generated from PHOEBE recommendations. "
f"Expansion ratio: {expanded['expansion_summary']['expansion_ratio']:.1f}x"
}
def export_for_atlas(self, expression: Dict[str, Any], filepath: str):
"""Export concept set expression as JSON for ATLAS import."""
import json
with open(filepath, 'w') as f:
json.dump(expression, f, indent=2)
print(f"Exported to {filepath}")
2. LLM/AI Mapping Validation Pipeline
Use recommendations to validate AI-generated concept mappings:
class LLMMappingValidator:
"""Validate LLM-generated concept mappings using PHOEBE recommendations."""
def __init__(self, omophub_client):
self.client = omophub_client
def validate_llm_mapping(
self,
source_term: str,
llm_mapped_concept_id: int,
confidence_threshold: float = 0.7
) -> Dict[str, Any]:
"""Validate an LLM-generated mapping against PHOEBE recommendations.
Args:
source_term: Original term that was mapped
llm_mapped_concept_id: Concept ID suggested by LLM
confidence_threshold: Minimum confidence for validation
Returns:
Validation result with confidence score and alternatives
"""
# Get the LLM's mapped concept details
llm_concept = self.client.get_concept(llm_mapped_concept_id)
# Search for the source term to find expected concepts
search_results = self.client.search_concepts({
"query": source_term,
"standard_concepts_only": True,
"limit": 20
})
# Get recommendations for top search results
top_concept_ids = [c["concept_id"] for c in search_results.get("concepts", [])[:5]]
if not top_concept_ids:
return {
"valid": False,
"confidence": 0.0,
"reason": "No matching concepts found for source term",
"alternatives": []
}
recommendations = self.client.get_recommended_concepts(
concept_ids=top_concept_ids
)
# Check if LLM mapping appears in recommendations
all_recommended_ids = set()
for concept_id, recs in recommendations.get("data", {}).items():
all_recommended_ids.add(int(concept_id))
for rec in recs:
all_recommended_ids.add(rec.get("concept_id"))
# Also include direct search results
search_concept_ids = {c["concept_id"] for c in search_results.get("concepts", [])}
# Calculate validation confidence
in_search = llm_mapped_concept_id in search_concept_ids
in_recommendations = llm_mapped_concept_id in all_recommended_ids
if in_search:
# Direct match in search results
confidence = 0.95
validation_path = "direct_search_match"
elif in_recommendations:
# Found in recommendations
confidence = 0.85
validation_path = "recommendation_match"
else:
# Not found in search results or recommendations
confidence = 0.3
validation_path = "no_match"
# Get alternative suggestions
alternatives = []
if confidence < confidence_threshold:
for concept in search_results.get("concepts", [])[:5]:
if concept["concept_id"] != llm_mapped_concept_id:
alternatives.append({
"concept_id": concept["concept_id"],
"concept_name": concept["concept_name"],
"vocabulary_id": concept["vocabulary_id"]
})
return {
"valid": confidence >= confidence_threshold,
"confidence": confidence,
"validation_path": validation_path,
"llm_mapping": {
"concept_id": llm_mapped_concept_id,
"concept_name": llm_concept.get("data", {}).get("concept_name")
},
"alternatives": alternatives,
"recommendation": "accept" if confidence >= confidence_threshold else "review"
}
def batch_validate(
self,
mappings: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Validate multiple LLM mappings in batch.
Args:
mappings: List of {"source_term": str, "concept_id": int}
Returns:
Batch validation results with summary statistics
"""
results = []
for mapping in mappings:
result = self.validate_llm_mapping(
source_term=mapping["source_term"],
llm_mapped_concept_id=mapping["concept_id"]
)
results.append({
"source_term": mapping["source_term"],
"mapped_concept_id": mapping["concept_id"],
**result
})
# Calculate summary statistics
valid_count = sum(1 for r in results if r["valid"])
return {
"results": results,
"summary": {
"total": len(results),
"valid": valid_count,
"needs_review": len(results) - valid_count,
"accuracy_rate": valid_count / len(results) if results else 0
}
}
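A minimal usage sketch, reusing concept IDs shown elsewhere in this guide:

validator = LLMMappingValidator(client)
report = validator.batch_validate([
    {"source_term": "type 2 diabetes mellitus", "concept_id": 201826},
    {"source_term": "hypertension", "concept_id": 320128}
])
print(f"Valid: {report['summary']['valid']}/{report['summary']['total']}")
for result in report["results"]:
    if result["recommendation"] == "review":
        print(f"Review needed: {result['source_term']}")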
3. Network Study Harmonization
Ensure concept sets work across international data sources:
class NetworkStudyHarmonizer:
"""Harmonize concept sets for multi-site network studies."""
# Vocabulary priorities by region
REGIONAL_VOCABS = {
"US": ["ICD10CM", "ICD9CM", "HCPCS", "NDC"],
"EU": ["ICD10", "ATC", "Read"],
"UK": ["Read", "ICD10", "SNOMED"],
"Asia": ["ICD10", "KCD", "SNOMED"],
"International": ["SNOMED", "LOINC", "RxNorm", "ATC"]
}
def __init__(self, omophub_client):
self.client = omophub_client
self.expansion_service = PhenotypeExpansionService(
omophub_client.config.api_key
)
def harmonize_for_network(
self,
concept_ids: List[int],
target_regions: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Expand concept set to cover target regional vocabularies.
Args:
concept_ids: Starting concept IDs
target_regions: Regions to target (default: US, EU, UK, and International)
Returns:
Harmonized concept set with regional coverage analysis
"""
if target_regions is None:
target_regions = ["US", "EU", "UK", "International"]
# Get all target vocabularies
target_vocabs = set()
for region in target_regions:
target_vocabs.update(self.REGIONAL_VOCABS.get(region, []))
# Expand concept set
expanded = self.expansion_service.expand_concept_set(
seed_concepts=concept_ids,
max_iterations=3
)
# Analyze coverage by region
concepts = expanded["concept_set"]["concepts"]
regional_coverage = {}
for region in target_regions:
region_vocabs = self.REGIONAL_VOCABS[region]
covered = []
missing = []
for vocab in region_vocabs:
vocab_concepts = [
c for c in concepts.values()
if c.get("vocabulary_id") == vocab
]
if vocab_concepts:
covered.append(vocab)
else:
missing.append(vocab)
regional_coverage[region] = {
"covered_vocabularies": covered,
"missing_vocabularies": missing,
"coverage_score": len(covered) / len(region_vocabs) if region_vocabs else 1.0
}
# Overall network readiness
min_coverage = min(r["coverage_score"] for r in regional_coverage.values())
return {
"concept_set": expanded["concept_set"],
"regional_coverage": regional_coverage,
"network_readiness": {
"minimum_regional_coverage": min_coverage,
"ready_for_network": min_coverage >= 0.7,
"recommendation": self._get_harmonization_recommendation(regional_coverage)
}
}
def _get_harmonization_recommendation(
self,
regional_coverage: Dict[str, Any]
) -> str:
"""Generate recommendation based on coverage analysis."""
gaps = []
for region, coverage in regional_coverage.items():
if coverage["missing_vocabularies"]:
gaps.append(f"{region}: missing {', '.join(coverage['missing_vocabularies'])}")
if not gaps:
return "Concept set has comprehensive coverage across all target regions."
else:
return f"Consider adding mappings for: {'; '.join(gaps)}"
Performance Optimization
Batch Processing Multiple Concept Sets
class BatchPhenotypeProcessor:
    """Process multiple phenotypes efficiently in batch."""
    def __init__(self, omophub_client):
        self.client = omophub_client
self.expansion_service = PhenotypeExpansionService(
omophub_client.config.api_key
)
def process_phenotype_batch(
self,
phenotype_definitions: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Process multiple phenotypes in parallel.
Args:
phenotype_definitions: List of {"name": str, "seed_concepts": List[int]}
Returns:
List of expanded phenotype definitions
"""
# First, collect all unique concept IDs
all_concept_ids = set()
for phenotype in phenotype_definitions:
all_concept_ids.update(phenotype.get("seed_concepts", []))
# Batch fetch all recommendations at once
all_recommendations = self.client.get_recommended_concepts(
concept_ids=list(all_concept_ids)
)
# Process each phenotype using cached recommendations
results = []
for phenotype in phenotype_definitions:
expanded = self._expand_with_cache(
phenotype=phenotype,
recommendations_cache=all_recommendations
)
results.append(expanded)
return results
def _expand_with_cache(
self,
phenotype: Dict[str, Any],
recommendations_cache: Dict[str, Any]
) -> Dict[str, Any]:
"""Expand a phenotype using cached recommendations."""
seed_concepts = phenotype.get("seed_concepts", [])
included = set(seed_concepts)
for concept_id in seed_concepts:
recs = recommendations_cache.get("data", {}).get(str(concept_id), [])
for rec in recs:
if rec.get("standard_concept") == "S":
included.add(rec.get("concept_id"))
return {
"name": phenotype.get("name"),
"seed_concepts": seed_concepts,
"expanded_concepts": list(included),
"expansion_ratio": len(included) / len(seed_concepts) if seed_concepts else 0
}
# Usage Example
processor = BatchPhenotypeProcessor(client)
phenotypes = [
{"name": "Type 2 Diabetes", "seed_concepts": [201826]},
{"name": "Hypertension", "seed_concepts": [320128]},
{"name": "Heart Failure", "seed_concepts": [316139]},
{"name": "COPD", "seed_concepts": [255573]},
{"name": "Asthma", "seed_concepts": [317009]}
]
results = processor.process_phenotype_batch(phenotypes)
for result in results:
print(f"{result['name']}: {len(result['seed_concepts'])} → {len(result['expanded_concepts'])} concepts")
Caching Strategy
import redis
import json
import hashlib
from typing import Optional
class CachedExpansionService(PhenotypeExpansionService):
"""Expansion service with Redis caching for improved performance."""
def __init__(
self,
api_key: str,
redis_url: str = "redis://localhost:6379",
cache_ttl: int = 86400 # 24 hours
):
super().__init__(api_key)
self.redis = redis.from_url(redis_url)
self.cache_ttl = cache_ttl
def _cache_key(self, prefix: str, *args) -> str:
"""Generate cache key from arguments."""
key_data = json.dumps(args, sort_keys=True)
hash_val = hashlib.md5(key_data.encode()).hexdigest()
return f"phenotype:{prefix}:{hash_val}"
def get_recommendations(
self,
concept_ids: List[int],
domains: Optional[List[str]] = None,
vocabularies: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Get recommendations with caching."""
cache_key = self._cache_key("recs", concept_ids, domains, vocabularies)
# Check cache
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# Fetch from API
result = super().get_recommendations(concept_ids, domains, vocabularies)
# Cache result
self.redis.setex(
cache_key,
self.cache_ttl,
json.dumps(result, default=str)
)
return result
def invalidate_cache(self, pattern: str = "phenotype:*"):
"""Invalidate cached recommendations."""
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
print(f"Invalidated {len(keys)} cache entries")
Quality Assurance
Concept Set Validation
class ConceptSetValidator:
"""Validate expanded concept sets for quality and completeness."""
def __init__(self, omophub_client):
self.client = omophub_client
def validate_concept_set(
self,
concept_ids: List[int],
expected_domain: str = None
) -> Dict[str, Any]:
"""Comprehensive validation of a concept set.
Args:
concept_ids: Concept IDs to validate
expected_domain: Expected primary domain (Condition, Drug, etc.)
Returns:
Validation results with issues and recommendations
"""
issues = []
warnings = []
# Fetch all concept details
concepts = {}
for concept_id in concept_ids:
result = self.client.get_concept(concept_id)
if result.get("data"):
concepts[concept_id] = result["data"]
# Check 1: All concepts exist and are valid
missing = set(concept_ids) - set(concepts.keys())
if missing:
issues.append({
"type": "missing_concepts",
"severity": "error",
"message": f"{len(missing)} concept IDs not found",
"details": list(missing)
})
# Check 2: All concepts are standard
non_standard = [
cid for cid, c in concepts.items()
if c.get("standard_concept") != "S"
]
if non_standard:
issues.append({
"type": "non_standard_concepts",
"severity": "warning",
"message": f"{len(non_standard)} non-standard concepts included",
"details": non_standard
})
# Check 3: Domain consistency
if expected_domain:
wrong_domain = [
cid for cid, c in concepts.items()
if c.get("domain_id") != expected_domain
]
if wrong_domain:
warnings.append({
"type": "domain_mismatch",
"severity": "warning",
"message": f"{len(wrong_domain)} concepts outside expected domain '{expected_domain}'",
"details": wrong_domain
})
# Check 4: Invalid/deprecated concepts
invalid = [
cid for cid, c in concepts.items()
if c.get("invalid_reason") is not None
]
if invalid:
issues.append({
"type": "invalid_concepts",
"severity": "error",
"message": f"{len(invalid)} concepts are invalid/deprecated",
"details": invalid
})
# Check 5: Vocabulary diversity
vocab_counts = {}
for c in concepts.values():
vocab = c.get("vocabulary_id", "Unknown")
vocab_counts[vocab] = vocab_counts.get(vocab, 0) + 1
if len(vocab_counts) == 1:
warnings.append({
"type": "single_vocabulary",
"severity": "info",
"message": "Concept set uses only one vocabulary - may have limited network coverage",
"details": list(vocab_counts.keys())
})
# Calculate validation score
error_count = sum(1 for i in issues if i["severity"] == "error")
warning_count = sum(1 for i in issues + warnings if i["severity"] == "warning")
score = 1.0
score -= error_count * 0.2
score -= warning_count * 0.05
score = max(0, score)
return {
"valid": error_count == 0,
"score": score,
"concept_count": len(concepts),
"vocabulary_distribution": vocab_counts,
"issues": issues,
"warnings": warnings,
"recommendation": self._get_validation_recommendation(issues, warnings)
}
def _get_validation_recommendation(
self,
issues: List[Dict],
warnings: List[Dict]
) -> str:
"""Generate recommendation based on validation results."""
if not issues and not warnings:
return "Concept set passed all validation checks."
recommendations = []
for issue in issues:
if issue["type"] == "non_standard_concepts":
recommendations.append(
"Replace non-standard concepts with their standard equivalents"
)
elif issue["type"] == "invalid_concepts":
recommendations.append(
"Remove or replace deprecated concepts with current versions"
)
for warning in warnings:
if warning["type"] == "single_vocabulary":
recommendations.append(
"Consider expanding to include mappings from other vocabularies"
)
return "; ".join(recommendations) if recommendations else "Review flagged items."
Best Practices
1. Start with Standard Concepts
Always begin phenotype development with standard SNOMED concepts:
# Good: Start with standard concept
starting_concept = client.search_concepts({
"query": "type 2 diabetes mellitus",
"vocabularies": ["SNOMED"],
"standard_concepts_only": True
})
# Avoid: Starting with source vocabulary codes
# ICD codes should be discovered through expansion, not used as seeds
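
If your starting point is a source-vocabulary code, resolve its description to a standard concept first and seed with that. A minimal sketch using only the search endpoint shown above; the ICD-10-CM code and description are illustrative:

# Resolve a source code's description to a standard SNOMED seed
# rather than seeding the expansion with the ICD code itself
source_description = "Type 2 diabetes mellitus without complications"  # ICD10CM E11.9
results = client.search_concepts({
    "query": source_description,
    "vocabularies": ["SNOMED"],
    "standard_concepts_only": True,
    "limit": 1
})
if results.get("concepts"):
    seed = results["concepts"][0]
    print(f"Seed with: {seed['concept_name']} ({seed['concept_id']})")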
2. Iterative Refinement Workflow
def iterative_phenotype_development(
client,
clinical_term: str,
max_rounds: int = 3
) -> Dict[str, Any]:
"""
Recommended workflow for developing production phenotypes.
1. Start with core concept
2. Get recommendations
3. Clinical expert review
4. Add approved concepts
5. Repeat until comprehensive
"""
expansion_service = PhenotypeExpansionService(client.config.api_key)
# Round 1: Get starting point
print(f"=== ROUND 1: Finding starting concept for '{clinical_term}' ===")
starting = expansion_service.get_starting_concept(clinical_term)
current_concepts = [starting["concept_id"]]
all_reviewed = set(current_concepts)
round_num = 1  # defined even if max_rounds < 2 and the loop never runs
for round_num in range(2, max_rounds + 1):
print(f"\n=== ROUND {round_num}: Expanding concept set ===")
# Get recommendations for current set
recommendations = expansion_service.get_recommendations(current_concepts)
# Categorize for review
new_candidates = []
for concept_id, recs in recommendations.get("data", {}).items():
for rec in recs:
rec_id = rec.get("concept_id")
if rec_id not in all_reviewed:
new_candidates.append(rec)
all_reviewed.add(rec_id)
if not new_candidates:
print("No new recommendations - concept set is complete")
break
# In production, this would be clinical expert review
# Here we auto-accept standard concepts
accepted = [
c for c in new_candidates
if c.get("standard_concept") == "S"
]
print(f" Reviewed: {len(new_candidates)} candidates")
print(f" Accepted: {len(accepted)} concepts")
current_concepts = [c["concept_id"] for c in accepted]
return {
"final_concept_ids": list(all_reviewed),
"total_concepts": len(all_reviewed),
"rounds_completed": round_num
}
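A minimal usage sketch, assuming an initialized OMOPHubClient named client:

result = iterative_phenotype_development(client, "type 2 diabetes mellitus")
print(f"Final set: {result['total_concepts']} concepts "
      f"after {result['rounds_completed']} rounds")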
3. Network Study Considerations
# When building phenotypes for network studies:
# 1. Always check vocabulary coverage
coverage = expansion_service.analyze_vocabulary_coverage(expanded_set)
if not coverage["network_ready"]:
print("Warning: Concept set may not perform well across all sites")
print(f"Coverage gaps: {coverage['coverage_gaps']}")
# 2. Include both specific and broad codes
# Specific codes capture definitive cases
# Broad codes capture cases coded with less specificity
# 3. Consider regional coding practices
# US tends to use specific ICD10CM codes
# EU/international often use broader ICD10 codes
# 4. Test on multiple data sources before deployment
# Use Cohort Diagnostics to compare cohort characteristics
4. Documentation and Provenance
def document_concept_set(
concept_set: Dict[str, Any],
author: str,
purpose: str
) -> Dict[str, Any]:
"""Create comprehensive documentation for a concept set."""
return {
"metadata": {
"name": concept_set.get("name"),
"author": author,
"created_date": datetime.now().isoformat(),
"purpose": purpose,
"generation_method": "PHOEBE-based expansion via OMOPHub API"
},
"concept_set": concept_set,
"provenance": {
"seed_concepts": concept_set.get("seed_concepts", []),
"expansion_iterations": concept_set.get("iterations", 0),
"vocabularies_included": list(
concept_set.get("vocabulary_distribution", {}).keys()
),
"validation_score": concept_set.get("validation_score")
},
"usage_notes": [
"This concept set was generated using data-driven recommendations",
"Clinical review is recommended before use in regulatory submissions",
"Vocabulary coverage has been optimized for network studies"
]
}
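A minimal usage sketch with illustrative metadata values (the vocabulary counts mirror the expected output earlier in this guide):

import json

documented = document_concept_set(
    concept_set={
        "name": "Type 2 Diabetes Mellitus - Expanded",
        "seed_concepts": [201826],
        "iterations": 2,
        "vocabulary_distribution": {"SNOMED": 28, "ICD10CM": 12},
        "validation_score": 0.95  # illustrative value
    },
    author="Research Team",
    purpose="Network study phenotype for type 2 diabetes"
)
print(json.dumps(documented["metadata"], indent=2))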