
If you run a modern data operation, especially one that involves AI, you have to solve Change Data Capture (CDC) at scale. For a long time, the industry default was to move all of that transactional data into a centralized warehouse just to keep it up to date. But those projects routinely run over budget and over schedule, and for good reason. Today’s AI projects face the same situation: centralizing contextual data could work in theory, but in practice it fails for many of the same reasons that data centralization projects have always struggled.
Considering a new data architecture
How do you get around this? The answer involves both Trino and Iceberg, combined in a reference architecture that we call the Icehouse architecture. We’re also going to use another technology, dbt, in our example today. When you pair the speed of Trino, the metadata of Iceberg, and the transformation power of dbt, you get an architecture that can handle complex CDC workloads directly on your data lake: the consistency of a warehouse with the optionality and cost profile of a lakehouse.
In this post, we will look at how to use dbt and Trino to process data from Amazon DMS. We will unpack how to handle inserts, updates, and deletes in your CDC stream while maintaining a shared ground truth. Whether you are implementing soft deletes to maintain history or hard deletes for compliance, this approach gives you the flexibility to manage your data exactly where it lives.
Let us dive into the mechanics of building a high-performance CDC pipeline on an open data lakehouse.
Using Amazon DMS data on Trino and Iceberg
First, let’s get started. In this example, we will use data produced by Amazon DMS to demonstrate how to use dbt and Trino with Iceberg for CDC in a data lake.
Note: In this article, we assume that the DMS output is already available on S3.
Step 1 – Reviewing the DMS data in CSV format
The data from DMS is in CSV format for this example. We will have one file for the initial load with the following contents:
I,100,Furniture,Product 1,25,2022-03-01T09:51:39.340396Z
I,101,Cosmetic,Product 2,20,2022-03-01T10:14:58.597216Z
I,102,Furniture,Product 3,30,2022-03-01T11:51:40.417052Z
I,103,Electronics,Product 4,10,2022-03-01T11:51:40.519832Z
I,104,Electronics,Product 5,50,2022-03-01T11:58:00.512679Z
Step 2 – Considering what’s needed for CDC
Then we will have multiple CSV files for CDC records. The records scattered through those files are:
I,105,Furniture,Product 5,45,2022-03-02T09:51:39.340396Z
I,106,Electronics,Product 6,10,2022-03-02T09:52:39.340396Z
U,102,Furniture,Product 3,29,2022-03-02T11:53:40.417052Z
U,102,Furniture,Product 3,28,2022-03-02T11:55:40.417052Z
D,103,Electronics,Product 4,10,2022-03-02T11:56:40.519832Z
Step 3 – Identifying the requisite operations
Now it’s time to look a bit deeper. Notice that each row has an op attribute that represents the operation applied to the source record. The possible values are:
- I – insert operations
- U – update operations
- D – delete operations
There’s a reason for this: DMS can include this attribute in its output, and we will use it as part of our incremental dbt models.
Step 4 – Schema discovery
Now it’s time to dive deeper and explore the schema involved. To do this, we can either use schema discovery in Starburst Galaxy to automatically create a table from these CSV files, or we can create an external table that reads from the folder on Amazon S3 where the files are stored. We will use an external table for this article.
The table can be created with:
CREATE TABLE products (
op VARCHAR,
product_id VARCHAR,
category VARCHAR,
product_name VARCHAR,
quantity_available VARCHAR,
last_update_time VARCHAR
) WITH (
type = 'hive',
format = 'CSV',
external_location = 's3://posullivdata/dms/products'
)
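Once the table exists, a quick aggregation over the op column is a useful sanity check that the CDC stream is being read correctly. This query is purely illustrative and uses the table name from the DDL above:
SELECT op, count(*) AS record_count
FROM products
GROUP BY op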
Step 5 – DMS source model
Now it’s time to get started with the DMS source model. We will have a model named stg_dms__products that reads from the source DMS table. This model will use the incremental materialization type in dbt, so that after the initial load, we will only process new CDC records created by DMS.
We achieve this using the following CTE code.
source as (
select * from {{ source('dms', 'products')}}
{% if is_incremental() %}
where last_update_time > (
select max(last_update_time)
from {{ this }}
)
{% endif %}
)
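The {{ source('dms', 'products') }} reference above assumes a matching source definition in the dbt project. A minimal sources.yml could look like the following; the schema name here is an assumption and should point at wherever the external products table was created:
version: 2

sources:
  - name: dms
    schema: dms  # assumption: the schema that contains the external products table
    tables:
      - name: products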
Step 6 – Ensure that the latest update is applied to each row
This model must also ensure that only the latest update for each row is applied. This is achieved by selecting the last operation for each row in this CTE:
dedup as (
select *
from (
select
*,
row_number() over (
partition by product_id order by last_update_time desc
) as row_num
from source
)
where row_num = 1
)
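The two CTEs are chained in a single with clause, and the model body then ends with a select from dedup that drops the helper row_num column, along these lines:
select
    op,
    product_id,
    category,
    product_name,
    quantity_available,
    last_update_time
from dedup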
Step 7 – Staging models
Now it’s time for staging models. We want to use an efficient incremental materialization for the stg_products model.
The most efficient way to do this with Trino and Iceberg is to have dbt generate a MERGE statement, such as the one in the following code.
MERGE INTO stg_products_merged spm
USING (SELECT * FROM cdc_products) cdc
ON spm.product_id = cdc.product_id
WHEN MATCHED AND cdc.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
Step 8 – Use dbt to generate a MERGE statement
To have dbt generate a MERGE statement, we set incremental_strategy to the value merge.
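In the model config, that looks like the following; the complete config, including the unique key that drives the ON clause of the MERGE, is shown in full in the soft-delete section later in this post:
{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        unique_key = 'product_id'
    )
}}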
Now, based on recommendations from the dbt documentation on writing MERGE statements, we first need to write a CTE each for updates, deletes, and inserts. Use the following SQL to do this.
with
updates as (
select
{{
dbt_utils.star(
from=ref('stg_dms__products'),
except=["op"]
)
}},
false "to_delete"
from
{{ ref('stg_dms__products') }}
where op = 'U'
{% if is_incremental() %}
and last_update_time > (
select max(last_update_time)
from {{ this }}
)
{% endif %}
),
deletes as (
select
{{
dbt_utils.star(
from=ref('stg_dms__products'),
except=["op"]
)
}},
true "to_delete"
from
{{ ref('stg_dms__products') }}
where op = 'D'
{% if is_incremental() %}
and last_update_time > (
select max(last_update_time)
from {{ this }}
)
{% endif %}
),
inserts as (
select
{{
dbt_utils.star(
from=ref('stg_dms__products'),
except=["op"]
)
}},
false "to_delete"
from
{{ ref('stg_dms__products') }}
where op = 'I'
{% if is_incremental() %}
and last_update_time > (
select max(last_update_time)
from {{ this }}
)
{% endif %}
)
Note: You can see that the logic for the updates, inserts, and deletes CTEs is identical except for the op value it filters on and the to_delete flag it sets, so it could be extracted to a macro (a sketch follows below). Since this is an incremental materialization, we also only care about new rows.
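As an illustration, a hypothetical macro, here called cdc_rows, could take the op value and the to_delete flag as arguments so that each CTE collapses to a single call. This is only a sketch, not code from the project:
{% macro cdc_rows(op_value, to_delete) %}
    select
        {{
            dbt_utils.star(
                from=ref('stg_dms__products'),
                except=["op"]
            )
        }},
        {{ to_delete }} "to_delete"
    from
        {{ ref('stg_dms__products') }}
    where op = '{{ op_value }}'
    {% if is_incremental() %}
    and last_update_time > (
        select max(last_update_time)
        from {{ this }}
    )
    {% endif %}
{% endmacro %}
With that in place, the model body would use updates as ( {{ cdc_rows('U', 'false') }} ), deletes as ( {{ cdc_rows('D', 'true') }} ), and inserts as ( {{ cdc_rows('I', 'false') }} ).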
Step 9 – Union the CTEs
Finally, we union the three CTEs. dbt builds a temporary view from the model body and joins it as the source of the generated MERGE statement. The generated MERGE will look something like:
merge into stg_products_merged as DBT_INTERNAL_DEST
using stg_products_merged__dbt_tmp as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.product_id = DBT_INTERNAL_DEST.product_id
)
when matched then update set ...
when not matched then insert …
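The temporary view that feeds this MERGE comes from the model body itself: the three CTEs above followed by a select that unions them, roughly like this:
select * from inserts
union all
select * from updates
union all
select * from deletes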
Step 10 – Further considerations
There are some further considerations:
- The model assumes that the stg_dms__products table contains no duplicate data. This allows us to use UNION ALL instead of UNION, resulting in more efficient query execution in Trino.
- We can make the MERGE statement more efficient and read less data from the target table by including incremental_predicates in our model config. A common strategy is to read only the last seven days of data from the target table, as shown in the sketch after this list.
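Here is what that could look like for the merge config discussed in the next section; the predicate reuses the DBT_INTERNAL_DEST alias from the generated MERGE shown earlier, and the seven-day window is just an example:
{{
    config(
        materialized = 'incremental',
        unique_key = 'product_id',
        incremental_strategy = 'merge',
        incremental_predicates = [
            "DBT_INTERNAL_DEST.last_update_time > date_add('day', -7, current_timestamp)"
        ]
    )
}}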
Deletions can be handled in two ways, which we will discuss next.
Step 11 – Soft deletes
Now it’s time for soft deletes. dbt recommends a soft-delete approach, where no data is actually deleted from a model. Instead, rows that should be considered deleted are marked with a flag.
The CDC data produced by Amazon DMS includes an operation type in the op field for each change operation. Using this field, we can identify insert, update, and delete operations.
First, let’s define the config for our model:
-- depends_on: {{ ref('stg_dms__products') }}
{{
config(
materialized = 'incremental',
unique_key = 'product_id',
incremental_strategy = 'merge',
on_schema_change = 'sync_all_columns'
)
}}
Step 12 – Implement the soft delete
This is going to be an incremental model and use the merge strategy, which means a MERGE statement will be generated.
We create CTEs that correspond to insert, update, and delete operations as shown in the previous section.
To implement the soft-delete approach, we add an additional field named to_delete. This is set to true only in the CTE for delete operations and false everywhere else.
Now we can build another model, stg_products, which references this model and filters out any rows marked as deleted.
{{
config(
materialized = 'view'
)
}}
select
{{
dbt_utils.star(
from=ref('stg_products_merged'),
except=["to_delete"]
)
}}
from
{{ ref('stg_products_merged') }}
where
to_delete = false
Since this model is materialized as a view, there is no additional overhead to using this approach.
Step 13 – Hard deletes
If there is a requirement to delete data from a model, we can do so with a post_hook in the model. We have a model named stg_products_hard_delete that shows this approach.
With this approach, our model is exactly the same, still including the to_delete field. All we change is the model config, using the code below.
-- depends_on: {{ ref('stg_dms__products') }}
{{
config(
materialized = 'incremental',
unique_key = 'product_id',
incremental_strategy = 'merge',
on_schema_change = 'sync_all_columns',
post_hook = [
"DELETE FROM {{ this }} WHERE to_delete = true"
]
)
}}
We now have a post_hook that issues a DELETE statement to remove any data that is marked as deleted.
This DELETE statement can be made faster by adding an extra predicate like:
DELETE FROM {{ this }} WHERE to_delete = true AND last_update_time > date_add('day', -7, current_timestamp)
Step 14 – Trino and Iceberg considerations
There are a few considerations specific to running this on Trino and Iceberg. The table properties for an Iceberg table can be specified in the properties map of the config macro for a model in dbt.
Iceberg table maintenance operations can also be called directly from a dbt model in a post_hook. For example, this model calls the expire_snapshots procedure and runs ANALYZE after the model is built:
{{
config(
materialized = 'incremental',
properties = {
"partitioning" : "ARRAY['month(last_update_time)']",
"sorted_by" : "ARRAY['last_update_time']"
},
unique_key = 'product_id',
incremental_strategy = 'merge',
on_schema_change = 'sync_all_columns',
post_hook = [
"ALTER TABLE {{ this }} EXECUTE expire_snapshots(retention_threshold => '7d')",
"ANALYZE {{ this }}"
]
)
}}
Taking the next step
Want to take the next step? The dbt project with all models discussed in this post can be found in this GitHub repository.



