Encouraging date partition use in iceberg merge and join

I’m a beginner with Trino and Iceberg, so please forgive me if I get my terminology wrong. I’m hoping I just don’t have the right syntax for optimizing joins that involve partitions.

I have an Iceberg table with >1B rows of process data, partitioned by the day of a timestamp column (~2 years of days) and by process machine (a string identifier with only a few distinct values). When I try to merge new data into this table from a staging table with identical columns, the partitions don’t seem to be used to prune files, even though I include the partition columns in the ON clause. The staging table is in a memory connector catalog.

Here’s a summary of what I’ve tried:

  • When we merge into the table with the target table’s partitioned columns listed first in the ON clause, the query still appears to read from all partitions, even though the date range of the merge source should eliminate nearly all of them.
  • As a workaround, I join the target table to the staging table and insert the rows that don’t have a match in the target. This works for historical data, but we want to use MERGE once we get to current data, so the source systems have the opportunity to correct data for a few days.
  • Even with the join, I had to explicitly limit reference_time on the target table to get the query to fit within our 96GB query memory limit, but with that restriction the workaround is now fast.

Here’s the beginning of the merge statement we were using:

MERGE INTO iceberg.default.target_table AS tgt
USING scratch.default.staging_table AS src
ON tgt.reference_time = src.reference_time
AND tgt.process_unit = src.process_unit
AND tgt.query_name = src.query_name
AND tgt.match_check_hash = src.match_check_hash
WHEN NOT MATCHED THEN […]

target_table was created with

partitioning = ARRAY['day(reference_time)', 'process_unit']

and the relevant table columns are

reference_time timestamp NOT NULL
process_unit varchar NOT NULL
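
To make the setup concrete, here is a minimal sketch of the DDL, assuming only the columns that appear in the queries below (the join-key column types are taken from the EXPLAIN output further down; the real table has many more value columns):

CREATE TABLE iceberg.default.target_table (
    reference_time timestamp(6) NOT NULL,
    process_unit varchar NOT NULL,
    query_name varchar,
    match_check_hash varbinary
    -- remaining value columns omitted
)
WITH (
    partitioning = ARRAY['day(reference_time)', 'process_unit']
);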

Checking the beginning of the table’s $partitions metadata shows that the partitioning is working as expected (although the documentation suggests the day partition would be shown as the integer number of days since 1970):
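
The check itself was just a select against the metadata table, something along these lines (the quotes around the name are needed because of the $):

SELECT "partition", record_count, file_count, total_size
FROM iceberg.default."target_table$partitions"
LIMIT 10;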

partition record_count file_count total_size
{reference_time_day=2023-01-01, process_unit=Common} 100,608 1 7,466,316
{reference_time_day=2023-01-01, process_unit=EAF4} 668,986 3 48,386,371
{reference_time_day=2023-01-01, process_unit=EAF5} 638,398 4 45,770,825
{reference_time_day=2023-01-02, process_unit=Common} 74,550 1 5,581,342
{reference_time_day=2023-01-02, process_unit=EAF4} 639,702 3 46,267,678
{reference_time_day=2023-01-02, process_unit=EAF5} 460,367 2 31,834,076
{reference_time_day=2023-01-03, process_unit=Common} 120,048 1 8,926,620

My initial workaround, which improved performance at the 100-million-row level, was to reproduce the merge’s WHEN NOT MATCHED step by left joining against the target table and explicitly limiting the time range:

create table scratch.default.final_staging
as
select <source columns>, tv.match_check_hash as tgt_match_check_hash
from scratch.default.staged_values as sv
left join iceberg.default.target_table as tv
    on sv.match_check_hash = tv.match_check_hash
    and sv.reference_time = tv.reference_time
    and sv.process_unit = tv.process_unit
    and sv.query_name = tv.query_name
where date_trunc('day', tv.reference_time) >= date_trunc('day', timestamp '2025-01-01')
  and date_trunc('day', tv.reference_time) <= date_trunc('day', timestamp '2025-01-01')

At the 1-billion-row level this exceeded the 96GB query memory limit, until I moved the target table and its WHERE clause into a subselect:

create table scratch.default.final_staging
as
select <source columns>, tv.match_check_hash as tgt_match_check_hash
from scratch.default.staged_values as sv
left join (select match_check_hash, reference_time, process_unit, query_name
           from iceberg.default.target_table
           where date_trunc('day', reference_time) >= date_trunc('day', timestamp '2025-01-01')
             and date_trunc('day', reference_time) <= date_trunc('day', timestamp '2025-01-01')
          ) as tv
    on sv.match_check_hash = tv.match_check_hash
    and sv.reference_time = tv.reference_time
    and sv.process_unit = tv.process_unit
    and sv.query_name = tv.query_name

With this workaround, the peak memory for that query dropped to ~4GB.* Any suggestions for how to incorporate the partitioning columns into the merge or join more effectively?
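
For example, is hard-coding the staging data’s date range into the MERGE’s ON clause the intended way to get partition pruning? Something like the following sketch (the bounds are placeholders for the staging data’s actual range):

MERGE INTO iceberg.default.target_table AS tgt
USING scratch.default.staging_table AS src
ON tgt.reference_time = src.reference_time
AND tgt.process_unit = src.process_unit
AND tgt.query_name = src.query_name
AND tgt.match_check_hash = src.match_check_hash
-- hypothetical extra bounds on the partition column, in the hope that the
-- planner can prune day(reference_time) partitions without dynamic filtering
AND tgt.reference_time >= TIMESTAMP '2025-01-01'
AND tgt.reference_time < TIMESTAMP '2025-01-02'
WHEN NOT MATCHED THEN […]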

P.S.: Here’s the error for the 96GB query memory limit:

SQL Error [131079]: Query failed (#20250505_183616_00359_iyvsi): Query exceeded per-node memory limit of 96GB [Allocated: 95.99GB, Delta: 9.36MB, Top Consumers: {HashBuilderOperator=95.56GB, ScanFilterAndProjectOperator=234.43MB, LazyOutputBuffer=204.30MB}]

* The final workaround with the subselect took ~4GB of peak memory when joining 5 days of data (~7.5M rows) into the ~1.5B-row target table, and ~8GB of peak memory when joining 10 days of data (~15M rows). The initial workaround hits the query memory limit even when importing a single day of data (~1.5M rows) into the ~1.5B-row target table. I’ve also tried cast(reference_time as date) for the filters instead of date_trunc, with no luck. I also tried copying my staging table into an Iceberg table with the same columns and partitioning as the target table, with no luck.
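
That last copy was a plain CTAS along these lines (the destination table name here is made up):

CREATE TABLE iceberg.default.staging_partitioned
WITH (partitioning = ARRAY['day(reference_time)', 'process_unit'])
AS SELECT * FROM scratch.default.staging_table;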

I see that in the ON clause you do have your target table on the left, and I doubt it’s as simple as what’s called out in Trino Merge query scanning complete target table. · Issue #23313 · trinodb/trino · GitHub, but it can be quickly tested by running this before your query.

SET SESSION join_distribution_type = 'PARTITIONED'

If that doesn’t help (and as the issue link above suggests), try changing the following property (it doesn’t seem to have a session equivalent).

iceberg.dynamic-filtering.wait-timeout=10s
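
That one is a catalog configuration property, so it would go in the Iceberg catalog’s properties file and needs a restart to take effect. Assuming the usual layout:

# etc/catalog/iceberg.properties (assumed path for a catalog named "iceberg")
connector.name=iceberg
# ... existing catalog settings ...
iceberg.dynamic-filtering.wait-timeout=10s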

Let us know what that does, and if it’s not any better, can you attach a copy of the EXPLAIN ANALYZE output?

Thanks @lester, that link and your comment held the key. While setting join_distribution_type didn’t seem to help, flipping the join to a RIGHT JOIN completely removed the need for a WHERE clause to prune the target table before joining. It now finishes in a few seconds and uses only 2.5GB of peak memory when joining the 15M-row source into the 1.5B-row target table.

Thanks for your help!

Here’s the EXPLAIN (TYPE LOGICAL) output for Target RIGHT JOIN Source with just one column from each table in the select (table names are different in my actual database):

select src.match_check_hash, tgt.match_check_hash
from iceberg.bigdata.long_values as tgt
right join scratch.ahmad.wonderware_altogether as src
    on cast(tgt.reference_time as date) = cast(src.reference_time as date)
    and tgt.reference_time = src.reference_time
    and tgt.process_unit = src.process_unit
    and tgt.query_name = src.query_name
    and tgt.match_check_hash = src.match_check_hash

Trino version: 471
Output[columnNames = [match_check_hash, match_check_hash]]
│ Layout: [match_check_hash_31:varbinary, match_check_hash:varbinary]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
│ match_check_hash := match_check_hash_31
└─ RightJoin[criteria = (expr = expr_33) AND (reference_time = reference_time_9) AND (process_unit = process_unit_8) AND (query_name = query_name_0) AND (match_check_hash = match_check_hash_31), distribution = PARTITIONED]
│ Layout: [match_check_hash:varbinary, match_check_hash_31:varbinary]
│ Estimates: {rows: ? (?), cpu: ?, memory: 2.56GB, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {reference_time_9 → #df_405, process_unit_8 → #df_406, query_name_0 → #df_407, match_check_hash_31 → #df_408}
├─ RemoteExchange[type = REPARTITION]
│ │ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary, expr:date]
│ │ Estimates: {rows: 48873 (3.47MB), cpu: 3.47M, memory: 0B, network: 3.47MB}
│ └─ ScanFilterProject[table = iceberg:bigdata.long_values$data@2116824150562516725, filterPredicate = (CAST(reference_time AS date) = CAST(reference_time AS date)), dynamicFilters = {reference_time = #df_405, process_unit = #df_406, query_name = #df_407, match_check_hash = #df_408}]
│ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary, expr:date]
│ Estimates: {rows: 1426832841 (98.95GB), cpu: 92.30G, memory: 0B, network: 0B}/{rows: 48873 (3.47MB), cpu: 92.30G, memory: 0B, network: 0B}/{rows: 48873 (3.47MB), cpu: 3.47M, memory: 0B, network: 0B}
│ expr := CAST(reference_time AS date)
│ query_name := 1:query_name:varchar
│ match_check_hash := 32:match_check_hash:varbinary
│ reference_time := 10:reference_time:timestamp(6)
│ process_unit := 9:process_unit:varchar
└─ LocalExchange[partitioning = HASH, arguments = [expr_33::date, reference_time_9::timestamp(6), process_unit_8::varchar, query_name_0::varchar, match_check_hash_31::varbinary]]
│ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary, expr_33:date]
│ Estimates: {rows: 15363436 (2.56GB), cpu: 2.56G, memory: 0B, network: 0B}
└─ RemoteExchange[type = REPARTITION]
│ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary, expr_33:date]
│ Estimates: {rows: 15363436 (2.56GB), cpu: 2.56G, memory: 0B, network: 2.56GB}
└─ ScanProject[table = scratch:38]
Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary, expr_33:date]
Estimates: {rows: 15363436 (2.56GB), cpu: 2.49G, memory: 0B, network: 0B}/{rows: 15363436 (2.56GB), cpu: 2.56G, memory: 0B, network: 0B}
expr_33 := CAST(reference_time_9 AS date)
reference_time_9 := 9
match_check_hash_31 := 31
process_unit_8 := 8
query_name_0 := 0

and here’s the output for Source LEFT JOIN Target with the identical ON clause:

select src.match_check_hash, tgt.match_check_hash
from scratch.ahmad.wonderware_altogether as src
left join iceberg.bigdata.long_values as tgt
    on cast(tgt.reference_time as date) = cast(src.reference_time as date)
    and tgt.reference_time = src.reference_time
    and tgt.process_unit = src.process_unit
    and tgt.query_name = src.query_name
    and tgt.match_check_hash = src.match_check_hash

Trino version: 471
Output[columnNames = [match_check_hash, match_check_hash]]
│ Layout: [match_check_hash:varbinary, match_check_hash_31:varbinary]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
│ match_check_hash := match_check_hash_31
└─ LeftJoin[criteria = (expr = expr_33) AND (reference_time = reference_time_9) AND (process_unit = process_unit_8) AND (query_name = query_name_0) AND (match_check_hash = match_check_hash_31), distribution = PARTITIONED]
│ Layout: [match_check_hash:varbinary, match_check_hash_31:varbinary]
│ Estimates: {rows: ? (?), cpu: ?, memory: 3.47MB, network: 0B}
│ Distribution: PARTITIONED
├─ RemoteExchange[type = REPARTITION]
│ │ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary, expr:date]
│ │ Estimates: {rows: 15363436 (2.56GB), cpu: 2.56G, memory: 0B, network: 2.56GB}
│ └─ ScanProject[table = scratch:38]
│ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary, expr:date]
│ Estimates: {rows: 15363436 (2.56GB), cpu: 2.49G, memory: 0B, network: 0B}/{rows: 15363436 (2.56GB), cpu: 2.56G, memory: 0B, network: 0B}
│ expr := CAST(reference_time AS date)
│ query_name := 0
│ match_check_hash := 31
│ reference_time := 9
│ process_unit := 8
└─ LocalExchange[partitioning = HASH, arguments = [expr_33::date, reference_time_9::timestamp(6), process_unit_8::varchar, query_name_0::varchar, match_check_hash_31::varbinary]]
│ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary, expr_33:date]
│ Estimates: {rows: 48873 (3.47MB), cpu: 3.47M, memory: 0B, network: 0B}
└─ RemoteExchange[type = REPARTITION]
│ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary, expr_33:date]
│ Estimates: {rows: 48873 (3.47MB), cpu: 3.47M, memory: 0B, network: 3.47MB}
└─ ScanFilterProject[table = iceberg:bigdata.long_values$data@2116824150562516725, filterPredicate = (CAST(reference_time_9 AS date) = CAST(reference_time_9 AS date))]
Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary, expr_33:date]
Estimates: {rows: 1426832841 (98.95GB), cpu: 92.30G, memory: 0B, network: 0B}/{rows: 48873 (3.47MB), cpu: 92.30G, memory: 0B, network: 0B}/{rows: 48873 (3.47MB), cpu: 3.47M, memory: 0B, network: 0B}
expr_33 := CAST(reference_time_9 AS date)
reference_time_9 := 10:reference_time:timestamp(6)
match_check_hash_31 := 32:match_check_hash:varbinary
process_unit_8 := 9:process_unit:varchar
query_name_0 := 1:query_name:varchar


Well, here’s something fun: it seems that Trino has figured out how to be fast on both the left join and the right join. I’m not sure if something changed in its view of the data, but now both are fast and both are optimized to RightJoin in the EXPLAIN output. I’m even able to take the cast(reference_time as date) out of the ON clause.

This is after a Trino restart, without changing the join_distribution_type session setting from AUTOMATIC.

RIGHT JOIN Version

select src.match_check_hash, tgt.match_check_hash
from iceberg.bigdata.long_values as tgt
right join scratch.ahmad.wonderware_altogether as src
    on tgt.reference_time = src.reference_time
    and tgt.process_unit = src.process_unit
    and tgt.query_name = src.query_name
    and tgt.match_check_hash = src.match_check_hash

Trino version: 471
Output[columnNames = [match_check_hash, match_check_hash]]
│ Layout: [match_check_hash_31:varbinary, match_check_hash:varbinary]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
│ match_check_hash := match_check_hash_31
└─ RightJoin[criteria = (reference_time = reference_time_9) AND (process_unit = process_unit_8) AND (query_name = query_name_0) AND (match_check_hash = match_check_hash_31), distribution = PARTITIONED]
│ Layout: [match_check_hash:varbinary, match_check_hash_31:varbinary]
│ Estimates: {rows: ? (?), cpu: ?, memory: 2.49GB, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {reference_time_9 → #df_245, process_unit_8 → #df_246, query_name_0 → #df_247, match_check_hash_31 → #df_248}
├─ RemoteExchange[type = REPARTITION]
│ │ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary]
│ │ Estimates: {rows: 1426832841 (92.30GB), cpu: 92.30G, memory: 0B, network: 92.30GB}
│ └─ ScanFilter[table = iceberg:bigdata.long_values$data@2116824150562516725, dynamicFilters = {reference_time = #df_245, process_unit = #df_246, query_name = #df_247, match_check_hash = #df_248}]
│ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary]
│ Estimates: {rows: 1426832841 (92.30GB), cpu: 92.30G, memory: 0B, network: 0B}/{rows: 1426832841 (92.30GB), cpu: 92.30G, memory: 0B, network: 0B}
│ query_name := 1:query_name:varchar
│ match_check_hash := 32:match_check_hash:varbinary
│ reference_time := 10:reference_time:timestamp(6)
│ process_unit := 9:process_unit:varchar
└─ LocalExchange[partitioning = HASH, arguments = [reference_time_9::timestamp(6), process_unit_8::varchar, query_name_0::varchar, match_check_hash_31::varbinary]]
│ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary]
│ Estimates: {rows: 15363436 (2.49GB), cpu: 2.49G, memory: 0B, network: 0B}
└─ RemoteExchange[type = REPARTITION]
│ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary]
│ Estimates: {rows: 15363436 (2.49GB), cpu: 2.49G, memory: 0B, network: 2.49GB}
└─ TableScan[table = scratch:33]
Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary]
Estimates: {rows: 15363436 (2.49GB), cpu: 2.49G, memory: 0B, network: 0B}
reference_time_9 := 9
match_check_hash_31 := 31
process_unit_8 := 8
query_name_0 := 0

LEFT JOIN Version

select src.match_check_hash, tgt.match_check_hash
from scratch.ahmad.wonderware_altogether as src
left join iceberg.bigdata.long_values as tgt
    on tgt.reference_time = src.reference_time
    and tgt.process_unit = src.process_unit
    and tgt.query_name = src.query_name
    and tgt.match_check_hash = src.match_check_hash

Trino version: 471
Output[columnNames = [match_check_hash, match_check_hash]]
│ Layout: [match_check_hash:varbinary, match_check_hash_31:varbinary]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
│ match_check_hash := match_check_hash_31
└─ RightJoin[criteria = (reference_time_9 = reference_time) AND (process_unit_8 = process_unit) AND (query_name_0 = query_name) AND (match_check_hash_31 = match_check_hash), distribution = PARTITIONED]
│ Layout: [match_check_hash_31:varbinary, match_check_hash:varbinary]
│ Estimates: {rows: ? (?), cpu: ?, memory: 2.49GB, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {reference_time → #df_245, process_unit → #df_246, query_name → #df_247, match_check_hash → #df_248}
├─ RemoteExchange[type = REPARTITION]
│ │ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary]
│ │ Estimates: {rows: 1426832841 (92.30GB), cpu: 92.30G, memory: 0B, network: 92.30GB}
│ └─ ScanFilter[table = iceberg:bigdata.long_values$data@2116824150562516725, dynamicFilters = {reference_time_9 = #df_245, process_unit_8 = #df_246, query_name_0 = #df_247, match_check_hash_31 = #df_248}]
│ Layout: [query_name_0:varchar, process_unit_8:varchar, reference_time_9:timestamp(6), match_check_hash_31:varbinary]
│ Estimates: {rows: 1426832841 (92.30GB), cpu: 92.30G, memory: 0B, network: 0B}/{rows: 1426832841 (92.30GB), cpu: 92.30G, memory: 0B, network: 0B}
│ reference_time_9 := 10:reference_time:timestamp(6)
│ match_check_hash_31 := 32:match_check_hash:varbinary
│ process_unit_8 := 9:process_unit:varchar
│ query_name_0 := 1:query_name:varchar
└─ LocalExchange[partitioning = HASH, arguments = [reference_time::timestamp(6), process_unit::varchar, query_name::varchar, match_check_hash::varbinary]]
│ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary]
│ Estimates: {rows: 15363436 (2.49GB), cpu: 2.49G, memory: 0B, network: 0B}
└─ RemoteExchange[type = REPARTITION]
│ Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary]
│ Estimates: {rows: 15363436 (2.49GB), cpu: 2.49G, memory: 0B, network: 2.49GB}
└─ TableScan[table = scratch:33]
Layout: [query_name:varchar, process_unit:varchar, reference_time:timestamp(6), match_check_hash:varbinary]
Estimates: {rows: 15363436 (2.49GB), cpu: 2.49G, memory: 0B, network: 0B}
query_name := 0
match_check_hash := 31
reference_time := 9
process_unit := 8


@lester I was beginning to question my sanity, but I think I’ve figured it out. In my flurry of testing after the excitement of the right-join discovery, I restarted Trino to try to recreate the problem. It turns out that the behavior of join_distribution_type = 'PARTITIONED' and 'AUTOMATIC' for the left- and right-join queries depends on whether the cast(reference_time as date) condition is in the ON clause, and whenever the query optimizer chooses RightJoin, the query is fast.

| CAST(reference_time) | join_distribution_type | SQL join | EXPLAIN join | Execution time | Peak memory |
| --- | --- | --- | --- | --- | --- |
| included | AUTOMATIC | LEFT JOIN | LeftJoin | stopped at 2m | stopped at 20GB |
| included | AUTOMATIC | RIGHT JOIN | LeftJoin | stopped at 2m | stopped at 20GB |
| included | PARTITIONED | LEFT JOIN | LeftJoin | stopped at 2m | stopped at 20GB |
| included | PARTITIONED | RIGHT JOIN | RightJoin | 22s | 2.53GB |
| excluded | AUTOMATIC | LEFT JOIN | RightJoin | 20s | 2.44GB |
| excluded | AUTOMATIC | RIGHT JOIN | RightJoin | 20s | 2.40GB |
| excluded | PARTITIONED | LEFT JOIN | LeftJoin | stopped at 2m | stopped at 20GB |
| excluded | PARTITIONED | RIGHT JOIN | RightJoin | 20s | 2.36GB |
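
The only knob changed between runs was the session property, for example:

SET SESSION join_distribution_type = 'PARTITIONED';
-- or back to the default
SET SESSION join_distribution_type = 'AUTOMATIC';
-- the current value can be checked with
SHOW SESSION LIKE 'join_distribution_type';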

I can put the queries and Explain output into a zip file if anyone wants to explore them further.

I also dusted off the merge query from my query file’s history, and it works fast now too! I even tried it with a different source table with the same layout and got the same result. The only guess I have is that I switched from month partitions to day partitions around the same time that I switched from MERGE to join-then-insert, so I must have changed the partitioning only after moving away from MERGE.
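
For anyone following along, a partition-spec change like that can be done with an ALTER TABLE along these lines (a sketch; in Iceberg the new spec only applies to files written after the change, which would fit the timeline above):

ALTER TABLE iceberg.default.target_table
SET PROPERTIES partitioning = ARRAY['day(reference_time)', 'process_unit'];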
