Skip to main content

Overview

Large Language Models (LLMs) have revolutionized how we process and understand natural language, but in healthcare, accuracy and reliability are paramount. This guide demonstrates how to integrate cutting-edge AI technologies like OpenAI’s GPT models and AWS Bedrock with the OMOPHub API to create intelligent healthcare applications that are both innovative and clinically sound.
Use Case: Build AI-powered healthcare applications that combine the natural language understanding of LLMs with the accuracy and standardization of medical vocabularies, ensuring both intelligence and clinical safety.

Business Problem

Healthcare AI faces unique challenges that generic LLMs struggle to address:
  • Medical Hallucination: LLMs can generate plausible but incorrect medical information
  • Terminology Inconsistency: Clinical terms vary widely across systems and specialties
  • Compliance Requirements: Healthcare applications must meet strict regulatory standards
  • Evidence-Based Medicine: Clinical decisions require validated, authoritative information
  • Interoperability: Medical data needs standardized vocabularies for system integration
  • Patient Safety: Incorrect AI-generated medical advice can have serious consequences

Solution Architecture

The key to successful healthcare AI is combining the natural language capabilities of LLMs with the structured, validated data from medical vocabularies. This creates a “grounded AI” approach where LLM outputs are validated and enhanced with authoritative medical information.

Implementation Examples

Clinical Note Analysis with OpenAI

Transform unstructured clinical notes into structured, standardized medical data using GPT-4 combined with OMOPHub vocabulary validation.
import openai
import asyncio
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from omophub import OMOPHubClient

@dataclass
class MedicalEntity:
    text: str
    category: str
    confidence: float
    concept_id: Optional[int] = None
    concept_name: Optional[str] = None
    vocabulary_id: Optional[str] = None
    standard_concept: Optional[str] = None

class ClinicalNoteAnalyzer:
    def __init__(self, openai_api_key: str, omophub_api_key: str):
        self.openai_client = openai.AsyncOpenAI(api_key=openai_api_key)
        self.omophub = OMOPHubClient(api_key=omophub_api_key)
        
    async def extract_medical_entities(self, clinical_note: str) -> List[MedicalEntity]:
        """Extract medical entities using GPT-4 with structured output"""
        
        system_prompt = """You are a medical AI assistant specializing in clinical note analysis. 
        Extract medical entities from clinical notes and categorize them.
        
        Categories:
        - condition: Diseases, disorders, symptoms
        - medication: Drugs, treatments, prescriptions  
        - procedure: Medical procedures, surgeries, interventions
        - observation: Lab results, vital signs, measurements
        - anatomy: Body parts, organs, anatomical structures
        
        Return a single JSON object with an "entities" field containing an array of objects.
        Each entity object should have: text, category, and confidence (0-1).
        Return only valid JSON, no surrounding text.
        
        Expected format: {"entities": [{"text": "diabetes", "category": "condition", "confidence": 0.95}, ...]}"""
        
        response = await self.openai_client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Extract medical entities from this clinical note:\n\n{clinical_note}"}
            ],
            response_format={"type": "json_object"},
            temperature=0.1
        )
        
        try:
            entities_data = json.loads(response.choices[0].message.content)
            entities = []
            
            for entity_data in entities_data.get("entities", []):
                entity = MedicalEntity(
                    text=entity_data["text"],
                    category=entity_data["category"],
                    confidence=entity_data["confidence"]
                )
                entities.append(entity)
                
            return entities
            
        except json.JSONDecodeError:
            return []
    
    async def validate_and_standardize_entities(self, entities: List[MedicalEntity]) -> List[MedicalEntity]:
        """Validate entities against OMOPHub vocabularies"""
        
        validated_entities = []
        
        for entity in entities:
            try:
                # Search for the entity in appropriate vocabularies
                vocabulary_mapping = {
                    "condition": ["SNOMED", "ICD10CM"],
                    "medication": ["RXNORM", "NDC"],
                    "procedure": ["SNOMED"],
                    "observation": ["LOINC", "SNOMED"],
                    "anatomy": ["SNOMED"]
                }
                
                vocabularies = vocabulary_mapping.get(entity.category, ["SNOMED"])
                
                # Search for matching concepts
                search_result = await self.omophub.search_concepts(
                    query=entity.text,
                    vocabulary_ids=vocabularies,
                    standard_concept="S",
                    page_size=5
                )
                
                if search_result["data"]:
                    # Take the best match
                    best_match = search_result["data"][0]
                    entity.concept_id = best_match["concept_id"]
                    entity.concept_name = best_match["concept_name"]
                    entity.vocabulary_id = best_match["vocabulary_id"]
                    entity.standard_concept = best_match["standard_concept"]
                
                validated_entities.append(entity)
                
            except Exception as e:
                # Keep original entity if validation fails
                validated_entities.append(entity)
                
        return validated_entities
    
    async def generate_structured_summary(self, clinical_note: str) -> Dict[str, Any]:
        """Generate comprehensive structured summary"""
        
        # Extract and validate entities
        entities = await self.extract_medical_entities(clinical_note)
        validated_entities = await self.validate_and_standardize_entities(entities)
        
        # Generate clinical summary using GPT-4
        summary_prompt = f"""Based on this clinical note, provide a structured summary:

        Clinical Note: {clinical_note}
        
        Provide:
        1. Primary diagnosis/condition
        2. Secondary diagnoses  
        3. Current medications
        4. Planned procedures/treatments
        5. Key observations/findings
        6. Clinical assessment
        
        Format as JSON object."""
        
        summary_response = await self.openai_client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": summary_prompt}],
            response_format={"type": "json_object"},
            temperature=0.2
        )
        
        try:
            summary = json.loads(summary_response.choices[0].message.content)
        except:
            summary = {"error": "Failed to parse summary"}
        
        return {
            "clinical_summary": summary,
            "extracted_entities": [
                {
                    "original_text": e.text,
                    "category": e.category,
                    "confidence": e.confidence,
                    "standardized_concept": {
                        "concept_id": e.concept_id,
                        "concept_name": e.concept_name,
                        "vocabulary": e.vocabulary_id
                    } if e.concept_id else None
                }
                for e in validated_entities
            ],
            "coding_suggestions": await self._generate_coding_suggestions(validated_entities)
        }
    
    async def _generate_coding_suggestions(self, entities: List[MedicalEntity]) -> List[Dict[str, Any]]:
        """Generate ICD-10 and SNOMED coding suggestions"""
        
        suggestions = []
        
        for entity in entities:
            if entity.concept_id and entity.category in ["condition", "procedure"]:
                try:
                    # Get mappings to other vocabularies
                    mappings = await self.omophub.get_concept_mappings(
                        concept_id=entity.concept_id,
                        target_vocabularies=["ICD10CM", "SNOMED"]
                    )
                    
                    if mappings["data"]:
                        suggestions.append({
                            "original_entity": entity.text,
                            "primary_concept": {
                                "concept_id": entity.concept_id,
                                "concept_name": entity.concept_name,
                                "vocabulary": entity.vocabulary_id
                            },
                            "coding_options": [
                                {
                                    "code": mapping["target_concept_code"],
                                    "name": mapping["target_concept_name"],
                                    "vocabulary": mapping["target_vocabulary_id"],
                                    "relationship": mapping["relationship_id"]
                                }
                                for mapping in mappings["data"]
                            ]
                        })
                        
                except Exception:
                    continue
                    
        return suggestions

