Overview
Clinical coding is the process of transforming healthcare diagnoses, procedures, medical services, and equipment into universal medical alphanumeric codes. This guide demonstrates how to use the OMOPHub API to automate clinical coding workflows, improve accuracy, and ensure compliance with healthcare standards.
Use Case: Automatically convert clinical notes and diagnoses into standardized codes (ICD-10, HCPCS, SNOMED CT) for billing and reporting.
Business Problem
Healthcare organizations face several challenges with manual clinical coding:
- Human Error: Manual coding leads to 15-20% error rates
- Inconsistency: Different coders may assign different codes for the same condition
- Compliance Risk: Incorrect coding can result in audit failures and penalties
- Efficiency: Manual coding is time-consuming and expensive
- Revenue Loss: Undercoding leads to lost revenue, overcoding leads to compliance issues
Solution Architecture
Implementation Guide
Step 1: Set Up OMOPHub Client
Production Warning: The `extract_medical_terms` method shown below is for demonstration purposes only and uses simple keyword matching. For production healthcare applications, use proper NLP libraries such as:
- spaCy with medical models (scispaCy, en_core_sci_md)
- NLTK with medical corpora
- Specialized medical NLP: Amazon Comprehend Medical, Google Healthcare NLP API, or clinical BERT models
- Open-source medical NLP: Apache cTAKES, MetaMap, or MedSpacy
Copy
from omophub import OMOPHubClient
import re
import traceback
from datetime import datetime
from typing import List, Dict, Any
class ClinicalCodingService:
    """Demo service that turns clinical text into coded concepts via OMOPHub."""

    def __init__(self, api_key: str):
        """Create the OMOPHub client used by the other methods.

        Args:
            api_key: Credential passed straight to ``OMOPHubClient``.
        """
        self.client = OMOPHubClient(api_key=api_key)

    def extract_medical_terms(self, clinical_text: str) -> List[str]:
        """Return the known keywords that occur in ``clinical_text``.

        Matching is a case-insensitive substring test against a fixed
        keyword list; results follow the order of that list, not the order
        of appearance in the text. Demonstration only - use a real NLP
        pipeline in production.
        """
        known_keywords = (
            'diabetes', 'hypertension', 'pneumonia', 'fracture',
            'myocardial infarction', 'stroke', 'cancer', 'infection',
        )
        haystack = clinical_text.lower()
        return [keyword for keyword in known_keywords if keyword in haystack]

    def get_clinical_text(self, encounter_id: str) -> str:
        """Look up the clinical note for an encounter.

        Placeholder backed by three hard-coded sample notes; replace with a
        call to your EHR system, FHIR server, or clinical data warehouse
        (e.g. FHIR ``GET /Encounter/{encounter_id}/notes``, a database
        query, or a document management API).

        Args:
            encounter_id: Unique identifier for the patient encounter.

        Returns:
            The sample note registered for ``encounter_id``.

        Raises:
            NotImplementedError: If ``encounter_id`` is not one of the
                built-in samples.
        """
        sample_texts = {
            "enc_001": "Patient presents with type 2 diabetes mellitus. Blood glucose elevated at 180 mg/dL. Prescribed metformin 500mg twice daily.",
            "enc_002": "Chief complaint: chest pain. Patient has history of hypertension and smoking. EKG shows no acute changes. Troponin negative.",
            "enc_003": "Follow-up visit for pneumonia treatment. Patient reports improvement in cough and fever. Chest X-ray shows resolving infiltrates.",
        }

        note = sample_texts.get(encounter_id)
        if note is None:
            # A production implementation would query the real data source here.
            raise NotImplementedError(
                f"Clinical text retrieval not implemented for encounter {encounter_id}. "
                "Please integrate with your EHR system, FHIR server, or clinical database."
            )
        return note
