Skip to main content

FHIR Integration with OMOPHub

Overview

Fast Healthcare Interoperability Resources (FHIR) is the global standard for exchanging healthcare information electronically. OMOPHub provides comprehensive terminology services that enable FHIR-compliant applications to properly code, validate, and translate medical concepts across different vocabularies. This guide demonstrates how to leverage OMOPHub as a terminology server for FHIR resources, supporting ValueSet expansion, ConceptMap translation, and proper coding of clinical data.

Why FHIR Needs Robust Terminology Services

FHIR resources rely heavily on standardized medical terminologies:
  • Observation: LOINC codes for lab results and vital signs
  • Condition: SNOMED CT and ICD-10 for diagnoses
  • Medication: RxNorm for drug identification
  • Procedure: HCPCS and SNOMED CT for procedures
  • AllergyIntolerance: SNOMED CT for allergen coding
Without proper terminology services, FHIR implementations face:
  • Inconsistent coding across systems
  • Failed validation of resources
  • Inability to map between required code systems
  • Non-compliance with implementation guides

Architecture Overview

Core Implementation

FHIR Terminology Service Foundation

import requests
import time
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import json
import logging
from enum import Enum

class FHIRResourceType(Enum):
    OBSERVATION = "Observation"
    CONDITION = "Condition"
    MEDICATION_REQUEST = "MedicationRequest"
    PROCEDURE = "Procedure"
    ALLERGY_INTOLERANCE = "AllergyIntolerance"

@dataclass
class CodeableConcept:
    """FHIR CodeableConcept representation"""
    text: str
    codings: List[Dict[str, str]]

@dataclass
class ValueSetExpansion:
    """Expanded ValueSet with all included concepts"""
    id: str
    name: str
    url: str
    concepts: List[Dict[str, Any]]
    total: int
    timestamp: str

@dataclass
class ConceptMapTranslation:
    """Translation between source and target code systems"""
    source_code: str
    source_system: str
    target_code: str
    target_system: str
    equivalence: str
    confidence: float

