Skip to main content

1. The Mapping Bottleneck

Every OMOP ETL has a mapping step: take the source system’s local codes and translate them to standard OMOP concept IDs. This step has two costs that people conflate but shouldn’t: Cost 1: The one-time setup. Figuring out what each local code maps to. “Cr_Serum” → LOINC 2160-0. “WBC_Count” → LOINC 6690-2. “Local_Sepsis_Dx” → SNOMED 91302008. This is the hard intellectual work - searching vocabularies, validating matches, handling ambiguity. Cost 2: The runtime application. Applying those mappings to millions of records every night. This is a SQL JOIN or pandas merge - fast, mechanical, and local. The mistake is conflating these two costs and loading a full Athena vocabulary database to handle both. You don’t need 4GB of vocabulary tables to map 500 unique local codes. But you also can’t make HTTP API calls for every row in a million-record batch. OMOPHub is the right tool for Cost 1: the mapping discovery and validation phase. Search for your local codes, verify the matches, build a mapping cache. Then use that cache - locally, with no API calls - for Cost 2: the nightly production run. This is “Lean ETL”: use OMOPHub to build smart, validated mapping files during development. Apply those mappings via local lookups during production. No full vocabulary load needed for the ETL. No API latency during batch processing. Best of both worlds.

2. The Core Concept: The Mapping Cache Pattern

The workflow has three phases: Phase 1: Extract unique source codes. Your source data has millions of records but a much smaller set of unique codes. A hospital lab system might produce 500,000 lab results per month, but only use 800 unique local lab codes. Extract those 800 codes - that’s your mapping workload. Phase 2: Look up each unique code via OMOPHub. Send each of the 800 codes to OMOPHub (search by display name, code, or fuzzy match). Get back the standard OMOP concept ID, name, vocabulary, and domain. This takes a few minutes for 800 codes. Save the results as a mapping file (CSV, JSON, or database table). Phase 3: Apply the mapping cache in production. Your nightly ETL reads the mapping file, joins it to the source data via local lookup (pandas merge, SQL JOIN, dictionary lookup), and writes the enriched records to OMOP CDM tables. Zero API calls. Full local speed. When to re-run Phase 2: When new source codes appear that aren’t in your mapping cache. Your ETL detects unmapped codes, queues them for OMOPHub lookup, and a human (or automated process) reviews and approves the new mappings before they enter the cache.

3. Use Case A: Building a Mapping Cache for a Sepsis Study

A multi-site sepsis research project receives data from four hospitals. Each hospital uses different local codes for lactate, WBC, blood cultures, and sepsis diagnoses. The ETL needs a consistent mapping from all local codes to standard OMOP concepts.
pip install omophub
Python
import omophub
import json

client = omophub.OMOPHub()

# --- Phase 1: Unique source codes from all hospitals ---
# In production, extract these from: SELECT DISTINCT source_code, source_display FROM source_data
unique_source_codes = [
    {"source_code": "Lactate_Serum", "display": "Serum Lactate", "domain_hint": "Measurement"},
    {"source_code": "WBC_Count", "display": "White Blood Cell Count", "domain_hint": "Measurement"},
    {"source_code": "BldCx_Aero", "display": "Blood Culture Aerobic", "domain_hint": "Measurement"},
    {"source_code": "Local_Sepsis_Dx", "display": "Sepsis Diagnosis", "domain_hint": "Condition"},
    {"source_code": "PROC_CVC", "display": "Central Venous Catheter Insertion", "domain_hint": "Procedure"},
    {"source_code": "INVALID_999", "display": "Unknown Local Test", "domain_hint": "Measurement"},
]

print(f"Phase 1: {len(unique_source_codes)} unique source codes to map\n")

# Domain → vocabulary mapping for targeted search
DOMAIN_VOCABS = {
    "Measurement": ["LOINC"],
    "Condition": ["SNOMED"],
    "Procedure": ["SNOMED", "CPT4"],
    "Drug": ["RxNorm"],
}

# --- Phase 2: Look up each code via OMOPHub ---
mapping_cache = []

