×

Tutorial: Using Trino and Apache Iceberg for data warehousing

Last Updated: February 6, 2024


This tutorial demonstrates how to use Trino’s support for Apache Iceberg for data warehousing, including how to use MERGE, UPDATE and DELETE commands to build and maintain reliable datasets.

Prerequisites

For this tutorial, you’ll need Trino connected to an Iceberg catalog.

If you don’t already have Trino running, the fastest way to try it is using Tabular and Starburst Galaxy.

Both services offer free accounts that you can sign up for by clicking the links below:

NYC Taxi Data

This tutorial uses the NYC Taxi dataset. The raw data is available as Parquet files, and has already been loaded in Tabular’s examples database/schema.

The data comes from NYC yellow taxi ride reports and includes the pickup time, dropoff time, approximate locations, number of passengers, distance, and fare information like fare, tips, and taxes.

In this tutorial, you’ll learn how use Trino and Iceberg to:

  1. Create a table with helpful descriptions
  2. Clean and deduplicate incoming data
  3. Build a reliable and idempotent import pipeline
  4. Improve import performance
  5. Make targeted changes to existing data
  6. Drop records that should not be retained

1. Creating a table

The first step is to create a table in Trino.

CREATE TABLE nyc_taxi_yellow (
    vendor_id int COMMENT 'TPEP provider code; 1=Creative Mobile Technologies, LLC; 2=VeriFone Inc.',
    pickup_time timestamp(6) with time zone COMMENT 'Date and time when the meter was engaged',
    pickup_location_id int COMMENT 'Location ID where the meter was engaged',
    dropoff_time timestamp(6) with time zone COMMENT 'Date and time when the meter was disengaged',
    dropoff_location_id int COMMENT 'Location ID where the meter was disengaged',
    passenger_count int COMMENT 'Number of passengers in the vehicle (driver entered)',
    trip_distance double COMMENT 'Elapsed trip distance in miles reported by the meter',
    ratecode_id int COMMENT 'Final rate code in effect when the trip ended; 1=Standard Rate; 2=JFK; 3=Newark; 4=Nassau or Westchester; 5=Negotiated fare; 6=Group ride',
    payment_type int COMMENT 'How the passgener paid; 1=Credit card; 2=Cash; 3=No charge; 4=Dispute; 5=Unknown; 6=Voided trip',
    total_amount double COMMENT 'Total amount charged to passengers; cash tips not included',
    fare_amount double COMMENT 'Time-and-distance fare in USD calculated by the meter',
    tip_amount double COMMENT 'Tip amount; automatically populated for credit card tips; cash tips not included',
    tolls_amount double COMMENT 'Total amount of all tolls paid in trip',
    mta_tax double COMMENT 'MTA tax automatically triggered based on the metered rate in use; $0.50',
    improvement_surcharge double COMMENT 'Improvement surcharge assessed trips at the flag drop; $0.30',
    congestion_surcharge double COMMENT 'Congestion surcharge',
    airport_fee double COMMENT 'Airport fee',
    extra_surcharges double COMMENT 'Misc. extras and surcharges; $0.50 and $1.00 rush hour and overnight charges',
    store_and_forward_flag varchar COMMENT 'Whether the trip record was held in vehicle memory; Y(es)/N(o)')
COMMENT 'NYC Yellow Taxi trip records dataset from the NYC Taxi & Limousine Commission; https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page'
WITH (
    partitioning = ARRAY['month(pickup_time)'],
    format = 'PARQUET'
)

This follows a few best practices:

  • Use COMMENT clauses to add column and table descriptions
  • Use the partitioning property to configure the table layout for faster queries
  • Use PARQUET format for the widest compatibility with other frameworks and engines

To see the table’s details, use DESCRIBE:

DESCRIBE nyc_taxi_yellow
     Column         |            Type             | Extra |                                                                  Comment                                                                  
