I’m a beginner with Trino and Iceberg, so please forgive me if I get my terminology wrong. I’m hoping I just don’t have the right syntax for optimizing joins involving partitions.
I have an Iceberg table with >1B rows of process data, partitioned by the day of a timestamp column (~2 years of days) and by process machine (a string identifier with only a few values). When I try to merge new data into this table from a staging table with identical columns, the partitions don’t seem to be used to prune files, even though I include the partitioning columns in the ON clause. The staging table lives in a memory-connector catalog.
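(In case it matters, the scratch catalog is just the memory connector; a minimal sketch of its catalog file, assuming the file name matches the catalog name:)

# etc/catalog/scratch.properties
connector.name=memory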
Here’s a summary of what I’ve tried:
- When I merge into the table with the target table’s partitioning columns listed first in the ON clause, the query still appears to read all partitions, even though the date range of the merge source should eliminate nearly all of them.
- As a workaround, I joined the target table to the staging table and inserted the rows that don’t have a match in the target. This works for historical data, but we want to use MERGE once we get to current data so we can give the source systems a few days to correct their data.
- Even with the join, I had to explicitly limit reference_time on the target table to fit within our 96GB query memory limit, but with that restriction the workaround is now fast.
Here’s the beginning of the merge statement we were using:
MERGE INTO iceberg.default.target_table AS tgt
USING scratch.default.staging_table AS src
ON tgt.reference_time = src.reference_time
AND tgt.process_unit = src.process_unit
AND tgt.query_name = src.query_name
AND tgt.match_check_hash = src.match_check_hash
WHEN NOT MATCHED THEN […]
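Is the intended way to get pruning to add an explicit date range on the partitioning column to the ON clause? Here’s a sketch of what I mean, with placeholder bounds (the dates below are just examples, not what we actually run):

MERGE INTO iceberg.default.target_table AS tgt
USING scratch.default.staging_table AS src
ON tgt.reference_time = src.reference_time
AND tgt.process_unit = src.process_unit
AND tgt.query_name = src.query_name
AND tgt.match_check_hash = src.match_check_hash
-- explicit bounds on the partitioning column (placeholder dates)
AND tgt.reference_time >= timestamp '2025-01-01'
AND tgt.reference_time < timestamp '2025-01-06'
WHEN NOT MATCHED THEN […]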
target_table was created with
partitioning = ARRAY['day(reference_time)', 'process_unit']
and the relevant table columns are
reference_time timestamp NOT NULL
process_unit varchar NOT NULL
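For reference, the table definition looks roughly like this (most columns omitted; the types of query_name and match_check_hash are shown only for illustration):

CREATE TABLE iceberg.default.target_table (
    reference_time timestamp NOT NULL,
    process_unit varchar NOT NULL,
    query_name varchar,       -- illustrative type
    match_check_hash varchar  -- illustrative type
    -- remaining process-data columns omitted
)
WITH (
    partitioning = ARRAY['day(reference_time)', 'process_unit']
);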
Checking the beginning of the table’s $partitions metadata table shows that the partitioning is working as expected (although the documentation suggests the partition value would be the integer number of days since 1970, it shows up as a date):
| partition | record_count | file_count | total_size |
|---|---|---|---|
| {reference_time_day=2023-01-01, process_unit=Common} | 100,608 | 1 | 7,466,316 |
| {reference_time_day=2023-01-01, process_unit=EAF4} | 668,986 | 3 | 48,386,371 |
| {reference_time_day=2023-01-01, process_unit=EAF5} | 638,398 | 4 | 45,770,825 |
| {reference_time_day=2023-01-02, process_unit=Common} | 74,550 | 1 | 5,581,342 |
| {reference_time_day=2023-01-02, process_unit=EAF4} | 639,702 | 3 | 46,267,678 |
| {reference_time_day=2023-01-02, process_unit=EAF5} | 460,367 | 2 | 31,834,076 |
| {reference_time_day=2023-01-03, process_unit=Common} | 120,048 | 1 | 8,926,620 |
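(That output came from querying the hidden $partitions metadata table, roughly like this:)

SELECT partition, record_count, file_count, total_size
FROM iceberg.default."target_table$partitions"
LIMIT 10;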
My initial workaround, which improved performance at the 100-million-row level, was to reproduce the merge’s WHEN NOT MATCHED logic by left-joining the target table and explicitly limiting the time range:
create table scratch.default.final_staging
as
select <source columns>, tv.match_check_hash as tgt_match_check_hash
from scratch.default.staged_values as sv
left join iceberg.default.target_table as tv
on sv.match_check_hash = tv.match_check_hash
and sv.reference_time = tv.reference_time
and sv.process_unit = tv.process_unit
and sv.query_name = tv.query_name
where
date_trunc('day', tv.reference_time) >= date_trunc('day', timestamp '2025-01-01')
and date_trunc('day', tv.reference_time) <= date_trunc('day', timestamp '2025-01-01')
When I used this at the 1-billion-row level, it exceeded the 96GB query memory limit until I moved the target table and its WHERE clause into a subselect:
create table scratch.default.final_staging
as
select <source columns>, tv.match_check_hash as tgt_match_check_hash
from scratch.default.staged_values as sv
left join (
    select match_check_hash, reference_time, process_unit, query_name
    from iceberg.default.target_table
    where
    date_trunc('day', reference_time) >= date_trunc('day', timestamp '2025-01-01')
    and date_trunc('day', reference_time) <= date_trunc('day', timestamp '2025-01-01')
) as tv
on sv.match_check_hash = tv.match_check_hash
and sv.reference_time = tv.reference_time
and sv.process_unit = tv.process_unit
and sv.query_name = tv.query_name
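The rows with no match then get inserted into the target; a sketch of that last step (the NULL check on tgt_match_check_hash is how "no match in the target" shows up after the left join):

insert into iceberg.default.target_table
select <source columns>
from scratch.default.final_staging
where tgt_match_check_hash is null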
With this workaround, the peak memory for that query was reduced to ~4GB*. Any suggestions on how to incorporate the partitioning columns into the merge or join more effectively?
P.S.: Here’s the error for the 96GB query memory limit:
SQL Error [131079]: Query failed (#20250505_183616_00359_iyvsi): Query exceeded per-node memory limit of 96GB [Allocated: 95.99GB, Delta: 9.36MB, Top Consumers: {HashBuilderOperator=95.56GB, ScanFilterAndProjectOperator=234.43MB, LazyOutputBuffer=204.30MB}]
* The final workaround with the subselect took ~4GB of peak memory when joining 5 days of data (~7.5M rows) into the ~1.5B-row target table, and ~8GB of peak memory when joining 10 days of data (~15M rows). The initial workaround exceeds the query memory limit even when importing one day of data (~1.5M rows) into the ~1.5B-row target table. I’ve also tried cast(reference_time as date) in the filters instead of date_trunc, with no luck, and I tried copying my staging table into an Iceberg table with the same columns and partitioning as the target table, also with no luck.
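For completeness, the cast variant of the filter looked like this (same one-day range as above):

where cast(reference_time as date) >= date '2025-01-01'
and cast(reference_time as date) <= date '2025-01-01'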