Skip to main content

Overview

Clinical trial eligibility screening is a critical process for matching patients to appropriate research studies. This guide demonstrates how to use the OMOPHub API with SNOMED CT and other medical vocabularies to automate patient screening against complex inclusion and exclusion criteria.
Use Case: Automatically identify eligible patients for clinical trials by matching patient characteristics against structured eligibility criteria, improving recruitment efficiency and reducing screening time.

Business Problem

Clinical trial recruitment faces significant challenges:
  • Low Enrollment Rates: Only 3-5% of cancer patients participate in clinical trials
  • Manual Screening: Labor-intensive process to review patient records
  • Complex Criteria: Trials have 50+ inclusion/exclusion criteria on average
  • Time Constraints: 80% of trials fail to meet enrollment targets on time
  • Cost Impact: Delayed enrollment costs $600K-$8M per day for Phase III trials

Solution Architecture

Implementation Guide

Step 1: Set Up Trial Eligibility Engine

from omophub import OMOPHubClient
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
from enum import Enum
import re
import logging
from datetime import datetime, timedelta

class CriteriaType(Enum):
    """Role an eligibility criterion plays when a patient is screened."""
    # Counts toward the inclusion match rate computed by screen_patient.
    INCLUSION = "inclusion"
    # A match on a criterion of this type marks the patient as excluded.
    EXCLUSION = "exclusion"

class MatchType(Enum):
    """Strategy used to compare patient concepts with a criterion's concepts."""
    # Only an identical code in the same vocabulary counts as a match.
    EXACT = "exact"
    # Additionally runs the hierarchy check against the target concept.
    HIERARCHICAL = "hierarchical"
    # Treated exactly like HIERARCHICAL by the matching code in this file.
    SEMANTIC = "semantic"
    # NOTE(review): not referenced by the matching logic visible here;
    # presumably a label for criteria carrying a numerical_constraint - confirm.
    NUMERICAL = "numerical"

@dataclass
class EligibilityCriteria:
    """One inclusion or exclusion rule of a trial protocol.

    At most one constraint is evaluated per criterion; the engine checks them
    in the order age -> numerical -> temporal and falls back to concept
    matching when none is set (an empty dict counts as "not set").
    """
    # Identifier reported back in the screening result lists.
    id: str
    # Whether this is an inclusion or an exclusion rule.
    type: CriteriaType
    # Human-readable text; quoted in exclusion reasons.
    description: str
    # Target concept codes; each code is looked up in each listed vocabulary.
    concept_codes: List[str]
    # Vocabulary identifiers paired with every entry of concept_codes.
    vocabularies: List[str]
    # Matching strategy for concept-based evaluation.
    match_type: MatchType
    # Keys read by the engine: 'data_type' ('lab_result' | 'vital_sign'),
    # 'code', 'name', 'vital_type', 'min_value', 'max_value'.
    numerical_constraint: Optional[Dict[str, Any]] = None
    # Keys read by the engine: 'min_age', 'max_age' (both inclusive).
    age_constraint: Optional[Dict[str, Any]] = None
    # Keys read by the engine: 'applies_to' ('conditions' | 'procedures'),
    # 'min_days_ago', 'max_days_ago'.
    temporal_constraint: Optional[Dict[str, Any]] = None

@dataclass
class PatientData:
    """Snapshot of the patient record that criteria are evaluated against."""
    # Identifier copied into every ScreeningResult.
    patient_id: str
    # Compared against 'min_age' / 'max_age' of an age constraint.
    age: int
    # NOTE(review): not read by any evaluator visible in this file.
    gender: str
    # Dicts with 'code', 'name', optional 'vocabulary' (defaults to ICD10CM
    # during concept matching) and an ISO-format 'date' for temporal checks.
    conditions: List[Dict[str, Any]]
    # Dicts with 'code', 'name', optional 'vocabulary' (defaults to RxNorm).
    medications: List[Dict[str, Any]]
    # Dicts with 'code', 'name', 'value', 'unit', 'date'.
    lab_results: List[Dict[str, Any]]
    # Dicts with 'code', 'name', optional 'vocabulary' (defaults to HCPCS)
    # and an ISO-format 'date' for temporal checks.
    procedures: List[Dict[str, Any]]
    # Dicts with 'type', 'value', 'unit', 'date'.
    vitals: List[Dict[str, Any]]

@dataclass
class ScreeningResult:
    """Outcome of screening one patient against one trial."""
    patient_id: str
    trial_id: str
    # True when no exclusion criterion matched and enough inclusion criteria did.
    eligible: bool
    # Aggregate score derived from per-criterion confidences by screen_patient.
    confidence_score: float
    # IDs of inclusion criteria that matched.
    matched_criteria: List[str]
    # IDs of inclusion criteria that neither matched nor partially matched.
    failed_criteria: List[str]
    # IDs of inclusion criteria that only partially matched.
    partial_matches: List[str]
    # "<criteria id>: <description>" for each matched exclusion criterion;
    # the cohort screener also stores a "Screening error: ..." entry here.
    exclusion_reasons: List[str]
    # Human-readable summary line.
    recommendation: str

