top of page

Implementing Write Audit Push (WAP) for Modern Data Pipelines with Iceberg

Writer: Ankur JainAnkur Jain

The Importance of Data Quality

With AI-driven applications taking over the world (or at least your inbox), two truths stand out:

  1. AI is only as smart as the data it’s fed — garbage in, garbage out.

  2. Data applications operate in a constantly changing environment, unlike traditional software, where possibilities are predefined.

Since data teams typically have limited control over how data is generated upstream, the risk of accumulating errors — often termed “data cascades” — is significant. Without proper validation mechanisms, bad data can propagate across the pipeline, leading to erroneous predictions, faulty decision-making, and revenue losses.

Studies show data teams spend 30–40% of their time fixing data quality issues — time that could be spent on more enjoyable things, like automating reports or watching memes.

Hence, an effective mechanism to validate data at certain checkpoints is crucial. This is where the write-audit-publish (WAP) pattern comes into play.


What is Write-Audit-Publish (WAP)?

The WAP pattern is like quality control for data — it makes sure nothing shady gets into production. Originally popularised by Netflix in 2017 (because even streaming giants hate bad data), WAP has become a go-to strategy for keeping data squeaky clean.


WAP Flowchart
WAP Flowchart

1. Branch Creation

An audit branch (e.g., audit) is created from the production Iceberg table (main). This temporary branch allows modifications without impacting production data, enabling teams to test and refine changes safely before committing them.

2. Data Ingestion

New data is ingested into the audit branch using engines like Apache Spark.This approach isolates production data from potential errors while ensuring seamless updates. Incremental ingestion can be automated to maintain low latency and ensure data freshness.

3. Data Validation

Data validation is performed using tools such as Great Expectations or AWS Deequ. Validation checks include schema enforcement, duplicate detection, and statistical anomaly detection to uphold data integrity.Any inconsistencies are flagged early, allowing data engineers to investigate and resolve issues proactively.

4. Handling Validation Failures

If validation fails, the audit branch is discarded without affecting production. Alerts and monitoring systems (e.g., Monitors, Pagers) notify teams of issues in real time.Valid records can be moved to a staging branch before being published to production.

5. Publishing to Production

Once the data passes validation, it is merged into the production branch using Iceberg’s fast_forward command. This ensures that only high-quality data is integrated into downstream analytics, machine learning models, and business intelligence dashboards.


Write-Audit-Publish (WAP) using Iceberg Branches

We will implement a WAP pattern using Iceberg branches on AWS. To achieve this, we will utilise the Air Traffic Data along with the following tools:

  • AWS Glue Notebooks

  • AWS Glue Catalog

  • Amazon S3

  • AWS Deequ

This demo does not include IAM permissions required for the setup, as they are available in the AWS documentation.


Setting up AWS Glue Notebook

  1. Create a new database in AWS glue catalog named air_traffic

  2. Download the following jars from the iceberg release page and upload them to the s3 bucket at s3://<iceberg_s3_bucket>/jars/

    1. 1.8.0 Spark 3.5_with Scala 2.12 runtime Jar

    2. 1.8.0 aws-bundle Jar

  3. On the Jupyter Notebook that you created in AWS Glue Studio, run the following cell to use Iceberg with Glue

%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5
%extra_jars s3://<iceberg_s3_bucket>/jars/iceberg-aws-bundle-1.8.0.jar,s3://<iceberg_s3_bucket>/jars/iceberg-spark-runtime-3.5_2.12-1.8.0.jar
%additional_python_modules pydeequ==1.4.0
%%configure 
{"--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"}
  1. Initialise the SparkSession by running the following cell

import os
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pydeequ.checks import *
from pydeequ.verification import *

os.environ['SPARK_VERSION'] = '3.5'

CATALOG = 'glue_catalog' # Iceberg Catalog

# Iceberg configuration
spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{CATALOG}.type", "glue") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", CATALOG)\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .getOrCreate()

Preparing Table and Data for WAP

  1. Load the Air Traffic data into a PySpark DataFrame and apply basic data cleaning operations.