Step 2: Search and Map Medical Concepts
Copy
def search_medical_concepts(self, terms: List[str]) -> Dict[str, Any]:
    """Search each term and collect its best match plus vocabulary mappings.

    Args:
        terms: Free-text medical terms to look up.

    Returns:
        A dict keyed by term. A successful entry holds ``primary_concept``
        (the first search hit), ``mappings`` (whatever
        ``client.get_concept_mappings`` returned) and ``confidence`` (the
        hit's ``relevance_score``, 0 when absent). A failed entry has the
        same keys set to ``None``/``0`` plus ``error`` and
        ``error_details``. Terms whose search returns no concepts are
        omitted from the result.
    """
    results = {}
    for term in terms:
        try:
            # Search for concepts across vocabularies
            search_results = self.client.advanced_search_concepts({
                "query": term,
                "vocabularies": ["SNOMED", "ICD10CM", "HCPCS"],
                "domains": ["Condition", "Procedure", "Drug"],
                "standard_concepts_only": True,
                "limit": 10
            })

            concepts = search_results["concepts"]
            if not concepts:
                # No hit: the term is left out of the results entirely.
                continue

            # The first hit is treated as the best match.
            best_match = concepts[0]

            # Each target vocabulary is listed exactly once.
            mappings = self.client.get_concept_mappings(
                best_match["concept_id"],
                target_vocabularies=["ICD10CM", "HCPCS"]
            )
            results[term] = {
                "primary_concept": best_match,
                "mappings": mappings,
                "confidence": best_match.get("relevance_score", 0)
            }
        except Exception as e:
            # Deliberate best-effort: one failing term must not abort the
            # batch, so the failure is recorded with the same keys as a
            # successful entry and processing continues.
            print(f"Error processing term '{term}': {e}")
            results[term] = {
                "error": str(e),
                "primary_concept": None,
                "mappings": None,
                "confidence": 0,
                "error_details": {
                    "timestamp": datetime.now().isoformat(),
                    "term": term,
                    "error_type": type(e).__name__,
                    "traceback": traceback.format_exc()
                }
            }
    return results
