Skip to main content

Overview

Clinical coding is the process of transforming healthcare diagnoses, procedures, medical services, and equipment into universal medical alphanumeric codes. This guide demonstrates how to use the OMOPHub API to automate clinical coding workflows, improve accuracy, and ensure compliance with healthcare standards.
Use Case: Automatically convert clinical notes and diagnoses into standardized codes (ICD-10, HCPCS, SNOMED CT) for billing and reporting.

Business Problem

Healthcare organizations face several challenges with manual clinical coding:
  • Human Error: Manual coding leads to 15-20% error rates
  • Inconsistency: Different coders may assign different codes for the same condition
  • Compliance Risk: Incorrect coding can result in audit failures and penalties
  • Efficiency: Manual coding is time-consuming and expensive
  • Revenue Loss: Undercoding leads to lost revenue, overcoding leads to compliance issues

Solution Architecture

Implementation Guide

Step 1: Set Up OMOPHub Client

Production Warning: The extract_medical_terms method shown below is for demonstration purposes only and uses simple keyword matching. For production healthcare applications, use proper NLP libraries such as:
  • spaCy with medical models (scispaCy, en_core_sci_md)
  • NLTK with medical corpora
  • Specialized medical NLP: Amazon Comprehend Medical, Google Healthcare NLP API, or clinical BERT models
  • Open-source medical NLP: Apache cTAKES, MetaMap, or MedSpacy
Accurate medical term extraction is critical for patient safety and billing compliance.
from omophub import OMOPHubClient
import re
import traceback
from datetime import datetime
from typing import List, Dict, Any

class ClinicalCodingService:
    """Demonstration service for turning clinical notes into candidate codes.

    Holds an ``OMOPHubClient`` and offers a keyword-based term extractor plus
    a sample-data stand-in for clinical text retrieval.
    """

    def __init__(self, api_key: str):
        """Create the OMOPHub client used for vocabulary lookups."""
        self.client = OMOPHubClient(api_key=api_key)

    def extract_medical_terms(self, clinical_text: str) -> List[str]:
        """Return the known keywords that occur in ``clinical_text``.

        Matching is a case-insensitive substring test, so the result follows
        the order of the keyword list, not the order of appearance in the
        note. Demonstration only - in production, use NLP libraries.
        """
        known_keywords = (
            'diabetes', 'hypertension', 'pneumonia', 'fracture',
            'myocardial infarction', 'stroke', 'cancer', 'infection',
        )
        haystack = clinical_text.lower()
        return [keyword for keyword in known_keywords if keyword in haystack]

    def get_clinical_text(self, encounter_id: str) -> str:
        """Look up the clinical note belonging to an encounter.

        Args:
            encounter_id: Unique identifier for the patient encounter

        Returns:
            Clinical text/notes for the encounter

        Raises:
            NotImplementedError: If ``encounter_id`` is not one of the
                built-in sample encounters.

        Note:
            Placeholder only. A real deployment would read from an EHR
            system, FHIR server, or clinical data warehouse instead
            (e.g. FHIR ``GET /Encounter/{encounter_id}/notes``, an EHR
            database query, or a document management system API).
        """
        # Canned notes used purely for demonstration.
        demo_notes = {
            "enc_001": "Patient presents with type 2 diabetes mellitus. Blood glucose elevated at 180 mg/dL. Prescribed metformin 500mg twice daily.",
            "enc_002": "Chief complaint: chest pain. Patient has history of hypertension and smoking. EKG shows no acute changes. Troponin negative.",
            "enc_003": "Follow-up visit for pneumonia treatment. Patient reports improvement in cough and fever. Chest X-ray shows resolving infiltrates.",
        }

        note = demo_notes.get(encounter_id)
        if note is None:
            # Unknown encounter: this is where a real data source would be queried.
            raise NotImplementedError(
                f"Clinical text retrieval not implemented for encounter {encounter_id}. "
                "Please integrate with your EHR system, FHIR server, or clinical database."
            )
        return note