class ClinicalTrialEligibilityEngine:
    def __init__(self, api_key: str):
        """Create an engine backed by an OMOPHub API client.

        Args:
            api_key: Credential passed straight to ``OMOPHubClient``.
        """
        self.client = OMOPHubClient(api_key=api_key)
        self.logger = logging.getLogger(__name__)

        # Common condition mappings for faster lookup
        # NOTE(review): nothing in the visible code reads or fills this cache.
        self.condition_cache = {}

        # Demographic constraints.  Only validators that are actually defined
        # on the class are registered: binding a method that does not exist
        # raises AttributeError and would make the engine impossible to build.
        self.demographic_validators = {}
        for name in ('age', 'gender', 'pregnancy'):
            validator = getattr(self, f'validate_{name}_constraint', None)
            if callable(validator):
                self.demographic_validators[name] = validator
    
    def screen_patient(self, patient: PatientData, trial_criteria: List[EligibilityCriteria],
                      trial_id: str) -> ScreeningResult:
        """Screen a single patient against trial criteria.

        Inclusion criteria are sorted into matched / partially matched /
        failed; every matching exclusion criterion is recorded as a reason.
        The patient is eligible when no exclusion criterion matched and at
        least 80% of the inclusion criteria matched.

        Args:
            patient: Patient record to evaluate.
            trial_criteria: All criteria of the trial (both types).
            trial_id: Identifier copied into the result.

        Returns:
            A ``ScreeningResult`` with the per-criterion outcome, an overall
            confidence score and a recommendation string.
        """
        matched_criteria = []
        failed_criteria = []
        partial_matches = []
        exclusion_reasons = []

        inclusion_criteria = [c for c in trial_criteria if c.type == CriteriaType.INCLUSION]
        exclusion_criteria = [c for c in trial_criteria if c.type == CriteriaType.EXCLUSION]

        # Check inclusion criteria
        inclusion_score = 0
        for criteria in inclusion_criteria:
            match_result = self.evaluate_criteria(patient, criteria)

            if match_result['matches']:
                matched_criteria.append(criteria.id)
                inclusion_score += match_result['confidence']
            elif match_result['partial']:
                partial_matches.append(criteria.id)
            else:
                failed_criteria.append(criteria.id)

        # Check exclusion criteria; a single match is enough to exclude
        for criteria in exclusion_criteria:
            if self.evaluate_criteria(patient, criteria)['matches']:
                exclusion_reasons.append(f"{criteria.id}: {criteria.description}")
        excluded = bool(exclusion_reasons)

        # Calculate overall eligibility
        inclusion_total = len(inclusion_criteria)
        if inclusion_total:
            inclusion_rate = len(matched_criteria) / inclusion_total
            partial_rate = len(partial_matches) / inclusion_total
            # Every partial match earns half credit on top of the mean match
            # confidence.  (Averaging the two terms, as was done before,
            # halved the score and ranked a partially met criterion below a
            # completely failed one.)
            confidence_score = inclusion_score / inclusion_total + partial_rate * 0.5
        else:
            # Nothing to satisfy: trivially included with full confidence
            inclusion_rate = 1.0
            partial_rate = 0.0
            confidence_score = 1.0

        # Apply exclusion penalty to confidence
        if excluded:
            confidence_score = confidence_score * 0.4  # Reduce confidence by 60% when excluded

        # Determine eligibility
        eligible = not excluded and inclusion_rate >= 0.8  # 80% of inclusion criteria must match

        # Generate recommendation
        recommendation = self.generate_recommendation(
            eligible, inclusion_rate, partial_rate, excluded, exclusion_reasons
        )

        return ScreeningResult(
            patient_id=patient.patient_id,
            trial_id=trial_id,
            eligible=eligible,
            confidence_score=confidence_score,
            matched_criteria=matched_criteria,
            failed_criteria=failed_criteria,
            partial_matches=partial_matches,
            exclusion_reasons=exclusion_reasons,
            recommendation=recommendation
        )
    
    def evaluate_criteria(self, patient: PatientData, criteria: EligibilityCriteria) -> Dict[str, Any]:
        """Evaluate a single criteria against patient data"""
        
        try:
            # Handle different types of criteria
            if criteria.age_constraint:
                return self.evaluate_age_criteria(patient, criteria)
            elif criteria.numerical_constraint:
                return self.evaluate_numerical_criteria(patient, criteria)
            elif criteria.temporal_constraint:
                return self.evaluate_temporal_criteria(patient, criteria)
            else:
                return self.evaluate_concept_criteria(patient, criteria)
                
        except Exception as e:
            self.logger.error(f"Error evaluating criteria {criteria.id}: {e}")
            return {'matches': False, 'partial': False, 'confidence': 0.0}
    
    def evaluate_concept_criteria(self, patient: PatientData, criteria: EligibilityCriteria) -> Dict[str, Any]:
        """Evaluate concept-based criteria (conditions, medications, procedures)"""
        
        # Get relevant patient data based on criteria
        patient_concepts = []
        
        # Collect concepts from different data sources
        for condition in patient.conditions:
            patient_concepts.append({
                'concept_code': condition.get('code'),
                'concept_name': condition.get('name'),
                'vocabulary': condition.get('vocabulary', 'ICD10CM'),
                'category': 'condition'
            })
        
        for medication in patient.medications:
            patient_concepts.append({
                'concept_code': medication.get('code'),
                'concept_name': medication.get('name'),
                'vocabulary': medication.get('vocabulary', 'RxNorm'),
                'category': 'medication'
            })
        
        for procedure in patient.procedures:
            patient_concepts.append({
                'concept_code': procedure.get('code'),
                'concept_name': procedure.get('name'), 
                'vocabulary': procedure.get('vocabulary', 'HCPCS'),
                'category': 'procedure'
            })
        
        # Match against criteria concepts
        matches = []
        partial_matches = []
        
        for target_code in criteria.concept_codes:
            for vocab in criteria.vocabularies:
                match_result = self.match_concepts(
                    patient_concepts, target_code, vocab, criteria.match_type
                )
                
                if match_result['exact_matches']:
                    matches.extend(match_result['exact_matches'])
                elif match_result['hierarchical_matches']:
                    if criteria.match_type in [MatchType.HIERARCHICAL, MatchType.SEMANTIC]:
                        matches.extend(match_result['hierarchical_matches'])
                    else:
                        partial_matches.extend(match_result['hierarchical_matches'])
        
        # Calculate confidence based on match quality
        confidence = 0.0
        if matches:
            exact_confidence = len([m for m in matches if m.get('match_type') == 'exact']) * 1.0
            hierarchical_confidence = len([m for m in matches if m.get('match_type') == 'hierarchical']) * 0.8
            confidence = min(1.0, (exact_confidence + hierarchical_confidence) / len(criteria.concept_codes))
        
        return {
            'matches': len(matches) > 0,
            'partial': len(partial_matches) > 0 and len(matches) == 0,
            'confidence': confidence,
            'details': {
                'exact_matches': matches,
                'partial_matches': partial_matches
            }
        }
    
    def match_concepts(self, patient_concepts: List[Dict[str, Any]], target_code: str, 
                      target_vocab: str, match_type: MatchType) -> Dict[str, Any]:
        """Match patient concepts against target criteria"""
        
        exact_matches = []
        hierarchical_matches = []
        
        try:
            # Get target concept information
            target_concept = self.client.get_concept_by_code(target_vocab, target_code)
            if not target_concept:
                return {'exact_matches': [], 'hierarchical_matches': []}
            
            # Check each patient concept
            for patient_concept in patient_concepts:
                # Exact code match
                if (patient_concept['concept_code'] == target_code and 
                    patient_concept['vocabulary'] == target_vocab):
                    exact_matches.append({
                        'patient_concept': patient_concept,
                        'target_concept': target_concept,
                        'match_type': 'exact',
                        'confidence': 1.0
                    })
                    continue
                
                # Hierarchical matching for SNOMED concepts
                if match_type in [MatchType.HIERARCHICAL, MatchType.SEMANTIC]:
                    hierarchical_match = self.check_hierarchical_relationship(
                        patient_concept, target_concept
                    )
                    
                    if hierarchical_match:
                        hierarchical_matches.append({
                            'patient_concept': patient_concept,
                            'target_concept': target_concept,
                            'match_type': 'hierarchical',
                            'confidence': hierarchical_match['confidence'],
                            'relationship': hierarchical_match['relationship']
                        })
            
            return {
                'exact_matches': exact_matches,
                'hierarchical_matches': hierarchical_matches
            }
            
        except Exception as e:
            self.logger.error(f"Error in concept matching: {e}")
            return {'exact_matches': [], 'hierarchical_matches': []}
    
    def check_hierarchical_relationship(self, patient_concept: Dict[str, Any], 
                                      target_concept: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Check if patient concept has hierarchical relationship with target"""
        
        try:
            # Convert patient concept to SNOMED if needed
            patient_snomed = self.get_snomed_mapping(patient_concept)
            if not patient_snomed:
                return None
            
            # Check if patient concept is a descendant of target concept
            descendants = self.client.get_concept_descendants(
                target_concept["concept_id"],
                max_levels=5,
                vocabulary_ids=["SNOMED"]
            )
            
            for descendant in descendants.get("descendants", []):
                if descendant["concept_id"] == patient_snomed["concept_id"]:
                    return {
                        'confidence': 0.8,
                        'relationship': 'is_a_descendant'
                    }
            
            # Check if target concept is a descendant of patient concept
            ancestors = self.client.get_concept_ancestors(
                patient_snomed["concept_id"],
                max_levels=5,
                vocabulary_ids=["SNOMED"]
            )
            
            for ancestor in ancestors.get("ancestors", []):
                if ancestor["concept_id"] == target_concept["concept_id"]:
                    return {
                        'confidence': 0.7,
                        'relationship': 'is_an_ancestor'
                    }
            
            return None
            
        except Exception as e:
            self.logger.error(f"Error checking hierarchical relationship: {e}")
            return None
    
    def get_snomed_mapping(self, concept: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Get SNOMED mapping for a concept from another vocabulary"""
        
        try:
            if concept['vocabulary'] == 'SNOMED':
                return self.client.get_concept_by_code('SNOMED', concept['concept_code'])
            
            # Get concept mappings to SNOMED
            source_concept = self.client.get_concept_by_code(
                concept['vocabulary'], 
                concept['concept_code']
            )
            
            if source_concept:
                mappings = self.client.get_concept_mappings(
                    source_concept["concept_id"],
                    target_vocabularies=["SNOMED"]
                )
                
                snomed_mappings = [
                    m for m in mappings.get("mappings", []) 
                    if m["target_vocabulary_id"] == "SNOMED"
                ]
                
                if snomed_mappings:
                    # Return the first mapping (could be improved with confidence scoring)
                    return self.client.get_concept(snomed_mappings[0]["target_concept_id"])
            
            return None
            
        except Exception as e:
            self.logger.error(f"Error getting SNOMED mapping: {e}")
            return None

Step 2: Numerical and Temporal Constraints

def evaluate_age_criteria(self, patient: PatientData, criteria: EligibilityCriteria) -> Dict[str, Any]:
    """Check the patient's age against optional inclusive bounds.

    'min_age' and 'max_age' are both optional keys of the age constraint; a
    missing bound is not checked.  Age never yields a partial match.

    Returns:
        Dict with 'matches', 'partial' (always False), 'confidence'
        (1.0 or 0.0) and 'details' echoing the age and the constraint.
    """
    limits = criteria.age_constraint
    age = patient.age

    in_range = True
    if 'min_age' in limits:
        in_range = age >= limits['min_age']
    # The upper bound only matters while the lower bound still holds
    if in_range and 'max_age' in limits:
        in_range = age <= limits['max_age']

    details = {'patient_age': age, 'constraint': limits}
    return {
        'matches': in_range,
        'partial': False,  # Age is binary match
        'confidence': 1.0 if in_range else 0.0,
        'details': details,
    }

def evaluate_numerical_criteria(self, patient: PatientData, criteria: EligibilityCriteria) -> Dict[str, Any]:
    """Evaluate numerical criteria (lab values, vitals, etc.).

    The most recent usable reading of the requested lab result or vital sign
    is compared with the optional 'min_value' / 'max_value' bounds of the
    numerical constraint.

    * ``matches``: the reading satisfies every configured bound.
    * ``partial``: the reading violates at least one bound, but every
      violated bound is missed by no more than 10% of that bound's magnitude
      (a near miss worth a manual look).

    Readings without a value, with a non-numeric value or with NaN are
    ignored rather than being treated as 0.

    Args:
        patient: Provides ``lab_results`` and ``vitals`` record dicts.
        criteria: Its ``numerical_constraint`` selects the records through
            'data_type' ('lab_result' or 'vital_sign') plus 'code' / 'name'
            or 'vital_type'.

    Returns:
        Dict with 'matches', 'partial', 'confidence' (1.0 / 0.5 / 0.0) and,
        when a reading was found, 'details' with the reading and constraint.
    """
    import math

    near_miss_fraction = 0.1
    numerical_constraint = criteria.numerical_constraint
    data_type = numerical_constraint.get('data_type')

    # Select candidate records.  An identifier the constraint does not set is
    # never compared: None == None would otherwise select every record that
    # happens to lack that field.
    if data_type == 'lab_result':
        lab_code = numerical_constraint.get('code')
        lab_name = numerical_constraint.get('name')
        records = [
            lab for lab in patient.lab_results
            if (lab_code is not None and lab.get('code') == lab_code)
            or (lab_name is not None and lab.get('name') == lab_name)
        ]
    elif data_type == 'vital_sign':
        vital_type = numerical_constraint.get('vital_type')
        records = [
            vital for vital in patient.vitals
            if vital_type is not None and vital.get('type') == vital_type
        ]
    else:
        records = []

    # Get relevant patient values
    target_values = []
    for record in records:
        try:
            value = float(record.get('value'))
        except (ValueError, TypeError):
            # Missing or non-numeric result: not a usable reading
            continue
        if math.isnan(value):
            # NaN compares False against every bound and would pass silently
            continue
        target_values.append({
            'value': value,
            'unit': record.get('unit'),
            'date': record.get('date')
        })

    if not target_values:
        return {'matches': False, 'partial': False, 'confidence': 0.0}

    # Use most recent value; undated readings sort as oldest and the first
    # reading wins among equal dates
    most_recent = max(target_values, key=lambda reading: str(reading.get('date') or ''))
    value = most_recent['value']

    # One entry per violated bound: True when the miss is within tolerance
    near_misses = []
    min_value = numerical_constraint.get('min_value')
    if min_value is not None and value < min_value:
        near_misses.append(min_value - value <= abs(min_value) * near_miss_fraction)
    max_value = numerical_constraint.get('max_value')
    if max_value is not None and value > max_value:
        near_misses.append(value - max_value <= abs(max_value) * near_miss_fraction)

    matches = not near_misses
    partial = bool(near_misses) and all(near_misses)

    # Calculate confidence based on how close to constraints
    confidence = 1.0 if matches else (0.5 if partial else 0.0)

    return {
        'matches': matches,
        'partial': partial,
        'confidence': confidence,
        'details': {
            'patient_value': most_recent,
            'constraint': numerical_constraint
        }
    }

def evaluate_temporal_criteria(self, patient: PatientData, criteria: EligibilityCriteria) -> Dict[str, Any]:
    """Evaluate temporal criteria (recent diagnosis, treatment timeline, etc.).

    Looks at the patient's conditions or procedures (chosen by the
    constraint's 'applies_to') whose code is listed in
    ``criteria.concept_codes`` and checks how many days ago each happened.

    * ``matches``: some event satisfies *every* configured bound
      ('min_days_ago' and/or 'max_days_ago').  With no bound configured any
      relevant event matches.
    * ``partial``: no event matches, but some event misses each violated
      bound by no more than 20%.

    Records without a date, or with a date that is not ISO-8601, are skipped
    so one bad record does not invalidate the others.

    Returns:
        Dict with 'matches', 'partial', 'confidence' (1.0 / 0.5 / 0.0) and,
        when relevant events exist, 'details' listing them.
    """
    temporal_constraint = criteria.temporal_constraint

    applies_to = temporal_constraint.get('applies_to')
    if applies_to == 'conditions':
        records = patient.conditions
    elif applies_to == 'procedures':
        records = patient.procedures
    else:
        records = []

    # Find relevant conditions/procedures and their age in days
    relevant_items = []
    for record in records:
        if record.get('code') not in criteria.concept_codes:
            continue
        raw_date = record.get('date')
        if not raw_date:
            continue
        try:
            event_date = datetime.fromisoformat(raw_date)
        except (TypeError, ValueError):
            # Unparseable date: this record cannot be placed on the timeline
            continue
        # Use a "now" of the same awareness as the event; subtracting an
        # aware datetime from a naive one raises TypeError
        current_date = datetime.now(event_date.tzinfo)
        relevant_items.append({
            'item': record,
            'days_ago': (current_date - event_date).days
        })

    if not relevant_items:
        return {'matches': False, 'partial': False, 'confidence': 0.0}

    min_days = temporal_constraint.get('min_days_ago')
    max_days = temporal_constraint.get('max_days_ago')

    # Check temporal constraints
    matches = False
    partial = False

    for item in relevant_items:
        days_ago = item['days_ago']

        # Both bounds must hold: together they describe a time window
        min_ok = min_days is None or days_ago >= min_days
        max_ok = max_days is None or days_ago <= max_days
        if min_ok and max_ok:
            # If any item matches, we're good
            matches = True
            break

        # Near miss: every violated bound is missed by at most 20%
        min_near = min_ok or days_ago >= min_days * 0.8
        max_near = max_ok or days_ago <= max_days * 1.2
        if min_near and max_near:
            partial = True

    confidence = 1.0 if matches else (0.5 if partial else 0.0)

    return {
        'matches': matches,
        'partial': partial and not matches,
        'confidence': confidence,
        'details': {
            'relevant_items': relevant_items,
            'constraint': temporal_constraint
        }
    }

def generate_recommendation(self, eligible: bool, inclusion_rate: float, partial_rate: float,
                           excluded: bool, exclusion_reasons: List[str]) -> str:
    """Turn the screening outcome into a one-line, human-readable verdict.

    Rules are applied in priority order: excluded, eligible, moderate
    inclusion rate (>= 60%), notable partial-match rate (>= 30%), and
    otherwise not eligible.
    """
    if excluded:
        verdict = f"EXCLUDED: {'; '.join(exclusion_reasons)}"
    elif eligible:
        verdict = f"ELIGIBLE: High confidence match ({inclusion_rate:.1%} inclusion criteria met)"
    elif inclusion_rate >= 0.6:
        verdict = f"POTENTIAL: Moderate match ({inclusion_rate:.1%} inclusion criteria met) - Manual review recommended"
    elif partial_rate >= 0.3:
        verdict = f"REVIEW: Some criteria partially matched ({partial_rate:.1%} partial matches) - Detailed assessment needed"
    else:
        verdict = f"NOT ELIGIBLE: Low match rate ({inclusion_rate:.1%} inclusion criteria met)"
    return verdict

Step 3: Batch Patient Screening

def screen_patient_cohort(self, patients: List[PatientData], trial_criteria: List[EligibilityCriteria], 
                         trial_id: str) -> Dict[str, Any]:
    """Screen multiple patients for trial eligibility.

    Each patient is screened with ``self.screen_patient``.  A patient whose
    screening raises gets an error result (not eligible, confidence 0.0) and
    is counted under 'error_patients', so the summary buckets always add up
    to 'total_patients'.

    Args:
        patients: Patients to screen.
        trial_criteria: Criteria shared by all patients.
        trial_id: Identifier copied into every result and the report.

    Returns:
        Dict with 'trial_id', 'screening_results', 'summary_stats' (counts
        plus per-criterion 'criteria_performance'), 'insights',
        'screened_at' (ISO timestamp), 'eligible_patients' and
        'potential_patients' (not eligible, confidence >= 0.6).
    """
    screening_results = []
    summary_stats = {
        'total_patients': len(patients),
        'eligible_patients': 0,
        'excluded_patients': 0,
        'potential_patients': 0,
        'not_eligible_patients': 0,
        'error_patients': 0,
        'criteria_performance': {}
    }

    # Track criteria performance
    criteria_stats = {
        criteria.id: {
            'matched_count': 0,
            'failed_count': 0,
            'partial_count': 0,
            'type': criteria.type.value
        }
        for criteria in trial_criteria
    }

    # Result attribute -> counter it feeds
    outcome_counters = (
        ('matched_criteria', 'matched_count'),
        ('failed_criteria', 'failed_count'),
        ('partial_matches', 'partial_count'),
    )

    # Screen each patient
    for patient in patients:
        # Only the screening call is guarded, so a patient can never end up
        # with both a real result and an error result
        try:
            result = self.screen_patient(patient, trial_criteria, trial_id)
        except Exception as e:
            self.logger.error(f"Error screening patient {patient.patient_id}: {e}")
            summary_stats['error_patients'] += 1
            # Create error result
            screening_results.append(ScreeningResult(
                patient_id=patient.patient_id,
                trial_id=trial_id,
                eligible=False,
                confidence_score=0.0,
                matched_criteria=[],
                failed_criteria=[],
                partial_matches=[],
                exclusion_reasons=[f"Screening error: {str(e)}"],
                recommendation="ERROR: Unable to complete screening"
            ))
            continue

        screening_results.append(result)

        # Update summary statistics
        if result.eligible:
            summary_stats['eligible_patients'] += 1
        elif result.exclusion_reasons:
            summary_stats['excluded_patients'] += 1
        elif result.confidence_score >= 0.6:
            summary_stats['potential_patients'] += 1
        else:
            summary_stats['not_eligible_patients'] += 1

        # Update criteria performance
        for attribute, counter in outcome_counters:
            for criteria_id in getattr(result, attribute):
                if criteria_id in criteria_stats:
                    criteria_stats[criteria_id][counter] += 1

    # Calculate criteria performance metrics
    for stats in criteria_stats.values():
        total = stats['matched_count'] + stats['failed_count'] + stats['partial_count']
        if total > 0:
            stats['match_rate'] = stats['matched_count'] / total
            stats['partial_rate'] = stats['partial_count'] / total
            stats['fail_rate'] = stats['failed_count'] / total
        else:
            # Criterion never evaluated for any patient
            stats['match_rate'] = 0.0
            stats['partial_rate'] = 0.0
            stats['fail_rate'] = 0.0

    summary_stats['criteria_performance'] = criteria_stats

    # Generate cohort insights
    insights = self.generate_cohort_insights(screening_results, summary_stats)

    return {
        'trial_id': trial_id,
        'screening_results': screening_results,
        'summary_stats': summary_stats,
        'insights': insights,
        'screened_at': datetime.now().isoformat(),
        'eligible_patients': [r for r in screening_results if r.eligible],
        'potential_patients': [r for r in screening_results if not r.eligible and r.confidence_score >= 0.6]
    }

def generate_cohort_insights(self, results: List[ScreeningResult], stats: Dict[str, Any]) -> List[str]:
    """Generate insights about the screening cohort.

    Args:
        results: Per-patient screening results; only ``exclusion_reasons``
            is read.
        stats: Summary with 'total_patients', 'eligible_patients',
            'excluded_patients', 'potential_patients' and
            'criteria_performance' (per-criterion dicts with 'type' and
            'match_rate').

    Returns:
        Human-readable observations.  For an empty cohort a single
        "no patients" insight is returned, because rates computed over zero
        patients would wrongly read as low eligibility and restrictive
        criteria.
    """
    total = stats['total_patients']
    if total <= 0:
        return ["No patients screened - no cohort insights available"]

    insights = []
    eligible = stats['eligible_patients']
    excluded = stats['excluded_patients']
    potential = stats['potential_patients']

    # Eligibility rate insights
    eligibility_rate = (eligible / total) * 100
    if eligibility_rate > 20:
        insights.append(f"High eligibility rate ({eligibility_rate:.1f}%) - Good patient population match")
    elif eligibility_rate < 5:
        insights.append(f"Low eligibility rate ({eligibility_rate:.1f}%) - Consider criteria refinement")

    # Exclusion analysis
    exclusion_rate = (excluded / total) * 100
    if exclusion_rate > 50:
        insights.append(f"High exclusion rate ({exclusion_rate:.1f}%) - Review exclusion criteria")

    # Most common exclusion reasons (first one seen wins a tie)
    exclusion_reasons = {}
    for result in results:
        for reason in result.exclusion_reasons:
            exclusion_reasons[reason] = exclusion_reasons.get(reason, 0) + 1

    if exclusion_reasons:
        reason, count = max(exclusion_reasons.items(), key=lambda entry: entry[1])
        insights.append(f"Most common exclusion: {reason} ({count} patients)")

    # Criteria performance analysis
    inclusion_criteria = {
        criteria_id: perf
        for criteria_id, perf in stats['criteria_performance'].items()
        if perf['type'] == 'inclusion'
    }

    if inclusion_criteria:
        # Find hardest criteria to match
        hardest_id, hardest = min(inclusion_criteria.items(), key=lambda entry: entry[1]['match_rate'])
        if hardest['match_rate'] < 0.1:
            insights.append(f"Restrictive criteria: {hardest_id} (only {hardest['match_rate']:.1%} match rate)")

    # Potential patients insights
    if potential > 0:
        insights.append(f"{potential} patients show potential eligibility - recommend manual review")

    return insights

def create_recruitment_report(self, cohort_results: Dict[str, Any]) -> Dict[str, Any]:
    """Create a recruitment report from cohort screening results.

    Args:
        cohort_results: Dict with the keys ``trial_id``,
            ``eligible_patients``, ``potential_patients``, ``insights`` and
            ``summary_stats`` (which must contain ``total_patients``).
            Entries of ``eligible_patients`` must expose ``patient_id``,
            ``confidence_score`` and ``matched_criteria``; entries of
            ``potential_patients`` must expose ``patient_id``,
            ``confidence_score`` and ``partial_matches``.

    Returns:
        Dict with ``trial_id``, ``recruitment_summary``,
        ``contact_recommendations`` (top 10 eligible followed by top 5
        potential patients, each group ordered by descending confidence),
        ``criteria_insights``, ``performance_metrics`` and ``generated_at``
        (ISO-8601 timestamp of ``datetime.now()``).
    """
    eligible_patients = cohort_results['eligible_patients']
    potential_patients = cohort_results['potential_patients']
    total_screened = cohort_results['summary_stats']['total_patients']

    # Patient prioritization: highest confidence first.
    prioritized_eligible = sorted(
        eligible_patients,
        key=lambda x: x.confidence_score,
        reverse=True
    )

    prioritized_potential = sorted(
        potential_patients,
        key=lambda x: x.confidence_score,
        reverse=True
    )

    # Generate contact recommendations
    contact_recommendations = []

    # High-priority eligible patients
    for patient in prioritized_eligible[:10]:  # Top 10
        contact_recommendations.append({
            'patient_id': patient.patient_id,
            'priority': 'HIGH',
            'eligibility_status': 'ELIGIBLE',
            'confidence': patient.confidence_score,
            'next_action': 'Schedule screening visit',
            'contact_urgency': 'Within 48 hours',
            'notes': f"Strong match - {len(patient.matched_criteria)} criteria met"
        })

    # Potential patients requiring review
    for patient in prioritized_potential[:5]:  # Top 5 potential
        contact_recommendations.append({
            'patient_id': patient.patient_id,
            'priority': 'MEDIUM',
            'eligibility_status': 'POTENTIAL',
            'confidence': patient.confidence_score,
            'next_action': 'Clinical review required',
            'contact_urgency': 'Within 1 week',
            'notes': f"Partial match - {len(patient.partial_matches)} criteria need review"
        })

    # Count without concatenating, so the two collections need not be the
    # same sequence type.
    identified = len(eligible_patients) + len(potential_patients)

    # Share of screened patients that need no manual review. An empty cohort
    # is reported as 0.0% instead of raising ZeroDivisionError.
    if total_screened:
        review_reduction = 100 - (len(potential_patients) / total_screened * 100)
    else:
        review_reduction = 0.0

    return {
        'trial_id': cohort_results['trial_id'],
        'recruitment_summary': {
            'total_screened': total_screened,
            'immediately_eligible': len(eligible_patients),
            'requires_review': len(potential_patients),
            'projected_enrollment': len(eligible_patients) + int(len(potential_patients) * 0.3)  # 30% conversion
        },
        'contact_recommendations': contact_recommendations,
        'criteria_insights': cohort_results['insights'],
        'performance_metrics': {
            'screening_efficiency': f"{identified}/{total_screened} patients identified",
            'time_to_identify': 'Real-time automated screening',
            'manual_review_reduction': f"{review_reduction:.1f}% reduction in manual reviews"
        },
        'generated_at': datetime.now().isoformat()
    }

Example Implementation

Sample Clinical Trial Criteria

NCT05123456: Phase II Oncology Trial
Target: Non-small cell lung cancer patients

INCLUSION CRITERIA:
1. Histologically confirmed NSCLC (Stage IIIB-IV)
2. Age 18-75 years
3. ECOG Performance Status 0-2
4. Adequate organ function (Creatinine ≤1.5x ULN)
5. Previous chemotherapy required

EXCLUSION CRITERIA:
1. Active brain metastases
2. Severe cardiac disease (EF <50%)
3. Pregnancy or nursing
4. Previous treatment with study drug class

Implementation Example

# Initialize the engine ("your_api_key" is a placeholder for the API key
# that is passed to ClinicalTrialEligibilityEngine)
engine = ClinicalTrialEligibilityEngine("your_api_key")

# Define trial criteria
# NOTE: this list encodes only part of the sample trial described in the
# prose criteria: ECOG status, prior chemotherapy, cardiac disease and
# prior study-drug exposure have no EligibilityCriteria entry here.
trial_criteria = [
    # Inclusion criteria
    # Concept-based criterion, matched hierarchically against the SNOMED code.
    EligibilityCriteria(
        id="INCL_001",
        type=CriteriaType.INCLUSION,
        description="Histologically confirmed non-small cell lung cancer",
        concept_codes=["254637007"],  # SNOMED: Non-small cell lung cancer
        vocabularies=["SNOMED"],
        match_type=MatchType.HIERARCHICAL
    ),
    # Age criterion: no concept codes, the bounds live in age_constraint.
    EligibilityCriteria(
        id="INCL_002", 
        type=CriteriaType.INCLUSION,
        description="Age 18-75 years",
        concept_codes=[],
        vocabularies=[],
        match_type=MatchType.NUMERICAL,
        age_constraint={"min_age": 18, "max_age": 75}
    ),
    # Lab criterion: the lab test is identified by the code inside
    # numerical_constraint, not by concept_codes.
    EligibilityCriteria(
        id="INCL_003",
        type=CriteriaType.INCLUSION,
        description="Creatinine ≤1.5x ULN (≤1.95 mg/dL)",
        concept_codes=[],
        vocabularies=[],
        match_type=MatchType.NUMERICAL,
        numerical_constraint={
            "data_type": "lab_result",
            "code": "2160-0",  # LOINC: Creatinine
            "max_value": 1.95,
            "unit": "mg/dL"
        }
    ),
    
    # Exclusion criteria
    EligibilityCriteria(
        id="EXCL_001",
        type=CriteriaType.EXCLUSION,
        description="Active brain metastases",
        concept_codes=["94225005"],  # SNOMED: Secondary malignant neoplasm of brain
        vocabularies=["SNOMED"],
        match_type=MatchType.HIERARCHICAL
    ),
    # Exact match only: just this specific code triggers the exclusion.
    EligibilityCriteria(
        id="EXCL_002",
        type=CriteriaType.EXCLUSION,
        description="Pregnancy",
        concept_codes=["77386006"],  # SNOMED: Pregnancy
        vocabularies=["SNOMED"],
        match_type=MatchType.EXACT
    )
]

# Sample patient data
# NOTE(review): lab "value" fields are strings ("1.2", "0.9") while the
# creatinine criterion uses a numeric max_value; presumably the engine
# converts them before comparing - confirm against the engine code.
sample_patients = [
    # PT001: carries the NSCLC code, is within the age range and has a
    # creatinine value below the limit; the expected output lists this
    # patient as eligible.
    PatientData(
        patient_id="PT001",
        age=62,
        gender="M",
        conditions=[
            {"code": "254637007", "name": "Non-small cell lung cancer", "vocabulary": "SNOMED", "date": "2023-01-15"},
            {"code": "C78.00", "name": "Secondary malignant neoplasm of unspecified lung", "vocabulary": "ICD10CM", "date": "2023-02-01"}
        ],
        medications=[
            {"code": "40048-14", "name": "Carboplatin", "vocabulary": "RxNorm", "date": "2023-01-20"}
        ],
        lab_results=[
            {"code": "2160-0", "name": "Creatinine", "value": "1.2", "unit": "mg/dL", "date": "2023-03-01"}
        ],
        procedures=[
            {"code": "32507", "name": "Thoracotomy", "vocabulary": "HCPCS", "date": "2023-01-18"}
        ],
        vitals=[]
    ),
    # PT002: meets the same inclusion criteria but carries the pregnancy
    # code used by EXCL_002; the expected output lists this patient as
    # excluded for that reason.
    PatientData(
        patient_id="PT002",
        age=45,
        gender="F",
        conditions=[
            {"code": "254637007", "name": "Non-small cell lung cancer", "vocabulary": "SNOMED", "date": "2023-01-10"},
            {"code": "77386006", "name": "Pregnancy", "vocabulary": "SNOMED", "date": "2023-02-15"}
        ],
        medications=[],
        lab_results=[
            {"code": "2160-0", "name": "Creatinine", "value": "0.9", "unit": "mg/dL", "date": "2023-03-01"}
        ],
        procedures=[],
        vitals=[]
    )
]

# Run every sample patient through the engine for this trial.
cohort_results = engine.screen_patient_cohort(sample_patients, trial_criteria, "NCT05123456")

# Turn the raw screening output into a recruitment report.
recruitment_report = engine.create_recruitment_report(cohort_results)

# Report header: trial id plus the headline recruitment counts.
summary = recruitment_report['recruitment_summary']
print("=== CLINICAL TRIAL ELIGIBILITY SCREENING REPORT ===")
print(f"Trial ID: {recruitment_report['trial_id']}")
print(f"Patients Screened: {summary['total_screened']}")
print(f"Immediately Eligible: {summary['immediately_eligible']}")
print(f"Requires Review: {summary['requires_review']}")

# One section per screened patient; optional lines are only printed when
# the corresponding list is non-empty.
print("\n=== INDIVIDUAL SCREENING RESULTS ===")
for result in cohort_results['screening_results']:
    eligible_label = 'Yes' if result.eligible else 'No'
    print(f"\nPatient: {result.patient_id}")
    print(f"  Eligible: {eligible_label}")
    print(f"  Confidence: {result.confidence_score:.1%}")
    print(f"  Recommendation: {result.recommendation}")

    matched = result.matched_criteria
    if matched:
        print(f"  Matched Criteria: {', '.join(matched)}")

    reasons = result.exclusion_reasons
    if reasons:
        print(f"  Exclusion Reasons: {'; '.join(reasons)}")

# Follow-up actions, each followed by a blank separator line.
print("\n=== RECRUITMENT RECOMMENDATIONS ===")
for rec in recruitment_report['contact_recommendations']:
    print(f"Patient {rec['patient_id']}: {rec['priority']} priority - {rec['next_action']}")
    print(f"  Contact within: {rec['contact_urgency']}")
    print(f"  Notes: {rec['notes']}")
    print()

# Cohort-level insights as a bullet list.
print("=== INSIGHTS ===")
for insight in cohort_results['insights']:
    print(f"• {insight}")

Expected Output

=== CLINICAL TRIAL ELIGIBILITY SCREENING REPORT ===
Trial ID: NCT05123456
Patients Screened: 2
Immediately Eligible: 1
Requires Review: 0

=== INDIVIDUAL SCREENING RESULTS ===

Patient: PT001
  Eligible: Yes
  Confidence: 100.0%
  Recommendation: ELIGIBLE: High confidence match (100.0% inclusion criteria met)
  Matched Criteria: INCL_001, INCL_002, INCL_003

Patient: PT002
  Eligible: No
  Confidence: 40.0%
  Recommendation: EXCLUDED: EXCL_002: Pregnancy
  Matched Criteria: INCL_001, INCL_002, INCL_003
  Exclusion Reasons: EXCL_002: Pregnancy

=== RECRUITMENT RECOMMENDATIONS ===
Patient PT001: HIGH priority - Schedule screening visit
  Contact within: Within 48 hours
  Notes: Strong match - 3 criteria met

=== INSIGHTS ===
• High eligibility rate (50.0%) - Good patient population match
• Most common exclusion: EXCL_002: Pregnancy (1 patients)

Integration Patterns

1. EHR Integration with Real-time Screening

class EHRTrialIntegration:
    """Glue between an EHR client and the trial eligibility engine.

    Several helpers called below (``get_active_trials``,
    ``extract_conditions``, ``extract_medications``, ``extract_lab_results``,
    ``extract_procedures``, ``extract_vitals`` and ``send_notification``) are
    not defined in this class; they have to be supplied by a subclass or by
    the integrating application.
    """

    # Record sections whose modification triggers a re-screen.
    _RELEVANT_UPDATE_FIELDS = ('conditions', 'medications', 'lab_results', 'procedures', 'demographics')

    def __init__(self, omophub_api_key: str, ehr_client):
        """Create the eligibility engine and keep a handle on the EHR client.

        Args:
            omophub_api_key: API key forwarded to ClinicalTrialEligibilityEngine.
            ehr_client: Object exposing ``get_patient(patient_id)``.
        """
        self.eligibility_engine = ClinicalTrialEligibilityEngine(omophub_api_key)
        self.ehr = ehr_client

    def setup_automated_screening(self, trial_id: str, trial_criteria: List[EligibilityCriteria]) -> Dict[str, Any]:
        """Build the screening configuration for a trial.

        The configuration is only assembled and returned; nothing is
        persisted or registered by this method itself.

        Args:
            trial_id: Identifier of the trial.
            trial_criteria: Eligibility criteria to screen against.

        Returns:
            Dict with ``trial_id``, ``criteria``, the two auto-screen flags
            and ``notification_settings`` (all enabled).
        """
        return {
            'trial_id': trial_id,
            'criteria': trial_criteria,
            'auto_screen_new_patients': True,
            'auto_screen_updated_records': True,
            'notification_settings': {
                'eligible_patients': True,
                'potential_patients': True,
                'daily_summary': True
            }
        }

    def screen_on_patient_update(self, patient_id: str, updated_fields: List[str]) -> Optional[List[ScreeningResult]]:
        """Re-screen a patient against all active trials after a record update.

        Args:
            patient_id: Identifier of the updated patient.
            updated_fields: Names of the record sections that changed.

        Returns:
            ``None`` when none of the updated fields is relevant for
            eligibility; otherwise a list with one screening result per
            active trial (possibly empty).
        """
        # Skip updates that cannot affect eligibility.
        if not any(field in updated_fields for field in self._RELEVANT_UPDATE_FIELDS):
            return None

        # Get current patient data
        patient_data = self.extract_patient_data(patient_id)

        # Get active trial criteria (could be from database); each entry is
        # expected to provide 'criteria' and 'trial_id'.
        active_trials = self.get_active_trials()

        screening_results = [
            self.eligibility_engine.screen_patient(
                patient_data,
                trial['criteria'],
                trial['trial_id']
            )
            for trial in active_trials
        ]

        # Notify for every eligible result. No previous state is consulted
        # here, so a patient who was already eligible is notified again.
        for result in screening_results:
            if result.eligible:
                self.send_eligibility_notification(result)

        return screening_results

    def extract_patient_data(self, patient_id: str) -> PatientData:
        """Fetch the patient record from the EHR and map it to PatientData.

        ``age`` falls back to 0 and ``gender`` to an empty string when the
        record lacks them; the clinical sections are produced by the
        ``extract_*`` helpers.
        """
        patient_record = self.ehr.get_patient(patient_id)

        return PatientData(
            patient_id=patient_id,
            age=patient_record.get('age', 0),
            gender=patient_record.get('gender', ''),
            conditions=self.extract_conditions(patient_record),
            medications=self.extract_medications(patient_record),
            lab_results=self.extract_lab_results(patient_record),
            procedures=self.extract_procedures(patient_record),
            vitals=self.extract_vitals(patient_record)
        )

    def send_eligibility_notification(self, result: ScreeningResult) -> None:
        """Build a PATIENT_ELIGIBLE notification and hand it to send_notification.

        Args:
            result: Screening result; ``trial_id``, ``patient_id``,
                ``confidence_score`` and ``matched_criteria`` are read.
        """
        notification = {
            'type': 'PATIENT_ELIGIBLE',
            'trial_id': result.trial_id,
            'patient_id': result.patient_id,
            'confidence': result.confidence_score,
            'matched_criteria': result.matched_criteria,
            'message': f"Patient {result.patient_id} is eligible for trial {result.trial_id}",
            'action_required': 'Contact patient to schedule screening visit',
            'generated_at': datetime.now().isoformat()
        }

        # Send to research coordinator (implementation depends on notification system)
        self.send_notification(notification)

Best Practices

1. Criteria Management and Validation

class TrialCriteriaValidator:
    """Sanity-checks a set of trial eligibility criteria before screening."""

    def __init__(self, omophub_client):
        """Store the client used for concept lookups.

        Args:
            omophub_client: Object exposing ``get_concept_by_code(vocab, code)``.
        """
        self.client = omophub_client

    def validate_criteria_set(self, criteria_list: List[EligibilityCriteria]) -> Dict[str, Any]:
        """Validate a set of trial criteria for completeness and accuracy.

        Args:
            criteria_list: Criteria to validate.

        Returns:
            Dict with ``is_valid`` (False only when a concept-code error was
            found), ``warnings``, ``errors`` and ``suggestions``.
        """
        validation_result = {
            'is_valid': True,
            'warnings': [],
            'errors': [],
            'suggestions': []
        }

        # Conflicts are reported as warnings; they do not invalidate the set.
        validation_result['warnings'].extend(self.check_criteria_conflicts(criteria_list))

        # Validate concept codes
        for criteria in criteria_list:
            code_validation = self.validate_concept_codes(criteria)
            if code_validation['errors']:
                validation_result['errors'].extend(code_validation['errors'])
                validation_result['is_valid'] = False
            validation_result['warnings'].extend(code_validation['warnings'])

        # Check criteria balance
        inclusion_count = sum(1 for c in criteria_list if c.type == CriteriaType.INCLUSION)
        exclusion_count = sum(1 for c in criteria_list if c.type == CriteriaType.EXCLUSION)

        if inclusion_count < 3:
            validation_result['warnings'].append(
                f"Few inclusion criteria ({inclusion_count}) - may result in over-broad matching"
            )

        if exclusion_count > inclusion_count * 2:
            validation_result['warnings'].append(
                f"Many exclusion criteria ({exclusion_count}) - may result in very low eligibility rates"
            )

        # Generate optimization suggestions
        validation_result['suggestions'].extend(self.generate_optimization_suggestions(criteria_list))

        return validation_result

    def validate_concept_codes(self, criteria: EligibilityCriteria) -> Dict[str, Any]:
        """Validate the concept codes of one criterion via the client.

        Every code is looked up in every vocabulary listed on the criterion.
        NOTE(review): with several vocabularies, a code that exists in only
        one of them is reported as invalid for the others - confirm that
        this cross-check is intended.

        Returns:
            Dict with ``errors`` (unknown codes and lookup failures) and
            ``warnings`` (non-standard or deprecated concepts).
        """
        code_validation = {
            'errors': [],
            'warnings': []
        }

        for code in criteria.concept_codes:
            for vocab in criteria.vocabularies:
                try:
                    concept = self.client.get_concept_by_code(vocab, code)
                    if not concept:
                        code_validation['errors'].append(
                            f"Invalid concept code: {code} in {vocab}"
                        )
                        continue

                    # Check if concept is standard
                    if concept.get('standard_concept') != 'S':
                        code_validation['warnings'].append(
                            f"Non-standard concept used: {code} ({concept.get('concept_name')})"
                        )

                    # Check if concept is active
                    if concept.get('invalid_reason'):
                        code_validation['warnings'].append(
                            f"Deprecated concept used: {code} - {concept.get('invalid_reason')}"
                        )

                except Exception as e:
                    # Any failure during lookup or inspection is recorded as
                    # a validation error instead of aborting the whole run.
                    code_validation['errors'].append(
                        f"Error validating {code} in {vocab}: {str(e)}"
                    )

        return code_validation

    def check_criteria_conflicts(self, criteria_list: List[EligibilityCriteria]) -> List[str]:
        """Check for potentially conflicting criteria.

        Two checks are made: the combined age window of all criteria that
        carry an ``age_constraint``, and concept codes that appear in both
        inclusion and exclusion criteria.

        Returns:
            Conflict descriptions (possibly empty).
        """
        conflicts = []

        # Age window check. It also runs for a single age criterion so that
        # an inverted range (min_age above max_age) on its own is caught.
        age_criteria = [c for c in criteria_list if c.age_constraint]
        if age_criteria:
            # Missing bounds default to 0 and 150.
            effective_min = max(c.age_constraint.get('min_age', 0) for c in age_criteria)
            effective_max = min(c.age_constraint.get('max_age', 150) for c in age_criteria)

            if effective_min >= effective_max:
                conflicts.append(f"Conflicting age criteria: effective range is {effective_min}-{effective_max}")

        # Check inclusion vs exclusion conflicts
        inclusion_codes = set()
        exclusion_codes = set()

        for criteria in criteria_list:
            if criteria.type == CriteriaType.INCLUSION:
                inclusion_codes.update(criteria.concept_codes)
            else:
                exclusion_codes.update(criteria.concept_codes)

        overlap = inclusion_codes & exclusion_codes
        if overlap:
            # Sorted so the message is stable between runs (set order is not).
            conflicts.append(f"Codes appear in both inclusion and exclusion: {sorted(overlap)}")

        return conflicts

    def generate_optimization_suggestions(self, criteria_list: List[EligibilityCriteria]) -> List[str]:
        """Generate suggestions for optimizing criteria.

        Returns:
            Up to two suggestions: one when more than three criteria use
            exact matching, one when more than two concept-based criteria
            lack a temporal constraint.
        """
        suggestions = []

        # Suggest hierarchical matching when many criteria match exactly.
        exact_match_count = sum(1 for c in criteria_list if c.match_type == MatchType.EXACT)
        if exact_match_count > 3:
            suggestions.append(
                "Consider using hierarchical matching for some condition criteria to increase patient pool"
            )

        # Suggest temporal constraints for concept-based criteria without one.
        untimed_count = sum(1 for c in criteria_list if not c.temporal_constraint and c.concept_codes)
        if untimed_count > 2:
            suggestions.append(
                "Consider adding temporal constraints (recent diagnosis, treatment timeline) for more precise matching"
            )

        return suggestions

Next Steps

I