EHR Integration with OMOPHub
Overview
Electronic Health Record (EHR) systems are the backbone of modern healthcare, but they often use different coding standards and vocabularies, creating data silos and interoperability challenges. OMOPHub provides comprehensive vocabulary services that enable seamless integration with major EHR platforms, ensuring consistent medical coding and improved clinical workflows. This guide demonstrates how to integrate OMOPHub with leading EHR systems including Epic®, Cerner, and Veradigm, supporting both real-time and batch processing scenarios. Epic® is a registered trademark of Epic Systems Corporation. Veradigm was formerly known as Allscripts.
Business Problem
Healthcare organizations face significant challenges when working with EHR data:
- Data Fragmentation: Different EHR systems use varying coding standards (ICD-9, ICD-10, SNOMED, proprietary codes)
- Inconsistent Mapping: Manual code mapping is error-prone and time-intensive
- Clinical Workflow Disruption: Terminology lookups interrupt clinical documentation
- Interoperability Barriers: Difficulty sharing data between systems and organizations
- Quality Reporting: Inconsistent codes affect quality metrics and population health analytics
- Prior Authorization: Complex code validation delays patient care
Solution Architecture
Core Implementation
Universal EHR Integration Framework
Copy
import httpx
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import asyncio
from abc import ABC, abstractmethod
import json
@dataclass
class PatientData:
    """Vendor-neutral container for one patient's record.

    Each EHR connector converts its native payload into this shape so the
    vocabulary services can work on it without knowing the source system.
    """

    # Identifiers
    patient_id: str
    mrn: str

    # Free-form demographic attributes
    demographics: Dict[str, Any]

    # Coded clinical entries, one dict per entry
    diagnoses: List[Dict[str, str]]
    procedures: List[Dict[str, str]]
    medications: List[Dict[str, str]]
    allergies: List[Dict[str, str]]

    # Measurements and narrative content
    lab_results: List[Dict[str, Any]]
    vital_signs: List[Dict[str, Any]]
    clinical_notes: List[Dict[str, str]]
@dataclass
class CodeMapping:
    """One source-code to target-code translation result."""

    # Where the code came from
    source_code: str
    source_system: str

    # What it was mapped to
    target_code: str
    target_system: str

    # Score attached to the mapping, and the name of the mapped concept
    confidence: float
    mapped_name: str
class EHRConnector(ABC):
    """Interface that every EHR-specific connector implements."""

    @abstractmethod
    async def authenticate(self) -> bool:
        """Authenticate with the EHR system and report whether it worked."""

    @abstractmethod
    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Fetch the record of ``patient_id`` as a ``PatientData`` object."""

    @abstractmethod
    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Write ``mappings`` back to the record of ``patient_id``."""