# Usage Example
async def analyze_clinical_note():
    analyzer = ClinicalNoteAnalyzer(
        openai_api_key="your-openai-key",
        omophub_api_key="your-omophub-key"
    )
    
    clinical_note = """
    Patient presents with acute onset chest pain radiating to left arm, 
    associated with shortness of breath and diaphoresis. EKG shows ST elevation 
    in leads II, III, aVF suggestive of inferior STEMI. Troponin I elevated at 15.2 ng/mL.
    Patient has history of hypertension, diabetes mellitus type 2, and smoking.
    Plan: Emergency cardiac catheterization, dual antiplatelet therapy with aspirin 
    and clopidogrel, atorvastatin, metoprolol.
    """
    
    result = await analyzer.generate_structured_summary(clinical_note)
    
    print("Clinical Summary:")
    print(json.dumps(result["clinical_summary"], indent=2))
    
    print("\nExtracted Entities:")
    for entity in result["extracted_entities"]:
        print(f"- {entity['original_text']} ({entity['category']})")
        if entity["standardized_concept"]:
            concept = entity["standardized_concept"]
            print(f"  → {concept['concept_name']} ({concept['vocabulary']}:{concept['concept_id']})")
    
    print("\nCoding Suggestions:")
    for suggestion in result["coding_suggestions"]:
        print(f"- {suggestion['original_entity']}")
        for option in suggestion["coding_options"]:
            print(f"  → {option['vocabulary']}: {option['code']} - {option['name']}")

# Run the example
if __name__ == "__main__":
    asyncio.run(analyze_clinical_note())

Medical Q&A System with AWS Bedrock

Create an intelligent medical question-answering system that combines Claude’s reasoning capabilities with OMOPHub vocabulary validation to provide accurate, evidence-based responses.
import boto3
import json
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from omophub import OMOPHubClient

@dataclass
class MedicalCitation:
    concept_id: int
    concept_name: str
    vocabulary: str
    definition: Optional[str] = None
    source_url: Optional[str] = None

@dataclass  
class MedicalResponse:
    answer: str
    confidence_score: float
    citations: List[MedicalCitation]
    safety_warnings: List[str]
    follow_up_questions: List[str]