------------------------+-----------------------------+-------+-------------------------------------------------------------------------------------------------------------------------------------------
 vendor_id              | integer                     |       | TPEP provider code; 1=Creative Mobile Technologies, LLC; 2=VeriFone Inc.                                                                  
 pickup_time            | timestamp(6) with time zone |       | Date and time when the meter was engaged                                                                                                  
 pickup_location_id     | integer                     |       | Location ID where the meter was engaged                                                                                                   
 dropoff_time           | timestamp(6) with time zone |       | Date and time when the meter was disengaged                                                                                               
 dropoff_location_id    | integer                     |       | Location ID where the meter was disengaged                                                                                                
 passenger_count        | integer                     |       | Number of passengers in the vehicle (driver entered)                                                                                      
 trip_distance          | double                      |       | Elapsed trip distance in miles reported by the meter                                                                                      
 ratecode_id            | integer                     |       | Final rate code in effect when the trip ended; 1=Standard Rate; 2=JFK; 3=Newark; 4=Nassau or Westchester; 5=Negotiated fare; 6=Group ride 
 payment_type           | integer                     |       | How the passgener paid; 1=Credit card; 2=Cash; 3=No charge; 4=Dispute; 5=Unknown; 6=Voided trip                                           
 total_amount           | double                      |       | Total amount charged to passengers; cash tips not included                                                                                
 fare_amount            | double                      |       | Time-and-distance fare in USD calculated by the meter                                                                                     
 tip_amount             | double                      |       | Tip amount; automatically populated for credit card tips; cash tips not included                                                          
 tolls_amount           | double                      |       | Total amount of all tolls paid in trip                                                                                                    
 mta_tax                | double                      |       | MTA tax automatically triggered based on the metered rate in use; $0.50                                                                   
 improvement_surcharge  | double                      |       | Improvement surcharge assessed trips at the flag drop; $0.30                                                                              
 congestion_surcharge   | double                      |       | Congestion surcharge                                                                                                                      
 airport_fee            | double                      |       | Airport fee                                                                                                                               
 extra_surcharges       | double                      |       | Misc. extras and surcharges; $0.50 and $1.00 rush hour and overnight charges                                                              
 store_and_forward_flag | varchar                     |       | Whether the trip record was held in vehicle memory; Y(es)/N(o)                                                                            
(19 rows)

2. Insert and inspect rows

Next, insert the records for Feb 2019.

In Tabular, the raw parquet data is available as the examples.raw_nyc_taxi_yellow table, partitioned by month:

INSERT INTO nyc_taxi_yellow
SELECT
    s.VendorID,
    s.tpep_pickup_datetime,
    s.PULocationID,
    s.tpep_dropoff_datetime,
    s.DOLocationID,
    s.passenger_count,
    s.trip_distance,
    s.RatecodeID,
    s.payment_type,
    s.total_amount,
    s.fare_amount,
    s.tip_amount,
    s.tolls_amount,
    s.mta_tax,
    s.improvement_surcharge,
    s.congestion_surcharge,
    s.airport_fee,
    s.extra,
    s.store_and_fwd_flag
FROM examples.raw_nyc_taxi_yellow s
WHERE month = '2019_02'

To view the newly inserted data, run a SELECT query. Don’t forget to use LIMIT to select just a few rows:

SELECT * FROM nyc_taxi_yellow LIMIT 10
 vendor_id |          pickup_time           | pickup_location_id |          dropoff_time          | dropoff_location_id | passenger_count | ... 
-----------+--------------------------------+--------------------+--------------------------------+---------------------+-----------------+-----
         2 | 2038-02-17 20:36:24.000000 UTC |                263 | 2038-02-17 20:39:22.000000 UTC |                 262 |               1 | ... 
         2 | 2038-02-17 21:46:29.000000 UTC |                249 | 2038-02-17 21:53:20.000000 UTC |                 114 |               1 | ... 
         2 | 2038-02-17 21:55:54.000000 UTC |                113 | 2038-02-18 21:13:21.000000 UTC |                 239 |               1 | ... 
         2 | 2019-08-17 21:17:29.000000 UTC |                230 | 2019-08-17 22:06:20.000000 UTC |                 265 |               1 | ... 
         2 | 2019-09-29 13:54:19.000000 UTC |                132 | 2019-09-29 14:09:04.000000 UTC |                 205 |               1 | ... 
         2 | 2019-08-03 14:20:58.000000 UTC |                100 | 2019-08-03 14:29:01.000000 UTC |                 233 |               6 | ... 
         2 | 2019-08-03 14:30:06.000000 UTC |                233 | 2019-08-03 14:42:04.000000 UTC |                 229 |               6 | ... 
         2 | 2019-08-12 05:03:06.000000 UTC |                141 | 2019-08-12 05:08:04.000000 UTC |                 237 |               2 | ... 
         2 | 2019-08-17 14:15:05.000000 UTC |                231 | 2019-08-17 14:45:40.000000 UTC |                 129 |               1 | ... 
         2 | 2009-01-01 01:09:35.000000 UTC |                141 | 2009-01-01 01:14:02.000000 UTC |                 140 |               1 | ... 
(10 rows)

Oh no! You inserted data from Feb 2019, so why are there rows from 2038?

Looks like the source data files contains some bad rows from other months and even some from the future.

Luckily, Iceberg has automatically partitioned the data and put the rows in the correct partitions:

SELECT * FROM "nyc_taxi_yellow$partitions"
        partition        | record_count | file_count | total_size | ... 