class OMOPHubEHRIntegrator:
    """
    Main integration service that works with any EHR system.
    Provides vocabulary services and code standardization.

    The ``standardize_*`` and ``normalize_*`` methods are best-effort: a code
    whose lookup fails is logged and skipped so the remaining codes are still
    processed.
    """

    # Preferred target vocabulary per data type, keyed by source vocabulary.
    _TARGET_VOCABULARIES = {
        'diagnosis': {
            'ICD9CM': 'ICD10CM',
            'ICD10CM': 'SNOMED',
            'SNOMED': 'SNOMED',
            'default': 'SNOMED',
        },
        'procedure': {
            'ICD9Proc': 'HCPCS',
            'ICD10PCS': 'HCPCS',
            'HCPCS': 'HCPCS',
            'SNOMED': 'SNOMED',
            'default': 'HCPCS',
        },
    }

    # Vocabulary names that are passed through unchanged as OMOPHub IDs.
    _KNOWN_VOCABULARIES = frozenset({
        'ICD9CM', 'ICD9Proc', 'ICD10CM', 'ICD10PCS', 'SNOMED',
        'HCPCS', 'LOINC', 'RxNorm', 'NDC',
    })

    # ICD-10-CM category prefixes: diabetes, hypertension, COPD.
    _CHRONIC_ICD10_PREFIXES = ('E11', 'I10', 'J44')

    def __init__(self, api_key: str, base_url: str = "https://api.omophub.com"):
        """Store the API key, base URL and the Bearer header sent on each call."""
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.logger = logging.getLogger(__name__)
        self.cache = {}  # not consulted by any method of this class

    def _fetch_data(self, path: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """GET ``path`` from the API and return the ``data`` list of the body.

        Returns an empty list when the body has no (or an empty) ``data``
        entry. Raises ``httpx.HTTPError`` for transport failures and for
        4xx/5xx responses.
        """
        response = httpx.get(f"{self.base_url}{path}", headers=self.headers, params=params)
        response.raise_for_status()
        return response.json().get('data') or []

    def standardize_diagnosis_codes(self, diagnoses: List[Dict[str, str]]) -> List[CodeMapping]:
        """
        Standardize diagnosis codes to SNOMED CT or ICD-10.

        Entries without a ``code`` are skipped. For every other entry the
        code is searched in its source vocabulary and the first hit is mapped
        to the target vocabulary; when no mapping exists, the hit itself is
        returned with confidence 0.9.
        """
        mappings = []
        for diagnosis in diagnoses:
            source_code = diagnosis.get('code')
            if not source_code:
                continue
            source_system = diagnosis.get('system', 'Unknown')
            target_vocab = self._determine_target_vocabulary(source_system, 'diagnosis')
            try:
                # NOTE(review): 'standard_concept': 'S' restricts the search to
                # standard concepts - confirm the API still returns ICD source
                # codes under this filter.
                concepts = self._fetch_data('/v1/concepts/search', {
                    'query': source_code,
                    'vocabulary_ids': self._map_ehr_to_omophub_vocab(source_system),
                    'standard_concept': 'S',
                    'page_size': 1,
                })
                if not concepts:
                    continue
                concept = concepts[0]
                candidates = self._fetch_data(
                    f"/v1/concepts/{concept['concept_id']}/mappings",
                    {'target_vocabularies': target_vocab, 'relationship_types': 'Maps to'},
                )
                if candidates:
                    best_mapping = candidates[0]
                    mappings.append(CodeMapping(
                        source_code=source_code,
                        source_system=source_system,
                        target_code=best_mapping['target_concept_code'],
                        target_system=target_vocab,
                        confidence=best_mapping.get('confidence', 1.0),
                        mapped_name=best_mapping['target_concept_name'],
                    ))
                else:
                    # Use the standard concept itself
                    mappings.append(CodeMapping(
                        source_code=source_code,
                        source_system=source_system,
                        target_code=concept['concept_code'],
                        target_system=concept['vocabulary_id'],
                        confidence=0.9,
                        mapped_name=concept['concept_name'],
                    ))
            # HTTPError covers transport errors AND the HTTPStatusError raised
            # by raise_for_status(); KeyError/ValueError cover malformed bodies.
            except (httpx.HTTPError, KeyError, ValueError) as e:
                self.logger.error(f"Error mapping diagnosis {source_code}: {e}")
        return mappings

    def standardize_procedure_codes(self, procedures: List[Dict[str, str]]) -> List[CodeMapping]:
        """
        Standardize procedure codes to HCPCS or SNOMED.

        Entries without a ``code`` are skipped. The first Procedure-domain hit
        for the code is returned as the mapping (confidence 0.95); no
        cross-vocabulary mapping call is made.
        """
        mappings = []
        for procedure in procedures:
            source_code = procedure.get('code')
            if not source_code:
                continue
            source_system = procedure.get('system', 'Unknown')
            try:
                concepts = self._fetch_data('/v1/concepts/search', {
                    'query': source_code,
                    'vocabulary_ids': self._map_ehr_to_omophub_vocab(source_system),
                    'domain_id': 'Procedure',
                    'page_size': 1,
                })
                if concepts:
                    concept = concepts[0]
                    mappings.append(CodeMapping(
                        source_code=source_code,
                        source_system=source_system,
                        target_code=concept['concept_code'],
                        target_system=concept['vocabulary_id'],
                        confidence=0.95,
                        mapped_name=concept['concept_name'],
                    ))
            except (httpx.HTTPError, KeyError, ValueError) as e:
                self.logger.error(f"Error mapping procedure {source_code}: {e}")
        return mappings

    def standardize_medication_codes(self, medications: List[Dict[str, str]]) -> List[CodeMapping]:
        """
        Standardize medication codes to RxNorm.

        The search term is the entry's ``code``; when that is empty the
        ``name`` (or, failing that, ``display``) text is used. Entries with
        none of these are skipped.
        """
        mappings = []
        for medication in medications:
            source_code = medication.get('code')
            # Connectors label the drug text either 'name' or 'display'.
            medication_name = medication.get('name') or medication.get('display', '')
            source_system = medication.get('system', 'Unknown')
            search_term = source_code if source_code else medication_name
            if not search_term:
                continue
            try:
                concepts = self._fetch_data('/v1/concepts/search', {
                    'query': search_term,
                    'vocabulary_ids': 'RxNorm',
                    'domain_id': 'Drug',
                    'standard_concept': 'S',
                    'page_size': 1,
                })
                if concepts:
                    concept = concepts[0]
                    mappings.append(CodeMapping(
                        source_code=search_term,
                        source_system=source_system,
                        target_code=concept['concept_code'],
                        target_system='RxNorm',
                        confidence=0.9,
                        mapped_name=concept['concept_name'],
                    ))
            except (httpx.HTTPError, KeyError, ValueError) as e:
                self.logger.error(f"Error mapping medication {search_term}: {e}")
        return mappings

    def normalize_lab_results(self, lab_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Normalize lab results to LOINC codes with unit standardization.

        The search term is ``test_code`` or, when empty, ``test_name``.
        Returns one dict per resolved result holding the original fields, the
        LOINC code and name, and the normalized unit.
        """
        normalized_results = []
        for lab_result in lab_results:
            test_name = lab_result.get('test_name', '')
            test_code = lab_result.get('test_code', '')
            unit = lab_result.get('unit', '')
            search_term = test_code if test_code else test_name
            if not search_term:
                continue
            try:
                concepts = self._fetch_data('/v1/concepts/search', {
                    'query': search_term,
                    'vocabulary_ids': 'LOINC',
                    'domain_id': 'Measurement',
                    'page_size': 1,
                })
                if concepts:
                    loinc_concept = concepts[0]
                    normalized_results.append({
                        'original_test_name': test_name,
                        'original_test_code': test_code,
                        'loinc_code': loinc_concept['concept_code'],
                        'loinc_name': loinc_concept['concept_name'],
                        'value': lab_result.get('value'),
                        'original_unit': unit,
                        'normalized_unit': self._normalize_units(unit, loinc_concept['concept_code']),
                        'confidence': 0.95,
                    })
            except (httpx.HTTPError, KeyError, ValueError) as e:
                self.logger.error(f"Error normalizing lab result {search_term}: {e}")
        return normalized_results

    def process_patient_data(self, patient_data: PatientData) -> Dict[str, Any]:
        """
        Comprehensive patient data processing with all vocabulary standardization.

        Returns a dict with the patient identifiers, a UTC timestamp, the
        diagnosis/procedure/medication mappings, the normalized lab results
        and the insights and quality indicators derived from them.
        """
        results = {
            'patient_id': patient_data.patient_id,
            'mrn': patient_data.mrn,
            'processed_timestamp': datetime.utcnow().isoformat(),
            'diagnosis_mappings': self.standardize_diagnosis_codes(patient_data.diagnoses),
            'procedure_mappings': self.standardize_procedure_codes(patient_data.procedures),
            'medication_mappings': self.standardize_medication_codes(patient_data.medications),
            'normalized_lab_results': self.normalize_lab_results(patient_data.lab_results),
            'clinical_insights': {},
            'quality_indicators': {},
        }
        # Both derived sections read the mappings collected above.
        results['clinical_insights'] = self._generate_clinical_insights(results)
        results['quality_indicators'] = self._calculate_quality_indicators(results)
        return results

    def _determine_target_vocabulary(self, source_system: str, data_type: str) -> str:
        """Determine optimal target vocabulary based on source system and data type.

        Falls back to the data type's ``default`` entry, and to ``'SNOMED'``
        for an unknown data type.
        """
        data_mapping = self._TARGET_VOCABULARIES.get(data_type, {})
        return data_mapping.get(source_system, data_mapping.get('default', 'SNOMED'))

    def _map_ehr_to_omophub_vocab(self, ehr_system: str) -> str:
        """Map EHR system vocabulary names to OMOPHub vocabulary IDs.

        Known names are returned unchanged; anything else defaults to SNOMED.
        """
        return ehr_system if ehr_system in self._KNOWN_VOCABULARIES else 'SNOMED'

    def _normalize_units(self, unit: str, loinc_code: str) -> str:
        """Normalize lab units to UCUM standard."""
        # This would typically involve a lookup table or API call
        # For demo purposes, returning the original unit
        return unit

    @staticmethod
    def _codes_to_screen(mapping) -> List[str]:
        """Return the codes of ``mapping`` to test against ICD-10-CM rules.

        The target code is always included. The source code is added when the
        source vocabulary is ICD10CM, because such diagnoses are mapped to
        SNOMED and their target code no longer carries the ICD-10 prefix.
        """
        codes = [mapping.target_code]
        if mapping.source_system == 'ICD10CM':
            codes.append(mapping.source_code)
        return codes

    def _generate_clinical_insights(self, processed_data: Dict) -> Dict[str, Any]:
        """Generate clinical insights from processed vocabulary data.

        Only ``chronic_conditions`` is populated; the other lists are
        returned empty.
        """
        insights = {
            'chronic_conditions': [],
            'drug_interactions': [],
            'preventive_care_gaps': [],
            'clinical_decision_support': [],
        }
        for mapping in processed_data.get('diagnosis_mappings', []):
            codes = self._codes_to_screen(mapping)
            if any(code.startswith(self._CHRONIC_ICD10_PREFIXES) for code in codes):
                insights['chronic_conditions'].append({
                    'condition': mapping.mapped_name,
                    'code': mapping.target_code,
                    'system': mapping.target_system,
                })
        return insights

    def _calculate_quality_indicators(self, processed_data: Dict) -> Dict[str, Any]:
        """Calculate quality indicators based on standardized codes.

        Sets the diabetes flag for any ``E11*`` code and the hypertension flag
        for the exact code ``I10``; the two list entries are returned empty.
        """
        screened = [
            self._codes_to_screen(mapping)
            for mapping in processed_data.get('diagnosis_mappings', [])
        ]
        return {
            'diabetes_control_eligible': any(
                code.startswith('E11') for codes in screened for code in codes
            ),
            'hypertension_control_eligible': any('I10' in codes for codes in screened),
            'preventive_screening_due': [],
            'medication_adherence_concerns': [],
        }
# Epic® Integration (FHIR-based)
class EpicFHIRConnector(EHRConnector):
    """
    Epic® FHIR R4 integration connector.
    Epic® is a registered trademark of Epic Systems Corporation.
    """

    # FHIR code-system URIs and the vocabulary name used for each.
    _FHIR_SYSTEM_TO_VOCAB = {
        'http://snomed.info/sct': 'SNOMED',
        'http://hl7.org/fhir/sid/icd-10-cm': 'ICD10CM',
        'http://hl7.org/fhir/sid/icd-9-cm': 'ICD9CM',
        'http://www.ama-assn.org/go/cpt': 'HCPCS',
        'http://www.nlm.nih.gov/research/umls/rxnorm': 'RxNorm',
        'http://hl7.org/fhir/sid/ndc': 'NDC',
        'http://loinc.org': 'LOINC',
    }

    def __init__(self, fhir_base_url: str, client_id: str, client_secret: str):
        """Store the FHIR base URL and client credentials; no request is made."""
        self.fhir_base_url = fhir_base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expires = None

    async def authenticate(self) -> bool:
        """Authenticate using Epic® OAuth 2.0.

        Posts a client-credentials request and stores the access token and
        its expiry. Returns False (after logging) on any failure.
        """
        auth_data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'scope': 'system/*.read',
        }
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(f"{self.fhir_base_url}/oauth2/token", data=auth_data)
                response.raise_for_status()
                token_data = response.json()
            self.access_token = token_data['access_token']
            # Default lifetime matches the Cerner connector, so a response
            # without 'expires_in' no longer counts as a failed login.
            self.token_expires = datetime.utcnow() + timedelta(
                seconds=token_data.get('expires_in', 3600)
            )
            return True
        except Exception as e:
            logging.error(f"Epic® authentication failed: {e}")
            return False

    def _token_is_stale(self) -> bool:
        """Return True when no token is held or the held token has expired."""
        if not self.access_token or self.token_expires is None:
            return True
        return datetime.utcnow() >= self.token_expires

    async def _get_json(self, client, resource: str, headers: Dict[str, str],
                        params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """GET ``resource`` below the FHIR base URL and return the JSON body.

        Raises ``httpx.HTTPStatusError`` for 4xx/5xx responses.
        """
        response = await client.get(
            f"{self.fhir_base_url}/{resource}", headers=headers, params=params
        )
        response.raise_for_status()
        return response.json()

    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Extract patient data from Epic® FHIR APIs.

        Re-authenticates when the token is missing or expired, then reads the
        Patient resource and the Condition, Procedure, MedicationRequest and
        laboratory Observation searches for the patient.

        Raises:
            RuntimeError: if authentication fails.
            httpx.HTTPError: if a FHIR request fails.
        """
        if self._token_is_stale() and not await self.authenticate():
            # Without this check the requests would go out with "Bearer None".
            raise RuntimeError("Epic® authentication failed; cannot fetch patient data")

        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Accept': 'application/fhir+json',
        }
        by_patient = {'patient': patient_id}
        # NOTE(review): only the first page of each search Bundle is read;
        # confirm whether the server pages results.
        async with httpx.AsyncClient() as client:
            patient = await self._get_json(client, f"Patient/{patient_id}", headers)
            conditions = await self._get_json(client, "Condition", headers, by_patient)
            procedures = await self._get_json(client, "Procedure", headers, by_patient)
            medications = await self._get_json(client, "MedicationRequest", headers, by_patient)
            observations = await self._get_json(
                client, "Observation", headers,
                {'patient': patient_id, 'category': 'laboratory'},
            )

        # Convert FHIR resources to standardized format
        return PatientData(
            patient_id=patient_id,
            mrn=self._extract_mrn(patient),
            demographics=self._extract_demographics(patient),
            diagnoses=self._extract_diagnoses(conditions),
            procedures=self._extract_procedures(procedures),
            medications=self._extract_medications(medications),
            allergies=[],  # Would extract from AllergyIntolerance resources
            lab_results=self._extract_lab_results(observations),
            vital_signs=[],  # Would extract from vital signs observations
            clinical_notes=[],  # Would extract from DocumentReference resources
        )

    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Update patient records with standardized codes (if supported by Epic®).

        Currently only logs the intended update and returns True.
        """
        # This would typically involve creating or updating FHIR resources
        # Implementation depends on Epic® configuration and permissions
        logging.info(f"Would update patient {patient_id} with {len(mappings)} code mappings")
        return True

    @staticmethod
    def _first(items) -> Dict[str, Any]:
        """Return the first element of ``items``, or ``{}`` if missing or empty."""
        return items[0] if items else {}

    def _extract_mrn(self, patient_fhir: Dict) -> str:
        """Extract MRN from Epic® patient resource.

        Returns the value of the first identifier whose first type coding has
        code ``MR``; falls back to the resource ``id``.
        """
        for identifier in patient_fhir.get('identifier', []):
            type_coding = self._first(identifier.get('type', {}).get('coding'))
            if type_coding.get('code') == 'MR':
                return identifier.get('value', '')
        return patient_fhir.get('id', '')

    def _extract_demographics(self, patient_fhir: Dict) -> Dict[str, Any]:
        """Extract demographics from Epic® patient resource.

        The name is the first name entry's ``text``; when that is absent it is
        assembled from the ``given`` and ``family`` parts.
        """
        name = self._first(patient_fhir.get('name'))
        display_name = name.get('text', '')
        if not display_name:
            parts = list(name.get('given', [])) + [name.get('family', '')]
            display_name = ' '.join(part for part in parts if part)
        return {
            'name': display_name,
            'gender': patient_fhir.get('gender', ''),
            'birth_date': patient_fhir.get('birthDate', ''),
            'address': self._first(patient_fhir.get('address')),
        }

    def _extract_diagnoses(self, conditions_bundle: Dict) -> List[Dict[str, str]]:
        """Extract diagnoses from Epic® condition resources.

        Emits one entry per coding of every Condition in the bundle.
        """
        diagnoses = []
        for entry in conditions_bundle.get('entry', []):
            condition = entry.get('resource', {})
            status_coding = self._first(condition.get('clinicalStatus', {}).get('coding'))
            for coding in condition.get('code', {}).get('coding', []):
                diagnoses.append({
                    'code': coding.get('code', ''),
                    'system': self._map_fhir_system_to_vocab(coding.get('system', '')),
                    'display': coding.get('display', ''),
                    'clinical_status': status_coding.get('code', ''),
                })
        return diagnoses

    def _extract_procedures(self, procedures_bundle: Dict) -> List[Dict[str, str]]:
        """Extract procedures from Epic® procedure resources.

        Emits one entry per coding of every Procedure in the bundle.
        """
        procedures = []
        for entry in procedures_bundle.get('entry', []):
            procedure = entry.get('resource', {})
            for coding in procedure.get('code', {}).get('coding', []):
                procedures.append({
                    'code': coding.get('code', ''),
                    'system': self._map_fhir_system_to_vocab(coding.get('system', '')),
                    'display': coding.get('display', ''),
                    'performed_date': procedure.get('performedDateTime', ''),
                })
        return procedures

    def _extract_medications(self, medications_bundle: Dict) -> List[Dict[str, str]]:
        """Extract medications from Epic® medication request resources.

        Only ``medicationCodeableConcept`` codings are read.
        """
        medications = []
        for entry in medications_bundle.get('entry', []):
            med_request = entry.get('resource', {})
            concept = med_request.get('medicationCodeableConcept', {})
            for coding in concept.get('coding', []):
                medications.append({
                    'code': coding.get('code', ''),
                    'system': self._map_fhir_system_to_vocab(coding.get('system', '')),
                    'display': coding.get('display', ''),
                    'status': med_request.get('status', ''),
                    'intent': med_request.get('intent', ''),
                })
        return medications

    def _extract_lab_results(self, observations_bundle: Dict) -> List[Dict[str, Any]]:
        """Extract lab results from Epic® observation resources.

        Observations without a LOINC coding are skipped.
        """
        lab_results = []
        for entry in observations_bundle.get('entry', []):
            observation = entry.get('resource', {})
            codings = observation.get('code', {}).get('coding', [])
            loinc_coding = next(
                (c for c in codings if 'loinc.org' in c.get('system', '')), None
            )
            if loinc_coding is None:
                continue
            value_info = observation.get('valueQuantity', {})
            lab_results.append({
                'test_code': loinc_coding.get('code', ''),
                'test_name': loinc_coding.get('display', ''),
                'value': value_info.get('value'),
                'unit': value_info.get('unit', ''),
                'reference_range': observation.get('referenceRange', []),
                'status': observation.get('status', ''),
                'effective_date': observation.get('effectiveDateTime', ''),
            })
        return lab_results

    def _map_fhir_system_to_vocab(self, fhir_system: str) -> str:
        """Map FHIR system URLs to vocabulary names ('Unknown' if unmapped)."""
        return self._FHIR_SYSTEM_TO_VOCAB.get(fhir_system, 'Unknown')
# Cerner Integration
class CernerFHIRConnector(EHRConnector):
    """Cerner PowerChart FHIR R4 integration connector."""

    def __init__(self, fhir_base_url: str, client_id: str, client_secret: str):
        """Store the FHIR base URL and client credentials; no request is made."""
        self.fhir_base_url = fhir_base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.token_expires = None

    async def authenticate(self) -> bool:
        """Authenticate using Cerner OAuth 2.0.

        Posts a client-credentials request with a 15 second timeout and
        stores the access token and its expiry (3600 s when the response has
        no ``expires_in``). Returns False (after logging) on any failure.
        """
        auth_data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'scope': 'system/Patient.read system/Observation.read system/Condition.read',
        }
        try:
            async with httpx.AsyncClient(timeout=httpx.Timeout(15.0)) as client:
                response = await client.post(
                    f"{self.fhir_base_url}/authorization/oauth/token", data=auth_data
                )
                response.raise_for_status()
                token_data = response.json()
            self.access_token = token_data['access_token']
            self.token_expires = datetime.utcnow() + timedelta(
                seconds=token_data.get('expires_in', 3600)
            )
            return True
        except Exception as e:
            logging.error(f"Cerner authentication failed: {e}")
            return False

    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Extract patient data from Cerner FHIR APIs.

        Placeholder: authenticates when needed and returns an empty
        ``PatientData`` whose MRN is ``CERNER_<patient_id>``; no FHIR request
        is issued yet.

        Raises:
            RuntimeError: if authentication fails.
        """
        token_stale = not self.access_token or (
            self.token_expires and datetime.utcnow() >= self.token_expires
        )
        if token_stale and not await self.authenticate():
            raise RuntimeError("Cerner authentication failed; cannot fetch patient data")

        # Implementation would follow Cerner FHIR API documentation
        # For brevity, returning a placeholder
        return PatientData(
            patient_id=patient_id,
            mrn=f"CERNER_{patient_id}",
            demographics={},
            diagnoses=[],
            procedures=[],
            medications=[],
            allergies=[],
            lab_results=[],
            vital_signs=[],
            clinical_notes=[],
        )

    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Update patient records in Cerner system.

        Currently only logs the intended update and returns True.
        """
        logging.info(f"Would update Cerner patient {patient_id} with {len(mappings)} mappings")
        return True
# Veradigm (formerly Allscripts) Integration
class VeradigmConnector(EHRConnector):
    """
    Veradigm (formerly Allscripts) integration connector.
    Supports both Unity API and Developer API.
    """

    def __init__(self, api_base_url: str, username: str, password: str, app_name: str):
        """Store the API base URL and credentials; no request is made."""
        self.api_base_url = api_base_url
        self.username = username
        self.password = password
        self.app_name = app_name
        self.security_token = None
        self.app_user_id = None

    async def authenticate(self) -> bool:
        """Authenticate using Veradigm Unity API.

        Stores the ``Token`` and ``AppUserID`` of the response. Returns True
        only when a token was received; failures are logged and return False.
        """
        auth_data = {
            'username': self.username,
            'password': self.password,
            'appname': self.app_name,
        }
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.api_base_url}/Unity/UnityService.svc/json/GetSecurityToken",
                    json=auth_data,
                )
                response.raise_for_status()
                auth_result = response.json()
            self.security_token = auth_result.get('Token')
            self.app_user_id = auth_result.get('AppUserID')
            return self.security_token is not None
        except Exception as e:
            logging.error(f"Veradigm authentication failed: {e}")
            return False

    async def _unity_get(self, client, operation: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """GET the Unity JSON endpoint ``operation`` and return the parsed body.

        Raises ``httpx.HTTPStatusError`` for 4xx/5xx responses.
        """
        response = await client.get(
            f"{self.api_base_url}/Unity/UnityService.svc/json/{operation}",
            headers={'Content-Type': 'application/json'},
            params=params,
        )
        response.raise_for_status()
        return response.json()

    async def get_patient_data(self, patient_id: str) -> PatientData:
        """Extract patient data from Veradigm Unity API.

        Authenticates when no token is held, then reads the patient, their
        problems and their medications.

        Raises:
            RuntimeError: if authentication fails.
            httpx.HTTPError: if a Unity request fails.
        """
        if not self.security_token and not await self.authenticate():
            # Without this check the requests would carry token=None.
            raise RuntimeError("Veradigm authentication failed; cannot fetch patient data")

        patient_params = {
            'token': self.security_token,
            'appUserID': self.app_user_id,
            'patientID': patient_id,
        }
        async with httpx.AsyncClient() as client:
            patient_data = await self._unity_get(client, 'GetPatient', patient_params)
            problems_data = await self._unity_get(client, 'GetPatientProblems', patient_params)
            medications_data = await self._unity_get(
                client, 'GetPatientMedications', patient_params
            )

        return PatientData(
            patient_id=patient_id,
            mrn=patient_data.get('MRN', ''),
            demographics=self._extract_veradigm_demographics(patient_data),
            diagnoses=self._extract_veradigm_problems(problems_data),
            procedures=[],  # Would need separate API call
            medications=self._extract_veradigm_medications(medications_data),
            allergies=[],  # Would need separate API call
            lab_results=[],  # Would need separate API call
            vital_signs=[],  # Would need separate API call
            clinical_notes=[],
        )

    async def update_patient_codes(self, patient_id: str, mappings: List[CodeMapping]) -> bool:
        """Update patient records in Veradigm system.

        Currently only logs the intended update and returns True.
        """
        logging.info(f"Would update Veradigm patient {patient_id} with {len(mappings)} mappings")
        return True

    def _extract_veradigm_demographics(self, patient_data: Dict) -> Dict[str, Any]:
        """Extract demographics from Veradigm patient data."""
        full_name = f"{patient_data.get('FirstName', '')} {patient_data.get('LastName', '')}"
        return {
            # strip() avoids a lone or dangling space when a name part is missing
            'name': full_name.strip(),
            'gender': patient_data.get('Sex', ''),
            'birth_date': patient_data.get('DOB', ''),
            'address': {
                'street': patient_data.get('Address1', ''),
                'city': patient_data.get('City', ''),
                'state': patient_data.get('State', ''),
                'zip': patient_data.get('Zip', ''),
            },
        }

    def _extract_veradigm_problems(self, problems_data: Dict) -> List[Dict[str, str]]:
        """Extract diagnoses from Veradigm patient problems."""
        return [
            {
                'code': problem.get('DiagnosisCode', ''),
                'system': 'ICD10CM',  # Assuming ICD-10
                'display': problem.get('ProblemDescription', ''),
                'status': problem.get('ProblemStatus', ''),
            }
            for problem in problems_data.get('Problems', [])
        ]

    def _extract_veradigm_medications(self, medications_data: Dict) -> List[Dict[str, str]]:
        """Extract medications from Veradigm medications data."""
        return [
            {
                'name': med.get('MedicationName', ''),
                'code': med.get('NDC', ''),
                'system': 'NDC',
                'display': med.get('MedicationName', ''),
                'status': med.get('MedicationStatus', ''),
            }
            for med in medications_data.get('Medications', [])
        ]
# Main Integration Orchestrator
class EHRIntegrationOrchestrator:
    """
    Orchestrates integration across multiple EHR systems.
    Provides unified interface for vocabulary standardization.
    """

    def __init__(self, omophub_api_key: str):
        """Create the OMOPHub integrator and an empty connector registry."""
        self.omophub_integrator = OMOPHubEHRIntegrator(omophub_api_key)
        self.ehr_connectors = {}
        self.logger = logging.getLogger(__name__)

    def register_ehr_connector(self, ehr_name: str, connector: EHRConnector):
        """Register an EHR system connector under ``ehr_name``."""
        self.ehr_connectors[ehr_name] = connector

    @staticmethod
    def _failure(ehr_name: str, patient_id: str, error: BaseException) -> Dict[str, Any]:
        """Build the result dict reported for a patient that could not be processed."""
        return {
            'ehr_system': ehr_name,
            'patient_id': patient_id,
            'error': str(error),
            'status': 'failed',
        }

    async def process_patient(self, ehr_name: str, patient_id: str) -> Dict[str, Any]:
        """
        Process a patient's data from specified EHR system.
        Returns standardized vocabulary mappings and clinical insights.

        Errors during extraction, standardization or write-back are logged
        and reported as a ``status: 'failed'`` result instead of being raised.

        Raises:
            ValueError: if ``ehr_name`` has not been registered.
        """
        if ehr_name not in self.ehr_connectors:
            raise ValueError(f"EHR system '{ehr_name}' not registered")
        connector = self.ehr_connectors[ehr_name]
        try:
            # Extract patient data from EHR
            self.logger.info(f"Extracting patient {patient_id} from {ehr_name}")
            patient_data = await connector.get_patient_data(patient_id)

            # Process with OMOPHub vocabulary services. The integrator is
            # synchronous, so it runs in the default executor to keep the
            # event loop free for other patients.
            self.logger.info(f"Processing vocabulary standardization for patient {patient_id}")
            loop = asyncio.get_running_loop()
            processed_data = await loop.run_in_executor(
                None, self.omophub_integrator.process_patient_data, patient_data
            )

            # Optionally update EHR with standardized codes
            if hasattr(connector, 'update_patient_codes'):
                all_mappings = (
                    processed_data['diagnosis_mappings']
                    + processed_data['procedure_mappings']
                    + processed_data['medication_mappings']
                )
                await connector.update_patient_codes(patient_id, all_mappings)

            return {
                'ehr_system': ehr_name,
                'patient_id': patient_id,
                'processing_timestamp': datetime.utcnow().isoformat(),
                'patient_data': processed_data,
                'status': 'success',
            }
        except Exception as e:
            self.logger.error(f"Error processing patient {patient_id} from {ehr_name}: {e}")
            return self._failure(ehr_name, patient_id, e)

    async def batch_process_patients(self, ehr_name: str, patient_ids: List[str]) -> List[Dict[str, Any]]:
        """Process multiple patients efficiently.

        Patients are processed ten at a time with a one second pause between
        batches. The returned list has one dict per patient, in input order;
        an exception raised for a patient is converted into a
        ``status: 'failed'`` dict.
        """
        results = []
        # Process in batches of 10 to avoid overwhelming APIs
        batch_size = 10
        for start in range(0, len(patient_ids), batch_size):
            if start:
                # Delay between batches (not after the last one) to respect
                # API rate limits
                await asyncio.sleep(1)
            batch = patient_ids[start:start + batch_size]
            outcomes = await asyncio.gather(
                *(self.process_patient(ehr_name, patient_id) for patient_id in batch),
                return_exceptions=True,
            )
            for patient_id, outcome in zip(batch, outcomes):
                if isinstance(outcome, BaseException):
                    outcome = self._failure(ehr_name, patient_id, outcome)
                results.append(outcome)
        return results
# Example usage
async def main():
    """Demonstrate the orchestrator: register three connectors, process one
    Epic® patient, then batch-process three Epic® patients and print a summary.
    """
    from dataclasses import asdict, is_dataclass

    def to_jsonable(value):
        """``json.dumps`` fallback that expands dataclass instances.

        The processed results contain ``CodeMapping`` dataclass objects,
        which ``json`` cannot serialize on its own.
        """
        if is_dataclass(value) and not isinstance(value, type):
            return asdict(value)
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    # Initialize the orchestrator
    orchestrator = EHRIntegrationOrchestrator("your-omophub-api-key")

    # Register EHR connectors
    epic_connector = EpicFHIRConnector(
        fhir_base_url="https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4",
        client_id="your-epic-client-id",
        client_secret="your-epic-client-secret"
    )
    orchestrator.register_ehr_connector("epic", epic_connector)

    cerner_connector = CernerFHIRConnector(
        fhir_base_url="https://fhir-open.cerner.com/r4",
        client_id="your-cerner-client-id",
        client_secret="your-cerner-client-secret"
    )
    orchestrator.register_ehr_connector("cerner", cerner_connector)

    veradigm_connector = VeradigmConnector(
        api_base_url="https://unity.developer.allscripts.com",
        username="your-username",
        password="your-password",
        app_name="your-app-name"
    )
    orchestrator.register_ehr_connector("veradigm", veradigm_connector)

    # Process a patient from Epic®
    result = await orchestrator.process_patient("epic", "patient-123")
    print(json.dumps(result, indent=2, default=to_jsonable))

    # Batch process multiple patients
    patient_ids = ["patient-123", "patient-456", "patient-789"]
    batch_results = await orchestrator.batch_process_patients("epic", patient_ids)
    print(f"Processed {len(batch_results)} patients")
    for result in batch_results:
        # A batch entry may be an exception object rather than a result dict.
        if isinstance(result, dict) and result.get('status') == 'success':
            patient_data = result['patient_data']
            print(f"Patient {patient_data['patient_id']}: "
                  f"{len(patient_data['diagnosis_mappings'])} diagnoses mapped, "
                  f"{len(patient_data['procedure_mappings'])} procedures mapped")


if __name__ == "__main__":
    asyncio.run(main())
JavaScript/Node.js Implementation
Copy
const axios = require('axios');
const { promisify } = require('util');
class OMOPHubEHRIntegrator {
  /**
   * @param {string} apiKey  OMOPHub API key, sent as a Bearer token.
   * @param {string} [baseUrl]  Root URL of the OMOPHub API.
   */
  constructor(apiKey, baseUrl = 'https://api.omophub.com') {
    this.apiKey = apiKey;
    this.baseUrl = baseUrl;
    this.headers = { 'Authorization': `Bearer ${apiKey}` };
  }

  /**
   * GET `path` from the API and return the `data` array of the response body.
   * Resolves to an empty array when the body carries no `data`.
   * @param {string} path  Path below the base URL, starting with '/'.
   * @param {object} params  Query parameters.
   * @returns {Promise<object[]>}
   */
  async fetchData(path, params) {
    const response = await axios.get(`${this.baseUrl}${path}`, {
      headers: this.headers,
      params
    });
    return (response.data && response.data.data) || [];
  }

  /**
   * Map diagnosis codes to SNOMED. Entries without a `code` are skipped and
   * a failed lookup is logged without aborting the remaining entries.
   * @param {Array<{code?: string, system?: string}>} diagnoses
   * @returns {Promise<object[]>} one mapping per resolved diagnosis
   */
  async standardizeDiagnosisCodes(diagnoses) {
    const mappings = [];
    for (const diagnosis of diagnoses) {
      const sourceCode = diagnosis.code;
      const sourceSystem = diagnosis.system || 'Unknown';
      if (!sourceCode) continue;
      try {
        const concepts = await this.fetchData('/v1/concepts/search', {
          query: sourceCode,
          vocabulary_ids: this.mapEHRToOMOPHubVocab(sourceSystem),
          standard_concept: 'S',
          page_size: 1
        });
        if (concepts.length === 0) continue;

        // Get mapping to target vocabulary
        const candidates = await this.fetchData(
          `/v1/concepts/${concepts[0].concept_id}/mappings`,
          { target_vocabularies: 'SNOMED', relationship_types: 'Maps to' }
        );
        if (candidates.length > 0) {
          const bestMapping = candidates[0];
          mappings.push({
            sourceCode,
            sourceSystem,
            targetCode: bestMapping.target_concept_code,
            targetSystem: 'SNOMED',
            confidence: bestMapping.confidence || 1.0,
            mappedName: bestMapping.target_concept_name
          });
        }
      } catch (error) {
        console.error(`Error mapping diagnosis ${sourceCode}:`, error.message);
      }
    }
    return mappings;
  }

  /**
   * Look up procedure codes in their source vocabulary (Procedure domain).
   * Entries without a `code` are skipped; failed lookups are logged.
   * @param {Array<{code?: string, system?: string}>} procedures
   * @returns {Promise<object[]>} one mapping per resolved procedure
   */
  async standardizeProcedureCodes(procedures) {
    const mappings = [];
    for (const procedure of procedures) {
      const sourceCode = procedure.code;
      const sourceSystem = procedure.system || 'Unknown';
      if (!sourceCode) continue;
      try {
        const concepts = await this.fetchData('/v1/concepts/search', {
          query: sourceCode,
          vocabulary_ids: this.mapEHRToOMOPHubVocab(sourceSystem),
          domain_id: 'Procedure',
          page_size: 1
        });
        if (concepts.length > 0) {
          const concept = concepts[0];
          mappings.push({
            sourceCode,
            sourceSystem,
            targetCode: concept.concept_code,
            targetSystem: concept.vocabulary_id,
            confidence: 0.95,
            mappedName: concept.concept_name
          });
        }
      } catch (error) {
        console.error(`Error mapping procedure ${sourceCode}:`, error.message);
      }
    }
    return mappings;
  }

  /**
   * Look up medications in RxNorm, by `code` or, when that is empty, by
   * `name` / `display`. Entries with none of these are skipped.
   * @param {Array<{code?: string, name?: string, display?: string, system?: string}>} medications
   * @returns {Promise<object[]>} one mapping per resolved medication
   */
  async standardizeMedicationCodes(medications) {
    const mappings = [];
    for (const medication of medications) {
      const searchTerm = medication.code || medication.name || medication.display;
      const sourceSystem = medication.system || 'Unknown';
      if (!searchTerm) continue;
      try {
        const concepts = await this.fetchData('/v1/concepts/search', {
          query: searchTerm,
          vocabulary_ids: 'RxNorm',
          domain_id: 'Drug',
          standard_concept: 'S',
          page_size: 1
        });
        if (concepts.length > 0) {
          const concept = concepts[0];
          mappings.push({
            sourceCode: searchTerm,
            sourceSystem,
            targetCode: concept.concept_code,
            targetSystem: 'RxNorm',
            confidence: 0.9,
            mappedName: concept.concept_name
          });
        }
      } catch (error) {
        console.error(`Error mapping medication ${searchTerm}:`, error.message);
      }
    }
    return mappings;
  }

  /**
   * Run all standardization steps for one patient and derive the insights
   * and quality indicators from the resulting mappings.
   * @param {object} patientData  Object with patient_id, mrn and optional
   *   diagnoses / procedures / medications arrays.
   * @returns {Promise<object>}
   */
  async processPatientData(patientData) {
    const results = {
      patientId: patientData.patient_id,
      mrn: patientData.mrn,
      processedTimestamp: new Date().toISOString(),
      diagnosisMappings: [],
      procedureMappings: [],
      medicationMappings: [],
      normalizedLabResults: [],
      clinicalInsights: {},
      qualityIndicators: {}
    };
    // Process diagnoses
    results.diagnosisMappings = await this.standardizeDiagnosisCodes(patientData.diagnoses || []);
    // Process procedures
    results.procedureMappings = await this.standardizeProcedureCodes(patientData.procedures || []);
    // Process medications
    results.medicationMappings = await this.standardizeMedicationCodes(patientData.medications || []);
    // Generate insights
    results.clinicalInsights = this.generateClinicalInsights(results);
    results.qualityIndicators = this.calculateQualityIndicators(results);
    return results;
  }

  /**
   * Map an EHR vocabulary name to an OMOPHub vocabulary ID.
   * Known names pass through unchanged; anything else becomes 'SNOMED'.
   * @param {string} ehrSystem
   * @returns {string}
   */
  mapEHRToOMOPHubVocab(ehrSystem) {
    // A Set avoids matching inherited object members such as 'toString'.
    const known = new Set(['ICD9CM', 'ICD10CM', 'SNOMED', 'HCPCS', 'LOINC', 'RxNorm']);
    return known.has(ehrSystem) ? ehrSystem : 'SNOMED';
  }

  /**
   * Collect chronic conditions (target codes starting with E11, I10 or J44)
   * from the diagnosis mappings. The other lists are returned empty.
   * @param {{diagnosisMappings: object[]}} processedData
   * @returns {object}
   */
  generateClinicalInsights(processedData) {
    const insights = {
      chronicConditions: [],
      drugInteractions: [],
      preventiveCareGaps: []
    };
    // Identify chronic conditions
    const chronicCodes = ['E11', 'I10', 'J44']; // Diabetes, Hypertension, COPD
    for (const mapping of processedData.diagnosisMappings) {
      if (chronicCodes.some(code => mapping.targetCode.startsWith(code))) {
        insights.chronicConditions.push({
          condition: mapping.mappedName,
          code: mapping.targetCode,
          system: mapping.targetSystem
        });
      }
    }
    return insights;
  }

  /**
   * Derive quality flags from the diagnosis mappings: diabetes for any
   * target code starting with E11, hypertension for the exact code I10.
   * @param {{diagnosisMappings: object[]}} processedData
   * @returns {object}
   */
  calculateQualityIndicators(processedData) {
    const diagnosisMappings = processedData.diagnosisMappings;
    return {
      diabetesControlEligible: diagnosisMappings.some(m => m.targetCode.startsWith('E11')),
      hypertensionControlEligible: diagnosisMappings.some(m => m.targetCode === 'I10'),
      preventiveScreeningDue: []
    };
  }
}
// Epic® FHIR Connector
/**
 * Connector that pulls patient data from an Epic® FHIR endpoint and reshapes it
 * into the patient-data structure consumed by the vocabulary integrator.
 */
class EpicFHIRConnector {
  /**
   * @param {string} fhirBaseUrl - Base URL of the FHIR API.
   * @param {string} clientId - OAuth2 client id.
   * @param {string} clientSecret - OAuth2 client secret.
   */
  constructor(fhirBaseUrl, clientId, clientSecret) {
    this.fhirBaseUrl = fhirBaseUrl;
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.accessToken = null;
    this.tokenExpires = null;
  }

  /**
   * Obtain an access token with the client-credentials grant.
   *
   * @returns {Promise<boolean>} true when a token was stored, false otherwise
   *   (the failure is logged, never thrown).
   */
  async authenticate() {
    try {
      // NOTE(review): the token endpoint is derived from the FHIR base URL —
      // confirm it matches the token endpoint the server actually advertises.
      const authUrl = `${this.fhirBaseUrl}/oauth2/token`;
      // URL-encode the auth payload for OAuth2 token endpoint
      const formData = new URLSearchParams({
        grant_type: 'client_credentials',
        client_id: this.clientId,
        client_secret: this.clientSecret,
        scope: 'system/*.read'
      }).toString();

      const response = await axios.post(authUrl, formData, {
        headers: {
          'Content-Type': 'application/x-www-form-urlencoded'
        }
      });

      const tokenData = response.data || {};
      if (!tokenData.access_token) {
        throw new Error('token response did not include an access_token');
      }

      // A missing or malformed expires_in would produce an Invalid Date, which
      // never compares as expired. Treat it as "expires now" so the next call
      // re-authenticates instead of reusing a token of unknown age.
      const lifetimeSeconds = Number(tokenData.expires_in);
      const lifetimeMs =
        Number.isFinite(lifetimeSeconds) && lifetimeSeconds > 0 ? lifetimeSeconds * 1000 : 0;

      this.accessToken = tokenData.access_token;
      this.tokenExpires = new Date(Date.now() + lifetimeMs);
      return true;
    } catch (error) {
      console.error('Epic® authentication failed:', error.message);
      return false;
    }
  }

  /**
   * Fetch demographics, conditions and medication requests for one patient.
   *
   * @param {string} patientId - FHIR logical id of the patient.
   * @returns {Promise<Object>} Patient data with `patient_id`, `mrn`,
   *   `demographics`, `diagnoses`, `medications`; the remaining lists are empty.
   * @throws {Error} When authentication fails or any FHIR request fails.
   */
  async getPatientData(patientId) {
    const tokenIsUsable =
      Boolean(this.accessToken) &&
      this.tokenExpires instanceof Date &&
      Date.now() < this.tokenExpires.getTime();

    if (!tokenIsUsable) {
      const authenticated = await this.authenticate();
      // Stop here rather than sending "Bearer null" to the FHIR server.
      if (authenticated === false) {
        throw new Error('Epic® authentication failed; cannot retrieve patient data');
      }
    }

    const headers = {
      'Authorization': `Bearer ${this.accessToken}`,
      'Accept': 'application/fhir+json'
    };

    try {
      // Get patient demographics (the id is encoded because it becomes a path segment)
      const patientResponse = await axios.get(
        `${this.fhirBaseUrl}/Patient/${encodeURIComponent(patientId)}`,
        { headers }
      );
      const patient = patientResponse.data;

      // Get conditions
      const conditionsResponse = await axios.get(
        `${this.fhirBaseUrl}/Condition`,
        { headers, params: { patient: patientId } }
      );
      const conditions = conditionsResponse.data;

      // Get medications
      const medicationsResponse = await axios.get(
        `${this.fhirBaseUrl}/MedicationRequest`,
        { headers, params: { patient: patientId } }
      );
      const medications = medicationsResponse.data;

      // NOTE(review): only the first page of each search Bundle is read; follow
      // the Bundle's "next" link if complete histories are required.
      return {
        patient_id: patientId,
        mrn: this.extractMRN(patient),
        demographics: this.extractDemographics(patient),
        diagnoses: this.extractDiagnoses(conditions),
        procedures: [], // Would extract from Procedure resources
        medications: this.extractMedications(medications),
        allergies: [],
        lab_results: [],
        vital_signs: [],
        clinical_notes: []
      };
    } catch (error) {
      console.error(`Error extracting patient data from Epic®:`, error.message);
      throw error;
    }
  }

  /**
   * Return the medical record number of a Patient resource.
   *
   * @param {Object} patient - FHIR Patient resource.
   * @returns {string} Value of the first identifier typed 'MR', else the
   *   resource id, else ''.
   */
  extractMRN(patient) {
    for (const identifier of patient?.identifier || []) {
      const codings = identifier?.type?.coding || [];
      if (codings.some((coding) => coding?.code === 'MR')) {
        return identifier.value || '';
      }
    }
    return patient?.id || '';
  }

  /**
   * Pull basic demographics from a Patient resource.
   *
   * @param {Object} patient - FHIR Patient resource.
   * @returns {{name: string, gender: string, birth_date: string, address: Object}}
   */
  extractDemographics(patient) {
    const primaryName = patient?.name?.[0];
    // `text` is optional on a HumanName; fall back to given + family parts.
    const assembledName = [...(primaryName?.given || []), primaryName?.family]
      .filter(Boolean)
      .join(' ');

    return {
      name: primaryName?.text || assembledName,
      gender: patient?.gender || '',
      birth_date: patient?.birthDate || '',
      address: patient?.address?.[0] || {}
    };
  }

  /**
   * Flatten a Condition search Bundle into one diagnosis per coding.
   *
   * @param {Object} conditionsBundle - FHIR Bundle of Condition resources.
   * @returns {Array<{code: string, system: string, display: string, clinical_status: string}>}
   */
  extractDiagnoses(conditionsBundle) {
    const diagnoses = [];
    for (const entry of conditionsBundle?.entry || []) {
      const condition = entry?.resource || {};
      const clinicalStatus = condition.clinicalStatus?.coding?.[0]?.code || '';
      for (const coding of condition.code?.coding || []) {
        diagnoses.push({
          code: coding.code || '',
          system: this.mapFHIRSystemToVocab(coding.system || ''),
          display: coding.display || '',
          clinical_status: clinicalStatus
        });
      }
    }
    return diagnoses;
  }

  /**
   * Flatten a MedicationRequest search Bundle into one medication per coding.
   * Requests that reference a Medication resource instead of carrying a
   * `medicationCodeableConcept` contribute nothing.
   *
   * @param {Object} medicationsBundle - FHIR Bundle of MedicationRequest resources.
   * @returns {Array<{code: string, system: string, display: string, status: string, intent: string}>}
   */
  extractMedications(medicationsBundle) {
    const medications = [];
    for (const entry of medicationsBundle?.entry || []) {
      const medRequest = entry?.resource || {};
      for (const coding of medRequest.medicationCodeableConcept?.coding || []) {
        medications.push({
          code: coding.code || '',
          system: this.mapFHIRSystemToVocab(coding.system || ''),
          display: coding.display || '',
          status: medRequest.status || '',
          intent: medRequest.intent || ''
        });
      }
    }
    return medications;
  }

  /**
   * Translate a FHIR code-system URI into a vocabulary name.
   *
   * @param {string} fhirSystem - Code system URI from a FHIR Coding.
   * @returns {string} Vocabulary name, or 'Unknown' for unrecognised URIs.
   */
  mapFHIRSystemToVocab(fhirSystem) {
    // A Map keeps inherited object keys ('constructor', ...) from being
    // mistaken for known systems.
    const vocabularyBySystem = new Map([
      ['http://snomed.info/sct', 'SNOMED'],
      ['http://hl7.org/fhir/sid/icd-10-cm', 'ICD10CM'],
      ['http://hl7.org/fhir/sid/icd-9-cm', 'ICD9CM'],
      // NOTE(review): CPT codes are labelled 'HCPCS' here — confirm the
      // downstream vocabulary lookup expects that rather than a CPT vocabulary.
      ['http://www.ama-assn.org/go/cpt', 'HCPCS'],
      ['http://www.nlm.nih.gov/research/umls/rxnorm', 'RxNorm'],
      ['http://loinc.org', 'LOINC']
    ]);
    return vocabularyBySystem.get(fhirSystem) || 'Unknown';
  }
}
// Integration Orchestrator
/**
 * Coordinates EHR connectors with the OMOPHub vocabulary integrator: pulls a
 * patient from a registered connector and runs it through standardization.
 */
class EHRIntegrationOrchestrator {
  /**
   * @param {string} omophubApiKey - API key handed to the vocabulary integrator.
   */
  constructor(omophubApiKey) {
    this.OMOPHUBIntegrator = new OMOPHubEHRIntegrator(omophubApiKey);
    this.ehrConnectors = new Map();
  }

  /**
   * Register (or replace) the connector used for an EHR name.
   *
   * @param {string} ehrName - Key used later in processPatient().
   * @param {Object} connector - Object exposing `getPatientData(patientId)`.
   */
  registerEHRConnector(ehrName, connector) {
    this.ehrConnectors.set(ehrName, connector);
  }

  /**
   * Extract and standardize one patient.
   *
   * @param {string} ehrName - Name of a registered connector.
   * @param {string} patientId - Patient identifier understood by the connector.
   * @returns {Promise<Object>} `{status: 'success', patientData, ...}` or
   *   `{status: 'failed', patientId, error, ...}`; extraction and processing
   *   errors are reported in the result, not thrown.
   * @throws {Error} When `ehrName` has no registered connector.
   */
  async processPatient(ehrName, patientId) {
    if (!this.ehrConnectors.has(ehrName)) {
      throw new Error(`EHR system '${ehrName}' not registered`);
    }
    const connector = this.ehrConnectors.get(ehrName);

    try {
      console.log(`Extracting patient ${patientId} from ${ehrName}`);
      const patientData = await connector.getPatientData(patientId);

      console.log(`Processing vocabulary standardization for patient ${patientId}`);
      const processedData = await this.OMOPHUBIntegrator.processPatientData(patientData);

      return {
        ehrSystem: ehrName,
        processingTimestamp: new Date().toISOString(),
        patientData: processedData,
        status: 'success'
      };
    } catch (error) {
      console.error(`Error processing patient ${patientId} from ${ehrName}:`, error.message);
      return {
        ehrSystem: ehrName,
        patientId,
        error: error.message,
        status: 'failed'
      };
    }
  }

  /**
   * Process many patients in fixed-size batches, pausing one second between batches.
   *
   * @param {string} ehrName - Name of a registered connector.
   * @param {string[]} patientIds - Patients to process.
   * @param {number} [batchSize=10] - Patients processed concurrently per batch.
   * @returns {Promise<Object[]>} One result object per patient, in input order.
   *   Every entry has a `status` of 'success' or 'failed'.
   * @throws {RangeError} When `batchSize` is not a positive integer.
   */
  async batchProcessPatients(ehrName, patientIds, batchSize = 10) {
    // A zero or negative step would never advance the loop below.
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
    }

    const ids = Array.from(patientIds || []);
    const results = [];

    for (let start = 0; start < ids.length; start += batchSize) {
      const batch = ids.slice(start, start + batchSize);
      const settled = await Promise.allSettled(
        batch.map((patientId) => this.processPatient(ehrName, patientId))
      );

      settled.forEach((outcome, index) => {
        if (outcome.status === 'fulfilled') {
          results.push(outcome.value);
          return;
        }
        // processPatient rejects (e.g. unregistered EHR). Report that in the
        // same shape as any other failure instead of leaking a raw Error.
        const reason = outcome.reason;
        results.push({
          ehrSystem: ehrName,
          patientId: batch[index],
          error: reason instanceof Error ? reason.message : String(reason),
          status: 'failed'
        });
      });

      // Add delay between batches
      if (start + batchSize < ids.length) {
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }
    return results;
  }
}
// Example usage
/**
 * Example run: wires an Epic® connector into the orchestrator, processes one
 * patient, then a small batch, logging the outcome of each step. Any error is
 * logged rather than propagated.
 */
async function main() {
  const orchestrator = new EHRIntegrationOrchestrator('your-omophub-api-key');

  // Register Epic® connector
  orchestrator.registerEHRConnector(
    'epic',
    new EpicFHIRConnector(
      'https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4',
      'your-epic-client-id',
      'your-epic-client-secret'
    )
  );

  try {
    // Single patient: dump the whole result.
    const singleResult = await orchestrator.processPatient('epic', 'patient-123');
    console.log(JSON.stringify(singleResult, null, 2));

    // Batch: report how many came back, then one line per successful patient.
    const batchResults = await orchestrator.batchProcessPatients(
      'epic',
      ['patient-123', 'patient-456', 'patient-789']
    );
    console.log(`Processed ${batchResults.length} patients`);

    for (const entry of batchResults) {
      if (entry.status !== 'success') {
        continue;
      }
      const patientData = entry.patientData;
      console.log(`Patient ${patientData.patientId}: ${patientData.diagnosisMappings.length} diagnoses mapped`);
    }
  } catch (error) {
    console.error('Integration error:', error.message);
  }
}
// Run the example only when this file is executed directly, not when it is
// loaded by another module through require().
if (require.main === module) {
main();
}
// Public API of this module: the vocabulary integrator, the Epic® connector
// and the orchestrator that ties them together.
module.exports = {
OMOPHubEHRIntegrator,
EpicFHIRConnector,
EHRIntegrationOrchestrator
};
Clinical Workflow Integration Patterns
Real-time Decision Support
Copy
class ClinicalDecisionSupport:
    """
    Real-time clinical decision support using OMOPHub vocabulary services.
    Integrates with EHR workflow to provide point-of-care alerts and suggestions.
    """

    def __init__(self, omophub_integrator: "OMOPHubEHRIntegrator"):
        """Store the vocabulary integrator and load the rule set."""
        self.omophub = omophub_integrator
        self.clinical_rules = self._load_clinical_rules()

    async def evaluate_patient_context(self, patient_data: "PatientData") -> Dict[str, Any]:
        """
        Evaluate patient context for clinical decision support.
        Called during clinical documentation or order entry.

        Returns a dict with 'alerts', 'suggestions', 'patient_summary' and
        'risk_scores'.
        """
        import inspect

        # Process current patient data. The integrator may be synchronous or
        # asynchronous; accept either.
        processed_data = self.omophub.process_patient_data(patient_data)
        if inspect.isawaitable(processed_data):
            processed_data = await processed_data

        medication_mappings = processed_data['medication_mappings']

        alerts: List[Dict] = []
        # Check for drug interactions
        alerts.extend(await self._check_drug_interactions(medication_mappings))
        # Check for allergy conflicts
        alerts.extend(
            await self._check_allergy_conflicts(medication_mappings, patient_data.allergies or [])
        )

        suggestions: List[Dict] = []
        # Generate preventive care suggestions
        suggestions.extend(await self._generate_preventive_care_suggestions(processed_data))
        # Check for quality measure opportunities
        suggestions.extend(await self._check_quality_measures(processed_data))

        return {
            'alerts': alerts,
            'suggestions': suggestions,
            'patient_summary': self._generate_patient_summary(processed_data),
            'risk_scores': self._calculate_risk_scores(processed_data),
        }

    async def _check_drug_interactions(self, medications: List["CodeMapping"]) -> List[Dict]:
        """Check for potential drug-drug interactions among RxNorm-mapped medications."""
        import itertools

        rxnorm_medications = [med for med in medications if med.target_system == 'RxNorm']

        interactions = []
        # Check every unordered pair once.
        for first, second in itertools.combinations(rxnorm_medications, 2):
            # This would typically call a drug interaction API.
            # For demo purposes the check is name based, so it has to be fed the
            # mapped drug names; RxNorm codes are numeric and would never match.
            first_label = getattr(first, 'mapped_name', None) or first.target_code
            second_label = getattr(second, 'mapped_name', None) or second.target_code
            if self._has_potential_interaction(first_label, second_label):
                interactions.append({
                    'type': 'drug_interaction',
                    'severity': 'moderate',
                    'message': f'Potential interaction between {first.target_code} and {second.target_code}',
                    'drug1': first.target_code,
                    'drug2': second.target_code,
                    'recommendation': 'Monitor patient closely',
                })
        return interactions

    async def _check_allergy_conflicts(self, medications: List["CodeMapping"], allergies: List[Dict]) -> List[Dict]:
        """Check medications against patient allergies."""
        conflicts = []
        for medication in medications:
            for allergy in allergies:
                # Check if medication could trigger allergy
                if self._medication_triggers_allergy(medication, allergy):
                    conflicts.append({
                        'type': 'allergy_conflict',
                        'severity': 'high',
                        'message': f'Medication {medication.mapped_name} may trigger {allergy.get("allergen", "")} allergy',
                        'medication': medication.mapped_name,
                        'allergen': allergy.get('allergen', ''),
                        'recommendation': 'Consider alternative medication',
                    })
        return conflicts

    async def _generate_preventive_care_suggestions(self, processed_data: Dict) -> List[Dict]:
        """Generate preventive care suggestions based on patient conditions."""
        quality_indicators = processed_data.get('quality_indicators') or {}
        suggestions = []

        # Check for diabetes management
        if quality_indicators.get('diabetes_control_eligible'):
            suggestions.append({
                'type': 'preventive_care',
                'category': 'diabetes_management',
                'message': 'Patient eligible for HbA1c monitoring',
                'recommendation': 'Schedule HbA1c test within 3 months',
                'priority': 'medium',
            })

        # Check for hypertension management
        if quality_indicators.get('hypertension_control_eligible'):
            suggestions.append({
                'type': 'preventive_care',
                'category': 'hypertension_management',
                'message': 'Patient eligible for blood pressure monitoring',
                'recommendation': 'Monitor blood pressure regularly',
                'priority': 'medium',
            })
        return suggestions

    def _has_potential_interaction(self, drug1: str, drug2: str) -> bool:
        """Simple drug interaction check (would be more sophisticated in production).

        The check is symmetric: argument order does not matter.
        """
        # Example: ACE inhibitors and potassium supplements
        ace_inhibitors = ('lisinopril', 'enalapril', 'captopril')
        potassium_supplements = ('potassium chloride',)

        def mentions(name: str, terms) -> bool:
            # Underscores and spaces are treated alike so 'potassium_chloride'
            # and 'Potassium Chloride' both match.
            normalized = (name or '').lower().replace('_', ' ')
            return any(term in normalized for term in terms)

        return (
            (mentions(drug1, ace_inhibitors) and mentions(drug2, potassium_supplements))
            or (mentions(drug2, ace_inhibitors) and mentions(drug1, potassium_supplements))
        )

    def _medication_triggers_allergy(self, medication: "CodeMapping", allergy: Dict) -> bool:
        """Check if medication might trigger patient allergy."""
        allergen = (allergy.get('allergen') or '').strip().lower()
        medication_name = (getattr(medication, 'mapped_name', None) or '').strip().lower()

        # An empty string is a substring of every string; without this guard a
        # blank allergen would flag every medication.
        if not allergen or not medication_name:
            return False

        # Simple string matching - production would use more sophisticated methods
        return allergen in medication_name or medication_name in allergen

    def _load_clinical_rules(self) -> Dict[str, Any]:
        """Return the clinical rule set. Placeholder: no rules are configured."""
        return {}

    async def _check_quality_measures(self, processed_data: Dict) -> List[Dict]:
        """Return quality-measure suggestions. Placeholder: none are produced."""
        return []

    def _generate_patient_summary(self, processed_data: Dict) -> Dict[str, Any]:
        """Summarize the processed data as simple counts."""
        return {
            'total_diagnoses': len(processed_data.get('diagnosis_mappings') or []),
            'total_medications': len(processed_data.get('medication_mappings') or []),
        }

    def _calculate_risk_scores(self, processed_data: Dict) -> Dict[str, Any]:
        """Return risk scores. Placeholder: no scoring model is implemented."""
        return {}
# Workflow Integration Example
class EHRWorkflowIntegration:
    """
    Integrates vocabulary services into clinical workflows.
    Provides hooks for common EHR workflow events.

    Every hook reports failures through an 'error' key in its return value
    instead of raising.
    """

    def __init__(self, orchestrator: "EHRIntegrationOrchestrator"):
        self.orchestrator = orchestrator
        self.decision_support = ClinicalDecisionSupport(orchestrator.omophub_integrator)

    async def _search_concepts(self, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Query the concept search endpoint and return the 'data' list of the response."""
        integrator = self.orchestrator.omophub_integrator
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{integrator.base_url}/v1/concepts/search",
                headers=integrator.headers,
                params=params,
            )
            response.raise_for_status()
            return response.json().get('data') or []

    async def on_patient_chart_open(self, ehr_name: str, patient_id: str) -> Dict[str, Any]:
        """
        Called when clinician opens patient chart.
        Pre-loads standardized vocabulary data and clinical insights.

        Returns a summary with 'standardized_data_ready': True on success,
        otherwise {'error': ..., 'standardized_data_ready': False}.
        """
        try:
            result = await self.orchestrator.process_patient(ehr_name, patient_id)
            if result['status'] != 'success':
                return {'error': result.get('error'), 'standardized_data_ready': False}

            # NOTE: decision support is not run here; it needs the raw
            # PatientData object, which the orchestrator result does not carry.
            processed = result['patient_data']
            return {
                'patient_summary': {
                    'total_diagnoses': len(processed['diagnosis_mappings']),
                    'chronic_conditions': len(processed['clinical_insights'].get('chronic_conditions', [])),
                    'active_medications': len(processed['medication_mappings']),
                },
                'standardized_data_ready': True,
                'last_updated': result['processing_timestamp'],
            }
        except Exception as e:
            return {'error': str(e), 'standardized_data_ready': False}

    async def on_diagnosis_entry(self, diagnosis_text: str, ehr_name: str, patient_id: str) -> Dict[str, Any]:
        """
        Called when clinician enters diagnosis.
        Provides real-time vocabulary suggestions and validation.

        Always returns 'suggestions' and 'auto_coding_available'; an 'error'
        key is added when the lookup fails.
        """
        try:
            # Search for matching concepts
            concepts = await self._search_concepts({
                'query': diagnosis_text,
                'domain_id': 'Condition',
                'standard_concept': 'S',
                'page_size': 5,
            })
            suggestions = [
                {
                    'concept_id': concept['concept_id'],
                    'concept_code': concept['concept_code'],
                    'concept_name': concept['concept_name'],
                    'vocabulary_id': concept['vocabulary_id'],
                    'confidence': 0.9,  # Would be calculated based on text similarity
                }
                for concept in concepts
            ]
        except Exception as e:
            # Same keys as the success path so callers need no special casing.
            return {'error': str(e), 'suggestions': [], 'auto_coding_available': False}

        return {
            'suggestions': suggestions,
            'auto_coding_available': len(suggestions) > 0,
        }

    async def on_medication_order(self, medication_name: str, patient_id: str, ehr_name: str) -> Dict[str, Any]:
        """
        Called when clinician orders medication.
        Provides safety checks and standardized coding.

        Every return value carries 'safety_alerts'; failures add an 'error' key.
        """
        try:
            # Get patient data for safety checks
            patient_result = await self.orchestrator.process_patient(ehr_name, patient_id)
            if patient_result['status'] != 'success':
                return {
                    'error': 'Unable to retrieve patient data for safety checks',
                    'safety_alerts': [],
                }

            # Find RxNorm code for medication
            matches = await self._search_concepts({
                'query': medication_name,
                'vocabulary_ids': 'RxNorm',
                'domain_id': 'Drug',
                'page_size': 1,
            })
            if not matches:
                return {'error': 'Medication not found in RxNorm', 'safety_alerts': []}
            rxnorm_concept = matches[0]

            # Simulate safety checks (would be more comprehensive in production).
            # NOTE(review): no allergy or interaction check is actually performed
            # yet, so an empty list does not mean the order was verified as safe.
            safety_alerts: List[Dict[str, Any]] = []

            return {
                'rxnorm_code': rxnorm_concept['concept_code'],
                'rxnorm_name': rxnorm_concept['concept_name'],
                'safety_alerts': safety_alerts,
                'coding_confidence': 0.95,
            }
        except Exception as e:
            return {'error': str(e), 'safety_alerts': []}