class MedicalQASystem:
    def __init__(self, aws_region: str, omophub_api_key: str):
        self.bedrock = boto3.client(
            service_name='bedrock-runtime',
            region_name=aws_region
        )
        self.omophub = OMOPHubClient(api_key=omophub_api_key)
        
    async def answer_medical_question(self, question: str, context: Optional[str] = None) -> MedicalResponse:
        """Answer medical questions with vocabulary validation"""
        
        # Step 1: Extract medical concepts from the question
        medical_concepts = await self._extract_medical_concepts(question)
        
        # Step 2: Validate concepts with OMOPHub
        validated_concepts = await self._validate_medical_concepts(medical_concepts)
        
        # Step 3: Get related medical information
        context_info = await self._get_medical_context(validated_concepts)
        
        # Step 4: Generate response using Claude
        response = await self._generate_claude_response(
            question, 
            context or "",
            context_info, 
            validated_concepts
        )
        
        # Step 5: Post-process for safety
        safety_checked = await self._safety_check_response(response, question)
        
        return safety_checked
    
    async def _extract_medical_concepts(self, text: str) -> List[str]:
        """Extract medical concepts using Claude"""
        
        prompt = f"""
        <task>Extract medical concepts, terms, and entities from the following question.</task>
        
        <question>{text}</question>
        
        <instructions>
        - Identify medical conditions, symptoms, treatments, procedures, medications, anatomy
        - Return only the specific medical terms, not general words
        - Focus on standardized medical terminology
        - Return as JSON array of strings
        </instructions>
        """
        
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1
        }
        
        response = self.bedrock.invoke_model(
            modelId='anthropic.claude-3-sonnet-20240229-v1:0',
            body=json.dumps(body),
            contentType='application/json',
            accept='application/json'
        )
        
        response_body = json.loads(response['body'].read())
        
        try:
            # Parse the JSON response to extract concepts
            content = response_body['content'][0]['text']
            # Remove any markdown formatting and extract JSON
            start = content.find('[')
            end = content.rfind(']') + 1
            concepts_json = content[start:end] if start != -1 and end != 0 else '[]'
            concepts = json.loads(concepts_json)
            return concepts
        except:
            return []
    
    async def _validate_medical_concepts(self, concepts: List[str]) -> List[MedicalCitation]:
        """Validate and enrich concepts using OMOPHub"""
        
        validated = []
        
        for concept in concepts:
            try:
                # Search for the concept in multiple vocabularies
                search_result = await self.omophub.search_concepts(
                    query=concept,
                    vocabulary_ids=["SNOMED", "ICD10CM", "LOINC", "RXNORM"],
                    standard_concept="S",
                    page_size=3
                )
                
                if search_result["data"]:
                    best_match = search_result["data"][0]
                    
                    # Get additional details if available
                    concept_details = await self.omophub.get_concept(
                        concept_id=best_match["concept_id"]
                    )
                    
                    citation = MedicalCitation(
                        concept_id=best_match["concept_id"],
                        concept_name=best_match["concept_name"],
                        vocabulary=best_match["vocabulary_id"],
                        definition=concept_details.get("definition"),
                        source_url=f"https://api.omophub.com/v1/concepts/{best_match['concept_id']}"
                    )
                    validated.append(citation)
                    
            except Exception as e:
                # Log but continue with other concepts
                print(f"Failed to validate concept '{concept}': {e}")
                continue
                
        return validated
    
    async def _get_medical_context(self, concepts: List[MedicalCitation]) -> Dict[str, Any]:
        """Get additional medical context for validated concepts"""
        
        context = {
            "concept_definitions": {},
            "relationships": {},
            "hierarchies": {}
        }
        
        for concept in concepts:
            try:
                # Get concept relationships
                relationships = await self.omophub.get_concept_relationships(
                    concept_id=concept.concept_id,
                    relationship_types=["Is a", "Has finding site", "Has causative agent"]
                )
                
                if relationships["data"]:
                    context["relationships"][concept.concept_name] = [
                        {
                            "type": rel["relationship_id"],
                            "target": rel["target_concept_name"]
                        }
                        for rel in relationships["data"][:5]  # Limit to top 5
                    ]
                
                # Get hierarchical information for conditions
                if concept.vocabulary == "SNOMED":
                    ancestors = await self.omophub.get_concept_ancestors(
                        concept_id=concept.concept_id,
                        max_levels=3
                    )
                    
                    if ancestors["data"]:
                        context["hierarchies"][concept.concept_name] = [
                            ancestor["ancestor_concept_name"] 
                            for ancestor in ancestors["data"][:3]
                        ]
                        
            except Exception:
                continue
                
        return context
    
    async def _generate_claude_response(
        self, 
        question: str, 
        user_context: str,
        medical_context: Dict[str, Any], 
        citations: List[MedicalCitation]
    ) -> MedicalResponse:
        """Generate response using Claude with medical context"""
        
        citations_text = "\n".join([
            f"- {c.concept_name} ({c.vocabulary}): {c.definition or 'No definition available'}"
            for c in citations
        ])
        
        relationships_text = "\n".join([
            f"{concept}: {', '.join([r['type'] + ' ' + r['target'] for r in rels])}"
            for concept, rels in medical_context.get("relationships", {}).items()
        ])
        
        prompt = f"""
        <role>You are a medical AI assistant providing evidence-based healthcare information.</role>
        
        <context>
        User Context: {user_context}
        
        Validated Medical Concepts:
        {citations_text}
        
        Medical Relationships:
        {relationships_text}
        </context>
        
        <question>{question}</question>
        
        <instructions>
        1. Provide a comprehensive, accurate answer based on the validated medical concepts
        2. Include relevant medical relationships and hierarchies in your explanation
        3. Cite specific concepts using their standardized names
        4. Include confidence assessment (high/medium/low)
        5. Add safety warnings if discussing treatments, medications, or diagnoses
        6. Suggest 2-3 relevant follow-up questions
        7. Format response as JSON with fields: answer, confidence_level, safety_warnings (array), follow_up_questions (array)
        </instructions>
        
        <safety>
        - Always recommend consulting healthcare professionals for medical advice
        - Do not provide specific dosage recommendations
        - Include disclaimers for diagnostic or treatment information
        - Emphasize when information is for educational purposes only
        </safety>
        """
        
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4000,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.2
        }
        
        response = self.bedrock.invoke_model(
            modelId='anthropic.claude-3-sonnet-20240229-v1:0',
            body=json.dumps(body),
            contentType='application/json',
            accept='application/json'
        )
        
        response_body = json.loads(response['body'].read())
        content = response_body['content'][0]['text']
        
        try:
            # Extract JSON from response
            start = content.find('{')
            end = content.rfind('}') + 1
            json_content = content[start:end] if start != -1 and end != 0 else '{}'
            response_data = json.loads(json_content)
            
            confidence_map = {"high": 0.9, "medium": 0.7, "low": 0.5}
            confidence_score = confidence_map.get(
                response_data.get("confidence_level", "medium").lower(), 
                0.7
            )
            
            return MedicalResponse(
                answer=response_data.get("answer", "Unable to generate response"),
                confidence_score=confidence_score,
                citations=citations,
                safety_warnings=response_data.get("safety_warnings", []),
                follow_up_questions=response_data.get("follow_up_questions", [])
            )
            
        except Exception as e:
            # Fallback response
            return MedicalResponse(
                answer="I encountered an error processing your medical question. Please consult a healthcare professional for medical advice.",
                confidence_score=0.0,
                citations=citations,
                safety_warnings=["Unable to validate response accuracy. Consult healthcare professional."],
                follow_up_questions=[]
            )
    
    async def _safety_check_response(self, response: MedicalResponse, question: str) -> MedicalResponse:
        """Additional safety checks for medical responses"""
        
        # Add standard medical disclaimer if not present
        if "consult" not in response.answer.lower() and "healthcare professional" not in response.answer.lower():
            response.safety_warnings.append(
                "This information is for educational purposes only. Always consult with a qualified healthcare professional for medical advice, diagnosis, or treatment."
            )
        
        # Check for potentially dangerous advice
        dangerous_keywords = ["self-medicate", "stop taking", "don't need doctor", "instead of seeing doctor"]
        if any(keyword in response.answer.lower() for keyword in dangerous_keywords):
            response.safety_warnings.append(
                "WARNING: Never self-diagnose or change medical treatments without professional supervision."
            )
            response.confidence_score *= 0.5  # Reduce confidence
        
        return response

