Skip to main content

Overview

Population health analytics involves aggregating and analyzing health data across large patient populations to identify trends, measure outcomes, and improve public health initiatives. This guide demonstrates how to use the OMOPHub API to standardize health data, leverage concept hierarchies, and generate meaningful population health insights.
Use Case: Analyze disease prevalence, quality measures, risk stratification, and health outcomes across patient populations using standardized medical vocabularies and hierarchical concept relationships.

Business Problem

Healthcare organizations face significant challenges in population health management:
  • Data Fragmentation: Health data exists in multiple formats and coding systems
  • Quality Reporting: CMS, HEDIS, and other quality measures require standardized coding
  • Risk Stratification: Identifying high-risk populations for intervention programs
  • Outcome Measurement: Tracking health outcomes and intervention effectiveness
  • Public Health Surveillance: Monitoring disease outbreaks and health trends

Solution Architecture

Implementation Guide

Step 1: Set Up Population Health Analytics Engine

from omophub import OMOPHubClient
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from collections import defaultdict

class AnalysisScope(Enum):
    PRACTICE = "practice"
    HEALTH_SYSTEM = "health_system" 
    REGION = "region"
    STATE = "state"
    NATIONAL = "national"

class QualityMeasure(Enum):
    DIABETES_HBA1C = "diabetes_hba1c_control"
    HYPERTENSION_BP = "hypertension_bp_control"
    BREAST_CANCER_SCREENING = "breast_cancer_screening"
    COLORECTAL_SCREENING = "colorectal_cancer_screening"
    CHOLESTEROL_MANAGEMENT = "cholesterol_management"

@dataclass
class PopulationCohort:
    cohort_id: str
    name: str
    description: str
    inclusion_criteria: List[str]  # SNOMED concept IDs
    exclusion_criteria: List[str]
    age_range: Optional[Tuple[int, int]]
    gender_filter: Optional[str]

@dataclass
class HealthOutcome:
    outcome_id: str
    name: str
    measure_type: str  # prevalence, incidence, mortality, etc.
    concept_codes: List[str]
    vocabularies: List[str]
    numerator_criteria: Dict[str, Any]
    denominator_criteria: Dict[str, Any]