Performance and Monitoring
Integration Health Monitoring
Copy
class EHRIntegrationMonitor:
    """Monitor EHR integration health and performance."""

    # Number of most recent processing times kept per EHR for the rolling average.
    MAX_TRACKED_TIMES = 100

    def __init__(self, orchestrator: "EHRIntegrationOrchestrator"):
        self.orchestrator = orchestrator
        self.metrics = {}
        self.health_checks = {}

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string."""
        from datetime import timezone

        # datetime.utcnow() is deprecated and yields a naive value; an aware
        # timestamp keeps the "+00:00" offset explicit in reports.
        return datetime.now(timezone.utc).isoformat()

    async def run_health_checks(self) -> Dict[str, Any]:
        """Run comprehensive health checks on all EHR connections.

        Returns one entry per registered connector. A connector that raises is
        reported with 'overall_status': 'error' instead of aborting the run.
        """
        results = {}
        for ehr_name, connector in self.orchestrator.ehr_connectors.items():
            try:
                # Test authentication
                auth_success = await connector.authenticate()

                # Test basic API connectivity
                if hasattr(connector, 'test_connection'):
                    api_success = await connector.test_connection()
                else:
                    api_success = True  # Assume working if no test method

                results[ehr_name] = {
                    'authentication': 'pass' if auth_success else 'fail',
                    'api_connectivity': 'pass' if api_success else 'fail',
                    'overall_status': 'healthy' if auth_success and api_success else 'unhealthy',
                    'last_checked': self._utc_now_iso(),
                }
            except Exception as e:
                results[ehr_name] = {
                    'error': str(e),
                    'overall_status': 'error',
                    'last_checked': self._utc_now_iso(),
                }
        return results

    def track_processing_metrics(self, ehr_name: str, processing_time: float, success: bool):
        """Track performance metrics for processing operations.

        Request counters cover every call; the average covers only the most
        recent MAX_TRACKED_TIMES processing times.
        """
        metrics = self.metrics.setdefault(ehr_name, {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'average_processing_time': 0,
            'processing_times': [],
        })

        metrics['total_requests'] += 1
        metrics['successful_requests' if success else 'failed_requests'] += 1

        times = metrics['processing_times']
        times.append(processing_time)
        if len(times) > self.MAX_TRACKED_TIMES:
            # Trim in place so the stored list keeps only the newest entries.
            del times[:-self.MAX_TRACKED_TIMES]

        metrics['average_processing_time'] = sum(times) / len(times)

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate performance report for all EHR systems.

        Status is 'healthy' above a 95% success rate, 'warning' above 80%,
        otherwise 'critical'.
        """
        report = {
            'generated_at': self._utc_now_iso(),
            'ehr_systems': {},
        }
        for ehr_name, metrics in self.metrics.items():
            total = metrics['total_requests']
            success_rate = (metrics['successful_requests'] / total * 100) if total > 0 else 0

            if success_rate > 95:
                status = 'healthy'
            elif success_rate > 80:
                status = 'warning'
            else:
                status = 'critical'

            report['ehr_systems'][ehr_name] = {
                'total_requests': total,
                'success_rate': round(success_rate, 2),
                'average_processing_time': round(metrics['average_processing_time'], 3),
                'status': status,
            }
        return report
