How to Connect Streamlit to Supabase

A step-by-step guide to integrating Streamlit applications with a Supabase database

Prerequisites

  • Python environment with Streamlit installed
  • Supabase project with database access
  • Basic understanding of SQL and Python

Required Dependencies

Install the necessary Python packages:

pip install streamlit supabase python-dotenv
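
If you deploy the app elsewhere, pinning minimum versions makes installs more reproducible; the version numbers below are illustrative assumptions, not requirements of this guide:

pip install "streamlit>=1.30" "supabase>=2.0" "python-dotenv>=1.0"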

Step 1: Set Up Environment Variables

Create a .env file in your project root:

SUPABASE_URL=https://your-project-id.supabase.co
SUPABASE_KEY=your-anon-public-key

Security Note: Never commit your .env file to version control. Add it to your .gitignore.
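
If you deploy on Streamlit Community Cloud, Streamlit's built-in secrets management is a convenient alternative to a .env file: put the values in .streamlit/secrets.toml and read them with st.secrets. A minimal sketch:

# .streamlit/secrets.toml contains:
#   SUPABASE_URL = "https://your-project-id.supabase.co"
#   SUPABASE_KEY = "your-anon-public-key"

import streamlit as st

url = st.secrets["SUPABASE_URL"]
key = st.secrets["SUPABASE_KEY"]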

Step 2: Initialize Supabase Client

Create a connection helper file (supabase_client.py):

import os
from supabase import create_client, Client
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Initialize Supabase client
def get_supabase_client() -> Client:
    url = os.getenv("SUPABASE_URL")
    key = os.getenv("SUPABASE_KEY")

    if not url or not key:
        raise ValueError("Missing Supabase credentials in environment variables")

    supabase = create_client(url, key)
    return supabase
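
Streamlit reruns your script top to bottom on every interaction, so it is worth making client creation cheap. Streamlit's st.cache_resource decorator is designed for shared resources like this; a minimal sketch building on the helper above:

import streamlit as st

@st.cache_resource
def get_cached_supabase_client() -> Client:
    # Created once per server process and reused across reruns
    return get_supabase_client()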

Step 3: Basic Streamlit Integration

Create your main Streamlit app (app.py):

import streamlit as st
import pandas as pd
from supabase_client import get_supabase_client

# Initialize Supabase client
supabase = get_supabase_client()

st.title("Streamlit + Supabase Integration")

# Example: Read data from a table
@st.cache_data(ttl=300)  # Cache for 5 minutes
def load_data(table_name):
    try:
        response = supabase.table(table_name).select("*").execute()
        return pd.DataFrame(response.data)
    except Exception as e:
        st.error(f"Error loading data: {e}")
        return pd.DataFrame()

# Display data
if st.button("Load Data"):
    data = load_data("your_table_name")
    if not data.empty:
        st.dataframe(data)
    else:
        st.warning("No data found or error occurred")

Step 4: CRUD Operations

Create (Insert) Data

def insert_record(table_name, data):
    try:
        response = supabase.table(table_name).insert(data).execute()
        return response.data
    except Exception as e:
        st.error(f"Error inserting data: {e}")
        return None

# Example form for inserting data
with st.form("insert_form"):
    name = st.text_input("Name")
    email = st.text_input("Email")

    if st.form_submit_button("Add Record"):
        if name and email:
            result = insert_record("users", {"name": name, "email": email})
            if result:
                st.success("Record added successfully!")
                st.cache_data.clear()  # Drop cached query results so the new record shows up
                st.rerun()  # st.experimental_rerun() is deprecated

Update Data

def update_record(table_name, record_id, updates):
    try:
        response = supabase.table(table_name).update(updates).eq("id", record_id).execute()
        return response.data
    except Exception as e:
        st.error(f"Error updating data: {e}")
        return None

Delete Data

def delete_record(table_name, record_id):
    try:
        response = supabase.table(table_name).delete().eq("id", record_id).execute()
        return response.data
    except Exception as e:
        st.error(f"Error deleting data: {e}")
        return None
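
To wire these helpers into the UI, you can pair them with a record picker. The sketch below assumes the same hypothetical users table with id and name columns; adjust it to your schema:

# Let the user pick a record, then update or delete it
records = supabase.table("users").select("id, name").execute().data
if records:
    options = {f'{r["name"]} (id {r["id"]})': r["id"] for r in records}
    choice = st.selectbox("Select a record", list(options))
    record_id = options[choice]

    col1, col2 = st.columns(2)
    with col1:
        new_name = st.text_input("New name")
        if st.button("Update") and new_name:
            if update_record("users", record_id, {"name": new_name}):
                st.success("Record updated!")
    with col2:
        if st.button("Delete"):
            if delete_record("users", record_id):
                st.success("Record deleted!")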

Step 5: Advanced Features

Real-time Subscriptions

def setup_realtime():
    # Supabase supports realtime subscriptions over websockets, but
    # Streamlit's rerun-per-interaction execution model makes long-lived
    # websocket connections awkward; a short-TTL cache plus polling,
    # sketched below, is a common workaround
    pass
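
Until you have a dedicated realtime setup, a short-TTL cache plus periodic reruns approximates live data. This is a polling workaround, not a true subscription; the tasks table name is just an example:

import time

@st.cache_data(ttl=5)  # Results go stale after 5 seconds
def fetch_latest(table_name):
    return supabase.table(table_name).select("*").execute().data

if st.checkbox("Auto-refresh"):
    st.dataframe(fetch_latest("tasks"))
    time.sleep(5)  # Wait, then rerun the script to pick up fresh rows
    st.rerun()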

File Storage Integration

def upload_file(bucket_name, file_path, file_data):
    try:
        response = supabase.storage.from_(bucket_name).upload(file_path, file_data)
        return response
    except Exception as e:
        st.error(f"Error uploading file: {e}")
        return None

# Example file upload, guarded by a button so the upload
# does not repeat on every script rerun
uploaded_file = st.file_uploader("Choose a file")
if uploaded_file is not None and st.button("Upload"):
    file_data = uploaded_file.getvalue()
    result = upload_file("documents", f"uploads/{uploaded_file.name}", file_data)
    if result:
        st.success("File uploaded successfully!")

Step 6: Error Handling and Logging

import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_database_operation(operation_func, *args, **kwargs):
    try:
        return operation_func(*args, **kwargs)
    except Exception as e:
        logger.error(f"Database operation failed: {e}")
        st.error("An error occurred while processing your request.")
        return None
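
Usage is a thin wrapper around any helper defined earlier, for example the load_data function from Step 3:

# Shows a friendly error and returns None if the query fails
data = safe_database_operation(load_data, "your_table_name")
if data is not None and not data.empty:
    st.dataframe(data)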

Best Practices

1. Connection Management

  • Reuse a single Supabase client across reruns instead of creating one per interaction (see Client Reuse below)
  • Implement proper error handling for network issues
  • Cache frequently accessed data

2. Security

  • Use Row Level Security (RLS) in Supabase
  • Validate all user inputs (a minimal sketch follows this list)
  • Use service role keys only for server-side operations
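
A minimal input-validation sketch for the insert form from Step 4; the email regex is deliberately simple and only illustrative:

import re

def validate_user_input(name: str, email: str) -> list[str]:
    errors = []
    if not name.strip():
        errors.append("Name is required.")
    if not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", email):
        errors.append("Email address looks invalid.")
    return errors

errors = validate_user_input(name, email)
if errors:
    for err in errors:
        st.error(err)
else:
    insert_record("users", {"name": name.strip(), "email": email})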

3. Performance

# Use st.cache_data for expensive operations
@st.cache_data(ttl=600)  # Cache for 10 minutes
def fetch_analytics_data():
    return supabase.table("analytics").select("*").execute().data

# Implement pagination for large datasets
def paginated_query(table_name, page=0, page_size=50):
    offset = page * page_size
    response = supabase.table(table_name).select("*").range(offset, offset + page_size - 1).execute()
    return response.data
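
Hooking the paginated query into the UI can be as simple as a page selector:

page = st.number_input("Page", min_value=0, value=0, step=1)
rows = paginated_query("your_table_name", page=page)
st.dataframe(pd.DataFrame(rows))
st.caption(f"Showing up to 50 rows starting at row {page * 50}")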

4. Environment Configuration

# Different configs for different environments
import os

def get_config():
    env = os.getenv("ENVIRONMENT", "development")

    configs = {
        "development": {
            "debug": True,
            "cache_ttl": 60
        },
        "production": {
            "debug": False,
            "cache_ttl": 300
        }
    }

    return configs.get(env, configs["development"])
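
The returned dict can then drive runtime behavior, for example the cache lifetime:

config = get_config()

@st.cache_data(ttl=config["cache_ttl"])  # 60s in development, 300s in production
def load_dashboard_data():
    return supabase.table("analytics").select("*").execute().data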

Common Issues and Solutions

1. Authentication Errors

  • Verify your Supabase URL and keys
  • Check if your table policies allow the operation
  • Ensure RLS is properly configured

2. Connection Timeouts

  • Implement retry logic (see the backoff sketch after this list)
  • Use connection pooling
  • Check network connectivity
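
A simple retry wrapper with exponential backoff, as mentioned above; the attempt count and delays are arbitrary starting points:

import time

def with_retries(operation_func, *args, max_attempts=3, base_delay=1.0, **kwargs):
    """Retry a flaky operation with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return operation_func(*args, **kwargs)
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of retries; let the caller handle it
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...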

3. Data Type Mismatches

  • Validate data types before insertion
  • Use proper PostgreSQL data types in your schema

Example: Complete Mini App

The app below assumes a tasks table with name, description, priority, completed, and created_at columns.

import streamlit as st
import pandas as pd
from supabase_client import get_supabase_client

# Initialize
supabase = get_supabase_client()
st.set_page_config(page_title="Task Manager", layout="wide")

st.title("📋 Task Manager")

# Sidebar for adding tasks
with st.sidebar:
    st.header("Add New Task")
    task_name = st.text_input("Task Name")
    task_description = st.text_area("Description")
    priority = st.selectbox("Priority", ["Low", "Medium", "High"])

    if st.button("Add Task"):
        if task_name:
            data = {
                "name": task_name,
                "description": task_description,
                "priority": priority,
                "completed": False
            }

            try:
                supabase.table("tasks").insert(data).execute()
                st.success("Task added!")
                st.cache_data.clear()  # Drop the cached task list so the new task appears
                st.rerun()  # st.experimental_rerun() is deprecated
            except Exception as e:
                st.error(f"Error: {e}")

# Main area for displaying tasks
@st.cache_data(ttl=30)
def load_tasks():
    response = supabase.table("tasks").select("*").order("created_at", desc=True).execute()
    return pd.DataFrame(response.data)

tasks_df = load_tasks()

if not tasks_df.empty:
    st.dataframe(
        tasks_df,
        use_container_width=True,
        hide_index=True
    )
else:
    st.info("No tasks found. Add some tasks using the sidebar!")

Data Management Best Practices

Client Reuse

The Python client talks to Supabase over HTTP rather than holding open database connections, so the practical goal in Streamlit is to avoid re-creating the client on every script rerun (st.cache_resource, shown in Step 2, works here too):

import streamlit as st
from supabase_client import get_supabase_client

# Store one client per user session so reruns reuse it
if "supabase" not in st.session_state:
    st.session_state.supabase = get_supabase_client()

supabase = st.session_state.supabase

Batch Operations

When dealing with large datasets:

def batch_insert(table_name, records, batch_size=100):
    """Insert records in batches to avoid timeout issues"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        try:
            supabase.table(table_name).insert(batch).execute()
        except Exception as e:
            st.error(f"Batch {i//batch_size + 1} failed: {e}")
            return False
    return True
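
For example, bulk-loading rows from an uploaded CSV (column names are assumed to match the target table):

csv_file = st.file_uploader("Upload CSV", type="csv")
if csv_file is not None and st.button("Import"):
    records = pd.read_csv(csv_file).to_dict(orient="records")
    if batch_insert("your_table_name", records):
        st.success(f"Imported {len(records)} rows")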

Conclusion

This guide provides a foundation for connecting Streamlit to Supabase. The combination offers:

  • Rapid Development: Quick setup and deployment
  • Real-time Data: Live updates and synchronization
  • Scalability: PostgreSQL backend with modern API layer
  • Security: Built-in authentication and authorization

For production applications, consider implementing additional security measures, error handling, and performance optimizations based on your specific requirements.


For more advanced integrations, refer to the Supabase Python documentation and Streamlit documentation.