def clean_df(df):
    for column in df.columns:
        df = df.withColumnRenamed(column, column.lower())
    df = df.withColumnRenamed("month_num", "month")
    return df

europe_data_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(f"s3://{s3_bucket_name}/raw_data/air_traffic_europe/inital_load/")
europe_data_df = clean_df(europe_data_df)
europe_data_df.show()
+----+-----+---------+--------+--------+----------+----------+---------+---------+---------+-------------+-------------+-------------+
|year|month|month_mon|flt_date|apt_icao|  apt_name|state_name|flt_dep_1|flt_arr_1|flt_tot_1|flt_dep_ifr_2|flt_arr_ifr_2|flt_tot_ifr_2|
+----+-----+---------+--------+--------+----------+----------+---------+---------+---------+-------------+-------------+-------------+
|2019|    1|      JAN|01-01-19|    LATI|    Tirana|   Albania|       29|       26|       55|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    UDYZ|   Yerevan|   Armenia|       29|       33|       62|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    LOWG|      Graz|   Austria|        5|        7|       12|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    LOWI| Innsbruck|   Austria|       28|       26|       54|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    LOWK|Klagenfurt|   Austria|        4|        4|        8|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    LOWL|      Linz|   Austria|        1|        2|        3|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    LOWS|  Salzburg|   Austria|       22|       26|       48|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    LOWW|    Vienna|   Austria|      257|      232|      489|          257|          230|          487|
|2019|    1|      JAN|01-01-19|    EBAW|   Antwerp|   Belgium|        7|        5|       12|         NULL|         NULL|         NULL|
|2019|    1|      JAN|01-01-19|    EBBR|  Brussels|   Belgium|      214|      205|      419|          214|          208|          422|
+----+-----+---------+--------+--------+----------+----------+---------+---------+---------+-------------+-------------+-------------+
only showing top 10 rows

2. Create an iceberg table from the dataframe

ICEBERG_LOC = f's3://{s3_bucket_name}/warehouse/air_traffic.europe' # Iceberg data location
DB_TBL = 'air_traffic.europe'

europe_data_df.createOrReplaceTempView('tmp')

spark.sql(f"""
CREATE TABLE {DB_TBL}
USING iceberg
PARTITIONED BY (year, month)
LOCATION '{ICEBERG_LOC}'
AS SELECT * FROM tmp ORDER BY year, month
""")
# Verify the data
prod_table = f"{CATALOG}.{DB_TBL}"
spark.table(prod_table).groupBy("year", "month").count().show()()
+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2016|   12| 8011|
|2016|    9| 7988|
|2016|    8| 8218|
|2016|    7| 8246|
|2016|    6| 8008|
|2016|    5| 8296|
|2019|   10| 9772|
|2018|    2| 8667|
|2018|    1| 9482|
|2021|    2| 8517|
+----+-----+-----+

WAP Implementation

Further steps show how to implement different stages in WAP i.e. Write — Audit — Publish with a bit of explanation.

Write
  1. Create the stg and audit branches.

stg_branch = "stg"
audit_branch = "audit"
spark.sql(f"ALTER TABLE {prod_table} CREATE BRANCH {stg_branch}")
spark.sql(f"ALTER TABLE {prod_table} CREATE BRANCH {audit_branch}")
spark.sql(f"SELECT * FROM {prod_table}.refs").show()
+-----+------+-----------------+-----------------------+---------------------+----------------------+
| name|  type|      snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+-----+------+-----------------+-----------------------+---------------------+----------------------+
| main|BRANCH|57787198217408245|                   NULL|                 NULL|                  NULL|
|audit|BRANCH|57787198217408245|                   NULL|                 NULL|                  NULL|
|  stg|BRANCH|57787198217408245|                   NULL|                 NULL|                  NULL|
+-----+------+-----------------+-----------------------+---------------------+----------------------+

2. Load the Delta data and store it in the audit branch.