Security and Compliance
HIPAA-Compliant Integration
Copy
class HIPAACompliantEHRIntegrator(OMOPHubEHRIntegrator):
    """
    HIPAA-compliant version of EHR integrator with enhanced security and audit logging.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.omophub.com"):
        super().__init__(api_key, base_url)
        self.audit_logger = self._setup_audit_logger()
        self.encryption_key = self._load_encryption_key()

    @staticmethod
    def _audit_timestamp() -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string."""
        from datetime import timezone

        # datetime.utcnow() is deprecated; an aware value keeps the UTC offset
        # explicit in every audit entry.
        return datetime.now(timezone.utc).isoformat()

    def process_patient_data(self, patient_data: "PatientData") -> Dict[str, Any]:
        """Process patient data with full HIPAA audit trail.

        Writes a start entry, then either a completion entry with record counts
        or an error entry. Exceptions from processing are logged and re-raised.
        """
        audit_id = self._generate_audit_id()
        try:
            # Log access attempt
            self.audit_logger.info({
                'audit_id': audit_id,
                'action': 'patient_data_processing_start',
                'patient_id': patient_data.patient_id,
                'timestamp': self._audit_timestamp(),
                'user_context': self._get_user_context(),
            })

            # Sanitize PHI before processing
            sanitized_data = self._sanitize_phi(patient_data)

            # Process with standard method
            result = super().process_patient_data(sanitized_data)

            # Log successful completion
            self.audit_logger.info({
                'audit_id': audit_id,
                'action': 'patient_data_processing_complete',
                'status': 'success',
                'records_processed': {
                    'diagnoses': len(result['diagnosis_mappings']),
                    'procedures': len(result['procedure_mappings']),
                    'medications': len(result['medication_mappings']),
                },
                'timestamp': self._audit_timestamp(),
            })
            return result
        except Exception as e:
            # Log error
            self.audit_logger.error({
                'audit_id': audit_id,
                'action': 'patient_data_processing_error',
                'error': str(e),
                'timestamp': self._audit_timestamp(),
            })
            raise

    def _setup_audit_logger(self):
        """Setup HIPAA-compliant audit logging.

        The named logger is shared process-wide, so the file handler is attached
        only once; creating several integrators must not duplicate audit lines
        or open the log file repeatedly.
        """
        audit_logger = logging.getLogger('hipaa_ehr_audit')
        audit_logger.setLevel(logging.INFO)

        already_configured = any(
            isinstance(existing, logging.FileHandler) for existing in audit_logger.handlers
        )
        if not already_configured:
            # Configure secure, encrypted log storage
            # NOTE(review): FileHandler raises if the directory does not exist,
            # and the file itself is plain text — encryption must be provided by
            # the storage layer.
            handler = logging.FileHandler('/var/log/hipaa/ehr_integration_audit.log')
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            ))
            audit_logger.addHandler(handler)
        return audit_logger

    def _sanitize_phi(self, patient_data: "PatientData") -> "PatientData":
        """Remove or mask PHI from patient data before processing."""
        # Implementation would depend on specific PHI requirements
        # For demo purposes, returning original data (nothing is masked yet)
        return patient_data

    def _generate_audit_id(self) -> str:
        """Generate unique audit ID for tracking."""
        import uuid

        return str(uuid.uuid4())

    def _get_user_context(self) -> Dict[str, str]:
        """Get current user context for audit logging."""
        return {
            'user_id': 'system',  # Would be actual user ID
            'session_id': 'session_123',  # Would be actual session
            'ip_address': '127.0.0.1',  # Would be actual IP
        }

    def _load_encryption_key(self) -> bytes:
        """Load encryption key for sensitive data."""
        # Would load from secure key management system
        # SECURITY: hard-coded demo value — must be replaced with a key from a
        # key management system before any real PHI is handled.
        return b'demo_key_would_be_from_key_management_system'