for entry in unique_source_codes:
    code = entry["source_code"]
    display = entry["display"]
    domain = entry["domain_hint"]
    vocabs = DOMAIN_VOCABS.get(domain, [])

    print(f"  Looking up: '{display}' ({code})")

    omop_id = None
    omop_name = None
    omop_vocab = None
    omop_code = None
    match_method = None

    try:
        # Step 1: Try basic search with vocabulary filter
        results = client.search.basic(
            display,
            vocabulary_ids=vocabs,
            domain_ids=[domain] if domain else [],
            page_size=3,
        )
        candidates = results.get("concepts", []) if results else []

        # Step 2: If basic search misses, try semantic
        if not candidates:
            semantic = client.search.semantic(display, vocabulary_ids=vocabs, domain_ids=[domain] if domain else [], page_size=3)
            candidates = (semantic.get("results", semantic.get("concepts", [])) if semantic else [])
            if candidates:
                match_method = "semantic"

        if candidates:
            best = candidates[0]
            omop_id = best["concept_id"]
            omop_name = best.get("concept_name")
            omop_vocab = best.get("vocabulary_id")
            omop_code = best.get("concept_code")
            match_method = match_method or "basic"


            # Step 3: If non-standard, follow "Maps to" to get standard concept
            if best.get("standard_concept") != "S":
                std_map = client.mappings.get(omop_id, target_vocabulary=vocabs[0] if vocabs else None)
                map_list = (
                    std_map if isinstance(std_map, list)
                    else std_map.get("concepts", std_map.get("mappings", []))
                ) if std_map else []
                if map_list:
                    std = map_list[0]
                    omop_id = std["concept_id"]
                    omop_name = std.get("concept_name")
                    omop_vocab = std.get("vocabulary_id")
                    omop_code = std.get("concept_code")
                    match_method = "mapped_to_standard"

            print(f"    -> {omop_name} ({omop_vocab}: {omop_code}, OMOP: {omop_id}) [{match_method}]")
        else:
            print(f"    -> NO MATCH - needs manual mapping")
            match_method = "unmapped"

    except omophub.APIError as e:
        print(f"    -> API error: {e.message}")
        match_method = "error"

    mapping_cache.append({
        "source_code": code,
        "source_display": display,
        "domain_hint": domain,
        "omop_concept_id": omop_id,
        "omop_concept_name": omop_name,
        "omop_vocabulary_id": omop_vocab,
        "omop_concept_code": omop_code,
        "match_method": match_method,
        "reviewed": False,  # Flag for human review
    })

# --- Save the mapping cache ---
print(f"\n--- Mapping Cache ({len(mapping_cache)} entries) ---")
mapped = sum(1 for m in mapping_cache if m["omop_concept_id"])
unmapped = len(mapping_cache) - mapped
print(f"  Mapped: {mapped}  |  Unmapped: {unmapped}")

# In production, save to CSV or database:
# pd.DataFrame(mapping_cache).to_csv("sepsis_mapping_cache.csv", index=False)

# Print for review
for m in mapping_cache:
    status = f"OMOP {m['omop_concept_id']}" if m["omop_concept_id"] else "UNMAPPED"
    print(f"  {m['source_code']:20s} -> {status:>12s}  [{m['match_method']}]")
The Key Insight: This script runs once during ETL development - not every night. It produces a mapping file that the production ETL uses as a local lookup. If tomorrow’s data batch contains a new source code not in the cache, the ETL logs it as unmapped and queues it for a new OMOPHub lookup + human review. The cache grows over time until it covers all source codes. OMOPHub calls drop to near-zero in steady state.

4. Use Case B: Applying the Mapping Cache in Production

Once you have the mapping cache, the nightly ETL is pure local processing - no API calls.
Python
import pandas as pd

# --- Production ETL: Apply the mapping cache ---

# Step 1: Load the mapping cache (built via OMOPHub in Phase 2)
mapping_cache = pd.DataFrame([
    {"source_code": "Lactate_Serum", "omop_concept_id": 3047181, "omop_concept_name": "Lactate [Moles/volume] in Blood"},
    {"source_code": "WBC_Count", "omop_concept_id": 3000905, "omop_concept_name": "Leukocytes [#/volume] in Blood"},
    {"source_code": "BldCx_Aero", "omop_concept_id": 3016407, "omop_concept_name": "Blood culture"},
    {"source_code": "Local_Sepsis_Dx", "omop_concept_id": 132797, "omop_concept_name": "Sepsis"},
    {"source_code": "PROC_CVC", "omop_concept_id": 4180032, "omop_concept_name": "Insertion of central venous catheter"},
    # "INVALID_999" intentionally absent - unmapped
])

# Step 2: Load today's source data batch (in production: millions of rows from EHR extract)
source_batch = pd.DataFrame([
    {"patient_id": "P001", "source_code": "Lactate_Serum", "value": 2.5, "datetime": "2025-02-01 09:00"},
    {"patient_id": "P001", "source_code": "WBC_Count", "value": 15.2, "datetime": "2025-02-01 09:00"},
    {"patient_id": "P002", "source_code": "Lactate_Serum", "value": 4.1, "datetime": "2025-02-01 10:30"},
    {"patient_id": "P002", "source_code": "Local_Sepsis_Dx", "value": None, "datetime": "2025-02-01 10:30"},
    {"patient_id": "P003", "source_code": "INVALID_999", "value": 99, "datetime": "2025-02-01 11:00"},
])