class OMOPHubFHIRTerminologyService:
    """
    FHIR-compliant terminology service using OMOPHub API.
    Implements key FHIR terminology operations.
    """
    
    def __init__(self, api_key: str, base_url: str = "https://api.omophub.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.logger = logging.getLogger(__name__)
        
        # FHIR system URIs
        self.system_uris = {
            "SNOMED": "http://snomed.info/sct",
            "LOINC": "http://loinc.org",
            "ICD10CM": "http://hl7.org/fhir/sid/icd-10-cm",
            "ICD10PCS": "http://hl7.org/fhir/sid/icd-10-pcs",
            "RXNORM": "http://www.nlm.nih.gov/research/umls/rxnorm",
            "HCPCS": "https://www.cms.gov/Medicare/Coding/HCPCSReleaseCodeSets",
            "HCPCS": "https://www.cms.gov/Medicare/Coding/HCPCSReleaseCodeSets",
            "NDC": "http://hl7.org/fhir/sid/ndc",
            "CVX": "http://hl7.org/fhir/sid/cvx",
            "UCUM": "http://unitsofmeasure.org"
        }
        
        # OMOPHub to FHIR vocabulary mapping
        self.vocabulary_mapping = {
            "SNOMED": "SNOMED",
            "LOINC": "LOINC",
            "ICD10CM": "ICD10CM",
            "ICD10PCS": "ICD10PCS",
            "RxNorm": "RXNORM",
            "HCPCS": "HCPCS",
            "HCPCS": "HCPCS",
            "NDC": "NDC",
            "CVX": "CVX"
        }
    
    def validate_code(self, code: str, system: str, display: Optional[str] = None) -> Dict[str, Any]:
        """
        Validate a code within a specified system.
        FHIR $validate-code operation.
        """
        try:
            # Map FHIR system URI to OMOPHub vocabulary
            vocabulary_id = self._get_vocabulary_id(system)
            
            # Search for the code
            response = requests.get(
                f"{self.base_url}/v1/concepts/search",
                headers=self.headers,
                params={
                    'query': code,
                    'vocabulary_ids': vocabulary_id,
                    'page_size': 1
                }
            )
            response.raise_for_status()
            
            data = response.json()
            
            if data['data']:
                concept = data['data'][0]
                valid = concept['concept_code'] == code
                
                # Check display text if provided
                display_valid = True
                if display and valid:
                    display_valid = display.lower() in concept['concept_name'].lower()
                
                return {
                    'valid': valid and display_valid,
                    'concept': concept,
                    'message': 'Code is valid' if valid else f'Code {code} not found in {system}'
                }
            else:
                return {
                    'valid': False,
                    'message': f'Code {code} not found in {system}'
                }
                
        except requests.RequestException as e:
            self.logger.error(f"Error validating code: {e}")
            return {
                'valid': False,
                'message': f'Error validating code: {str(e)}'
            }
    
    def expand_valueset(self, valueset_definition: Dict[str, Any]) -> ValueSetExpansion:
        """
        Expand a FHIR ValueSet to include all member concepts.
        FHIR $expand operation.
        """
        expanded_concepts = []
        
        # Process each include section
        for include in valueset_definition.get('compose', {}).get('include', []):
            system = include.get('system')
            
            if not system:
                continue
            
            vocabulary_id = self._get_vocabulary_id(system)
            
            # Handle different inclusion patterns
            if 'concept' in include:
                # Explicit concept list
                for concept_ref in include['concept']:
                    code = concept_ref.get('code')
                    if code:
                        result = self.validate_code(code, system)
                        if result['valid']:
                            expanded_concepts.append({
                                'system': system,
                                'code': code,
                                'display': result['concept']['concept_name']
                            })
            
            elif 'filter' in include:
                # Filter-based inclusion
                for filter_item in include['filter']:
                    property_name = filter_item.get('property')
                    op = filter_item.get('op')
                    value = filter_item.get('value')
                    
                    if property_name == 'concept' and op == 'is-a':
                        # Get descendants of a concept
                        descendants = self._get_concept_descendants(value, vocabulary_id)
                        for desc in descendants:
                            expanded_concepts.append({
                                'system': system,
                                'code': desc['concept_code'],
                                'display': desc['concept_name']
                            })
            
            else:
                # Include all concepts from the system (limited for demo)
                all_concepts = self._get_system_concepts(vocabulary_id, limit=100)
                for concept in all_concepts:
                    expanded_concepts.append({
                        'system': system,
                        'code': concept['concept_code'],
                        'display': concept['concept_name']
                    })
        
        # Process exclusions
        for exclude in valueset_definition.get('compose', {}).get('exclude', []):
            system = exclude.get('system')
            if 'concept' in exclude:
                for concept_ref in exclude['concept']:
                    code = concept_ref.get('code')
                    expanded_concepts = [
                        c for c in expanded_concepts 
                        if not (c['system'] == system and c['code'] == code)
                    ]
        
        return ValueSetExpansion(
            id=valueset_definition.get('id', 'expanded-valueset'),
            name=valueset_definition.get('name', 'Expanded ValueSet'),
            url=valueset_definition.get('url', ''),
            concepts=expanded_concepts,
            total=len(expanded_concepts),
            timestamp=datetime.utcnow().isoformat()
        )
    
    def translate_concept(self, code: str, source_system: str, target_system: str) -> List[ConceptMapTranslation]:
        """
        Translate a concept from one code system to another.
        FHIR $translate operation.
        """
        translations = []
        
        try:
            source_vocab = self._get_vocabulary_id(source_system)
            target_vocab = self._get_vocabulary_id(target_system)
            
            # Get source concept
            source_response = requests.get(
                f"{self.base_url}/v1/concepts/search",
                headers=self.headers,
                params={
                    'query': code,
                    'vocabulary_ids': source_vocab,
                    'page_size': 1
                }
            )
            source_response.raise_for_status()
            source_data = source_response.json()
            
            if not source_data['data']:
                return translations
            
            source_concept_id = source_data['data'][0]['concept_id']
            
            # Get mappings
            mappings_response = requests.get(
                f"{self.base_url}/v1/concepts/{source_concept_id}/mappings",
                headers=self.headers,
                params={
                    'target_vocabularies': target_vocab
                }
            )
            mappings_response.raise_for_status()
            mappings_data = mappings_response.json()
            
            # Create translations
            for mapping in mappings_data.get('data', []):
                if mapping['target_vocabulary_id'] == target_vocab:
                    translations.append(ConceptMapTranslation(
                        source_code=code,
                        source_system=source_system,
                        target_code=mapping['target_concept_code'],
                        target_system=target_system,
                        equivalence=self._get_fhir_equivalence(mapping.get('relationship_id')),
                        confidence=mapping.get('confidence', 1.0)
                    ))
            
            return translations
            
        except requests.RequestException as e:
            self.logger.error(f"Error translating concept: {e}")
            return translations
    
    def create_codeable_concept(self, code: str, system: str, display: Optional[str] = None) -> CodeableConcept:
        """
        Create a FHIR CodeableConcept with proper coding.
        """
        # Validate and get full concept details
        validation = self.validate_code(code, system)
        
        if validation['valid']:
            concept = validation['concept']
            display_text = display or concept['concept_name']
            
            # Create primary coding
            codings = [{
                'system': system,
                'code': code,
                'display': display_text
            }]
            
            # Add additional codings from mappings if available
            try:
                # Get standard concept mappings
                if concept.get('standard_concept') == 'S':
                    mapping_endpoint = f"{self.base_url}/v1/concepts/{concept['concept_id']}/mappings"
                    mappings_response = requests.get(
                        mapping_endpoint,
                        headers=self.headers,
                        params={'relationship_types': 'Maps to'}
                    )
                    mappings_response.raise_for_status()
                    mappings = mappings_response.json()
                    
                    for mapping in mappings.get('data', [])[:2]:  # Limit to 2 additional codings
                        target_system = self._get_fhir_system(mapping['target_vocabulary_id'])
                        if target_system:
                            codings.append({
                                'system': target_system,
                                'code': mapping['target_concept_code'],
                                'display': mapping.get('target_concept_name', '')
                            })
            except requests.exceptions.RequestException as e:
                # Log network/HTTP errors with context
                if hasattr(self, 'logger'):
                    self.logger.exception(
                        f"Failed to fetch concept mappings for concept_id={concept['concept_id']} "
                        f"from endpoint={mapping_endpoint}: {e}"
                    )
                else:
                    logging.exception(
                        f"Failed to fetch concept mappings for concept_id={concept['concept_id']} "
                        f"from endpoint={mapping_endpoint}: {e}"
                    )
            except (KeyError, ValueError) as e:
                # Log data parsing errors with context
                if hasattr(self, 'logger'):
                    self.logger.exception(
                        f"Failed to parse concept mapping response for concept_id={concept['concept_id']}: {e}"
                    )
                else:
                    logging.exception(
                        f"Failed to parse concept mapping response for concept_id={concept['concept_id']}: {e}"
                    )
            
            return CodeableConcept(
                text=display_text,
                codings=codings
            )
        else:
            # Return basic CodeableConcept even if not validated
            return CodeableConcept(
                text=display or code,
                codings=[{
                    'system': system,
                    'code': code,
                    'display': display or code
                }]
            )
    
    def lookup_display(self, code: str, system: str) -> Optional[str]:
        """
        Look up the display text for a code.
        FHIR $lookup operation.
        """
        validation = self.validate_code(code, system)
        if validation['valid']:
            return validation['concept']['concept_name']
        return None
    
    def subsumes(self, code_a: str, code_b: str, system: str) -> Optional[str]:
        """
        Test the subsumption relationship between two codes.
        Returns: 'subsumes', 'subsumed-by', 'equivalent', or None
        """
        try:
            vocabulary_id = self._get_vocabulary_id(system)
            
            # Get both concepts
            response_a = requests.get(
                f"{self.base_url}/v1/concepts/search",
                headers=self.headers,
                params={'query': code_a, 'vocabulary_ids': vocabulary_id, 'page_size': 1}
            )
            response_b = requests.get(
                f"{self.base_url}/v1/concepts/search",
                headers=self.headers,
                params={'query': code_b, 'vocabulary_ids': vocabulary_id, 'page_size': 1}
            )
            
            response_a.raise_for_status()
            response_b.raise_for_status()
            
            data_a = response_a.json()
            data_b = response_b.json()
            
            if not (data_a['data'] and data_b['data']):
                return None
            
            concept_a = data_a['data'][0]
            concept_b = data_b['data'][0]
            
            if concept_a['concept_id'] == concept_b['concept_id']:
                return 'equivalent'
            
            # Check if A subsumes B (B is descendant of A)
            descendants_a = self._get_concept_descendants(
                code_a, 
                vocabulary_id, 
                include_self=False
            )
            descendant_ids_a = [d['concept_id'] for d in descendants_a]
            
            if concept_b['concept_id'] in descendant_ids_a:
                return 'subsumes'
            
            # Check if B subsumes A (A is descendant of B)
            descendants_b = self._get_concept_descendants(
                code_b, 
                vocabulary_id, 
                include_self=False
            )
            descendant_ids_b = [d['concept_id'] for d in descendants_b]
            
            if concept_a['concept_id'] in descendant_ids_b:
                return 'subsumed-by'
            
            return None
            
        except requests.RequestException as e:
            self.logger.error(f"Error checking subsumption: {e}")
            return None
    
    def _get_vocabulary_id(self, system_uri: str) -> str:
        """Map FHIR system URI to OMOPHub vocabulary ID."""
        # Reverse lookup from system URI
        for vocab, uri in self.system_uris.items():
            if uri == system_uri:
                return vocab
        
        # Check if it's already a vocabulary ID
        if system_uri in self.vocabulary_mapping:
            return system_uri
        
        raise ValueError(f"Unknown system URI: {system_uri}")
    
    def _get_fhir_system(self, vocabulary_id: str) -> Optional[str]:
        """Map OMOPHub vocabulary ID to FHIR system URI."""
        return self.system_uris.get(vocabulary_id)
    
    def _get_fhir_equivalence(self, relationship_id: str) -> str:
        """Map relationship to FHIR ConceptMap equivalence."""
        equivalence_map = {
            'Maps to': 'equivalent',
            'Is a': 'subsumes',
            'Subsumes': 'subsumes',
            'Has finding site': 'relatedto',
            'Has associated morphology': 'relatedto',
            'Has causative agent': 'relatedto'
        }
        return equivalence_map.get(relationship_id, 'relatedto')
    
    def _get_concept_descendants(self, code: str, vocabulary_id: str, include_self: bool = True) -> List[Dict]:
        """Get all descendant concepts."""
        try:
            # First get the concept
            search_response = requests.get(
                f"{self.base_url}/v1/concepts/search",
                headers=self.headers,
                params={
                    'query': code,
                    'vocabulary_ids': vocabulary_id,
                    'page_size': 1
                }
            )
            search_response.raise_for_status()
            search_data = search_response.json()
            
            if not search_data['data']:
                return []
            
            concept_id = search_data['data'][0]['concept_id']
            
            # Get descendants
            hierarchy_response = requests.get(
                f"{self.base_url}/v1/concepts/{concept_id}/hierarchy",
                headers=self.headers,
                params={
                    'include_descendants': True,
                    'max_levels': 10
                }
            )
            hierarchy_response.raise_for_status()
            hierarchy_data = hierarchy_response.json()
            
            descendants = []
            for item in hierarchy_data.get('data', []):
                if item.get('level', 0) > 0 or (include_self and item.get('level', 0) == 0):
                    descendants.append(item['concept'])
            
            return descendants
            
        except requests.RequestException as e:
            self.logger.error(f"Error getting descendants: {e}")
            return []
    
    def _get_system_concepts(self, vocabulary_id: str, limit: int = 100) -> List[Dict]:
        """Get concepts from a vocabulary (limited for performance)."""
        try:
            response = requests.get(
                f"{self.base_url}/v1/vocabularies/{vocabulary_id}/concepts",
                headers=self.headers,
                params={
                    'page_size': limit,
                    'standard_concept': 'S'
                }
            )
            response.raise_for_status()
            data = response.json()
            return data.get('data', [])
            
        except requests.RequestException as e:
            self.logger.error(f"Error getting system concepts: {e}")
            return []

# FHIR Resource Builders
class FHIRResourceBuilder:
    """Build FHIR resources with proper terminology coding."""
    
    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service
    
    def create_observation(self, 
                          loinc_code: str,
                          value: Any,
                          patient_id: str,
                          effective_datetime: str,
                          status: str = 'final') -> Dict[str, Any]:
        """
        Create a FHIR Observation resource with proper LOINC coding.
        """
        # Validate and get display text
        display = self.terminology.lookup_display(loinc_code, "http://loinc.org")
        
        # Create CodeableConcept
        code_concept = self.terminology.create_codeable_concept(
            loinc_code, 
            "http://loinc.org",
            display
        )
        
        observation = {
            "resourceType": "Observation",
            "status": status,
            "code": {
                "coding": code_concept.codings,
                "text": code_concept.text
            },
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "effectiveDateTime": effective_datetime
        }
        
        # Add value based on type
        if isinstance(value, (int, float)):
            observation["valueQuantity"] = {
                "value": value,
                "unit": "mg/dL",  # Should be determined based on LOINC code
                "system": "http://unitsofmeasure.org",
                "code": "mg/dL"
            }
        elif isinstance(value, str):
            observation["valueString"] = value
        elif isinstance(value, dict) and 'code' in value:
            # Coded value
            value_concept = self.terminology.create_codeable_concept(
                value['code'],
                value.get('system', 'http://snomed.info/sct')
            )
            observation["valueCodeableConcept"] = {
                "coding": value_concept.codings,
                "text": value_concept.text
            }
        
        return observation
    
    def create_condition(self,
                        diagnosis_code: str,
                        code_system: str,
                        patient_id: str,
                        onset_date: str,
                        clinical_status: str = 'active',
                        verification_status: str = 'confirmed') -> Dict[str, Any]:
        """
        Create a FHIR Condition resource with proper diagnosis coding.
        """
        # Create primary coding
        code_concept = self.terminology.create_codeable_concept(
            diagnosis_code,
            code_system
        )
        
        # Try to add SNOMED translation if not already SNOMED
        if code_system != "http://snomed.info/sct":
            translations = self.terminology.translate_concept(
                diagnosis_code,
                code_system,
                "http://snomed.info/sct"
            )
            for trans in translations:
                if trans.equivalence in ['equivalent', 'subsumes']:
                    code_concept.codings.append({
                        'system': trans.target_system,
                        'code': trans.target_code,
                        'display': self.terminology.lookup_display(
                            trans.target_code, 
                            trans.target_system
                        )
                    })
                    break
        
        condition = {
            "resourceType": "Condition",
            "clinicalStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/condition-clinical",
                    "code": clinical_status
                }]
            },
            "verificationStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/condition-ver-status",
                    "code": verification_status
                }]
            },
            "code": {
                "coding": code_concept.codings,
                "text": code_concept.text
            },
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "onsetDateTime": onset_date
        }
        
        return condition
    
    def create_medication_request(self,
                                 rxnorm_code: str,
                                 patient_id: str,
                                 prescriber_id: str,
                                 dosage_instructions: str,
                                 quantity: int,
                                 refills: int = 0) -> Dict[str, Any]:
        """
        Create a FHIR MedicationRequest with RxNorm coding.
        """
        # Get medication details
        med_concept = self.terminology.create_codeable_concept(
            rxnorm_code,
            "http://www.nlm.nih.gov/research/umls/rxnorm"
        )
        
        medication_request = {
            "resourceType": "MedicationRequest",
            "status": "active",
            "intent": "order",
            "medicationCodeableConcept": {
                "coding": med_concept.codings,
                "text": med_concept.text
            },
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "requester": {
                "reference": f"Practitioner/{prescriber_id}"
            },
            "dosageInstruction": [{
                "text": dosage_instructions
            }],
            "dispenseRequest": {
                "quantity": {
                    "value": quantity,
                    "unit": "tablet"
                },
                "numberOfRepeatsAllowed": refills
            }
        }
        
        return medication_request
    
    def create_procedure(self,
                        procedure_code: str,
                        code_system: str,
                        patient_id: str,
                        performed_date: str,
                        performer_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Create a FHIR Procedure resource with proper coding.
        """
        # Create procedure coding
        proc_concept = self.terminology.create_codeable_concept(
            procedure_code,
            code_system
        )
        
        procedure = {
            "resourceType": "Procedure",
            "status": "completed",
            "code": {
                "coding": proc_concept.codings,
                "text": proc_concept.text
            },
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "performedDateTime": performed_date
        }
        
        if performer_id:
            procedure["performer"] = [{
                "actor": {
                    "reference": f"Practitioner/{performer_id}"
                }
            }]
        
        return procedure
    
    def create_allergy_intolerance(self,
                                  allergen_code: str,
                                  patient_id: str,
                                  reaction_code: Optional[str] = None,
                                  severity: str = 'moderate') -> Dict[str, Any]:
        """
        Create a FHIR AllergyIntolerance resource.
        """
        # Create allergen coding (typically SNOMED)
        allergen_concept = self.terminology.create_codeable_concept(
            allergen_code,
            "http://snomed.info/sct"
        )
        
        allergy = {
            "resourceType": "AllergyIntolerance",
            "clinicalStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/allergyintolerance-clinical",
                    "code": "active"
                }]
            },
            "verificationStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/allergyintolerance-verification",
                    "code": "confirmed"
                }]
            },
            "code": {
                "coding": allergen_concept.codings,
                "text": allergen_concept.text
            },
            "patient": {
                "reference": f"Patient/{patient_id}"
            },
            "criticality": "low" if severity == 'mild' else "high" if severity == 'severe' else "unable-to-assess"
        }
        
        if reaction_code:
            reaction_concept = self.terminology.create_codeable_concept(
                reaction_code,
                "http://snomed.info/sct"
            )
            allergy["reaction"] = [{
                "manifestation": [{
                    "coding": reaction_concept.codings,
                    "text": reaction_concept.text
                }],
                "severity": severity
            }]
        
        return allergy

# ValueSet Management
class ValueSetManager:
    """Manage FHIR ValueSets using OMOPHub terminology."""
    
    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service
    
    def create_valueset_for_condition_category(self, category: str) -> Dict[str, Any]:
        """
        Create a ValueSet for a specific condition category.
        Example: Diabetes, Hypertension, etc.
        """
        # Define common condition categories with SNOMED codes
        category_roots = {
            "diabetes": "73211009",  # Diabetes mellitus
            "hypertension": "38341003",  # Hypertensive disorder
            "asthma": "195967001",  # Asthma
            "cardiovascular": "49601007",  # Disorder of cardiovascular system
            "infectious": "40733004"  # Infectious disease
        }
        
        root_code = category_roots.get(category.lower())
        if not root_code:
            raise ValueError(f"Unknown condition category: {category}")
        
        valueset = {
            "resourceType": "ValueSet",
            "id": f"condition-{category.lower()}",
            "url": f"http://example.org/fhir/ValueSet/condition-{category.lower()}",
            "name": f"{category.title()}Conditions",
            "title": f"{category.title()} Conditions ValueSet",
            "status": "active",
            "compose": {
                "include": [{
                    "system": "http://snomed.info/sct",
                    "filter": [{
                        "property": "concept",
                        "op": "is-a",
                        "value": root_code
                    }]
                }]
            }
        }
        
        return valueset
    
    def create_lab_test_valueset(self, panel_name: str) -> Dict[str, Any]:
        """
        Create a ValueSet for laboratory test panels.
        """
        # Define common lab panels with LOINC codes
        lab_panels = {
            "basic_metabolic": [
                "2345-7",  # Glucose
                "2160-0",  # Creatinine
                "3094-0",  # BUN
                "2951-2",  # Sodium
                "2823-3",  # Potassium
                "2075-0",  # Chloride
                "2028-9"   # CO2
            ],
            "lipid": [
                "2093-3",  # Total cholesterol
                "2085-9",  # HDL cholesterol
                "2089-1",  # LDL cholesterol
                "2571-8"   # Triglycerides
            ],
            "liver": [
                "1742-6",  # ALT
                "1920-8",  # AST
                "1975-2",  # Bilirubin total
                "1968-7",  # Bilirubin direct
                "2885-2",  # Protein total
                "1751-7"   # Albumin
            ]
        }
        
        codes = lab_panels.get(panel_name.lower())
        if not codes:
            raise ValueError(f"Unknown lab panel: {panel_name}")
        
        valueset = {
            "resourceType": "ValueSet",
            "id": f"lab-{panel_name.lower().replace('_', '-')}",
            "url": f"http://example.org/fhir/ValueSet/lab-{panel_name.lower().replace('_', '-')}",
            "name": f"{panel_name.replace('_', ' ').title()}Panel",
            "title": f"{panel_name.replace('_', ' ').title()} Laboratory Panel",
            "status": "active",
            "compose": {
                "include": [{
                    "system": "http://loinc.org",
                    "concept": [{"code": code} for code in codes]
                }]
            }
        }
        
        return valueset
    
    def expand_and_validate(self, valueset: Dict[str, Any]) -> ValueSetExpansion:
        """
        Expand a ValueSet and validate all included codes.
        """
        expansion = self.terminology.expand_valueset(valueset)
        
        # Validate each concept
        validated_concepts = []
        for concept in expansion.concepts:
            validation = self.terminology.validate_code(
                concept['code'],
                concept['system']
            )
            if validation['valid']:
                validated_concepts.append(concept)
            else:
                self.terminology.logger.warning(
                    f"Invalid concept in ValueSet: {concept['code']} in {concept['system']}"
                )
        
        expansion.concepts = validated_concepts
        expansion.total = len(validated_concepts)
        
        return expansion

# ConceptMap Translation Service
class ConceptMapService:
    """Manage FHIR ConceptMaps for terminology translation."""
    
    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service
    
    def create_concept_map(self,
                          source_system: str,
                          target_system: str,
                          codes: List[str]) -> Dict[str, Any]:
        """
        Create a ConceptMap for translating between code systems.
        """
        concept_map = {
            "resourceType": "ConceptMap",
            "id": f"map-{source_system.split('/')[-1]}-to-{target_system.split('/')[-1]}",
            "url": f"http://example.org/fhir/ConceptMap/{source_system.split('/')[-1]}-to-{target_system.split('/')[-1]}",
            "name": f"{source_system.split('/')[-1]}To{target_system.split('/')[-1]}Map",
            "status": "active",
            "sourceUri": source_system,
            "targetUri": target_system,
            "group": [{
                "source": source_system,
                "target": target_system,
                "element": []
            }]
        }
        
        # Add mappings for each code
        for code in codes:
            translations = self.terminology.translate_concept(code, source_system, target_system)
            
            if translations:
                element = {
                    "code": code,
                    "display": self.terminology.lookup_display(code, source_system),
                    "target": []
                }
                
                for trans in translations:
                    element["target"].append({
                        "code": trans.target_code,
                        "display": self.terminology.lookup_display(
                            trans.target_code, 
                            trans.target_system
                        ),
                        "equivalence": trans.equivalence
                    })
                
                concept_map["group"][0]["element"].append(element)
        
        return concept_map
    
    def translate_bulk(self,
                      codes: List[str],
                      source_system: str,
                      target_system: str) -> List[Dict[str, Any]]:
        """
        Bulk translate multiple codes between systems.
        """
        translations = []
        
        for code in codes:
            trans_list = self.terminology.translate_concept(code, source_system, target_system)
            
            if trans_list:
                # Use the highest confidence translation
                best_trans = max(trans_list, key=lambda x: x.confidence)
                translations.append({
                    'source_code': code,
                    'source_display': self.terminology.lookup_display(code, source_system),
                    'target_code': best_trans.target_code,
                    'target_display': self.terminology.lookup_display(
                        best_trans.target_code, 
                        best_trans.target_system
                    ),
                    'equivalence': best_trans.equivalence,
                    'confidence': best_trans.confidence
                })
            else:
                translations.append({
                    'source_code': code,
                    'source_display': self.terminology.lookup_display(code, source_system),
                    'target_code': None,
                    'target_display': None,
                    'equivalence': 'unmapped',
                    'confidence': 0.0
                })
        
        return translations

# Example usage
def main():
    # Initialize services
    terminology_service = OMOPHubFHIRTerminologyService("your-api-key")
    resource_builder = FHIRResourceBuilder(terminology_service)
    valueset_manager = ValueSetManager(terminology_service)
    conceptmap_service = ConceptMapService(terminology_service)
    
    # Example 1: Create and validate a FHIR Observation
    observation = resource_builder.create_observation(
        loinc_code="2345-7",  # Glucose
        value=110,
        patient_id="patient123",
        effective_datetime="2024-03-15T10:30:00Z"
    )
    print("Created Observation:", json.dumps(observation, indent=2))
    
    # Example 2: Create a Condition with ICD-10 to SNOMED translation
    condition = resource_builder.create_condition(
        diagnosis_code="E11.9",  # Type 2 diabetes
        code_system="http://hl7.org/fhir/sid/icd-10-cm",
        patient_id="patient123",
        onset_date="2024-01-15"
    )
    print("Created Condition:", json.dumps(condition, indent=2))
    
    # Example 3: Create and expand a ValueSet
    diabetes_valueset = valueset_manager.create_valueset_for_condition_category("diabetes")
    expanded = valueset_manager.expand_and_validate(diabetes_valueset)
    print(f"Expanded ValueSet contains {expanded.total} concepts")
    
    # Example 4: Create a ConceptMap for ICD-10 to SNOMED
    icd_codes = ["E11.9", "I10", "J45.909"]
    concept_map = conceptmap_service.create_concept_map(
        "http://hl7.org/fhir/sid/icd-10-cm",
        "http://snomed.info/sct",
        icd_codes
    )
    print("Created ConceptMap:", json.dumps(concept_map, indent=2))
    
    # Example 5: Bulk translate codes
    translations = conceptmap_service.translate_bulk(
        icd_codes,
        "http://hl7.org/fhir/sid/icd-10-cm",
        "http://snomed.info/sct"
    )
    for trans in translations:
        print(f"{trans['source_code']} -> {trans['target_code']} ({trans['equivalence']})")

if __name__ == "__main__":
    main()

Implementation Examples

Example 1: SMART on FHIR Application

import httpx
from typing import Dict

class SmartOnFhirApp:
    """
    SMART on FHIR application using OMOPHub for terminology.
    """
    
    def __init__(self, omophub_api_key: str, fhir_server_url: str):
        self.terminology = OMOPHubFHIRTerminologyService(omophub_api_key)
        self.resource_builder = FHIRResourceBuilder(self.terminology)
        self.fhir_server = fhir_server_url
    
    async def process_patient_encounter(self, encounter_data: Dict) -> Dict:
        """
        Process a patient encounter and create FHIR resources.
        """
        patient_id = encounter_data['patient_id']
        resources_created = []
        
        # Process diagnoses
        for diagnosis in encounter_data.get('diagnoses', []):
            condition = self.resource_builder.create_condition(
                diagnosis_code=diagnosis['code'],
                code_system=diagnosis['system'],
                patient_id=patient_id,
                onset_date=diagnosis['date']
            )
            
            # Post to FHIR server
            response = await self.post_to_fhir_server(condition)
            resources_created.append({
                'type': 'Condition',
                'id': response.get('id'),
                'code': diagnosis['code']
            })
        
        # Process lab results
        for lab in encounter_data.get('labs', []):
            observation = self.resource_builder.create_observation(
                loinc_code=lab['loinc_code'],
                value=lab['value'],
                patient_id=patient_id,
                effective_datetime=lab['datetime']
            )
            
            response = await self.post_to_fhir_server(observation)
            resources_created.append({
                'type': 'Observation',
                'id': response.get('id'),
                'code': lab['loinc_code']
            })
        
        # Process medications
        for med in encounter_data.get('medications', []):
            med_request = self.resource_builder.create_medication_request(
                rxnorm_code=med['rxnorm_code'],
                patient_id=patient_id,
                prescriber_id=med['prescriber_id'],
                dosage_instructions=med['instructions'],
                quantity=med['quantity']
            )
            
            response = await self.post_to_fhir_server(med_request)
            resources_created.append({
                'type': 'MedicationRequest',
                'id': response.get('id'),
                'code': med['rxnorm_code']
            })
        
        return {
            'encounter_id': encounter_data['encounter_id'],
            'resources_created': resources_created,
            'status': 'completed'
        }
    
    async def post_to_fhir_server(self, resource: Dict) -> Dict:
        """Post a FHIR resource to the server."""
        headers = {
            'Content-Type': 'application/fhir+json',
            'Accept': 'application/fhir+json'
        }
        
        timeout = httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=5.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                f"{self.fhir_server}/{resource['resourceType']}",
                json=resource,
                headers=headers
            )
            
            if response.status_code in [200, 201]:
                return response.json()
            else:
                raise Exception(f"Failed to post resource: HTTP {response.status_code} - {response.text}")

Example 2: Bulk FHIR Export with Terminology Validation

class FHIRBulkExporter:
    """
    Export clinical data in FHIR Bulk Data format with terminology validation.
    """
    
    def __init__(self, omophub_api_key: str):
        self.terminology = OMOPHubFHIRTerminologyService(omophub_api_key)
        self.resource_builder = FHIRResourceBuilder(self.terminology)
    
    def export_patient_data(self, patients_data: List[Dict], output_dir: str):
        """
        Export patient data as NDJSON files per FHIR Bulk Data spec.
        """
        import os
        import ndjson
        
        # Prepare output files
        os.makedirs(output_dir, exist_ok=True)
        
        conditions_file = open(os.path.join(output_dir, 'Condition.ndjson'), 'w')
        observations_file = open(os.path.join(output_dir, 'Observation.ndjson'), 'w')
        medications_file = open(os.path.join(output_dir, 'MedicationRequest.ndjson'), 'w')
        
        export_summary = {
            'patients_processed': 0,
            'conditions_exported': 0,
            'observations_exported': 0,
            'medications_exported': 0,
            'validation_errors': []
        }
        
        for patient_data in patients_data:
            patient_id = patient_data['patient_id']
            
            # Export conditions with validation
            for condition_data in patient_data.get('conditions', []):
                try:
                    # Validate code first
                    validation = self.terminology.validate_code(
                        condition_data['code'],
                        condition_data['system']
                    )
                    
                    if validation['valid']:
                        condition = self.resource_builder.create_condition(
                            diagnosis_code=condition_data['code'],
                            code_system=condition_data['system'],
                            patient_id=patient_id,
                            onset_date=condition_data['onset_date']
                        )
                        conditions_file.write(json.dumps(condition) + '\n')
                        export_summary['conditions_exported'] += 1
                    else:
                        export_summary['validation_errors'].append({
                            'patient_id': patient_id,
                            'resource_type': 'Condition',
                            'code': condition_data['code'],
                            'error': validation['message']
                        })
                except Exception as e:
                    export_summary['validation_errors'].append({
                        'patient_id': patient_id,
                        'error': str(e)
                    })
            
            # Export observations with validation
            for obs_data in patient_data.get('observations', []):
                try:
                    validation = self.terminology.validate_code(
                        obs_data['loinc_code'],
                        'http://loinc.org'
                    )
                    
                    if validation['valid']:
                        observation = self.resource_builder.create_observation(
                            loinc_code=obs_data['loinc_code'],
                            value=obs_data['value'],
                            patient_id=patient_id,
                            effective_datetime=obs_data['datetime']
                        )
                        observations_file.write(json.dumps(observation) + '\n')
                        export_summary['observations_exported'] += 1
                except Exception as e:
                    export_summary['validation_errors'].append({
                        'patient_id': patient_id,
                        'error': str(e)
                    })
            
            export_summary['patients_processed'] += 1
        
        # Close files
        conditions_file.close()
        observations_file.close()
        medications_file.close()
        
        # Write export manifest
        manifest = {
            "transactionTime": datetime.utcnow().isoformat(),
            "request": "$export",
            "requiresAccessToken": True,
            "output": [
                {
                    "type": "Condition",
                    "url": f"file://{output_dir}/Condition.ndjson",
                    "count": export_summary['conditions_exported']
                },
                {
                    "type": "Observation",
                    "url": f"file://{output_dir}/Observation.ndjson",
                    "count": export_summary['observations_exported']
                },
                {
                    "type": "MedicationRequest",
                    "url": f"file://{output_dir}/MedicationRequest.ndjson",
                    "count": export_summary['medications_exported']
                }
            ],
            "error": []
        }
        
        with open(os.path.join(output_dir, 'manifest.json'), 'w') as f:
            json.dump(manifest, f, indent=2)
        
        return export_summary

Example 3: FHIR Terminology Server Implementation

from flask import Flask, request, jsonify

class FHIRTerminologyServer:
    """
    Implement FHIR terminology server operations using OMOPHub.
    """
    
    def __init__(self, omophub_api_key: str):
        self.app = Flask(__name__)
        self.terminology = OMOPHubFHIRTerminologyService(omophub_api_key)
        self.setup_routes()
    
    def setup_routes(self):
        """Setup FHIR terminology operation endpoints."""
        
        @self.app.route('/CodeSystem/$validate-code', methods=['POST'])
        def validate_code():
            """FHIR $validate-code operation."""
            params = request.get_json()
            
            code = params.get('code')
            system = params.get('system')
            display = params.get('display')
            
            if not code or not system:
                return jsonify({
                    'resourceType': 'OperationOutcome',
                    'issue': [{
                        'severity': 'error',
                        'code': 'required',
                        'diagnostics': 'Code and system are required'
                    }]
                }), 400
            
            result = self.terminology.validate_code(code, system, display)
            
            return jsonify({
                'resourceType': 'Parameters',
                'parameter': [
                    {
                        'name': 'result',
                        'valueBoolean': result['valid']
                    },
                    {
                        'name': 'message',
                        'valueString': result['message']
                    }
                ]
            })
        
        @self.app.route('/ValueSet/$expand', methods=['POST'])
        def expand_valueset():
            """FHIR $expand operation."""
            valueset = request.get_json()
            
            try:
                expansion = self.terminology.expand_valueset(valueset)
                
                return jsonify({
                    'resourceType': 'ValueSet',
                    'id': expansion.id,
                    'url': expansion.url,
                    'name': expansion.name,
                    'status': 'active',
                    'expansion': {
                        'timestamp': expansion.timestamp,
                        'total': expansion.total,
                        'contains': expansion.concepts
                    }
                })
            except Exception as e:
                return jsonify({
                    'resourceType': 'OperationOutcome',
                    'issue': [{
                        'severity': 'error',
                        'code': 'exception',
                        'diagnostics': str(e)
                    }]
                }), 500
        
        @self.app.route('/ConceptMap/$translate', methods=['POST'])
        def translate():
            """FHIR $translate operation."""
            params = request.get_json()
            
            code = params.get('code')
            source = params.get('source')
            target = params.get('target')
            
            translations = self.terminology.translate_concept(code, source, target)
            
            matches = []
            for trans in translations:
                matches.append({
                    'equivalence': trans.equivalence,
                    'concept': {
                        'code': trans.target_code,
                        'system': trans.target_system
                    }
                })
            
            return jsonify({
                'resourceType': 'Parameters',
                'parameter': [
                    {
                        'name': 'result',
                        'valueBoolean': len(matches) > 0
                    },
                    {
                        'name': 'match',
                        'part': matches
                    }
                ]
            })
        
        @self.app.route('/CodeSystem/$subsumes', methods=['POST'])
        def subsumes():
            """FHIR $subsumes operation."""
            params = request.get_json()
            
            code_a = params.get('codeA')
            code_b = params.get('codeB')
            system = params.get('system')
            
            result = self.terminology.subsumes(code_a, code_b, system)
            
            return jsonify({
                'resourceType': 'Parameters',
                'parameter': [{
                    'name': 'outcome',
                    'valueCode': result or 'not-subsumed'
                }]
            })
    
    def run(self, host='0.0.0.0', port=5000):
        """Run the terminology server."""
        self.app.run(host=host, port=port)

# Usage
if __name__ == '__main__':
    server = FHIRTerminologyServer('your-api-key')
    server.run()

Best Practices

1. Efficient ValueSet Management

class OptimizedValueSetManager:
    """Optimized ValueSet operations with caching."""
    
    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service
        self.cache = {}
        self.cache_ttl = 3600  # 1 hour
    
    def get_or_expand_valueset(self, valueset_url: str) -> ValueSetExpansion:
        """Get ValueSet expansion with caching."""
        cache_key = f"valueset:{valueset_url}"
        
        # Check cache
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return cached_data
        
        # Expand ValueSet
        valueset = self.load_valueset_definition(valueset_url)
        expansion = self.terminology.expand_valueset(valueset)
        
        # Cache result
        self.cache[cache_key] = (expansion, time.time())
        
        return expansion
    
    def validate_against_valueset(self, code: str, system: str, valueset_url: str) -> bool:
        """Validate if a code is in a ValueSet."""
        expansion = self.get_or_expand_valueset(valueset_url)
        
        for concept in expansion.concepts:
            if concept['code'] == code and concept['system'] == system:
                return True
        
        return False

2. Batch Operations for Performance

class BatchFHIRProcessor:
    """Process FHIR resources in batches for performance."""
    
    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service
        self.batch_size = 100
    
    async def validate_codes_batch(self, codes: List[Tuple[str, str]]) -> List[Dict]:
        """Validate multiple codes in parallel."""
        import asyncio
        
        tasks = []
        for code, system in codes:
            task = asyncio.create_task(
                self.terminology.validate_code(code, system)
            )
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        
        return [
            {
                'code': codes[i][0],
                'system': codes[i][1],
                'valid': results[i]['valid'],
                'message': results[i].get('message', '')
            }
            for i in range(len(codes))
        ]
    
    def translate_batch(self, 
                       translations: List[Tuple[str, str, str]]) -> List[ConceptMapTranslation]:
        """Batch translate concepts between systems."""
        all_translations = []
        
        for batch_start in range(0, len(translations), self.batch_size):
            batch = translations[batch_start:batch_start + self.batch_size]
            
            for code, source, target in batch:
                trans = self.terminology.translate_concept(code, source, target)
                all_translations.extend(trans)
        
        return all_translations

3. Error Handling and Resilience

class ResilientFHIRService:
    """FHIR service with error handling and fallbacks."""
    
    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service
        self.fallback_mappings = self.load_fallback_mappings()
    
    def safe_validate_code(self, code: str, system: str) -> Dict:
        """Validate code with fallback to local mappings."""
        try:
            return self.terminology.validate_code(code, system)
        except Exception as e:
            # Fallback to local mappings
            if system in self.fallback_mappings:
                if code in self.fallback_mappings[system]:
                    return {
                        'valid': True,
                        'concept': self.fallback_mappings[system][code],
                        'message': 'Validated from local cache'
                    }
            
            return {
                'valid': False,
                'message': f'Validation failed: {str(e)}'
            }
    
    def load_fallback_mappings(self) -> Dict:
        """Load common codes for offline validation."""
        return {
            'http://loinc.org': {
                '2345-7': {'concept_name': 'Glucose'},
                '718-7': {'concept_name': 'Hemoglobin'},
                # ... more common codes
            },
            'http://snomed.info/sct': {
                '38341003': {'concept_name': 'Hypertensive disorder'},
                '73211009': {'concept_name': 'Diabetes mellitus'},
                # ... more common codes
            }
        }

Support and Resources

For FHIR integration support and questions:
I