Organizations across industries are choosing Starburst Galaxy to power a range of data applications, from real-time cyber threat prediction software to AI-driven recommendation engines. With our compromise-free approach, teams can scale from their first hundred users to their next thousand—without incurring growth-prohibitive operating costs.

In this tutorial, we will walk you through the process of creating a customer analysis application using Galaxy + PyStarburst, hosted on Hugging Face. Sitting at the foundation of the app is Starburst Galaxy, acting as the engine for collecting, analyzing, governing, and distributing data to users. PyStarburst brings Python's DataFrame API to Starburst, while Ibis serves as a powerful toolkit for expressing analytical queries. Finally, we've integrated with OpenAI so users can ask some basic questions in natural language.

Here is a diagram showing the high-level architecture.

Prerequisite: Setting Up Your Environment

First, you'll need a Starburst Galaxy account. If you don't already have one, you can sign up here.

We're going to use the out-of-the-box catalog called sample and its "burstbank" schema, which contains the mock customer data we'll use (no extra setup needed).

(Optional) Have an OpenAI API key if you want to test the ChatGPT integration that adds natural language queries to your application.

(Optional) Have a Hugging Face account if you want to host your application on Spaces.

Clone the Starburst samples repository

Begin by cloning the Starburst repository using the code below.

$ git clone https://github.com/starburstdata/pystarburst-examples.git
$ cd pystarburst-examples/apps/gradio/data_apps_burstbank
$ mv .env.template .env

Set environment variables

Edit your .env settings file to supply the cluster URL (found in the connection info section of the cluster in Galaxy) along with your OpenAI API key.

export HOST="free-cluster-demo.galaxy.starburst.io"
export SB_USER="user@demo.com/accountadmin"
export OPENAI_API_KEY="sk-addyourkeyhere"
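The application reads these values through a small `env` module (imported as `import env` in app.py). The repo's actual env.py may differ, but a minimal sketch of such a module, assuming it simply reads the process environment with sensible defaults, looks like this:

```python
# Hypothetical sketch of an env module: the real env.py in the examples
# repo may read or validate these values differently.
import os

HOST = os.environ.get("HOST", "localhost")
SB_USER = os.environ.get("SB_USER", "")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-3.5-turbo")
CATALOG = os.environ.get("CATALOG", "sample")
SCHEMA = os.environ.get("SCHEMA", "burstbank")
# DEBUG is a string in the environment; normalize it to a bool
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
```

Keeping all configuration in one module makes it easy to swap clusters or models without touching application code.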

Set Python virtual environment

Next, set up the Python virtual environment.

$ python3.11 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt

Run the app

Finally, you can run the app using the following command.

$ python app.py

Visiting http://127.0.0.1:7860 will bring up the application. It will also open a login window to Galaxy, so whoever is using your application will be doing so as themselves.

Gradio is a framework designed for building semi-production-quality data applications. Its built-in components allow users to make moderately complex applications with ease.

The majority of the front-end logic is housed in the app.py file. You can find more information about Gradio and its API at gradio.app.

app.py

import gradio as gr

import pandas as pd
pd.options.plotting.backend = "plotly"

from dotenv import load_dotenv
load_dotenv("./.env")
import env

from dataModels import Data
from mlModels import OpenAI

ML_MODELS = ['OpenAI ChatGPT']

# Settings
sb_host = env.HOST
open_ai_key = env.OPENAI_API_KEY
open_ai_model = env.OPENAI_MODEL

def main():
    # Load Local Modules
    my_data = Data()

    my_model = OpenAI(my_data)

    # Main Tab for the demo
    with gr.Blocks() as demo_tab:
        gr.Markdown(
        """
        ![Starburst](https://breshears.galaxy.starburst.io/images/galaxy-logo.svg)
        
        # 
        # A quick customer analysis using PyStarburst and an ML Model
        ## Source code is available on [Github](https://github.com/starburstdata/pystarburst-examples)
        
        First load the data frames
        """)

        # Filters
        seg = gr.Dropdown(my_data.segments, label='Segment', value=my_data.segments, allow_custom_value=True, multiselect=True)
        ris = gr.Dropdown(choices=my_data.risk_appetites, value = my_data.risk_appetites, label='Risk Appetite', allow_custom_value=True, multiselect=True)
        sta = gr.Dropdown(choices=my_data.states, value = my_data.states, label='State', allow_custom_value=True, multiselect=True)

        # Graphs & Tables
        gr.Markdown('# Number of Customer by Risk Appetite and State')
        plt = gr.BarPlot(x='state', y='count', tooltip=['state','count'], 
                         color='risk_appetite', y_title='Num of Customers', x_title='State')
        
        gr.Markdown('## Details')
        d = gr.DataFrame([], interactive=False, wrap=True)


        # Now some Gen AI
        gr.Markdown('Ask a question in natural language about the data')

        model = gr.Dropdown(ML_MODELS, label='Model Type', value='OpenAI ChatGPT', allow_custom_value=False)
        
        question = gr.Textbox('Which 5 states have the highest risk appetite? Why?', label='Question')
        btn_ask = gr.Button('Ask')         

        response = gr.TextArea('', label='Response', lines=5, interactive=False)

        demo_tab.load(my_data.get_agg_data, [seg, ris, sta], [plt], queue=True)
        demo_tab.load(my_data.filter_accounts, [seg, ris, sta], [d], queue=True)

        btn_ask.click(fn=my_model.predict,
                inputs=[question],
                outputs=[response], queue=True)

        seg.change(my_data.get_agg_data, [seg, ris, sta], plt, queue=True)
        ris.change(my_data.get_agg_data, [seg, ris, sta], plt, queue=True)
        sta.change(my_data.get_agg_data, [seg, ris, sta], plt, queue=True)

        seg.change(my_data.filter_accounts, [seg, ris, sta], d, queue=True)
        ris.change(my_data.filter_accounts, [seg, ris, sta], d, queue=True)
        sta.change(my_data.filter_accounts, [seg, ris, sta], d, queue=True)

    # Query History
    with gr.Blocks() as query_tab:
            gr.Markdown('Query History')
            queries = gr.Dataframe([], type='array', interactive=False, wrap=True,
                                headers=['QueryID', 'QueryText'], datatype=['str', 'str'], label='Query History')

            btn = gr.Button('Refresh Query History')
            btn.click(my_data.get_queries, [], queries, queue=True, every=20)

    demo = gr.TabbedInterface([demo_tab, query_tab], ['Demo', 'Query History'], analytics_enabled=True).queue()
    demo.launch(server_name=env.BIND_HOST, server_port=env.PORT, share=env.SHARE, debug=env.DEBUG)


if __name__ == '__main__': main()

You'll notice we loaded a custom handler for managing data for our application. We've created a class to handle our most common tasks for future reuse in other applications.

In our dataModels.py we use PyStarburst to get data out of our Galaxy environment via creating Python DataFrames, pushing all the heavy lifting to Galaxy.

from pystarburst import Session, DataFrame
from pystarburst import functions as f
from pystarburst.functions import col

import trino

from dotenv import load_dotenv
load_dotenv("./.env")
import env

import pyarrow as pa
import pandas as pd

# Class to handle connection to Starburst and retrieve data
class Data():
    # Setup connection to Starburst
    host = env.HOST

    session_properties = {
        "host":host,
        "port": 443,
        # Needed for https secured clusters
        "http_scheme": "https",
        # Setup authentication through login or password or any other supported authentication methods
        # See docs: https://github.com/trinodb/trino-python-client#authentication-mechanisms
        "auth": trino.auth.OAuth2Authentication()
    }

    def __init__(self):
        # Setup a session, query history logger, and initial data frames
        if env.DEBUG: print("INFO: Data Init")
        self.session = Session.builder.configs(self.session_properties).create()

        self.query_history = self.session.query_history()
        self.queries_list = list()

        self.segments = ['silver', 'gold', 'platinum', 'bronze', 'diamond']
        self.risk_appetites = ['wild_west', 'conservative', 'low', 'high', 'medium']
        self.states = ['AK', 'AL', 'AR', 'AZ', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA', 'MD', 'ME', 'MI', 'MN', 'MO', 'MS', 'MT', 'NC', 'ND', 'NE', 'NH', 'NJ', 'NM', 'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VA', 'VT', 'WA', 'WI', 'WV', 'WY']
        
        self.initialized = False

    def get_initial_data(self, do_agg = True):
        if env.DEBUG: print("INFO: Get Initial Data")

        # Load Tables
        self.df_account = self.session.table(f'{env.CATALOG}.{env.SCHEMA}.account')
        self.df_customer = self.session.table(f'{env.CATALOG}.{env.SCHEMA}.customer')
        self.df_customer_profile = self.session.table(f'{env.CATALOG}.{env.SCHEMA}.customer_profile')

        self.df_joined_account = self.df_account.join(self.df_customer, self.df_account['custkey'] == self.df_customer['custkey'])
        self.df_joined_account = self.df_joined_account.join(self.df_customer_profile, self.df_account['custkey'] == self.df_customer_profile['custkey'])

        if do_agg:
            self.get_agg_data(self.segments, self.risk_appetites, self.states)
        
        self.initialized = True
    
    def refresh_session(self):
        self.session = Session.builder.configs(self.session_properties).create()

    def save_settings(self, h):
        self.host = h
        self.refresh_session()

    def get_queries(self) -> pd.DataFrame:
        if not self.initialized:
            self.get_initial_data(do_agg=False)
        
        return pd.DataFrame(self.query_history.queries)

    def filter_accounts(self, segments, risks, states) -> pd.DataFrame:
        # Get filtered account details
        print("INFO: Filter Accounts")
        if not self.initialized:
            self.get_initial_data(do_agg=False)
        
        if risks is None:
            risks = self.risk_appetites

        if states is None:
            states = self.states

        # Create a filtered dataframe (avoid reassigning df_joined_account,
        # which would stack filters across repeated calls)
        self.accounts = self.df_joined_account.filter(col('customer_segment').in_(segments))\
        .filter(col('risk_appetite').in_(risks))\
        .filter(col('state').in_(states))

        # Execute in Galaxy convert to pandas
        return self.accounts.to_pandas()
    
    def get_agg_data(self, segments, risks, states) -> pd.DataFrame:
        # Get aggregated summary data
        print("INFO: Get Agg Data")
        if not self.initialized:
            self.get_initial_data(do_agg=False)
        
        if risks is None:
            risks = self.risk_appetites

        if states is None:
            states = self.states

        # Create a dataframe
        self.df_summary = self.df_joined_account.filter(col('customer_segment').in_(segments))\
        .filter(col('risk_appetite').in_(risks))\
        .filter(col('state').in_(states))\
        .group_by('state', 'risk_appetite')\
        .count()\
        .sort(col('count').desc())

        # Execute in Galaxy convert to pandas
        return self.df_summary.to_pandas()
    
    def write_agg_data(self):
        if env.DEBUG: print("INFO: Write Agg Data")
        if not self.initialized:
            self.get_initial_data()
        self.session.sql("CREATE SCHEMA IF NOT EXISTS s3lakehouse.pystarburst_360_sum").collect()

        self.session.sql("DROP TABLE IF EXISTS s3lakehouse.pystarburst_360_sum.s360_summary").collect()

        self.df_summary.write.save_as_table(
            "s3lakehouse.pystarburst_360_sum.s360_summary",
        )
        new_table = self.session.table("s3lakehouse.pystarburst_360_sum.s360_summary")

        return new_table.to_pandas()

    @staticmethod
    def to_pyarrow(df: DataFrame) -> pa.Table:
        # Convert a PyStarburst DataFrame to a pyarrow Table via pandas
        return pa.Table.from_pandas(df.to_pandas())
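To see what `get_agg_data` computes, here is the same filter/group/count/sort pipeline on a tiny hand-made pandas frame (illustrative data only; in the app, the equivalent work is pushed down to Galaxy and only the result comes back as pandas):

```python
import pandas as pd

# Toy stand-in for the joined account/customer frame; the real data lives in Galaxy
df = pd.DataFrame({
    'state': ['CA', 'CA', 'NY', 'NY', 'NY'],
    'risk_appetite': ['high', 'high', 'low', 'low', 'high'],
    'customer_segment': ['gold', 'silver', 'gold', 'gold', 'bronze'],
})

# Same shape as get_agg_data: filter on segment, group by state and
# risk appetite, count, then sort by count descending
summary = (df[df['customer_segment'].isin(['gold', 'silver', 'bronze'])]
           .groupby(['state', 'risk_appetite'], as_index=False)
           .size()
           .rename(columns={'size': 'count'})
           .sort_values('count', ascending=False))
print(summary)
```

The PyStarburst version expresses the identical logic, but the filtering and aggregation run inside the cluster, so only the small summary table travels over the wire.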

Finally, we want to add some natural language analysis to our application via OpenAI's ChatGPT. We've formatted a system prompt with some context (the aggregated data):

   def set_system_message(self):
        '''Set the system message for the OpenAI chatbot.'''

        message_data_type = 'table'
        message_data = tabulate(self.data_class.df_summary.to_pandas().rename(columns={'state': 'State', 'risk_appetite': 'Risk_Appetite', 'count': 'Count_of_Customers'}), headers='keys', tablefmt='outline', showindex=False)

        self.system_message = f'''You are an AI assistant whose purpose is to provide information on structured data.
                                The data, formatted as a {message_data_type}, is:
                                {message_data}'''
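The idea is simply to serialize the aggregate into plain text the model can read. A minimal sketch of the same pattern, using pandas' `to_string` as a stand-in for `tabulate` (the column names here are just example values):

```python
import pandas as pd

# Example aggregate, shaped like the renamed df_summary
summary = pd.DataFrame({
    'State': ['NY', 'CA'],
    'Risk_Appetite': ['high', 'low'],
    'Count_of_Customers': [12, 7],
})

# Embed the table as plain text in the system prompt, as set_system_message does
system_message = (
    "You are an AI assistant whose purpose is to provide information "
    "on structured data. The data, formatted as a table, is:\n"
    f"{summary.to_string(index=False)}"
)
print(system_message)
```

Because the aggregate is small (at most one row per state/risk-appetite pair), it fits comfortably in the model's context window.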

Now we can use the OpenAI class we created in mlModels.py to run predictions:

def predict(self, message, system_message = None):
        '''Predict a response from the OpenAI chatbot for the supplied question
        Args:  message: Question to ask the OpenAI chatbot
                system_message: Optional system message for the OpenAI chatbot'''

        if system_message is not None:
            self.system_message = system_message
        elif getattr(self, 'system_message', None) is None:
            # set_system_message() stores the prompt on self.system_message
            self.set_system_message()

        if self.system_message is None:
            raise Exception("System message is not defined. Please use the set_system_message() method to set it.")
        
        if env.DEBUG: print(self.system_message)

        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[
                {
                "role": "system",
                "content": self.system_message
                },
                {
                "role": "user",
                "content": message
                }
            ],
            temperature=1,
            max_tokens=2048,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            n = 3
        )
        responses = [i.message.content + "\n\n" for i in response.choices]
        if env.DEBUG: print(responses)
        return responses[0]

Here's the full mlModels.py file:

import openai

import env

from dataModels import Data

import pandas as pd
pd.options.plotting.backend = "plotly"

from tabulate import tabulate

class OpenAI():
    '''Class to handle OpenAI API calls
    Attributes
    ----------
        api_key : str
            OpenAI API key
        model : str
            OpenAI model name
        data_class : dataModels.Data
            Data class for the model
        system_message : str
            System message for OpenAI chatbot
        
    Methods
    -------
        get_models()
            Get list of OpenAI models
        save_settings(api_key, model)
            Save settings for the session.
        set_system_message()
            Set system message for OpenAI chatbot.
        predict(message, system_message = None)
            Predict response from OpenAI chatbot from the supplied question'''

    def __init__(self, data_class: Data, model = None, api_key = None) -> None:
        '''Initialize OpenAI class with data class and model name
        Args:  data_class : dataModels.Data class
                model: OpenAI model name = None
                api_key: OpenAI API key = None
        '''
        if api_key is not None:
            self.api_key = api_key
        else:
            self.api_key = env.OPENAI_API_KEY
        
        openai.api_key = self.api_key
        
        if model is None:
            self.model = env.OPENAI_MODEL
        else:
            self.model = model
    
        self.data_class = data_class

        self.models = self.get_models()

    def get_models(self):
        '''Get the list of available OpenAI models'''
        models = openai.Model.list(api_key=self.api_key)
        if env.DEBUG: print(models)
        return models

    def save_settings(self, api_key: str, model: str):
        '''Save settings for the session.
        Args:  api_key: OpenAI API key
                model: OpenAI model name'''
        self.api_key = api_key
        self.model = model

    def set_system_message(self):
        '''Set the system message for the OpenAI chatbot.'''

        message_data_type = 'table'
        message_data = tabulate(self.data_class.df_summary.to_pandas().rename(columns={'state': 'State', 'risk_appetite': 'Risk_Appetite', 'count': 'Count_of_Customers'}), headers='keys', tablefmt='outline', showindex=False)

        self.system_message = f'''You are an AI assistant whose purpose is to provide information on structured data.
                                The data, formatted as a {message_data_type}, is:
                                {message_data}'''

    def predict(self, message, system_message = None):
        '''Predict a response from the OpenAI chatbot for the supplied question
        Args:  message: Question to ask the OpenAI chatbot
                system_message: Optional system message for the OpenAI chatbot'''

        if system_message is not None:
            self.system_message = system_message
        elif getattr(self, 'system_message', None) is None:
            # set_system_message() stores the prompt on self.system_message
            self.set_system_message()

        if self.system_message is None:
            raise Exception("System message is not defined. Please use the set_system_message() method to set it.")
        
        if env.DEBUG: print(self.system_message)

        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[
                {
                "role": "system",
                "content": self.system_message
                },
                {
                "role": "user",
                "content": message
                }
            ],
            temperature=1,
            max_tokens=2048,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            n = 3
        )
        responses = [i.message.content + "\n\n" for i in response.choices]
        if env.DEBUG: print(responses)
        return responses[0]

Tutorial complete

Congratulations! You have reached the end of this tutorial, meaning you have successfully built and deployed a customer analysis application using Galaxy and PyStarburst.

Next steps

Want to see more Starburst powered data applications in action? See how Vectra is paving the way in cybersecurity with their AI-driven threat detection and prevention platform, powered by Starburst Galaxy.

Other Tutorials

Visit the Tutorials section [link] to view the full list of tutorials and keep moving forward on your journey!