europe_data_delta_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(f"s3://{s3_bucket_name}/raw_data/air_traffic_europe/delta/")
europe_data_delta_df = clean_df(europe_data_delta_df)

europe_data_delta_df.write \
    .format("iceberg") \
    .mode("append") \
    .option("BRANCH", audit_branch) \
    .save(prod_table)
spark.read.option("BRANCH", audit_branch).table(prod_table).count()
983842
Audit
  1. To audit the data, we will use AWS Deequ. We will verify that the year, month, flt_date, and apt_icao fields do not contain any null values.

check = Check(spark, CheckLevel.Error, "Not Null Check") \
    .isComplete("year") \
    .isComplete("month") \
    .isComplete("flt_date") \
    .isComplete("apt_icao")

# Run Deequ Verification Suite
verification_result = VerificationSuite(spark) \
    .onData(europe_data_delta_df) \
    .addCheck(check) \
    .run()

VerificationResult.checkResultsAsDataFrame(spark, verification_result).show()
+--------------+-----------+------------+--------------------+-----------------+--------------------+
|         check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------+-----------+------------+--------------------+-----------------+--------------------+
|Not Null Check|      Error|       Error|CompletenessConst...|          Success|                    |
|Not Null Check|      Error|       Error|CompletenessConst...|          Success|                    |
|Not Null Check|      Error|       Error|CompletenessConst...|          Success|                    |
|Not Null Check|      Error|       Error|CompletenessConst...|          Failure|Value: 0.99996514...|
+--------------+-----------+------------+--------------------+-----------------+--------------------+

It appears that there are records with null values in this table. At this stage, we have several options:

  • Investigate the source data to understand the origin of these null values.

  • If the issue is related to the ETL job, we can review the process and implement more robust error-handling measures.

  • Alert the downstream teams and applications about this issue, allowing them to determine the appropriate course of action based on their use case.

  • Discuss the situation with stakeholders to decide whether to apply imputation methods, fix, or delete the records.

2. In this case, we will filter out the invalid records and move the valid ones to the stg branch.

df_to_write = europe_data_delta_df.filter(f.col('apt_icao').isNotNull())
df_to_write.write \
        .format("iceberg") \
        .mode("append") \
        .option("BRANCH", stg_branch) \
        .save(prod_table)
spark.read.option("BRANCH", stg_branch).table(prod_table).count()
983838
Publish

This stage allows data to be made available to the production tables for consumption by the downstream analytical applications.

  1. To publish the branch to prod, we can use iceberg’s fast_forward procedure.

spark.sql(f"""CALL glue_catalog.system.fast_forward('{prod_table}', 'main', '{stg_branch}')""")
spark.read.table(prod_table).count()
983838

2. Delete the audit and stg branch since it’s not needed anymore.

spark.sql(f"ALTER TABLE {prod_table} DROP BRANCH {stg_branch}")
spark.sql(f"ALTER TABLE {prod_table} DROP BRANCH {audit_branch}")
# Verify the branches are deleted
spark.sql(f"SELECT * FROM {prod_table}.refs").show()
+----+------+-------------------+-----------------------+---------------------+----------------------+
|name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+------+-------------------+-----------------------+---------------------+----------------------+
|main|BRANCH|8039561082908667724|                   NULL|                 NULL|                  NULL|
+----+------+-------------------+-----------------------+---------------------+----------------------+

Need Assistance?

To learn more about how Innablr can help with your data needs, feel free to reach out to us. Our Data & AI team is ready to help you out with your business challenges.


Conclusion

By leveraging Apache Iceberg’s branching features, the Write-Audit-Publish pattern provides a structured and scalable approach to maintaining data integrity. Ensuring data is validated before entering production helps reduce downstream errors, enhances the overall reliability of data-driven decisions, and fosters confidence in AI and analytics applications.

As organisations continue to scale their data infrastructure, adopting best practices like WAP will be crucial in maintaining high data quality standards. Investing in proactive data validation mechanisms not only improves operational efficiency but also protects businesses from costly errors and misinformed strategic decisions.



bottom of page