# Usage Example
async def demo_medical_qa():
    qa_system = MedicalQASystem(
        aws_region="us-east-1",
        omophub_api_key="your-omophub-key"
    )
    
    questions = [
        "What is the relationship between hypertension and cardiovascular disease?",
        "What are the symptoms of Type 2 diabetes?",
        "How do ACE inhibitors work for treating high blood pressure?"
    ]
    
    for question in questions:
        print(f"\nQ: {question}")
        print("-" * 80)
        
        response = await qa_system.answer_medical_question(question)
        
        print(f"Answer (Confidence: {response.confidence_score:.1%}):")
        print(response.answer)
        
        if response.citations:
            print("\nMedical Citations:")
            for citation in response.citations:
                print(f"- {citation.concept_name} ({citation.vocabulary})")
        
        if response.safety_warnings:
            print("\nSafety Warnings:")
            for warning in response.safety_warnings:
                print(f"⚠️  {warning}")
        
        if response.follow_up_questions:
            print("\nSuggested Follow-up Questions:")
            for i, q in enumerate(response.follow_up_questions, 1):
                print(f"{i}. {q}")

# Run the demo
if __name__ == "__main__":
    asyncio.run(demo_medical_qa())

Best Practices for Healthcare AI

Prompt Engineering for Medical Context