Step 2: Search and Map Medical Concepts

def search_medical_concepts(self, terms: List[str]) -> Dict[str, Any]:
    """Search for medical concepts and map them to billing vocabularies.

    Args:
        terms: Medical terms to look up, one search per term.

    Returns:
        A dict keyed by term. A term whose search returned at least one
        concept maps to ``primary_concept`` (the first search hit),
        ``mappings`` (the value returned by ``get_concept_mappings``) and
        ``confidence`` (the hit's ``relevance_score``, or 0 when absent).
        A term whose lookup raised maps to the same keys set to
        ``None``/``None``/``0`` plus ``error`` and ``error_details``.
        Terms whose search returned no concepts are omitted.
    """
    search_vocabularies = ["SNOMED", "ICD10CM", "HCPCS"]
    # Each target vocabulary is requested once (the list previously
    # repeated "HCPCS").
    # NOTE(review): get_billing_codes also fills a "cpt" bucket - confirm
    # whether a CPT vocabulary should be requested here as well.
    target_vocabularies = ["ICD10CM", "HCPCS"]

    results = {}

    for term in terms:
        try:
            # Search for concepts across vocabularies
            search_results = self.client.advanced_search_concepts({
                "query": term,
                "vocabularies": search_vocabularies,
                "domains": ["Condition", "Procedure", "Drug"],
                "standard_concepts_only": True,
                "limit": 10
            })

            # Only the first hit is used as the best match
            if search_results["concepts"]:
                best_match = search_results["concepts"][0]

                # Get mappings to other vocabularies
                mappings = self.client.get_concept_mappings(
                    best_match["concept_id"],
                    target_vocabularies=list(target_vocabularies)
                )

                results[term] = {
                    "primary_concept": best_match,
                    "mappings": mappings,
                    "confidence": best_match.get("relevance_score", 0)
                }

        except Exception as e:
            # Best-effort: one failing term must not abort the whole batch,
            # so the failure is recorded in the result instead of raised.
            print(f"Error processing term '{term}': {e}")
            results[term] = {
                "error": str(e),
                "primary_concept": None,
                "mappings": None,
                "confidence": 0,
                "error_details": {
                    "timestamp": datetime.now().isoformat(),
                    "term": term,
                    "error_type": type(e).__name__,
                    "traceback": traceback.format_exc()
                }
            }

    return results

