The Importance of Data Quality
With AI-driven applications taking over the world (or at least your inbox), two truths stand out:
AI is only as smart as the data it’s fed — garbage in, garbage out.
Data applications operate in a constantly changing environment, unlike traditional software, where possibilities are predefined.
Since data teams typically have limited control over how data is generated upstream, the risk of accumulating errors — often termed “data cascades” — is significant. Without proper validation mechanisms, bad data can propagate across the pipeline, leading to erroneous predictions, faulty decision-making, and revenue losses.
Studies show data teams spend 30–40% of their time fixing data quality issues — time that could be spent on more enjoyable things, like automating reports or watching memes.
Hence, an effective mechanism to validate data at certain checkpoints is crucial. This is where the write-audit-publish (WAP) pattern comes into play.
What is Write-Audit-Publish (WAP)?
The WAP pattern is like quality control for data — it makes sure nothing shady gets into production. Originally popularised by Netflix in 2017 (because even streaming giants hate bad data), WAP has become a go-to strategy for keeping data squeaky clean.

1. Branch Creation
An audit branch (e.g., audit) is created from the production Iceberg table (main). This temporary branch allows modifications without impacting production data, enabling teams to test and refine changes safely before committing them.
2. Data Ingestion
New data is ingested into the audit branch using engines like Apache Spark.This approach isolates production data from potential errors while ensuring seamless updates. Incremental ingestion can be automated to maintain low latency and ensure data freshness.
3. Data Validation
Data validation is performed using tools such as Great Expectations or AWS Deequ. Validation checks include schema enforcement, duplicate detection, and statistical anomaly detection to uphold data integrity.Any inconsistencies are flagged early, allowing data engineers to investigate and resolve issues proactively.
4. Handling Validation Failures
If validation fails, the audit branch is discarded without affecting production. Alerts and monitoring systems (e.g., Monitors, Pagers) notify teams of issues in real time.Valid records can be moved to a staging branch before being published to production.
5. Publishing to Production
Once the data passes validation, it is merged into the production branch using Iceberg’s fast_forward command. This ensures that only high-quality data is integrated into downstream analytics, machine learning models, and business intelligence dashboards.
Write-Audit-Publish (WAP) using Iceberg Branches
We will implement a WAP pattern using Iceberg branches on AWS. To achieve this, we will utilise the Air Traffic Data along with the following tools:
AWS Glue Notebooks
AWS Glue Catalog
Amazon S3
AWS Deequ
This demo does not include IAM permissions required for the setup, as they are available in the AWS documentation.
Setting up AWS Glue Notebook
Create a new database in AWS glue catalog named air_traffic
Download the following jars from the iceberg release page and upload them to the s3 bucket at s3://<iceberg_s3_bucket>/jars/
On the Jupyter Notebook that you created in AWS Glue Studio, run the following cell to use Iceberg with Glue
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5
%extra_jars s3://<iceberg_s3_bucket>/jars/iceberg-aws-bundle-1.8.0.jar,s3://<iceberg_s3_bucket>/jars/iceberg-spark-runtime-3.5_2.12-1.8.0.jar
%additional_python_modules pydeequ==1.4.0
%%configure
{"--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"}
Initialise the SparkSession by running the following cell
import os
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pydeequ.checks import *
from pydeequ.verification import *
os.environ['SPARK_VERSION'] = '3.5'
CATALOG = 'glue_catalog' # Iceberg Catalog
# Iceberg configuration
spark = SparkSession.builder \
.config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
.config(f"spark.sql.catalog.{CATALOG}.type", "glue") \
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", CATALOG)\
.config("spark.sql.execution.arrow.pyspark.enabled", "true")\
.getOrCreate()
Preparing Table and Data for WAP
Load the Air Traffic data into a PySpark DataFrame and apply basic data cleaning operations.
def clean_df(df):
for column in df.columns:
df = df.withColumnRenamed(column, column.lower())
df = df.withColumnRenamed("month_num", "month")
return df
europe_data_df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load(f"s3://{s3_bucket_name}/raw_data/air_traffic_europe/inital_load/")
europe_data_df = clean_df(europe_data_df)
europe_data_df.show()
+----+-----+---------+--------+--------+----------+----------+---------+---------+---------+-------------+-------------+-------------+
|year|month|month_mon|flt_date|apt_icao| apt_name|state_name|flt_dep_1|flt_arr_1|flt_tot_1|flt_dep_ifr_2|flt_arr_ifr_2|flt_tot_ifr_2|
+----+-----+---------+--------+--------+----------+----------+---------+---------+---------+-------------+-------------+-------------+
|2019| 1| JAN|01-01-19| LATI| Tirana| Albania| 29| 26| 55| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| UDYZ| Yerevan| Armenia| 29| 33| 62| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| LOWG| Graz| Austria| 5| 7| 12| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| LOWI| Innsbruck| Austria| 28| 26| 54| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| LOWK|Klagenfurt| Austria| 4| 4| 8| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| LOWL| Linz| Austria| 1| 2| 3| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| LOWS| Salzburg| Austria| 22| 26| 48| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| LOWW| Vienna| Austria| 257| 232| 489| 257| 230| 487|
|2019| 1| JAN|01-01-19| EBAW| Antwerp| Belgium| 7| 5| 12| NULL| NULL| NULL|
|2019| 1| JAN|01-01-19| EBBR| Brussels| Belgium| 214| 205| 419| 214| 208| 422|
+----+-----+---------+--------+--------+----------+----------+---------+---------+---------+-------------+-------------+-------------+
only showing top 10 rows
2. Create an iceberg table from the dataframe
ICEBERG_LOC = f's3://{s3_bucket_name}/warehouse/air_traffic.europe' # Iceberg data location
DB_TBL = 'air_traffic.europe'
europe_data_df.createOrReplaceTempView('tmp')
spark.sql(f"""
CREATE TABLE {DB_TBL}
USING iceberg
PARTITIONED BY (year, month)
LOCATION '{ICEBERG_LOC}'
AS SELECT * FROM tmp ORDER BY year, month
""")
# Verify the data
prod_table = f"{CATALOG}.{DB_TBL}"
spark.table(prod_table).groupBy("year", "month").count().show()()
+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2016| 12| 8011|
|2016| 9| 7988|
|2016| 8| 8218|
|2016| 7| 8246|
|2016| 6| 8008|
|2016| 5| 8296|
|2019| 10| 9772|
|2018| 2| 8667|
|2018| 1| 9482|
|2021| 2| 8517|
+----+-----+-----+
WAP Implementation
Further steps show how to implement different stages in WAP i.e. Write — Audit — Publish with a bit of explanation.
Write
Create the stg and audit branches.
stg_branch = "stg"
audit_branch = "audit"
spark.sql(f"ALTER TABLE {prod_table} CREATE BRANCH {stg_branch}")
spark.sql(f"ALTER TABLE {prod_table} CREATE BRANCH {audit_branch}")
spark.sql(f"SELECT * FROM {prod_table}.refs").show()
+-----+------+-----------------+-----------------------+---------------------+----------------------+
| name| type| snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+-----+------+-----------------+-----------------------+---------------------+----------------------+
| main|BRANCH|57787198217408245| NULL| NULL| NULL|
|audit|BRANCH|57787198217408245| NULL| NULL| NULL|
| stg|BRANCH|57787198217408245| NULL| NULL| NULL|
+-----+------+-----------------+-----------------------+---------------------+----------------------+
2. Load the Delta data and store it in the audit branch.
europe_data_delta_df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load(f"s3://{s3_bucket_name}/raw_data/air_traffic_europe/delta/")
europe_data_delta_df = clean_df(europe_data_delta_df)
europe_data_delta_df.write \
.format("iceberg") \
.mode("append") \
.option("BRANCH", audit_branch) \
.save(prod_table)
spark.read.option("BRANCH", audit_branch).table(prod_table).count()
983842
Audit
To audit the data, we will use AWS Deequ. We will verify that the year, month, flt_date, and apt_icao fields do not contain any null values.
check = Check(spark, CheckLevel.Error, "Not Null Check") \
.isComplete("year") \
.isComplete("month") \
.isComplete("flt_date") \
.isComplete("apt_icao")
# Run Deequ Verification Suite
verification_result = VerificationSuite(spark) \
.onData(europe_data_delta_df) \
.addCheck(check) \
.run()
VerificationResult.checkResultsAsDataFrame(spark, verification_result).show()
+--------------+-----------+------------+--------------------+-----------------+--------------------+
| check|check_level|check_status| constraint|constraint_status| constraint_message|
+--------------+-----------+------------+--------------------+-----------------+--------------------+
|Not Null Check| Error| Error|CompletenessConst...| Success| |
|Not Null Check| Error| Error|CompletenessConst...| Success| |
|Not Null Check| Error| Error|CompletenessConst...| Success| |
|Not Null Check| Error| Error|CompletenessConst...| Failure|Value: 0.99996514...|
+--------------+-----------+------------+--------------------+-----------------+--------------------+
It appears that there are records with null values in this table. At this stage, we have several options:
Investigate the source data to understand the origin of these null values.
If the issue is related to the ETL job, we can review the process and implement more robust error-handling measures.
Alert the downstream teams and applications about this issue, allowing them to determine the appropriate course of action based on their use case.
Discuss the situation with stakeholders to decide whether to apply imputation methods, fix, or delete the records.
2. In this case, we will filter out the invalid records and move the valid ones to the stg branch.
df_to_write = europe_data_delta_df.filter(f.col('apt_icao').isNotNull())
df_to_write.write \
.format("iceberg") \
.mode("append") \
.option("BRANCH", stg_branch) \
.save(prod_table)
spark.read.option("BRANCH", stg_branch).table(prod_table).count()
983838
Publish
This stage allows data to be made available to the production tables for consumption by the downstream analytical applications.
To publish the branch to prod, we can use iceberg’s fast_forward procedure.
spark.sql(f"""CALL glue_catalog.system.fast_forward('{prod_table}', 'main', '{stg_branch}')""")
983838
2. Delete the audit and stg branch since it’s not needed anymore.
spark.sql(f"ALTER TABLE {prod_table} DROP BRANCH {stg_branch}")
spark.sql(f"ALTER TABLE {prod_table} DROP BRANCH {audit_branch}")
# Verify the branches are deleted
spark.sql(f"SELECT * FROM {prod_table}.refs").show()
+----+------+-------------------+-----------------------+---------------------+----------------------+
|name| type| snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+------+-------------------+-----------------------+---------------------+----------------------+
|main|BRANCH|8039561082908667724| NULL| NULL| NULL|
+----+------+-------------------+-----------------------+---------------------+----------------------+
Need Assistance?
To learn more about how Innablr can help with your data needs, feel free to reach out to us. Our Data & AI team is ready to help you out with your business challenges.
Conclusion
By leveraging Apache Iceberg’s branching features, the Write-Audit-Publish pattern provides a structured and scalable approach to maintaining data integrity. Ensuring data is validated before entering production helps reduce downstream errors, enhances the overall reliability of data-driven decisions, and fosters confidence in AI and analytics applications.
As organisations continue to scale their data infrastructure, adopting best practices like WAP will be crucial in maintaining high data quality standards. Investing in proactive data validation mechanisms not only improves operational efficiency but also protects businesses from costly errors and misinformed strategic decisions.