-------------------------+--------------+------------+------------+-----
 {pickup_time_month=591} |           30 |          2 |       8181 | ... 
 {pickup_time_month=592} |           36 |          1 |       4775 | ... 
 {pickup_time_month=589} |      7048744 |          2 |  107531883 | ... 
 {pickup_time_month=590} |          124 |          2 |      11060 | ... 
 {pickup_time_month=595} |            5 |          1 |       3549 | ... 
 {pickup_time_month=467} |           22 |          2 |       7649 | ... 
 {pickup_time_month=596} |            1 |          1 |       2942 | ... 
 {pickup_time_month=468} |           61 |          2 |       8926 | ... 
 {pickup_time_month=817} |            4 |          2 |       6368 | ... 
 {pickup_time_month=593} |           26 |          2 |       7984 | ... 
 {pickup_time_month=594} |           10 |          2 |       7141 | ... 
 {pickup_time_month=588} |          307 |          1 |      10617 | ... 
(12 rows)

3. Build a reliable pipeline

Because the data has bad rows, you want to build a reliable pipeline and clean up the data.

To build that pipeline, use MERGE INTO. With MERGE, you can match incoming rows with the rows already in the table, and instruct Trino how to handle rows that match, don’t match, and custom cases. You can use MERGE to deduplicate incoming rows so the writes are idempotent: if the same rows appear more than once, only one will make it into the final dataset.

MERGE works with two datasets, the target table where data will be written and the source dataset that contains updates. In this case, nyc_taxi_yellow is the target table and examples.raw_nyc_taxi_yellow contains the source rows.

First, prepare the source rows. MERGE requries that there is only matching source row for any given target row. Otherwise, there would be two different ways to update a given target row. Preparing the source data requires deduplicating the incoming rows by trip information: the pickup/dropoff times and locations.

-- deduplicate rows
SELECT * FROM (
    SELECT *,
        row_number() over (
            PARTITION BY VendorID, tpep_pickup_datetime, PULocationID, tpep_dropoff_datetime, DOLocationID
        ) AS row_num
    FROM examples.raw_nyc_taxi_yellow
    WHERE month = '2019_03')
WHERE row_num = 1
LIMIT 10

Next, use the deduplicated target rows for the MERGE command’s source table, in the USING clause.

For this use case, the only action is to insert a row if there was no matching row already in the target table. Keep in mind that merge commands can do much more!

MERGE INTO nyc_taxi_yellow t
USING (
    -- deduplicate rows
    SELECT * FROM (
        SELECT *,
            row_number() over (
                PARTITION BY VendorID, tpep_pickup_datetime, PULocationID, tpep_dropoff_datetime, DOLocationID
            ) AS row_num
        FROM examples.raw_nyc_taxi_yellow
        WHERE month = '2019_03')
    WHERE row_num = 1) s
ON
    t.vendor_id = s.VendorID AND
    t.pickup_time = s.tpep_pickup_datetime AND
    t.pickup_location_id = s.PULocationID AND
    t.dropoff_time = s.tpep_dropoff_datetime AND
    t.dropoff_location_id = s.DOLocationID
WHEN NOT MATCHED THEN INSERT (
    vendor_id, pickup_time, pickup_location_id, dropoff_time, dropoff_location_id, passenger_count, trip_distance, ratecode_id, payment_type, total_amount, fare_amount, tip_amount, tolls_amount, mta_tax, improvement_surcharge, congestion_surcharge, extra_surcharges, store_and_forward_flag)
VALUES (
    s.VendorID, s.tpep_pickup_datetime, s.PULocationID, s.tpep_dropoff_datetime, s.DOLocationID, s.passenger_count, s.trip_distance, s.RatecodeID, s.payment_type, s.total_amount, s.fare_amount, s.tip_amount, s.tolls_amount, s.mta_tax, s.improvement_surcharge, s.congestion_surcharge, s.extra, s.store_and_fwd_flag)
MERGE: 7846177 rows

To make sure the MERGE is idempotent, run the same command a second time to verify that no additional rows are added.

MERGE: 0 rows

It works!

4. Improve performance

After importing data for April, May, and June, it looks like the merge command takes longer and reads more rows every time it runs:

2019_05: 3:52 [30M rows, 314MB] [129K rows/s, 1.36MB/s]
2019_06: 3:32 [36.9M rows, 382MB] [174K rows/s, 1.8MB/s]

The problem is that every run needs to match incoming rows against a larger and larger target table.

To solve this problem, add filters to the ON clause for both the incoming rows — to filter out bad data — and the target table where no rows could match.

MERGE INTO nyc_taxi_yellow t
USING (
    -- deduplicate rows
    SELECT * FROM (
        SELECT *,
            row_number() over (
                PARTITION BY VendorID, tpep_pickup_datetime, PULocationID, tpep_dropoff_datetime, DOLocationID
            ) AS row_num
        FROM examples.raw_nyc_taxi_yellow
        WHERE month = '2019_07')
    WHERE row_num = 1) s