def get_billing_codes(self, concept_mappings: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
    """Group mapped concepts into billing-code buckets.

    Args:
        concept_mappings: Dict keyed by term, as produced by
            ``search_medical_concepts``. Each value may carry a
            ``mappings`` list whose items have ``vocabulary_id``,
            ``concept_code`` and ``concept_name`` keys.

    Returns:
        A dict with the keys ``icd10``, ``cpt`` and ``hcpcs``. Each value is
        a list of ``{"code", "description", "term"}`` dicts. A mapping is put
        in the first bucket whose name occurs in its lower-cased
        ``vocabulary_id``; mappings matching no bucket are dropped.
    """
    billing_codes = {
        "icd10": [],
        "cpt": [],
        "hcpcs": []
    }

    for term, data in concept_mappings.items():
        # Failed lookups are stored with "mappings": None, so a missing key
        # and a None value are both treated as "nothing to map".
        for mapping in data.get("mappings") or ():
            vocab = mapping["vocabulary_id"].lower()

            # Dict order (icd10, cpt, hcpcs) is the matching priority.
            bucket = next((name for name in billing_codes if name in vocab), None)
            if bucket is None:
                continue

            billing_codes[bucket].append({
                "code": mapping["concept_code"],
                "description": mapping["concept_name"],
                "term": term
            })

    return billing_codes

Step 3: Validate and Quality Check

def validate_coding_quality(self, billing_codes: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
    """Validate coding quality and completeness.

    Args:
        billing_codes: Buckets of code dicts keyed by ``icd10``, ``cpt`` and
            ``hcpcs``; every code dict must carry a ``term`` key.

    Returns:
        A dict with ``completeness`` (``total_codes``, ``has_diagnosis``,
        ``has_procedure``), ``conflicts`` (terms that appear in more than
        one bucket) and ``recommendations`` (one entry per missing code
        category).
    """
    validation_results = {
        "completeness": {},
        "conflicts": [],
        "recommendations": []
    }

    # Check completeness. Procedure codes can sit in either the "cpt" or the
    # "hcpcs" bucket, so both count - otherwise HCPCS-only results would be
    # reported as having no procedure codes at all.
    diagnosis_count = len(billing_codes.get("icd10", []))
    procedure_count = (
        len(billing_codes.get("cpt", [])) + len(billing_codes.get("hcpcs", []))
    )
    completeness = validation_results["completeness"]
    completeness["total_codes"] = diagnosis_count + procedure_count
    completeness["has_diagnosis"] = diagnosis_count > 0
    completeness["has_procedure"] = procedure_count > 0

    # Check for conflicts: a term whose codes landed in a different bucket
    # than the previous occurrence of the same term.
    seen_terms = {}
    for code_type, codes in billing_codes.items():
        for code_data in codes:
            term = code_data["term"]
            if term in seen_terms and seen_terms[term] != code_type:
                validation_results["conflicts"].append({
                    "term": term,
                    "vocabularies": [seen_terms[term], code_type],
                    "message": f"Term '{term}' mapped to multiple vocabularies"
                })
            seen_terms[term] = code_type

    # Generate recommendations
    if not completeness["has_diagnosis"]:
        validation_results["recommendations"].append({
            "type": "missing_diagnosis",
            "message": "No diagnosis codes found. Consider adding ICD-10 codes."
        })

    if not completeness["has_procedure"]:
        validation_results["recommendations"].append({
            "type": "missing_procedure", 
            "message": "No procedure codes found. Consider adding HCPCS codes."
        })

    return validation_results

def generate_coding_report(self, clinical_text: str) -> Dict[str, Any]:
    """Run the full coding pipeline over one clinical note.

    The note is passed through term extraction, concept search/mapping,
    billing-code grouping and validation, in that order.

    Returns:
        A dict with the original text, the intermediate result of every
        stage, and an ISO-formatted timestamp taken after validation.
    """
    extracted = self.extract_medical_terms(clinical_text)
    mapped = self.search_medical_concepts(extracted)
    codes = self.get_billing_codes(mapped)
    quality = self.validate_coding_quality(codes)

    report = {"input_text": clinical_text}
    report["extracted_terms"] = extracted
    report["concept_mappings"] = mapped
    report["billing_codes"] = codes
    report["validation"] = quality
    report["timestamp"] = datetime.now().isoformat()
    return report

Example Implementation

Sample Clinical Note

Patient presents with chest pain and shortness of breath. 
ECG shows ST elevation. Diagnosed with acute myocardial infarction.
Patient underwent percutaneous coronary intervention (PCI).
Also has history of type 2 diabetes mellitus and hypertension.

Generated Coding Report

# Initialize the service
coding_service = ClinicalCodingService("your_api_key")

# Generate coding report
clinical_note = """
Patient presents with chest pain and shortness of breath. 
ECG shows ST elevation. Diagnosed with acute myocardial infarction.
Patient underwent percutaneous coronary intervention (PCI).
Also has history of type 2 diabetes mellitus and hypertension.
"""

report = coding_service.generate_coding_report(clinical_note)
billing_codes = report['billing_codes']

# Print results
print("=== CLINICAL CODING REPORT ===")
print(f"Extracted Terms: {report['extracted_terms']}")
print(f"Total Billing Codes: {report['validation']['completeness']['total_codes']}")

print("\n=== ICD-10 DIAGNOSIS CODES ===")
for code in billing_codes['icd10']:
    print(f"  {code['code']}: {code['description']}")

# Procedure codes are grouped into both the 'cpt' and the 'hcpcs' bucket;
# reading only 'cpt' would leave this section empty for HCPCS mappings.
print("\n=== HCPCS PROCEDURE CODES ===")
for code in billing_codes['cpt'] + billing_codes['hcpcs']:
    print(f"  {code['code']}: {code['description']}")

print("\n=== VALIDATION RESULTS ===")
for rec in report['validation']['recommendations']:
    print(f"  ⚠️  {rec['message']}")

Expected Output

=== CLINICAL CODING REPORT ===
Extracted Terms: ['diabetes', 'hypertension', 'myocardial infarction']
Total Billing Codes: 5

=== ICD-10 DIAGNOSIS CODES ===
  I21.9: Acute myocardial infarction, unspecified
  E11.9: Type 2 diabetes mellitus without complications  
  I10: Essential hypertension

=== HCPCS PROCEDURE CODES ===
  92928: Percutaneous transcatheter placement of intracoronary stent(s)
  93458: Catheter placement in coronary artery(s) for coronary angiography

=== VALIDATION RESULTS ===
  ✅ Complete diagnosis coding found
  ✅ Procedure codes identified
  ⚠️  Consider reviewing for additional comorbidities

Integration Patterns

1. EHR Integration

class EHRCodingIntegration:
    """Bridge between an EHR client and the OMOPHub-backed coding service."""

    def __init__(self, omophub_client, ehr_client):
        """Store the OMOPHub client and the EHR client used by this bridge."""
        self.omophub = omophub_client
        self.ehr = ehr_client

    def process_encounter(self, encounter_id: str):
        """Code one encounter and push the suggestions back to the EHR.

        Args:
            encounter_id: Identifier passed to ``ehr.get_encounter`` and
                ``ehr.update_encounter_codes``.

        Returns:
            The full coding report produced for the encounter's notes.
        """
        # Get clinical notes from EHR (missing notes are coded as empty text)
        encounter = self.ehr.get_encounter(encounter_id)
        clinical_text = encounter.get('notes', '')

        # Generate codes using OMOPHub
        # NOTE(review): assumes the client exposes `config.api_key` - confirm.
        coding_service = ClinicalCodingService(self.omophub.config.api_key)
        report = coding_service.generate_coding_report(clinical_text)
        billing_codes = report['billing_codes']

        def as_suggestion(kind, code):
            # Shape expected by the EHR update call below.
            return {
                'type': kind,
                'code': code['code'],
                'description': code['description']
            }

        suggested_codes = [
            as_suggestion('diagnosis', code) for code in billing_codes['icd10']
        ]

        # Procedure codes may sit in the 'cpt' or the 'hcpcs' bucket; reading
        # only 'cpt' would silently drop every HCPCS procedure suggestion.
        procedure_codes = (
            list(billing_codes.get('cpt', [])) + list(billing_codes.get('hcpcs', []))
        )
        suggested_codes.extend(
            as_suggestion('procedure', code) for code in procedure_codes
        )

        # Send back to EHR for coder review
        self.ehr.update_encounter_codes(encounter_id, suggested_codes)

        return report

2. Real-time Coding Assistant

/**
 * Debounced autocomplete and code validation helper built on an OMOPHub
 * client object.
 */
class RealTimeCodingAssistant {
  constructor(omophubClient) {
    this.client = omophubClient;
    this.debounceTimer = null;
  }

  /**
   * Schedule an autocomplete lookup for the text typed so far.
   * Every call cancels the previously scheduled lookup, so only the last
   * call within a 500ms window reaches the API. Text shorter than 10
   * characters is ignored; lookup errors are logged, not thrown.
   */
  async getSuggestions(clinicalText, callback) {
    const runLookup = async () => {
      if (clinicalText.length >= 10) {
        try {
          const response = await this.client.getSearchAutocomplete({
            query: clinicalText,
            vocabularies: ['SNOMED', 'ICD10CM'],
            limit: 5
          });

          callback(response.data);
        } catch (error) {
          console.error('Error getting suggestions:', error);
        }
      }
    };

    // Restart the debounce window on every keystroke
    clearTimeout(this.debounceTimer);
    this.debounceTimer = setTimeout(runLookup, 500);
  }

  /**
   * Look up each code, one request at a time, and report whether it exists
   * and whether its validEndDate sorts after the current ISO timestamp.
   * A failed lookup yields an entry with valid: false and the error message.
   */
  async validateCodes(codes) {
    const outcomes = [];

    for (const entry of codes) {
      let outcome;

      try {
        const response = await this.client.getConceptByCode(
          entry.vocabulary_id,
          entry.code
        );

        outcome = {
          code: entry.code,
          valid: response.data.success,
          current: response.data.validEndDate > new Date().toISOString(),
          concept: response.data
        };
      } catch (error) {
        outcome = {
          code: entry.code,
          valid: false,
          error: error.message
        };
      }

      outcomes.push(outcome);
    }

    return outcomes;
  }
}

Performance Optimization

Caching Strategy

import redis
import json
from typing import Optional

class CachedCodingService(ClinicalCodingService):
    """Coding service that memoizes whole reports in Redis.

    Reports are stored as JSON under a key derived from the clinical text
    and expire after ``cache_ttl`` seconds.
    """

    def __init__(self, api_key: str, redis_url: str = "redis://localhost:6379"):
        """Set up the base service, then the Redis connection and TTL."""
        super().__init__(api_key)
        self.redis_client = redis.from_url(redis_url)
        self.cache_ttl = 3600  # seconds (1 hour)

    def _cache_key(self, text: str) -> str:
        """Build the Redis key for a note: ``coding:`` plus its MD5 hex digest."""
        import hashlib
        digest = hashlib.md5(text.encode()).hexdigest()
        return f"coding:{digest}"

    def generate_coding_report(self, clinical_text: str) -> Dict[str, Any]:
        """Return the cached report for this text, generating it on a miss.

        A cache hit returns the JSON-decoded stored report; a miss delegates
        to the parent implementation and stores the result (non-JSON values
        are stringified via ``default=str``).
        """
        key = self._cache_key(clinical_text)
        stored = self.redis_client.get(key)

        if not stored:
            # Miss: build the report and remember it for cache_ttl seconds.
            fresh = super().generate_coding_report(clinical_text)
            payload = json.dumps(fresh, default=str)
            self.redis_client.setex(key, self.cache_ttl, payload)
            return fresh

        return json.loads(stored)

Batch Processing

def process_batch_encounters(self, encounter_ids: List[str]) -> List[Dict[str, Any]]:
    """Process multiple encounters with a single bulk concept search.

    Terms are extracted per encounter, de-duplicated across the batch,
    searched in one ``bulk_concept_search`` call, and the results are then
    handed to ``_generate_report_from_cache`` once per encounter.

    Args:
        encounter_ids: Encounters to process; reports are returned in the
            same order.

    Returns:
        One report per encounter id.
    """
    # Extract terms for every encounter first
    encounter_terms = {}
    for encounter_id in encounter_ids:
        clinical_text = self.get_clinical_text(encounter_id)
        encounter_terms[encounter_id] = self.extract_medical_terms(clinical_text)

    # De-duplicate while keeping first-seen order, so the bulk request is
    # reproducible from run to run (a set would give hash-dependent order).
    unique_terms = list(dict.fromkeys(
        term for terms in encounter_terms.values() for term in terms
    ))

    if unique_terms:
        bulk_results = self.client.bulk_concept_search({
            "searches": [
                {"search_id": term, "query": term}
                for term in unique_terms
            ],
            "version": "latest"
        })
        # NOTE(review): assumes "results" is a dict keyed by search_id - confirm.
        results_by_term = bulk_results["results"]
    else:
        # Nothing to look up: skip the API call instead of sending an
        # empty search list.
        results_by_term = {}

    # Map bulk results back to each encounter's own terms
    reports = []
    for encounter_id in encounter_ids:
        encounter_mappings = {
            term: results_by_term.get(term, {})
            for term in encounter_terms[encounter_id]
        }
        reports.append(
            self._generate_report_from_cache(encounter_id, encounter_mappings)
        )

    return reports

Quality Assurance

Code Validation Rules

class CodingValidator:
    """Checks diagnosis/procedure code pairs against OMOPHub relationships."""

    def __init__(self, omophub_client):
        """Keep the OMOPHub client used for concept and relationship lookups."""
        self.client = omophub_client

    def validate_code_combination(self, diagnosis_codes: List[str], 
                                procedure_codes: List[str]) -> Dict[str, Any]:
        """Classify every diagnosis/procedure pair as valid or invalid.

        Each pair is checked with ``check_clinical_relationship``. Pairs
        judged appropriate are listed with their confidence, all others
        with the reason. The ``warnings`` list is returned empty.
        """
        outcome = {
            "valid_combinations": [],
            "invalid_combinations": [],
            "warnings": []
        }

        for dx_code in diagnosis_codes:
            for proc_code in procedure_codes:
                verdict = self.check_clinical_relationship(dx_code, proc_code)
                pair = {"diagnosis": dx_code, "procedure": proc_code}

                if not verdict["appropriate"]:
                    pair["reason"] = verdict["reason"]
                    outcome["invalid_combinations"].append(pair)
                    continue

                pair["confidence"] = verdict["confidence"]
                outcome["valid_combinations"].append(pair)

        return outcome

    def check_clinical_relationship(self, dx_code: str, proc_code: str) -> Dict[str, Any]:
        """Decide whether a procedure code is linked to a diagnosis code.

        The diagnosis is resolved as ICD10CM and the procedure as HCPCS,
        then the diagnosis concept's "Has procedure" / "Treatment of"
        relationships are scanned for the procedure concept.

        Returns:
            ``appropriate`` True with confidence 0.9 and the relationship
            type when a link exists; False with confidence 0.1 when none is
            found; False with confidence 0.0 and the error text when any
            lookup raises.
        """
        try:
            diagnosis = self.client.get_concept_by_code("ICD10CM", dx_code)
            procedure = self.client.get_concept_by_code("HCPCS", proc_code)

            links = self.client.get_concept_relationships(
                diagnosis["concept_id"],
                relationship_types=["Has procedure", "Treatment of"]
            )

            # First relationship that points at the procedure concept, if any
            match = next(
                (
                    link for link in links
                    if link["target_concept_id"] == procedure["concept_id"]
                ),
                None,
            )

            if match is None:
                return {
                    "appropriate": False,
                    "confidence": 0.1,
                    "reason": "No clinical relationship found between diagnosis and procedure"
                }

            return {
                "appropriate": True,
                "confidence": 0.9,
                "relationship": match["relationship_type"]
            }

        except Exception as e:
            # Lookup failures are reported as an inappropriate pairing
            return {
                "appropriate": False,
                "confidence": 0.0,
                "reason": f"Error validating relationship: {str(e)}"
            }

Metrics and Monitoring

Coding Accuracy Metrics

class CodingMetrics:
    """Accuracy metrics for automated coding measured against manual coding."""

    def __init__(self, omophub_client):
        """Keep a reference to the OMOPHub client."""
        self.client = omophub_client

    def calculate_coding_accuracy(self, automated_codes: List[str], 
                                manual_codes: List[str]) -> Dict[str, float]:
        """Score automated codes against manual codes as the reference.

        Both inputs are compared as sets, so duplicates are ignored.

        Returns:
            ``precision``, ``recall``, ``f1_score`` and ``accuracy`` (the
            share of manual codes that were also produced automatically).
            Any metric whose denominator is zero is reported as 0.
        """
        predicted = set(automated_codes)
        reference = set(manual_codes)

        hits = len(predicted & reference)
        spurious = len(predicted - reference)
        missed = len(reference - predicted)

        precision = 0
        if hits + spurious > 0:
            precision = hits / (hits + spurious)

        recall = 0
        if hits + missed > 0:
            recall = hits / (hits + missed)

        f1_score = 0
        if precision + recall > 0:
            f1_score = 2 * (precision * recall) / (precision + recall)

        accuracy = hits / len(reference) if reference else 0

        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score,
            "accuracy": accuracy
        }

    def track_coding_trends(self, time_period: str = "30d") -> Dict[str, Any]:
        """Track coding trends and patterns (not implemented; returns None)."""
        # Placeholder: meant to be wired to an analytics system that tracks
        # metrics over time.
        return None

Best Practices

1. Error Handling

def robust_coding_search(self, term: str, max_retries: int = 3) -> Optional[Dict]:
    """Search for a term with an exact-then-fuzzy fallback and retries.

    Args:
        term: Text to search for.
        max_retries: Maximum number of attempts when a search raises.

    Returns:
        The first concept from the SNOMED search, otherwise the first
        concept from the broader SNOMED/ICD10CM search, otherwise ``None``
        (no match, or every attempt raised).
    """
    # Local imports: neither module is imported at file level in the
    # visible source, and the failure path needs both.
    import logging
    import time

    log = logging.getLogger(__name__)

    for attempt in range(max_retries):
        try:
            # Try exact search first
            results = self.client.search_concepts({
                "query": term,
                "vocabularies": ["SNOMED"],
                "limit": 1
            })

            if results["concepts"]:
                return results["concepts"][0]

            # Fallback to fuzzy search
            fuzzy_results = self.client.advanced_search_concepts({
                "query": term,
                "vocabularies": ["SNOMED", "ICD10CM"],
                "include_invalid": False,
                "limit": 5
            })

            if fuzzy_results["concepts"]:
                return fuzzy_results["concepts"][0]

            # Both searches succeeded but matched nothing. Retrying would
            # only repeat the same two queries, so stop here.
            return None

        except Exception as e:
            if attempt == max_retries - 1:
                log.error("Failed to search for term '%s': %s", term, e)
                return None
            time.sleep(2 ** attempt)  # Exponential backoff

    return None

2. Security and Compliance

def secure_coding_process(self, clinical_text: str, user_id: str) -> Dict[str, Any]:
    """Run the coding pipeline with audit logging around it.

    The audit record holds the user, action, timestamp, text length and a
    SHA-256 hash of the text - never the text itself. It is passed to
    ``log_audit_event`` whether processing succeeds or fails.

    Args:
        clinical_text: Note to code.
        user_id: Identifier of the user requesting the coding.

    Returns:
        The report after ``sanitize_report`` has been applied.

    Raises:
        Exception: Any error from report generation or sanitizing is
            recorded in the audit record and re-raised unchanged.
    """
    # Local import: hashlib is not imported at module level in the
    # visible source.
    import hashlib

    # Log access for audit
    audit_log = {
        "user_id": user_id,
        "action": "clinical_coding",
        "timestamp": datetime.now().isoformat(),
        "text_length": len(clinical_text),
        "text_hash": hashlib.sha256(clinical_text.encode()).hexdigest()
    }

    try:
        # Process coding (text is not logged)
        report = self.generate_coding_report(clinical_text)

        # Log successful completion. Count the codes in every bucket:
        # len() of the billing_codes dict would only give the number of
        # code categories.
        audit_log["status"] = "success"
        audit_log["codes_generated"] = sum(
            len(codes) for codes in report["billing_codes"].values()
        )

        # Remove sensitive data before returning
        return self.sanitize_report(report)

    except Exception as e:
        audit_log["status"] = "error"
        audit_log["error"] = str(e)
        raise

    finally:
        # Always log audit information
        self.log_audit_event(audit_log)

Next Steps

I