Data masking for SQL database sources
Info: The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/data_masking
About this Example
This example shows how to build a reusable column-masking function for the
sql_database source. The function uses a closure to capture masking
configuration, so you can apply it to any resource via add_map().
Based on an implementation by Michał Zawadzki and Varun Chawla.
We'll learn how to:
- Use a closure to create a configurable masking callback
- Handle all backends supported by sql_database: PyArrow, ConnectorX, Pandas, and SQLAlchemy
- Support two masking strategies: replace values with a mask string, or nullify them (set them to None)
Usage with sql_table
import dlt
from dlt.sources.sql_database import sql_table

# `mask_columns` is the factory defined under "Full source code" below.
table = sql_table(table="users")
# Mask the email and ssn columns before the data is loaded.
table.add_map(mask_columns(columns=["email", "ssn"]))

pipeline = dlt.pipeline(
    pipeline_name="masked_data",
    destination="duckdb",
    dataset_name="mydata",
)
load_info = pipeline.run(table)
Full source code
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Union
import pyarrow as pa
import pandas as pd
class MaskingMethod(str, Enum):
    """Strategy used by `mask_columns` to hide the values of a column.

    Members are also `str` instances, so each member compares equal to its
    plain string value (for example ``MaskingMethod.MASK == "mask"``).
    """

    # Overwrite every value with a fixed mask string.
    MASK = "mask"
    # Replace every value with None / null.
    NULLIFY = "nullify"
def mask_columns(
    columns: List[str],
    method: Optional[MaskingMethod] = None,
    mask: str = "******",
) -> Callable[..., Any]:
    """Return a mapping function that masks the specified columns.

    The returned callback accepts one data item and dispatches on its type:
    a `pyarrow.Table`, a `pandas.DataFrame`, or a `dict` row. Columns that are
    not present in the item are ignored.

    Args:
        columns (List[str]): Column names to mask. A single bare string is
            treated as one column name.
        method (Optional[MaskingMethod]): MASK replaces with `mask` string,
            NULLIFY sets to None. Defaults to MASK. The plain string values
            "mask" and "nullify" are accepted as well.
        mask (str): Replacement string used when method is MASK.

    Returns:
        Callable: A function suitable for `resource.add_map()`. For a
            `pyarrow.Table` it returns a new table; a `pandas.DataFrame` or
            `dict` is modified in place and returned.

    Raises:
        ValueError: If `method` is not a valid `MaskingMethod` value.
        NotImplementedError: Raised by the returned function when it receives
            an item that is not one of the supported types.
    """
    # A bare string would otherwise be matched character by character.
    if isinstance(columns, str):
        columns = [columns]
    # Snapshot the names: constant-time membership checks, and later changes
    # to the caller's list cannot alter what gets masked.
    masked_names = frozenset(columns)

    # The enum lookup rejects unknown values instead of letting them fall
    # through to the NULLIFY branch silently.
    resolved_method: MaskingMethod = (
        MaskingMethod.MASK if method is None else MaskingMethod(method)
    )
    use_mask = resolved_method is MaskingMethod.MASK

    def _apply(
        table_or_row: Union[pa.Table, pd.DataFrame, Dict[str, Any]],
    ) -> Union[pa.Table, pd.DataFrame, Dict[str, Any]]:
        # pyarrow / connectorx backends
        if isinstance(table_or_row, pa.Table):
            table = table_or_row
            num_rows = table.num_rows
            # Work with positional indexes: a lookup by name is ambiguous
            # when the table contains duplicate column names.
            for index, name in enumerate(table.schema.names):
                if name not in masked_names:
                    continue
                if use_mask:
                    # pa.repeat keeps the value's type even for zero rows,
                    # where pa.array([]) would produce a null-typed column.
                    replacement = pa.repeat(mask, num_rows)
                else:
                    # Keep the original column type so the schema is stable.
                    replacement = pa.nulls(
                        num_rows, type=table.schema.field(index).type
                    )
                table = table.set_column(index, name, replacement)
            return table

        fill_value = mask if use_mask else None

        # pandas backend
        if isinstance(table_or_row, pd.DataFrame):
            df = table_or_row
            for name in df.columns:
                if name in masked_names:
                    df[name] = fill_value
            return df

        # sqlalchemy backend (dict rows)
        if isinstance(table_or_row, dict):
            row = table_or_row
            # Only values are overwritten, so iterating the dict is safe.
            for key in row:
                if key in masked_names:
                    row[key] = fill_value
            return row

        raise NotImplementedError(f"Unsupported data type: {type(table_or_row)}")

    return _apply
if __name__ == "__main__":
    import dlt

    # create a dummy source with sensitive columns
    @dlt.resource(write_disposition="replace")
    def users():
        yield [
            {
                "id": 1,
                "name": "Alice",
                "email": "alice@example.com",
                "ssn": "123-45-6789",
            },
            {"id": 2, "name": "Bob", "email": "bob@example.com", "ssn": "987-65-4321"},
            {
                "id": 3,
                "name": "Charlie",
                "email": "charlie@example.com",
                "ssn": "555-12-3456",
            },
        ]

    # mask email and ssn before loading
    masked_users = users()
    masked_users.add_map(mask_columns(columns=["email", "ssn"]))

    pipeline = dlt.pipeline(
        pipeline_name="data_masking_example",
        destination="duckdb",
        dataset_name="mydata",
    )
    load_info = pipeline.run(masked_users)
    print(load_info)

    # verify: sensitive columns are masked, other columns are untouched
    with pipeline.sql_client() as client:
        rows = client.execute_sql(
            "SELECT id, name, email, ssn FROM mydata.users ORDER BY id"
        )
        # Check the row count first so the loop below cannot pass vacuously.
        assert len(rows) == 3, f"expected 3 users, got {len(rows)}"
        for row in rows:
            assert row[2] == "******", f"email should be masked, got {row[2]}"
            assert row[3] == "******", f"ssn should be masked, got {row[3]}"
        assert rows[0][1] == "Alice"
        assert rows[1][1] == "Bob"
        assert rows[2][1] == "Charlie"

    # now demonstrate NULLIFY method on a second table
    # The explicit column hint declares `phone` as text; every loaded value
    # for it will be None.
    @dlt.resource(
        write_disposition="replace",
        columns={"phone": {"data_type": "text"}},
    )
    def customers():
        yield [
            {"id": 1, "name": "Dana", "phone": "555-0001"},
            {"id": 2, "name": "Eve", "phone": "555-0002"},
        ]

    nullified_customers = customers()
    nullified_customers.add_map(
        mask_columns(columns=["phone"], method=MaskingMethod.NULLIFY)
    )

    load_info = pipeline.run(nullified_customers)
    print(load_info)

    with pipeline.sql_client() as client:
        rows = client.execute_sql(
            "SELECT id, name, phone FROM mydata.customers ORDER BY id"
        )
        # Check the row count first so the loop below cannot pass vacuously.
        assert len(rows) == 2, f"expected 2 customers, got {len(rows)}"
        for row in rows:
            assert row[2] is None, f"phone should be nullified, got {row[2]}"
        assert rows[0][1] == "Dana"
        assert rows[1][1] == "Eve"

    print("All masking pipeline checks passed.")