class PopulationHealthAnalytics:
    def __init__(self, api_key: str):
        self.client = OMOPHubClient(api_key=api_key)
        self.logger = logging.getLogger(__name__)
        
        # Cache for concept hierarchies
        self.hierarchy_cache = {}
        
        # Pre-defined quality measures
        self.quality_measures = {
            QualityMeasure.DIABETES_HBA1C: {
                'name': 'Diabetes HbA1c Control',
                'denominator_concepts': ['44054006'],  # Type 2 diabetes mellitus
                'numerator_concepts': ['2345-7'],      # HbA1c LOINC
                'target_value': '<7.0',
                'measure_period': 365  # days
            },
            QualityMeasure.HYPERTENSION_BP: {
                'name': 'Hypertension Blood Pressure Control',
                'denominator_concepts': ['38341003'],  # Essential hypertension  
                'numerator_concepts': ['8480-6', '8462-4'],  # Systolic/Diastolic BP
                'target_value': '<140/90',
                'measure_period': 365
            }
        }
    
    def analyze_disease_prevalence(self, patient_data: List[Dict[str, Any]], 
                                 disease_categories: List[str],
                                 analysis_scope: AnalysisScope) -> Dict[str, Any]:
        """Analyze disease prevalence using concept hierarchies"""
        
        # Standardize condition codes to SNOMED
        standardized_conditions = self.standardize_conditions(patient_data)
        
        # Build disease category hierarchies
        disease_hierarchies = {}
        for category in disease_categories:
            hierarchy = self.build_disease_hierarchy(category)
            disease_hierarchies[category] = hierarchy
        
        # Calculate prevalence for each category
        prevalence_results = {}
        total_patients = len(patient_data)
        
        for category, hierarchy in disease_hierarchies.items():
            # Count patients with conditions in this hierarchy
            patients_with_condition = set()
            
            for patient in standardized_conditions:
                patient_conditions = set(patient.get('condition_codes', []))
                
                # Check if any patient condition is in the hierarchy
                if hierarchy['all_concepts'].intersection(patient_conditions):
                    patients_with_condition.add(patient['patient_id'])
            
            prevalence_count = len(patients_with_condition)
            prevalence_rate = (prevalence_count / total_patients) * 100 if total_patients > 0 else 0
            
            # Age and gender stratification
            age_stratification = self.calculate_age_stratified_prevalence(
                patient_data, patients_with_condition
            )
            
            gender_stratification = self.calculate_gender_stratified_prevalence(
                patient_data, patients_with_condition  
            )
            
            prevalence_results[category] = {
                'disease_name': hierarchy['root_concept_name'],
                'total_patients': total_patients,
                'affected_patients': prevalence_count,
                'prevalence_rate': prevalence_rate,
                'confidence_interval': self.calculate_confidence_interval(prevalence_count, total_patients),
                'age_stratification': age_stratification,
                'gender_stratification': gender_stratification,
                'subcategory_breakdown': self.analyze_subcategories(
                    standardized_conditions, hierarchy, patients_with_condition
                )
            }
        
        # Generate comparative analysis
        comparative_analysis = self.generate_comparative_analysis(prevalence_results, analysis_scope)
        
        return {
            'analysis_scope': analysis_scope.value,
            'total_population': total_patients,
            'analysis_date': datetime.now().isoformat(),
            'disease_prevalence': prevalence_results,
            'comparative_analysis': comparative_analysis,
            'methodology': {
                'vocabulary_standard': 'SNOMED CT',
                'hierarchy_based_classification': True,
                'confidence_level': '95%'
            }
        }
    
    def build_disease_hierarchy(self, root_concept_code: str) -> Dict[str, Any]:
        """Build complete disease hierarchy from root concept"""
        
        # Check cache first
        if root_concept_code in self.hierarchy_cache:
            return self.hierarchy_cache[root_concept_code]
        
        try:
            # Get root concept information
            root_concept = self.client.get_concept_by_code("SNOMED", root_concept_code)
            if not root_concept:
                raise ValueError(f"Root concept {root_concept_code} not found")
            
            # Get all descendants
            descendants = self.client.get_concept_descendants(
                root_concept["concept_id"],
                max_levels=10,  # Deep hierarchy traversal
                vocabulary_ids=["SNOMED"]
            )
            
            # Build concept set using concept_code for consistency with patient data
            all_concepts = {root_concept["concept_code"]}
            concept_details = {root_concept["concept_code"]: root_concept}
            
            for descendant in descendants.get("descendants", []):
                all_concepts.add(descendant["concept_code"])
                concept_details[descendant["concept_code"]] = descendant
            
            # Organize by hierarchy levels
            hierarchy_levels = self.organize_by_hierarchy_levels(root_concept, descendants)
            
            hierarchy = {
                'root_concept_id': root_concept["concept_id"],
                'root_concept_name': root_concept["concept_name"],
                'all_concepts': all_concepts,
                'concept_details': concept_details,
                'hierarchy_levels': hierarchy_levels,
                'total_concepts': len(all_concepts)
            }
            
            # Cache the result
            self.hierarchy_cache[root_concept_code] = hierarchy
            
            return hierarchy
            
        except Exception as e:
            self.logger.error(f"Error building disease hierarchy for {root_concept_code}: {e}")
            return {
                'root_concept_id': None,
                'root_concept_name': f"Error: {root_concept_code}",
                'all_concepts': set(),
                'concept_details': {},
                'hierarchy_levels': {},
                'total_concepts': 0
            }
    
    def standardize_conditions(self, patient_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Standardize patient condition codes to SNOMED"""
        
        standardized_data = []
        
        for patient in patient_data:
            standardized_patient = {
                'patient_id': patient.get('patient_id'),
                'age': patient.get('age'),
                'gender': patient.get('gender'),
                'condition_codes': []
            }
            
            # Process each condition
            for condition in patient.get('conditions', []):
                original_code = condition.get('code')
                original_vocab = condition.get('vocabulary', 'ICD10CM')
                
                # Convert to SNOMED if needed
                snomed_code = self.map_to_snomed(original_code, original_vocab)
                if snomed_code:
                    standardized_patient['condition_codes'].append(snomed_code)
            
            standardized_data.append(standardized_patient)
        
        return standardized_data
    
    def map_to_snomed(self, code: str, vocabulary: str) -> Optional[str]:
        """Map a code from any vocabulary to SNOMED"""
        
        try:
            if vocabulary == "SNOMED":
                return code
            
            # Get source concept
            source_concept = self.client.get_concept_by_code(vocabulary, code)
            if not source_concept:
                return None
            
            # Get mappings to SNOMED
            mappings = self.client.get_concept_mappings(
                source_concept["concept_id"],
                target_vocabularies=["SNOMED"]
            )
            
            # Find best SNOMED mapping
            snomed_mappings = [
                m for m in mappings.get("mappings", [])
                if m["target_vocabulary_id"] == "SNOMED"
            ]
            
            if snomed_mappings:
                # Return the first mapping (could be enhanced with confidence scoring)
                return snomed_mappings[0]["target_concept_code"]
            
            return None
            
        except Exception as e:
            self.logger.error(f"Error mapping {code} from {vocabulary} to SNOMED: {e}")
            return None
    
    def calculate_age_stratified_prevalence(self, patient_data: List[Dict[str, Any]], 
                                          affected_patients: set) -> Dict[str, Dict[str, float]]:
        """Calculate prevalence by age groups"""
        
        age_groups = {
            '0-17': (0, 17),
            '18-34': (18, 34), 
            '35-49': (35, 49),
            '50-64': (50, 64),
            '65-79': (65, 79),
            '80+': (80, 150)
        }
        
        age_stratification = {}
        
        for age_group, (min_age, max_age) in age_groups.items():
            # Count patients in age group
            patients_in_group = [
                p for p in patient_data 
                if min_age <= p.get('age', 0) <= max_age
            ]
            
            # Count affected patients in age group  
            affected_in_group = [
                p for p in patients_in_group
                if p.get('patient_id') in affected_patients
            ]
            
            total_in_group = len(patients_in_group)
            affected_count = len(affected_in_group)
            
            prevalence_rate = (affected_count / total_in_group * 100) if total_in_group > 0 else 0
            
            age_stratification[age_group] = {
                'total_patients': total_in_group,
                'affected_patients': affected_count,
                'prevalence_rate': prevalence_rate,
                'confidence_interval': self.calculate_confidence_interval(affected_count, total_in_group)
            }
        
        return age_stratification
    
    def calculate_gender_stratified_prevalence(self, patient_data: List[Dict[str, Any]], 
                                             affected_patients: set) -> Dict[str, Dict[str, float]]:
        """Calculate prevalence by gender"""
        
        gender_stratification = {}
        
        for gender in ['M', 'F', 'Other']:
            # Count patients of this gender
            patients_of_gender = [
                p for p in patient_data
                if p.get('gender') == gender
            ]
            
            # Count affected patients of this gender
            affected_of_gender = [
                p for p in patients_of_gender
                if p.get('patient_id') in affected_patients
            ]
            
            total_of_gender = len(patients_of_gender)
            affected_count = len(affected_of_gender)
            
            prevalence_rate = (affected_count / total_of_gender * 100) if total_of_gender > 0 else 0
            
            gender_stratification[gender] = {
                'total_patients': total_of_gender,
                'affected_patients': affected_count,
                'prevalence_rate': prevalence_rate,
                'confidence_interval': self.calculate_confidence_interval(affected_count, total_of_gender)
            }
        
        return gender_stratification
    
    def calculate_confidence_interval(self, numerator: int, denominator: int, 
                                    confidence_level: float = 0.95) -> Tuple[float, float]:
        """Calculate confidence interval for prevalence rate"""
        
        if denominator == 0:
            return (0.0, 0.0)
        
        p = numerator / denominator
        
        # Wilson score interval (better for small samples)
        z = 1.96 if confidence_level == 0.95 else 2.576  # 99% CI
        n = denominator
        
        center = (p + z**2 / (2*n)) / (1 + z**2 / n)
        margin = z * np.sqrt(p*(1-p)/n + z**2/(4*n**2)) / (1 + z**2/n)
        
        lower = max(0, center - margin) * 100
        upper = min(100, center + margin) * 100
        
        return (round(lower, 2), round(upper, 2))
    
    def organize_by_hierarchy_levels(self, root_concept, descendants):
        """Stub implementation for organizing concepts by hierarchy levels"""
        return {}
    
    def analyze_subcategories(self, standardized_conditions, hierarchy, patients_with_condition):
        """Stub implementation for analyzing subcategories"""
        return {}
    
    def generate_comparative_analysis(self, prevalence_results, analysis_scope):
        """Stub implementation for generating comparative analysis
        
        Returns:
            List[str]: List of analysis items/insights for iteration
        """
        return []
    
    def generate_quality_summary(self, measure_results):
        """Stub implementation for generating quality summary"""
        return {}

Step 2: Quality Measure Calculation

def calculate_quality_measures(self, patient_data: List[Dict[str, Any]], 
                             measure_types: List[QualityMeasure],
                             measurement_period_days: int = 365) -> Dict[str, Any]:
    """Calculate healthcare quality measures"""
    
    measure_results = {}
    
    for measure_type in measure_types:
        try:
            measure_config = self.quality_measures[measure_type]
            result = self.calculate_single_quality_measure(
                patient_data, measure_config, measurement_period_days
            )
            measure_results[measure_type.value] = result
        except Exception as e:
            self.logger.error(f"Error calculating quality measure {measure_type.value}: {e}")
            measure_results[measure_type.value] = {
                'error': str(e),
                'measure_rate': 0.0,
                'numerator': 0,
                'denominator': 0
            }
    
    # Calculate composite scores
    composite_scores = self.calculate_composite_quality_scores(measure_results)
    
    return {
        'measurement_period_days': measurement_period_days,
        'calculated_at': datetime.now().isoformat(),
        'individual_measures': measure_results,
        'composite_scores': composite_scores,
        'quality_summary': self.generate_quality_summary(measure_results)
    }

def calculate_single_quality_measure(self, patient_data: List[Dict[str, Any]], 
                                   measure_config: Dict[str, Any],
                                   measurement_period_days: int) -> Dict[str, Any]:
    """Calculate a single quality measure"""
    
    # Find denominator population (patients with condition)
    denominator_patients = set()
    denominator_concepts = set(measure_config['denominator_concepts'])
    
    for patient in patient_data:
        patient_conditions = set()
        
        # Collect all condition codes for patient
        for condition in patient.get('conditions', []):
            snomed_code = self.map_to_snomed(
                condition.get('code', ''),
                condition.get('vocabulary', 'ICD10CM')
            )
            if snomed_code:
                patient_conditions.add(snomed_code)
        
        # Check if patient has denominator condition
        if denominator_concepts.intersection(patient_conditions):
            denominator_patients.add(patient['patient_id'])
    
    # Find numerator population (patients meeting quality target)
    numerator_patients = set()
    cutoff_date = datetime.now() - timedelta(days=measurement_period_days)
    
    for patient in patient_data:
        if patient['patient_id'] not in denominator_patients:
            continue
            
        # Check if patient meets numerator criteria
        if self.patient_meets_quality_target(patient, measure_config, cutoff_date):
            numerator_patients.add(patient['patient_id'])
    
    # Calculate measure rate
    denominator = len(denominator_patients)
    numerator = len(numerator_patients)
    measure_rate = (numerator / denominator * 100) if denominator > 0 else 0
    
    # Calculate confidence interval
    confidence_interval = self.calculate_confidence_interval(numerator, denominator)
    
    # Identify improvement opportunities
    improvement_opportunities = self.identify_improvement_opportunities(
        patient_data, denominator_patients, numerator_patients, measure_config
    )
    
    return {
        'measure_name': measure_config['name'],
        'measure_rate': round(measure_rate, 2),
        'numerator': numerator,
        'denominator': denominator,
        'confidence_interval': confidence_interval,
        'target_value': measure_config['target_value'],
        'measurement_period': f"{measurement_period_days} days",
        'improvement_opportunities': improvement_opportunities,
        'performance_category': self.categorize_performance(measure_rate)
    }

def patient_meets_quality_target(self, patient: Dict[str, Any], 
                               measure_config: Dict[str, Any],
                               cutoff_date: datetime) -> bool:
    """Check if patient meets quality measure target"""
    
    # Get recent lab results/vital signs within measurement period
    recent_results = []
    
    for lab in patient.get('lab_results', []):
        result_date = datetime.fromisoformat(lab.get('date', '1900-01-01'))
        
        if result_date >= cutoff_date:
            # Check if this lab matches numerator concepts
            if lab.get('loinc_code') in measure_config['numerator_concepts']:
                recent_results.append(lab)
    
    for vital in patient.get('vitals', []):
        result_date = datetime.fromisoformat(vital.get('date', '1900-01-01'))
        
        if result_date >= cutoff_date:
            # Check if this vital matches numerator concepts
            if vital.get('loinc_code') in measure_config['numerator_concepts']:
                recent_results.append(vital)
    
    if not recent_results:
        return False
    
    # Use most recent result
    recent_results.sort(key=lambda x: x.get('date', ''), reverse=True)
    most_recent = recent_results[0]
    
    # Check if meets target
    return self.check_target_achievement(most_recent, measure_config['target_value'])

def check_target_achievement(self, result: Dict[str, Any], target_value: str) -> bool:
    """Check if a result meets the target value"""
    
    try:
        result_value = float(result.get('value', 0))
        
        # Parse target value (e.g., "<7.0", "<140/90")
        if target_value.startswith('<'):
            target = float(target_value[1:])
            return result_value < target
        elif target_value.startswith('>'):
            target = float(target_value[1:])
            return result_value > target
        elif target_value.startswith('≤'):
            target = float(target_value[1:])
            return result_value <= target
        elif target_value.startswith('≥'):
            target = float(target_value[1:])
            return result_value >= target
        elif '/' in target_value:
            # Blood pressure format (e.g., "<140/90")
            if target_value.startswith('<'):
                systolic_target, diastolic_target = target_value[1:].split('/')
                # This is simplified - would need both systolic and diastolic values
                return result_value < float(systolic_target)
        else:
            # Direct comparison
            return result_value == float(target_value)
            
    except (ValueError, IndexError) as e:
        self.logger.warning(f"Unable to parse target value {target_value}: {e}")
        return False
    
    return False

def identify_improvement_opportunities(self, patient_data: List[Dict[str, Any]], 
                                     denominator_patients: set,
                                     numerator_patients: set,
                                     measure_config: Dict[str, Any]) -> List[str]:
    """Identify opportunities to improve quality measure performance"""
    
    opportunities = []
    
    # Patients who didn't meet target
    gap_patients = denominator_patients - numerator_patients
    gap_rate = len(gap_patients) / len(denominator_patients) * 100 if denominator_patients else 0
    
    if gap_rate > 20:
        opportunities.append(f"{gap_rate:.1f}% of eligible patients not meeting target - focus on care gaps")
    
    # Check for missing recent measurements
    missing_recent_tests = 0
    cutoff_date = datetime.now() - timedelta(days=365)
    
    for patient in patient_data:
        if patient['patient_id'] in denominator_patients:
            has_recent_test = False
            
            # Check lab results
            for lab in patient.get('lab_results', []):
                result_date = datetime.fromisoformat(lab.get('date', '1900-01-01'))
                if (result_date >= cutoff_date and 
                    lab.get('loinc_code') in measure_config['numerator_concepts']):
                    has_recent_test = True
                    break
            
            # Also check vitals if no recent lab test found
            if not has_recent_test:
                for vital in patient.get('vitals', []):
                    result_date = datetime.fromisoformat(vital.get('date', '1900-01-01'))
                    if (result_date >= cutoff_date and 
                        vital.get('loinc_code') in measure_config.get('vital_concepts', [])):
                        has_recent_test = True
                        break
            
            if not has_recent_test:
                missing_recent_tests += 1
    
    missing_rate = missing_recent_tests / len(denominator_patients) * 100 if denominator_patients else 0
    
    if missing_rate > 15:
        opportunities.append(f"{missing_rate:.1f}% of patients missing recent tests - improve monitoring")
    
    # Age-specific opportunities
    elderly_gap_patients = []
    for patient in patient_data:
        if (patient['patient_id'] in gap_patients and 
            patient.get('age', 0) >= 65):
            elderly_gap_patients.append(patient)
    
    if len(elderly_gap_patients) > len(gap_patients) * 0.4:
        opportunities.append("Higher gap rate in elderly population - consider age-specific interventions")
    
    return opportunities

def calculate_composite_quality_scores(self, measure_results: Dict[str, Any]) -> Dict[str, Any]:
    """Calculate composite quality scores"""
    
    valid_measures = {k: v for k, v in measure_results.items() if 'error' not in v}
    
    if not valid_measures:
        return {'overall_score': 0.0, 'grade': 'F'}
    
    # Simple average of all measures
    total_score = sum(measure['measure_rate'] for measure in valid_measures.values())
    overall_score = total_score / len(valid_measures)
    
    # Weighted composite (diabetes and hypertension weighted higher)
    weights = {
        'diabetes_hba1c_control': 1.5,
        'hypertension_bp_control': 1.5,
        'breast_cancer_screening': 1.0,
        'colorectal_cancer_screening': 1.0,
        'cholesterol_management': 1.0
    }
    
    weighted_sum = 0
    weight_total = 0
    
    for measure_id, result in valid_measures.items():
        weight = weights.get(measure_id, 1.0)
        weighted_sum += result['measure_rate'] * weight
        weight_total += weight
    
    weighted_score = weighted_sum / weight_total if weight_total > 0 else 0
    
    # Assign grade
    grade = self.assign_quality_grade(overall_score)
    
    return {
        'overall_score': round(overall_score, 1),
        'weighted_score': round(weighted_score, 1),
        'grade': grade,
        'total_measures': len(valid_measures),
        'measures_above_benchmark': len([
            m for m in valid_measures.values() if m['measure_rate'] >= 70
        ])
    }

def assign_quality_grade(self, score: float) -> str:
    """Assign quality grade based on score"""
    if score >= 90:
        return 'A+'
    elif score >= 85:
        return 'A'
    elif score >= 80:
        return 'B+'
    elif score >= 75:
        return 'B'
    elif score >= 70:
        return 'C+'
    elif score >= 65:
        return 'C'
    elif score >= 60:
        return 'D'
    else:
        return 'F'

def categorize_performance(self, measure_rate: float) -> str:
    """Categorize performance level"""
    if measure_rate >= 90:
        return 'Excellent'
    elif measure_rate >= 80:
        return 'Good'
    elif measure_rate >= 70:
        return 'Satisfactory'
    elif measure_rate >= 60:
        return 'Needs Improvement'
    else:
        return 'Poor'

Step 3: Risk Stratification and Predictive Analytics

def perform_risk_stratification(self, patient_data: List[Dict[str, Any]], 
                              risk_models: List[str]) -> Dict[str, Any]:
    """Perform population risk stratification using multiple models"""
    
    risk_results = {
        'total_patients': len(patient_data),
        'stratification_models': {},
        'high_risk_patients': [],
        'composite_risk_scores': {},
        'intervention_recommendations': []
    }
    
    # Apply different risk models
    for model_name in risk_models:
        if model_name == 'cardiovascular_risk':
            model_results = self.calculate_cardiovascular_risk(patient_data)
        elif model_name == 'diabetes_complications':
            model_results = self.calculate_diabetes_complication_risk(patient_data)
        elif model_name == 'hospital_readmission':
            model_results = self.calculate_readmission_risk(patient_data)
        elif model_name == 'medication_adherence':
            model_results = self.assess_medication_adherence_risk(patient_data)
        else:
            continue
            
        risk_results['stratification_models'][model_name] = model_results
    
    # Create composite risk scores
    composite_scores = self.create_composite_risk_scores(
        patient_data, risk_results['stratification_models']
    )
    risk_results['composite_risk_scores'] = composite_scores
    
    # Identify high-risk patients across all models
    high_risk_patients = self.identify_high_risk_patients(composite_scores)
    risk_results['high_risk_patients'] = high_risk_patients
    
    # Generate intervention recommendations
    interventions = self.generate_intervention_recommendations(
        high_risk_patients, risk_results['stratification_models']
    )
    risk_results['intervention_recommendations'] = interventions
    
    return risk_results

def calculate_cardiovascular_risk(self, patient_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Calculate cardiovascular disease risk using clinical factors"""
    
    risk_categories = {'low': [], 'moderate': [], 'high': [], 'very_high': []}
    risk_factors_analysis = {
        'hypertension_prevalence': 0,
        'diabetes_prevalence': 0,
        'smoking_prevalence': 0,
        'high_cholesterol_prevalence': 0,
        'multiple_risk_factors': 0
    }
    
    for patient in patient_data:
        risk_score = 0
        risk_factors = []
        
        # Age factor
        age = patient.get('age', 0)
        if age >= 65:
            risk_score += 2
            risk_factors.append('advanced_age')
        elif age >= 45:
            risk_score += 1
        
        # Gender factor
        if patient.get('gender') == 'M' and age >= 45:
            risk_score += 1
            risk_factors.append('male_gender')
        elif patient.get('gender') == 'F' and age >= 55:
            risk_score += 1
        
        # Check for cardiovascular risk conditions
        patient_conditions = [c.get('code') for c in patient.get('conditions', [])]
        
        # Hypertension
        hypertension_codes = ['38341003', 'I10']  # SNOMED and ICD10
        if any(code in patient_conditions for code in hypertension_codes):
            risk_score += 2
            risk_factors.append('hypertension')
            risk_factors_analysis['hypertension_prevalence'] += 1
        
        # Diabetes
        diabetes_codes = ['44054006', 'E11.9']  # SNOMED and ICD10
        if any(code in patient_conditions for code in diabetes_codes):
            risk_score += 2
            risk_factors.append('diabetes')
            risk_factors_analysis['diabetes_prevalence'] += 1
        
        # Check lab values for additional risk factors
        recent_labs = self.get_recent_lab_values(patient, days=365)
        
        # High cholesterol (>200 mg/dL)
        cholesterol_result = recent_labs.get('2093-3')  # Total cholesterol LOINC
        if cholesterol_result and float(cholesterol_result.get('value', 0)) > 200:
            risk_score += 1
            risk_factors.append('high_cholesterol')
            risk_factors_analysis['high_cholesterol_prevalence'] += 1
        
        # Smoking status (if available in patient data)
        if patient.get('smoking_status') == 'current':
            risk_score += 2
            risk_factors.append('smoking')
            risk_factors_analysis['smoking_prevalence'] += 1
        
        # Count multiple risk factors
        if len(risk_factors) >= 3:
            risk_factors_analysis['multiple_risk_factors'] += 1
        
        # Categorize risk level
        if risk_score >= 7:
            risk_categories['very_high'].append({
                'patient_id': patient['patient_id'],
                'risk_score': risk_score,
                'risk_factors': risk_factors,
                'age': age,
                'gender': patient.get('gender', 'Unknown')
            })
        elif risk_score >= 5:
            risk_categories['high'].append({
                'patient_id': patient['patient_id'],
                'risk_score': risk_score,
                'risk_factors': risk_factors,
                'age': age,
                'gender': patient.get('gender', 'Unknown')
            })
        elif risk_score >= 3:
            risk_categories['moderate'].append({
                'patient_id': patient['patient_id'],
                'risk_score': risk_score,
                'risk_factors': risk_factors,
                'age': age,
                'gender': patient.get('gender', 'Unknown')
            })
        else:
            risk_categories['low'].append({
                'patient_id': patient['patient_id'],
                'risk_score': risk_score,
                'risk_factors': risk_factors,
                'age': age,
                'gender': patient.get('gender', 'Unknown')
            })
    
    # Calculate prevalence rates
    total_patients = len(patient_data)
    for factor in risk_factors_analysis:
        risk_factors_analysis[factor] = (
            risk_factors_analysis[factor] / total_patients * 100 
            if total_patients > 0 else 0
        )
    
    return {
        'model_name': 'Cardiovascular Risk Assessment',
        'risk_categories': risk_categories,
        'population_summary': {
            'total_patients': total_patients,
            'low_risk': len(risk_categories['low']),
            'moderate_risk': len(risk_categories['moderate']),
            'high_risk': len(risk_categories['high']),
            'very_high_risk': len(risk_categories['very_high'])
        },
        'risk_factors_analysis': risk_factors_analysis,
        'high_risk_percentage': (
            (len(risk_categories['high']) + len(risk_categories['very_high'])) / 
            total_patients * 100 if total_patients > 0 else 0
        )
    }

def get_recent_lab_values(self, patient: Dict[str, Any], days: int = 365) -> Dict[str, Dict[str, Any]]:
    """Get most recent lab values within specified time period"""
    
    cutoff_date = datetime.now() - timedelta(days=days)
    recent_labs = {}
    
    for lab in patient.get('lab_results', []):
        lab_date = datetime.fromisoformat(lab.get('date', '1900-01-01'))
        loinc_code = lab.get('loinc_code')
        
        if lab_date >= cutoff_date and loinc_code:
            # Keep most recent result for each LOINC code
            if (loinc_code not in recent_labs or 
                lab_date > datetime.fromisoformat(recent_labs[loinc_code].get('date', '1900-01-01'))):
                recent_labs[loinc_code] = lab
    
    return recent_labs

def create_composite_risk_scores(self, patient_data: List[Dict[str, Any]], 
                               risk_models: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Create composite risk scores combining multiple models"""
    
    composite_scores = {}
    
    for patient in patient_data:
        patient_id = patient['patient_id']
        patient_scores = {}
        total_weighted_score = 0
        total_weight = 0
        
        # Collect scores from each model
        for model_name, model_results in risk_models.items():
            patient_risk_data = self.find_patient_in_risk_model(patient_id, model_results)
            
            if patient_risk_data:
                risk_level = patient_risk_data.get('risk_level', 'low')
                
                # Convert risk level to numeric score
                risk_score = {
                    'low': 1,
                    'moderate': 2, 
                    'high': 3,
                    'very_high': 4
                }.get(risk_level, 1)
                
                # Weight different models
                model_weight = {
                    'cardiovascular_risk': 1.5,
                    'diabetes_complications': 1.3,
                    'hospital_readmission': 1.2,
                    'medication_adherence': 1.0
                }.get(model_name, 1.0)
                
                patient_scores[model_name] = {
                    'risk_score': risk_score,
                    'risk_level': risk_level,
                    'weight': model_weight
                }
                
                total_weighted_score += risk_score * model_weight
                total_weight += model_weight
        
        # Calculate composite score
        composite_score = total_weighted_score / total_weight if total_weight > 0 else 0
        
        # Determine overall risk level
        if composite_score >= 3.5:
            overall_risk = 'very_high'
        elif composite_score >= 2.5:
            overall_risk = 'high'
        elif composite_score >= 1.5:
            overall_risk = 'moderate'
        else:
            overall_risk = 'low'
        
        composite_scores[patient_id] = {
            'composite_score': round(composite_score, 2),
            'overall_risk_level': overall_risk,
            'individual_model_scores': patient_scores,
            'priority_rank': self.calculate_priority_rank(composite_score, patient)
        }
    
    return composite_scores

def identify_high_risk_patients(self, composite_scores: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Identify patients requiring immediate attention"""
    
    high_risk_patients = []
    
    for patient_id, scores in composite_scores.items():
        if scores['overall_risk_level'] in ['high', 'very_high']:
            high_risk_patients.append({
                'patient_id': patient_id,
                'composite_score': scores['composite_score'],
                'risk_level': scores['overall_risk_level'],
                'priority_rank': scores['priority_rank'],
                'contributing_factors': list(scores['individual_model_scores'].keys())
            })
    
    # Sort by priority rank (highest first)
    high_risk_patients.sort(key=lambda x: x['priority_rank'], reverse=True)
    
    return high_risk_patients

def generate_intervention_recommendations(self, high_risk_patients: List[Dict[str, Any]], 
                                        risk_models: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Generate targeted intervention recommendations"""
    
    recommendations = []
    
    # Population-level recommendations
    if len(high_risk_patients) > 0:
        high_risk_rate = len(high_risk_patients) / len(risk_models.get('cardiovascular_risk', {}).get('population_summary', {}).get('total_patients', 1)) * 100
        
        if high_risk_rate > 20:
            recommendations.append({
                'type': 'population_level',
                'priority': 'high',
                'intervention': 'Comprehensive Population Health Program',
                'description': f'{high_risk_rate:.1f}% of population is high-risk - implement systematic screening and intervention programs',
                'target_population': 'All patients',
                'estimated_impact': 'Reduce high-risk population by 15-25%'
            })
    
    # Risk-specific interventions
    cv_risk_data = risk_models.get('cardiovascular_risk', {})
    if cv_risk_data:
        cv_high_risk = len(cv_risk_data.get('risk_categories', {}).get('high', [])) + len(cv_risk_data.get('risk_categories', {}).get('very_high', []))
        
        if cv_high_risk > 0:
            recommendations.append({
                'type': 'cardiovascular_focused',
                'priority': 'high',
                'intervention': 'Cardiovascular Risk Management Program',
                'description': f'{cv_high_risk} patients at high cardiovascular risk need intensive management',
                'target_population': 'High CV risk patients',
                'components': [
                    'Statin therapy optimization',
                    'Blood pressure management',
                    'Diabetes control enhancement',
                    'Lifestyle modification counseling'
                ],
                'estimated_impact': 'Reduce cardiovascular events by 20-30%'
            })
    
    # Individual patient recommendations (top 10 highest risk)
    for patient in high_risk_patients[:10]:
        patient_recommendations = []
        
        for model_name in patient['contributing_factors']:
            if model_name == 'cardiovascular_risk':
                patient_recommendations.extend([
                    'Cardiology referral within 30 days',
                    'Lipid panel and blood pressure monitoring',
                    'Medication adherence counseling'
                ])
            elif model_name == 'diabetes_complications':
                patient_recommendations.extend([
                    'Endocrinology consultation',
                    'Diabetic eye exam',
                    'Foot care assessment'
                ])
            elif model_name == 'hospital_readmission':
                patient_recommendations.extend([
                    'Care transition planning',
                    'Medication reconciliation',
                    'Follow-up appointment within 7 days'
                ])
        
        recommendations.append({
            'type': 'individual_patient',
            'patient_id': patient['patient_id'],
            'priority': patient['risk_level'],
            'intervention': 'Personalized Care Plan',
            'recommendations': list(set(patient_recommendations)),  # Remove duplicates
            'urgency': 'Within 2 weeks' if patient['risk_level'] == 'very_high' else 'Within 4 weeks'
        })
    
    return recommendations

Example Implementation

Sample Population Health Analysis

# Initialize the analytics engine
analytics = PopulationHealthAnalytics("your_api_key")

# Sample patient population (1000+ patients)
sample_population = [
    {
        'patient_id': 'PT001',
        'age': 65,
        'gender': 'M',
        'conditions': [
            {'code': '44054006', 'name': 'Type 2 diabetes mellitus', 'vocabulary': 'SNOMED'},
            {'code': '38341003', 'name': 'Essential hypertension', 'vocabulary': 'SNOMED'}
        ],
        'lab_results': [
            {'loinc_code': '2345-7', 'value': '7.2', 'unit': '%', 'date': '2023-06-15'},
            {'loinc_code': '2093-3', 'value': '220', 'unit': 'mg/dL', 'date': '2023-06-15'}
        ],
        'smoking_status': 'former'
    },
    {
        'patient_id': 'PT002',
        'age': 45,
        'gender': 'F',
        'conditions': [
            {'code': '254837009', 'name': 'Malignant neoplasm of breast', 'vocabulary': 'SNOMED'}
        ],
        'lab_results': [
            {'loinc_code': '2093-3', 'value': '180', 'unit': 'mg/dL', 'date': '2023-07-01'}
        ],
        'smoking_status': 'never'
    }
    # ... more patients
]

# Analyze disease prevalence
disease_categories = [
    '44054006',  # Type 2 diabetes mellitus
    '38341003',  # Essential hypertension
    '363346000', # Malignant neoplastic disease
    '195967001', # Asthma
    '13644009'   # Hypercholesterolemia
]

prevalence_analysis = analytics.analyze_disease_prevalence(
    sample_population, 
    disease_categories,
    AnalysisScope.HEALTH_SYSTEM
)

# Calculate quality measures
quality_measures = [
    QualityMeasure.DIABETES_HBA1C,
    QualityMeasure.HYPERTENSION_BP
]

quality_results = analytics.calculate_quality_measures(
    sample_population,
    quality_measures,
    measurement_period_days=365
)

# Perform risk stratification
risk_models = [
    'cardiovascular_risk',
    'diabetes_complications',
    'hospital_readmission'
]

risk_analysis = analytics.perform_risk_stratification(
    sample_population,
    risk_models
)

# Generate comprehensive population health report
print("=== POPULATION HEALTH ANALYTICS REPORT ===")
print(f"Analysis Scope: {prevalence_analysis['analysis_scope']}")
print(f"Population Size: {prevalence_analysis['total_population']:,}")
print(f"Analysis Date: {prevalence_analysis['analysis_date']}")

print("\n=== DISEASE PREVALENCE ANALYSIS ===")
for disease, data in prevalence_analysis['disease_prevalence'].items():
    print(f"\n{data['disease_name']}:")
    print(f"  Prevalence Rate: {data['prevalence_rate']:.1f}%")
    print(f"  Affected Patients: {data['affected_patients']:,}")
    print(f"  Confidence Interval: {data['confidence_interval'][0]:.1f}% - {data['confidence_interval'][1]:.1f}%")
    
    # Age stratification
    print("  Age Breakdown:")
    for age_group, stats in data['age_stratification'].items():
        print(f"    {age_group}: {stats['prevalence_rate']:.1f}% ({stats['affected_patients']}/{stats['total_patients']})")

print("\n=== QUALITY MEASURES RESULTS ===")
print(f"Overall Quality Grade: {quality_results['composite_scores']['grade']}")
print(f"Overall Score: {quality_results['composite_scores']['overall_score']:.1f}%")

for measure_id, result in quality_results['individual_measures'].items():
    if 'error' not in result:
        print(f"\n{result['measure_name']}:")
        print(f"  Performance: {result['measure_rate']:.1f}% ({result['performance_category']})")
        print(f"  Numerator/Denominator: {result['numerator']}/{result['denominator']}")
        print(f"  Target: {result['target_value']}")
        
        if result['improvement_opportunities']:
            print("  Improvement Opportunities:")
            for opportunity in result['improvement_opportunities']:
                print(f"    • {opportunity}")

print("\n=== RISK STRATIFICATION RESULTS ===")
print(f"Total Patients Analyzed: {risk_analysis['total_patients']:,}")

cv_risk = risk_analysis['stratification_models']['cardiovascular_risk']
print(f"\nCardiovascular Risk Distribution:")
print(f"  Low Risk: {cv_risk['population_summary']['low_risk']:,} patients")
print(f"  Moderate Risk: {cv_risk['population_summary']['moderate_risk']:,} patients") 
print(f"  High Risk: {cv_risk['population_summary']['high_risk']:,} patients")
print(f"  Very High Risk: {cv_risk['population_summary']['very_high_risk']:,} patients")

print(f"\nHigh-Risk Patients Requiring Intervention: {len(risk_analysis['high_risk_patients'])}")

print("\n=== INTERVENTION RECOMMENDATIONS ===")
for recommendation in risk_analysis['intervention_recommendations'][:5]:  # Show top 5
    print(f"\n{recommendation['intervention']} ({recommendation['priority'].upper()} Priority):")
    print(f"  Type: {recommendation['type']}")
    print(f"  Description: {recommendation['description']}")
    
    if 'components' in recommendation:
        print("  Components:")
        for component in recommendation['components']:
            print(f"    • {component}")
    
    if 'estimated_impact' in recommendation:
        print(f"  Estimated Impact: {recommendation['estimated_impact']}")

print("\n=== COMPARATIVE ANALYSIS ===")
for insight in prevalence_analysis['comparative_analysis']:
    print(f"• {insight}")

Expected Output

=== POPULATION HEALTH ANALYTICS REPORT ===
Analysis Scope: health_system
Population Size: 50,000
Analysis Date: 2023-12-01T10:30:00Z

=== DISEASE PREVALENCE ANALYSIS ===

Type 2 diabetes mellitus:
  Prevalence Rate: 8.5%
  Affected Patients: 4,250
  Confidence Interval: 8.2% - 8.8%
  Age Breakdown:
    18-34: 1.2% (45/3,750)
    35-49: 4.8% (576/12,000)
    50-64: 12.1% (1,815/15,000)
    65-79: 18.7% (1,496/8,000)
    80+: 28.9% (318/1,100)

Essential hypertension:
  Prevalence Rate: 15.2%
  Affected Patients: 7,600
  Confidence Interval: 14.8% - 15.6%
  Age Breakdown:
    18-34: 3.1% (116/3,750)
    35-49: 8.9% (1,068/12,000)
    50-64: 19.8% (2,970/15,000)
    65-79: 35.4% (2,832/8,000)
    80+: 55.8% (614/1,100)

=== QUALITY MEASURES RESULTS ===
Overall Quality Grade: B+
Overall Score: 82.3%

Diabetes HbA1c Control:
  Performance: 78.5% (Satisfactory)
  Numerator/Denominator: 3,337/4,250
  Target: <7.0
  Improvement Opportunities:
    • 21.5% of eligible patients not meeting target - focus on care gaps
    • 12.3% of patients missing recent tests - improve monitoring

Hypertension Blood Pressure Control:
  Performance: 86.1% (Good)
  Numerator/Denominator: 6,544/7,600
  Target: <140/90
  Improvement Opportunities:
    • Higher gap rate in elderly population - consider age-specific interventions

=== RISK STRATIFICATION RESULTS ===
Total Patients Analyzed: 50,000

Cardiovascular Risk Distribution:
  Low Risk: 28,500 patients
  Moderate Risk: 12,750 patients
  High Risk: 6,250 patients
  Very High Risk: 2,500 patients

High-Risk Patients Requiring Intervention: 8,750

=== INTERVENTION RECOMMENDATIONS ===

Comprehensive Population Health Program (HIGH Priority):
  Type: population_level
  Description: 17.5% of population is high-risk - implement systematic screening and intervention programs
  Estimated Impact: Reduce high-risk population by 15-25%

Cardiovascular Risk Management Program (HIGH Priority):
  Type: cardiovascular_focused
  Description: 8,750 patients at high cardiovascular risk need intensive management
  Components:
    • Statin therapy optimization
    • Blood pressure management
    • Diabetes control enhancement
    • Lifestyle modification counseling
  Estimated Impact: Reduce cardiovascular events by 20-30%

=== COMPARATIVE ANALYSIS ===
• Diabetes prevalence (8.5%) is above national average (7.8%)
• Hypertension control rate (86.1%) exceeds CMS benchmark (80%)
• High cardiovascular risk population (17.5%) indicates need for prevention programs
• Elderly population shows higher comorbidity burden requiring specialized care

Integration Patterns

1. Public Health Surveillance Integration

class PublicHealthSurveillance:
    def __init__(self, analytics_engine: PopulationHealthAnalytics):
        self.analytics = analytics_engine
        
    def monitor_disease_outbreaks(self, patient_data: List[Dict[str, Any]], 
                                surveillance_conditions: List[str],
                                time_window_days: int = 30) -> Dict[str, Any]:
        """Monitor for potential disease outbreaks"""
        
        cutoff_date = datetime.now() - timedelta(days=time_window_days)
        
        outbreak_alerts = []
        
        for condition_code in surveillance_conditions:
            # Get recent cases
            recent_cases = []
            
            for patient in patient_data:
                for condition in patient.get('conditions', []):
                    condition_date = datetime.fromisoformat(condition.get('date', '1900-01-01'))
                    
                    if (condition_date >= cutoff_date and 
                        condition.get('code') == condition_code):
                        recent_cases.append({
                            'patient_id': patient['patient_id'],
                            'condition_date': condition_date,
                            'location': patient.get('location', {})
                        })
            
            # Calculate expected vs actual cases
            expected_cases = self.calculate_expected_cases(condition_code, time_window_days)
            actual_cases = len(recent_cases)
            
            # Statistical significance testing
            if actual_cases > expected_cases * 1.5 and actual_cases >= 3:
                # Potential outbreak detected
                outbreak_alert = {
                    'condition_code': condition_code,
                    'condition_name': self.get_condition_name(condition_code),
                    'actual_cases': actual_cases,
                    'expected_cases': expected_cases,
                    'excess_cases': actual_cases - expected_cases,
                    'risk_ratio': actual_cases / expected_cases if expected_cases > 0 else float('inf'),
                    'time_window': time_window_days,
                    'geographic_clustering': self.analyze_geographic_clustering(recent_cases),
                    'alert_level': self.determine_alert_level(actual_cases, expected_cases),
                    'recommendations': self.generate_outbreak_recommendations(condition_code, actual_cases)
                }
                outbreak_alerts.append(outbreak_alert)
        
        return {
            'surveillance_period': f"{time_window_days} days",
            'conditions_monitored': len(surveillance_conditions),
            'outbreak_alerts': outbreak_alerts,
            'total_alerts': len(outbreak_alerts),
            'highest_risk_condition': max(outbreak_alerts, key=lambda x: x['risk_ratio']) if outbreak_alerts else None
        }

Next Steps

I