Row vs. Batch Contracts: Using AWAP to Prevent Schema Scars and State Corruption with dlt
Roshni Melwani,
Working Student
In the data engineering community, the debate over ingestion usually swings between two polar opposites, each with its own headaches. Most engineers feel stuck between two difficult choices: building a "strict" pipeline that breaks at every minor change, or a "permissive" one that silently fills the warehouse with errors.
This dilemma exists because ingestion is more than just moving data; it is an event that modifies your production state. Relying on downstream SQL filters assumes that ingestion is a harmless, additive activity. In reality, by the time an anomaly triggers an alert, the damage is likely already done—either through permanent schema scars or corrupted historical records.
Before I start with the tutorial, let's take a closer look at each of the two opposing approaches:
Strict Schema Enforcement
This approach treats any upstream drift, like a field renaming from `user_id` to `UserID`, as a fatal error. While this prevents 'garbage' from entering the warehouse, it creates a tight coupling with upstream producers. The result is frequent SLA breaches: the pipeline halts until a human manually updates the schema, often turning the data team into an operational bottleneck.
Permissive Auto-Evolution
Modern tools like dlt are designed with the intelligence to offer auto-evolution, significantly reducing downtime and preventing unnecessary pipeline breaks by adapting to changes on the fly. However, when this evolution is left completely unchecked to prioritize "uptime" at all costs, it creates a dangerous form of technical debt:
- Schema Scars: A single malformed row can force a permanent metadata mutation (e.g., promoting a `FLOAT` to a `STRING`). This breaks downstream models and requires manual DDL reversals to fix.
- State Corruption: Semantic errors, such as a batch where 90% of values are `NULL`, can pass structural validation but silently overwrite historical accuracy. In stateful tables, a `MERGE` of these values can wipe your SCD2 history, turning a simple bug into a multi-day recovery project.
Fixing these issues after the fact isn't ‘filtering’; it’s performing risky SQL surgery on production tables.
The Middle Ground: AWAP
Mature engineering requires a distinction between recoverable drift and destructive anomalies. You shouldn’t have to choose between a broken pipeline and a corrupted warehouse.
AWAP (Audit-Write-Audit-Publish) introduces a multi-gate validation layer. It provides the flexibility to accept minor upstream changes while maintaining a hard block on data that would compromise system integrity.
To implement AWAP effectively, you have to move away from the idea of a single data quality check. Real-world data corruption happens at different granularities, requiring a two-gate approach that separates Syntactic validation from Semantic validation.
The Two-Gate Architecture: Row vs. Batch
1. The Row-Level Failure (Micro)
These are syntactic violations: structural errors that can be identified row-by-row.
- Examples: Missing primary keys, type mismatches (string in an integer column), or malformed JSON.
- The Threat: The Schema Scar. Modern warehouses often use "Auto-Evolution." If one malformed row forces a DDL change (e.g., `INT` → `STRING`), the metadata is mutated forever. Even if you delete the row, the type change remains, breaking downstream models and requiring manual intervention.
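To make the scar mechanics concrete, here is a minimal, hypothetical sketch of the per-batch type-widening logic many loaders apply; `infer_column_type` is an illustrative stand-in, not a dlt or warehouse API:

```python
# Hypothetical sketch: how one malformed row can "win" type inference.
# Loaders typically widen a column's type to fit every value in the
# batch, so a single string among integers promotes the whole column.

def infer_column_type(values):
    """Return the narrowest type that fits all non-null values."""
    types = {type(v) for v in values if v is not None}
    if types <= {int}:
        return "BIGINT"
    if types <= {int, float}:
        return "DOUBLE"
    return "STRING"  # any str forces a permanent widening

clean_batch = [1, 2, 3]
dirty_batch = [1, 2, "3 (approx)"]  # one malformed row

print(infer_column_type(clean_batch))  # BIGINT
print(infer_column_type(dirty_batch))  # STRING -> the "schema scar"
```

Deleting the dirty row afterwards does not undo the DDL change: the column stays `STRING` until someone manually reverses it.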
2. The Batch-Level Failure (Macro)
These are semantic violations: logic problems that only appear when you look at the data in aggregate.
- Examples: A sudden spike in NULL values or a batch where 100% of the timestamps are identical.
- The Threat: State Corruption. Imagine a bug causes an upstream system to send `NULL` for a `user_status` field. Individually, a `NULL` is valid. But if 90% of your batch is suddenly `NULL`, it's a semantic failure. If you `MERGE` this, you effectively "wipe" the history of your existing users, replacing known truth with unverified values.
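The batch-level gate can be sketched in a few lines. The threshold and field name below are illustrative assumptions, not values prescribed by any tool:

```python
# Hypothetical sketch of a batch-level semantic gate: every row passes
# structural checks, but the batch as a whole fails a null-rate test.

NULL_RATE_THRESHOLD = 0.5  # assumption: tune per field and source

def batch_null_rate(rows, field):
    """Fraction of rows where `field` is NULL."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(field) is None) / len(rows)

batch = [{"user_status": None}] * 9 + [{"user_status": "active"}]

rate = batch_null_rate(batch, "user_status")
decision = "BLOCK" if rate > NULL_RATE_THRESHOLD else "PUBLISH"
print(rate, decision)  # 0.9 BLOCK
```

A blocked batch stays in staging for inspection; the `MERGE` into the stateful table simply never runs.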
Evolution: From WAP to AWAP
In data engineering, WAP (Write‑Audit‑Publish) is the standard pattern for protecting production. You write to staging, audit the batch, and then promote it.
Why extend to AWAP?
Standard WAP is insufficient when dealing with untrusted sources or auto-evolving schemas. If you wait until data is in staging to audit it, the Schema Scar is already born.
| The Logic Shift |
|---|
| WAP (Write-Audit-Publish): Audits the batch after it lands in staging. It protects the final production table, but it’s too late to shield the warehouse from Schema Scars or the storage noise of thousands of invalid rows. |
| AWAP (Audit-Write-Audit-Publish): Implements a ‘Front Door’ check. It handles Row-level integrity in-flight to allow for safe schema evolution, then uses the staging layer to audit Batch-level trust before the final merge. |
The Four-Stage Protocol:
- Audit (Pre-Ingest): A stateless filter checking for syntactic integrity (e.g., required keys, type-casting). Prevents malformed rows from triggering DDL changes.
- Write: Ingestion of the verified payload into the staging layer.
- Audit (Post-Staging): A stateful check against the batch (e.g., statistical drift, null-rate thresholds) to identify semantic anomalies.
- Publish: Atomic promotion (e.g., `ALTER TABLE ... APPEND` or a view swap) of the audited batch to production.
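The four stages above can be wired together as a simple orchestration skeleton. This is a hypothetical sketch: the callables are placeholders for your own filter, loader, batch check, and promotion logic, not part of any library API:

```python
# Hypothetical AWAP skeleton: Audit -> Write -> Audit -> Publish.
def awap_run(raw_rows, row_is_valid, write, batch_is_trusted, publish):
    # Audit 1 (pre-ingest): stateless, row-by-row syntactic filter
    clean = [row for row in raw_rows if row_is_valid(row)]
    # Write: land only the verified payload in staging
    staging = write(clean)
    # Audit 2 (post-staging): stateful, batch-level semantic check
    if not batch_is_trusted(staging):
        return None  # hold the batch back; production is never touched
    # Publish: atomic promotion of the audited batch
    return publish(staging)
```

The key property is that production is only ever reached through `publish`, and `publish` only ever sees a batch that survived both gates.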
Practical Example: The Street Survey System
To see AWAP in action, we'll use a simplified street-survey system:
- Field agents use a mobile app to interview people on the street.
- Each submitted survey becomes one row that hits our pipeline, with fields like `survey_id`, `agent_id`, `started_at`, and `submitted_at`.
Preview: Incoming App Data
| survey_id | agent_id | started_at | submitted_at |
|---|---|---|---|
| S001 | agent_wolf | 2025-01-10T09:00:00 | 2025-01-10T09:00:05 |
| S002 | — | 2025-01-10T09:10:00 | 2025-01-10T09:10:07 |
| S003 | agent_wolf | 2025-01-10T09:20:00 | 2025-01-10T09:20:06 |
| S004 | agent_wolf | 2025-01-10T09:30:00 | 2025-01-10T09:40:10 |
| S009 | agent_lion | 2025-01-10T09:00:00 | 2025-01-10T09:10:05 |
| S026 | agent_panda | 2025-01-10T09:15:00 | 2025-01-10T09:25:15 |
| S035 | — | 2025-01-10T09:20:00 | 2025-01-10T09:20:20 |
How ‘Bad Data’ Surfaces
In this scenario, quality issues fall into two distinct categories:
- Unusable Rows (Micro): These are syntactic failures, like a missing `agent_id` (see `S002`). Since they can't be tied to a real event, they shouldn't enter your warehouse at all. We filter these at the front door (Audit 1).
- Untrusted Aggregates (Macro): Other rows look fine individually but become suspicious in aggregate. For example, `agent_wolf` is completing "10-minute" surveys in 10 seconds.
A single fast response could be a fast walker. But once 30% of an agent's batch is that quick, the question changes:
- From: “Is this row usable?” (Row-level integrity)
- To: “Do we trust this agent's data at all?” (Batch-level trust)
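That shift from row to batch can be expressed directly. The thresholds below mirror the ones used later in the tutorial, and the sample rows are illustrative:

```python
from collections import defaultdict

FAST_THRESHOLD_SECONDS = 15       # a row under this is "suspicious"
SUSPICIOUS_RATE_THRESHOLD = 0.25  # an agent over this is untrusted

rows = [
    {"agent_id": "agent_wolf", "response_time_seconds": 5},
    {"agent_id": "agent_wolf", "response_time_seconds": 6},
    {"agent_id": "agent_wolf", "response_time_seconds": 610},
    {"agent_id": "agent_lion", "response_time_seconds": 605},
]

# Row-level question: is this row usable? (every row above is)
# Batch-level question: do we trust this agent's data at all?
stats = defaultdict(lambda: [0, 0])  # agent_id -> [suspicious, total]
for row in rows:
    stats[row["agent_id"]][0] += row["response_time_seconds"] < FAST_THRESHOLD_SECONDS
    stats[row["agent_id"]][1] += 1

untrusted = {agent for agent, (fast, total) in stats.items()
             if fast / total > SUSPICIOUS_RATE_THRESHOLD}
print(untrusted)  # {'agent_wolf'}
```

No single row here fails validation; only the per-agent aggregate reveals that `agent_wolf`'s batch cannot be trusted.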
Implementing AWAP with dlt
To put this into practice, we’ll use dlt to build the two gates.
Audit 1: The Row-Level Gate
We drop surveys without agent_id and flag suspicious response times in-flight before they land in staging.
```python
import dlt
from datetime import datetime
from dlt.sources.filesystem import filesystem, read_csv

FAST_THRESHOLD_SECONDS = 15

@dlt.resource(name="survey_staging")
def survey_resource(bucket_url: str):
    files = filesystem(bucket_url=bucket_url, file_glob="**/*.csv")
    yield from files | read_csv(chunksize=1000)

# Filter: Micro-check for missing IDs
def has_agent_id(row):
    return bool(row.get("agent_id"))

# Map: Tag suspiciously fast submissions
def audit_response_time(row):
    started = datetime.fromisoformat(row["started_at"])
    submitted = datetime.fromisoformat(row["submitted_at"])
    response_time = int((submitted - started).total_seconds())
    row["response_time_seconds"] = response_time
    row["is_suspicious"] = response_time < FAST_THRESHOLD_SECONDS
    return row

survey_resource.add_filter(has_agent_id)
survey_resource.add_map(audit_response_time)

bucket_url = "file://survey_drops"  # placeholder: point at your survey bucket

pipeline = dlt.pipeline(
    pipeline_name="survey_demo",
    destination="duckdb",
    dataset_name="staging",
)
pipeline.run(survey_resource(bucket_url))
```

Preview of data in staging after filtering and flagging:
| survey_id | agent_id | started_at | submitted_at | response_time_seconds | is_suspicious |
|---|---|---|---|---|---|
| S001 | agent_wolf | 2025-01-10T09:00:00 | 2025-01-10T09:00:05 | 5 | TRUE |
| S003 | agent_wolf | 2025-01-10T09:20:00 | 2025-01-10T09:20:06 | 6 | TRUE |
| S004 | agent_wolf | 2025-01-10T09:30:00 | 2025-01-10T09:40:10 | 610 | FALSE |
| S009 | agent_lion | 2025-01-10T09:00:00 | 2025-01-10T09:10:05 | 605 | FALSE |
| S026 | agent_panda | 2025-01-10T09:15:00 | 2025-01-10T09:25:15 | 615 | FALSE |
| S024 | agent_fox | 2025-01-10T10:15:00 | 2025-01-10T10:25:40 | 640 | FALSE |
| S036 | agent_dragon | 2025-01-10T09:30:00 | 2025-01-10T09:30:09 | 9 | TRUE |
Seeing the bigger picture in staging
Now that the data is in staging, we can compute aggregates without the noise of invalid rows that were caught in the first audit. This is where the agent_wolf pattern shows up clearly.
```python
SUSPICIOUS_RATE_THRESHOLD = 0.25

with pipeline.sql_client() as client:
    # Identify untrusted agents in the current batch
    client.execute_query(f"""
        CREATE OR REPLACE TEMP TABLE untrusted_agents AS
        SELECT
            agent_id,
            AVG(CASE WHEN is_suspicious THEN 1 ELSE 0 END) AS suspicious_rate
        FROM survey_staging
        GROUP BY agent_id
        HAVING suspicious_rate > {SUSPICIOUS_RATE_THRESHOLD};
    """)
```

`untrusted_agents`:
| agent_id | suspicious_rate |
|---|---|
| agent_wolf | 0.33 |
| agent_dragon | 0.43 |
Audit 2: The Batch‑Level Gate
In the second audit, we apply the batch-level gate: we block any agent with a suspicious rate above 25% from going to prod.
> Note on implementation: In this demo, we create a new table for clarity. In a high-scale environment (Snowflake/BigQuery), you would use metadata-only operations like zero-copy clones or view swaps to promote this audited snapshot without the cost of moving data twice.
```python
# Publish: Promote trusted data to production, excluding the anomalies
client.execute_query("""
    CREATE OR REPLACE TABLE survey_analytics AS
    SELECT s.*
    FROM survey_staging s
    WHERE s.agent_id NOT IN (SELECT agent_id FROM untrusted_agents);
""")
```

Data powering production after the second audit:
| survey_id | agent_id | started_at | submitted_at | response_time_seconds | is_suspicious |
|---|---|---|---|---|---|
| S010 | agent_lion | 2025-01-10T09:10:00 | 2025-01-10T09:20:20 | 620 | FALSE |
| S025 | agent_panda | 2025-01-10T09:05:00 | 2025-01-10T09:05:08 | 8 | TRUE |
| S026 | agent_panda | 2025-01-10T09:15:00 | 2025-01-10T09:25:15 | 615 | FALSE |
| S017 | agent_fox | 2025-01-10T09:05:00 | 2025-01-10T09:14:40 | 580 | FALSE |
| S022 | agent_fox | 2025-01-10T09:55:00 | 2025-01-10T10:05:05 | 605 | FALSE |
| S024 | agent_fox | 2025-01-10T10:15:00 | 2025-01-10T10:25:40 | 640 | FALSE |
Conclusion: Engineering for Resilience
AWAP is the middle ground between a brittle pipeline and a permissive one that silently corrupts your history.
It provides an architecture that acknowledges the reality of upstream drift: allowing for healthy Schema Evolution, without accepting the risk of state corruption. By separating row-level syntax from batch-level semantics, you stop treating every minor naming change as a crisis, while ensuring your production tables remain a reliable source of truth.
The complexity is a deliberate trade-off: you are choosing a more rigorous ingestion pattern today to avoid the high cost of manual ‘SQL Surgery’ tomorrow.
- WAP protects the output state (Production).
- AWAP protects system integrity by securing the input to ensure a reliable output.
Learn More:
- Go deeper with our Data Quality Lifecycle Guide.
- This pattern builds on the foundational concepts in our WAP (Write-Audit-Publish) Article.