Medical AI applications require specialized prompt engineering techniques:
  1. Structured Medical Prompts:
    • Include role definitions: “You are a medical AI assistant…”
    • Specify medical context: patient type, clinical setting, specialty
    • Define output format: JSON, structured text, specific fields
  2. Evidence-Based Instructions:
    • Request citations for medical claims
    • Ask for confidence levels on responses
    • Include safety warnings and disclaimers
    • Validate against authoritative vocabularies
  3. Safety-First Approach:
    • Always include professional consultation disclaimers
    • Flag potential harmful or dangerous advice
    • Implement content filtering for inappropriate responses
    • Monitor for medical misinformation

Data Privacy and Compliance

Healthcare AI must handle sensitive data appropriately:
import hashlib
import re
from typing import Dict, Any

class HealthcareDataSanitizer:
    def __init__(self):
        # PHI patterns to detect and anonymize
        self.phi_patterns = {
            'mrn': r'\b\d{7,10}\b',  # Medical record numbers
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'date': r'\b\d{1,2}/\d{1,2}/\d{4}\b',
            'name': r'\b[A-Z][a-z]+ [A-Z][a-z]+\b'  # Simple name pattern
        }
    
    def sanitize_clinical_text(self, text: str, preserve_medical_context: bool = True) -> Dict[str, Any]:
        """Sanitize PHI while preserving medical context"""
        
        sanitized_text = text
        detected_phi = []
        
        for phi_type, pattern in self.phi_patterns.items():
            matches = re.findall(pattern, text)
            for match in matches:
                # Create consistent anonymized replacement
                anonymized = self._anonymize_value(match, phi_type)
                sanitized_text = sanitized_text.replace(match, anonymized)
                detected_phi.append({
                    'type': phi_type,
                    'original_length': len(match),
                    'anonymized': anonymized
                })
        
        # Preserve medical context by keeping medical terms
        if preserve_medical_context:
            sanitized_text = self._preserve_medical_terms(sanitized_text)
            
        return {
            'sanitized_text': sanitized_text,
            'phi_detected': detected_phi,
            'phi_count': len(detected_phi),
            'is_safe_for_ai': len(detected_phi) == 0
        }
    
    def _anonymize_value(self, value: str, phi_type: str) -> str:
        """Create consistent anonymized replacement"""
        hash_value = hashlib.md5(value.encode()).hexdigest()[:8]
        return f"[{phi_type.upper()}_{hash_value}]"
    
    def _preserve_medical_terms(self, text: str) -> str:
        """Preserve important medical terminology"""
        # Keep medical abbreviations, units, ranges
        medical_preserves = [
            r'\b\d+\s*(mg|ml|cc|units?)\b',  # Dosages
            r'\b\d+[-/]\d+\s*mmHg\b',       # Blood pressure
            r'\b\d+\.\d+\s*(ng/mL|mg/dL)\b' # Lab values
        ]
        
        # Additional processing to preserve medical context
        return text

# Usage example
sanitizer = HealthcareDataSanitizer()
clinical_note = "Patient John Smith (MRN: 1234567890) presents with chest pain. BP 140/90 mmHg."

result = sanitizer.sanitize_clinical_text(clinical_note)
print("Sanitized:", result['sanitized_text'])
print("PHI Detected:", result['phi_count'])

Performance Optimization

Healthcare AI applications need to balance accuracy with performance:
import asyncio
import time
from functools import wraps
from typing import Dict, List, Any, Optional
import redis
import json

class HealthcareAICache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour cache for medical concepts
        
    @staticmethod
    def cache_medical_concepts(ttl: int = None):
        """Decorator factory to cache medical concept lookups"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Get instance from first argument
                instance = args[0]
                
                # Create cache key from function args
                cache_key = f"medical_concept:{hash(str(args) + str(kwargs))}"
                
                # Try to get from cache first
                cached_result = instance.redis_client.get(cache_key)
                if cached_result:
                    return json.loads(cached_result)
                
                # Execute function and cache result
                result = await func(*args, **kwargs)
                instance.redis_client.setex(
                    cache_key, 
                    ttl or instance.default_ttl, 
                    json.dumps(result, default=str)
                )
                
                return result
            return wrapper
        return decorator
    
    async def batch_concept_lookup(self, concepts: List[str], batch_size: int = 10) -> List[Dict]:
        """Batch concept lookups for better performance"""
        results = []
        
        for i in range(0, len(concepts), batch_size):
            batch = concepts[i:i + batch_size]
            batch_results = await asyncio.gather(*[
                self._lookup_single_concept(concept) 
                for concept in batch
            ])
            results.extend(batch_results)
            
        return results
    
    @cache_medical_concepts(ttl=7200)  # 2 hour cache
    async def _lookup_single_concept(self, concept: str) -> Dict:
        """Single concept lookup with caching"""
        # Implementation would use OMOPHub API
        pass

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {}
    
    def time_operation(self, operation_name: str):
        """Decorator to monitor operation performance"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                start_time = time.time()
                
                try:
                    result = await func(*args, **kwargs)
                    duration = time.time() - start_time
                    
                    # Store performance metrics
                    if operation_name not in self.metrics:
                        self.metrics[operation_name] = []
                    
                    self.metrics[operation_name].append({
                        'duration': duration,
                        'status': 'success',
                        'timestamp': start_time
                    })
                    
                    return result
                    
                except Exception as e:
                    duration = time.time() - start_time
                    
                    if operation_name not in self.metrics:
                        self.metrics[operation_name] = []
                    
                    self.metrics[operation_name].append({
                        'duration': duration,
                        'status': 'error',
                        'error': str(e),
                        'timestamp': start_time
                    })
                    
                    raise
                    
            return wrapper
        return decorator
    
    def get_performance_summary(self, operation_name: str) -> Dict[str, Any]:
        """Get performance statistics for an operation"""
        if operation_name not in self.metrics:
            return {}
            
        operations = self.metrics[operation_name]
        durations = [op['duration'] for op in operations if op['status'] == 'success']
        
        if not durations:
            return {'error': 'No successful operations recorded'}
        
        return {
            'total_operations': len(operations),
            'successful_operations': len(durations),
            'error_rate': (len(operations) - len(durations)) / len(operations),
            'avg_duration': sum(durations) / len(durations),
            'min_duration': min(durations),
            'max_duration': max(durations),
            'p95_duration': sorted(durations)[int(len(durations) * 0.95)]
        }

