This post is part of the Iceberg blog series. Read the entire series:
- Introduction to Apache Iceberg in Trino
- Iceberg Partitioning and Performance Optimizations in Trino
- Apache Iceberg DML (update/delete/merge) & Maintenance in Trino
- Apache Iceberg Schema Evolution in Trino
- Apache Iceberg Time Travel & Rollbacks in Trino
- Automated maintenance for Apache Iceberg tables in Starburst Galaxy
- Improving performance with Iceberg sorted tables
- How to migrate your Hive tables to Apache Iceberg
What are sorted tables?
Sorted Iceberg tables can provide a huge increase in performance in query times and can also lead to a decrease in cloud object storage request costs. The table is sorted by at least one column and the data is written to your object storage in files that are sorted by that column or columns.
How do they work?
Below are two tables that show a very simple example of how sorted tables work.
|File Name||Column Name||Min||Max|
|File Name||Column Name||Min||Max|
Now, the following query was issued:
select * from customer_iceberg where custkey = 111029;
First the “not sorted” table, the min and max are checked in the Iceberg manifest files and since they are not sorted and the custkey we’re looking for is 111029, all 3 files need to be read as this value could be found in these files.
Now, for the sorted table, it’s pretty obvious that only file 2 would need to be read. This saves 2 files from being read but now imagine there are 1000s and even millions of files. Having a sorted table could save a very large amount of files from being read which of course would grealty reduce query times.
Creating sorted Iceberg tables
Creating a sorted Iceberg table is easy, you just add sorted_by and the column name(s):
CREATE TABLE catalog_sales_sorted ( cs_sold_date_sk bigint, more columns... ) WITH ( format = 'PARQUET', sorted_by = ARRAY['cs_sold_date_sk'], type = 'ICEBERG' )
Performance and cost benefits
Let’s look at a simple example of how this can greatly improve your performance of queries for your end users and with a bonus on top, save on your cloud storage costs.
In this very basic example, I took a tpc-ds table named catalog_sales and created a sorted version of it on cs_sold_date_sk. I did this as I noticed many tpc-ds queries using this table and column as part of their predicts. Example here.
Here is the simple query based on the non-sorted Iceberg table:
SELECT d.d_date,sum(cs_ext_sales_price) from s3lakehouse.iceberg_ds_sf1000.catalog_sales s, s3lakehouse.iceberg_ds_sf1000.date_dim d WHERE s.cs_sold_date_sk = d.d_date_sk and d_date BETWEEN (cast ('2002-06-01' AS DATE) - INTERVAL '30' day) AND (cast ('2002-06-01' AS date) + INTERVAL '30' day) GROUP BY d.d_date
Here is the query based on the sorted Iceberg table:
SELECT d.d_date,sum(cs_ext_sales_price) from s3lakehouse.iceberg_ds_sf1000.catalog_sales_sorted s, s3lakehouse.iceberg_ds_sf1000.date_dim d WHERE s.cs_sold_date_sk = d.d_date_sk and d_date BETWEEN (Cast ('2002-06-01' AS DATE) - INTERVAL '30' day) AND ( cast ('2002-06-01' AS date) + INTERVAL '30' day) GROUP BY d.d_date
As you can see even from this simple example, the amount of data read is almost half! This can result in a huge amount of performance improvement and reduced costs in object store “gets”.
More complex queries also see a huge performance improvement. For my test, I took a 1TB tpc-ds table named catalog_sales and created a sorted version based on the cs_sold_date_sk column:
create table iceberg_tpcds_sf1000_parquet_sorted.catalog_sales with (format='parquet',sorted_by=ARRAY['cs_sold_date_sk']) as select * from iceberg_tpcds_sf1000_parquet.catalog_sales;
The query I used is a standard tpc-ds one:
use iceberg_tpcds_sf1000_parquet_sorted; with v1 as( select i_category, i_brand, cc_name, d_year, d_moy, sum(cs_sales_price) sum_sales, avg(sum(cs_sales_price)) over (partition by i_category, i_brand, cc_name, d_year) avg_monthly_sales, rank() over (partition by i_category, i_brand, cc_name order by d_year, d_moy) rn from item, catalog_sales, date_dim, call_center where cs_item_sk = i_item_sk and cs_sold_date_sk = d_date_sk and cc_call_center_sk= cs_call_center_sk and ( d_year = 1999 or ( d_year = 1999-1 and d_moy =12) or ( d_year = 1999+1 and d_moy =1) ) group by i_category, i_brand, cc_name , d_year, d_moy), v2 as( select v1.i_category ,v1.d_year, v1.d_moy ,v1.avg_monthly_sales ,v1.sum_sales, v1_lag.sum_sales psum, v1_lead.sum_sales nsum from v1, v1 v1_lag, v1 v1_lead where v1.i_category = v1_lag.i_category and v1.i_category = v1_lead.i_category and v1.i_brand = v1_lag.i_brand and v1.i_brand = v1_lead.i_brand and v1.cc_name = v1_lag.cc_name and v1.cc_name = v1_lead.cc_name and v1.rn = v1_lag.rn + 1 and v1.rn = v1_lead.rn - 1) select * from v2 where d_year = 1999 and avg_monthly_sales > 0 and case when avg_monthly_sales > 0 then abs(sum_sales - avg_monthly_sales) / avg_monthly_sales else null end > 0.1 order by sum_sales - avg_monthly_sales, 3 limit 100;
For the unsorted version, the number of rows read are 1.4 Billion and 8.09GB Bytes read:
For the sorted version, the number of rows is 387M rows and 2.4GB Bytes so there is a huge difference!
Sorted tables, if used correctly, are one of the biggest performance features I’ve seen in the big data space in a long time.
Materialized views (MV) also support sorted columns. The view is stored as an Iceberg table on object storage and it sorted by the column(s) in the MV ddl:
CREATE MATERIALIZED VIEW s3lakehouse.demo_tpch.iceberg_vw_sorted with (storage_schema='mv_storage',sorted_by = ARRAY['custkey']) AS select * from "tpch"."sf1000"."customer"; REFRESH MATERIALIZED VIEW s3lakehouse.demo_tpch.iceberg_vw_sorted;
Here are the results of a very basic query:
select * from s3lakehouse.demo_tpch.iceberg_vw where custkey = 77746;
Non Sorted MV:
As you can see, from this very simple example, we read 100K rows vs. 370K. Now, imagine if this was a very large table with many partitions, the performance would be much higher and the amount of data read from your cloud object store would be greatly reduced. (saving $$$ as well)
Optimizing sorted Iceberg tables
In Iceberg, the optimize command looks for small files and combines them into larger ones improving performance for queries of all types.
Luckily, the Optimize command will sort the data based on the DDL of the table:
ALTER TABLE catalog_sales_sorted EXECUTE optimize
This command will optimize the
catalog_sales_sorted table by combining smaller files into larger ones that are sorted by the
cs_sold_date_sk column as noted above. This is very handy when you are streaming/micro-batching data into an Iceberg table and need to optimize it as given intervals and still want to benefit from the sorting.
Wrapping it up
There are many benefits of using Iceberg tables with Starburst Galaxy and adding the ability to sort tables by columns that are often used for filtering can provide a huge boost in performance and also save a bit on the pocketbook when it comes to cloud storage costs.
If you have any questions, please feel free to reach out to us. We have also launched Starburst Academy with many free courses including our Data Foundations, our self-paced, hands-on learning course which covers data lakes extensively.