
Announcing Starburst’s AWS EKS Marketplace Listing

Raghwendra Singh
Software Development Engineer 4
Meesho
Our data volume was growing at an increasing rate. We needed more advanced compute engines with the ability to scale, audit, and run both existing and newer workloads with better control.
Meesho's first data lake consisted of a framework that ingested data from different pipelines into a managed data warehouse. We had issues scaling the managed data warehouse due to read-after-write locks and the coupling of storage with compute, which increased cluster costs.
To overcome these scaling issues, we moved towards a data lake architecture where our data is written to an object store with partitioning, allowing us to write data in append, upsert, and overwrite modes. A Hive metastore (HMS) is used so that different compute engines, such as Starburst Enterprise Platform (SEP) and Spark, can be employed for different use cases.
We use SEP as one of the engines because it provides similar or better query performance compared to a managed data warehouse for dashboard data sources, ad hoc queries from Zeppelin, and long-running queries. In addition, SEP provides the following benefits:
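As a rough illustration of those write modes, a minimal PySpark/Delta Lake sketch might look like the following; the bucket, table path, and column names are hypothetical, and it assumes a Delta-enabled Spark session.

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Assumes a Spark session already configured with the Delta Lake extensions.
spark = SparkSession.builder.appName("lake-writes").getOrCreate()

TABLE_PATH = "s3://example-lake/orders"  # hypothetical partitioned Delta table
incoming = spark.read.json("s3://example-lake/raw/orders/dt=2024-01-01/")

# Append: add new rows/partitions without touching existing data.
incoming.write.format("delta").mode("append").partitionBy("dt").save(TABLE_PATH)

# Overwrite: replace the table contents entirely.
incoming.write.format("delta").mode("overwrite").partitionBy("dt").save(TABLE_PATH)

# Upsert: merge incoming rows into existing ones on a business key.
target = DeltaTable.forPath(spark, TABLE_PATH)
(
    target.alias("t")
    .merge(incoming.alias("s"), "t.order_id = s.order_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
```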
Our team had not transitioned fully to Kubernetes-based deployments when we initially deployed SEP. Instead, we used SEP’s Cloudformation template (CFT) to create multiple clusters designed for different workloads:
The SEP clusters were deployed using CFT on r5.12xlarge instances, providing access to Delta Lake and Hive catalogs. HMS and Ranger (for RBAC) provide the supporting services.
Once we were up and running, we encountered the following cluster design issues:
To solve these problems, we ran a PoC on a new SEP cluster deployed on Kubernetes with Starburst’s SEP Helm charts.
We kicked off a proof-of-concept cluster in Kubernetes using the same r5.12xlarge instance type for the nodegroups to keep performance comparable, running one worker pod per instance and utilizing ~80-85% of the instance's memory. As the SEP images were hosted in the us-east-1 repository, image pull times were high, so we cloned the images into Meesho's production ECR repository to reduce pull time and, therefore, overall pod launch time.
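As a sketch of the worker sizing and image override, the Helm values overlay amounted to something like the following; the chart keys, repository URL, and replica count shown here are illustrative assumptions, not the chart's documented schema.

```python
import yaml

# r5.12xlarge: 48 vCPU, 384 GiB. Leave ~15-20% of memory for the OS, kubelet and daemonsets.
INSTANCE_MEM_GIB = 384
pod_mem_gib = int(INSTANCE_MEM_GIB * 0.82)  # ~315 GiB for the single worker pod per node

values = {
    # Pull images from the in-region ECR clone instead of the us-east-1 repository.
    "image": {"repository": "<account-id>.dkr.ecr.<region>.amazonaws.com/starburst-enterprise"},
    "worker": {
        "replicas": 10,  # illustrative
        "resources": {
            "requests": {"cpu": 44, "memory": f"{pod_mem_gib}Gi"},
            "limits": {"memory": f"{pod_mem_gib}Gi"},
        },
    },
}

print(yaml.safe_dump(values, sort_keys=False))
```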
To recreate the different query workloads for short-running and long-running queries, we designed a query replay system that lets us select a desired query run length and executes matching queries to check performance. With this system, we can execute queries on the same schedule as they run on the production SEP cluster.
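A minimal sketch of the replay idea using the trino Python client is shown below; the query history, runtime buckets, and table names are hypothetical simplifications of our internal system.

```python
import time

import trino

# Hypothetical query history: (start offset in seconds, SQL text, runtime bucket).
HISTORY = [
    (0, "SELECT count(*) FROM hive.sales.orders WHERE dt = DATE '2024-01-01'", "short"),
    (300, "SELECT customer_id, sum(order_value) FROM hive.sales.orders GROUP BY customer_id", "long"),
]


def replay(target_host: str, bucket: str) -> None:
    """Re-run historical queries of the chosen runtime bucket against the PoC
    cluster, preserving the relative schedule they followed in production."""
    conn = trino.dbapi.connect(
        host=target_host, port=8080, user="replay", catalog="hive", schema="sales"
    )
    start = time.time()
    for offset, sql, observed_bucket in HISTORY:
        if observed_bucket != bucket:
            continue
        # Wait until the same relative point in the schedule before firing the query.
        time.sleep(max(0.0, offset - (time.time() - start)))
        cursor = conn.cursor()
        began = time.time()
        cursor.execute(sql)
        cursor.fetchall()
        print(f"{bucket} query finished in {time.time() - began:.1f}s")
```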
Additionally, we added a regression task in our Airflow DAG. We enable this task when we want to do performance testing during design iterations without affecting the production workflow performance.
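Roughly, the gate can be a ShortCircuitOperator keyed off an Airflow Variable, as in the sketch below; the DAG id, variable name, and callable are hypothetical.

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator, ShortCircuitOperator


def run_regression_suite():
    # Placeholder for the query-replay run against the Kubernetes SEP cluster.
    print("replaying benchmark queries against the PoC cluster")


with DAG(
    dag_id="sep_performance_regression",  # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Skips the regression task unless the flag is switched on, so routine
    # production runs are unaffected.
    gate = ShortCircuitOperator(
        task_id="regression_enabled",
        python_callable=lambda: Variable.get("sep_regression_enabled", default_var="false") == "true",
    )
    regression = PythonOperator(task_id="run_regression", python_callable=run_regression_suite)

    gate >> regression
```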
With the migration to Kubernetes, we also wanted to utilize spot instances for running queries in a fault-tolerant manner. We enabled fault-tolerant execution (FTE) in SEP, configured with an exchange manager and a task retry policy, for short-running (three minute) and medium-running (15 minute) queries. We found that query run times increased three-fold using S3 as a buffer. Similarly, long-running queries taking less than 10 minutes saw runtimes increase by 1.5x to 2x or more using a task retry policy.
Given these results, we next used FTE with a query retry policy without configuring the exchange manager. This uses memory to store the intermediate data from the tasks. We found that this produced similar performance to running without FTE configured, but still retries queries if a node is terminated due to spot instance preemption.
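For reference, the two FTE variants we compared boil down to a handful of Trino configuration properties; the sketch below simply renders them, and the spooling bucket name is a placeholder.

```python
# Variant 1: task retries with an S3-backed exchange manager (slower in our tests).
TASK_RETRY = {
    "config.properties": {"retry-policy": "TASK"},
    "exchange-manager.properties": {
        "exchange-manager.name": "filesystem",
        "exchange.base-directories": "s3://example-fte-spooling-bucket/",  # placeholder
    },
}

# Variant 2: query retries with intermediate data kept in memory (what we kept).
QUERY_RETRY = {
    "config.properties": {"retry-policy": "QUERY"},
}


def render(config: dict) -> None:
    """Print the property files that would be applied to the cluster."""
    for filename, entries in config.items():
        print(f"# {filename}")
        for key, value in entries.items():
            print(f"{key}={value}")


render(QUERY_RETRY)
```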
After running SEP in a Kubernetes cluster with the FTE query retry policy, it was clear that for shorter-running queries the runtimes were close to those observed in the CFT-based cluster. However, long-running queries were still slower by ~1.5x.
For the most part, the Kubernetes cluster used the same configuration as the CFT cluster, except that the Kubernetes node group spanned multiple availability zones (AZs). While running the same queries on both clusters with the same resources at the same time, we also observed that the Kubernetes cluster processed data at half the bytes-per-second rate of the CFT-based cluster.
We then switched the Kubernetes cluster to run in a single availability zone, and got the same performance as observed on the CFT cluster.
In this iteration, we switched to running multiple pods on larger r5.16xlarge and r5.24xlarge instance types. This did not yield better performance than one pod per r5.12xlarge instance, so we reverted to that configuration.
Once we were sure we had our basic Kubernetes cluster design correct, we added stability-related features.
As we wanted to gradually roll queries over from the CFT-based clusters to the new Kubernetes clusters while adding high availability, we deployed Lyft's Presto Gateway.
With this architecture, we are able to distribute short-, medium-, and long-running queries in round-robin fashion between the Kubernetes and CFT clusters, each running at 50% capacity with the same resources and configuration, while maintaining high availability.
The gateway also allows us to set up clusters in multiple AZs, increasing our spot instance access. This enables us to do blue/green deployments, achieving zero downtime even during updates.
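For illustration only, the 50/50 routing behaves like the round-robin sketch below; the real setup uses Lyft's Presto Gateway, and the backend URLs and routing group are hypothetical.

```python
import itertools

# One routing group, alternating between the CFT and Kubernetes clusters.
BACKENDS = {
    "adhoc": itertools.cycle(
        [
            "http://sep-cft-adhoc.internal:8080",  # hypothetical CFT cluster endpoint
            "http://sep-k8s-adhoc.internal:8080",  # hypothetical Kubernetes cluster endpoint
        ]
    ),
}


def route(routing_group: str) -> str:
    """Return the next backend for the routing group, splitting load 50/50."""
    return next(BACKENDS[routing_group])


for _ in range(4):
    print(route("adhoc"))
```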
To support safe downscaling without query failures, we configured a different graceful shutdown period on each cluster based on the maximum query runtime of the workload that cluster was designated to handle.
We also added a set of scheduled, CPU-based auto scaling policies for the worker autoscaling groups (ASGs) in the CFT-based clusters, enabling us to scale the cluster according to the active users and query pattern on the different clusters.
To add similar functionality in the Kubernetes cluster worker ASGs, we added lifecycle hooks on termination to keep a terminating node in its terminating-wait state for the graceful shutdown period. We also added a node termination handler, which captures node termination events and sends a SIGTERM to the pod running on the instance marked for termination. This allows us to use the scheduled scaling policies in worker nodegroups alongside the Kubernetes horizontal pod autoscaler (HPA).
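A simplified sketch of that flow is below: the worker is put into graceful shutdown via Trino's state endpoint and the ASG lifecycle action is completed afterwards. The hook and ASG names, port, and grace period are assumptions.

```python
import time

import boto3
import requests

ASG_NAME = "sep-k8s-adhoc-workers"  # hypothetical worker nodegroup ASG
LIFECYCLE_HOOK = "graceful-shutdown-hook"  # hypothetical termination lifecycle hook
GRACE_PERIOD_SECONDS = 3 * 60  # matches the cluster's maximum query runtime


def drain_worker(worker_ip: str, instance_id: str) -> None:
    """Called when the node termination handler flags an instance: start a
    graceful shutdown on the worker, wait out the grace period, then let the
    ASG finish terminating the node."""
    # Trino/SEP workers accept SHUTTING_DOWN on their state endpoint.
    requests.put(
        f"http://{worker_ip}:8080/v1/info/state",
        json="SHUTTING_DOWN",
        headers={"X-Trino-User": "admin"},
    )
    time.sleep(GRACE_PERIOD_SECONDS)

    boto3.client("autoscaling").complete_lifecycle_action(
        AutoScalingGroupName=ASG_NAME,
        LifecycleHookName=LIFECYCLE_HOOK,
        InstanceId=instance_id,
        LifecycleActionResult="CONTINUE",
    )
```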
Finally, we also wanted the instance a worker pod was running on to be terminated when that pod was terminated. To achieve this, we added an aws-cli sidecar container to the worker pods, which allows us to terminate the instance running the terminating worker pod.
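The sidecar reduces to something like the following, shown here with boto3 rather than the aws-cli for brevity; which termination call to use (plain EC2 terminate versus the ASG variant below) is a design choice, not a detail from our setup.

```python
import boto3
import requests

IMDS = "http://169.254.169.254/latest"


def terminate_own_instance() -> None:
    """Run from the worker pod's sidecar once the worker has shut down, so the
    node is released instead of lingering in the nodegroup."""
    # IMDSv2: fetch a session token, then the id of the instance this pod runs on.
    token = requests.put(
        f"{IMDS}/api/token", headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}
    ).text
    instance_id = requests.get(
        f"{IMDS}/meta-data/instance-id", headers={"X-aws-ec2-metadata-token": token}
    ).text

    # Terminate the node and shrink desired capacity so the ASG does not replace it.
    boto3.client("autoscaling").terminate_instance_in_auto_scaling_group(
        InstanceId=instance_id, ShouldDecrementDesiredCapacity=True
    )
```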
We rolled Kubernetes clusters out to production by creating separate nodegroups for the coordinator and for the workers in each Kubernetes cluster. This allows us to have separate instance types for the coordinator and workers, as well as allowing us to specify different spot instance percentages for worker nodes.
Existing workloads were shifted until we had equivalent loads in CFT and Kubernetes. Once we had shifted a full 50% of queries from CFT-based clusters to their Kubernetes-based counterparts, we were able to compare the performance of the queries and the cost differences on both clusters and found the following:
We wanted to better utilize spot instances on the different ad hoc clusters based on the average query runtimes of their workloads, and set the spot instance targets for peak periods as follows:
For non-peak periods, we wanted to run with higher spot percentages, and even higher overnight when usage drops, to around 95%. To configure these different spot percentages, we created a cron-based system that schedules the on-demand percentages for the different worker autoscaling groups.
To minimize launch failures stemming from the unavailability of the prescribed percentages of spot instances, we wanted to dynamically increase the on-demand percentage. To do this, we created a fallback framework where, on every spot instance launch failure, a notification is sent to an SQS queue, triggering a Lambda function that increases the worker nodegroup ASG's on-demand percentage by a given amount.
With this new system, if no spot instances are available for long enough that the on-demand percentage climbs to 100%, the cron-based system eventually resets the percentages to the optimal values for the worker nodegroup ASGs.
Together, the cron system and fallback framework help us reduce costs by approximately 30%.
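A hedged sketch of the fallback Lambda is below; the ASG name, step size, and the assumption that the event payload itself is not needed are all illustrative.

```python
import boto3

ASG_NAME = "sep-k8s-adhoc-workers"  # hypothetical worker nodegroup ASG
STEP = 10  # illustrative: raise the on-demand share by 10 points per launch failure


def handler(event, context):
    """Invoked for each spot launch-failure notification pulled from the queue;
    bumps the worker ASG's on-demand percentage so capacity keeps coming up."""
    autoscaling = boto3.client("autoscaling")

    group = autoscaling.describe_auto_scaling_groups(AutoScalingGroupNames=[ASG_NAME])[
        "AutoScalingGroups"
    ][0]
    distribution = group["MixedInstancesPolicy"]["InstancesDistribution"]
    current = distribution.get("OnDemandPercentageAboveBaseCapacity", 0)
    new_percentage = min(100, current + STEP)

    autoscaling.update_auto_scaling_group(
        AutoScalingGroupName=ASG_NAME,
        MixedInstancesPolicy={
            "InstancesDistribution": {"OnDemandPercentageAboveBaseCapacity": new_percentage}
        },
    )
    return {"on_demand_percentage": new_percentage}
```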
TL;DR
The migration of Trino clusters from AWS CFT to K8s allows us to: