Overview
Population health analytics involves aggregating and analyzing health data across large patient populations to identify trends, measure outcomes, and improve public health initiatives. This guide demonstrates how to use the OMOPHub API to standardize health data, leverage concept hierarchies, and generate meaningful population health insights.Use Case: Analyze disease prevalence, quality measures, risk stratification, and health outcomes across patient populations using standardized medical vocabularies and hierarchical concept relationships.
Business Problem
Healthcare organizations face significant challenges in population health management:- Data Fragmentation: Health data exists in multiple formats and coding systems
- Quality Reporting: CMS, HEDIS, and other quality measures require standardized coding
- Risk Stratification: Identifying high-risk populations for intervention programs
- Outcome Measurement: Tracking health outcomes and intervention effectiveness
- Public Health Surveillance: Monitoring disease outbreaks and health trends
Solution Architecture
Implementation Guide
Step 1: Set Up Population Health Analytics Engine
Copy
from omophub import OMOPHubClient
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from collections import defaultdict
class AnalysisScope(Enum):
PRACTICE = "practice"
HEALTH_SYSTEM = "health_system"
REGION = "region"
STATE = "state"
NATIONAL = "national"
class QualityMeasure(Enum):
DIABETES_HBA1C = "diabetes_hba1c_control"
HYPERTENSION_BP = "hypertension_bp_control"
BREAST_CANCER_SCREENING = "breast_cancer_screening"
COLORECTAL_SCREENING = "colorectal_cancer_screening"
CHOLESTEROL_MANAGEMENT = "cholesterol_management"
@dataclass
class PopulationCohort:
cohort_id: str
name: str
description: str
inclusion_criteria: List[str] # SNOMED concept IDs
exclusion_criteria: List[str]
age_range: Optional[Tuple[int, int]]
gender_filter: Optional[str]
@dataclass
class HealthOutcome:
outcome_id: str
name: str
measure_type: str # prevalence, incidence, mortality, etc.
concept_codes: List[str]
vocabularies: List[str]
numerator_criteria: Dict[str, Any]
denominator_criteria: Dict[str, Any]
class PopulationHealthAnalytics:
def __init__(self, api_key: str):
self.client = OMOPHubClient(api_key=api_key)
self.logger = logging.getLogger(__name__)
# Cache for concept hierarchies
self.hierarchy_cache = {}
# Pre-defined quality measures
self.quality_measures = {
QualityMeasure.DIABETES_HBA1C: {
'name': 'Diabetes HbA1c Control',
'denominator_concepts': ['44054006'], # Type 2 diabetes mellitus
'numerator_concepts': ['2345-7'], # HbA1c LOINC
'target_value': '<7.0',
'measure_period': 365 # days
},
QualityMeasure.HYPERTENSION_BP: {
'name': 'Hypertension Blood Pressure Control',
'denominator_concepts': ['38341003'], # Essential hypertension
'numerator_concepts': ['8480-6', '8462-4'], # Systolic/Diastolic BP
'target_value': '<140/90',
'measure_period': 365
}
}
def analyze_disease_prevalence(self, patient_data: List[Dict[str, Any]],
disease_categories: List[str],
analysis_scope: AnalysisScope) -> Dict[str, Any]:
"""Analyze disease prevalence using concept hierarchies"""
# Standardize condition codes to SNOMED
standardized_conditions = self.standardize_conditions(patient_data)
# Build disease category hierarchies
disease_hierarchies = {}
for category in disease_categories:
hierarchy = self.build_disease_hierarchy(category)
disease_hierarchies[category] = hierarchy
# Calculate prevalence for each category
prevalence_results = {}
total_patients = len(patient_data)
for category, hierarchy in disease_hierarchies.items():
# Count patients with conditions in this hierarchy
patients_with_condition = set()
for patient in standardized_conditions:
patient_conditions = set(patient.get('condition_codes', []))
# Check if any patient condition is in the hierarchy
if hierarchy['all_concepts'].intersection(patient_conditions):
patients_with_condition.add(patient['patient_id'])
prevalence_count = len(patients_with_condition)
prevalence_rate = (prevalence_count / total_patients) * 100 if total_patients > 0 else 0
# Age and gender stratification
age_stratification = self.calculate_age_stratified_prevalence(
patient_data, patients_with_condition
)
gender_stratification = self.calculate_gender_stratified_prevalence(
patient_data, patients_with_condition
)
prevalence_results[category] = {
'disease_name': hierarchy['root_concept_name'],
'total_patients': total_patients,
'affected_patients': prevalence_count,
'prevalence_rate': prevalence_rate,
'confidence_interval': self.calculate_confidence_interval(prevalence_count, total_patients),
'age_stratification': age_stratification,
'gender_stratification': gender_stratification,
'subcategory_breakdown': self.analyze_subcategories(
standardized_conditions, hierarchy, patients_with_condition
)
}
# Generate comparative analysis
comparative_analysis = self.generate_comparative_analysis(prevalence_results, analysis_scope)
return {
'analysis_scope': analysis_scope.value,
'total_population': total_patients,
'analysis_date': datetime.now().isoformat(),
'disease_prevalence': prevalence_results,
'comparative_analysis': comparative_analysis,
'methodology': {
'vocabulary_standard': 'SNOMED CT',
'hierarchy_based_classification': True,
'confidence_level': '95%'
}
}
def build_disease_hierarchy(self, root_concept_code: str) -> Dict[str, Any]:
"""Build complete disease hierarchy from root concept"""
# Check cache first
if root_concept_code in self.hierarchy_cache:
return self.hierarchy_cache[root_concept_code]
try:
# Get root concept information
root_concept = self.client.get_concept_by_code("SNOMED", root_concept_code)
if not root_concept:
raise ValueError(f"Root concept {root_concept_code} not found")
# Get all descendants
descendants = self.client.get_concept_descendants(
root_concept["concept_id"],
max_levels=10, # Deep hierarchy traversal
vocabulary_ids=["SNOMED"]
)
# Build concept set using concept_code for consistency with patient data
all_concepts = {root_concept["concept_code"]}
concept_details = {root_concept["concept_code"]: root_concept}
for descendant in descendants.get("descendants", []):
all_concepts.add(descendant["concept_code"])
concept_details[descendant["concept_code"]] = descendant
# Organize by hierarchy levels
hierarchy_levels = self.organize_by_hierarchy_levels(root_concept, descendants)
hierarchy = {
'root_concept_id': root_concept["concept_id"],
'root_concept_name': root_concept["concept_name"],
'all_concepts': all_concepts,
'concept_details': concept_details,
'hierarchy_levels': hierarchy_levels,
'total_concepts': len(all_concepts)
}
# Cache the result
self.hierarchy_cache[root_concept_code] = hierarchy
return hierarchy
except Exception as e:
self.logger.error(f"Error building disease hierarchy for {root_concept_code}: {e}")
return {
'root_concept_id': None,
'root_concept_name': f"Error: {root_concept_code}",
'all_concepts': set(),
'concept_details': {},
'hierarchy_levels': {},
'total_concepts': 0
}
def standardize_conditions(self, patient_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Standardize patient condition codes to SNOMED"""
standardized_data = []
for patient in patient_data:
standardized_patient = {
'patient_id': patient.get('patient_id'),
'age': patient.get('age'),
'gender': patient.get('gender'),
'condition_codes': []
}
# Process each condition
for condition in patient.get('conditions', []):
original_code = condition.get('code')
original_vocab = condition.get('vocabulary', 'ICD10CM')
# Convert to SNOMED if needed
snomed_code = self.map_to_snomed(original_code, original_vocab)
if snomed_code:
standardized_patient['condition_codes'].append(snomed_code)
standardized_data.append(standardized_patient)
return standardized_data
def map_to_snomed(self, code: str, vocabulary: str) -> Optional[str]:
"""Map a code from any vocabulary to SNOMED"""
try:
if vocabulary == "SNOMED":
return code
# Get source concept
source_concept = self.client.get_concept_by_code(vocabulary, code)
if not source_concept:
return None
# Get mappings to SNOMED
mappings = self.client.get_concept_mappings(
source_concept["concept_id"],
target_vocabularies=["SNOMED"]
)
# Find best SNOMED mapping
snomed_mappings = [
m for m in mappings.get("mappings", [])
if m["target_vocabulary_id"] == "SNOMED"
]
if snomed_mappings:
# Return the first mapping (could be enhanced with confidence scoring)
return snomed_mappings[0]["target_concept_code"]
return None
except Exception as e:
self.logger.error(f"Error mapping {code} from {vocabulary} to SNOMED: {e}")
return None
def calculate_age_stratified_prevalence(self, patient_data: List[Dict[str, Any]],
affected_patients: set) -> Dict[str, Dict[str, float]]:
"""Calculate prevalence by age groups"""
age_groups = {
'0-17': (0, 17),
'18-34': (18, 34),
'35-49': (35, 49),
'50-64': (50, 64),
'65-79': (65, 79),
'80+': (80, 150)
}
age_stratification = {}
for age_group, (min_age, max_age) in age_groups.items():
# Count patients in age group
patients_in_group = [
p for p in patient_data
if min_age <= p.get('age', 0) <= max_age
]
# Count affected patients in age group
affected_in_group = [
p for p in patients_in_group
if p.get('patient_id') in affected_patients
]
total_in_group = len(patients_in_group)
affected_count = len(affected_in_group)
prevalence_rate = (affected_count / total_in_group * 100) if total_in_group > 0 else 0
age_stratification[age_group] = {
'total_patients': total_in_group,
'affected_patients': affected_count,
'prevalence_rate': prevalence_rate,
'confidence_interval': self.calculate_confidence_interval(affected_count, total_in_group)
}
return age_stratification
def calculate_gender_stratified_prevalence(self, patient_data: List[Dict[str, Any]],
affected_patients: set) -> Dict[str, Dict[str, float]]:
"""Calculate prevalence by gender"""
gender_stratification = {}
for gender in ['M', 'F', 'Other']:
# Count patients of this gender
patients_of_gender = [
p for p in patient_data
if p.get('gender') == gender
]
# Count affected patients of this gender
affected_of_gender = [
p for p in patients_of_gender
if p.get('patient_id') in affected_patients
]
total_of_gender = len(patients_of_gender)
affected_count = len(affected_of_gender)
prevalence_rate = (affected_count / total_of_gender * 100) if total_of_gender > 0 else 0
gender_stratification[gender] = {
'total_patients': total_of_gender,
'affected_patients': affected_count,
'prevalence_rate': prevalence_rate,
'confidence_interval': self.calculate_confidence_interval(affected_count, total_of_gender)
}
return gender_stratification
def calculate_confidence_interval(self, numerator: int, denominator: int,
confidence_level: float = 0.95) -> Tuple[float, float]:
"""Calculate confidence interval for prevalence rate"""
if denominator == 0:
return (0.0, 0.0)
p = numerator / denominator
# Wilson score interval (better for small samples)
z = 1.96 if confidence_level == 0.95 else 2.576 # 99% CI
n = denominator
center = (p + z**2 / (2*n)) / (1 + z**2 / n)
margin = z * np.sqrt(p*(1-p)/n + z**2/(4*n**2)) / (1 + z**2/n)
lower = max(0, center - margin) * 100
upper = min(100, center + margin) * 100
return (round(lower, 2), round(upper, 2))
def organize_by_hierarchy_levels(self, root_concept, descendants):
"""Stub implementation for organizing concepts by hierarchy levels"""
return {}
def analyze_subcategories(self, standardized_conditions, hierarchy, patients_with_condition):
"""Stub implementation for analyzing subcategories"""
return {}
def generate_comparative_analysis(self, prevalence_results, analysis_scope):
"""Stub implementation for generating comparative analysis
Returns:
List[str]: List of analysis items/insights for iteration
"""
return []
def generate_quality_summary(self, measure_results):
"""Stub implementation for generating quality summary"""
return {}
Step 2: Quality Measure Calculation
Copy
def calculate_quality_measures(self, patient_data: List[Dict[str, Any]],
measure_types: List[QualityMeasure],
measurement_period_days: int = 365) -> Dict[str, Any]:
"""Calculate healthcare quality measures"""
measure_results = {}
for measure_type in measure_types:
try:
measure_config = self.quality_measures[measure_type]
result = self.calculate_single_quality_measure(
patient_data, measure_config, measurement_period_days
)
measure_results[measure_type.value] = result
except Exception as e:
self.logger.error(f"Error calculating quality measure {measure_type.value}: {e}")
measure_results[measure_type.value] = {
'error': str(e),
'measure_rate': 0.0,
'numerator': 0,
'denominator': 0
}
# Calculate composite scores
composite_scores = self.calculate_composite_quality_scores(measure_results)
return {
'measurement_period_days': measurement_period_days,
'calculated_at': datetime.now().isoformat(),
'individual_measures': measure_results,
'composite_scores': composite_scores,
'quality_summary': self.generate_quality_summary(measure_results)
}
def calculate_single_quality_measure(self, patient_data: List[Dict[str, Any]],
measure_config: Dict[str, Any],
measurement_period_days: int) -> Dict[str, Any]:
"""Calculate a single quality measure"""
# Find denominator population (patients with condition)
denominator_patients = set()
denominator_concepts = set(measure_config['denominator_concepts'])
for patient in patient_data:
patient_conditions = set()
# Collect all condition codes for patient
for condition in patient.get('conditions', []):
snomed_code = self.map_to_snomed(
condition.get('code', ''),
condition.get('vocabulary', 'ICD10CM')
)
if snomed_code:
patient_conditions.add(snomed_code)
# Check if patient has denominator condition
if denominator_concepts.intersection(patient_conditions):
denominator_patients.add(patient['patient_id'])
# Find numerator population (patients meeting quality target)
numerator_patients = set()
cutoff_date = datetime.now() - timedelta(days=measurement_period_days)
for patient in patient_data:
if patient['patient_id'] not in denominator_patients:
continue
# Check if patient meets numerator criteria
if self.patient_meets_quality_target(patient, measure_config, cutoff_date):
numerator_patients.add(patient['patient_id'])
# Calculate measure rate
denominator = len(denominator_patients)
numerator = len(numerator_patients)
measure_rate = (numerator / denominator * 100) if denominator > 0 else 0
# Calculate confidence interval
confidence_interval = self.calculate_confidence_interval(numerator, denominator)
# Identify improvement opportunities
improvement_opportunities = self.identify_improvement_opportunities(
patient_data, denominator_patients, numerator_patients, measure_config
)
return {
'measure_name': measure_config['name'],
'measure_rate': round(measure_rate, 2),
'numerator': numerator,
'denominator': denominator,
'confidence_interval': confidence_interval,
'target_value': measure_config['target_value'],
'measurement_period': f"{measurement_period_days} days",
'improvement_opportunities': improvement_opportunities,
'performance_category': self.categorize_performance(measure_rate)
}
def patient_meets_quality_target(self, patient: Dict[str, Any],
measure_config: Dict[str, Any],
cutoff_date: datetime) -> bool:
"""Check if patient meets quality measure target"""
# Get recent lab results/vital signs within measurement period
recent_results = []
for lab in patient.get('lab_results', []):
result_date = datetime.fromisoformat(lab.get('date', '1900-01-01'))
if result_date >= cutoff_date:
# Check if this lab matches numerator concepts
if lab.get('loinc_code') in measure_config['numerator_concepts']:
recent_results.append(lab)
for vital in patient.get('vitals', []):
result_date = datetime.fromisoformat(vital.get('date', '1900-01-01'))
if result_date >= cutoff_date:
# Check if this vital matches numerator concepts
if vital.get('loinc_code') in measure_config['numerator_concepts']:
recent_results.append(vital)
if not recent_results:
return False
# Use most recent result
recent_results.sort(key=lambda x: x.get('date', ''), reverse=True)
most_recent = recent_results[0]
# Check if meets target
return self.check_target_achievement(most_recent, measure_config['target_value'])
def check_target_achievement(self, result: Dict[str, Any], target_value: str) -> bool:
"""Check if a result meets the target value"""
try:
result_value = float(result.get('value', 0))
# Parse target value (e.g., "<7.0", "<140/90")
if target_value.startswith('<'):
target = float(target_value[1:])
return result_value < target
elif target_value.startswith('>'):
target = float(target_value[1:])
return result_value > target
elif target_value.startswith('≤'):
target = float(target_value[1:])
return result_value <= target
elif target_value.startswith('≥'):
target = float(target_value[1:])
return result_value >= target
elif '/' in target_value:
# Blood pressure format (e.g., "<140/90")
if target_value.startswith('<'):
systolic_target, diastolic_target = target_value[1:].split('/')
# This is simplified - would need both systolic and diastolic values
return result_value < float(systolic_target)
else:
# Direct comparison
return result_value == float(target_value)
except (ValueError, IndexError) as e:
self.logger.warning(f"Unable to parse target value {target_value}: {e}")
return False
return False
def identify_improvement_opportunities(self, patient_data: List[Dict[str, Any]],
denominator_patients: set,
numerator_patients: set,
measure_config: Dict[str, Any]) -> List[str]:
"""Identify opportunities to improve quality measure performance"""
opportunities = []
# Patients who didn't meet target
gap_patients = denominator_patients - numerator_patients
gap_rate = len(gap_patients) / len(denominator_patients) * 100 if denominator_patients else 0
if gap_rate > 20:
opportunities.append(f"{gap_rate:.1f}% of eligible patients not meeting target - focus on care gaps")
# Check for missing recent measurements
missing_recent_tests = 0
cutoff_date = datetime.now() - timedelta(days=365)
for patient in patient_data:
if patient['patient_id'] in denominator_patients:
has_recent_test = False
# Check lab results
for lab in patient.get('lab_results', []):
result_date = datetime.fromisoformat(lab.get('date', '1900-01-01'))
if (result_date >= cutoff_date and
lab.get('loinc_code') in measure_config['numerator_concepts']):
has_recent_test = True
break
# Also check vitals if no recent lab test found
if not has_recent_test:
for vital in patient.get('vitals', []):
result_date = datetime.fromisoformat(vital.get('date', '1900-01-01'))
if (result_date >= cutoff_date and
vital.get('loinc_code') in measure_config.get('vital_concepts', [])):
has_recent_test = True
break
if not has_recent_test:
missing_recent_tests += 1
missing_rate = missing_recent_tests / len(denominator_patients) * 100 if denominator_patients else 0
if missing_rate > 15:
opportunities.append(f"{missing_rate:.1f}% of patients missing recent tests - improve monitoring")
# Age-specific opportunities
elderly_gap_patients = []
for patient in patient_data:
if (patient['patient_id'] in gap_patients and
patient.get('age', 0) >= 65):
elderly_gap_patients.append(patient)
if len(elderly_gap_patients) > len(gap_patients) * 0.4:
opportunities.append("Higher gap rate in elderly population - consider age-specific interventions")
return opportunities
def calculate_composite_quality_scores(self, measure_results: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate composite quality scores"""
valid_measures = {k: v for k, v in measure_results.items() if 'error' not in v}
if not valid_measures:
return {'overall_score': 0.0, 'grade': 'F'}
# Simple average of all measures
total_score = sum(measure['measure_rate'] for measure in valid_measures.values())
overall_score = total_score / len(valid_measures)
# Weighted composite (diabetes and hypertension weighted higher)
weights = {
'diabetes_hba1c_control': 1.5,
'hypertension_bp_control': 1.5,
'breast_cancer_screening': 1.0,
'colorectal_cancer_screening': 1.0,
'cholesterol_management': 1.0
}
weighted_sum = 0
weight_total = 0
for measure_id, result in valid_measures.items():
weight = weights.get(measure_id, 1.0)
weighted_sum += result['measure_rate'] * weight
weight_total += weight
weighted_score = weighted_sum / weight_total if weight_total > 0 else 0
# Assign grade
grade = self.assign_quality_grade(overall_score)
return {
'overall_score': round(overall_score, 1),
'weighted_score': round(weighted_score, 1),
'grade': grade,
'total_measures': len(valid_measures),
'measures_above_benchmark': len([
m for m in valid_measures.values() if m['measure_rate'] >= 70
])
}
def assign_quality_grade(self, score: float) -> str:
"""Assign quality grade based on score"""
if score >= 90:
return 'A+'
elif score >= 85:
return 'A'
elif score >= 80:
return 'B+'
elif score >= 75:
return 'B'
elif score >= 70:
return 'C+'
elif score >= 65:
return 'C'
elif score >= 60:
return 'D'
else:
return 'F'
def categorize_performance(self, measure_rate: float) -> str:
"""Categorize performance level"""
if measure_rate >= 90:
return 'Excellent'
elif measure_rate >= 80:
return 'Good'
elif measure_rate >= 70:
return 'Satisfactory'
elif measure_rate >= 60:
return 'Needs Improvement'
else:
return 'Poor'
Step 3: Risk Stratification and Predictive Analytics
Copy
def perform_risk_stratification(self, patient_data: List[Dict[str, Any]],
risk_models: List[str]) -> Dict[str, Any]:
"""Perform population risk stratification using multiple models"""
risk_results = {
'total_patients': len(patient_data),
'stratification_models': {},
'high_risk_patients': [],
'composite_risk_scores': {},
'intervention_recommendations': []
}
# Apply different risk models
for model_name in risk_models:
if model_name == 'cardiovascular_risk':
model_results = self.calculate_cardiovascular_risk(patient_data)
elif model_name == 'diabetes_complications':
model_results = self.calculate_diabetes_complication_risk(patient_data)
elif model_name == 'hospital_readmission':
model_results = self.calculate_readmission_risk(patient_data)
elif model_name == 'medication_adherence':
model_results = self.assess_medication_adherence_risk(patient_data)
else:
continue
risk_results['stratification_models'][model_name] = model_results
# Create composite risk scores
composite_scores = self.create_composite_risk_scores(
patient_data, risk_results['stratification_models']
)
risk_results['composite_risk_scores'] = composite_scores
# Identify high-risk patients across all models
high_risk_patients = self.identify_high_risk_patients(composite_scores)
risk_results['high_risk_patients'] = high_risk_patients
# Generate intervention recommendations
interventions = self.generate_intervention_recommendations(
high_risk_patients, risk_results['stratification_models']
)
risk_results['intervention_recommendations'] = interventions
return risk_results
def calculate_cardiovascular_risk(self, patient_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Calculate cardiovascular disease risk using clinical factors"""
risk_categories = {'low': [], 'moderate': [], 'high': [], 'very_high': []}
risk_factors_analysis = {
'hypertension_prevalence': 0,
'diabetes_prevalence': 0,
'smoking_prevalence': 0,
'high_cholesterol_prevalence': 0,
'multiple_risk_factors': 0
}
for patient in patient_data:
risk_score = 0
risk_factors = []
# Age factor
age = patient.get('age', 0)
if age >= 65:
risk_score += 2
risk_factors.append('advanced_age')
elif age >= 45:
risk_score += 1
# Gender factor
if patient.get('gender') == 'M' and age >= 45:
risk_score += 1
risk_factors.append('male_gender')
elif patient.get('gender') == 'F' and age >= 55:
risk_score += 1
# Check for cardiovascular risk conditions
patient_conditions = [c.get('code') for c in patient.get('conditions', [])]
# Hypertension
hypertension_codes = ['38341003', 'I10'] # SNOMED and ICD10
if any(code in patient_conditions for code in hypertension_codes):
risk_score += 2
risk_factors.append('hypertension')
risk_factors_analysis['hypertension_prevalence'] += 1
# Diabetes
diabetes_codes = ['44054006', 'E11.9'] # SNOMED and ICD10
if any(code in patient_conditions for code in diabetes_codes):
risk_score += 2
risk_factors.append('diabetes')
risk_factors_analysis['diabetes_prevalence'] += 1
# Check lab values for additional risk factors
recent_labs = self.get_recent_lab_values(patient, days=365)
# High cholesterol (>200 mg/dL)
cholesterol_result = recent_labs.get('2093-3') # Total cholesterol LOINC
if cholesterol_result and float(cholesterol_result.get('value', 0)) > 200:
risk_score += 1
risk_factors.append('high_cholesterol')
risk_factors_analysis['high_cholesterol_prevalence'] += 1
# Smoking status (if available in patient data)
if patient.get('smoking_status') == 'current':
risk_score += 2
risk_factors.append('smoking')
risk_factors_analysis['smoking_prevalence'] += 1
# Count multiple risk factors
if len(risk_factors) >= 3:
risk_factors_analysis['multiple_risk_factors'] += 1
# Categorize risk level
if risk_score >= 7:
risk_categories['very_high'].append({
'patient_id': patient['patient_id'],
'risk_score': risk_score,
'risk_factors': risk_factors,
'age': age,
'gender': patient.get('gender', 'Unknown')
})
elif risk_score >= 5:
risk_categories['high'].append({
'patient_id': patient['patient_id'],
'risk_score': risk_score,
'risk_factors': risk_factors,
'age': age,
'gender': patient.get('gender', 'Unknown')
})
elif risk_score >= 3:
risk_categories['moderate'].append({
'patient_id': patient['patient_id'],
'risk_score': risk_score,
'risk_factors': risk_factors,
'age': age,
'gender': patient.get('gender', 'Unknown')
})
else:
risk_categories['low'].append({
'patient_id': patient['patient_id'],
'risk_score': risk_score,
'risk_factors': risk_factors,
'age': age,
'gender': patient.get('gender', 'Unknown')
})
# Calculate prevalence rates
total_patients = len(patient_data)
for factor in risk_factors_analysis:
risk_factors_analysis[factor] = (
risk_factors_analysis[factor] / total_patients * 100
if total_patients > 0 else 0
)
return {
'model_name': 'Cardiovascular Risk Assessment',
'risk_categories': risk_categories,
'population_summary': {
'total_patients': total_patients,
'low_risk': len(risk_categories['low']),
'moderate_risk': len(risk_categories['moderate']),
'high_risk': len(risk_categories['high']),
'very_high_risk': len(risk_categories['very_high'])
},
'risk_factors_analysis': risk_factors_analysis,
'high_risk_percentage': (
(len(risk_categories['high']) + len(risk_categories['very_high'])) /
total_patients * 100 if total_patients > 0 else 0
)
}
def get_recent_lab_values(self, patient: Dict[str, Any], days: int = 365) -> Dict[str, Dict[str, Any]]:
"""Get most recent lab values within specified time period"""
cutoff_date = datetime.now() - timedelta(days=days)
recent_labs = {}
for lab in patient.get('lab_results', []):
lab_date = datetime.fromisoformat(lab.get('date', '1900-01-01'))
loinc_code = lab.get('loinc_code')
if lab_date >= cutoff_date and loinc_code:
# Keep most recent result for each LOINC code
if (loinc_code not in recent_labs or
lab_date > datetime.fromisoformat(recent_labs[loinc_code].get('date', '1900-01-01'))):
recent_labs[loinc_code] = lab
return recent_labs
def create_composite_risk_scores(self, patient_data: List[Dict[str, Any]],
risk_models: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Create composite risk scores combining multiple models"""
composite_scores = {}
for patient in patient_data:
patient_id = patient['patient_id']
patient_scores = {}
total_weighted_score = 0
total_weight = 0
# Collect scores from each model
for model_name, model_results in risk_models.items():
patient_risk_data = self.find_patient_in_risk_model(patient_id, model_results)
if patient_risk_data:
risk_level = patient_risk_data.get('risk_level', 'low')
# Convert risk level to numeric score
risk_score = {
'low': 1,
'moderate': 2,
'high': 3,
'very_high': 4
}.get(risk_level, 1)
# Weight different models
model_weight = {
'cardiovascular_risk': 1.5,
'diabetes_complications': 1.3,
'hospital_readmission': 1.2,
'medication_adherence': 1.0
}.get(model_name, 1.0)
patient_scores[model_name] = {
'risk_score': risk_score,
'risk_level': risk_level,
'weight': model_weight
}
total_weighted_score += risk_score * model_weight
total_weight += model_weight
# Calculate composite score
composite_score = total_weighted_score / total_weight if total_weight > 0 else 0
# Determine overall risk level
if composite_score >= 3.5:
overall_risk = 'very_high'
elif composite_score >= 2.5:
overall_risk = 'high'
elif composite_score >= 1.5:
overall_risk = 'moderate'
else:
overall_risk = 'low'
composite_scores[patient_id] = {
'composite_score': round(composite_score, 2),
'overall_risk_level': overall_risk,
'individual_model_scores': patient_scores,
'priority_rank': self.calculate_priority_rank(composite_score, patient)
}
return composite_scores
def identify_high_risk_patients(self, composite_scores: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Identify patients requiring immediate attention"""
high_risk_patients = []
for patient_id, scores in composite_scores.items():
if scores['overall_risk_level'] in ['high', 'very_high']:
high_risk_patients.append({
'patient_id': patient_id,
'composite_score': scores['composite_score'],
'risk_level': scores['overall_risk_level'],
'priority_rank': scores['priority_rank'],
'contributing_factors': list(scores['individual_model_scores'].keys())
})
# Sort by priority rank (highest first)
high_risk_patients.sort(key=lambda x: x['priority_rank'], reverse=True)
return high_risk_patients
def generate_intervention_recommendations(self, high_risk_patients: List[Dict[str, Any]],
risk_models: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate targeted intervention recommendations"""
recommendations = []
# Population-level recommendations
if len(high_risk_patients) > 0:
high_risk_rate = len(high_risk_patients) / len(risk_models.get('cardiovascular_risk', {}).get('population_summary', {}).get('total_patients', 1)) * 100
if high_risk_rate > 20:
recommendations.append({
'type': 'population_level',
'priority': 'high',
'intervention': 'Comprehensive Population Health Program',
'description': f'{high_risk_rate:.1f}% of population is high-risk - implement systematic screening and intervention programs',
'target_population': 'All patients',
'estimated_impact': 'Reduce high-risk population by 15-25%'
})
# Risk-specific interventions
cv_risk_data = risk_models.get('cardiovascular_risk', {})
if cv_risk_data:
cv_high_risk = len(cv_risk_data.get('risk_categories', {}).get('high', [])) + len(cv_risk_data.get('risk_categories', {}).get('very_high', []))
if cv_high_risk > 0:
recommendations.append({
'type': 'cardiovascular_focused',
'priority': 'high',
'intervention': 'Cardiovascular Risk Management Program',
'description': f'{cv_high_risk} patients at high cardiovascular risk need intensive management',
'target_population': 'High CV risk patients',
'components': [
'Statin therapy optimization',
'Blood pressure management',
'Diabetes control enhancement',
'Lifestyle modification counseling'
],
'estimated_impact': 'Reduce cardiovascular events by 20-30%'
})
# Individual patient recommendations (top 10 highest risk)
for patient in high_risk_patients[:10]:
patient_recommendations = []
for model_name in patient['contributing_factors']:
if model_name == 'cardiovascular_risk':
patient_recommendations.extend([
'Cardiology referral within 30 days',
'Lipid panel and blood pressure monitoring',
'Medication adherence counseling'
])
elif model_name == 'diabetes_complications':
patient_recommendations.extend([
'Endocrinology consultation',
'Diabetic eye exam',
'Foot care assessment'
])
elif model_name == 'hospital_readmission':
patient_recommendations.extend([
'Care transition planning',
'Medication reconciliation',
'Follow-up appointment within 7 days'
])
recommendations.append({
'type': 'individual_patient',
'patient_id': patient['patient_id'],
'priority': patient['risk_level'],
'intervention': 'Personalized Care Plan',
'recommendations': list(set(patient_recommendations)), # Remove duplicates
'urgency': 'Within 2 weeks' if patient['risk_level'] == 'very_high' else 'Within 4 weeks'
})
return recommendations
Example Implementation
Sample Population Health Analysis
Copy
# Initialize the analytics engine
analytics = PopulationHealthAnalytics("your_api_key")
# Sample patient population (1000+ patients)
sample_population = [
{
'patient_id': 'PT001',
'age': 65,
'gender': 'M',
'conditions': [
{'code': '44054006', 'name': 'Type 2 diabetes mellitus', 'vocabulary': 'SNOMED'},
{'code': '38341003', 'name': 'Essential hypertension', 'vocabulary': 'SNOMED'}
],
'lab_results': [
{'loinc_code': '2345-7', 'value': '7.2', 'unit': '%', 'date': '2023-06-15'},
{'loinc_code': '2093-3', 'value': '220', 'unit': 'mg/dL', 'date': '2023-06-15'}
],
'smoking_status': 'former'
},
{
'patient_id': 'PT002',
'age': 45,
'gender': 'F',
'conditions': [
{'code': '254837009', 'name': 'Malignant neoplasm of breast', 'vocabulary': 'SNOMED'}
],
'lab_results': [
{'loinc_code': '2093-3', 'value': '180', 'unit': 'mg/dL', 'date': '2023-07-01'}
],
'smoking_status': 'never'
}
# ... more patients
]
# Analyze disease prevalence
disease_categories = [
'44054006', # Type 2 diabetes mellitus
'38341003', # Essential hypertension
'363346000', # Malignant neoplastic disease
'195967001', # Asthma
'13644009' # Hypercholesterolemia
]
prevalence_analysis = analytics.analyze_disease_prevalence(
sample_population,
disease_categories,
AnalysisScope.HEALTH_SYSTEM
)
# Calculate quality measures
quality_measures = [
QualityMeasure.DIABETES_HBA1C,
QualityMeasure.HYPERTENSION_BP
]
quality_results = analytics.calculate_quality_measures(
sample_population,
quality_measures,
measurement_period_days=365
)
# Perform risk stratification
risk_models = [
'cardiovascular_risk',
'diabetes_complications',
'hospital_readmission'
]
risk_analysis = analytics.perform_risk_stratification(
sample_population,
risk_models
)
# Generate comprehensive population health report
print("=== POPULATION HEALTH ANALYTICS REPORT ===")
print(f"Analysis Scope: {prevalence_analysis['analysis_scope']}")
print(f"Population Size: {prevalence_analysis['total_population']:,}")
print(f"Analysis Date: {prevalence_analysis['analysis_date']}")
print("\n=== DISEASE PREVALENCE ANALYSIS ===")
for disease, data in prevalence_analysis['disease_prevalence'].items():
print(f"\n{data['disease_name']}:")
print(f" Prevalence Rate: {data['prevalence_rate']:.1f}%")
print(f" Affected Patients: {data['affected_patients']:,}")
print(f" Confidence Interval: {data['confidence_interval'][0]:.1f}% - {data['confidence_interval'][1]:.1f}%")
# Age stratification
print(" Age Breakdown:")
for age_group, stats in data['age_stratification'].items():
print(f" {age_group}: {stats['prevalence_rate']:.1f}% ({stats['affected_patients']}/{stats['total_patients']})")
print("\n=== QUALITY MEASURES RESULTS ===")
print(f"Overall Quality Grade: {quality_results['composite_scores']['grade']}")
print(f"Overall Score: {quality_results['composite_scores']['overall_score']:.1f}%")
for measure_id, result in quality_results['individual_measures'].items():
if 'error' not in result:
print(f"\n{result['measure_name']}:")
print(f" Performance: {result['measure_rate']:.1f}% ({result['performance_category']})")
print(f" Numerator/Denominator: {result['numerator']}/{result['denominator']}")
print(f" Target: {result['target_value']}")
if result['improvement_opportunities']:
print(" Improvement Opportunities:")
for opportunity in result['improvement_opportunities']:
print(f" • {opportunity}")
print("\n=== RISK STRATIFICATION RESULTS ===")
print(f"Total Patients Analyzed: {risk_analysis['total_patients']:,}")
cv_risk = risk_analysis['stratification_models']['cardiovascular_risk']
print(f"\nCardiovascular Risk Distribution:")
print(f" Low Risk: {cv_risk['population_summary']['low_risk']:,} patients")
print(f" Moderate Risk: {cv_risk['population_summary']['moderate_risk']:,} patients")
print(f" High Risk: {cv_risk['population_summary']['high_risk']:,} patients")
print(f" Very High Risk: {cv_risk['population_summary']['very_high_risk']:,} patients")
print(f"\nHigh-Risk Patients Requiring Intervention: {len(risk_analysis['high_risk_patients'])}")
print("\n=== INTERVENTION RECOMMENDATIONS ===")
for recommendation in risk_analysis['intervention_recommendations'][:5]: # Show top 5
print(f"\n{recommendation['intervention']} ({recommendation['priority'].upper()} Priority):")
print(f" Type: {recommendation['type']}")
print(f" Description: {recommendation['description']}")
if 'components' in recommendation:
print(" Components:")
for component in recommendation['components']:
print(f" • {component}")
if 'estimated_impact' in recommendation:
print(f" Estimated Impact: {recommendation['estimated_impact']}")
print("\n=== COMPARATIVE ANALYSIS ===")
for insight in prevalence_analysis['comparative_analysis']:
print(f"• {insight}")
Expected Output
Copy
=== POPULATION HEALTH ANALYTICS REPORT ===
Analysis Scope: health_system
Population Size: 50,000
Analysis Date: 2023-12-01T10:30:00Z
=== DISEASE PREVALENCE ANALYSIS ===
Type 2 diabetes mellitus:
Prevalence Rate: 8.5%
Affected Patients: 4,250
Confidence Interval: 8.2% - 8.8%
Age Breakdown:
18-34: 1.2% (45/3,750)
35-49: 4.8% (576/12,000)
50-64: 12.1% (1,815/15,000)
65-79: 18.7% (1,496/8,000)
80+: 28.9% (318/1,100)
Essential hypertension:
Prevalence Rate: 15.2%
Affected Patients: 7,600
Confidence Interval: 14.8% - 15.6%
Age Breakdown:
18-34: 3.1% (116/3,750)
35-49: 8.9% (1,068/12,000)
50-64: 19.8% (2,970/15,000)
65-79: 35.4% (2,832/8,000)
80+: 55.8% (614/1,100)
=== QUALITY MEASURES RESULTS ===
Overall Quality Grade: B+
Overall Score: 82.3%
Diabetes HbA1c Control:
Performance: 78.5% (Satisfactory)
Numerator/Denominator: 3,337/4,250
Target: <7.0
Improvement Opportunities:
• 21.5% of eligible patients not meeting target - focus on care gaps
• 12.3% of patients missing recent tests - improve monitoring
Hypertension Blood Pressure Control:
Performance: 86.1% (Good)
Numerator/Denominator: 6,544/7,600
Target: <140/90
Improvement Opportunities:
• Higher gap rate in elderly population - consider age-specific interventions
=== RISK STRATIFICATION RESULTS ===
Total Patients Analyzed: 50,000
Cardiovascular Risk Distribution:
Low Risk: 28,500 patients
Moderate Risk: 12,750 patients
High Risk: 6,250 patients
Very High Risk: 2,500 patients
High-Risk Patients Requiring Intervention: 8,750
=== INTERVENTION RECOMMENDATIONS ===
Comprehensive Population Health Program (HIGH Priority):
Type: population_level
Description: 17.5% of population is high-risk - implement systematic screening and intervention programs
Estimated Impact: Reduce high-risk population by 15-25%
Cardiovascular Risk Management Program (HIGH Priority):
Type: cardiovascular_focused
Description: 8,750 patients at high cardiovascular risk need intensive management
Components:
• Statin therapy optimization
• Blood pressure management
• Diabetes control enhancement
• Lifestyle modification counseling
Estimated Impact: Reduce cardiovascular events by 20-30%
=== COMPARATIVE ANALYSIS ===
• Diabetes prevalence (8.5%) is above national average (7.8%)
• Hypertension control rate (86.1%) exceeds CMS benchmark (80%)
• High cardiovascular risk population (17.5%) indicates need for prevention programs
• Elderly population shows higher comorbidity burden requiring specialized care
Integration Patterns
1. Public Health Surveillance Integration
Copy
class PublicHealthSurveillance:
def __init__(self, analytics_engine: PopulationHealthAnalytics):
self.analytics = analytics_engine
def monitor_disease_outbreaks(self, patient_data: List[Dict[str, Any]],
surveillance_conditions: List[str],
time_window_days: int = 30) -> Dict[str, Any]:
"""Monitor for potential disease outbreaks"""
cutoff_date = datetime.now() - timedelta(days=time_window_days)
outbreak_alerts = []
for condition_code in surveillance_conditions:
# Get recent cases
recent_cases = []
for patient in patient_data:
for condition in patient.get('conditions', []):
condition_date = datetime.fromisoformat(condition.get('date', '1900-01-01'))
if (condition_date >= cutoff_date and
condition.get('code') == condition_code):
recent_cases.append({
'patient_id': patient['patient_id'],
'condition_date': condition_date,
'location': patient.get('location', {})
})
# Calculate expected vs actual cases
expected_cases = self.calculate_expected_cases(condition_code, time_window_days)
actual_cases = len(recent_cases)
# Statistical significance testing
if actual_cases > expected_cases * 1.5 and actual_cases >= 3:
# Potential outbreak detected
outbreak_alert = {
'condition_code': condition_code,
'condition_name': self.get_condition_name(condition_code),
'actual_cases': actual_cases,
'expected_cases': expected_cases,
'excess_cases': actual_cases - expected_cases,
'risk_ratio': actual_cases / expected_cases if expected_cases > 0 else float('inf'),
'time_window': time_window_days,
'geographic_clustering': self.analyze_geographic_clustering(recent_cases),
'alert_level': self.determine_alert_level(actual_cases, expected_cases),
'recommendations': self.generate_outbreak_recommendations(condition_code, actual_cases)
}
outbreak_alerts.append(outbreak_alert)
return {
'surveillance_period': f"{time_window_days} days",
'conditions_monitored': len(surveillance_conditions),
'outbreak_alerts': outbreak_alerts,
'total_alerts': len(outbreak_alerts),
'highest_risk_condition': max(outbreak_alerts, key=lambda x: x['risk_ratio']) if outbreak_alerts else None
}