Best Practices and Recommendations
Configuration Management
Copy
class EHRIntegrationConfig:
    """Single place holding the settings for OMOPHub, each EHR and monitoring."""

    def __init__(self):
        # OMOPHub API client settings.
        omophub_settings = {
            'base_url': 'https://api.omophub.com',
            'timeout': 30,
            'retry_attempts': 3,
            'rate_limit': {
                'requests_per_minute': 1000,
                'burst_limit': 100,
            },
        }

        # Per-EHR connector settings, keyed by lower-case EHR name.
        ehr_settings = {
            'epic': {
                'timeout': 60,
                'batch_size': 10,
                'max_concurrent_requests': 5,
                'auth_refresh_buffer': 300,  # Refresh token 5 minutes before expiry
            },
            'cerner': {
                'timeout': 45,
                'batch_size': 8,
                'max_concurrent_requests': 3,
            },
            'veradigm': {
                'timeout': 90,  # Veradigm APIs can be slower
                'batch_size': 5,
                'max_concurrent_requests': 2,
            },
        }

        # Health-check cadence and alerting thresholds.
        monitoring_settings = {
            'health_check_interval': 300,  # 5 minutes
            'performance_logging': True,
            'alert_thresholds': {
                'success_rate_warning': 95,
                'success_rate_critical': 80,
                'response_time_warning': 5.0,
                'response_time_critical': 10.0,
            },
        }

        self.config = {
            'omophub': omophub_settings,
            **ehr_settings,
            'monitoring': monitoring_settings,
        }

    def get_ehr_config(self, ehr_name: str) -> Dict[str, Any]:
        """Return the section stored under the lower-cased name, or {} if there is none."""
        section_key = ehr_name.lower()
        if section_key in self.config:
            return self.config[section_key]
        return {}

    def get_omophub_config(self) -> Dict[str, Any]:
        """Return the OMOPHub API section of the configuration."""
        omophub_section = self.config['omophub']
        return omophub_section
EHR-Specific Resources
- Epic® FHIR Documentation: https://fhir.epic.com/
- Cerner FHIR Documentation: https://fhir.cerner.com/
- Veradigm Developer Portal: https://developer.veradigm.com/
Epic® is a registered trademark of Epic Systems Corporation. This integration guide is provided for informational purposes and does not constitute an endorsement by Epic Systems Corporation.