print(f"Production ETL: {len(source_batch)} records to process\n")

# Step 3: Merge source data with mapping cache - pure local operation, zero API calls
enriched = source_batch.merge(
    mapping_cache[["source_code", "omop_concept_id", "omop_concept_name"]],
    on="source_code",
    how="left",
)

# Step 4: Split into mapped and unmapped
mapped_records = enriched[enriched["omop_concept_id"].notna()]
unmapped_records = enriched[enriched["omop_concept_id"].isna()]

print(f"  Mapped:   {len(mapped_records)} records -> ready for OMOP CDM load")
print(f"  Unmapped: {len(unmapped_records)} records -> queued for OMOPHub lookup\n")

if not unmapped_records.empty:
    new_codes = unmapped_records["source_code"].unique()
    print(f"  New unmapped codes to look up via OMOPHub: {list(new_codes)}")

# The mapped records are ready for OMOP CDM insertion
# (measurement table for labs, condition_occurrence for diagnoses, etc.)
print(f"\n--- Sample Output ---")
print(mapped_records[["patient_id", "source_code", "omop_concept_id", "value"]].to_string(index=False))
The Key Insight: The production ETL is a pandas merge - milliseconds for millions of rows. No HTTP calls, no API latency, no rate limits, no network dependency. OMOPHub was used once (during development) to build the mapping cache. The cache is the artifact that persists. This is the actual “Lean ETL” - lean on runtime resources because the vocabulary work was done upfront.

5. When New Codes Appear

The mapping cache isn’t static. As hospitals add new tests or change local codes, unmapped codes appear. The workflow:
  1. ETL detects unmapped codes (the unmapped_records in Use Case B)
  2. Queue the new codes for OMOPHub lookup
  3. Run the Phase 2 script (Use Case A) on just the new codes
  4. Human reviews the suggested mappings (critical for data quality)
  5. Append approved mappings to the cache
  6. Re-run the ETL for the previously unmapped records
This keeps the cache growing and OMOPHub calls shrinking. In a mature ETL, you might go months without needing a new OMOPHub lookup.
Python
import omophub

client = omophub.OMOPHub()

def lookup_new_codes(new_codes, mapping_cache_path="sepsis_mapping_cache.csv"):
    """Look up unmapped codes via OMOPHub and suggest mappings for review."""

    print(f"\nLooking up {len(new_codes)} new unmapped codes...\n")
    suggestions = []

    for code_info in new_codes:
        code = code_info["source_code"]
        display = code_info.get("display", code)

        try:
            # Step 1: Search OMOPHub for candidates
            results = client.search.basic(display, page_size=3)
            candidates = results.get("concepts", []) if results else []

            if candidates:
                for i, c in enumerate(candidates[:3]):
                    suggestions.append({
                        "source_code": code,
                        "source_display": display,
                        "suggestion_rank": i + 1,
                        "omop_concept_id": c["concept_id"],
                        "omop_concept_name": c.get("concept_name"),
                        "omop_vocabulary_id": c.get("vocabulary_id"),
                        "approved": False,  # Human must approve
                    })
                    print(f"  {code}: Suggestion {i+1}: {c.get('concept_name')} ({c.get('vocabulary_id')})")
            else:
                print(f"  {code}: No suggestions found - needs manual mapping")
                suggestions.append({
                    "source_code": code,
                    "source_display": display,
                    "suggestion_rank": None,
                    "omop_concept_id": None,
                    "omop_concept_name": "MANUAL MAPPING REQUIRED",
                    "approved": False,
                })

        except omophub.APIError as e:
            print(f"  {code}: API error - {e.message}")

    return suggestions

# Example: codes that failed mapping in today's ETL run
new_unmapped = [
    {"source_code": "INVALID_999", "display": "Unknown Local Test"},
    {"source_code": "BNP_Plasma", "display": "Brain Natriuretic Peptide"},
]

suggestions = lookup_new_codes(new_unmapped)

6. Conclusion: OMOPHub for Discovery, Local Cache for Production

The “Compute Tax” isn’t loading vocabulary tables - it’s doing vocabulary lookups at runtime instead of build time. The lean approach:
  • Build time: Use OMOPHub to discover and validate mappings for your unique source codes. Takes minutes. Run it once per new site or when new codes appear.
  • Runtime: Apply the mapping cache via local pandas merge or SQL JOIN. Takes milliseconds per million records. No API calls. No network dependency.
OMOPHub’s value isn’t replacing your local vocabulary - it’s making the mapping discovery phase fast and accessible without requiring a full Athena installation. Once the mappings are built, they live locally. The ETL stays lean because the vocabulary intelligence was front-loaded. Start with your messiest source system. Extract the unique codes. Run them through OMOPHub. Build the mapping cache. Plug it into your ETL. Measure the difference.