EHR Integration with OMOPHub

Overview

Electronic Health Record (EHR) systems are the backbone of modern healthcare, but they use different coding standards and vocabularies, which creates data silos and interoperability problems. OMOPHub provides vocabulary services (concept search and cross-vocabulary mapping) that let you standardize medical codes across major EHR platforms and keep terminology lookups out of the clinician's way. This guide demonstrates how to integrate OMOPHub with Epic®, Cerner, and Veradigm in both real-time and batch processing scenarios.
Epic® is a registered trademark of Epic Systems Corporation. Veradigm was formerly known as Allscripts.

Business Problem

Healthcare organizations face significant challenges when working with EHR data:
  • Data Fragmentation: Different EHR systems use varying coding standards (ICD-9, ICD-10, SNOMED, proprietary codes)
  • Inconsistent Mapping: Manual code mapping is error-prone and time-intensive
  • Clinical Workflow Disruption: Terminology lookups interrupt clinical documentation
  • Interoperability Barriers: Difficulty sharing data between systems and organizations
  • Quality Reporting: Inconsistent codes affect quality metrics and population health analytics
  • Prior Authorization: Complex code validation delays patient care

Solution Architecture

Core Implementation

Universal EHR Integration Framework
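
The framework in this section separates two concerns: connectors that pull data out of a specific EHR, and an integrator that standardizes codes regardless of where they came from. As a rough sketch of that separation only, the following uses a hypothetical in-memory connector and a hard-coded lookup table in place of live EHR and OMOPHub calls. `Diagnosis`, `InMemoryConnector`, `Standardizer`, and `run` are illustrative names and are not part of the OMOPHub API; the single lookup entry is example data.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Diagnosis:
    code: str
    system: str


class Connector(ABC):
    """Source-specific access: knows how to fetch raw codes from one EHR."""

    @abstractmethod
    async def get_diagnoses(self, patient_id: str) -> List[Diagnosis]:
        ...


class InMemoryConnector(Connector):
    """Hypothetical stand-in for an EHR connector; serves canned records."""

    def __init__(self, records: Dict[str, List[Diagnosis]]):
        self._records = records

    async def get_diagnoses(self, patient_id: str) -> List[Diagnosis]:
        return self._records.get(patient_id, [])


class Standardizer:
    """Source-agnostic mapping step; a dict stands in for the vocabulary API (assumption)."""

    def __init__(self, lookup: Dict[Tuple[str, str], str]):
        self._lookup = lookup

    def standardize(self, diagnoses: List[Diagnosis]) -> List[Dict[str, str]]:
        mapped = []
        for dx in diagnoses:
            target = self._lookup.get((dx.system, dx.code))
            if target is None:
                continue  # codes with no known mapping are skipped
            mapped.append({
                "source_code": dx.code,
                "source_system": dx.system,
                "target_code": target,
            })
        return mapped


async def run(connector: Connector, standardizer: Standardizer, patient_id: str) -> List[Dict[str, str]]:
    """Fetch from whichever connector is supplied, then apply the shared mapping step."""
    diagnoses = await connector.get_diagnoses(patient_id)
    return standardizer.standardize(diagnoses)


if __name__ == "__main__":
    connector = InMemoryConnector({"p1": [Diagnosis("E11.9", "ICD10CM")]})
    standardizer = Standardizer({("ICD10CM", "E11.9"): "44054006"})  # example entry only
    print(asyncio.run(run(connector, standardizer, "p1")))
```

Because the mapping step never touches connector internals, the same standardization logic can serve Epic®, Cerner, and Veradigm data; only the connector changes.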

import httpx
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import asyncio
from abc import ABC, abstractmethod
import json

@dataclass
class PatientData:
    """Standardized patient data structure across EHR systems."""
    patient_id: str
    mrn: str
    demographics: Dict[str, Any]
    diagnoses: List[Dict[str, str]]
    procedures: List[Dict[str, str]]
    medications: List[Dict[str, str]]
    allergies: List[Dict[str, str]]
    lab_results: List[Dict[str, Any]]
    vital_signs: List[Dict[str, Any]]
    clinical_notes: List[Dict[str, str]]

@dataclass
class CodeMapping:
    """Represents a code mapping result."""
    source_code: str
    source_system: str
    target_code: str
    target_system: str
    confidence: float
    mapped_name: str

class EHRConnector(ABC):
    """Abstract base class for EHR system connectors."""
    
    @abstractmethod
    async def authenticate(self) -> bool:
        """Authenticate with the EHR system."""
        pass
    
    @abstractmethod
    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Extract patient data from EHR."""
        pass
    
    @abstractmethod
    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Update patient records with mapped codes."""
        pass

class OMOPHubEHRIntegrator:
    """
    Main integration service that works with any EHR system.
    Provides vocabulary services and code standardization.
    """
    
    def __init__(self, api_key: str, base_url: str = "https://api.omophub.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.logger = logging.getLogger(__name__)
        self.cache = {}
        
    def standardize_diagnosis_codes(self, diagnoses: List[Dict[str, str]]) -> List[CodeMapping]:
        """
        Standardize diagnosis codes to SNOMED CT or ICD-10.
        """
        mappings = []
        
        for diagnosis in diagnoses:
            source_code = diagnosis.get('code')
            source_system = diagnosis.get('system', 'Unknown')
            
            if not source_code:
                continue
                
            # Determine target vocabulary based on source
            target_vocab = self._determine_target_vocabulary(source_system, 'diagnosis')
            
            try:
                # Search for concept in OMOPHub
                response = httpx.get(
                    f"{self.base_url}/v1/concepts/search",
                    headers=self.headers,
                    params={
                        'query': source_code,
                        'vocabulary_ids': self._map_ehr_to_omophub_vocab(source_system),
                        'standard_concept': 'S',
                        'page_size': 1
                    }
                )
                response.raise_for_status()
                
                data = response.json()
                if data['data']:
                    concept = data['data'][0]
                    
                    # Get mapping to target vocabulary
                    mapping_response = httpx.get(
                        f"{self.base_url}/v1/concepts/{concept['concept_id']}/mappings",
                        headers=self.headers,
                        params={
                            'target_vocabularies': target_vocab,
                            'relationship_types': 'Maps to'
                        }
                    )
                    mapping_response.raise_for_status()
                    
                    mapping_data = mapping_response.json()
                    if mapping_data['data']:
                        best_mapping = mapping_data['data'][0]
                        mappings.append(CodeMapping(
                            source_code=source_code,
                            source_system=source_system,
                            target_code=best_mapping['target_concept_code'],
                            target_system=target_vocab,
                            confidence=best_mapping.get('confidence', 1.0),
                            mapped_name=best_mapping['target_concept_name']
                        ))
                    else:
                        # Use the standard concept itself
                        mappings.append(CodeMapping(
                            source_code=source_code,
                            source_system=source_system,
                            target_code=concept['concept_code'],
                            target_system=concept['vocabulary_id'],
                            confidence=0.9,
                            mapped_name=concept['concept_name']
                        ))
            
            except httpx.HTTPError as e:  # also covers HTTPStatusError from raise_for_status()
                self.logger.error(f"Error mapping diagnosis {source_code}: {e}")
                continue
        
        return mappings
    
    def standardize_procedure_codes(self, procedures: List[Dict[str, str]]) -> List[CodeMapping]:
        """
        Standardize procedure codes to HCPCS or SNOMED.
        """
        mappings = []
        
        for procedure in procedures:
            source_code = procedure.get('code')
            source_system = procedure.get('system', 'Unknown')
            
            if not source_code:
                continue
            
            target_vocab = self._determine_target_vocabulary(source_system, 'procedure')
            
            try:
                response = httpx.get(
                    f"{self.base_url}/v1/concepts/search",
                    headers=self.headers,
                    params={
                        'query': source_code,
                        'vocabulary_ids': self._map_ehr_to_omophub_vocab(source_system),
                        'domain_id': 'Procedure',
                        'page_size': 1
                    }
                )
                response.raise_for_status()
                
                data = response.json()
                if data['data']:
                    concept = data['data'][0]
                    mappings.append(CodeMapping(
                        source_code=source_code,
                        source_system=source_system,
                        target_code=concept['concept_code'],
                        target_system=concept['vocabulary_id'],
                        confidence=0.95,
                        mapped_name=concept['concept_name']
                    ))
            
            except httpx.HTTPError as e:  # also covers HTTPStatusError from raise_for_status()
                self.logger.error(f"Error mapping procedure {source_code}: {e}")
                continue
        
        return mappings
    
    def standardize_medication_codes(self, medications: List[Dict[str, str]]) -> List[CodeMapping]:
        """
        Standardize medication codes to RxNorm.
        """
        mappings = []
        
        for medication in medications:
            source_code = medication.get('code')
            medication_name = medication.get('name', '')
            source_system = medication.get('system', 'Unknown')
            
            # Try code first, then name if no code
            search_term = source_code if source_code else medication_name
            if not search_term:
                continue
            
            try:
                response = httpx.get(
                    f"{self.base_url}/v1/concepts/search",
                    headers=self.headers,
                    params={
                        'query': search_term,
                        'vocabulary_ids': 'RxNorm',
                        'domain_id': 'Drug',
                        'standard_concept': 'S',
                        'page_size': 1
                    }
                )
                response.raise_for_status()
                
                data = response.json()
                if data['data']:
                    concept = data['data'][0]
                    mappings.append(CodeMapping(
                        source_code=source_code or medication_name,
                        source_system=source_system,
                        target_code=concept['concept_code'],
                        target_system='RxNorm',
                        confidence=0.9,
                        mapped_name=concept['concept_name']
                    ))
            
            except httpx.HTTPError as e:  # also covers HTTPStatusError from raise_for_status()
                self.logger.error(f"Error mapping medication {search_term}: {e}")
                continue
        
        return mappings
    
    def normalize_lab_results(self, lab_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Normalize lab results to LOINC codes with unit standardization.
        """
        normalized_results = []
        
        for lab_result in lab_results:
            test_name = lab_result.get('test_name', '')
            test_code = lab_result.get('test_code', '')
            value = lab_result.get('value')
            unit = lab_result.get('unit', '')
            
            search_term = test_code if test_code else test_name
            if not search_term:
                continue
            
            try:
                # Find LOINC code
                response = httpx.get(
                    f"{self.base_url}/v1/concepts/search",
                    headers=self.headers,
                    params={
                        'query': search_term,
                        'vocabulary_ids': 'LOINC',
                        'domain_id': 'Measurement',
                        'page_size': 1
                    }
                )
                response.raise_for_status()
                
                data = response.json()
                if data['data']:
                    loinc_concept = data['data'][0]
                    
                    # Normalize units if needed
                    normalized_unit = self._normalize_units(unit, loinc_concept['concept_code'])
                    
                    normalized_results.append({
                        'original_test_name': test_name,
                        'original_test_code': test_code,
                        'loinc_code': loinc_concept['concept_code'],
                        'loinc_name': loinc_concept['concept_name'],
                        'value': value,
                        'original_unit': unit,
                        'normalized_unit': normalized_unit,
                        'confidence': 0.95
                    })
            
            except httpx.HTTPError as e:  # also covers HTTPStatusError from raise_for_status()
                self.logger.error(f"Error normalizing lab result {search_term}: {e}")
                continue
        
        return normalized_results
    
    def process_patient_data(self, patient_data: PatientData) -> Dict[str, Any]:
        """
        Comprehensive patient data processing with all vocabulary standardization.
        """
        results = {
            'patient_id': patient_data.patient_id,
            'mrn': patient_data.mrn,
            'processed_timestamp': datetime.utcnow().isoformat(),
            'diagnosis_mappings': [],
            'procedure_mappings': [],
            'medication_mappings': [],
            'normalized_lab_results': [],
            'clinical_insights': {},
            'quality_indicators': {}
        }
        
        # Process diagnoses
        results['diagnosis_mappings'] = self.standardize_diagnosis_codes(patient_data.diagnoses)
        
        # Process procedures
        results['procedure_mappings'] = self.standardize_procedure_codes(patient_data.procedures)
        
        # Process medications
        results['medication_mappings'] = self.standardize_medication_codes(patient_data.medications)
        
        # Process lab results
        results['normalized_lab_results'] = self.normalize_lab_results(patient_data.lab_results)
        
        # Generate clinical insights
        results['clinical_insights'] = self._generate_clinical_insights(results)
        
        # Calculate quality indicators
        results['quality_indicators'] = self._calculate_quality_indicators(results)
        
        return results
    
    def _determine_target_vocabulary(self, source_system: str, data_type: str) -> str:
        """Determine optimal target vocabulary based on source system and data type."""
        mapping = {
            'diagnosis': {
                'ICD9CM': 'ICD10CM',
                'ICD10CM': 'SNOMED',
                'SNOMED': 'SNOMED',
                'default': 'SNOMED'
            },
            'procedure': {
                'ICD9Proc': 'HCPCS',
                'ICD10PCS': 'HCPCS',
                'HCPCS': 'HCPCS',
                'SNOMED': 'SNOMED',
                'default': 'HCPCS'
            }
        }
        
        data_mapping = mapping.get(data_type, {})
        return data_mapping.get(source_system, data_mapping.get('default', 'SNOMED'))
    
    def _map_ehr_to_omophub_vocab(self, ehr_system: str) -> str:
        """Map EHR system vocabulary names to OMOPHub vocabulary IDs."""
        mapping = {
            'ICD9CM': 'ICD9CM',
            'ICD10CM': 'ICD10CM',
            'ICD10PCS': 'ICD10PCS',
            'SNOMED': 'SNOMED',
            'HCPCS': 'HCPCS',
            'LOINC': 'LOINC',
            'RxNorm': 'RxNorm',
            'NDC': 'NDC'
        }
        return mapping.get(ehr_system, 'SNOMED')  # Default to SNOMED
    
    def _normalize_units(self, unit: str, loinc_code: str) -> str:
        """Normalize lab units to UCUM standard."""
        # This would typically involve a lookup table or API call
        # For demo purposes, returning the original unit
        return unit
    
    def _generate_clinical_insights(self, processed_data: Dict) -> Dict[str, Any]:
        """Generate clinical insights from processed vocabulary data."""
        insights = {
            'chronic_conditions': [],
            'drug_interactions': [],
            'preventive_care_gaps': [],
            'clinical_decision_support': []
        }
        
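        # Identify chronic conditions from diagnoses. The prefixes are ICD-10-CM codes,
        # so compare against whichever side of the mapping is ICD-10-CM; SNOMED
        # target codes never match them.
        chronic_condition_codes = ['E11', 'I10', 'J44']  # Diabetes, Hypertension, COPD
        for mapping in processed_data.get('diagnosis_mappings', []):
            if mapping.target_system == 'ICD10CM':
                icd10_code = mapping.target_code
            elif mapping.source_system == 'ICD10CM':
                icd10_code = mapping.source_code
            else:
                continue
            if any(icd10_code.startswith(code) for code in chronic_condition_codes):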
                insights['chronic_conditions'].append({
                    'condition': mapping.mapped_name,
                    'code': mapping.target_code,
                    'system': mapping.target_system
                })
        
        return insights
    
    def _calculate_quality_indicators(self, processed_data: Dict) -> Dict[str, Any]:
        """Calculate quality indicators based on standardized codes."""
        indicators = {
            'diabetes_control_eligible': False,
            'hypertension_control_eligible': False,
            'preventive_screening_due': [],
            'medication_adherence_concerns': []
        }
        
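        # Check for diabetes and hypertension. 'E11' and 'I10' are ICD-10-CM codes,
        # so compare against whichever side of each mapping is ICD-10-CM; SNOMED
        # target codes never match them.
        for mapping in processed_data.get('diagnosis_mappings', []):
            if mapping.target_system == 'ICD10CM':
                icd10_code = mapping.target_code
            elif mapping.source_system == 'ICD10CM':
                icd10_code = mapping.source_code
            else:
                continue
            if icd10_code.startswith('E11'):  # Type 2 diabetes
                indicators['diabetes_control_eligible'] = True
            elif icd10_code == 'I10':  # Essential hypertension
                indicators['hypertension_control_eligible'] = True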
        
        return indicators

# Epic® Integration (FHIR-based)
class EpicFHIRConnector(EHRConnector):
    """
    Epic® FHIR R4 integration connector.
    Epic® is a registered trademark of Epic Systems Corporation.
    """
    
    def __init__(self, fhir_base_url: str, client_id: str, client_secret: str):
        self.fhir_base_url = fhir_base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expires = None
    
    async def authenticate(self) -> bool:
        """Authenticate using Epic® OAuth 2.0."""
        try:
            auth_url = f"{self.fhir_base_url}/oauth2/token"
            auth_data = {
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'scope': 'system/*.read'
            }
            
            async with httpx.AsyncClient() as client:
                response = await client.post(auth_url, data=auth_data)
                response.raise_for_status()
                
                token_data = response.json()
                self.access_token = token_data['access_token']
                self.token_expires = datetime.utcnow() + timedelta(seconds=token_data['expires_in'])
            
            return True
        except Exception as e:
            logging.error(f"Epic® authentication failed: {e}")
            return False
    
    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Extract patient data from Epic® FHIR APIs."""
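        # Re-authenticate when the token is missing or expired; fail fast if that does not succeed
        if not self.access_token or datetime.utcnow() >= self.token_expires:
            if not await self.authenticate():
                raise ConnectionError("Epic® authentication failed")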
        
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Accept': 'application/fhir+json'
        }
        
        async with httpx.AsyncClient() as client:
            # Get patient demographics
            patient_response = await client.get(
                f"{self.fhir_base_url}/Patient/{patient_id}",
                headers=headers
            )
            patient_response.raise_for_status()
            patient = patient_response.json()
            
            # Get conditions (diagnoses)
            conditions_response = await client.get(
                f"{self.fhir_base_url}/Condition",
                headers=headers,
                params={'patient': patient_id}
            )
            conditions_response.raise_for_status()
            conditions = conditions_response.json()
            
            # Get procedures
            procedures_response = await client.get(
                f"{self.fhir_base_url}/Procedure",
                headers=headers,
                params={'patient': patient_id}
            )
            procedures_response.raise_for_status()
            procedures = procedures_response.json()
            
            # Get medications
            medications_response = await client.get(
                f"{self.fhir_base_url}/MedicationRequest",
                headers=headers,
                params={'patient': patient_id}
            )
            medications_response.raise_for_status()
            medications = medications_response.json()
            
            # Get lab results
            observations_response = await client.get(
                f"{self.fhir_base_url}/Observation",
                headers=headers,
                params={
                    'patient': patient_id,
                    'category': 'laboratory'
                }
            )
            observations_response.raise_for_status()
            observations = observations_response.json()
        
        # Convert FHIR resources to standardized format
        return PatientData(
            patient_id=patient_id,
            mrn=self._extract_mrn(patient),
            demographics=self._extract_demographics(patient),
            diagnoses=self._extract_diagnoses(conditions),
            procedures=self._extract_procedures(procedures),
            medications=self._extract_medications(medications),
            allergies=[],  # Would extract from AllergyIntolerance resources
            lab_results=self._extract_lab_results(observations),
            vital_signs=[],  # Would extract from vital signs observations
            clinical_notes=[]  # Would extract from DocumentReference resources
        )
    
    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Update patient records with standardized codes (if supported by Epic®)."""
        # This would typically involve creating or updating FHIR resources
        # Implementation depends on Epic® configuration and permissions
        logging.info(f"Would update patient {patient_id} with {len(mappings)} code mappings")
        return True
    
    def _extract_mrn(self, patient_fhir: Dict) -> str:
        """Extract MRN from Epic® patient resource."""
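        for identifier in patient_fhir.get('identifier', []):
            # `or [{}]` guards against a missing or empty coding list (IndexError otherwise)
            codings = identifier.get('type', {}).get('coding') or [{}]
            if codings[0].get('code') == 'MR':
                return identifier.get('value', '')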
        return patient_fhir.get('id', '')
    
    def _extract_demographics(self, patient_fhir: Dict) -> Dict[str, Any]:
        """Extract demographics from Epic® patient resource."""
        return {
            'name': (patient_fhir.get('name') or [{}])[0].get('text', ''),  # tolerate an empty name list
            'gender': patient_fhir.get('gender', ''),
            'birth_date': patient_fhir.get('birthDate', ''),
            'address': patient_fhir.get('address', [{}])[0] if patient_fhir.get('address') else {}
        }
    
    def _extract_diagnoses(self, conditions_bundle: Dict) -> List[Dict[str, str]]:
        """Extract diagnoses from Epic® condition resources."""
        diagnoses = []
        for entry in conditions_bundle.get('entry', []):
            condition = entry.get('resource', {})
            code_info = condition.get('code', {})
            
            for coding in code_info.get('coding', []):
                diagnoses.append({
                    'code': coding.get('code', ''),
                    'system': self._map_fhir_system_to_vocab(coding.get('system', '')),
                    'display': coding.get('display', ''),
                    'clinical_status': (condition.get('clinicalStatus', {}).get('coding') or [{}])[0].get('code', '')
                })
        return diagnoses
    
    def _extract_procedures(self, procedures_bundle: Dict) -> List[Dict[str, str]]:
        """Extract procedures from Epic® procedure resources."""
        procedures = []
        for entry in procedures_bundle.get('entry', []):
            procedure = entry.get('resource', {})
            code_info = procedure.get('code', {})
            
            for coding in code_info.get('coding', []):
                procedures.append({
                    'code': coding.get('code', ''),
                    'system': self._map_fhir_system_to_vocab(coding.get('system', '')),
                    'display': coding.get('display', ''),
                    'performed_date': procedure.get('performedDateTime', '')
                })
        return procedures
    
    def _extract_medications(self, medications_bundle: Dict) -> List[Dict[str, str]]:
        """Extract medications from Epic® medication request resources."""
        medications = []
        for entry in medications_bundle.get('entry', []):
            med_request = entry.get('resource', {})
            medication = med_request.get('medicationCodeableConcept', {})
            
            for coding in medication.get('coding', []):
                medications.append({
                    'code': coding.get('code', ''),
                    'system': self._map_fhir_system_to_vocab(coding.get('system', '')),
                    'display': coding.get('display', ''),
                    'status': med_request.get('status', ''),
                    'intent': med_request.get('intent', '')
                })
        return medications
    
    def _extract_lab_results(self, observations_bundle: Dict) -> List[Dict[str, Any]]:
        """Extract lab results from Epic® observation resources."""
        lab_results = []
        for entry in observations_bundle.get('entry', []):
            observation = entry.get('resource', {})
            code_info = observation.get('code', {})
            
            # Find LOINC coding
            loinc_coding = None
            for coding in code_info.get('coding', []):
                if 'loinc.org' in coding.get('system', ''):
                    loinc_coding = coding
                    break
            
            if loinc_coding:
                value_info = observation.get('valueQuantity', {})
                lab_results.append({
                    'test_code': loinc_coding.get('code', ''),
                    'test_name': loinc_coding.get('display', ''),
                    'value': value_info.get('value'),
                    'unit': value_info.get('unit', ''),
                    'reference_range': observation.get('referenceRange', []),
                    'status': observation.get('status', ''),
                    'effective_date': observation.get('effectiveDateTime', '')
                })
        return lab_results
    
    def _map_fhir_system_to_vocab(self, fhir_system: str) -> str:
        """Map FHIR system URLs to vocabulary names."""
        mapping = {
            'http://snomed.info/sct': 'SNOMED',
            'http://hl7.org/fhir/sid/icd-10-cm': 'ICD10CM',
            'http://www.ama-assn.org/go/cpt': 'HCPCS',
            'http://www.nlm.nih.gov/research/umls/rxnorm': 'RxNorm',
            'http://loinc.org': 'LOINC'
        }
        return mapping.get(fhir_system, 'Unknown')

# Cerner Integration
class CernerFHIRConnector(EHRConnector):
    """Cerner PowerChart FHIR R4 integration connector."""
    
    def __init__(self, fhir_base_url: str, client_id: str, client_secret: str):
        self.fhir_base_url = fhir_base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expires = None
    
    async def authenticate(self) -> bool:
        """Authenticate using Cerner OAuth 2.0."""
        try:
            auth_url = f"{self.fhir_base_url}/authorization/oauth/token"
            auth_data = {
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'scope': 'system/Patient.read system/Observation.read system/Condition.read'
            }
            
            timeout = httpx.Timeout(15.0)
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(auth_url, data=auth_data)
                response.raise_for_status()
                
                token_data = response.json()
                self.access_token = token_data['access_token']
                self.token_expires = datetime.utcnow() + timedelta(seconds=token_data.get('expires_in', 3600))
            
            return True
        except Exception as e:
            logging.error(f"Cerner authentication failed: {e}")
            return False
    
    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Extract patient data from Cerner FHIR APIs."""
        # Similar implementation to Epic® but with Cerner-specific API patterns
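        # Re-authenticate when the token is missing or expired; fail fast if that does not succeed
        if not self.access_token or (self.token_expires and datetime.utcnow() >= self.token_expires):
            if not await self.authenticate():
                raise ConnectionError("Cerner authentication failed")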
        
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Accept': 'application/fhir+json'  # FHIR R4 media type ('application/json+fhir' is the DSTU2 form)
        }
        
        # Implementation would follow Cerner FHIR API documentation
        # For brevity, returning a placeholder
        return PatientData(
            patient_id=patient_id,
            mrn=f"CERNER_{patient_id}",
            demographics={},
            diagnoses=[],
            procedures=[],
            medications=[],
            allergies=[],
            lab_results=[],
            vital_signs=[],
            clinical_notes=[]
        )
    
    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Update patient records in Cerner system."""
        logging.info(f"Would update Cerner patient {patient_id} with {len(mappings)} mappings")
        return True

# Veradigm (formerly Allscripts) Integration  
class VeradigmConnector(EHRConnector):
    """
    Veradigm (formerly Allscripts) integration connector.
    Supports both Unity API and Developer API.
    """
    
    def __init__(self, api_base_url: str, username: str, password: str, app_name: str):
        self.api_base_url = api_base_url
        self.username = username
        self.password = password
        self.app_name = app_name
        self.security_token = None
        self.app_user_id = None
    
    async def authenticate(self) -> bool:
        """Authenticate using Veradigm Unity API."""
        try:
            auth_url = f"{self.api_base_url}/Unity/UnityService.svc/json/GetSecurityToken"
            auth_data = {
                'username': self.username,
                'password': self.password,
                'appname': self.app_name
            }
            
            async with httpx.AsyncClient() as client:
                response = await client.post(auth_url, json=auth_data)
                response.raise_for_status()
                
                auth_result = response.json()
                self.security_token = auth_result.get('Token')
                self.app_user_id = auth_result.get('AppUserID')
            
            return self.security_token is not None
        except Exception as e:
            logging.error(f"Veradigm authentication failed: {e}")
            return False
    
    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Extract patient data from Veradigm Unity API."""
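        # Authenticate on first use; fail fast if no security token could be obtained
        if not self.security_token and not await self.authenticate():
            raise ConnectionError("Veradigm authentication failed")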
        
        headers = {
            'Content-Type': 'application/json'
        }
        
        # Get patient demographics
        patient_url = f"{self.api_base_url}/Unity/UnityService.svc/json/GetPatient"
        patient_params = {
            'token': self.security_token,
            'appUserID': self.app_user_id,
            'patientID': patient_id
        }
        
        async with httpx.AsyncClient() as client:
            patient_response = await client.get(patient_url, headers=headers, params=patient_params)
            patient_response.raise_for_status()
            patient_data = patient_response.json()
            
            # Get patient problems (diagnoses)
            problems_url = f"{self.api_base_url}/Unity/UnityService.svc/json/GetPatientProblems"
            problems_response = await client.get(problems_url, headers=headers, params=patient_params)
            problems_response.raise_for_status()
            problems_data = problems_response.json()
            
            # Get medications
            meds_url = f"{self.api_base_url}/Unity/UnityService.svc/json/GetPatientMedications"
            meds_response = await client.get(meds_url, headers=headers, params=patient_params)
            meds_response.raise_for_status()
            medications_data = meds_response.json()
        
        return PatientData(
            patient_id=patient_id,
            mrn=patient_data.get('MRN', ''),
            demographics=self._extract_veradigm_demographics(patient_data),
            diagnoses=self._extract_veradigm_problems(problems_data),
            procedures=[],  # Would need separate API call
            medications=self._extract_veradigm_medications(medications_data),
            allergies=[],  # Would need separate API call
            lab_results=[],  # Would need separate API call
            vital_signs=[],  # Would need separate API call
            clinical_notes=[]
        )
    
    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Placeholder: log the intended update without writing to Veradigm."""
        logging.info(f"Would update Veradigm patient {patient_id} with {len(mappings)} mappings")
        return True
    
    def _extract_veradigm_demographics(self, patient_data: Dict) -> Dict[str, Any]:
        """Extract demographics from Veradigm patient data."""
        return {
            'name': f"{patient_data.get('FirstName', '')} {patient_data.get('LastName', '')}",
            'gender': patient_data.get('Sex', ''),
            'birth_date': patient_data.get('DOB', ''),
            'address': {
                'street': patient_data.get('Address1', ''),
                'city': patient_data.get('City', ''),
                'state': patient_data.get('State', ''),
                'zip': patient_data.get('Zip', '')
            }
        }
    
    def _extract_veradigm_problems(self, problems_data: Dict) -> List[Dict[str, str]]:
        """Extract diagnoses from Veradigm patient problems."""
        diagnoses = []
        for problem in problems_data.get('Problems', []):
            diagnoses.append({
                'code': problem.get('DiagnosisCode', ''),
                'system': 'ICD10CM',  # Assuming ICD-10
                'display': problem.get('ProblemDescription', ''),
                'status': problem.get('ProblemStatus', '')
            })
        return diagnoses
    
    def _extract_veradigm_medications(self, medications_data: Dict) -> List[Dict[str, str]]:
        """Extract medications from Veradigm medications data."""
        medications = []
        for med in medications_data.get('Medications', []):
            medications.append({
                'name': med.get('MedicationName', ''),
                'code': med.get('NDC', ''),
                'system': 'NDC',
                'display': med.get('MedicationName', ''),
                'status': med.get('MedicationStatus', '')
            })
        return medications

# Main Integration Orchestrator
class EHRIntegrationOrchestrator:
    """
    Orchestrates integration across multiple EHR systems.
    Provides unified interface for vocabulary standardization.
    """
    
    def __init__(self, omophub_api_key: str):
        self.omophub_integrator = OMOPHubEHRIntegrator(omophub_api_key)
        self.ehr_connectors = {}
        self.logger = logging.getLogger(__name__)
    
    def register_ehr_connector(self, ehr_name: str, connector: EHRConnector):
        """Register an EHR system connector."""
        self.ehr_connectors[ehr_name] = connector
    
    async def process_patient(self, ehr_name: str, patient_id: str) -> Dict[str, Any]:
        """
        Process a patient's data from specified EHR system.
        Returns standardized vocabulary mappings and clinical insights.
        """
        if ehr_name not in self.ehr_connectors:
            raise ValueError(f"EHR system '{ehr_name}' not registered")
        
        connector = self.ehr_connectors[ehr_name]
        
        try:
            # Extract patient data from EHR
            self.logger.info(f"Extracting patient {patient_id} from {ehr_name}")
            patient_data = await connector.get_patient_data(patient_id)
            
            # Process with OMOPHub vocabulary services
            self.logger.info(f"Processing vocabulary standardization for patient {patient_id}")
            processed_data = self.omophub_integrator.process_patient_data(patient_data)
            
            # Optionally update EHR with standardized codes
            if hasattr(connector, 'update_patient_codes'):
                all_mappings = (
                    processed_data['diagnosis_mappings'] +
                    processed_data['procedure_mappings'] +
                    processed_data['medication_mappings']
                )
                await connector.update_patient_codes(patient_id, all_mappings)
            
            return {
                'ehr_system': ehr_name,
                'processing_timestamp': datetime.utcnow().isoformat(),
                'patient_data': processed_data,
                'status': 'success'
            }
            
        except Exception as e:
            self.logger.error(f"Error processing patient {patient_id} from {ehr_name}: {e}")
            return {
                'ehr_system': ehr_name,
                'patient_id': patient_id,
                'error': str(e),
                'status': 'failed'
            }
    
    async def batch_process_patients(self, ehr_name: str, patient_ids: List[str]) -> List[Dict[str, Any]]:
        """Process multiple patients efficiently."""
        results = []
        
        # Process in batches of 10 to avoid overwhelming APIs
        batch_size = 10
        for i in range(0, len(patient_ids), batch_size):
            batch = patient_ids[i:i + batch_size]
            
            batch_tasks = [
                self.process_patient(ehr_name, patient_id)
                for patient_id in batch
            ]
            
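            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            # process_patient raises ValueError for an unregistered EHR, so convert
            # any exception into the same 'failed' shape callers already handle
            for patient_id, outcome in zip(batch, batch_results):
                if isinstance(outcome, Exception):
                    outcome = {
                        'ehr_system': ehr_name,
                        'patient_id': patient_id,
                        'error': str(outcome),
                        'status': 'failed'
                    }
                results.append(outcome)
            
            # Add delay between batches (not after the last one) to respect API rate limits
            if i + batch_size < len(patient_ids):
                await asyncio.sleep(1)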
        
        return results

# Example usage
async def main():
    # Initialize the orchestrator
    orchestrator = EHRIntegrationOrchestrator("your-omophub-api-key")
    
    # Register EHR connectors
    epic_connector = EpicFHIRConnector(
        fhir_base_url="https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4",
        client_id="your-epic-client-id",
        client_secret="your-epic-client-secret"
    )
    orchestrator.register_ehr_connector("epic", epic_connector)
    
    cerner_connector = CernerFHIRConnector(
        fhir_base_url="https://fhir-open.cerner.com/r4",
        client_id="your-cerner-client-id", 
        client_secret="your-cerner-client-secret"
    )
    orchestrator.register_ehr_connector("cerner", cerner_connector)
    
    veradigm_connector = VeradigmConnector(
        api_base_url="https://unity.developer.allscripts.com",
        username="your-username",
        password="your-password",
        app_name="your-app-name"
    )
    orchestrator.register_ehr_connector("veradigm", veradigm_connector)
    
    # Process a patient from Epic®
    result = await orchestrator.process_patient("epic", "patient-123")
    print(json.dumps(result, indent=2))
    
    # Batch process multiple patients
    patient_ids = ["patient-123", "patient-456", "patient-789"]
    batch_results = await orchestrator.batch_process_patients("epic", patient_ids)
    
    print(f"Processed {len(batch_results)} patients")
    for result in batch_results:
        if result['status'] == 'success':
            patient_data = result['patient_data']
            print(f"Patient {patient_data['patient_id']}: "
                  f"{len(patient_data['diagnosis_mappings'])} diagnoses mapped, "
                  f"{len(patient_data['procedure_mappings'])} procedures mapped")

if __name__ == "__main__":
    asyncio.run(main())
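
The batch method above calls `asyncio.gather` with `return_exceptions=True`, so raised exceptions come back as values in the result list rather than propagating. The following is a minimal sketch of one way to bound concurrency with a semaphore and turn those exceptions into ordinary "failed" results. `fake_process_patient` is a hypothetical stand-in for `EHRIntegrationOrchestrator.process_patient`, and the limit of two concurrent calls is an arbitrary assumption rather than a documented rate limit.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def fake_process_patient(ehr_name: str, patient_id: str) -> Dict[str, Any]:
    """Hypothetical stand-in for EHRIntegrationOrchestrator.process_patient."""
    await asyncio.sleep(0)  # yield to the event loop, as a real API call would
    if patient_id == "bad-id":
        raise ValueError(f"Patient '{patient_id}' not found")
    return {"ehr_system": ehr_name, "patient_id": patient_id, "status": "success"}


async def process_with_limit(
    process: Callable[[str, str], Awaitable[Dict[str, Any]]],
    ehr_name: str,
    patient_ids: List[str],
    max_concurrent: int = 2,
) -> List[Dict[str, Any]]:
    """Run `process` for each patient with at most `max_concurrent` in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(patient_id: str) -> Dict[str, Any]:
        async with semaphore:
            return await process(ehr_name, patient_id)

    # gather preserves input order, so results line up with patient_ids
    raw = await asyncio.gather(
        *(guarded(pid) for pid in patient_ids), return_exceptions=True
    )

    # Convert raised exceptions into the 'failed' shape so that callers
    # can always read result['status'] without type checks.
    results: List[Dict[str, Any]] = []
    for patient_id, item in zip(patient_ids, raw):
        if isinstance(item, BaseException):
            results.append({
                "ehr_system": ehr_name,
                "patient_id": patient_id,
                "error": str(item),
                "status": "failed",
            })
        else:
            results.append(item)
    return results


def run_demo() -> List[Dict[str, Any]]:
    """Process three patients, one of which fails."""
    return asyncio.run(
        process_with_limit(fake_process_patient, "epic", ["p1", "bad-id", "p2"])
    )
```

Compared with fixed-size batches, a semaphore keeps the number of in-flight requests constant instead of waiting for the slowest patient in each batch. Which approach suits a given EHR depends on its actual rate-limit policy.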

JavaScript/Node.js Implementation

const axios = require('axios');

class OMOPHubEHRIntegrator {
  constructor(apiKey, baseUrl = 'https://api.omophub.com') {
    this.apiKey = apiKey;
    this.baseUrl = baseUrl;
    this.headers = { 'Authorization': `Bearer ${apiKey}` };
  }

  async standardizeDiagnosisCodes(diagnoses) {
    const mappings = [];
    
    for (const diagnosis of diagnoses) {
      const sourceCode = diagnosis.code;
      const sourceSystem = diagnosis.system || 'Unknown';
      
      if (!sourceCode) continue;
      
      try {
        const response = await axios.get(`${this.baseUrl}/v1/concepts/search`, {
          headers: this.headers,
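          params: {
            query: sourceCode,
            vocabulary_ids: this.mapEHRToOMOPHubVocab(sourceSystem),
            // No standard_concept filter here: source vocabularies such as
            // ICD10CM and ICD9CM hold non-standard concepts in OMOP, and the
            // standard target is resolved through the 'Maps to' lookup below.
            page_size: 1
          }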
        });
        
        if (response.data.data && response.data.data.length > 0) {
          const concept = response.data.data[0];
          
          // Get mapping to target vocabulary
          const mappingResponse = await axios.get(
            `${this.baseUrl}/v1/concepts/${concept.concept_id}/mappings`,
            {
              headers: this.headers,
              params: {
                target_vocabularies: 'SNOMED',
                relationship_types: 'Maps to'
              }
            }
          );
          
          if (mappingResponse.data.data && mappingResponse.data.data.length > 0) {
            const bestMapping = mappingResponse.data.data[0];
            mappings.push({
              sourceCode,
              sourceSystem,
              targetCode: bestMapping.target_concept_code,
              targetSystem: 'SNOMED',
              confidence: bestMapping.confidence || 1.0,
              mappedName: bestMapping.target_concept_name
            });
          }
        }
      } catch (error) {
        console.error(`Error mapping diagnosis ${sourceCode}:`, error.message);
      }
    }
    
    return mappings;
  }

  async processPatientData(patientData) {
    const results = {
      patientId: patientData.patient_id,
      mrn: patientData.mrn,
      processedTimestamp: new Date().toISOString(),
      diagnosisMappings: [],
      procedureMappings: [],
      medicationMappings: [],
      normalizedLabResults: [],
      clinicalInsights: {},
      qualityIndicators: {}
    };

    // Process diagnoses
    results.diagnosisMappings = await this.standardizeDiagnosisCodes(patientData.diagnoses || []);
    
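    // Process procedures and medications. standardizeProcedureCodes and
    // standardizeMedicationCodes are not defined in this example; implement them
    // along the lines of standardizeDiagnosisCodes. Until then, the mappings stay empty.
    if (typeof this.standardizeProcedureCodes === 'function') {
      results.procedureMappings = await this.standardizeProcedureCodes(patientData.procedures || []);
    }
    if (typeof this.standardizeMedicationCodes === 'function') {
      results.medicationMappings = await this.standardizeMedicationCodes(patientData.medications || []);
    }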
    
    // Generate insights
    results.clinicalInsights = this.generateClinicalInsights(results);
    results.qualityIndicators = this.calculateQualityIndicators(results);
    
    return results;
  }

  mapEHRToOMOPHubVocab(ehrSystem) {
    const mapping = {
      'ICD9CM': 'ICD9CM',
      'ICD10CM': 'ICD10CM',
      'SNOMED': 'SNOMED',
      'HCPCS': 'HCPCS',
      'LOINC': 'LOINC',
      'RxNorm': 'RxNorm'
    };
    return mapping[ehrSystem] || 'SNOMED';
  }

  generateClinicalInsights(processedData) {
    const insights = {
      chronicConditions: [],
      drugInteractions: [],
      preventiveCareGaps: []
    };

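    // Identify chronic conditions. The prefixes are ICD-10-CM categories, so they
    // are matched against the ICD-10-CM source code, not the SNOMED target code.
    const chronicCodes = ['E11', 'I10', 'J44']; // Type 2 diabetes, hypertension, COPD
    for (const mapping of processedData.diagnosisMappings) {
      const isICD10 = mapping.sourceSystem === 'ICD10CM';
      if (isICD10 && chronicCodes.some(code => mapping.sourceCode.startsWith(code))) {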
        insights.chronicConditions.push({
          condition: mapping.mappedName,
          code: mapping.targetCode,
          system: mapping.targetSystem
        });
      }
    }

    return insights;
  }

  calculateQualityIndicators(processedData) {
    const indicators = {
      diabetesControlEligible: false,
      hypertensionControlEligible: false,
      preventiveScreeningDue: []
    };

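    // Check for type 2 diabetes (ICD-10-CM category E11) on the source code,
    // because the mapped target code is a SNOMED identifier
    for (const mapping of processedData.diagnosisMappings) {
      if (mapping.sourceSystem === 'ICD10CM' && mapping.sourceCode.startsWith('E11')) {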
        indicators.diabetesControlEligible = true;
        break;
      }
    }

    return indicators;
  }
}

// Epic® FHIR Connector
class EpicFHIRConnector {
  constructor(fhirBaseUrl, clientId, clientSecret) {
    this.fhirBaseUrl = fhirBaseUrl;
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.accessToken = null;
    this.tokenExpires = null;
  }

  async authenticate() {
    try {
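      // Assumes the token endpoint sits under the FHIR base URL. Many servers host it
      // elsewhere; the authoritative value is `token_endpoint` in
      // `${fhirBaseUrl}/.well-known/smart-configuration`.
      const authUrl = `${this.fhirBaseUrl}/oauth2/token`;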
      const authData = {
        grant_type: 'client_credentials',
        client_id: this.clientId,
        client_secret: this.clientSecret,
        scope: 'system/*.read'
      };

      // URL-encode the auth payload for OAuth2 token endpoint
      const formData = new URLSearchParams(authData).toString();
      
      const response = await axios.post(authUrl, formData, {
        headers: {
          'Content-Type': 'application/x-www-form-urlencoded'
        }
      });
      const tokenData = response.data;
      
      this.accessToken = tokenData.access_token;
      this.tokenExpires = new Date(Date.now() + (tokenData.expires_in * 1000));
      
      return true;
    } catch (error) {
      console.error('Epic® authentication failed:', error.message);
      return false;
    }
  }

  async getPatientData(patientId) {
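    if (!this.accessToken || new Date() >= this.tokenExpires) {
      const authenticated = await this.authenticate();
      if (!authenticated) {
        throw new Error('Epic® authentication failed; cannot fetch patient data');
      }
    }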

    const headers = {
      'Authorization': `Bearer ${this.accessToken}`,
      'Accept': 'application/fhir+json'
    };

    try {
      // Get patient demographics
      const patientResponse = await axios.get(
        `${this.fhirBaseUrl}/Patient/${patientId}`,
        { headers }
      );
      const patient = patientResponse.data;

      // Get conditions
      const conditionsResponse = await axios.get(
        `${this.fhirBaseUrl}/Condition`,
        { 
          headers,
          params: { patient: patientId }
        }
      );
      const conditions = conditionsResponse.data;

      // Get medications
      const medicationsResponse = await axios.get(
        `${this.fhirBaseUrl}/MedicationRequest`,
        {
          headers,
          params: { patient: patientId }
        }
      );
      const medications = medicationsResponse.data;

      return {
        patient_id: patientId,
        mrn: this.extractMRN(patient),
        demographics: this.extractDemographics(patient),
        diagnoses: this.extractDiagnoses(conditions),
        procedures: [], // Would extract from Procedure resources
        medications: this.extractMedications(medications),
        allergies: [],
        lab_results: [],
        vital_signs: [],
        clinical_notes: []
      };
    } catch (error) {
      console.error(`Error extracting patient data from Epic®:`, error.message);
      throw error;
    }
  }

  extractMRN(patient) {
    const identifiers = patient.identifier || [];
    for (const identifier of identifiers) {
      const coding = identifier.type?.coding?.[0];
      if (coding?.code === 'MR') {
        return identifier.value || '';
      }
    }
    return patient.id || '';
  }

  extractDemographics(patient) {
    return {
      name: patient.name?.[0]?.text || '',
      gender: patient.gender || '',
      birth_date: patient.birthDate || '',
      address: patient.address?.[0] || {}
    };
  }

  extractDiagnoses(conditionsBundle) {
    const diagnoses = [];
    
    for (const entry of conditionsBundle.entry || []) {
      const condition = entry.resource || {};
      const codeInfo = condition.code || {};
      
      for (const coding of codeInfo.coding || []) {
        diagnoses.push({
          code: coding.code || '',
          system: this.mapFHIRSystemToVocab(coding.system || ''),
          display: coding.display || '',
          clinical_status: condition.clinicalStatus?.coding?.[0]?.code || ''
        });
      }
    }
    
    return diagnoses;
  }

  extractMedications(medicationsBundle) {
    const medications = [];
    
    for (const entry of medicationsBundle.entry || []) {
      const medRequest = entry.resource || {};
      const medication = medRequest.medicationCodeableConcept || {};
      
      for (const coding of medication.coding || []) {
        medications.push({
          code: coding.code || '',
          system: this.mapFHIRSystemToVocab(coding.system || ''),
          display: coding.display || '',
          status: medRequest.status || '',
          intent: medRequest.intent || ''
        });
      }
    }
    
    return medications;
  }

  mapFHIRSystemToVocab(fhirSystem) {
    const mapping = {
      'http://snomed.info/sct': 'SNOMED',
      'http://hl7.org/fhir/sid/icd-10-cm': 'ICD10CM',
      'http://www.ama-assn.org/go/cpt': 'HCPCS',
      'http://www.nlm.nih.gov/research/umls/rxnorm': 'RxNorm',
      'http://loinc.org': 'LOINC'
    };
    return mapping[fhirSystem] || 'Unknown';
  }
}

// Integration Orchestrator
class EHRIntegrationOrchestrator {
  constructor(omophubApiKey) {
    this.OMOPHUBIntegrator = new OMOPHubEHRIntegrator(omophubApiKey);
    this.ehrConnectors = new Map();
  }

  registerEHRConnector(ehrName, connector) {
    this.ehrConnectors.set(ehrName, connector);
  }

  async processPatient(ehrName, patientId) {
    if (!this.ehrConnectors.has(ehrName)) {
      throw new Error(`EHR system '${ehrName}' not registered`);
    }

    const connector = this.ehrConnectors.get(ehrName);

    try {
      console.log(`Extracting patient ${patientId} from ${ehrName}`);
      const patientData = await connector.getPatientData(patientId);

      console.log(`Processing vocabulary standardization for patient ${patientId}`);
      const processedData = await this.OMOPHUBIntegrator.processPatientData(patientData);

      return {
        ehrSystem: ehrName,
        processingTimestamp: new Date().toISOString(),
        patientData: processedData,
        status: 'success'
      };
    } catch (error) {
      console.error(`Error processing patient ${patientId} from ${ehrName}:`, error.message);
      return {
        ehrSystem: ehrName,
        patientId,
        error: error.message,
        status: 'failed'
      };
    }
  }

  async batchProcessPatients(ehrName, patientIds, batchSize = 10) {
    const results = [];
    
    for (let i = 0; i < patientIds.length; i += batchSize) {
      const batch = patientIds.slice(i, i + batchSize);
      
      const batchPromises = batch.map(patientId =>
        this.processPatient(ehrName, patientId)
      );
      
      const batchResults = await Promise.allSettled(batchPromises);
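      // Normalize rejections into the same 'failed' shape processPatient returns
      results.push(...batchResults.map((result, index) =>
        result.status === 'fulfilled'
          ? result.value
          : {
              ehrSystem: ehrName,
              patientId: batch[index],
              error: result.reason?.message ?? String(result.reason),
              status: 'failed'
            }
      ));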
      
      // Add delay between batches
      if (i + batchSize < patientIds.length) {
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    }
    
    return results;
  }
}

// Example usage
async function main() {
  const orchestrator = new EHRIntegrationOrchestrator('your-omophub-api-key');
  
  // Register Epic® connector
  const epicConnector = new EpicFHIRConnector(
    'https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4',
    'your-epic-client-id',
    'your-epic-client-secret'
  );
  orchestrator.registerEHRConnector('epic', epicConnector);
  
  try {
    // Process a single patient
    const result = await orchestrator.processPatient('epic', 'patient-123');
    console.log(JSON.stringify(result, null, 2));
    
    // Batch process multiple patients
    const patientIds = ['patient-123', 'patient-456', 'patient-789'];
    const batchResults = await orchestrator.batchProcessPatients('epic', patientIds);
    
    console.log(`Processed ${batchResults.length} patients`);
    batchResults.forEach(result => {
      if (result.status === 'success') {
        const patientData = result.patientData;
        console.log(`Patient ${patientData.patientId}: ${patientData.diagnosisMappings.length} diagnoses mapped`);
      }
    });
  } catch (error) {
    console.error('Integration error:', error.message);
  }
}

if (require.main === module) {
  main();
}

module.exports = {
  OMOPHubEHRIntegrator,
  EpicFHIRConnector,
  EHRIntegrationOrchestrator
};

Clinical Workflow Integration Patterns

Real-time Decision Support

class ClinicalDecisionSupport:
    """
    Real-time clinical decision support using OMOPHub vocabulary services.
    Integrates with EHR workflow to provide point-of-care alerts and suggestions.
    """
    
    def __init__(self, omophub_integrator: OMOPHubEHRIntegrator):
        self.omophub = omophub_integrator
        self.clinical_rules = self._load_clinical_rules()
    
    async def evaluate_patient_context(self, patient_data: PatientData) -> Dict[str, Any]:
        """
        Evaluate patient context for clinical decision support.
        Called during clinical documentation or order entry.
        """
        alerts = []
        suggestions = []
        
        # Process current patient data
        processed_data = self.omophub.process_patient_data(patient_data)
        
        # Check for drug interactions
        drug_alerts = await self._check_drug_interactions(processed_data['medication_mappings'])
        alerts.extend(drug_alerts)
        
        # Check for allergy conflicts
        allergy_alerts = await self._check_allergy_conflicts(
            processed_data['medication_mappings'],
            patient_data.allergies
        )
        alerts.extend(allergy_alerts)
        
        # Generate preventive care suggestions
        preventive_suggestions = await self._generate_preventive_care_suggestions(processed_data)
        suggestions.extend(preventive_suggestions)
        
        # Check for quality measure opportunities
        quality_suggestions = await self._check_quality_measures(processed_data)
        suggestions.extend(quality_suggestions)
        
        return {
            'alerts': alerts,
            'suggestions': suggestions,
            'patient_summary': self._generate_patient_summary(processed_data),
            'risk_scores': self._calculate_risk_scores(processed_data)
        }
    
    async def _check_drug_interactions(self, medications: List[CodeMapping]) -> List[Dict]:
        """Check for potential drug-drug interactions using RxNorm codes."""
        interactions = []
        
        # Keep only medications that mapped to RxNorm
        rxnorm_meds = [med for med in medications if med.target_system == 'RxNorm']
        
        # Check pairwise interactions
        for i in range(len(rxnorm_meds)):
            for j in range(i + 1, len(rxnorm_meds)):
                first, second = rxnorm_meds[i], rxnorm_meds[j]
                # This would typically call a drug interaction API.
                # For demo purposes, match on mapped drug names, because RxNorm
                # codes are numeric identifiers and carry no name information.
                if self._has_potential_interaction(first.mapped_name, second.mapped_name):
                    interactions.append({
                        'type': 'drug_interaction',
                        'severity': 'moderate',
                        'message': f'Potential interaction between {first.mapped_name} and {second.mapped_name}',
                        'drug1': first.target_code,
                        'drug2': second.target_code,
                        'recommendation': 'Monitor patient closely'
                    })
        
        return interactions
    
    async def _check_allergy_conflicts(self, medications: List[CodeMapping], allergies: List[Dict]) -> List[Dict]:
        """Check medications against patient allergies."""
        conflicts = []
        
        for medication in medications:
            for allergy in allergies:
                # Check if medication could trigger allergy
                if self._medication_triggers_allergy(medication, allergy):
                    conflicts.append({
                        'type': 'allergy_conflict',
                        'severity': 'high',
                        'message': f'Medication {medication.mapped_name} may trigger {allergy.get("allergen", "")} allergy',
                        'medication': medication.mapped_name,
                        'allergen': allergy.get('allergen', ''),
                        'recommendation': 'Consider alternative medication'
                    })
        
        return conflicts
    
    async def _generate_preventive_care_suggestions(self, processed_data: Dict) -> List[Dict]:
        """Generate preventive care suggestions based on patient conditions."""
        suggestions = []
        
        # Check for diabetes management
        if processed_data['quality_indicators'].get('diabetes_control_eligible'):
            suggestions.append({
                'type': 'preventive_care',
                'category': 'diabetes_management',
                'message': 'Patient eligible for HbA1c monitoring',
                'recommendation': 'Schedule HbA1c test within 3 months',
                'priority': 'medium'
            })
        
        # Check for hypertension management
        if processed_data['quality_indicators'].get('hypertension_control_eligible'):
            suggestions.append({
                'type': 'preventive_care',
                'category': 'hypertension_management', 
                'message': 'Patient eligible for blood pressure monitoring',
                'recommendation': 'Monitor blood pressure regularly',
                'priority': 'medium'
            })
        
        return suggestions
    
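    def _has_potential_interaction(self, drug1: str, drug2: str) -> bool:
        """Simple name-based interaction check (would be more sophisticated in production)."""
        # Example: ACE inhibitors and potassium supplements
        ace_inhibitors = ['lisinopril', 'enalapril', 'captopril']
        potassium_supplements = ['potassium chloride', 'potassium_chloride']
        
        def is_ace(name: str) -> bool:
            return any(ace in name.lower() for ace in ace_inhibitors)
        
        def is_potassium(name: str) -> bool:
            return any(pot in name.lower() for pot in potassium_supplements)
        
        # The pair is unordered, so check both orderings
        return (is_ace(drug1) and is_potassium(drug2)) or (is_ace(drug2) and is_potassium(drug1))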
    
    def _medication_triggers_allergy(self, medication: CodeMapping, allergy: Dict) -> bool:
        """Check if medication might trigger patient allergy."""
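        allergen = allergy.get('allergen', '').lower().strip()
        medication_name = (medication.mapped_name or '').lower().strip()
        
        # An empty string is a substring of every string, so skip blank values
        if not allergen or not medication_name:
            return False
        
        # Simple string matching - production would use more sophisticated methods
        return allergen in medication_name or medication_name in allergen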
    
    # Placeholder implementations for helpers referenced above but not shown
    # in this guide; replace them with site-specific logic.
    def _load_clinical_rules(self) -> Dict[str, Any]:
        return {}
    
    async def _check_quality_measures(self, processed_data: Dict) -> List[Dict]:
        return []
    
    def _generate_patient_summary(self, processed_data: Dict) -> Dict[str, Any]:
        return {
            'total_diagnoses': len(processed_data.get('diagnosis_mappings', [])),
            'active_medications': len(processed_data.get('medication_mappings', []))
        }
    
    def _calculate_risk_scores(self, processed_data: Dict) -> Dict[str, float]:
        return {}

# Workflow Integration Example
class EHRWorkflowIntegration:
    """
    Integrates vocabulary services into clinical workflows.
    Provides hooks for common EHR workflow events.
    """
    
    def __init__(self, orchestrator: EHRIntegrationOrchestrator):
        self.orchestrator = orchestrator
        self.decision_support = ClinicalDecisionSupport(orchestrator.omophub_integrator)
    
    async def on_patient_chart_open(self, ehr_name: str, patient_id: str) -> Dict[str, Any]:
        """
        Called when clinician opens patient chart.
        Pre-loads standardized vocabulary data and clinical insights.
        """
        try:
            # Process patient data in background
            result = await self.orchestrator.process_patient(ehr_name, patient_id)
            
            if result['status'] == 'success':
                # Generate clinical decision support
                patient_raw_data = result['patient_data']  # Would need actual PatientData object
                # cds_result = await self.decision_support.evaluate_patient_context(patient_raw_data)
                
                return {
                    'patient_summary': {
                        'total_diagnoses': len(result['patient_data']['diagnosis_mappings']),
                        'chronic_conditions': len(result['patient_data']['clinical_insights'].get('chronic_conditions', [])),
                        'active_medications': len(result['patient_data']['medication_mappings'])
                    },
                    'standardized_data_ready': True,
                    'last_updated': result['processing_timestamp']
                }
            else:
                return {'error': result.get('error'), 'standardized_data_ready': False}
                
        except Exception as e:
            return {'error': str(e), 'standardized_data_ready': False}
    
    async def on_diagnosis_entry(self, diagnosis_text: str, ehr_name: str, patient_id: str) -> Dict[str, Any]:
        """
        Called when clinician enters diagnosis.
        Provides real-time vocabulary suggestions and validation.
        """
        suggestions = []
        
        try:
            # Search for matching concepts
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.orchestrator.omophub_integrator.base_url}/v1/concepts/search",
                    headers=self.orchestrator.omophub_integrator.headers,
                    params={
                        'query': diagnosis_text,
                        'domain_id': 'Condition',
                        'standard_concept': 'S',
                        'page_size': 5
                    }
                )
                response.raise_for_status()
                
                data = response.json()
                for concept in data.get('data', []):
                    suggestions.append({
                        'concept_id': concept['concept_id'],
                        'concept_code': concept['concept_code'],
                        'concept_name': concept['concept_name'],
                        'vocabulary_id': concept['vocabulary_id'],
                        'confidence': 0.9  # Would be calculated based on text similarity
                    })
        
        except Exception as e:
            return {'error': str(e), 'suggestions': []}
        
        return {
            'suggestions': suggestions,
            'auto_coding_available': len(suggestions) > 0
        }
    
    async def on_medication_order(self, medication_name: str, patient_id: str, ehr_name: str) -> Dict[str, Any]:
        """
        Called when clinician orders medication.
        Provides safety checks and standardized coding.
        """
        try:
            # Get patient data for safety checks
            patient_result = await self.orchestrator.process_patient(ehr_name, patient_id)
            
            if patient_result['status'] != 'success':
                return {'error': 'Unable to retrieve patient data for safety checks'}
            
            # Find RxNorm code for medication
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.orchestrator.omophub_integrator.base_url}/v1/concepts/search",
                    headers=self.orchestrator.omophub_integrator.headers,
                    params={
                        'query': medication_name,
                        'vocabulary_ids': 'RxNorm',
                        'domain_id': 'Drug',
                        'page_size': 1
                    }
                )
                response.raise_for_status()
                
                data = response.json()
            if not data.get('data'):
                return {'error': 'Medication not found in RxNorm', 'safety_alerts': []}
            
            rxnorm_concept = data['data'][0]
            
            # Simulate safety checks (would be more comprehensive in production)
            safety_alerts = []
            
            # Check against patient allergies
            patient_data = patient_result['patient_data']
            # Would perform actual allergy checking here
            
            return {
                'rxnorm_code': rxnorm_concept['concept_code'],
                'rxnorm_name': rxnorm_concept['concept_name'],
                'safety_alerts': safety_alerts,
                'coding_confidence': 0.95  # Placeholder value, not computed from the match
            }
            
        except Exception as e:
            return {'error': str(e)}
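
The confidence values in the hooks above (0.9 and 0.95) are hard-coded placeholders. As a minimal sketch of what "calculated based on text similarity" could look like, the helper below compares the clinician's text with the returned concept name using `difflib.SequenceMatcher` from the standard library. The function name `name_similarity_confidence` is hypothetical and not part of OMOPHub; the score is a plain character-level ratio, not a clinically validated measure.

```python
from difflib import SequenceMatcher


def name_similarity_confidence(query: str, concept_name: str) -> float:
    """Return a 0.0-1.0 score for how closely a concept name matches the query.

    Hypothetical helper: a character-level similarity ratio only.
    """
    # Lower-case and collapse whitespace so formatting differences do not matter
    normalized_query = " ".join(query.lower().split())
    normalized_name = " ".join(concept_name.lower().split())
    if not normalized_query or not normalized_name:
        return 0.0
    ratio = SequenceMatcher(None, normalized_query, normalized_name).ratio()
    return round(ratio, 2)
```

A score like this could replace the fixed `'confidence'` value when building each suggestion. Long concept names (for example, a drug name followed by strength and dose form) will score lower than an exact name match, so any cut-off would need tuning against real queries.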

Performance and Monitoring

Integration Health Monitoring

class EHRIntegrationMonitor:
    """Monitor EHR integration health and performance."""
    
    def __init__(self, orchestrator: EHRIntegrationOrchestrator):
        self.orchestrator = orchestrator
        self.metrics = {}
        self.health_checks = {}
    
    async def run_health_checks(self) -> Dict[str, Any]:
        """Run comprehensive health checks on all EHR connections."""
        results = {}
        
        for ehr_name, connector in self.orchestrator.ehr_connectors.items():
            try:
                # Test authentication
                auth_success = await connector.authenticate()
                
                # Test basic API connectivity
                if hasattr(connector, 'test_connection'):
                    api_success = await connector.test_connection()
                else:
                    api_success = True  # Assume working if no test method
                
                results[ehr_name] = {
                    'authentication': 'pass' if auth_success else 'fail',
                    'api_connectivity': 'pass' if api_success else 'fail',
                    'overall_status': 'healthy' if auth_success and api_success else 'unhealthy',
                    'last_checked': datetime.utcnow().isoformat()
                }
            
            except Exception as e:
                results[ehr_name] = {
                    'error': str(e),
                    'overall_status': 'error',
                    'last_checked': datetime.utcnow().isoformat()
                }
        
        return results
    
    def track_processing_metrics(self, ehr_name: str, processing_time: float, success: bool):
        """Track performance metrics for processing operations."""
        if ehr_name not in self.metrics:
            self.metrics[ehr_name] = {
                'total_requests': 0,
                'successful_requests': 0,
                'failed_requests': 0,
                'average_processing_time': 0,
                'processing_times': []
            }
        
        metrics = self.metrics[ehr_name]
        metrics['total_requests'] += 1
        
        if success:
            metrics['successful_requests'] += 1
        else:
            metrics['failed_requests'] += 1
        
        metrics['processing_times'].append(processing_time)
        if len(metrics['processing_times']) > 100:  # Keep only last 100 times
            metrics['processing_times'] = metrics['processing_times'][-100:]
        
        metrics['average_processing_time'] = sum(metrics['processing_times']) / len(metrics['processing_times'])
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Generate performance report for all EHR systems."""
        report = {
            'generated_at': datetime.utcnow().isoformat(),
            'ehr_systems': {}
        }
        
        for ehr_name, metrics in self.metrics.items():
            success_rate = (metrics['successful_requests'] / metrics['total_requests'] * 100) if metrics['total_requests'] > 0 else 0
            
            report['ehr_systems'][ehr_name] = {
                'total_requests': metrics['total_requests'],
                'success_rate': round(success_rate, 2),
                'average_processing_time': round(metrics['average_processing_time'], 3),
                'status': 'healthy' if success_rate > 95 else 'warning' if success_rate > 80 else 'critical'
            }
        
        return report
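
`track_processing_metrics` only records what it is given, so callers have to measure each operation themselves. One way to do that, sketched here under assumptions, is a small wrapper that times an awaited operation and reports the duration and outcome. `timed_call` and `RecordingMonitor` are hypothetical names; `RecordingMonitor` merely stands in for `EHRIntegrationMonitor` so the example runs on its own.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, List, Tuple


class RecordingMonitor:
    """Stand-in for EHRIntegrationMonitor that just records each call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, float, bool]] = []

    def track_processing_metrics(self, ehr_name: str, processing_time: float, success: bool) -> None:
        self.calls.append((ehr_name, processing_time, success))


async def timed_call(monitor: Any, ehr_name: str, operation: Callable[[], Awaitable[Any]]) -> Any:
    """Await operation() and report its duration and outcome to the monitor."""
    start = time.perf_counter()
    success = False
    try:
        result = await operation()
        success = True
        return result
    finally:
        # Runs on both success and failure; exceptions still propagate
        monitor.track_processing_metrics(ehr_name, time.perf_counter() - start, success)
```

With the real monitor, a call might look like `await timed_call(monitor, 'epic', lambda: orchestrator.process_patient('epic', patient_id))`. Note that `process_patient` appears to report failures through a `status` field rather than by raising, so a production wrapper would probably inspect the result as well.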

Security and Compliance

HIPAA-Compliant Integration

class HIPAACompliantEHRIntegrator(OMOPHubEHRIntegrator):
    """
    HIPAA-compliant version of EHR integrator with enhanced security and audit logging.
    """
    
    def __init__(self, api_key: str, base_url: str = "https://api.omophub.com"):
        super().__init__(api_key, base_url)
        self.audit_logger = self._setup_audit_logger()
        self.encryption_key = self._load_encryption_key()
    
    def process_patient_data(self, patient_data: PatientData) -> Dict[str, Any]:
        """Process patient data with full HIPAA audit trail."""
        audit_id = self._generate_audit_id()
        
        try:
            # Log access attempt
            self.audit_logger.info({
                'audit_id': audit_id,
                'action': 'patient_data_processing_start',
                'patient_id': patient_data.patient_id,
                'timestamp': datetime.utcnow().isoformat(),
                'user_context': self._get_user_context()
            })
            
            # Sanitize PHI before processing
            sanitized_data = self._sanitize_phi(patient_data)
            
            # Process with standard method
            result = super().process_patient_data(sanitized_data)
            
            # Log successful completion
            self.audit_logger.info({
                'audit_id': audit_id,
                'action': 'patient_data_processing_complete',
                'status': 'success',
                'records_processed': {
                    'diagnoses': len(result['diagnosis_mappings']),
                    'procedures': len(result['procedure_mappings']),
                    'medications': len(result['medication_mappings'])
                },
                'timestamp': datetime.utcnow().isoformat()
            })
            
            return result
            
        except Exception as e:
            # Log error
            self.audit_logger.error({
                'audit_id': audit_id,
                'action': 'patient_data_processing_error',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            })
            raise
    
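    def _setup_audit_logger(self):
        """Set up file-based audit logging."""
        audit_logger = logging.getLogger('hipaa_ehr_audit')
        audit_logger.setLevel(logging.INFO)
        
        # getLogger returns the same logger on every call, so attach the handler
        # only once; otherwise each new integrator would duplicate every entry.
        if not audit_logger.handlers:
            # A plain FileHandler does not encrypt anything. Encryption at rest and
            # access control must be provided by the storage layer.
            handler = logging.FileHandler('/var/log/hipaa/ehr_integration_audit.log')
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            audit_logger.addHandler(handler)
        
        return audit_logger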
    
    def _sanitize_phi(self, patient_data: PatientData) -> PatientData:
        """Remove or mask PHI from patient data before processing."""
        # Placeholder: a real implementation depends on your PHI requirements.
        # As written, this returns the data unchanged, so no PHI is removed yet.
        return patient_data
    
    def _generate_audit_id(self) -> str:
        """Generate unique audit ID for tracking."""
        import uuid
        return str(uuid.uuid4())
    
    def _get_user_context(self) -> Dict[str, str]:
        """Get current user context for audit logging."""
        return {
            'user_id': 'system',  # Would be actual user ID
            'session_id': 'session_123',  # Would be actual session
            'ip_address': '127.0.0.1'  # Would be actual IP
        }
    
    def _load_encryption_key(self) -> bytes:
        """Load encryption key for sensitive data."""
        # Placeholder only. In production, fetch the key from a key management
        # system or secrets store; never hard-code it in source.
        return b'demo_key_would_be_from_key_management_system'
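
`_sanitize_phi` above is a stub that returns its input unchanged. As a rough sketch of what a first pass might do, the example below drops direct identifiers from the demographics and masks all but the last four characters of the MRN. Everything here is an assumption for illustration: `MinimalPatientData` is a cut-down stand-in for the guide's `PatientData`, and the key names in `DIRECT_IDENTIFIER_KEYS` are guesses about what a demographics payload might contain. This is not a complete de-identification procedure.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Dict, List


@dataclass
class MinimalPatientData:
    """Cut-down stand-in for the guide's PatientData, for illustration only."""
    patient_id: str
    mrn: str
    demographics: Dict[str, Any]
    diagnoses: List[Dict[str, str]] = field(default_factory=list)


# Assumed key names; adjust to whatever your demographics payload contains.
DIRECT_IDENTIFIER_KEYS = {"name", "address", "phone", "email", "ssn"}


def sanitize_phi(patient: MinimalPatientData) -> MinimalPatientData:
    """Return a copy with direct identifiers dropped and the MRN masked."""
    safe_demographics = {
        key: value
        for key, value in patient.demographics.items()
        if key.lower() not in DIRECT_IDENTIFIER_KEYS
    }
    # Keep the last four characters only when the MRN is longer than that
    visible = patient.mrn[-4:] if len(patient.mrn) > 4 else ""
    masked_mrn = "*" * (len(patient.mrn) - len(visible)) + visible
    # replace() builds a new instance, leaving the caller's object untouched
    return replace(patient, mrn=masked_mrn, demographics=safe_demographics)
```

Returning a copy rather than mutating the input keeps the original record available to the EHR-facing code, while only the reduced copy is passed on for vocabulary mapping. Diagnosis, procedure, and medication codes are left intact because the mapping step needs them.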

Best Practices and Recommendations

Configuration Management

class EHRIntegrationConfig:
    """Centralized configuration management for EHR integrations."""
    
    def __init__(self):
        self.config = {
            'omophub': {
                'base_url': 'https://api.omophub.com',
                'timeout': 30,
                'retry_attempts': 3,
                'rate_limit': {
                    'requests_per_minute': 1000,
                    'burst_limit': 100
                }
            },
            'epic': {
                'timeout': 60,
                'batch_size': 10,
                'max_concurrent_requests': 5,
                'auth_refresh_buffer': 300  # Refresh token 5 minutes before expiry
            },
            'cerner': {
                'timeout': 45,
                'batch_size': 8,
                'max_concurrent_requests': 3
            },
            'veradigm': {
                'timeout': 90,  # Veradigm APIs can be slower
                'batch_size': 5,
                'max_concurrent_requests': 2
            },
            'monitoring': {
                'health_check_interval': 300,  # 5 minutes
                'performance_logging': True,
                'alert_thresholds': {
                    'success_rate_warning': 95,
                    'success_rate_critical': 80,
                    'response_time_warning': 5.0,
                    'response_time_critical': 10.0
                }
            }
        }
    
    def get_ehr_config(self, ehr_name: str) -> Dict[str, Any]:
        """Get configuration for specific EHR system."""
        return self.config.get(ehr_name.lower(), {})
    
    def get_omophub_config(self) -> Dict[str, Any]:
        """Get OMOPHub API configuration."""
        return self.config['omophub']
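
The `max_concurrent_requests` values above are only numbers until something enforces them. One common way to apply such a limit in asyncio code is a semaphore, sketched below. `process_with_limit` and its `worker` parameter are hypothetical names, not part of the classes in this guide.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def process_with_limit(
    patient_ids: Iterable[str],
    worker: Callable[[str], Awaitable[Any]],
    max_concurrent_requests: int,
) -> List[Any]:
    """Run worker(patient_id) for every ID, at most N at a time."""
    semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def guarded(patient_id: str) -> Any:
        # Only max_concurrent_requests coroutines can hold the semaphore at once
        async with semaphore:
            return await worker(patient_id)

    # gather keeps results in input order; with return_exceptions=True a failed
    # patient yields its exception object instead of aborting the whole batch
    return await asyncio.gather(
        *(guarded(patient_id) for patient_id in patient_ids),
        return_exceptions=True,
    )
```

With the configuration above, the limit for a given system could be read as `config.get_ehr_config('veradigm')['max_concurrent_requests']`, which is 2 in this example, and passed straight through as the third argument.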

EHR-Specific Resources

The OMOPHub API provides comprehensive vocabulary services that seamlessly integrate with major EHR systems, enabling standardized medical coding, improved clinical workflows, and enhanced patient care through better data interoperability.
Epic® is a registered trademark of Epic Systems Corporation. This integration guide is provided for informational purposes and does not constitute an endorsement by Epic Systems Corporation.