Phase 2: Productionalize - Implementation Guide¶
Overview¶
Phase 2 transforms successful experiments into reliable, production-ready data products. This phase adds monitoring, error handling, proper data contracts, and operational excellence while maintaining the agility of Streamlit development.
When to Graduate to Phase 2¶
Graduate from Phase 1 when:
✅ Regular usage by target audience (>5 users)
✅ Stable data requirements identified
✅ Clear business value demonstrated
✅ Basic functionality proven
Stay in Phase 2 (rather than graduating to Phase 3) when:
✅ Tool serves internal users effectively
✅ Streamlit UI meets all requirements
✅ Performance and reliability are acceptable
✅ Integration needs are minimal
Technology Stack Enhancement¶
graph TB
subgraph "Phase 2 Production Stack"
ST[Streamlit Application<br/>Enhanced with Production Features]
RW[Railway Deployment<br/>+ Monitoring & Alerting]
SB[Supabase Analytics Schema<br/>+ Data Contracts]
LOG[Logging & Metrics]
AUTH[Authentication & Access Control]
end
subgraph "External Integrations"
API[External APIs<br/>Error Handling & Retry]
FILES[File Storage<br/>Supabase Storage]
SCHED[Scheduled Jobs<br/>Background Processing]
end
ST --> RW
ST --> SB
ST --> LOG
ST --> AUTH
ST --> API
ST --> FILES
RW --> SCHED
style ST fill:#90EE90
style RW fill:#87CEEB
style SB fill:#F0E68C
Implementation Checklist¶
Data Architecture¶
1. Schema Migration (Sandbox → Analytics)¶
-- Create production tables in analytics schema
CREATE SCHEMA IF NOT EXISTS analytics;

-- Migrate experimental table with proper constraints
CREATE TABLE analytics.margin_reports (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Business data
    account_number TEXT NOT NULL,
    report_date DATE NOT NULL,
    extracted_data JSONB NOT NULL,

    -- Quality metadata
    extraction_confidence NUMERIC CHECK (extraction_confidence >= 0 AND extraction_confidence <= 1),
    data_quality_score NUMERIC CHECK (data_quality_score >= 0 AND data_quality_score <= 1),
    validation_status TEXT CHECK (validation_status IN ('pending', 'approved', 'rejected')),

    -- Audit trail
    created_by TEXT NOT NULL,
    processing_time_ms INTEGER,
    source_file_name TEXT,

    -- Constraints
    UNIQUE(account_number, report_date)
);

-- Add indexes for performance
CREATE INDEX idx_margin_reports_account ON analytics.margin_reports(account_number);
CREATE INDEX idx_margin_reports_date ON analytics.margin_reports(report_date);
CREATE INDEX idx_margin_reports_created ON analytics.margin_reports(created_at);

-- Row Level Security
ALTER TABLE analytics.margin_reports ENABLE ROW LEVEL SECURITY;

-- Grant permissions to application role (schema usage is required as well)
GRANT USAGE ON SCHEMA analytics TO streamlit_user;
GRANT SELECT, INSERT, UPDATE ON analytics.margin_reports TO streamlit_user;

-- With RLS enabled, the role also needs at least one policy before it can read or
-- write rows; this permissive example should be tightened to your access model.
CREATE POLICY streamlit_user_all_access ON analytics.margin_reports
    FOR ALL TO streamlit_user
    USING (true)
    WITH CHECK (true);
2. Data Contracts & Validation¶
import logging

import streamlit as st
from pydantic import BaseModel, ValidationError, validator
from typing import Optional, Dict, Any
from datetime import date

logger = logging.getLogger(__name__)

class MarginReportData(BaseModel):
    account_number: str
    report_date: date
    extracted_data: Dict[str, Any]
    extraction_confidence: float
    created_by: str
    source_file_name: Optional[str] = None

    @validator('extraction_confidence')
    def confidence_must_be_valid(cls, v):
        if not 0 <= v <= 1:
            raise ValueError('Extraction confidence must be between 0 and 1')
        return v

    @validator('account_number')
    def account_must_be_valid(cls, v):
        if not v or len(v.strip()) == 0:
            raise ValueError('Account number cannot be empty')
        return v.strip().upper()

# Usage in Streamlit app (assumes a module-level `supabase` client and that the
# analytics schema is exposed in the Supabase API settings)
def save_margin_data(data: dict) -> bool:
    try:
        # Validate data
        validated_data = MarginReportData(**data)

        # Build a JSON-safe payload (dates are serialized to ISO strings)
        payload = validated_data.dict()
        payload['report_date'] = validated_data.report_date.isoformat()

        # Save to database; non-public schemas are addressed via .schema(), not a dotted table name
        supabase.schema('analytics').table('margin_reports').insert(payload).execute()
        return True
    except ValidationError as e:
        st.error(f"Data validation failed: {e}")
        return False
    except Exception as e:
        st.error(f"Database error: {e}")
        logger.error(f"Failed to save margin data: {e}")
        return False
Application Architecture¶
1. Enhanced Streamlit Application Structure¶
production-app/
├── app.py                    # Main application entry point
├── pages/                    # Multi-page application
│   ├── 01_📤_Upload.py
│   ├── 02_📊_Analysis.py
│   └── 03_⚙️_Settings.py
├── utils/                    # Business logic modules
│   ├── __init__.py
│   ├── database.py           # Database operations
│   ├── processing.py         # Data processing logic
│   ├── validation.py         # Data validation
│   └── auth.py               # Authentication helpers
├── config/
│   ├── __init__.py
│   ├── settings.py           # Application configuration
│   └── logging.py            # Logging configuration
├── requirements.txt
├── railway.toml
├── Dockerfile                # Optional: custom container
└── tests/                    # Unit tests (sketch below)
    ├── test_processing.py
    └── test_validation.py
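The tests/ directory keeps validation logic regression-tested outside of Streamlit. A minimal pytest sketch of tests/test_validation.py, assuming the MarginReportData model from the data-contracts section lives in utils/validation.py (the module path and specific cases are illustrative):

# tests/test_validation.py -- pytest sketch for the data contract
import pytest
from datetime import date
from pydantic import ValidationError

from utils.validation import MarginReportData  # assumed location of the model

def valid_payload():
    return {
        "account_number": "  acc-123  ",
        "report_date": date(2024, 1, 31),
        "extracted_data": {"net_liquidity": 105000.50},
        "extraction_confidence": 0.92,
        "created_by": "analyst@example.com",
    }

def test_account_number_is_normalized():
    report = MarginReportData(**valid_payload())
    assert report.account_number == "ACC-123"  # trimmed and upper-cased by the validator

def test_out_of_range_confidence_is_rejected():
    payload = valid_payload()
    payload["extraction_confidence"] = 1.5
    with pytest.raises(ValidationError):
        MarginReportData(**payload)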
2. Main Application (app.py)¶
import streamlit as st
import logging
from utils.auth import check_authentication
from utils.database import init_database
from config.logging import setup_logging
from config.settings import get_settings

# Configure application
st.set_page_config(
    page_title="MarginIQ Production",
    page_icon="💼",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Initialize logging
setup_logging()
logger = logging.getLogger(__name__)

# Initialize database connection (cached across reruns)
@st.cache_resource
def init_app():
    settings = get_settings()
    db = init_database(settings.database_url, settings.database_key)
    return db, settings

def main():
    try:
        # Authentication check
        if not check_authentication():
            st.error("Authentication required")
            st.stop()

        # Initialize app resources
        db, settings = init_app()

        # Store in session state for pages
        st.session_state.db = db
        st.session_state.settings = settings

        # Application header
        st.title("💼 MarginIQ Production")

        # Health check indicator
        with st.container():
            col1, col2, col3 = st.columns([2, 1, 1])
            with col2:
                if st.button("🔍 Health Check"):
                    health_status = check_system_health(db)
                    if health_status['healthy']:
                        st.success("System operational")
                    else:
                        st.error(f"System issues: {health_status['issues']}")

        # Navigation handled by Streamlit pages
    except Exception as e:
        logger.error(f"Application error: {e}")
        st.error("Application error occurred. Please try again or contact support.")

def check_system_health(db) -> dict:
    """Check system health status"""
    try:
        # Test database connection via the DatabaseManager's underlying Supabase client
        db.client.schema('analytics').table('margin_reports').select("id").limit(1).execute()
        # Test external dependencies
        # Add checks for external APIs, file storage, etc.
        return {'healthy': True, 'issues': []}
    except Exception as e:
        return {'healthy': False, 'issues': [str(e)]}

if __name__ == "__main__":
    main()
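app.py imports get_settings from config/settings.py, which is not shown above. A minimal sketch that reads the environment variables listed in the Railway configuration below; the Settings fields and the use of a frozen dataclass are assumptions:

# config/settings.py -- minimal environment-backed settings (sketch)
import os
from dataclasses import dataclass
from functools import lru_cache

@dataclass(frozen=True)
class Settings:
    database_url: str      # Supabase project URL
    database_key: str      # Supabase service role key
    log_level: str = "INFO"

@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Load settings once from SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY and LOG_LEVEL."""
    return Settings(
        database_url=os.environ["SUPABASE_URL"],
        database_key=os.environ["SUPABASE_SERVICE_ROLE_KEY"],
        log_level=os.environ.get("LOG_LEVEL", "INFO"),
    )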
3. Database Operations (utils/database.py)¶
from supabase import create_client
import streamlit as st
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

class DatabaseManager:
    def __init__(self, url: str, key: str):
        self.client = create_client(url, key)

    def save_margin_report(self, data: Dict[str, Any]) -> Optional[str]:
        """Save margin report with error handling and logging"""
        try:
            # Add metadata (created_at also has a database default)
            data['created_at'] = datetime.utcnow().isoformat()

            result = self.client.schema('analytics').table('margin_reports').insert(data).execute()

            if result.data:
                report_id = result.data[0]['id']
                logger.info(f"Margin report saved successfully: {report_id}")
                return report_id
            else:
                logger.error("Failed to save margin report: No data returned")
                return None
        except Exception as e:
            logger.error(f"Database error saving margin report: {e}")
            # Don't re-raise in production to prevent app crashes
            return None

    def get_recent_reports(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent reports with error handling"""
        try:
            result = self.client.schema('analytics').table('margin_reports')\
                .select("*")\
                .order('created_at', desc=True)\
                .limit(limit)\
                .execute()
            return result.data or []
        except Exception as e:
            logger.error(f"Error fetching recent reports: {e}")
            return []

    def get_report_analytics(self, days: int = 30) -> Dict[str, Any]:
        """Get analytics data for dashboard"""
        try:
            # Date range filter
            start_date = (datetime.utcnow() - timedelta(days=days)).isoformat()
            result = self.client.schema('analytics').table('margin_reports')\
                .select("account_number, report_date, extraction_confidence, data_quality_score")\
                .gte('created_at', start_date)\
                .execute()
            data = result.data or []

            # Calculate analytics (treat NULL scores as 0)
            analytics = {
                'total_reports': len(data),
                'unique_accounts': len(set(r['account_number'] for r in data)),
                'avg_confidence': sum(r.get('extraction_confidence') or 0 for r in data) / len(data) if data else 0,
                'avg_quality': sum(r.get('data_quality_score') or 0 for r in data) / len(data) if data else 0
            }
            return analytics
        except Exception as e:
            logger.error(f"Error calculating analytics: {e}")
            return {'error': str(e)}

@st.cache_resource
def init_database(url: str, key: str) -> DatabaseManager:
    return DatabaseManager(url, key)
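Pages can then reuse the cached DatabaseManager that app.py stores in session state. A minimal sketch of how a page such as pages/02_📊_Analysis.py might consume it (the widget layout is illustrative):

# pages/02_📊_Analysis.py -- consume the shared DatabaseManager from session state
import pandas as pd
import streamlit as st

st.title("📊 Margin Report Analysis")

db = st.session_state.get("db")
if db is None:
    st.warning("Open the main page first so the database connection is initialized.")
    st.stop()

# Summary metrics for the selected window
days = st.slider("Window (days)", min_value=7, max_value=90, value=30)
analytics = db.get_report_analytics(days=days)
if "error" in analytics:
    st.error(f"Analytics unavailable: {analytics['error']}")
else:
    col1, col2, col3 = st.columns(3)
    col1.metric("Reports", analytics["total_reports"])
    col2.metric("Accounts", analytics["unique_accounts"])
    col3.metric("Avg confidence", f"{analytics['avg_confidence']:.2f}")

# Raw rows for inspection
reports = db.get_recent_reports(limit=100)
st.dataframe(pd.DataFrame(reports))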
Monitoring & Alerting¶
1. Logging Configuration (config/logging.py)¶
import logging
import streamlit as st
from datetime import datetime

class StreamlitLogHandler(logging.Handler):
    """Custom handler that can display logs in Streamlit sidebar"""

    def __init__(self):
        super().__init__()
        self.logs = []

    def emit(self, record):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module
        }
        self.logs.append(log_entry)
        # Keep only last 100 logs in memory
        if len(self.logs) > 100:
            self.logs = self.logs[-100:]

def setup_logging():
    """Configure application logging"""
    # Create custom handler
    streamlit_handler = StreamlitLogHandler()

    # Configure root logger
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),  # Console output
            streamlit_handler         # Streamlit display
        ]
    )

    # Store handler in session state for access
    st.session_state.log_handler = streamlit_handler

def display_recent_logs():
    """Display recent logs in Streamlit sidebar"""
    if 'log_handler' in st.session_state:
        handler = st.session_state.log_handler
        with st.sidebar.expander("📋 Recent Logs", expanded=False):
            for log in handler.logs[-10:]:  # Show last 10 logs
                level_color = {
                    'ERROR': '🔴',
                    'WARNING': '🟡',
                    'INFO': '🟢',
                    'DEBUG': '🔵'
                }.get(log['level'], '⚪')
                st.text(f"{level_color} {log['timestamp'][:19]}")
                st.text(f"   {log['message']}")
2. Performance Monitoring¶
import logging
import time
import streamlit as st
from datetime import datetime
from functools import wraps
from typing import Callable, Any

logger = logging.getLogger(__name__)

def monitor_performance(operation_name: str):
    """Decorator to monitor function performance"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                execution_time = time.time() - start_time

                # Log performance metrics
                logger.info(f"Performance: {operation_name} took {execution_time:.2f}s")

                # Store in session state for dashboard
                if 'performance_metrics' not in st.session_state:
                    st.session_state.performance_metrics = []
                st.session_state.performance_metrics.append({
                    'operation': operation_name,
                    'duration': execution_time,
                    'timestamp': datetime.utcnow(),
                    'status': 'success'
                })
                return result
            except Exception as e:
                execution_time = time.time() - start_time
                logger.error(f"Performance: {operation_name} failed after {execution_time:.2f}s: {e}")

                # Record failed operation
                if 'performance_metrics' not in st.session_state:
                    st.session_state.performance_metrics = []
                st.session_state.performance_metrics.append({
                    'operation': operation_name,
                    'duration': execution_time,
                    'timestamp': datetime.utcnow(),
                    'status': 'error',
                    'error': str(e)
                })
                raise
        return wrapper
    return decorator

# Usage example
@monitor_performance("PDF Processing")
def process_pdf(file_data: bytes) -> dict:
    # PDF processing logic
    pass

def display_performance_metrics():
    """Display performance dashboard"""
    if 'performance_metrics' not in st.session_state:
        return

    metrics = st.session_state.performance_metrics[-50:]  # Last 50 operations

    col1, col2, col3 = st.columns(3)
    with col1:
        success_count = sum(1 for m in metrics if m['status'] == 'success')
        st.metric("Success Rate", f"{success_count/len(metrics)*100:.1f}%" if metrics else "N/A")
    with col2:
        avg_duration = sum(m['duration'] for m in metrics) / len(metrics) if metrics else 0
        st.metric("Avg Duration", f"{avg_duration:.2f}s")
    with col3:
        error_count = sum(1 for m in metrics if m['status'] == 'error')
        st.metric("Error Count", error_count)
Deployment Configuration¶
Railway Configuration (railway.toml)¶
[build]
builder = "DOCKERFILE"
[deploy]
startCommand = "streamlit run app.py --server.port=8501 --server.address=0.0.0.0 --server.headless=true --server.enableCORS=false --server.enableXsrfProtection=false"
# Health check endpoint
healthcheckPath = "/_stcore/health"
healthcheckTimeout = 10
restartPolicyType = "ON_FAILURE"
restartPolicyMaxRetries = 3
# Resource limits for production
[[resources]]
cpu = 1000 # 1 CPU
memory = 2048 # 2GB RAM
# Persistent volume for file storage
[[volumes]]
name = "margin-iq-production-data"
mountPath = "/app/data"
# Environment variables (set via Railway dashboard)
# SUPABASE_URL
# SUPABASE_SERVICE_ROLE_KEY
# STREAMLIT_SERVER_HEADLESS=true
# LOG_LEVEL=INFO
Dockerfile (Optional)¶
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
tesseract-ocr \
poppler-utils \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create data directory
RUN mkdir -p /app/data && chmod 755 /app/data
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8501/_stcore/health || exit 1
# Run application
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0", "--server.headless=true"]
Operations & Maintenance¶
Daily Operations Checklist¶
- Check application health status
- Review error logs for issues
- Monitor performance metrics
- Verify data quality scores
- Check user feedback/issues
Weekly Operations¶
- Review usage analytics
- Update documentation
- Performance optimization review
- Security update check
- Backup verification
Monthly Operations¶
- User satisfaction survey
- Cost analysis and optimization
- Feature usage analysis
- Architecture review
- Consider Phase 3 graduation
Success Metrics for Phase 2¶
Reliability Metrics:
- Uptime > 99.5%
- Error rate < 1%
- Mean response time < 3 seconds
User Metrics:
- Daily active users > 10
- User satisfaction > 4.0/5.0
- Feature adoption rate > 60%
Business Metrics:
- Time savings quantified
- Process efficiency improvement
- User productivity increase
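The reliability targets can be tracked from the performance metrics already collected in session state by monitor_performance. A rough roll-up sketch (the helper name and return shape are assumptions):

# Rough reliability roll-up over the in-session performance metrics
import streamlit as st
from typing import Dict, List

def reliability_summary(metrics: List[Dict]) -> Dict[str, float]:
    """Compute error rate and mean response time from monitor_performance records."""
    if not metrics:
        return {"error_rate_pct": 0.0, "mean_response_s": 0.0}
    errors = sum(1 for m in metrics if m["status"] == "error")
    return {
        "error_rate_pct": 100.0 * errors / len(metrics),
        "mean_response_s": sum(m["duration"] for m in metrics) / len(metrics),
    }

# Targets from above: error rate < 1%, mean response time < 3 seconds
summary = reliability_summary(st.session_state.get("performance_metrics", []))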
Common Phase 2 Enhancements¶
Authentication & Authorization¶
import streamlit as st
from typing import Optional

def check_user_access(required_role: Optional[str] = None) -> bool:
    """Check if user has required access level"""
    user_info = st.session_state.get('user_info')
    if not user_info:
        return False
    if required_role and user_info.get('role') != required_role:
        return False
    return True

# Usage in pages
if not check_user_access('analyst'):
    st.error("Insufficient permissions")
    st.stop()
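check_authentication, imported in app.py from utils/auth.py, is not shown above. A minimal sketch of one way to implement it with a shared access token and a role stored in session state; the APP_ACCESS_TOKEN variable and the "analyst" role are assumptions, and a production deployment would typically swap in SSO or Supabase Auth:

# utils/auth.py -- minimal gate used by app.py (sketch, not an SSO integration)
import os
import streamlit as st

def check_authentication() -> bool:
    """Prompt for a shared access token and stash user info for check_user_access()."""
    if st.session_state.get("user_info"):
        return True

    with st.form("login"):
        email = st.text_input("Email")
        token = st.text_input("Access token", type="password")
        submitted = st.form_submit_button("Sign in")

    # APP_ACCESS_TOKEN is an assumed environment variable set alongside the Supabase keys
    if submitted and token and token == os.environ.get("APP_ACCESS_TOKEN"):
        st.session_state.user_info = {"email": email, "role": "analyst"}
        return True
    return False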
Data Export Capabilities¶
import io

import pandas as pd

@monitor_performance("Data Export")
def export_data(data: pd.DataFrame, format: str) -> bytes:
    """Export data in various formats"""
    if format == 'csv':
        return data.to_csv(index=False).encode()
    elif format == 'excel':
        # Requires an Excel writer engine such as openpyxl
        buffer = io.BytesIO()
        data.to_excel(buffer, index=False)
        return buffer.getvalue()
    elif format == 'parquet':
        # Requires pyarrow or fastparquet
        buffer = io.BytesIO()
        data.to_parquet(buffer, index=False)
        return buffer.getvalue()
    raise ValueError(f"Unsupported export format: {format}")
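A short usage sketch wiring export_data to Streamlit's download button; the file names and widget labels are illustrative, and it assumes the shared DatabaseManager from session state:

# Usage in a page: let users download the current report table
import pandas as pd
import streamlit as st

reports_df = pd.DataFrame(st.session_state.db.get_recent_reports(limit=500))

export_format = st.selectbox("Export format", ["csv", "excel", "parquet"])
file_names = {"csv": "margin_reports.csv", "excel": "margin_reports.xlsx", "parquet": "margin_reports.parquet"}

if not reports_df.empty:
    st.download_button(
        label="⬇️ Download",
        data=export_data(reports_df, export_format),
        file_name=file_names[export_format],
    )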
Scheduled Background Jobs¶
# Background job for data refresh (separate Railway service)
import logging
import schedule
import time

logger = logging.getLogger(__name__)

def refresh_analytics_data():
    """Refresh materialized views and cache"""
    try:
        # Refresh logic here
        logger.info("Analytics data refreshed successfully")
    except Exception as e:
        logger.error(f"Failed to refresh analytics: {e}")

# Schedule jobs
schedule.every(1).hour.do(refresh_analytics_data)
schedule.every().day.at("06:00").do(refresh_analytics_data)

# Run scheduler
while True:
    schedule.run_pending()
    time.sleep(60)
This comprehensive Phase 2 guide transforms experiments into production-ready applications while maintaining development agility.