The 8 AM Heartbeat Moments Before Your Data Pipelines Go Live

Verifying Cluster Availability with Starburst PyStarburst


Imagine you’re a data engineer, ultimately responsible for the successful execution of your organization’s data pipelines. It is 8:00 AM, and you are brewing your coffee while reviewing emails. In an hour, a critical automated data auditing job is scheduled to start. This process is a complex, high-compute operation designed to detect data drift and validate schemas for petabytes of federated data across Amazon S3 buckets, Snowflake clusters, and relational databases, including Postgres. 

Even though it’s a routine weekly process, the stakes are enormous. If the Starburst cluster’s worker nodes responsible for these comparisons and computations are degraded, or if the cluster is experiencing version drift, the audit pipeline will fail. Data quality reports would arrive stale and late, which is not the way anyone wants to start the day.

To prevent this, your team needs a peace-of-mind script that audits the cluster topology, verifies worker health, and then detects latency issues before the first expensive audit query is executed.

The old way of solving this problem would be a brittle combination of raw SQL statements and manual query execution. This is where the PyStarburst DataFrame API steps in. By treating the cluster’s internal state as a first-class Python object, we gain type safety, modularity, and seamless integration into modern data pipelines.
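For contrast, here is a rough sketch of that older approach using the open source trino Python client. The host, port, and user values mirror the configuration used later in this walkthrough and are assumptions for illustration:

import trino

# The "old way": hand-written SQL over a raw DBAPI connection.
conn = trino.dbapi.connect(host="localhost", port=8080, user="admin")
cur = conn.cursor()
cur.execute(
    "SELECT node_id, http_uri, state, version "
    "FROM system.runtime.nodes "
    "WHERE state = 'active' AND coordinator = false"
)
workers = cur.fetchall()  # plain tuples with no schema awareness or lineage

Every health rule lives inside a SQL string here, so the checks get harder to refactor and test as they grow.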

Let’s walk through the code step by step.

Initialization

We start with some boilerplate Python code: a few imports, then the configuration needed to reference our cluster coordinator’s host and port, our user credentials, and optionally to scope the session to a specific catalog and schema.

from pystarburst import Session
from pystarburst.functions import col, lit  # used in the transformations below
import json

configs = {
    "host": "localhost",
    "port": "8080",
    "http_scheme": "http",
    "user": "admin",
    "schema": "sf1",
    "catalog": "tpch"
}

With that configuration set, we create a session object through which we can interact with the cluster. Fortunately, a builder pattern is provided that accepts the configs dictionary and builds the session object we need.

session = Session.builder.configs(configs).create()

When executed, this call attempts to establish a connection and validates that the coordinator is available.
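In an automated pre-flight check, it is worth failing fast if that validation does not succeed. Here is a minimal sketch of the same builder call, wrapped so a failure aborts the run; the broad exception handler is an assumption for illustration, since the exact exception type raised depends on your client version:

try:
    session = Session.builder.configs(configs).create()
except Exception as exc:
    # Coordinator unreachable or credentials rejected: abort before the audit starts.
    raise SystemExit(f"CRITICAL: Cannot reach the Starburst coordinator: {exc}")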

DataFrames

The foundational container for data records in PyStarburst is called a DataFrame. The code below shows how the session can be used to assign a table reference to a DataFrame.

unfiltered_df = session.table("system.runtime.nodes")

Here we query the built-in Trino system catalog, which provides metadata about the cluster itself. The DataFrame does not actually hold the data from the nodes table – it holds details about where that table is located and its structure. This is an example of the lazy evaluation the DataFrame API performs throughout.
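Because the table’s structure is part of that metadata, you can inspect it without pulling any rows. Assuming PyStarburst mirrors the Snowpark-style DataFrame interface it is modeled on, the columns property reflects the table’s shape without triggering any work on the cluster:

# No query is executed here; the column names come from table metadata.
print(unfiltered_df.columns)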

Transformations

Like the example above that created a DataFrame, most functions that you can call against a given DataFrame fall into this lazy evaluation model. These functions are called transformations. A given DataFrame represents an immutable set of records, even if they have not been populated yet with concrete data. If we need to make any changes to the DataFrame, a transformation is used to construct a new one.

The following code uses the DataFrame from the prior section and filters it down to a new collection of records representing the active worker nodes in the Starburst cluster we are connected to.

filtered = unfiltered_df.filter((col("state") == lit("active")) & (col("coordinator") == lit(False)))

The filter function is a transformation. As before, it creates a new DataFrame, but it does not yet contain any of the data from the actual table. PyStarburst is building an abstract syntax tree from all the transformation functions being applied. This is called the DataFrame lineage. No data operations have yet occurred on the Starburst cluster. Instead, PyStarburst is keeping track of the work being defined that will eventually be executed on the Starburst cluster.

This next snippet performs another transformation. It narrows the records down to specific columns, building up the DataFrame lineage that much more.

discovery_df = filtered.select(
    col("node_id"),
    col("http_uri"),
    col("state"),
    col("version")
)

We only need these 4 cluster topology columns for our requirements. It may appear that we have just performed these 3 separate operations, one after another:

  1. Read all the data from the nodes table.
  2. Looped through all records to keep only those representing active worker nodes.
  3. Looped through again to project down to only the 4 columns we need.

If PyStarburst were actually doing that, it would surely be inefficient, especially if we had many more transformations being called. Fortunately, that is where the power of lazy evaluation of the DataFrame lineage helps us out. PyStarburst is keeping track of your intentions (the lineage) and has not yet turned them into action that needs to be executed on the cluster.
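Because transformations are cheap and simply extend the lineage, the same pipeline is often written as a single chained expression. This builds a lineage identical to the step-by-step version above, using only the calls we have already seen:

discovery_df = (
    session.table("system.runtime.nodes")
    .filter((col("state") == lit("active")) & (col("coordinator") == lit(False)))
    .select(col("node_id"), col("http_uri"), col("state"), col("version"))
)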

Actions

The counterpart to transformation functions is called an action. Actions are operations that require actual data to be produced for display, storage, or further processing outside of the DataFrame API. The following “eager” code results in a physical Python collection being created, based on the logical requirements captured in the lineage up to this point.

rows = discovery_df.collect()

This is the Trino SQL statement that our PyStarburst code ultimately executes on the Starburst cluster.

SELECT
    node_id,
    http_uri,
    state,
    version
FROM system.runtime.nodes
WHERE state = 'active' AND coordinator = false;

Seeing the SQL should help you understand that a single, efficient SQL statement is constructed from the logical activities captured in the DataFrame lineage built up by the transformation functions.

PyStarburst provides a programmatic interface to the Starburst cluster. The action function triggers the translation of the DataFrame lineage into SQL, which is then run through the coordinator’s query optimizer and executed in the cluster. Then, the results are sent back to the client – the Python program using the DataFrame API in this case. 
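Not every action needs full rows returned to the client. If all we care about is the number of healthy workers, a count-style action keeps the round trip tiny; this sketch assumes PyStarburst exposes the Snowpark-style count() action it is modeled on:

# Runs a count over the same lineage; only a single integer returns to the client.
active_workers = discovery_df.count()
print(f"Active worker nodes: {active_workers}")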

Creating a report

Now that the Python program has a fully populated collection of records from the nodes table, a simple bit of code can loop through the node records and produce a report of the state of the cluster.

report_data = []
for row in rows:
    report_data.append(row.as_dict())
print(json.dumps(report_data, indent=4))

# Assuming we are expecting the cluster to have 10 worker nodes
if len(report_data) < 10:
    raise SystemExit("CRITICAL: Cluster degraded. Audit aborted to save compute costs.")

A Python program can continue to use the DataFrame API to do additional work, and the pattern described earlier will continue. Lazy transformation operations will create logical lineage information until an action operation triggers the work to be sent to the Starburst cluster for execution. 
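As one example, the latency check mentioned at the start can be a trivial timed round trip before the expensive audit begins. This is a sketch under two assumptions: that the session exposes a Snowpark-style sql() method, and that the 2-second threshold is a hypothetical value chosen for illustration:

import time

# Time a trivial query round trip through the coordinator as a latency smoke test.
start = time.monotonic()
session.sql("SELECT 1").collect()
elapsed = time.monotonic() - start
if elapsed > 2.0:  # hypothetical threshold, tune for your environment
    raise SystemExit(f"CRITICAL: Round trip took {elapsed:.2f}s. Audit aborted.")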

After completing its work with the Starburst cluster, the developer just needs to perform a cleanup operation to wind down the connection to the cluster.

session.stop()

By 8:15 AM, the rest of the team has finished pouring their coffee. You’ve checked the report the verification script generated and confirmed that the cluster is healthy and available. No more hoping the cluster is healthy. Just worry-free validation that saves the organization time and money.
