FHIR Integration with OMOPHub
Overview
Fast Healthcare Interoperability Resources (FHIR) is the global standard for exchanging healthcare information electronically. OMOPHub provides comprehensive terminology services that enable FHIR-compliant applications to properly code, validate, and translate medical concepts across different vocabularies. This guide demonstrates how to leverage OMOPHub as a terminology server for FHIR resources, supporting ValueSet expansion, ConceptMap translation, and proper coding of clinical data.
Why FHIR Needs Robust Terminology Services
FHIR resources rely heavily on standardized medical terminologies:
- Observation: LOINC codes for lab results and vital signs
- Condition: SNOMED CT and ICD-10 for diagnoses
- Medication: RxNorm for drug identification
- Procedure: HCPCS and SNOMED CT for procedures
- AllergyIntolerance: SNOMED CT for allergen coding
Without robust terminology services, FHIR implementations commonly run into:
- Inconsistent coding across systems
- Failed validation of resources
- Inability to map between required code systems
- Non-compliance with implementation guides
Architecture Overview
Core Implementation
FHIR Terminology Service Foundation
Copy
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import List, Dict, Optional, Any

import requests
class FHIRResourceType(Enum):
    """FHIR resource types referenced by this guide.

    Each member's value is the ``resourceType`` string used in FHIR JSON.
    """

    OBSERVATION = "Observation"
    CONDITION = "Condition"
    MEDICATION_REQUEST = "MedicationRequest"
    PROCEDURE = "Procedure"
    ALLERGY_INTOLERANCE = "AllergyIntolerance"
@dataclass
class CodeableConcept:
    """FHIR CodeableConcept representation"""

    # Human-readable text for the concept.
    text: str
    # One dict per coding, with 'system', 'code' and 'display' keys.
    codings: List[Dict[str, str]]
@dataclass
class ValueSetExpansion:
    """Expanded ValueSet with all included concepts"""

    # Identifier, name and canonical URL copied from the ValueSet definition.
    id: str
    name: str
    url: str
    # One dict per expanded concept ('system', 'code', 'display').
    concepts: List[Dict[str, Any]]
    # Number of entries in ``concepts``.
    total: int
    # ISO-8601 string recording when the expansion was produced.
    timestamp: str
@dataclass
class ConceptMapTranslation:
    """Translation between source and target code systems"""

    source_code: str
    source_system: str
    target_code: str
    target_system: str
    # FHIR ConceptMap equivalence code (e.g. 'equivalent', 'relatedto').
    equivalence: str
    # Confidence score attached to the mapping.
    confidence: float
