Today, Starburst, the Trino company, announced the general availability of fully managed streaming ingestion from Apache Kafka to Apache Iceberg tables. With this new capability, users can now easily configure and ingest data at a verified scale of up to 100 GB/second per Iceberg table at leading price-performance. In addition, we are excited to announce the public preview of file loading, coming in November 2024.
Businesses that need data available for analytics in their cloud data lake with minimal delay have traditionally built complex ingestion systems, cobbling together multiple tools and writing custom software to stream data into the lake. Alternatively, these organizations may rely on incomplete solutions that handle only the ingestion step. Both approaches tend to be fragile, difficult to scale, and costly to maintain, and they solve only part of the problem: after the data lands in the lake, it still needs to be transformed and optimized for efficient querying, requiring even more code, pipelines, and tools. At the same time, the pressure to optimize costs across analytics functions keeps increasing.
As described in the Icehouse Manifesto, the foundation of an Icehouse combines Trino and Iceberg for a data warehouse experience. Building on that foundation, an Icehouse implementation must provide data ingestion (e.g., streaming and batch), data governance (e.g., access control, data lineage, auditing), Iceberg data management (e.g., compaction, retention, snapshot expiration), and automatic capacity management (e.g., increasing or decreasing Trino cluster size).
In this blog, we’ll focus on the data ingestion and Iceberg data management capabilities in Starburst’s Icehouse implementation. We’ll expand beyond the initial public preview announcement to dive deep into how we simplify streaming ingestion to Iceberg tables, address common operational challenges, and deliver unmatched scalability and cost efficiency. We’ll also briefly describe the upcoming public preview of file loading.
Simplifying Streaming Data Ingestion: From Kafka to Iceberg
Applications, IoT devices, and other event-generating sources are a very rich input for certain types of analytics. Data exhaust from applications can be used for clickstream analytics, crash reporting, or ad conversions. IoT data, such as industrial telemetry from oil rigs or car fleets, can be used to detect operational inefficiencies and act on them. Data from networks and other infrastructure can be used to detect security threats. This data is continuously generated and needs to reside somewhere before analytics can be performed on it. It is common to send such event data to a Kafka topic for further downstream processing and ingestion into a data warehouse or data lakehouse. Often, the volume of this data is so large that the most practical solution, from both a scale and a cost perspective, is to store it in a data lakehouse.
Built from the ground up without relying on Kafka Connect, Apache Flink, Spark Structured Streaming, or orchestration tools, our streaming ingestion lets you create pipelines for any number of Kafka topics at massive scale, with significantly lower tooling and infrastructure costs than alternative solutions. Our cost analysis shows that Starburst streaming ingest is up to 12 times cheaper than a leading vendor solution. It is also extremely easy to use: a pipeline takes only a few minutes to configure.
The simplicity of our approach is a significant advantage for every business pursuing faster and more accurate analytics in near real-time. Ingesting data from Kafka into the data lake presents significant challenges. Without the right tools, this process can be complex and error-prone, requiring intricate orchestration systems, manual tuning, and separate tools for ingestion and maintenance. Streaming ingestion in Galaxy changes the game by offering a fully managed, serverless solution that streamlines the process from start to finish.
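Once a pipeline is running, the ingested events are immediately queryable with standard Trino SQL. As a minimal sketch, assuming a hypothetical `lake.ingest.clickstream` table that the pipeline writes to (the catalog, schema, table, and column names here are illustrative, not a prescribed layout):

```sql
-- Hypothetical freshness check: page views per URL over the last 10 minutes
-- of ingested events, queried directly from the Iceberg table.
SELECT url, count(*) AS views
FROM lake.ingest.clickstream
WHERE event_time > current_timestamp - INTERVAL '10' MINUTE
GROUP BY url
ORDER BY views DESC
LIMIT 20;
```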
Iceberg Data Maintenance
Ingesting data from Kafka to Iceberg tables is only part of the solution. Once the data is written to Iceberg, it must be continually maintained to meet performance, cost, and compliance requirements. To solve this, Starburst also provides a serverless data maintenance system that manages the entire lifecycle of Iceberg tables. After you set up your ingestion preferences, Starburst handles the rest; there is no need to ever tune or schedule processes to keep your Iceberg tables in a healthy state. The data maintenance system provides compaction for good performance and optimal file sizes, snapshot expiration to delete older snapshots and reduce storage costs, orphaned file removal to clean up unused files and further reduce storage costs, and statistics collection to improve query performance through Trino’s cost-based optimizer. It also provides data retention, so users can meet compliance requirements by setting data retention periods.
Data maintenance is automatically enabled for any table Starburst creates and ingests into as part of streaming ingestion. It is designed to pair with the ingestion system so that the two work in unison, which makes for a simpler solution than using multiple tools that are neither coordinated nor aware of each other.
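Galaxy runs all of these operations automatically, but for context it can help to see what they look like when run by hand. The following sketch uses the open-source Trino Iceberg connector’s maintenance commands against the same hypothetical `lake.ingest.clickstream` table; the thresholds shown are illustrative, not Starburst’s internal defaults:

```sql
-- Compaction: rewrite small files into larger, read-optimized ones.
ALTER TABLE lake.ingest.clickstream EXECUTE optimize(file_size_threshold => '128MB');

-- Snapshot expiration: drop snapshots older than seven days to reclaim storage.
ALTER TABLE lake.ingest.clickstream EXECUTE expire_snapshots(retention_threshold => '7d');

-- Orphaned file removal: delete files no longer referenced by any snapshot.
ALTER TABLE lake.ingest.clickstream EXECUTE remove_orphan_files(retention_threshold => '7d');

-- Statistics collection: refresh statistics for Trino's cost-based optimizer.
ANALYZE lake.ingest.clickstream;
```

With managed maintenance, none of this needs to be scheduled, sequenced, or tuned by hand.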
Let’s examine our approach in greater technical detail. You can also watch our talk from Datanova or a quick demo.
Tackling the Challenge of Scalability
There are two types of scale challenges with data ingestion from streaming systems:
- Data scale: the capacity to handle large volumes of data in a performant and efficient manner.
- Operational scale: the effort needed to operate a system in the face of common data management issues and unintentional human errors.
We will discuss how existing solutions on the market struggle with each of these and how Starburst’s new streaming ingestion capability solves them.
Data Scale
Challenge – Parallelism vs. Compaction
Streaming ingestion scale is traditionally measured by data throughput (e.g., 1 GBps). For Kafka, data throughput is scaled by adding more Kafka partitions. This is because Kafka deployments have a maximum effective throughput for each partition. As users add more partitions to a Kafka topic, the total data throughput can scale to arbitrarily high rates since each partition is processed in parallel. However, while increasing the partition count allows for greater ingestion throughput, it introduces a significant challenge: balancing parallelism with file sizes.
Users must carefully balance data consumption parallelism to ensure that each ingestion task can handle its assigned partition workload. Excessive parallelism can lead to the creation of many small files, which degrades query performance and increases the workload for the background compactor, ultimately raising processing costs. Conversely, insufficient parallelism can result in data backlogs, where consumers struggle to keep up with the data flow, causing delays and reducing performance.
If a Kafka load remained fairly static over time, an administrator could likely hand-tune the system using common Kafka client tools and never have to adjust it again. However, the reality is that most systems need to plan for growth, and administrators typically don’t want to start a use case with the maximum number of partitions. As a result, there is usually an expectation that the number of partitions will increase over time as usage grows. This means that as more partitions are added, administrators will need to consider retuning the system to achieve peak performance. Therefore, maintaining an ideal balance becomes necessary.
With streaming ingestion in Galaxy, we built a dynamic load coordination system to handle the packing and rebalancing of ingestion tasks and partitions. This ensures that there is enough parallelism to keep up with the data flow while also optimizing the output Iceberg files to be as pre-compacted as possible. This results in more efficient queries on the freshest data, and compaction requires less work later on. For example, when backfilling data, the streaming ingestion output Iceberg files often arrive in a fully compacted format from the start!
For comparison, the classic way of handling this with most Kafka-native consumers (e.g., Kafka Connect, Flink, Spark Structured Streaming) is to use the default Kafka broker-based partition load-balancing strategy. This approach blindly distributes partitions uniformly across a static maximum number of processing tasks. This setup must be manually tuned because the load balancer is not sensitive to the specific workload’s parameters.
To further illustrate this point, the diagram below shows that as parallelism increases, the file sizes decrease, resulting in a greater need for compaction. Ideally, you want to be at the point marked by the red circle. Specifically, you need just enough parallelism so you aren’t backlogged while still being able to write files that are compacted on write for efficiency gains. Starburst’s dynamic load coordination automatically maintains this sweet spot without requiring any user configuration.
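To put rough numbers on this trade-off (hypothetical figures for illustration): suppose a topic produces 1 GB/second and files are committed once per minute, so each commit cycle must write 60 GB. With P parallel writers, each file is roughly 60 GB / P. Targeting ~128 MB files implies P ≈ 480. Doubling parallelism to 960 halves files to ~64 MB and roughly doubles downstream compaction work, while halving it to 240 produces well-compacted ~256 MB files but doubles each writer’s required throughput, risking a backlog if a single task can’t keep up.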
Challenge – Commit Contention
As ingestion processing parallelism scales up, a downstream concern arises: how to commit Iceberg data files without causing excessive commit contention. Iceberg uses an optimistic concurrency strategy for table commits, so beyond a certain number of concurrent writers, the overhead of retried commits makes progress difficult. Early tests show that somewhere between 10 and 20 concurrent committers can produce unacceptable levels of commit contention.
To address this problem, we built a custom commit coordination service and protocol that efficiently allows commits to proceed without contention. This commit service is itself a scalable distributed system. With these advancements, streaming ingestion can seamlessly and efficiently scale from small use cases to large workloads without any manual tuning. The system automatically scales as new Kafka partitions are added and as data volume increases.
Operational Scale
A lot of emphasis is often placed on data throughput scale, but it’s equally important to consider “operational scale”: how much effort is required to operate the system when facing common operational issues. Starburst streaming ingestion has been built from the ground up to give administrators a powerful set of tools for handling these common challenges.
Challenge – Head-of-Line Blocking
One of the most common issues in Kafka-based ingestion is head-of-line blocking. This occurs when invalid or misconfigured records are encountered, halting the entire ingestion process until the issue is resolved. Since individual records cannot be purged from a Kafka topic, ingestion systems must handle this problem themselves. In many systems, that means manual intervention, leading to downtime and potential cascading failures in the pipeline.
We address this with a transactional dead-letter queue (DLQ), represented in an Iceberg table called the errors table. Instead of halting the entire process, Starburst diverts failed records into the errors table, allowing ingestion to continue without interruption. Importantly, this dead-lettering system is fully transactional, meaning records are either committed to the output table or the errors table, ensuring no data is lost or duplicated.
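As a hypothetical sketch of what inspecting dead-lettered records might look like in Trino SQL (the table and column names below are illustrative assumptions, not Galaxy’s actual errors-table schema):

```sql
-- Hypothetical: review the last hour of dead-lettered records and why each failed.
SELECT
  error_timestamp,  -- when the record was diverted (assumed column)
  error_message,    -- the classified failure reason (assumed column)
  raw_record        -- the original Kafka payload, preserved for replay (assumed column)
FROM lake.ingest.clickstream_errors
WHERE error_timestamp > current_timestamp - INTERVAL '1' HOUR
ORDER BY error_timestamp DESC
LIMIT 100;
```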
Challenge – Misconfigured Schema
With flexible data formats like JSON, it’s quite common for a data engineer to initially set up a specific table loading schema, only to realize later that they had made incorrect assumptions about the structure of the data. This can result in loading errors if there’s a type mismatch (e.g., a field was assumed to be a number but contained a struct) or missing columns if a rarely occurring column wasn’t specified. Identifying the root causes of these issues in popular ingestion systems can be quite challenging, often requiring a deep dive into server logs filled with cryptic messages.
Starburst streaming ingestion provides an error detection, classification, and notification system that immediately alerts users to what is happening and why. Once a user has identified these issues, they often want to fix the schema and recover any affected data. In most other ingestion systems, this process is not straightforward and often requires complex modifications to the table and manual adjustments to the ingestion checkpoint states.
Additionally, in an upcoming feature, our streaming ingestion will include a “reset and replay” capability that allows users to update the loading schema and then roll back the Iceberg table (leveraging Iceberg snapshotting) and loading process to a prior state—all in one step. The entire process is transactional across the ingestion processes, the output table, and the errors table, ensuring that no data is duplicated or lost. This is a unique feature that doesn’t exist anywhere else.
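Reset and replay will do all of this in one transactional step, but the Iceberg primitives it builds on can be seen in open-source Trino today. A rough sketch of the manual (and weaker) equivalent, again assuming the hypothetical `lake.ingest.clickstream` table:

```sql
-- Fix the loading schema: add the column the original schema missed.
ALTER TABLE lake.ingest.clickstream ADD COLUMN referrer VARCHAR;

-- Find a known-good snapshot in the table's Iceberg snapshot history.
SELECT snapshot_id, committed_at
FROM lake.ingest."clickstream$snapshots"
ORDER BY committed_at DESC;

-- Roll the table back to that snapshot (the ID shown is illustrative).
CALL lake.system.rollback_to_snapshot('ingest', 'clickstream', 8954597067493422955);
```

Unlike this manual sequence, the managed feature also rewinds the ingestion checkpoints and the errors table in the same transaction, which is what guarantees no data is duplicated or lost.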
Challenge – Temporary Load Spikes and Backlogs
Nearly all Kafka-based ingestion systems process data in a strict linear fashion, meaning the only way for these systems to handle a backlog is to process it sequentially until they catch up. For example, if six hours of excess data is generated, it could take another six hours for downstream systems to return to normal. During this time, downstream processing systems handle increasingly outdated records, reducing the visibility of current data. For instance, a dashboard showing the current number of user actions might display data from an hour ago, when it should be showing data from just 10 minutes ago.
We will address this issue in Galaxy with two upcoming features:
- Backfill with Recency Bias: This feature allows users to prioritize ingesting recent data while backfilling older data in the background. This ensures that dashboards and analytics reflect the most current information without sacrificing data completeness.
- Backfill Boost: This feature allows multiple consumers to process a single Kafka partition concurrently during backfill, significantly increasing throughput and reducing the time required to catch up after a data spike.
Going Beyond Streaming to Include File Loading
In just a few weeks, we are expanding our ingestion capabilities by introducing file loading, providing customers with a powerful, automated alternative to DIY or off-the-shelf solutions. This feature, available within Starburst Galaxy, will read, parse, and write records from files directly into Iceberg tables in S3. File loading will also leverage the new ingestion capabilities discussed above, automatically optimizing tables for read performance through features like compaction, snapshot expiration, orphaned file removal, and statistics collection.
Conclusion
By simplifying streaming ingestion, automating maintenance, and delivering unbeatable scalability and cost efficiency, Starburst empowers organizations to leverage their streaming data effectively and become more agile and responsive. Whether you’re handling petabytes of data or just getting started, Starburst offers a seamless, Trino SQL-based solution for near real-time analytics. Start your free trial of Starburst Galaxy today to experience industry-leading data discovery and access, price-performance SQL analytics, and the simplicity of data products to democratize data and power your analytics and AI.
Want to know more? This video demo walks you through streaming ingest using Starburst Galaxy.