def get_billing_codes(self, concept_mappings: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
    """Group mapped concepts into billing-code buckets.

    Args:
        concept_mappings: Per-term results shaped like the output of
            ``search_medical_concepts``. Each value may carry a
            ``"mappings"`` iterable of dicts with ``vocabulary_id``,
            ``concept_code`` and ``concept_name``.

    Returns:
        A dict with the keys ``"icd10"``, ``"cpt"`` and ``"hcpcs"``. Each
        value is a list of ``{"code", "description", "term"}`` dicts.
        Mappings from any other vocabulary are ignored.
    """
    billing_codes = {
        "icd10": [],
        "cpt": [],
        "hcpcs": []
    }

    for term, data in concept_mappings.items():
        # Failed lookups are recorded with "mappings": None, so a missing or
        # None value is treated as "no mappings" instead of being iterated.
        for mapping in data.get("mappings") or []:
            vocab = mapping["vocabulary_id"].lower()
            # The first bucket whose name occurs in the vocabulary id wins;
            # dict order gives the precedence icd10, then cpt, then hcpcs.
            bucket = next((name for name in billing_codes if name in vocab), None)
            if bucket is None:
                continue
            billing_codes[bucket].append({
                "code": mapping["concept_code"],
                "description": mapping["concept_name"],
                "term": term
            })
    return billing_codes
Step 3: Validate and Quality Check
Copy
def validate_coding_quality(self, billing_codes: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
    """Check a set of billing codes for completeness and cross-bucket overlap.

    Args:
        billing_codes: Buckets as produced by ``get_billing_codes``: a dict
            mapping bucket name (``"icd10"``, ``"cpt"``, ``"hcpcs"``) to a
            list of entries that each carry a ``"term"`` key.

    Returns:
        A dict with:
        - ``completeness``: ``total_codes`` (entries across all buckets),
          ``has_diagnosis`` (any ICD-10 entry) and ``has_procedure`` (any
          CPT or HCPCS entry).
        - ``conflicts``: one record per entry whose term was last seen in
          a different bucket.
        - ``recommendations``: hints for missing diagnosis/procedure codes.
    """
    validation_results = {
        "completeness": {},
        "conflicts": [],
        "recommendations": []
    }

    # Check completeness. Every bucket counts toward the total, and a
    # procedure may be coded in either the CPT or the HCPCS bucket.
    diagnosis_count = len(billing_codes.get("icd10", []))
    procedure_count = len(billing_codes.get("cpt", [])) + len(billing_codes.get("hcpcs", []))
    completeness = validation_results["completeness"]
    completeness["total_codes"] = sum(len(codes) for codes in billing_codes.values())
    completeness["has_diagnosis"] = diagnosis_count > 0
    completeness["has_procedure"] = procedure_count > 0

    # Check for conflicts: the same term showing up under more than one bucket.
    seen_terms = {}
    for code_type, codes in billing_codes.items():
        for code_data in codes:
            term = code_data["term"]
            previous_type = seen_terms.get(term)
            if previous_type is not None and previous_type != code_type:
                validation_results["conflicts"].append({
                    "term": term,
                    "vocabularies": [previous_type, code_type],
                    "message": f"Term '{term}' mapped to multiple vocabularies"
                })
            seen_terms[term] = code_type

    # Generate recommendations
    if not completeness["has_diagnosis"]:
        validation_results["recommendations"].append({
            "type": "missing_diagnosis",
            "message": "No diagnosis codes found. Consider adding ICD-10 codes."
        })
    if not completeness["has_procedure"]:
        validation_results["recommendations"].append({
            "type": "missing_procedure",
            "message": "No procedure codes found. Consider adding HCPCS codes."
        })
    return validation_results
def generate_coding_report(self, clinical_text: str) -> Dict[str, Any]:
    """Run the full coding pipeline over one clinical note.

    The stages run in order: term extraction, concept search/mapping,
    billing-code grouping, then quality validation. Each stage's output is
    stored in the report and fed to the next stage.

    Args:
        clinical_text: Raw clinical note to code.

    Returns:
        A dict with ``input_text``, ``extracted_terms``,
        ``concept_mappings``, ``billing_codes``, ``validation`` and an
        ISO-formatted ``timestamp`` taken after all stages finish.
    """
    report: Dict[str, Any] = {"input_text": clinical_text}
    report["extracted_terms"] = self.extract_medical_terms(clinical_text)
    report["concept_mappings"] = self.search_medical_concepts(report["extracted_terms"])
    report["billing_codes"] = self.get_billing_codes(report["concept_mappings"])
    report["validation"] = self.validate_coding_quality(report["billing_codes"])
    report["timestamp"] = datetime.now().isoformat()
    return report
Example Implementation
Sample Clinical Note
Copy
Patient presents with chest pain and shortness of breath.
ECG shows ST elevation. Diagnosed with acute myocardial infarction.
Patient underwent percutaneous coronary intervention (PCI).
Also has history of type 2 diabetes mellitus and hypertension.
Generated Coding Report
Copy
# Example: run the coding pipeline over one note and print a summary.

# Initialize the service
coding_service = ClinicalCodingService("your_api_key")

# Generate coding report
clinical_note = """
Patient presents with chest pain and shortness of breath.
ECG shows ST elevation. Diagnosed with acute myocardial infarction.
Patient underwent percutaneous coronary intervention (PCI).
Also has history of type 2 diabetes mellitus and hypertension.
"""
report = coding_service.generate_coding_report(clinical_note)

# Print results
print("=== CLINICAL CODING REPORT ===")
# Join the terms so the line reads "a, b, c" rather than a Python list repr.
print(f"Extracted Terms: {', '.join(report['extracted_terms'])}")
print(f"Total Billing Codes: {report['validation']['completeness']['total_codes']}")

print("\n=== ICD-10 DIAGNOSIS CODES ===")
for code in report['billing_codes']['icd10']:
    print(f" {code['code']}: {code['description']}")

print("\n=== HCPCS PROCEDURE CODES ===")
# Procedure codes can land in either the "cpt" or the "hcpcs" bucket, so
# both are listed under this heading.
procedure_codes = report['billing_codes']['cpt'] + report['billing_codes'].get('hcpcs', [])
for code in procedure_codes:
    print(f" {code['code']}: {code['description']}")

print("\n=== VALIDATION RESULTS ===")
for rec in report['validation']['recommendations']:
    print(f" ⚠️ {rec['message']}")
Expected Output
Copy
=== CLINICAL CODING REPORT ===
Extracted Terms: myocardial infarction, diabetes, hypertension
Total Billing Codes: 5
=== ICD-10 DIAGNOSIS CODES ===
I21.9: Acute myocardial infarction, unspecified
E11.9: Type 2 diabetes mellitus without complications
I10: Essential hypertension
=== HCPCS PROCEDURE CODES ===
92928: Percutaneous transcatheter placement of intracoronary stent(s)
93458: Catheter placement in coronary artery(s) for coronary angiography
=== VALIDATION RESULTS ===
✅ Complete diagnosis coding found
✅ Procedure codes identified
⚠️ Consider reviewing for additional comorbidities
Integration Patterns
1. EHR Integration
Copy
class EHRCodingIntegration:
    """Bridge that codes an EHR encounter's notes and writes suggestions back."""

    def __init__(self, omophub_client, ehr_client):
        """Store the two collaborating clients.

        Args:
            omophub_client: OMOPHub client; its ``config.api_key`` is read
                when an encounter is processed.
            ehr_client: EHR client providing ``get_encounter`` and
                ``update_encounter_codes``.
        """
        self.omophub = omophub_client
        self.ehr = ehr_client

    def process_encounter(self, encounter_id: str):
        """Code one encounter and push the suggested codes to the EHR.

        Args:
            encounter_id: Identifier passed to the EHR client.

        Returns:
            The full coding report produced for the encounter's notes.
        """
        def as_suggestions(kind, codes):
            # Reduce report entries to the fields sent back to the EHR.
            return [
                {
                    'type': kind,
                    'code': entry['code'],
                    'description': entry['description']
                }
                for entry in codes
            ]

        # Get clinical notes from EHR (empty string when the encounter has none)
        encounter = self.ehr.get_encounter(encounter_id)
        clinical_text = encounter.get('notes', '')

        # Generate codes using OMOPHub
        coding_service = ClinicalCodingService(self.omophub.config.api_key)
        report = coding_service.generate_coding_report(clinical_text)

        # Diagnoses first, then procedures, as suggestions for coder review
        billing = report['billing_codes']
        suggested_codes = (
            as_suggestions('diagnosis', billing['icd10'])
            + as_suggestions('procedure', billing['cpt'])
        )

        # Send back to EHR for coder review
        self.ehr.update_encounter_codes(encounter_id, suggested_codes)
        return report
2. Real-time Coding Assistant
Copy
// Helper for interactive coding UIs: debounced autocomplete suggestions
// while the user types, plus per-code validation before submission.
class RealTimeCodingAssistant {
// omophubClient must provide getSearchAutocomplete() and getConceptByCode().
constructor(omophubClient) {
this.client = omophubClient;
this.debounceTimer = null;
}
// Real-time suggestions as user types
// Results are delivered through `callback(suggestions.data)`. The promise
// returned by this method settles as soon as the timer is scheduled, not
// when the suggestions arrive, so callers must rely on the callback.
async getSuggestions(clinicalText, callback) {
// Debounce to avoid too many API calls
// Any pending request from an earlier keystroke is cancelled first.
clearTimeout(this.debounceTimer);
this.debounceTimer = setTimeout(async () => {
// Inputs shorter than 10 characters are skipped; callback is not invoked.
if (clinicalText.length < 10) return;
try {
const suggestions = await this.client.getSearchAutocomplete({
query: clinicalText,
vocabularies: ['SNOMED', 'ICD10CM'],
limit: 5
});
callback(suggestions.data);
} catch (error) {
// Failures are only logged; the callback is not invoked on error.
console.error('Error getting suggestions:', error);
}
}, 500); // Wait 500ms after user stops typing
}
// Validate codes before submission
// Looks up each { vocabulary_id, code } entry one at a time (sequential
// awaits) and resolves to one result object per input code, in order.
async validateCodes(codes) {
const validationResults = [];
for (const code of codes) {
try {
const concept = await this.client.getConceptByCode(
code.vocabulary_id,
code.code
);
validationResults.push({
code: code.code,
valid: concept.data.success,
// String comparison against the current ISO timestamp.
// NOTE(review): assumes validEndDate is an ISO-8601 string - confirm.
current: concept.data.validEndDate > new Date().toISOString(),
concept: concept.data
});
} catch (error) {
// A failed lookup marks the code invalid and keeps the error message.
validationResults.push({
code: code.code,
valid: false,
error: error.message
});
}
}
return validationResults;
}
}
Performance Optimization
Caching Strategy
Copy
import redis
import json
from typing import Optional
class CachedCodingService(ClinicalCodingService):
    """Coding service that memoizes whole reports in Redis, keyed by note text."""

    def __init__(self, api_key: str, redis_url: str = "redis://localhost:6379"):
        """Set up the base service plus the Redis connection.

        Args:
            api_key: Credential forwarded to the base service.
            redis_url: Connection URL handed to ``redis.from_url``.
        """
        super().__init__(api_key)
        self.redis_client = redis.from_url(redis_url)
        self.cache_ttl = 3600  # 1 hour

    def _cache_key(self, text: str) -> str:
        """Build the Redis key for ``text``: ``coding:`` plus its MD5 hex digest."""
        import hashlib

        digest = hashlib.md5(text.encode()).hexdigest()
        return f"coding:{digest}"

    def generate_coding_report(self, clinical_text: str) -> Dict[str, Any]:
        """Return the cached report for ``clinical_text``, computing it on a miss.

        On a miss the freshly generated report is stored as JSON (values
        that are not JSON-serializable are stringified) with a TTL of
        ``self.cache_ttl`` seconds, and the original report object is
        returned. On a hit the JSON-decoded copy is returned.
        """
        cache_key = self._cache_key(clinical_text)
        cached_payload = self.redis_client.get(cache_key)

        if not cached_payload:
            # Miss: build the report, store it, and hand back the original.
            report = super().generate_coding_report(clinical_text)
            serialized = json.dumps(report, default=str)
            self.redis_client.setex(cache_key, self.cache_ttl, serialized)
            return report

        return json.loads(cached_payload)
Batch Processing
Copy
def process_batch_encounters(self, encounter_ids: List[str]) -> List[Dict[str, Any]]:
    """Code several encounters with a single bulk concept search.

    Terms are extracted per encounter, de-duplicated across the batch, and
    looked up in one ``bulk_concept_search`` call. The bulk results are
    then fanned back out to each encounter.

    Args:
        encounter_ids: Encounters to process; one report is returned per
            id, in the same order.

    Returns:
        The reports built by ``self._generate_report_from_cache`` for each
        encounter. NOTE(review): that helper is not defined in the visible
        snippets and must be provided by the enclosing class.
    """
    # Extract terms for every encounter first.
    encounter_terms = {}
    for encounter_id in encounter_ids:
        clinical_text = self.get_clinical_text(encounter_id)
        encounter_terms[encounter_id] = self.extract_medical_terms(clinical_text)

    # De-duplicate while keeping first-seen order so the request payload is
    # deterministic from run to run.
    unique_terms = list(dict.fromkeys(
        term
        for terms in encounter_terms.values()
        for term in terms
    ))

    results_by_term = {}
    if unique_terms:
        # Only call the API when there is something to look up.
        bulk_results = self.client.bulk_concept_search({
            "searches": [
                {"search_id": term, "query": term}
                for term in unique_terms
            ],
            "version": "latest"
        })
        # presumably keyed by search_id; verify against the API response shape
        results_by_term = bulk_results["results"]

    # Map the bulk results back to each encounter's own terms.
    reports = []
    for encounter_id in encounter_ids:
        encounter_mappings = {
            term: results_by_term.get(term, {})
            for term in encounter_terms[encounter_id]
        }
        reports.append(
            self._generate_report_from_cache(encounter_id, encounter_mappings)
        )
    return reports
Quality Assurance
Code Validation Rules
Copy
class CodingValidator:
    """Checks whether procedure codes are linked to diagnosis codes in OMOPHub."""

    def __init__(self, omophub_client):
        """Keep the client used for concept and relationship lookups."""
        self.client = omophub_client

    def validate_code_combination(self, diagnosis_codes: List[str],
                                  procedure_codes: List[str]) -> Dict[str, Any]:
        """Classify every diagnosis/procedure pair as valid or invalid.

        Args:
            diagnosis_codes: Diagnosis codes to check.
            procedure_codes: Procedure codes to check against each diagnosis.

        Returns:
            A dict with ``valid_combinations`` (pairs plus ``confidence``),
            ``invalid_combinations`` (pairs plus ``reason``) and
            ``warnings`` (always empty here).
        """
        outcome = {
            "valid_combinations": [],
            "invalid_combinations": [],
            "warnings": []
        }

        for dx_code in diagnosis_codes:
            for proc_code in procedure_codes:
                verdict = self.check_clinical_relationship(dx_code, proc_code)
                pairing = {"diagnosis": dx_code, "procedure": proc_code}

                if not verdict["appropriate"]:
                    pairing["reason"] = verdict["reason"]
                    outcome["invalid_combinations"].append(pairing)
                    continue

                pairing["confidence"] = verdict["confidence"]
                outcome["valid_combinations"].append(pairing)

        return outcome

    def check_clinical_relationship(self, dx_code: str, proc_code: str) -> Dict[str, Any]:
        """Look for a direct relationship from a diagnosis to a procedure.

        The diagnosis is resolved in ICD10CM and the procedure in HCPCS.
        The diagnosis's "Has procedure" / "Treatment of" relationships are
        then scanned for one that targets the procedure concept.

        Returns:
            ``appropriate`` True with confidence 0.9 and the matching
            ``relationship`` when a link exists; ``appropriate`` False with
            confidence 0.1 and a ``reason`` when none does; ``appropriate``
            False with confidence 0.0 and the error text as ``reason`` when
            any lookup raises.
        """
        try:
            diagnosis = self.client.get_concept_by_code("ICD10CM", dx_code)
            procedure = self.client.get_concept_by_code("HCPCS", proc_code)

            links = self.client.get_concept_relationships(
                diagnosis["concept_id"],
                relationship_types=["Has procedure", "Treatment of"]
            )

            # First relationship pointing at the procedure concept, if any.
            match = next(
                (
                    link for link in links
                    if link["target_concept_id"] == procedure["concept_id"]
                ),
                None,
            )
            if match is not None:
                return {
                    "appropriate": True,
                    "confidence": 0.9,
                    "relationship": match["relationship_type"]
                }

            # No direct relationship found
            return {
                "appropriate": False,
                "confidence": 0.1,
                "reason": "No clinical relationship found between diagnosis and procedure"
            }
        except Exception as e:
            # Lookup failures are reported as "not appropriate", not raised.
            return {
                "appropriate": False,
                "confidence": 0.0,
                "reason": f"Error validating relationship: {str(e)}"
            }
Metrics and Monitoring
Coding Accuracy Metrics
Copy
class CodingMetrics:
    """Agreement metrics between automated and manually assigned codes."""

    def __init__(self, omophub_client):
        """Keep a reference to the OMOPHub client."""
        self.client = omophub_client

    def calculate_coding_accuracy(self, automated_codes: List[str],
                                  manual_codes: List[str]) -> Dict[str, float]:
        """Score automated codes against manual codes as the reference.

        Both inputs are compared as sets, so duplicates and ordering are
        ignored. Any ratio whose denominator is zero is reported as 0.

        Returns:
            ``precision``, ``recall``, ``f1_score`` and ``accuracy``.
            NOTE: as computed here ``accuracy`` is matches divided by the
            number of distinct manual codes, which is the same value as
            ``recall``.
        """
        def ratio(numerator, denominator):
            # Guarded division: an empty denominator yields 0.
            return numerator / denominator if denominator > 0 else 0

        predicted = set(automated_codes)
        reference = set(manual_codes)

        hits = len(predicted & reference)
        spurious = len(predicted - reference)
        missed = len(reference - predicted)

        precision = ratio(hits, hits + spurious)
        recall = ratio(hits, hits + missed)
        f1_score = ratio(2 * (precision * recall), precision + recall)

        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "accuracy": ratio(hits, len(reference))
        }

    def track_coding_trends(self, time_period: str = "30d") -> Dict[str, Any]:
        """Placeholder for trend tracking; currently does nothing.

        Intended to integrate with an analytics system to track metrics
        over time. Returns ``None`` until implemented.
        """
        return None
Best Practices
1. Error Handling
Copy
def robust_coding_search(self, term: str, max_retries: int = 3) -> Optional[Dict]:
    """Search for a term with a fuzzy fallback and retry-on-error.

    Each attempt tries an exact SNOMED search first and, if that finds
    nothing, a broader search over SNOMED and ICD10CM. Only attempts that
    raise are retried, with exponential backoff (1s, 2s, 4s, ...).

    Args:
        term: Text to search for.
        max_retries: Maximum number of attempts when the client raises.

    Returns:
        The first matching concept, or ``None`` when nothing matches or
        every attempt failed.
    """
    # Imported locally so this snippet does not depend on module-level
    # `logger` / `time` names being defined elsewhere.
    import logging
    import time

    for attempt in range(max_retries):
        try:
            # Try exact search first
            results = self.client.search_concepts({
                "query": term,
                "vocabularies": ["SNOMED"],
                "limit": 1
            })
            if results["concepts"]:
                return results["concepts"][0]

            # Fallback to fuzzy search
            fuzzy_results = self.client.advanced_search_concepts({
                "query": term,
                "vocabularies": ["SNOMED", "ICD10CM"],
                "include_invalid": False,
                "limit": 5
            })
            if fuzzy_results["concepts"]:
                return fuzzy_results["concepts"][0]

            # Both searches succeeded but matched nothing. Retrying would
            # only repeat the same requests, so stop here.
            return None
        except Exception as e:
            if attempt == max_retries - 1:
                logging.getLogger(__name__).error(
                    "Failed to search for term '%s': %s", term, e
                )
                return None
            time.sleep(2 ** attempt)  # Exponential backoff
    return None
2. Security and Compliance
Copy
def secure_coding_process(self, clinical_text: str, user_id: str) -> Dict[str, Any]:
    """Run the coding pipeline with audit logging around it.

    The audit record never contains the clinical text itself, only its
    length and SHA-256 hash. The record is passed to
    ``self.log_audit_event`` whether processing succeeds or fails.

    Args:
        clinical_text: Note to code.
        user_id: Identifier of the user requesting the coding.

    Returns:
        The report after ``self.sanitize_report`` has been applied.

    Raises:
        Exception: Any error from report generation or sanitizing is
            recorded in the audit log and re-raised unchanged.
    """
    # Local import: hashlib is not otherwise in scope for this snippet.
    import hashlib

    # Log access for audit
    audit_log = {
        "user_id": user_id,
        "action": "clinical_coding",
        "timestamp": datetime.now().isoformat(),
        "text_length": len(clinical_text),
        "text_hash": hashlib.sha256(clinical_text.encode()).hexdigest()
    }
    try:
        # Process coding (text is not logged)
        report = self.generate_coding_report(clinical_text)

        # Log successful completion. Count the codes inside every bucket;
        # len() of the buckets dict would only give the number of buckets.
        audit_log["status"] = "success"
        audit_log["codes_generated"] = sum(
            len(codes) for codes in report["billing_codes"].values()
        )

        # Remove sensitive data before returning
        return self.sanitize_report(report)
    except Exception as e:
        audit_log["status"] = "error"
        audit_log["error"] = str(e)
        raise
    finally:
        # Always log audit information
        self.log_audit_event(audit_log)