class OMOPHubFHIRTerminologyService:
    """
    FHIR-compliant terminology service using OMOPHub API.
    Implements key FHIR terminology operations.

    All HTTP traffic goes through ``_get_json`` so that every request carries
    the authorization header and a timeout.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.omophub.com",
                 timeout: float = 30.0):
        """
        Args:
            api_key: OMOPHub API key, sent as a bearer token.
            base_url: Root URL of the OMOPHub API.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.logger = logging.getLogger(__name__)
        # FHIR system URIs
        self.system_uris = {
            "SNOMED": "http://snomed.info/sct",
            "LOINC": "http://loinc.org",
            "ICD10CM": "http://hl7.org/fhir/sid/icd-10-cm",
            "ICD10PCS": "http://hl7.org/fhir/sid/icd-10-pcs",
            "RXNORM": "http://www.nlm.nih.gov/research/umls/rxnorm",
            "HCPCS": "https://www.cms.gov/Medicare/Coding/HCPCSReleaseCodeSets",
            "NDC": "http://hl7.org/fhir/sid/ndc",
            "CVX": "http://hl7.org/fhir/sid/cvx",
            "UCUM": "http://unitsofmeasure.org",
        }
        # OMOPHub to FHIR vocabulary mapping
        self.vocabulary_mapping = {
            "SNOMED": "SNOMED",
            "LOINC": "LOINC",
            "ICD10CM": "ICD10CM",
            "ICD10PCS": "ICD10PCS",
            "RxNorm": "RXNORM",
            "HCPCS": "HCPCS",
            "NDC": "NDC",
            "CVX": "CVX",
        }

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------
    def _get_json(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """GET ``base_url + path`` and return the decoded JSON body.

        Raises:
            requests.RequestException: on network errors, timeouts or HTTP
                error statuses.
        """
        response = requests.get(
            f"{self.base_url}{path}",
            headers=self.headers,
            params=params,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def _search_concept(self, code: str, vocabulary_id: str) -> Optional[Dict[str, Any]]:
        """Return the first search hit for ``code`` in ``vocabulary_id``, or None."""
        data = self._get_json(
            "/v1/concepts/search",
            params={
                'query': code,
                'vocabulary_ids': vocabulary_id,
                'page_size': 1
            },
        )
        hits = data.get('data') or []
        return hits[0] if hits else None

    # ------------------------------------------------------------------
    # FHIR operations
    # ------------------------------------------------------------------
    def validate_code(self, code: str, system: str, display: Optional[str] = None) -> Dict[str, Any]:
        """
        Validate a code within a specified system.
        FHIR $validate-code operation.

        Returns:
            Dict with 'valid' (bool) and 'message' (str); 'concept' is included
            whenever the search returned a hit.

        Raises:
            ValueError: if ``system`` is not a known system URI / vocabulary id.
        """
        # Map FHIR system URI to OMOPHub vocabulary
        vocabulary_id = self._get_vocabulary_id(system)
        try:
            concept = self._search_concept(code, vocabulary_id)
        except requests.RequestException as e:
            self.logger.error("Error validating code: %s", e)
            return {
                'valid': False,
                'message': f'Error validating code: {str(e)}'
            }

        if concept is None:
            return {
                'valid': False,
                'message': f'Code {code} not found in {system}'
            }

        # The search is a query, so the top hit is only valid on an exact code match.
        if concept['concept_code'] != code:
            return {
                'valid': False,
                'concept': concept,
                'message': f'Code {code} not found in {system}'
            }

        # Check display text if provided
        if display and display.lower() not in concept['concept_name'].lower():
            return {
                'valid': False,
                'concept': concept,
                'message': (
                    f"Display '{display}' does not match "
                    f"'{concept['concept_name']}' for code {code}"
                )
            }

        return {
            'valid': True,
            'concept': concept,
            'message': 'Code is valid'
        }

    def expand_valueset(self, valueset_definition: Dict[str, Any]) -> ValueSetExpansion:
        """
        Expand a FHIR ValueSet to include all member concepts.
        FHIR $expand operation.

        Supports explicit concept lists, ``concept is-a`` filters, and whole
        system includes (capped at 100 concepts). Other filters are ignored.

        Raises:
            ValueError: if an include references an unknown system.
        """
        compose = valueset_definition.get('compose') or {}
        expanded_concepts: List[Dict[str, Any]] = []

        # Process each include section
        for include in compose.get('include', []):
            system = include.get('system')
            if not system:
                continue
            vocabulary_id = self._get_vocabulary_id(system)

            if 'concept' in include:
                # Explicit concept list: keep only codes that validate
                for concept_ref in include['concept']:
                    code = concept_ref.get('code')
                    if not code:
                        continue
                    result = self.validate_code(code, system)
                    if result['valid']:
                        expanded_concepts.append({
                            'system': system,
                            'code': code,
                            'display': result['concept']['concept_name']
                        })
            elif 'filter' in include:
                # Filter-based inclusion
                for filter_item in include['filter']:
                    if filter_item.get('property') == 'concept' and filter_item.get('op') == 'is-a':
                        descendants = self._get_concept_descendants(
                            filter_item.get('value'), vocabulary_id
                        )
                        expanded_concepts.extend(
                            self._expansion_entry(system, desc) for desc in descendants
                        )
            else:
                # Include all concepts from the system (limited for demo)
                all_concepts = self._get_system_concepts(vocabulary_id, limit=100)
                expanded_concepts.extend(
                    self._expansion_entry(system, concept) for concept in all_concepts
                )

        # Process exclusions in one pass using a set of (system, code) pairs
        excluded = {
            (exclude.get('system'), concept_ref.get('code'))
            for exclude in compose.get('exclude', [])
            for concept_ref in exclude.get('concept', [])
        }
        if excluded:
            expanded_concepts = [
                c for c in expanded_concepts
                if (c['system'], c['code']) not in excluded
            ]

        return ValueSetExpansion(
            id=valueset_definition.get('id', 'expanded-valueset'),
            name=valueset_definition.get('name', 'Expanded ValueSet'),
            url=valueset_definition.get('url', ''),
            concepts=expanded_concepts,
            total=len(expanded_concepts),
            # Timezone-aware so the value is a valid FHIR dateTime.
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    def translate_concept(self, code: str, source_system: str, target_system: str) -> List[ConceptMapTranslation]:
        """
        Translate a concept from one code system to another.
        FHIR $translate operation.

        Returns:
            A (possibly empty) list of translations; request failures are
            logged and yield whatever was collected so far.

        Raises:
            ValueError: if either system is unknown.
        """
        translations: List[ConceptMapTranslation] = []
        source_vocab = self._get_vocabulary_id(source_system)
        target_vocab = self._get_vocabulary_id(target_system)
        wanted_vocab = self._normalize_vocabulary(target_vocab)
        try:
            # Get source concept
            source_concept = self._search_concept(code, source_vocab)
            if source_concept is None:
                return translations

            # Get mappings
            mappings_data = self._get_json(
                f"/v1/concepts/{source_concept['concept_id']}/mappings",
                params={'target_vocabularies': target_vocab},
            )

            # Create translations; vocabulary ids are normalised on both sides
            # so that e.g. "RxNorm" and "RXNORM" are treated as the same.
            for mapping in mappings_data.get('data', []):
                if self._normalize_vocabulary(mapping['target_vocabulary_id']) != wanted_vocab:
                    continue
                translations.append(ConceptMapTranslation(
                    source_code=code,
                    source_system=source_system,
                    target_code=mapping['target_concept_code'],
                    target_system=target_system,
                    equivalence=self._get_fhir_equivalence(mapping.get('relationship_id')),
                    confidence=mapping.get('confidence', 1.0)
                ))
            return translations
        except requests.RequestException as e:
            self.logger.error("Error translating concept: %s", e)
            return translations

    def create_codeable_concept(self, code: str, system: str, display: Optional[str] = None) -> CodeableConcept:
        """
        Create a FHIR CodeableConcept with proper coding.

        A code that validates gets its concept name as the default display and,
        for standard concepts, up to two extra codings from "Maps to" mappings.
        A code that does not validate still yields a basic single-coding concept.

        Raises:
            ValueError: if ``system`` is unknown.
        """
        # Validate and get full concept details
        validation = self.validate_code(code, system)
        if not validation['valid']:
            # Return basic CodeableConcept even if not validated
            fallback_text = display or code
            return CodeableConcept(
                text=fallback_text,
                codings=[{
                    'system': system,
                    'code': code,
                    'display': fallback_text
                }]
            )

        concept = validation['concept']
        display_text = display or concept['concept_name']
        # Create primary coding
        codings = [{
            'system': system,
            'code': code,
            'display': display_text
        }]
        # Add additional codings from mappings if available
        if concept.get('standard_concept') == 'S':
            codings.extend(self._mapped_codings(concept, primary=(system, code)))
        return CodeableConcept(text=display_text, codings=codings)

    def _mapped_codings(self, concept: Dict[str, Any], primary: Any) -> List[Dict[str, str]]:
        """Return extra codings from the concept's "Maps to" mappings (best effort).

        Only the first two mappings are considered. A mapping that resolves to
        the ``primary`` (system, code) pair is skipped so the primary coding is
        not duplicated. Failures are logged, never raised.
        """
        extra: List[Dict[str, str]] = []
        path = None
        try:
            path = f"/v1/concepts/{concept['concept_id']}/mappings"
            mappings = self._get_json(path, params={'relationship_types': 'Maps to'})
            for mapping in mappings.get('data', [])[:2]:  # Limit to 2 additional codings
                target_system = self._get_fhir_system(mapping['target_vocabulary_id'])
                if not target_system:
                    continue
                target_code = mapping['target_concept_code']
                if (target_system, target_code) == primary:
                    continue
                extra.append({
                    'system': target_system,
                    'code': target_code,
                    'display': mapping.get('target_concept_name', '')
                })
        except requests.exceptions.RequestException as e:
            # Log network/HTTP errors with context
            self.logger.exception(
                "Failed to fetch concept mappings for concept_id=%s from endpoint=%s: %s",
                concept.get('concept_id'), f"{self.base_url}{path}", e
            )
        except (KeyError, ValueError) as e:
            # Log data parsing errors with context
            self.logger.exception(
                "Failed to parse concept mapping response for concept_id=%s: %s",
                concept.get('concept_id'), e
            )
        return extra

    def lookup_display(self, code: str, system: str) -> Optional[str]:
        """
        Look up the display text for a code.
        FHIR $lookup operation.

        Returns the concept name, or None when the code does not validate.
        """
        validation = self.validate_code(code, system)
        if validation['valid']:
            return validation['concept']['concept_name']
        return None

    def subsumes(self, code_a: str, code_b: str, system: str) -> Optional[str]:
        """
        Test the subsumption relationship between two codes.
        Returns: 'subsumes', 'subsumed-by', 'equivalent', or None

        None covers both "no relationship found" and lookup failures.

        Raises:
            ValueError: if ``system`` is unknown.
        """
        vocabulary_id = self._get_vocabulary_id(system)
        try:
            # Get both concepts
            concept_a = self._search_concept(code_a, vocabulary_id)
            concept_b = self._search_concept(code_b, vocabulary_id)
            if concept_a is None or concept_b is None:
                return None
            if concept_a['concept_id'] == concept_b['concept_id']:
                return 'equivalent'

            # Check if A subsumes B (B is descendant of A)
            descendant_ids_a = {
                d['concept_id']
                for d in self._get_concept_descendants(code_a, vocabulary_id, include_self=False)
            }
            if concept_b['concept_id'] in descendant_ids_a:
                return 'subsumes'

            # Check if B subsumes A (A is descendant of B)
            descendant_ids_b = {
                d['concept_id']
                for d in self._get_concept_descendants(code_b, vocabulary_id, include_self=False)
            }
            if concept_a['concept_id'] in descendant_ids_b:
                return 'subsumed-by'
            return None
        except requests.RequestException as e:
            self.logger.error("Error checking subsumption: %s", e)
            return None

    # ------------------------------------------------------------------
    # Mapping helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _expansion_entry(system: str, concept: Dict[str, Any]) -> Dict[str, Any]:
        """Build one expansion entry from an OMOPHub concept record."""
        return {
            'system': system,
            'code': concept['concept_code'],
            'display': concept['concept_name']
        }

    def _normalize_vocabulary(self, vocabulary_id: str) -> str:
        """Map an OMOPHub vocabulary id to its ``system_uris`` key (identity if unmapped)."""
        return self.vocabulary_mapping.get(vocabulary_id, vocabulary_id)

    def _get_vocabulary_id(self, system_uri: str) -> str:
        """Map FHIR system URI to OMOPHub vocabulary ID.

        Raises:
            ValueError: if the value is neither a known URI nor a known id.
        """
        # Reverse lookup from system URI
        for vocab, uri in self.system_uris.items():
            if uri == system_uri:
                return vocab
        # Check if it's already a vocabulary ID
        if system_uri in self.vocabulary_mapping:
            return system_uri
        raise ValueError(f"Unknown system URI: {system_uri}")

    def _get_fhir_system(self, vocabulary_id: str) -> Optional[str]:
        """Map OMOPHub vocabulary ID to FHIR system URI (None if unknown)."""
        # Normalise first so ids such as "RxNorm" find the "RXNORM" entry.
        return self.system_uris.get(self._normalize_vocabulary(vocabulary_id))

    def _get_fhir_equivalence(self, relationship_id: Optional[str]) -> str:
        """Map relationship to FHIR ConceptMap equivalence.

        Unknown or missing relationships fall back to 'relatedto'.
        """
        equivalence_map = {
            'Maps to': 'equivalent',
            # Source is-a target: the target subsumes the source.
            'Is a': 'subsumes',
            # Source subsumes target: the target specializes the source.
            'Subsumes': 'specializes',
            'Has finding site': 'relatedto',
            'Has associated morphology': 'relatedto',
            'Has causative agent': 'relatedto'
        }
        return equivalence_map.get(relationship_id, 'relatedto')

    def _get_concept_descendants(self, code: str, vocabulary_id: str, include_self: bool = True) -> List[Dict]:
        """Get all descendant concepts.

        Looks the code up, then reads its hierarchy (up to 10 levels). The
        level-0 entry is kept only when ``include_self`` is true. Request
        failures are logged and yield an empty list.
        """
        try:
            # First get the concept
            concept = self._search_concept(code, vocabulary_id)
            if concept is None:
                return []

            # Get descendants
            hierarchy_data = self._get_json(
                f"/v1/concepts/{concept['concept_id']}/hierarchy",
                params={
                    'include_descendants': True,
                    'max_levels': 10
                },
            )
            descendants = []
            for item in hierarchy_data.get('data', []):
                level = item.get('level', 0)
                if level > 0 or (include_self and level == 0):
                    descendants.append(item['concept'])
            return descendants
        except requests.RequestException as e:
            self.logger.error("Error getting descendants: %s", e)
            return []

    def _get_system_concepts(self, vocabulary_id: str, limit: int = 100) -> List[Dict]:
        """Get concepts from a vocabulary (limited for performance).

        Only standard concepts are requested. Request failures are logged and
        yield an empty list.
        """
        try:
            data = self._get_json(
                f"/v1/vocabularies/{vocabulary_id}/concepts",
                params={
                    'page_size': limit,
                    'standard_concept': 'S'
                },
            )
            return data.get('data', [])
        except requests.RequestException as e:
            self.logger.error("Error getting system concepts: %s", e)
            return []
# FHIR Resource Builders
class FHIRResourceBuilder:
    """Build FHIR resources with proper terminology coding."""

    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service

    @staticmethod
    def _as_codeable(concept) -> Dict[str, Any]:
        """Render a CodeableConcept object as a FHIR JSON dict."""
        return {
            "coding": concept.codings,
            "text": concept.text
        }

    def create_observation(self,
                           loinc_code: str,
                           value: Any,
                           patient_id: str,
                           effective_datetime: str,
                           status: str = 'final',
                           unit: str = 'mg/dL') -> Dict[str, Any]:
        """
        Create a FHIR Observation resource with proper LOINC coding.

        Args:
            loinc_code: LOINC code identifying what was observed.
            value: bool -> valueBoolean, int/float -> valueQuantity,
                str -> valueString, dict with 'code' (and optional 'system',
                defaulting to SNOMED CT) -> valueCodeableConcept. Anything
                else produces no value element.
            patient_id: Id used in the ``Patient/<id>`` subject reference.
            effective_datetime: Value for ``effectiveDateTime``.
            status: Observation status.
            unit: UCUM unit for numeric values, used as both display unit and
                code. Defaults to 'mg/dL'; callers should pass the unit that
                matches the LOINC code.
        """
        # create_codeable_concept validates the code and falls back to the
        # concept name for display, so no separate display lookup is needed.
        code_concept = self.terminology.create_codeable_concept(
            loinc_code,
            "http://loinc.org"
        )
        observation = {
            "resourceType": "Observation",
            "status": status,
            "code": self._as_codeable(code_concept),
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "effectiveDateTime": effective_datetime
        }

        # Add value based on type. bool is tested first because it is a
        # subclass of int and must not be emitted as a quantity.
        if isinstance(value, bool):
            observation["valueBoolean"] = value
        elif isinstance(value, (int, float)):
            observation["valueQuantity"] = {
                "value": value,
                "unit": unit,
                "system": "http://unitsofmeasure.org",
                "code": unit
            }
        elif isinstance(value, str):
            observation["valueString"] = value
        elif isinstance(value, dict) and 'code' in value:
            # Coded value
            value_concept = self.terminology.create_codeable_concept(
                value['code'],
                value.get('system', 'http://snomed.info/sct')
            )
            observation["valueCodeableConcept"] = self._as_codeable(value_concept)
        return observation

    def create_condition(self,
                         diagnosis_code: str,
                         code_system: str,
                         patient_id: str,
                         onset_date: str,
                         clinical_status: str = 'active',
                         verification_status: str = 'confirmed') -> Dict[str, Any]:
        """
        Create a FHIR Condition resource with proper diagnosis coding.

        When the diagnosis is not already SNOMED CT, the first translation
        whose equivalence is 'equivalent' or 'subsumes' is added as an extra
        coding (unless that coding is already present).
        """
        snomed = "http://snomed.info/sct"
        # Create primary coding
        code_concept = self.terminology.create_codeable_concept(
            diagnosis_code,
            code_system
        )

        # Try to add SNOMED translation if not already SNOMED
        if code_system != snomed:
            translations = self.terminology.translate_concept(
                diagnosis_code,
                code_system,
                snomed
            )
            already_coded = {
                (c.get('system'), c.get('code')) for c in code_concept.codings
            }
            for trans in translations:
                if trans.equivalence not in ('equivalent', 'subsumes'):
                    continue
                if (trans.target_system, trans.target_code) not in already_coded:
                    coding = {
                        'system': trans.target_system,
                        'code': trans.target_code
                    }
                    display = self.terminology.lookup_display(
                        trans.target_code,
                        trans.target_system
                    )
                    # Omit the key rather than emit a null display.
                    if display is not None:
                        coding['display'] = display
                    code_concept.codings.append(coding)
                break

        return {
            "resourceType": "Condition",
            "clinicalStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/condition-clinical",
                    "code": clinical_status
                }]
            },
            "verificationStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/condition-ver-status",
                    "code": verification_status
                }]
            },
            "code": self._as_codeable(code_concept),
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "onsetDateTime": onset_date
        }

    def create_medication_request(self,
                                  rxnorm_code: str,
                                  patient_id: str,
                                  prescriber_id: str,
                                  dosage_instructions: str,
                                  quantity: int,
                                  refills: int = 0,
                                  quantity_unit: str = "tablet") -> Dict[str, Any]:
        """
        Create a FHIR MedicationRequest with RxNorm coding.

        The request is always created with status 'active' and intent 'order'.
        ``quantity_unit`` is the unit label for the dispensed quantity.
        """
        # Get medication details
        med_concept = self.terminology.create_codeable_concept(
            rxnorm_code,
            "http://www.nlm.nih.gov/research/umls/rxnorm"
        )
        return {
            "resourceType": "MedicationRequest",
            "status": "active",
            "intent": "order",
            "medicationCodeableConcept": self._as_codeable(med_concept),
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "requester": {
                "reference": f"Practitioner/{prescriber_id}"
            },
            "dosageInstruction": [{
                "text": dosage_instructions
            }],
            "dispenseRequest": {
                "quantity": {
                    "value": quantity,
                    "unit": quantity_unit
                },
                "numberOfRepeatsAllowed": refills
            }
        }

    def create_procedure(self,
                         procedure_code: str,
                         code_system: str,
                         patient_id: str,
                         performed_date: str,
                         performer_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Create a FHIR Procedure resource with proper coding.

        The procedure is always created with status 'completed'; a performer
        entry is added only when ``performer_id`` is given.
        """
        # Create procedure coding
        proc_concept = self.terminology.create_codeable_concept(
            procedure_code,
            code_system
        )
        procedure = {
            "resourceType": "Procedure",
            "status": "completed",
            "code": self._as_codeable(proc_concept),
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "performedDateTime": performed_date
        }
        if performer_id:
            procedure["performer"] = [{
                "actor": {
                    "reference": f"Practitioner/{performer_id}"
                }
            }]
        return procedure

    def create_allergy_intolerance(self,
                                   allergen_code: str,
                                   patient_id: str,
                                   reaction_code: Optional[str] = None,
                                   severity: str = 'moderate') -> Dict[str, Any]:
        """
        Create a FHIR AllergyIntolerance resource.

        Allergen and reaction codes are coded against SNOMED CT. Criticality is
        'low' for mild, 'high' for severe and 'unable-to-assess' otherwise.
        """
        snomed = "http://snomed.info/sct"
        criticality_by_severity = {'mild': 'low', 'severe': 'high'}
        # Create allergen coding (typically SNOMED)
        allergen_concept = self.terminology.create_codeable_concept(
            allergen_code,
            snomed
        )
        allergy = {
            "resourceType": "AllergyIntolerance",
            "clinicalStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/allergyintolerance-clinical",
                    "code": "active"
                }]
            },
            "verificationStatus": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/allergyintolerance-verification",
                    "code": "confirmed"
                }]
            },
            "code": self._as_codeable(allergen_concept),
            "patient": {
                "reference": f"Patient/{patient_id}"
            },
            "criticality": criticality_by_severity.get(severity, "unable-to-assess")
        }
        if reaction_code:
            reaction_concept = self.terminology.create_codeable_concept(
                reaction_code,
                snomed
            )
            allergy["reaction"] = [{
                "manifestation": [self._as_codeable(reaction_concept)],
                "severity": severity
            }]
        return allergy
# ValueSet Management
class ValueSetManager:
    """Manage FHIR ValueSets using OMOPHub terminology."""

    # Common condition categories with the SNOMED code of their root concept
    _CONDITION_CATEGORY_ROOTS = {
        "diabetes": "73211009",        # Diabetes mellitus
        "hypertension": "38341003",    # Hypertensive disorder
        "asthma": "195967001",         # Asthma
        "cardiovascular": "49601007",  # Disorder of cardiovascular system
        "infectious": "40733004",      # Infectious disease
    }

    # Common lab panels with their LOINC codes
    _LAB_PANELS = {
        "basic_metabolic": [
            "2345-7",  # Glucose
            "2160-0",  # Creatinine
            "3094-0",  # BUN
            "2951-2",  # Sodium
            "2823-3",  # Potassium
            "2075-0",  # Chloride
            "2028-9",  # CO2
        ],
        "lipid": [
            "2093-3",  # Total cholesterol
            "2085-9",  # HDL cholesterol
            "2089-1",  # LDL cholesterol
            "2571-8",  # Triglycerides
        ],
        "liver": [
            "1742-6",  # ALT
            "1920-8",  # AST
            "1975-2",  # Bilirubin total
            "1968-7",  # Bilirubin direct
            "2885-2",  # Protein total
            "1751-7",  # Albumin
        ],
    }

    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service

    def create_valueset_for_condition_category(self, category: str) -> Dict[str, Any]:
        """
        Create a ValueSet for a specific condition category.
        Example: Diabetes, Hypertension, etc.

        The ValueSet includes every SNOMED concept that is-a the category's
        root concept. The category name is matched case-insensitively.

        Raises:
            ValueError: if the category is not one of the known categories.
        """
        slug = category.lower()
        root_code = self._CONDITION_CATEGORY_ROOTS.get(slug)
        if not root_code:
            raise ValueError(f"Unknown condition category: {category}")
        return {
            "resourceType": "ValueSet",
            "id": f"condition-{slug}",
            "url": f"http://example.org/fhir/ValueSet/condition-{slug}",
            "name": f"{category.title()}Conditions",
            "title": f"{category.title()} Conditions ValueSet",
            "status": "active",
            "compose": {
                "include": [{
                    "system": "http://snomed.info/sct",
                    "filter": [{
                        "property": "concept",
                        "op": "is-a",
                        "value": root_code
                    }]
                }]
            }
        }

    def create_lab_test_valueset(self, panel_name: str) -> Dict[str, Any]:
        """
        Create a ValueSet for laboratory test panels.

        The panel name is matched case-insensitively against the known panels
        ('basic_metabolic', 'lipid', 'liver').

        Raises:
            ValueError: if the panel is unknown.
        """
        codes = self._LAB_PANELS.get(panel_name.lower())
        if not codes:
            raise ValueError(f"Unknown lab panel: {panel_name}")
        slug = panel_name.lower().replace('_', '-')
        readable = panel_name.replace('_', ' ').title()
        return {
            "resourceType": "ValueSet",
            "id": f"lab-{slug}",
            "url": f"http://example.org/fhir/ValueSet/lab-{slug}",
            # ``name`` is the computer-friendly identifier, so it must not
            # contain spaces; ``title`` keeps the human-readable form.
            "name": f"{readable.replace(' ', '')}Panel",
            "title": f"{readable} Laboratory Panel",
            "status": "active",
            "compose": {
                "include": [{
                    "system": "http://loinc.org",
                    "concept": [{"code": code} for code in codes]
                }]
            }
        }

    def expand_and_validate(self, valueset: Dict[str, Any]) -> ValueSetExpansion:
        """
        Expand a ValueSet and validate all included codes.

        Concepts that fail validation are dropped from the expansion and a
        warning is logged for each; ``total`` is updated to match.
        """
        expansion = self.terminology.expand_valueset(valueset)
        # Validate each concept
        validated_concepts = []
        for concept in expansion.concepts:
            validation = self.terminology.validate_code(
                concept['code'],
                concept['system']
            )
            if validation['valid']:
                validated_concepts.append(concept)
            else:
                self.terminology.logger.warning(
                    f"Invalid concept in ValueSet: {concept['code']} in {concept['system']}"
                )
        expansion.concepts = validated_concepts
        expansion.total = len(validated_concepts)
        return expansion
# ConceptMap Translation Service
class ConceptMapService:
    """Manage FHIR ConceptMaps for terminology translation."""

    def __init__(self, terminology_service: OMOPHubFHIRTerminologyService):
        self.terminology = terminology_service

    @staticmethod
    def _system_slug(system: str) -> str:
        """Return the last path segment of a system URI (trailing '/' ignored)."""
        return system.rstrip('/').split('/')[-1]

    def create_concept_map(self,
                           source_system: str,
                           target_system: str,
                           codes: List[str]) -> Dict[str, Any]:
        """
        Create a ConceptMap for translating between code systems.

        Only codes that have at least one translation get an element. Display
        values are included only when a display could be looked up.
        """
        source_slug = self._system_slug(source_system)
        target_slug = self._system_slug(target_system)
        elements: List[Dict[str, Any]] = []

        # Add mappings for each code
        for code in codes:
            translations = self.terminology.translate_concept(code, source_system, target_system)
            if not translations:
                continue
            element: Dict[str, Any] = {"code": code}
            source_display = self.terminology.lookup_display(code, source_system)
            if source_display is not None:
                element["display"] = source_display
            targets = []
            for trans in translations:
                target: Dict[str, Any] = {"code": trans.target_code}
                target_display = self.terminology.lookup_display(
                    trans.target_code,
                    trans.target_system
                )
                if target_display is not None:
                    target["display"] = target_display
                target["equivalence"] = trans.equivalence
                targets.append(target)
            element["target"] = targets
            elements.append(element)

        return {
            "resourceType": "ConceptMap",
            "id": f"map-{source_slug}-to-{target_slug}",
            "url": f"http://example.org/fhir/ConceptMap/{source_slug}-to-{target_slug}",
            "name": f"{source_slug}To{target_slug}Map",
            "status": "active",
            "sourceUri": source_system,
            "targetUri": target_system,
            "group": [{
                "source": source_system,
                "target": target_system,
                "element": elements
            }]
        }

    def translate_bulk(self,
                       codes: List[str],
                       source_system: str,
                       target_system: str) -> List[Dict[str, Any]]:
        """
        Bulk translate multiple codes between systems.

        Returns one dict per input code, in order. A code with translations
        reports the highest-confidence one; a code without any is reported
        with equivalence 'unmapped' and confidence 0.0.
        """
        results = []
        for code in codes:
            trans_list = self.terminology.translate_concept(code, source_system, target_system)
            row = {
                'source_code': code,
                'source_display': self.terminology.lookup_display(code, source_system),
            }
            if trans_list:
                # Use the highest confidence translation
                best_trans = max(trans_list, key=lambda x: x.confidence)
                row.update({
                    'target_code': best_trans.target_code,
                    'target_display': self.terminology.lookup_display(
                        best_trans.target_code,
                        best_trans.target_system
                    ),
                    'equivalence': best_trans.equivalence,
                    'confidence': best_trans.confidence
                })
            else:
                row.update({
                    'target_code': None,
                    'target_display': None,
                    'equivalence': 'unmapped',
                    'confidence': 0.0
                })
            results.append(row)
        return results
# Example usage
def main():
    """Run the example workflow against the OMOPHub API and print the results.

    The API key is read from the ``OMOPHUB_API_KEY`` environment variable,
    falling back to the placeholder ``"your-api-key"``.
    """
    import os

    # Initialize services
    api_key = os.environ.get("OMOPHUB_API_KEY", "your-api-key")
    terminology_service = OMOPHubFHIRTerminologyService(api_key)
    resource_builder = FHIRResourceBuilder(terminology_service)
    valueset_manager = ValueSetManager(terminology_service)
    conceptmap_service = ConceptMapService(terminology_service)

    # Example 1: Create and validate a FHIR Observation
    observation = resource_builder.create_observation(
        loinc_code="2345-7",  # Glucose
        value=110,
        patient_id="patient123",
        effective_datetime="2024-03-15T10:30:00Z"
    )
    print("Created Observation:", json.dumps(observation, indent=2))

    # Example 2: Create a Condition with ICD-10 to SNOMED translation
    condition = resource_builder.create_condition(
        diagnosis_code="E11.9",  # Type 2 diabetes
        code_system="http://hl7.org/fhir/sid/icd-10-cm",
        patient_id="patient123",
        onset_date="2024-01-15"
    )
    print("Created Condition:", json.dumps(condition, indent=2))

    # Example 3: Create and expand a ValueSet
    diabetes_valueset = valueset_manager.create_valueset_for_condition_category("diabetes")
    expanded = valueset_manager.expand_and_validate(diabetes_valueset)
    print(f"Expanded ValueSet contains {expanded.total} concepts")

    # Example 4: Create a ConceptMap for ICD-10 to SNOMED
    icd_codes = ["E11.9", "I10", "J45.909"]
    concept_map = conceptmap_service.create_concept_map(
        "http://hl7.org/fhir/sid/icd-10-cm",
        "http://snomed.info/sct",
        icd_codes
    )
    print("Created ConceptMap:", json.dumps(concept_map, indent=2))

    # Example 5: Bulk translate codes
    translations = conceptmap_service.translate_bulk(
        icd_codes,
        "http://hl7.org/fhir/sid/icd-10-cm",
        "http://snomed.info/sct"
    )
    for trans in translations:
        print(f"{trans['source_code']} -> {trans['target_code']} ({trans['equivalence']})")


if __name__ == "__main__":
    main()
Implementation Examples
Example 1: SMART on FHIR Application
Copy
import httpx
from typing import Dict
class SmartOnFhirApp:
    """
    SMART on FHIR application using OMOPHub for terminology.
    """

    def __init__(self, omophub_api_key: str, fhir_server_url: str):
        self.terminology = OMOPHubFHIRTerminologyService(omophub_api_key)
        self.resource_builder = FHIRResourceBuilder(self.terminology)
        self.fhir_server = fhir_server_url

    async def process_patient_encounter(self, encounter_data: Dict) -> Dict:
        """
        Process a patient encounter and create FHIR resources.

        Builds and posts a Condition per diagnosis, an Observation per lab and
        a MedicationRequest per medication, in that order.

        Returns:
            Dict with 'encounter_id', 'resources_created' (type, server id and
            code for each posted resource) and 'status'.

        Raises:
            KeyError: if a required key is missing. 'patient_id' and
                'encounter_id' are checked before anything is posted.
        """
        # NOTE(review): the resource builder performs blocking HTTP calls
        # inside this coroutine — confirm that is acceptable for the caller.
        patient_id = encounter_data['patient_id']
        encounter_id = encounter_data['encounter_id']
        resources_created = []

        async def post_and_record(resource: Dict, code: str) -> None:
            # Post to FHIR server and remember what was created
            response = await self.post_to_fhir_server(resource)
            resources_created.append({
                'type': resource['resourceType'],
                'id': response.get('id'),
                'code': code
            })

        # Process diagnoses
        for diagnosis in encounter_data.get('diagnoses', []):
            condition = self.resource_builder.create_condition(
                diagnosis_code=diagnosis['code'],
                code_system=diagnosis['system'],
                patient_id=patient_id,
                onset_date=diagnosis['date']
            )
            await post_and_record(condition, diagnosis['code'])

        # Process lab results
        for lab in encounter_data.get('labs', []):
            observation = self.resource_builder.create_observation(
                loinc_code=lab['loinc_code'],
                value=lab['value'],
                patient_id=patient_id,
                effective_datetime=lab['datetime']
            )
            await post_and_record(observation, lab['loinc_code'])

        # Process medications
        for med in encounter_data.get('medications', []):
            med_request = self.resource_builder.create_medication_request(
                rxnorm_code=med['rxnorm_code'],
                patient_id=patient_id,
                prescriber_id=med['prescriber_id'],
                dosage_instructions=med['instructions'],
                quantity=med['quantity']
            )
            await post_and_record(med_request, med['rxnorm_code'])

        return {
            'encounter_id': encounter_id,
            'resources_created': resources_created,
            'status': 'completed'
        }

    async def post_to_fhir_server(self, resource: Dict) -> Dict:
        """Post a FHIR resource to the server.

        Returns the decoded JSON response body.

        Raises:
            RuntimeError: if the server answers with a status other than
                200 or 201.
        """
        headers = {
            'Content-Type': 'application/fhir+json',
            'Accept': 'application/fhir+json'
        }
        timeout = httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=5.0)
        # Tolerate a trailing slash on the configured base URL.
        url = f"{self.fhir_server.rstrip('/')}/{resource['resourceType']}"
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(url, json=resource, headers=headers)
        if response.status_code in (200, 201):
            return response.json()
        raise RuntimeError(
            f"Failed to post resource: HTTP {response.status_code} - {response.text}"
        )
Example 2: Bulk FHIR Export with Terminology Validation
Copy
class FHIRBulkExporter:
    """
    Export clinical data in FHIR Bulk Data format with terminology validation.
    """

    def __init__(self, omophub_api_key: str):
        self.terminology = OMOPHubFHIRTerminologyService(omophub_api_key)
        self.resource_builder = FHIRResourceBuilder(self.terminology)

    def export_patient_data(self, patients_data: List[Dict], output_dir: str):
        """
        Export patient data as NDJSON files per FHIR Bulk Data spec.

        Writes ``Condition.ndjson``, ``Observation.ndjson``, an (empty)
        ``MedicationRequest.ndjson`` and ``manifest.json`` into ``output_dir``,
        creating the directory if needed. Codes are validated first; anything
        that fails validation or raises is recorded in ``validation_errors``
        instead of being written.

        Returns:
            Summary dict with per-resource export counts, the number of
            patients processed and the list of validation errors.

        Raises:
            KeyError: if a patient record has no 'patient_id'.
        """
        import os
        from pathlib import Path

        os.makedirs(output_dir, exist_ok=True)
        export_summary = {
            'patients_processed': 0,
            'conditions_exported': 0,
            'observations_exported': 0,
            'medications_exported': 0,
            'validation_errors': []
        }
        errors = export_summary['validation_errors']

        def ndjson_path(resource_type: str) -> str:
            return os.path.join(output_dir, f'{resource_type}.ndjson')

        # Context managers guarantee the files are closed even if a patient
        # record raises part-way through. MedicationRequest.ndjson is created
        # but nothing is written to it: this exporter has no medication step.
        with open(ndjson_path('Condition'), 'w', encoding='utf-8') as conditions_file, \
                open(ndjson_path('Observation'), 'w', encoding='utf-8') as observations_file, \
                open(ndjson_path('MedicationRequest'), 'w', encoding='utf-8'):
            for patient_data in patients_data:
                patient_id = patient_data['patient_id']

                # Export conditions with validation
                for condition_data in patient_data.get('conditions', []):
                    try:
                        # Validate code first
                        validation = self.terminology.validate_code(
                            condition_data['code'],
                            condition_data['system']
                        )
                        if validation['valid']:
                            condition = self.resource_builder.create_condition(
                                diagnosis_code=condition_data['code'],
                                code_system=condition_data['system'],
                                patient_id=patient_id,
                                onset_date=condition_data['onset_date']
                            )
                            conditions_file.write(json.dumps(condition) + '\n')
                            export_summary['conditions_exported'] += 1
                        else:
                            errors.append({
                                'patient_id': patient_id,
                                'resource_type': 'Condition',
                                'code': condition_data['code'],
                                'error': validation['message']
                            })
                    except Exception as e:
                        # Best effort: one bad record must not abort the export.
                        errors.append({
                            'patient_id': patient_id,
                            'resource_type': 'Condition',
                            'error': str(e)
                        })

                # Export observations with validation
                for obs_data in patient_data.get('observations', []):
                    try:
                        validation = self.terminology.validate_code(
                            obs_data['loinc_code'],
                            'http://loinc.org'
                        )
                        if validation['valid']:
                            observation = self.resource_builder.create_observation(
                                loinc_code=obs_data['loinc_code'],
                                value=obs_data['value'],
                                patient_id=patient_id,
                                effective_datetime=obs_data['datetime']
                            )
                            observations_file.write(json.dumps(observation) + '\n')
                            export_summary['observations_exported'] += 1
                        else:
                            # Record the rejection, as is done for conditions.
                            errors.append({
                                'patient_id': patient_id,
                                'resource_type': 'Observation',
                                'code': obs_data['loinc_code'],
                                'error': validation['message']
                            })
                    except Exception as e:
                        errors.append({
                            'patient_id': patient_id,
                            'resource_type': 'Observation',
                            'error': str(e)
                        })

                export_summary['patients_processed'] += 1

        def file_url(resource_type: str) -> str:
            # Absolute path so the file:// URL is valid for relative dirs too.
            return (Path(output_dir).absolute() / f'{resource_type}.ndjson').as_uri()

        # Write export manifest
        manifest = {
            # Timezone-aware: transactionTime is a FHIR instant.
            "transactionTime": datetime.now(timezone.utc).isoformat(),
            "request": "$export",
            "requiresAccessToken": True,
            "output": [
                {
                    "type": "Condition",
                    "url": file_url('Condition'),
                    "count": export_summary['conditions_exported']
                },
                {
                    "type": "Observation",
                    "url": file_url('Observation'),
                    "count": export_summary['observations_exported']
                },
                {
                    "type": "MedicationRequest",
                    "url": file_url('MedicationRequest'),
                    "count": export_summary['medications_exported']
                }
            ],
            "error": []
        }
        with open(os.path.join(output_dir, 'manifest.json'), 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2)
        return export_summary
Example 3: FHIR Terminology Server Implementation
Copy
from flask import Flask, request, jsonify
class FHIRTerminologyServer:
    """
    Implement FHIR terminology server operations using OMOPHub.

    Wraps an ``OMOPHubFHIRTerminologyService`` in a Flask application and
    registers four POST endpoints: ``$validate-code``, ``$expand``,
    ``$translate`` and ``$subsumes``. Request bodies are read as flat JSON
    objects. Missing parameters are answered with HTTP 400 and failures of
    the terminology service with HTTP 500, both as ``OperationOutcome``
    resources.
    """

    def __init__(self, omophub_api_key: str):
        """Create the Flask app and terminology client, then register routes."""
        self.app = Flask(__name__)
        self.terminology = OMOPHubFHIRTerminologyService(omophub_api_key)
        self.setup_routes()

    @staticmethod
    def _operation_outcome(issue_code: str, diagnostics: str, status: int):
        """Return an ``(OperationOutcome response, HTTP status)`` pair."""
        return jsonify({
            'resourceType': 'OperationOutcome',
            'issue': [{
                'severity': 'error',
                'code': issue_code,
                'diagnostics': diagnostics
            }]
        }), status

    @staticmethod
    def _json_body() -> dict:
        """Return the request's JSON object, or ``{}`` if absent or not an object.

        ``silent=True`` makes Flask return ``None`` instead of raising on a
        missing or malformed body, so handlers can answer with their own
        OperationOutcome rather than crashing on ``None.get``.
        """
        body = request.get_json(silent=True)
        return body if isinstance(body, dict) else {}

    def setup_routes(self):
        """Setup FHIR terminology operation endpoints."""

        @self.app.route('/CodeSystem/$validate-code', methods=['POST'])
        def validate_code():
            """FHIR $validate-code operation."""
            params = self._json_body()
            code = params.get('code')
            system = params.get('system')
            display = params.get('display')
            if not code or not system:
                return self._operation_outcome(
                    'required', 'Code and system are required', 400
                )
            try:
                result = self.terminology.validate_code(code, system, display)
            except Exception as e:  # HTTP boundary: report instead of crashing
                return self._operation_outcome('exception', str(e), 500)
            return jsonify({
                'resourceType': 'Parameters',
                'parameter': [
                    {
                        'name': 'result',
                        'valueBoolean': result['valid']
                    },
                    {
                        'name': 'message',
                        'valueString': result.get('message', '')
                    }
                ]
            })

        @self.app.route('/ValueSet/$expand', methods=['POST'])
        def expand_valueset():
            """FHIR $expand operation."""
            valueset = self._json_body()
            if not valueset:
                return self._operation_outcome(
                    'required', 'A ValueSet resource is required', 400
                )
            try:
                expansion = self.terminology.expand_valueset(valueset)
                return jsonify({
                    'resourceType': 'ValueSet',
                    'id': expansion.id,
                    'url': expansion.url,
                    'name': expansion.name,
                    'status': 'active',
                    'expansion': {
                        'timestamp': expansion.timestamp,
                        'total': expansion.total,
                        'contains': expansion.concepts
                    }
                })
            except Exception as e:  # HTTP boundary: report instead of crashing
                return self._operation_outcome('exception', str(e), 500)

        @self.app.route('/ConceptMap/$translate', methods=['POST'])
        def translate():
            """FHIR $translate operation."""
            params = self._json_body()
            code = params.get('code')
            source = params.get('source')
            target = params.get('target')
            if not code or not source or not target:
                return self._operation_outcome(
                    'required', 'Code, source and target are required', 400
                )
            try:
                translations = self.terminology.translate_concept(
                    code, source, target
                ) or []
            except Exception as e:  # HTTP boundary: report instead of crashing
                return self._operation_outcome('exception', str(e), 500)

            parameters = [{
                'name': 'result',
                'valueBoolean': len(translations) > 0
            }]
            # Each translation is its own repeating 'match' parameter whose
            # parts are themselves named parameters carrying value[x] fields.
            for trans in translations:
                parameters.append({
                    'name': 'match',
                    'part': [
                        {
                            'name': 'equivalence',
                            'valueCode': trans.equivalence
                        },
                        {
                            'name': 'concept',
                            'valueCoding': {
                                'code': trans.target_code,
                                'system': trans.target_system
                            }
                        }
                    ]
                })
            return jsonify({
                'resourceType': 'Parameters',
                'parameter': parameters
            })

        @self.app.route('/CodeSystem/$subsumes', methods=['POST'])
        def subsumes():
            """FHIR $subsumes operation."""
            params = self._json_body()
            code_a = params.get('codeA')
            code_b = params.get('codeB')
            system = params.get('system')
            if not code_a or not code_b or not system:
                return self._operation_outcome(
                    'required', 'codeA, codeB and system are required', 400
                )
            try:
                result = self.terminology.subsumes(code_a, code_b, system)
            except Exception as e:  # HTTP boundary: report instead of crashing
                return self._operation_outcome('exception', str(e), 500)
            return jsonify({
                'resourceType': 'Parameters',
                'parameter': [{
                    'name': 'outcome',
                    # A falsy service result is reported as 'not-subsumed'.
                    'valueCode': result or 'not-subsumed'
                }]
            })

    def run(self, host='0.0.0.0', port=5000):
        """Run the terminology server.

        NOTE: the default host ``0.0.0.0`` listens on all network
        interfaces; pass ``host='127.0.0.1'`` for local-only access.
        """
        self.app.run(host=host, port=port)
# Usage
if __name__ == '__main__':
    # Script entry point: build the server with the literal key
    # 'your-api-key' and serve using run()'s default host and port.
    server = FHIRTerminologyServer(omophub_api_key='your-api-key')
    server.run()
Best Practices
1. Efficient ValueSet Management
Copy
class OptimizedValueSetManager:
    """Optimized ValueSet operations with caching.

    Expansions returned by the terminology service are kept in an
    in-memory dict keyed by ValueSet URL and reused until ``cache_ttl``
    seconds have elapsed.
    """

    def __init__(self, terminology_service: "OMOPHubFHIRTerminologyService",
                 valueset_loader=None):
        """
        Args:
            terminology_service: Object providing ``expand_valueset(valueset)``.
            valueset_loader: Optional callable taking a ValueSet URL and
                returning the ValueSet definition to expand. When omitted,
                ``load_valueset_definition`` must be overridden in a subclass.
        """
        self.terminology = terminology_service
        self.valueset_loader = valueset_loader
        self.cache = {}
        self.cache_ttl = 3600  # 1 hour

    def load_valueset_definition(self, valueset_url: str):
        """Return the ValueSet definition for ``valueset_url``.

        Delegates to the ``valueset_loader`` given at construction time.

        Raises:
            NotImplementedError: If no loader was supplied and the method
                has not been overridden.
        """
        if self.valueset_loader is None:
            raise NotImplementedError(
                'No ValueSet loader configured: pass valueset_loader=... or '
                'override load_valueset_definition()'
            )
        return self.valueset_loader(valueset_url)

    def get_or_expand_valueset(self, valueset_url: str) -> "ValueSetExpansion":
        """Get ValueSet expansion with caching.

        Returns the cached expansion when it is younger than ``cache_ttl``
        seconds; otherwise loads the definition, expands it through the
        terminology service, and stores the result.
        """
        cache_key = f"valueset:{valueset_url}"
        # Monotonic clock: entry age is unaffected by wall-clock adjustments.
        now = time.monotonic()

        cached = self.cache.get(cache_key)
        if cached is not None:
            cached_data, stored_at = cached
            if now - stored_at < self.cache_ttl:
                return cached_data

        valueset = self.load_valueset_definition(valueset_url)
        expansion = self.terminology.expand_valueset(valueset)
        self.cache[cache_key] = (expansion, time.monotonic())
        return expansion

    def validate_against_valueset(self, code: str, system: str, valueset_url: str) -> bool:
        """Validate if a code is in a ValueSet.

        Returns True when the expansion contains a concept whose ``code``
        and ``system`` both match. Concepts missing either key are treated
        as non-matching instead of raising ``KeyError``.
        """
        expansion = self.get_or_expand_valueset(valueset_url)
        return any(
            concept.get('code') == code and concept.get('system') == system
            for concept in expansion.concepts
        )
2. Batch Operations for Performance
Copy
from typing import Tuple  # used in the annotations below; not imported elsewhere


class BatchFHIRProcessor:
    """Process FHIR resources in batches for performance."""

    def __init__(self, terminology_service: "OMOPHubFHIRTerminologyService"):
        """
        Args:
            terminology_service: Object providing ``validate_code(code, system)``
                and ``translate_concept(code, source, target)``.
        """
        self.terminology = terminology_service
        self.batch_size = 100

    async def validate_codes_batch(self, codes: List[Tuple[str, str]]) -> List[Dict]:
        """Validate multiple codes in parallel.

        ``validate_code`` is an ordinary blocking call, so each validation
        is run on the event loop's default thread pool instead of being
        wrapped in ``asyncio.create_task`` (which accepts only coroutines).
        At most ``batch_size`` validations are in flight at a time.

        Args:
            codes: ``(code, system)`` pairs to validate.

        Returns:
            One dict per input pair, in input order, with the keys
            ``code``, ``system``, ``valid`` and ``message``.
        """
        import asyncio

        # NOTE(review): assumes validate_code can safely be called from
        # several threads at once - confirm against the service implementation.
        loop = asyncio.get_running_loop()
        step = max(1, self.batch_size)
        results = []
        for start in range(0, len(codes), step):
            chunk = codes[start:start + step]
            results.extend(await asyncio.gather(*(
                loop.run_in_executor(
                    None, self.terminology.validate_code, code, system
                )
                for code, system in chunk
            )))

        return [
            {
                'code': code,
                'system': system,
                'valid': result['valid'],
                'message': result.get('message', '')
            }
            for (code, system), result in zip(codes, results)
        ]

    def translate_batch(self,
                        translations: List[Tuple[str, str, str]]) -> "List[ConceptMapTranslation]":
        """Batch translate concepts between systems.

        Calls ``translate_concept`` once per ``(code, source, target)``
        triple, sequentially and in input order, and concatenates every
        returned list into a single flat list.
        """
        all_translations = []
        for code, source, target in translations:
            all_translations.extend(
                self.terminology.translate_concept(code, source, target)
            )
        return all_translations
3. Error Handling and Resilience
Copy
class ResilientFHIRService:
    """FHIR service with error handling and fallbacks."""

    def __init__(self, terminology_service: "OMOPHubFHIRTerminologyService"):
        """
        Args:
            terminology_service: Object providing ``validate_code(code, system)``.
        """
        self.terminology = terminology_service
        self.fallback_mappings = self.load_fallback_mappings()

    def safe_validate_code(self, code: str, system: str) -> Dict:
        """Validate code with fallback to local mappings.

        Returns the terminology service's result unchanged when the call
        succeeds. If the call raises, the failure is logged and the code is
        looked up in ``fallback_mappings``: a hit yields a ``valid: True``
        result carrying the local concept, a miss yields ``valid: False``
        with the original error text. This method never raises for
        service failures.
        """
        try:
            return self.terminology.validate_code(code, system)
        except Exception as e:  # deliberate best-effort: any failure falls back
            # Record the failure so a fallback hit does not hide an outage.
            logging.getLogger(__name__).warning(
                "Terminology validation failed for %s|%s; using local fallback: %s",
                system, code, e
            )
            system_codes = self.fallback_mappings.get(system, {})
            if code in system_codes:
                return {
                    'valid': True,
                    'concept': system_codes[code],
                    'message': 'Validated from local cache'
                }
            return {
                'valid': False,
                'message': f'Validation failed: {str(e)}'
            }

    def load_fallback_mappings(self) -> Dict:
        """Load common codes for offline validation.

        Returns a dict keyed by code-system URL, each mapping a code to a
        dict holding that concept's ``concept_name``.
        """
        return {
            'http://loinc.org': {
                '2345-7': {'concept_name': 'Glucose'},
                '718-7': {'concept_name': 'Hemoglobin'},
                # ... more common codes
            },
            'http://snomed.info/sct': {
                '38341003': {'concept_name': 'Hypertensive disorder'},
                '73211009': {'concept_name': 'Diabetes mellitus'},
                # ... more common codes
            }
        }
Support and Resources
For FHIR integration support and questions:
- Technical Support: https://omophub.com/contact
- FHIR Documentation: https://www.hl7.org/fhir/
- API Documentation: https://docs.omophub.com