# Usage example
cache = HealthcareAICache()
monitor = PerformanceMonitor()

@monitor.time_operation("medical_concept_validation")
@cache.cache_medical_concepts(ttl=3600)
async def validate_medical_concept(concept: str) -> Dict:
    """Cached and monitored concept validation"""
    # OMOPHub API call implementation
    pass

Advanced Patterns

RAG with Medical Knowledge Bases

Retrieval Augmented Generation (RAG) patterns work exceptionally well with medical vocabularies:
class MedicalRAGSystem:
    def __init__(self, omophub_client, llm_client):
        self.omophub = omophub_client
        self.llm = llm_client
        
    async def medical_rag_query(self, question: str, context_types: List[str] = None) -> Dict:
        """Execute RAG pattern with medical knowledge retrieval"""
        
        # 1. Extract medical entities from question
        entities = await self._extract_medical_entities(question)
        
        # 2. Retrieve relevant medical knowledge
        knowledge = await self._retrieve_medical_knowledge(entities, context_types)
        
        # 3. Generate response with retrieved context
        response = await self._generate_with_context(question, knowledge)
        
        return {
            'answer': response,
            'retrieved_knowledge': knowledge,
            'source_citations': self._generate_citations(knowledge)
        }
    
    async def _retrieve_medical_knowledge(self, entities: List[str], context_types: List[str]) -> Dict:
        """Retrieve comprehensive medical context"""
        
        knowledge = {
            'concept_definitions': {},
            'relationships': {},
            'hierarchies': {},
            'mappings': {}
        }
        
        for entity in entities:
            # Get concept details
            concepts = await self.omophub.search_concepts(query=entity)
            if concepts['data']:
                concept = concepts['data'][0]
                knowledge['concept_definitions'][entity] = concept
                
                # Get relationships if requested
                if not context_types or 'relationships' in context_types:
                    relationships = await self.omophub.get_concept_relationships(
                        concept_id=concept['concept_id']
                    )
                    knowledge['relationships'][entity] = relationships['data']
                
                # Get hierarchies if requested  
                if not context_types or 'hierarchies' in context_types:
                    ancestors = await self.omophub.get_concept_ancestors(
                        concept_id=concept['concept_id']
                    )
                    knowledge['hierarchies'][entity] = ancestors['data']
        
        return knowledge

Multi-Agent Medical Systems

Combine multiple AI agents for complex medical workflows:
class MedicalMultiAgentSystem:
    def __init__(self, omophub_client):
        self.omophub = omophub_client
        self.agents = {
            'entity_extractor': EntityExtractionAgent(),
            'concept_validator': ConceptValidationAgent(omophub_client),
            'clinical_reasoner': ClinicalReasoningAgent(),
            'safety_checker': SafetyCheckAgent()
        }
    
    async def process_clinical_case(self, case_description: str) -> Dict:
        """Process clinical case through multi-agent pipeline"""
        
        # Agent 1: Extract medical entities
        entities = await self.agents['entity_extractor'].extract(case_description)
        
        # Agent 2: Validate and standardize concepts
        validated = await self.agents['concept_validator'].validate(entities)
        
        # Agent 3: Generate clinical reasoning
        reasoning = await self.agents['clinical_reasoner'].reason(validated, case_description)
        
        # Agent 4: Safety check final output
        final_output = await self.agents['safety_checker'].check(reasoning)
        
        return {
            'extracted_entities': entities,
            'validated_concepts': validated,
            'clinical_reasoning': reasoning,
            'safety_checked_output': final_output,
            'agent_trace': self._get_agent_trace()
        }

Production Considerations

Monitoring and Observability