ON
    t.vendor_id = s.VendorID AND
    t.pickup_time = s.tpep_pickup_datetime AND
    t.pickup_location_id = s.PULocationID AND
    t.dropoff_time = s.tpep_dropoff_datetime AND
    t.dropoff_location_id = s.DOLocationID AND
    t.pickup_time >= DATE '2019-07-01' AND t.pickup_time < DATE '2019-08-01' AND
    s.tpep_pickup_datetime >= DATE '2019-07-01' AND s.tpep_pickup_datetime < DATE '2019-08-01'
WHEN NOT MATCHED THEN INSERT (
    vendor_id, pickup_time, pickup_location_id, dropoff_time, dropoff_location_id, passenger_count, trip_distance, ratecode_id, payment_type, total_amount, fare_amount, tip_amount, tolls_amount, mta_tax, improvement_surcharge, congestion_surcharge, extra_surcharges, store_and_forward_flag)
VALUES (
    s.VendorID, s.tpep_pickup_datetime, s.PULocationID, s.tpep_dropoff_datetime, s.DOLocationID, s.passenger_count, s.trip_distance, s.RatecodeID, s.payment_type, s.total_amount, s.fare_amount, s.tip_amount, s.tolls_amount, s.mta_tax, s.improvement_surcharge, s.congestion_surcharge, s.extra, s.store_and_fwd_flag)
MERGE: 6297927 rows
2:37 [6.31M rows, 88.7MB] [40.2K rows/s, 578KB/s]

Problem solved: the query finished much more quickly.

The SQL is a bit more complicated, but worth the performance improvement.

5. Fix existing data

What if you need to update older data in a table to fix a problem?

For example, the NYC taxi dataset was de-anonymized in 2014 and had to be updated by reducing location accuracy.

To avoid a similar problem, you want to round the trip distances to the nearest mile. You can use the UPDATE command to modify existing rows.

Use filters to break the problem down into smaller tasks that modify one month at a time.

UPDATE nyc_taxi_yellow
SET trip_distance = round(trip_distance)
WHERE pickup_time < DATE '2019-02-01'
UPDATE: 390 rows

Verify the data was updated correctly:

SELECT trip_distance FROM nyc_taxi_yellow WHERE pickup_time < DATE '2019-02-01' LIMIT 10
trip_distance 
---------------
           1.0 
           2.0 
           1.0 
          15.0 
           0.0 
           6.0 
           4.0 
           2.0 
           1.0 
           2.0 
(10 rows)

You can use time travel to view the old data before the snapshots age out of table metadata.

To view history, select from the $history metadata table.

SELECT * FROM "nyc_taxi_yellow$history";
       made_current_at       |     snapshot_id     |      parent_id      | is_current_ancestor 
-----------------------------+---------------------+---------------------+---------------------
 ...                         | ...                 | ...                 | ...
 2023-03-26 18:16:37.207 UTC | 8913164221797263216 |  258142289701857512 | true                
 2023-03-26 18:44:36.289 UTC | 9212304825531099487 | 5180778067236676580 | true

To read a specific snapshot, use FOR VERSION AS OF.

SELECT trip_distance
FROM nyc_taxi_yellow FOR VERSION AS OF 8913164221797263216
WHERE pickup_time < DATE '2019-02-01'
LIMIT 10
 trip_distance 
---------------
          1.19 
          0.86 
          1.36 
          3.75 
          3.71 
          0.97 
          9.53 
         14.13 
          1.41 
          1.19

6. Remove protected rows

Another common problem is needing to remove rows that can or should no longer be retained. You can drop rows using DELETE FROM.

For example, what if a particular pickup location doesn’t want their data shared anymore? Let’s say the “Alphabet City” location, id 4, wants their data removed.

DELETE FROM nyc_taxi_yellow WHERE pickup_location_id = 4
DELETE: 107045 rows
SELECT * FROM nyc_taxi_yellow WHERE pickup_location_id = 4
(0 rows)

Next steps

This tutorial showed you how to build a reliable and idempotent data pipeline with Trino and Iceberg. Next, try it out on your own data! If you don’t have Trino and Iceberg available to you, sign up for free and learn more with Tabular and Starburst Galaxy!

This tutorial was originally published on Tabular.

 

Start Free with
Starburst Galaxy

Up to $500 in usage credits included

  • Query your data lake fast with Starburst's best-in-class MPP SQL query engine
  • Get up and running in less than 5 minutes
  • Easily deploy clusters in AWS, Azure and Google Cloud
For more deployment options:
Download Starburst Enterprise

Please fill in all required fields and ensure you are using a valid email address.

s