
How to protect PII with dlt and Pydantic

  • Aman Gupta,
    Data Engineer


According to the National Institute of Standards and Technology (NIST), Personally Identifiable Information (PII) is any information maintained by an agency that can be used to distinguish or trace an individual's identity, such as a name, Social Security Number, or biometric records, as well as any other data, like medical or financial history, that is linked or linkable to a specific person.

We’ve all seen "privacy policies" that live in dusty Confluence pages, but a policy is only as good as the code that enforces it. Rather than relying on manual audits or "Monday morning swearing," we can establish automated guardrails that manage sensitive data at the gateway. This is critical for staying compliant with global mandates, like GDPR’s strict "Right to be Forgotten" or HIPAA’s rigorous patient privacy standards.

With dlt and Pydantic guards applied at the gateway, we move beyond passive documentation to a living Data Contract. This ensures that our PII policy is strictly enforced as code, guaranteeing that only schema-compliant data enters the downstream lifecycle.

To showcase how to bridge this gap, we created the following notebook:

Notebook

This guide demonstrates four architectural patterns for enforcing PII security using dlt and Pydantic.

[→ Run the PII-governance Notebook]

Defining a privacy policy is easy; enforcing it in a production pipeline is not. Below, we analyze the logic that transforms static Pydantic models into active ingestion gatekeepers.

Let’s discuss them one by one:

1. Sanitizer

Strategy: Hash known PII fields and mask "leaky" fields automatically.

Some data is always sensitive (like names), while other data might contain PII by mistake (like a phone number typed into a "notes" box). We use tags in our Pydantic model to tell the pipeline how to handle each field:

x-pii: Flags known sensitive fields for immediate pseudonymization via hashing (or other techniques like masking).

x-scan-pii: Tags "risky" text fields to be scanned and masked for hidden leaks.

By putting these rules directly in the data contract, sensitive data is sanitized in flight, so it is already clean by the time it reaches your destination.

# 1. Define the Data Contract with PII Metadata
from pydantic import BaseModel, Field

class CustomerProfile(BaseModel):
    firstname: str = Field(json_schema_extra={"x-pii": True})
    phone: str = Field(json_schema_extra={"x-pii": True})
    notes: str = Field(json_schema_extra={"x-scan-pii": True})

class SupportTicket(BaseModel):
    id: int
    customer: CustomerProfile
    
# 2. A dlt transformer
"""
This transformer masks regex matches in x-scan-pii fields and
hashes fields carrying the x-pii tag, as shown above.
"""

Logic for the sanitizer pattern is in cell[6] of the PII-Governance Notebook.
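The core of such a transformer can be sketched in plain Python. This is a minimal stand-in, not the notebook's implementation: the `tags` map would normally be derived from `CustomerProfile.model_json_schema()`, and `PII_RE` here is a simplified phone-number pattern.

```python
import hashlib
import re

# Simplified stand-in for the notebook's PII regex (US-style phone numbers).
PII_RE = re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b")

def pseudonymize(value: str) -> str:
    """Deterministic hash, so the same input always maps to the same token."""
    return hashlib.sha256(value.encode()).hexdigest()[:16]

def sanitize(record: dict, tags: dict) -> dict:
    """Hash fields tagged x-pii; mask regex hits in x-scan-pii fields."""
    clean = dict(record)
    for field, tag in tags.items():
        value = clean.get(field)
        if not isinstance(value, str):
            continue
        if tag == "x-pii":
            clean[field] = pseudonymize(value)
        elif tag == "x-scan-pii":
            clean[field] = PII_RE.sub("[MASKED]", value)
    return clean

# Tag map as it would fall out of the CustomerProfile schema above.
tags = {"firstname": "x-pii", "phone": "x-pii", "notes": "x-scan-pii"}
record = {"firstname": "Ada", "phone": "555-123-4567",
          "notes": "call me at 555-987-6543"}
print(sanitize(record, tags))
```

Hashing rather than deleting keeps the pseudonymized values joinable across tables, since the same input always produces the same token.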

2. Gatekeeper

Strategy: Zero tolerance. Block the entire batch if any PII is detected.

Sometimes masking isn't enough. If PII appears where it shouldn't, the entire batch needs to be rejected. This pattern uses Pydantic validators to enforce hard rules, combined with dlt's schema contracts.

If a single record violates the contract, the pipeline stops immediately and alerts the team.

try:
    # We run the pipeline. If ANY record fails validation, 
    # dlt will raise a PipelineStepFailed exception.
    p2.run(
        dlt.resource(data_2, name="source_tickets") | gatekeeper(contract=SecureTicket),
        write_disposition="replace",
        schema_contract={"columns": "freeze"}  # Lock schema
    )
    results_2.append("✅ Success: All data passed validation.")

except Exception as e:
    # Dig to the root cause
    root = e
    while root.__cause__:
        root = root.__cause__
    
    # Identify the contract breach
    if "PRIVACY BREACH" in str(root):
        status_msg = f"Blocked by Contract: {root}"
        
# Rest of the code lives in the marimo notebook.

Logic for the gatekeeper pattern is in cell[8] of the PII-Governance Notebook.
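The `SecureTicket` contract and `gatekeeper` step referenced above could take the following shape. This is a hedged sketch, not the notebook's exact code: it assumes Pydantic v2's `field_validator`, and `PII_RE` is again a simplified stand-in; in the notebook, `gatekeeper` would additionally be wrapped as a dlt transformer so it can be piped after the resource.

```python
import re
from pydantic import BaseModel, field_validator

PII_RE = re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b")  # simplified stand-in

class SecureTicket(BaseModel):
    """Contract: free-text fields must be PII-free."""
    id: int
    notes: str

    @field_validator("notes")
    @classmethod
    def reject_pii(cls, v: str) -> str:
        # Raising here surfaces as a ValidationError whose message
        # contains the "PRIVACY BREACH" marker matched in the except block.
        if PII_RE.search(v):
            raise ValueError("PRIVACY BREACH: phone number found in 'notes'")
        return v

def gatekeeper(records, contract):
    """Validate every record; a single failure aborts the whole batch."""
    for record in records:
        yield contract.model_validate(record).model_dump()
```

Because the generator raises on the first bad record, nothing after it is yielded, which is exactly the zero-tolerance semantics this pattern wants.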

3. The Guardian

Strategy: Allow schema evolution, but scan new columns for PII.

Source systems change. New columns appear unexpectedly, and they often contain unstructured data like comments or internal notes. This pattern lets your schema evolve but scans any new columns for PII before they're written to the destination.

It's the difference between blocking all changes and blocking only risky ones.

from typing import Any, Dict

KNOWN_COLUMNS = frozenset(SupportTicket.model_fields.keys())

def pii_schema_guardian(item: Dict[str, Any]) -> Dict[str, Any]:
    """Scans new columns for PII before allowing schema evolution."""
    # Detect schema drift
    new_columns = set(item.keys()) - KNOWN_COLUMNS

    for col in new_columns:
        value = item[col]
        # Scan unmanaged columns
        if isinstance(value, str) and PII_RE.search(value):
            msg_3 = f"Privacy breach: PII detected in column '{col}'"
            send_alert(msg_3)
            raise ValueError(msg_3)
    return item
 
# Rest of the code lives in the marimo notebook.

Logic for the guardian pattern is in cell[10] of the PII-Governance Notebook.
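To see the guardian's behavior outside the notebook, here is a self-contained toy run. `PII_RE` and `send_alert` are stand-ins for the notebook's versions, and `KNOWN_COLUMNS` is hardcoded to `SupportTicket`'s two fields:

```python
import re
from typing import Any, Dict

PII_RE = re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b")  # stand-in pattern
KNOWN_COLUMNS = frozenset({"id", "customer"})          # SupportTicket's fields

def send_alert(msg: str) -> None:
    """Stand-in for the notebook's alerting hook."""
    print(f"ALERT: {msg}")

def pii_schema_guardian(item: Dict[str, Any]) -> Dict[str, Any]:
    """Scans new columns for PII before allowing schema evolution."""
    for col in set(item.keys()) - KNOWN_COLUMNS:
        value = item[col]
        if isinstance(value, str) and PII_RE.search(value):
            msg = f"Privacy breach: PII detected in column '{col}'"
            send_alert(msg)
            raise ValueError(msg)
    return item

# Harmless drift ('region' is new but clean) passes through...
print(pii_schema_guardian({"id": 1, "customer": {}, "region": "EU"}))

# ...while a new column that leaks a phone number is blocked.
try:
    pii_schema_guardian({"id": 2, "customer": {}, "agent_note": "555-123-4567"})
except ValueError as e:
    print(f"Blocked: {e}")
```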

4. The Quarantine

Strategy: Redirect data exceptions to keep the pipeline moving.

A hard stop isn't always the best move for downstream teams who depend on timely data. This pattern uses "router" logic: records that meet the contract flow to production, while those needing review are gently diverted to a quarantine zone with a note explaining why.

By isolating specific records instead of stopping the whole process, you maintain uptime while giving your team a manageable queue to review later.

def quarantine(records, contract: type[BaseModel]):
    """Routes clean records to 'validated' and contaminated ones to 'quarantined'."""
    validated = []
    quarantined = []
    for record in records:
        try:
            # 1. Attempt strict validation against the contract
            clean = contract.model_validate(record)
            validated.append(clean.model_dump())
        except Exception as e:
            # 2. Capture the root cause
            root_cause = getattr(e, "__cause__", None) or e

            # 3. Route to quarantine with metadata
            contaminated = record.copy()
            contaminated["_quarantine_reason"] = str(root_cause)
            quarantined.append(contaminated)

            # 4. Alert
            send_alert(f"Quarantined record ID {record.get('id')}: {root_cause}")

    return validated, quarantined
    
 # Rest of the code lives in the marimo notebook.

Logic for the quarantine pattern is in cell[12] of the PII-Governance Notebook.
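A quick self-contained run shows the routing. `MiniTicket` and `send_alert` are stand-ins for the notebook's contract and alerting, and the router is condensed from the function above:

```python
from pydantic import BaseModel

def send_alert(msg: str) -> None:
    """Stand-in for the notebook's alerting hook."""
    print(f"ALERT: {msg}")

class MiniTicket(BaseModel):
    """Hypothetical minimal contract used for this demo."""
    id: int
    email: str

def quarantine(records, contract: type[BaseModel]):
    """Condensed router: clean records pass, contaminated ones get a reason."""
    validated, quarantined = [], []
    for record in records:
        try:
            validated.append(contract.model_validate(record).model_dump())
        except Exception as e:
            bad = dict(record)
            bad["_quarantine_reason"] = str(getattr(e, "__cause__", None) or e)
            send_alert(f"Quarantined record ID {record.get('id')}")
            quarantined.append(bad)
    return validated, quarantined

good, bad = quarantine(
    [{"id": 1, "email": "a@b.com"}, {"id": 2}],  # second record lacks 'email'
    MiniTicket,
)
print(len(good), len(bad))
```

Downstream, the two lists can then be loaded as separate dlt resources, for example a production table and a review table such as `tickets_quarantine`.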

Why do we need governance with schema contracts using dlt and Pydantic?

  • Shift Left: Catching PII at the ingestion gateway is far cheaper and safer than trying to "find and delete" it once it reaches your warehouse.
  • A decoupled world: The policy is defined in Pydantic by security or governance teams, while data engineers concentrate on the pipeline in dlt.
  • Audit-ready: When a regulator asks how you handle PII, you show them your Pydantic models, not a Word document.