Healthcare AI requires comprehensive monitoring:
import logging
import json
import copy
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import uuid

@dataclass
class HealthcareAIEvent:
    event_id: str
    timestamp: datetime
    user_id: str
    session_id: str
    operation: str
    input_data: Dict[str, Any]
    output_data: Dict[str, Any]
    performance_metrics: Dict[str, float]
    safety_flags: List[str]
    concept_validations: List[Dict]
    error_details: Optional[str] = None

class HealthcareAILogger:
    def __init__(self, log_level: str = "INFO"):
        self.logger = logging.getLogger("healthcare_ai")
        self.logger.setLevel(getattr(logging, log_level))
        
        # Configure structured logging for healthcare compliance
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def log_ai_operation(self, event: HealthcareAIEvent):
        """Log AI operations with healthcare compliance requirements"""
        
        # Sanitize PHI from logs
        sanitized_event = self._sanitize_phi(event)
        
        log_entry = {
            'event_id': sanitized_event.event_id,
            'timestamp': sanitized_event.timestamp.isoformat(),
            'operation': sanitized_event.operation,
            'performance': sanitized_event.performance_metrics,
            'safety_flags': sanitized_event.safety_flags,
            'concept_count': len(sanitized_event.concept_validations),
            'error': sanitized_event.error_details is not None
        }
        
        if sanitized_event.error_details:
            self.logger.error(f"AI Operation Failed: {json.dumps(log_entry)}")
        else:
            self.logger.info(f"AI Operation Complete: {json.dumps(log_entry)}")
    
    def _sanitize_phi(self, event: HealthcareAIEvent) -> HealthcareAIEvent:
        """Remove PHI from log events"""
        # Implementation to remove/anonymize PHI
        sanitized = copy.deepcopy(event)
        # Remove potential PHI fields
        if 'patient_name' in sanitized.input_data:
            sanitized.input_data['patient_name'] = '[REDACTED]'
        return sanitized

