How to Connect Streamlit to Supabase¶
A step-by-step guide to integrating Streamlit applications with a Supabase database
Prerequisites¶
- Python environment with Streamlit installed
- Supabase project with database access
- Basic understanding of SQL and Python
Required Dependencies¶
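Install the necessary Python packages (the code in this guide imports streamlit, supabase, dotenv and pandas):
pip install streamlit supabase python-dotenv pandas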
Step 1: Set Up Environment Variables¶
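Create a .env file in your project root that defines the two variables read by the connection helper in Step 2:
SUPABASE_URL=<your Supabase project URL>
SUPABASE_KEY=<your Supabase API key>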
Security Note: Never commit your .env file to version control. Add it to your .gitignore.
Step 2: Initialize Supabase Client¶
Create a connection helper file (supabase_client.py):
import os
from supabase import create_client, Client
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Build the Supabase client from environment configuration
def get_supabase_client() -> Client:
    """Create a Supabase client from ``SUPABASE_URL`` and ``SUPABASE_KEY``.

    Returns:
        The client produced by ``create_client(url, key)``.

    Raises:
        ValueError: If either environment variable is unset or empty.
    """
    credentials = (os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY"))
    if not all(credentials):
        raise ValueError("Missing Supabase credentials in environment variables")
    return create_client(*credentials)
Step 3: Basic Streamlit Integration¶
Create your main Streamlit app (app.py):
import streamlit as st
import pandas as pd
from supabase_client import get_supabase_client
# Initialize the Supabase client that the helpers below use for every query
supabase = get_supabase_client()
# Page title shown at the top of the app
st.title("Streamlit + Supabase Integration")
# Example: Read data from a table
@st.cache_data(ttl=300)  # Cache successful reads for 5 minutes
def _fetch_table(table_name):
    """Fetch every row of ``table_name`` as a DataFrame; exceptions propagate.

    Raising (instead of returning a fallback) keeps failures out of the cache,
    so the next call retries instead of replaying an empty result.
    """
    response = supabase.table(table_name).select("*").execute()
    return pd.DataFrame(response.data)


def load_data(table_name):
    """Return all rows of ``table_name`` as a DataFrame.

    Successful reads are cached for 5 minutes. On any failure the error is
    shown with ``st.error`` and an empty DataFrame is returned; that fallback
    is *not* cached, so a transient outage does not stick for the whole TTL.

    Args:
        table_name: Name of the Supabase table to read.

    Returns:
        A ``pandas.DataFrame`` built from the response data, or an empty
        DataFrame if the query raised.
    """
    try:
        return _fetch_table(table_name)
    except Exception as e:
        st.error(f"Error loading data: {e}")
        return pd.DataFrame()


# Keep the cache-invalidation hook callers had when load_data itself was decorated
load_data.clear = _fetch_table.clear
# Display data: fetch only when the user asks for it
if st.button("Load Data"):
    data = load_data("your_table_name")
    if data.empty:
        # load_data returns an empty frame both for "no rows" and for errors
        st.warning("No data found or error occurred")
    else:
        st.dataframe(data)
Step 4: CRUD Operations¶
Create (Insert) Data¶
def insert_record(table_name, data):
    """Insert ``data`` into ``table_name`` and return the response payload.

    Any exception is reported with ``st.error`` and ``None`` is returned
    instead of the payload.
    """
    try:
        insert_query = supabase.table(table_name).insert(data)
        inserted = insert_query.execute().data
    except Exception as e:
        st.error(f"Error inserting data: {e}")
        return None
    return inserted
# Example form for inserting data
with st.form("insert_form"):
    name = st.text_input("Name")
    email = st.text_input("Email")
    if st.form_submit_button("Add Record"):
        if name and email:
            result = insert_record("users", {"name": name, "email": email})
            if result:
                st.success("Record added successfully!")
                # st.experimental_rerun() is deprecated and absent from newer
                # Streamlit releases; prefer st.rerun() and fall back only on
                # older versions that do not provide it.
                rerun = getattr(st, "rerun", None) or st.experimental_rerun
                rerun()  # Refresh the app
        else:
            # Previously an incomplete form was ignored without any feedback
            st.warning("Please enter both a name and an email.")
Update Data¶
def update_record(table_name, record_id, updates):
    """Apply ``updates`` to rows of ``table_name`` whose ``id`` equals ``record_id``.

    Returns the response payload, or ``None`` after reporting the exception
    with ``st.error``.
    """
    try:
        update_query = supabase.table(table_name).update(updates).eq("id", record_id)
        return update_query.execute().data
    except Exception as e:
        st.error(f"Error updating data: {e}")
        return None
Delete Data¶
def delete_record(table_name, record_id):
    """Delete rows of ``table_name`` whose ``id`` equals ``record_id``.

    Returns the response payload, or ``None`` after reporting the exception
    with ``st.error``.
    """
    try:
        delete_query = supabase.table(table_name).delete().eq("id", record_id)
        deleted = delete_query.execute().data
    except Exception as e:
        st.error(f"Error deleting data: {e}")
        return None
    else:
        return deleted
Step 5: Advanced Features¶
Real-time Subscriptions¶
def setup_realtime():
    """Placeholder for real-time subscriptions; currently does nothing.

    Note: Real-time features require additional setup and may need
    websocket connections.
    """
    return None
File Storage Integration¶
def upload_file(bucket_name, file_path, file_data):
    """Upload ``file_data`` to ``file_path`` inside the ``bucket_name`` bucket.

    Returns whatever the storage ``upload`` call returns, or ``None`` after
    reporting the exception with ``st.error``.
    """
    try:
        bucket = supabase.storage.from_(bucket_name)
        return bucket.upload(file_path, file_data)
    except Exception as e:
        st.error(f"Error uploading file: {e}")
        return None
# Example file upload
uploaded_file = st.file_uploader("Choose a file")
# The uploader keeps returning the selected file on every script rerun, so an
# ungated upload would push the same path again on each widget interaction.
# Require an explicit click so each upload happens exactly once.
if uploaded_file is not None and st.button("Upload"):
    file_data = uploaded_file.getvalue()
    result = upload_file("documents", f"uploads/{uploaded_file.name}", file_data)
    if result:
        st.success("File uploaded successfully!")
Step 6: Error Handling and Logging¶
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def safe_database_operation(operation_func, *args, **kwargs):
    """Call ``operation_func(*args, **kwargs)`` and shield the UI from failures.

    Args:
        operation_func: Callable performing the database work.
        *args: Positional arguments forwarded to ``operation_func``.
        **kwargs: Keyword arguments forwarded to ``operation_func``.

    Returns:
        The callable's return value, or ``None`` if it raised.
    """
    try:
        return operation_func(*args, **kwargs)
    except Exception as e:
        # logger.exception logs at ERROR level *with* the traceback, which the
        # plain logger.error call dropped; the user only sees a generic message.
        logger.exception("Database operation failed: %s", e)
        st.error("An error occurred while processing your request.")
        return None
Best Practices¶
1. Connection Management¶
- Use connection pooling for production applications
- Implement proper error handling for network issues
- Cache frequently accessed data
2. Security¶
- Use Row Level Security (RLS) in Supabase
- Validate all user inputs
- Use service role keys only for server-side operations
3. Performance¶
# Use st.cache_data for expensive operations
@st.cache_data(ttl=600)  # Cache for 10 minutes
def fetch_analytics_data():
    """Return the data of a ``select("*")`` on the "analytics" table."""
    analytics_query = supabase.table("analytics").select("*")
    result = analytics_query.execute()
    return result.data
# Implement pagination for large datasets
def paginated_query(table_name, page=0, page_size=50):
    """Return one page of rows from ``table_name``.

    Args:
        table_name: Name of the Supabase table to read.
        page: Zero-based page index.
        page_size: Number of rows per page; must be at least 1.

    Returns:
        The response data for rows ``page * page_size`` through
        ``page * page_size + page_size - 1``.

    Raises:
        ValueError: If ``page`` is negative or ``page_size`` is below 1, since
            either would produce a meaningless row range.

    NOTE(review): the query has no explicit ordering, so page contents may not
    be stable between calls -- confirm whether callers need an ``order``.
    """
    if page < 0:
        raise ValueError("page must be zero or greater")
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    offset = page * page_size
    # The upper bound is passed as the last row index of the page, hence the -1
    last_row = offset + page_size - 1
    response = supabase.table(table_name).select("*").range(offset, last_row).execute()
    return response.data
4. Environment Configuration¶
# Different configs for different environments
import os
def get_config():
    """Return the settings dict for the environment named by ``ENVIRONMENT``.

    ``ENVIRONMENT`` defaults to "development"; any value other than
    "production" also yields the development settings.
    """
    development = {"debug": True, "cache_ttl": 60}
    production = {"debug": False, "cache_ttl": 300}
    environment = os.getenv("ENVIRONMENT", "development")
    if environment == "production":
        return production
    # "development" and every unrecognised name share the same settings
    return development
Common Issues and Solutions¶
1. Authentication Errors¶
- Verify your Supabase URL and keys
- Check if your table policies allow the operation
- Ensure RLS is properly configured
2. Connection Timeouts¶
- Implement retry logic
- Use connection pooling
- Check network connectivity
3. Data Type Mismatches¶
- Validate data types before insertion
- Use proper PostgreSQL data types in your schema
Example: Complete Mini App¶
import streamlit as st
import pandas as pd
from supabase_client import get_supabase_client
# Initialize the Supabase client used by the sidebar form and the task list
supabase = get_supabase_client()
# Configure the browser tab title and use the wide page layout
st.set_page_config(page_title="Task Manager", layout="wide")
st.title("📋 Task Manager")
# Sidebar for adding tasks
with st.sidebar:
    st.header("Add New Task")
    task_name = st.text_input("Task Name")
    task_description = st.text_area("Description")
    priority = st.selectbox("Priority", ["Low", "Medium", "High"])
    if st.button("Add Task"):
        if task_name:
            data = {
                "name": task_name,
                "description": task_description,
                "priority": priority,
                "completed": False,
            }
            # Keep the try body to the one call that can fail
            try:
                supabase.table("tasks").insert(data).execute()
            except Exception as e:
                st.error(f"Error: {e}")
            else:
                st.success("Task added!")
                # Drop cached query results so the rerun shows the new task
                # instead of a stale list; this clears every st.cache_data cache.
                st.cache_data.clear()
                # st.experimental_rerun() is deprecated and absent from newer
                # Streamlit releases; prefer st.rerun() when it exists.
                rerun = getattr(st, "rerun", None) or st.experimental_rerun
                rerun()
        else:
            # Previously an empty task name was ignored without any feedback
            st.warning("Please enter a task name.")
# Main area for displaying tasks
@st.cache_data(ttl=30)
def load_tasks():
    """Return all rows of the "tasks" table as a DataFrame.

    Rows are requested ordered by ``created_at`` descending; the result is
    cached for 30 seconds.
    """
    tasks_query = supabase.table("tasks").select("*").order("created_at", desc=True)
    rows = tasks_query.execute().data
    return pd.DataFrame(rows)
# Render the task table, or a hint when there is nothing to show yet
tasks_df = load_tasks()
if tasks_df.empty:
    st.info("No tasks found. Add some tasks using the sidebar!")
else:
    st.dataframe(tasks_df, use_container_width=True, hide_index=True)
Data Management Best Practices¶
Connection Pooling¶
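For production applications handling multiple concurrent users, avoid creating a new client on every script rerun. The snippet below stores one client per user session in session state (this reuses the client within a session; it is not a pool shared between users):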
import streamlit as st
from supabase import create_client
# Use session state to maintain connection: the client is created the first
# time the key is missing and reused afterwards instead of being rebuilt.
# get_supabase_client was called below without being imported in this snippet.
from supabase_client import get_supabase_client

if "supabase" not in st.session_state:
    st.session_state.supabase = get_supabase_client()
supabase = st.session_state.supabase
Batch Operations¶
When dealing with large datasets:
def batch_insert(table_name, records, batch_size=100):
    """Insert records in batches to avoid timeout issues.

    Args:
        table_name: Name of the Supabase table to insert into.
        records: Sequence of records; sliced into chunks of ``batch_size``.
        batch_size: Maximum number of records per insert call; must be >= 1.

    Returns:
        ``True`` if every batch was inserted (including when ``records`` is
        empty), ``False`` as soon as one batch fails. Batches inserted before
        the failure are not undone by this function.

    Raises:
        ValueError: If ``batch_size`` is below 1. A negative size previously
            produced an empty range and reported success without inserting.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        try:
            supabase.table(table_name).insert(batch).execute()
        except Exception as e:
            # Report the 1-based batch number and stop at the first failure
            st.error(f"Batch {start // batch_size + 1} failed: {e}")
            return False
    return True
Conclusion¶
This guide provides a foundation for connecting Streamlit to Supabase. The combination offers:
- Rapid Development: Quick setup and deployment
- Real-time Data: Live updates and synchronization
- Scalability: PostgreSQL backend with modern API layer
- Security: Built-in authentication and authorization
For production applications, consider implementing additional security measures, error handling, and performance optimizations based on your specific requirements.
For more advanced integrations, refer to the Supabase Python documentation and Streamlit documentation.