How to protect PII with dlt and Pydantic
Aman Gupta,
Data Engineer
INTRO
Let’s be honest: most PII policies live in a Notion page or a PDF that everyone signs during onboarding and then forgets about.
In a modern data stack, a policy that isn't enforceable is just a liability waiting for an audit.
To move from reactive cleanup to proactive governance, we have to "Shift Left". We need to stop treating PII as a documentation problem and start treating it as a Data Contract problem.
The dlt + Pydantic Stack
By placing Pydantic guards at the dlt ingestion gateway, we move the enforcement logic out of the legal department and into the CI/CD pipeline. This setup ensures that:
- The Code is the Documentation: Your Pydantic models are the "source of truth" for both engineers and auditors.
- Zero Leakage: Sensitive data is caught, hashed, or diverted before it ever touches your production storage.
Below, we’ll walk through four architectural patterns that move PII governance off the "dusty shelf" and into your production code. To show how to bridge this gap, we created the following notebook:

[→ Run the PII-governance notebook]
Defining a privacy policy is easy; enforcing it in a production pipeline is not. Below, we analyze the logic that transforms static Pydantic models into active ingestion gatekeepers.
Let’s discuss them one by one:
1. Sanitizer
Strategy: Hash known PII fields and mask "leaky" fields automatically.
Some data is always sensitive (like names), while other data might contain PII by mistake (like a phone number typed into a "notes" box). We use tags in our Pydantic model to tell the pipeline how to handle each field:
- x-pii: Flags known sensitive fields for immediate pseudonymization via hashing (or other techniques such as masking).
- x-scan-pii: Tags "risky" text fields to be scanned and masked for hidden leaks.

By putting these rules directly in the data contract, unclean data is fixed in flight, so it is already clean by the time it reaches your destination.
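Before the notebook excerpt, here is a dependency-free sketch of that hash-and-mask logic. In the notebook, the x-pii and x-scan-pii tags on the Pydantic model decide which fields get which treatment; in this sketch the field sets are passed explicitly, and PII_RE is an assumed phone-number regex standing in for the notebook's real detectors.

```python
import hashlib
import re

# Illustrative detector: a simple US-style phone-number pattern.
PII_RE = re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b")

def sanitize(record: dict, pii_fields: set, scan_fields: set) -> dict:
    """Hash known PII fields; mask regex hits in 'risky' free-text fields."""
    clean = dict(record)
    for field in pii_fields:
        if field in clean:
            # Pseudonymize: replace the value with its SHA-256 digest
            clean[field] = hashlib.sha256(str(clean[field]).encode()).hexdigest()
    for field in scan_fields:
        if field in clean and isinstance(clean[field], str):
            # Scan-and-mask: redact anything that looks like PII
            clean[field] = PII_RE.sub("***MASKED***", clean[field])
    return clean

row = {"firstname": "Ada", "phone": "555-123-4567", "notes": "call me at 555-123-4567"}
print(sanitize(row, pii_fields={"firstname", "phone"}, scan_fields={"notes"}))
```

The same record-by-record logic is what the dlt transformer in the notebook applies, driven by the tags instead of hard-coded field sets.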
```python
from pydantic import BaseModel, Field

# 1. Define the Data Contract with PII Metadata
class CustomerProfile(BaseModel):
    firstname: str = Field(json_schema_extra={"x-pii": True})
    phone: str = Field(json_schema_extra={"x-pii": True})
    notes: str = Field(json_schema_extra={"x-scan-pii": True})

class SupportTicket(BaseModel):
    id: int
    customer: CustomerProfile

# 2. A dlt transformer then walks each record, hashes fields tagged
# "x-pii", and masks regex matches in fields tagged "x-scan-pii".
```

2. Gatekeeper
Strategy: Zero tolerance. Block the entire batch if PII is detected.
Sometimes masking isn't enough. If PII appears where it shouldn't, the entire batch needs to be rejected. This pattern uses Pydantic validators to enforce hard rules, combined with dlt's schema contracts.
If a single record violates the contract, the pipeline stops immediately and alerts the team.
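In the notebook, the blocking rule lives in a Pydantic validator inside SecureTicket. As a library-free sketch of the same zero-tolerance behavior (the regex and this simplified gatekeeper are illustrative, not the notebook's transformer):

```python
import re

PII_RE = re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b")  # illustrative detector

def gatekeeper(records):
    """Yield records one by one, but abort the whole run on the first breach."""
    for record in records:
        for key, value in record.items():
            if isinstance(value, str) and PII_RE.search(value):
                # Raising here fails the extract step, so the batch is rejected
                raise ValueError(f"PRIVACY BREACH: PII found in field '{key}'")
        yield record

batch = [
    {"id": 1, "notes": "all good"},
    {"id": 2, "notes": "ring 555-123-4567"},
]
try:
    rows = list(gatekeeper(batch))
except ValueError as err:
    print(f"Batch rejected: {err}")
```

Because the exception propagates out of the generator, nothing from the offending batch reaches the destination, which is exactly the behavior the dlt excerpt below relies on.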
```python
try:
    # We run the pipeline. If ANY record fails validation,
    # dlt will raise a PipelineStepFailed exception.
    p2.run(
        dlt.resource(data_2, name="source_tickets") | gatekeeper(contract=SecureTicket),
        write_disposition="replace",
        schema_contract={"columns": "freeze"},  # Lock schema
    )
    results_2.append("✅ Success: All data passed validation.")
except Exception as e:
    # Dig to the root cause
    root = e
    while root.__cause__:
        root = root.__cause__
    # Identify the contract breach
    if "PRIVACY BREACH" in str(root):
        status_msg = f"Blocked by Contract: {root}"

# Rest of the code lives in the marimo notebook.
```

3. The new field scanner
Strategy: Allow schema evolution, but scan new columns for PII.
Source systems change. New columns appear unexpectedly, and they often contain unstructured data like comments or internal notes. This pattern lets your schema evolve but scans any new columns for PII before they're written to the destination.
It's the difference between blocking all changes and blocking only risky ones.
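The guardian excerpted below follows this shape; here is a self-contained toy version you can run without dlt (the PII_RE regex and the column names are assumptions, and alerting is left out for brevity):

```python
import re

PII_RE = re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b")   # stand-in detector
KNOWN_COLUMNS = frozenset({"id", "customer"})          # columns from the contract

def scan_new_columns(item: dict) -> dict:
    """Let unknown columns through, unless they appear to contain PII."""
    for col in set(item) - KNOWN_COLUMNS:  # schema drift: columns not in the contract
        value = item[col]
        if isinstance(value, str) and PII_RE.search(value):
            raise ValueError(f"Privacy breach: PII detected in column '{col}'")
    return item

# A new but harmless column passes through and the schema may evolve:
scan_new_columns({"id": 1, "customer": "c1", "priority": "high"})

# A new column carrying PII is blocked:
try:
    scan_new_columns({"id": 2, "customer": "c2", "callback": "555-123-4567"})
except ValueError as err:
    print(err)
```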
```python
from typing import Any, Dict

KNOWN_COLUMNS = frozenset(SupportTicket.model_fields.keys())

def pii_schema_guardian(item: Dict[str, Any]) -> Dict[str, Any]:
    """Scans new columns for PII before allowing schema evolution."""
    # Detect schema drift
    new_columns = set(item.keys()) - KNOWN_COLUMNS
    for col in new_columns:
        value = item[col]
        # Scan unmanaged columns
        if isinstance(value, str) and PII_RE.search(value):
            msg_3 = f"Privacy breach: PII detected in column '{col}'"
            send_alert(msg_3)
            raise ValueError(msg_3)
    return item

# Rest of the code lives in the marimo notebook.
```

4. The Quarantine
Strategy: Redirect data exceptions to keep the pipeline moving.
A hard stop isn't always the best move for downstream teams who depend on timely data. This pattern uses "router" logic: records that meet the contract flow to production, while those needing review are gently diverted to a quarantine zone with a note explaining why.
By isolating specific records instead of stopping the whole process, you maintain uptime while giving your team a manageable queue to review later.
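To see the routing semantics in isolation, here is a stdlib-only stand-in for the transformer: a regex check replaces the Pydantic contract, and the names (route, PII_RE) are illustrative.

```python
import re

PII_RE = re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b")  # illustrative detector

def route(records):
    """Clean records flow on; contaminated ones are tagged and set aside."""
    validated, quarantined = [], []
    for record in records:
        hits = [k for k, v in record.items() if isinstance(v, str) and PII_RE.search(v)]
        if hits:
            # Keep the record, but divert it with a note explaining why
            quarantined.append({**record, "_quarantine_reason": f"PII in {hits}"})
        else:
            validated.append(record)
    return validated, quarantined

validated, quarantined = route([
    {"id": 1, "notes": "all clear"},
    {"id": 2, "notes": "call 555-123-4567"},
])
print(len(validated), len(quarantined))
```

The key design point is that route never raises: the run completes, and the quarantined list becomes a reviewable queue instead of a pipeline failure.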
```python
from pydantic import BaseModel

def quarantine(records, contract: type[BaseModel]):
    """Diverts clean records to 'validated', contaminated ones to 'quarantined'."""
    validated = []
    quarantined = []  # renamed so it doesn't shadow the function name
    for record in records:
        try:
            # 1. Attempt strict validation against the contract
            clean = contract.model_validate(record)
            validated.append(clean.model_dump())
        except Exception as e:
            # 2. Capture the root cause
            root_cause = getattr(e, "__cause__", None) or e
            # 3. Route to quarantine with metadata
            contaminated = record.copy()
            contaminated["_quarantine_reason"] = str(root_cause)
            quarantined.append(contaminated)
            # 4. Alert
            send_alert(f"Quarantined record ID {record.get('id')}: {root_cause}")
    return validated, quarantined

# Rest of the code lives in the marimo notebook.
```

In conclusion
Used together, dlt and Pydantic let you enforce PII data contracts in your pipelines before any data is loaded.
For more info about maintaining your data quality, check out the complete dlt + dltHub data quality lifecycle.