class HealthcareAIMetrics:
    def __init__(self, metrics_backend: str = "prometheus"):
        self.metrics_backend = metrics_backend
        self.counters = {}
        self.histograms = {}
    
    def increment_counter(self, metric_name: str, labels: Dict[str, str] = None):
        """Increment counter metric"""
        key = f"{metric_name}:{json.dumps(labels or {}, sort_keys=True)}"
        self.counters[key] = self.counters.get(key, 0) + 1
    
    def record_duration(self, metric_name: str, duration: float, labels: Dict[str, str] = None):
        """Record duration metric"""
        key = f"{metric_name}:{json.dumps(labels or {}, sort_keys=True)}"
        if key not in self.histograms:
            self.histograms[key] = []
        self.histograms[key].append(duration)
    
    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get current metrics summary"""
        return {
            'counters': self.counters,
            'histograms': {
                k: {
                    'count': len(v),
                    'avg': sum(v) / len(v) if v else 0,
                    'p95': sorted(v)[int(len(v) * 0.95)] if v else 0
                }
                for k, v in self.histograms.items()
            }
        }

# Example usage with monitoring
class MonitoredMedicalAI:
    def __init__(self, omophub_client):
        self.omophub = omophub_client
        self.logger = HealthcareAILogger()
        self.metrics = HealthcareAIMetrics()
    
    async def process_with_monitoring(self, user_input: str, user_id: str) -> Dict:
        event_id = str(uuid.uuid4())
        session_id = f"session_{int(datetime.now().timestamp())}"
        start_time = datetime.now()
        
        try:
            # Step 1: Sanitize input and check for PHI
            sanitizer = HealthcareDataSanitizer()
            sanitization_result = sanitizer.sanitize_clinical_text(user_input)
            
            # Step 2: Check if PHI was detected
            if sanitization_result.get('detected_phi'):
                # Log PHI detection
                self.logger.log_security_event({
                    'event_type': 'PHI_DETECTED',
                    'user_id': user_id,
                    'session_id': session_id,
                    'phi_types': list(sanitization_result['detected_phi'].keys())
                })
                
                # Return error - do not process PHI without BAA
                return {
                    'success': False,
                    'error': 'PHI detected in input. Please remove personal information or ensure BAA compliance.',
                    'event_id': event_id
                }
            
            # Step 3: Process sanitized input
            sanitized_input = sanitization_result['sanitized_text']
            self.logger.log_info(f"Input sanitized for processing: {event_id}")
            
            # Process the medical AI request with sanitized input
            result = await self._process_medical_query(sanitized_input)
            
            # Calculate performance metrics
            duration = (datetime.now() - start_time).total_seconds()
            
            # Create monitoring event
            event = HealthcareAIEvent(
                event_id=event_id,
                timestamp=start_time,
                user_id=user_id,
                session_id=session_id,
                operation="medical_query_processing",
                input_data={"query_length": len(user_input)},
                output_data={"response_length": len(str(result))},
                performance_metrics={"duration_seconds": duration},
                safety_flags=result.get('safety_warnings', []),
                concept_validations=result.get('validated_concepts', [])
            )
            
            # Log the event
            self.logger.log_ai_operation(event)
            
            # Record metrics
            self.metrics.increment_counter(
                "medical_ai_requests_total", 
                {"operation": "query", "status": "success"}
            )
            self.metrics.record_duration(
                "medical_ai_duration_seconds",
                duration,
                {"operation": "query"}
            )
            
            return result
            
        except Exception as e:
            duration = (datetime.now() - start_time).total_seconds()
            
            # Create error event
            event = HealthcareAIEvent(
                event_id=event_id,
                timestamp=start_time,
                user_id=user_id,
                session_id=session_id,
                operation="medical_query_processing",
                input_data={"query_length": len(user_input)},
                output_data={},
                performance_metrics={"duration_seconds": duration},
                safety_flags=["PROCESSING_ERROR"],
                concept_validations=[],
                error_details=str(e)
            )
            
            # Log error
            self.logger.log_ai_operation(event)
            
            # Record error metrics
            self.metrics.increment_counter(
                "medical_ai_requests_total", 
                {"operation": "query", "status": "error"}
            )
            
            raise
    
    async def _process_medical_query(self, query: str) -> Dict:
        """Process medical query with OMOPHub integration"""
        # Implementation here
        pass

Cost Optimization Strategies

Healthcare AI can be expensive; optimize costs without compromising quality:
class HealthcareAICostOptimizer:
    def __init__(self, omophub_client):
        self.omophub = omophub_client
        self.cost_tracking = {}
        
    async def optimize_llm_calls(self, query: str) -> Dict:
        """Optimize LLM usage through intelligent routing"""
        
        # 1. Check if query can be answered with vocabulary lookup only
        vocabulary_answer = await self._try_vocabulary_only_answer(query)
        if vocabulary_answer['confidence'] > 0.8:
            return {
                'answer': vocabulary_answer['response'],
                'cost_optimization': 'vocabulary_only',
                'estimated_savings': 0.95  # 95% cost saving vs full LLM
            }
        
        # 2. Use smaller model for simple queries
        complexity_score = self._assess_query_complexity(query)
        if complexity_score < 0.5:
            model = 'claude-3-haiku'  # Cheaper model
        else:
            model = 'claude-3-sonnet'  # More capable model
        
        # 3. Optimize context window usage
        optimized_context = await self._optimize_context_window(query)
        
        return {
            'selected_model': model,
            'context_optimization': optimized_context,
            'estimated_cost': self._estimate_cost(model, optimized_context)
        }
    
    def _assess_query_complexity(self, query: str) -> float:
        """Assess query complexity to choose appropriate model"""
        complexity_indicators = {
            'multi_step_reasoning': ['analyze', 'compare', 'evaluate', 'determine'],
            'medical_calculations': ['calculate', 'dose', 'dosage', 'mg/kg'],
            'differential_diagnosis': ['differential', 'diagnose', 'rule out'],
            'complex_relationships': ['interaction', 'contraindication', 'mechanism']
        }
        
        score = 0.0
        for category, keywords in complexity_indicators.items():
            if any(keyword in query.lower() for keyword in keywords):
                score += 0.25
                
        return min(score, 1.0)
    
    async def _try_vocabulary_only_answer(self, query: str) -> Dict:
        """Attempt to answer using only vocabulary lookups"""
        
        # Simple keyword extraction
        medical_terms = self._extract_simple_medical_terms(query)
        
        if len(medical_terms) == 1:
            # Single concept query - can often be answered with vocabulary
            concept_info = await self.omophub.search_concepts(
                query=medical_terms[0],
                page_size=1
            )
            
            if concept_info['data']:
                concept = concept_info['data'][0]
                response = f"{concept['concept_name']}: {concept.get('definition', 'No definition available')}"
                return {
                    'response': response,
                    'confidence': 0.9 if concept.get('definition') else 0.6
                }
        
        return {'response': '', 'confidence': 0.0}

Conclusion

Integrating AI and LLMs with healthcare vocabularies represents the future of intelligent medical applications. By combining the natural language understanding of models like GPT-4 and Claude with the structured, validated data from OMOPHub, developers can create applications that are both innovative and clinically sound. Key takeaways for successful healthcare AI integration:
  1. Always validate AI outputs with authoritative medical vocabularies
  2. Implement robust safety checks and include professional consultation disclaimers
  3. Handle PHI appropriately with proper anonymization and compliance measures
  4. Monitor performance and costs to ensure sustainable, effective operations
  5. Use structured prompts and evidence-based approaches for medical contexts
The examples in this guide provide production-ready patterns for building the next generation of healthcare AI applications that prioritize both innovation and patient safety.
Next Steps: Explore the FHIR Integration Guide to learn how to combine AI capabilities with healthcare interoperability standards, or review our Clinical Decision Support use case for more specific implementation patterns.
I