
Phase 2: Productionalize - Implementation Guide

Overview

Phase 2 transforms successful experiments into reliable, production-ready data products. This phase adds monitoring, error handling, proper data contracts, and operational excellence while maintaining the agility of Streamlit development.

When to Graduate to Phase 2

Graduate from Phase 1 when:

✅ Regular usage by target audience (>5 users)
✅ Stable data requirements identified
✅ Clear business value demonstrated
✅ Basic functionality proven

Stay in Phase 2 when:

✅ Tool serves internal users effectively
✅ Streamlit UI meets all requirements
✅ Performance and reliability are acceptable
✅ Integration needs are minimal

Technology Stack Enhancement

graph TB
    subgraph "Phase 2 Production Stack"
        ST[Streamlit Application<br/>Enhanced with Production Features]
        RW[Railway Deployment<br/>+ Monitoring & Alerting]
        SB[Supabase Analytics Schema<br/>+ Data Contracts]
        LOG[Logging & Metrics]
        AUTH[Authentication & Access Control]
    end

    subgraph "External Integrations"
        API[External APIs<br/>Error Handling & Retry]
        FILES[File Storage<br/>Supabase Storage]
        SCHED[Scheduled Jobs<br/>Background Processing]
    end

    ST --> RW
    ST --> SB
    ST --> LOG
    ST --> AUTH
    ST --> API
    ST --> FILES
    RW --> SCHED

    style ST fill:#90EE90
    style RW fill:#87CEEB
    style SB fill:#F0E68C

Implementation Checklist

Data Architecture

1. Schema Migration (Sandbox → Analytics)

-- Create production tables in analytics schema
CREATE SCHEMA IF NOT EXISTS analytics;

-- Migrate experimental table with proper constraints
CREATE TABLE analytics.margin_reports (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Business data
    account_number TEXT NOT NULL,
    report_date DATE NOT NULL,
    extracted_data JSONB NOT NULL,

    -- Quality metadata  
    extraction_confidence NUMERIC CHECK (extraction_confidence >= 0 AND extraction_confidence <= 1),
    data_quality_score NUMERIC CHECK (data_quality_score >= 0 AND data_quality_score <= 1),
    validation_status TEXT CHECK (validation_status IN ('pending', 'approved', 'rejected')),

    -- Audit trail
    created_by TEXT NOT NULL,
    processing_time_ms INTEGER,
    source_file_name TEXT,

    -- Constraints
    UNIQUE(account_number, report_date)
);

-- Add indexes for performance
CREATE INDEX idx_margin_reports_account ON analytics.margin_reports(account_number);
CREATE INDEX idx_margin_reports_date ON analytics.margin_reports(report_date);
CREATE INDEX idx_margin_reports_created ON analytics.margin_reports(created_at);

-- Row Level Security
ALTER TABLE analytics.margin_reports ENABLE ROW LEVEL SECURITY;
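-- Note: with RLS enabled and no policies defined, only roles that bypass RLS
-- (such as the Supabase service role) can read or write this table; add
-- policies before granting access to other roles.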

-- Grant permissions to application role
GRANT SELECT, INSERT, UPDATE ON analytics.margin_reports TO streamlit_user;

2. Data Contracts & Validation

import json

from pydantic import BaseModel, ValidationError, validator
from typing import Optional, Dict, Any
from datetime import date, datetime

class MarginReportData(BaseModel):
    account_number: str
    report_date: date
    extracted_data: Dict[str, Any]
    extraction_confidence: float
    created_by: str
    source_file_name: Optional[str] = None

    @validator('extraction_confidence')
    def confidence_must_be_valid(cls, v):
        if not 0 <= v <= 1:
            raise ValueError('Extraction confidence must be between 0 and 1')
        return v

    @validator('account_number')
    def account_must_be_valid(cls, v):
        if not v or len(v.strip()) == 0:
            raise ValueError('Account number cannot be empty')
        return v.strip().upper()

# Usage in the Streamlit app (assumes an initialized supabase client,
# streamlit imported as st, and a module-level logger)
def save_margin_data(data: dict) -> bool:
    try:
        # Validate data
        validated_data = MarginReportData(**data)

        # Save to database; the analytics schema must be exposed in the
        # Supabase API settings, and .json() serializes report_date for the API
        result = supabase.schema('analytics').table('margin_reports').insert(
            json.loads(validated_data.json())
        ).execute()

        return True
    except ValidationError as e:
        st.error(f"Data validation failed: {e}")
        return False
    except Exception as e:
        st.error(f"Database error: {e}")
        logger.error(f"Failed to save margin data: {e}")
        return False
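
The tests/test_validation.py module listed in the project structure below can exercise this data contract; a minimal pytest sketch (the utils.validation import path is an assumption):

# tests/test_validation.py -- minimal pytest sketch for the data contract
import pytest
from datetime import date
from pydantic import ValidationError

from utils.validation import MarginReportData  # assumed module location

def test_valid_report_normalizes_account_number():
    report = MarginReportData(
        account_number="  abc123  ",
        report_date=date(2024, 1, 31),
        extracted_data={"margin": 0.42},
        extraction_confidence=0.9,
        created_by="tester",
    )
    assert report.account_number == "ABC123"

def test_confidence_out_of_range_is_rejected():
    with pytest.raises(ValidationError):
        MarginReportData(
            account_number="ABC123",
            report_date=date(2024, 1, 31),
            extracted_data={},
            extraction_confidence=1.5,
            created_by="tester",
        )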

Application Architecture

1. Enhanced Streamlit Application Structure

production-app/
├── app.py                 # Main application entry point
├── pages/                 # Multi-page application
│   ├── 01_📤_Upload.py   
│   ├── 02_📊_Analysis.py
│   └── 03_⚙️_Settings.py
├── utils/                 # Business logic modules
│   ├── __init__.py
│   ├── database.py        # Database operations
│   ├── processing.py      # Data processing logic  
│   ├── validation.py      # Data validation
│   └── auth.py           # Authentication helpers
├── config/
│   ├── settings.py        # Application configuration
│   └── logging.py        # Logging configuration
├── requirements.txt
├── railway.toml
├── Dockerfile            # Optional: custom container
└── tests/               # Unit tests
    ├── test_processing.py
    └── test_validation.py
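
The config/settings.py module referenced above (and imported by app.py below) is not shown in full. A minimal sketch that reads the Railway environment variables listed later in this guide; field names and anything beyond SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY are assumptions:

# config/settings.py -- minimal sketch; adjust field names to your needs
import os
from dataclasses import dataclass
from functools import lru_cache

@dataclass(frozen=True)
class Settings:
    database_url: str
    database_key: str
    log_level: str = "INFO"

@lru_cache
def get_settings() -> Settings:
    """Read configuration from environment variables set in Railway."""
    return Settings(
        database_url=os.environ["SUPABASE_URL"],
        database_key=os.environ["SUPABASE_SERVICE_ROLE_KEY"],
        log_level=os.environ.get("LOG_LEVEL", "INFO"),
    )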

2. Main Application (app.py)

import streamlit as st
import logging
from utils.auth import check_authentication
from utils.database import init_database
from config.logging import setup_logging
from config.settings import get_settings

# Configure application
st.set_page_config(
    page_title="MarginIQ Production",
    page_icon="💼",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Initialize logging
setup_logging()
logger = logging.getLogger(__name__)

# Initialize database connection
@st.cache_resource
def init_app():
    settings = get_settings()
    db = init_database(settings.database_url, settings.database_key)
    return db, settings

def main():
    try:
        # Authentication check
        if not check_authentication():
            st.error("Authentication required")
            st.stop()

        # Initialize app resources
        db, settings = init_app()

        # Store in session state for pages
        st.session_state.db = db
        st.session_state.settings = settings

        # Application header
        st.title("💼 MarginIQ Production")

        # Health check indicator
        with st.container():
            col1, col2, col3 = st.columns([2, 1, 1])
            with col2:
                if st.button("🔍 Health Check"):
                    health_status = check_system_health(db)
                    if health_status['healthy']:
                        st.success("System operational")
                    else:
                        st.error(f"System issues: {health_status['issues']}")

        # Navigation handled by Streamlit pages

    except Exception as e:
        logger.error(f"Application error: {e}")
        st.error("Application error occurred. Please try again or contact support.")

def check_system_health(db) -> dict:
    """Check system health status"""
    try:
        # Test database connection through the DatabaseManager's underlying client
        result = db.client.schema('analytics').table('margin_reports').select("id").limit(1).execute()

        # Test external dependencies
        # Add checks for external APIs, file storage, etc.

        return {'healthy': True, 'issues': []}
    except Exception as e:
        return {'healthy': False, 'issues': [str(e)]}

if __name__ == "__main__":
    main()
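
The check_authentication helper imported from utils/auth.py above is not shown elsewhere in this guide. A minimal sketch using a shared password from Streamlit secrets; the mechanism and the APP_PASSWORD secret name are assumptions, so swap in your SSO or Supabase Auth flow:

# utils/auth.py -- minimal sketch; replace with your real authentication flow
import streamlit as st

def check_authentication() -> bool:
    """Gate the app behind a shared password stored in Streamlit secrets."""
    if st.session_state.get("authenticated"):
        return True

    password = st.text_input("Password", type="password")
    if password and password == st.secrets.get("APP_PASSWORD", ""):
        st.session_state.authenticated = True
        st.session_state.user_info = {"role": "analyst"}  # assumed role model
        return True
    return False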

3. Database Operations (utils/database.py)

from supabase import create_client
import streamlit as st
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

class DatabaseManager:
    def __init__(self, url: str, key: str):
        self.client = create_client(url, key)

    def save_margin_report(self, data: Dict[str, Any]) -> Optional[str]:
        """Save margin report with error handling and logging"""
        try:
            # Add metadata (created_at also has a database default)
            data['created_at'] = datetime.utcnow().isoformat()

            # The analytics schema must be exposed in the Supabase API settings
            result = self.client.schema('analytics').table('margin_reports').insert(data).execute()

            if result.data:
                report_id = result.data[0]['id']
                logger.info(f"Margin report saved successfully: {report_id}")
                return report_id
            else:
                logger.error("Failed to save margin report: No data returned")
                return None

        except Exception as e:
            logger.error(f"Database error saving margin report: {e}")
            # Don't re-raise in production to prevent app crashes
            return None

    def get_recent_reports(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent reports with error handling"""
        try:
            result = self.client.schema('analytics').table('margin_reports')\
                .select("*")\
                .order('created_at', desc=True)\
                .limit(limit)\
                .execute()

            return result.data or []

        except Exception as e:
            logger.error(f"Error fetching recent reports: {e}")
            return []

    def get_report_analytics(self, days: int = 30) -> Dict[str, Any]:
        """Get analytics data for dashboard"""
        try:
            # Date range filter
            start_date = (datetime.utcnow() - timedelta(days=days)).isoformat()

            result = self.client.schema('analytics').table('margin_reports')\
                .select("account_number, report_date, extraction_confidence, data_quality_score")\
                .gte('created_at', start_date)\
                .execute()

            data = result.data or []

            # Calculate analytics
            analytics = {
                'total_reports': len(data),
                'unique_accounts': len(set(r['account_number'] for r in data)),
                'avg_confidence': sum(r.get('extraction_confidence', 0) for r in data) / len(data) if data else 0,
                'avg_quality': sum(r.get('data_quality_score', 0) for r in data) / len(data) if data else 0
            }

            return analytics

        except Exception as e:
            logger.error(f"Error calculating analytics: {e}")
            return {'error': str(e)}

@st.cache_resource
def init_database(url: str, key: str) -> DatabaseManager:
    return DatabaseManager(url, key)
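
As a usage sketch, one of the pages listed in the project structure above (pages/02_📊_Analysis.py) can consume this DatabaseManager from session state; the layout below is an assumption:

# pages/02_📊_Analysis.py -- sketch of a dashboard page using DatabaseManager
import pandas as pd
import streamlit as st

st.title("📊 Margin Report Analysis")

db = st.session_state.get("db")
if db is None:
    st.warning("Open the app from the main page so the database is initialized.")
    st.stop()

analytics = db.get_report_analytics(days=30)
if "error" in analytics:
    st.error(f"Could not load analytics: {analytics['error']}")
else:
    col1, col2, col3 = st.columns(3)
    col1.metric("Reports (30d)", analytics["total_reports"])
    col2.metric("Unique accounts", analytics["unique_accounts"])
    col3.metric("Avg confidence", f"{analytics['avg_confidence']:.0%}")

reports = db.get_recent_reports(limit=50)
if reports:
    st.dataframe(pd.DataFrame(reports))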

Monitoring & Alerting

1. Logging Configuration (config/logging.py)

import logging
import streamlit as st
from datetime import datetime

class StreamlitLogHandler(logging.Handler):
    """Custom handler that can display logs in Streamlit sidebar"""

    def __init__(self):
        super().__init__()
        self.logs = []

    def emit(self, record):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module
        }

        self.logs.append(log_entry)

        # Keep only last 100 logs in memory
        if len(self.logs) > 100:
            self.logs = self.logs[-100:]

def setup_logging():
    """Configure application logging"""
    # Create custom handler
    streamlit_handler = StreamlitLogHandler()

    # Configure root logger
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),  # Console output
            streamlit_handler        # Streamlit display
        ]
    )

    # Store handler in session state for access
    st.session_state.log_handler = streamlit_handler

def display_recent_logs():
    """Display recent logs in Streamlit sidebar"""
    if 'log_handler' in st.session_state:
        handler = st.session_state.log_handler

        with st.sidebar.expander("📋 Recent Logs", expanded=False):
            for log in handler.logs[-10:]:  # Show last 10 logs
                level_color = {
                    'ERROR': '🔴',
                    'WARNING': '🟡', 
                    'INFO': '🟢',
                    'DEBUG': '🔵'
                }.get(log['level'], '⚪')

                st.text(f"{level_color} {log['timestamp'][:19]}")
                st.text(f"   {log['message']}")

2. Performance Monitoring

import time
import logging
import streamlit as st
from datetime import datetime
from functools import wraps
from typing import Callable, Any

logger = logging.getLogger(__name__)

def monitor_performance(operation_name: str):
    """Decorator to monitor function performance"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            start_time = time.time()

            try:
                result = func(*args, **kwargs)
                execution_time = time.time() - start_time

                # Log performance metrics
                logger.info(f"Performance: {operation_name} took {execution_time:.2f}s")

                # Store in session state for dashboard
                if 'performance_metrics' not in st.session_state:
                    st.session_state.performance_metrics = []

                st.session_state.performance_metrics.append({
                    'operation': operation_name,
                    'duration': execution_time,
                    'timestamp': datetime.utcnow(),
                    'status': 'success'
                })

                return result

            except Exception as e:
                execution_time = time.time() - start_time
                logger.error(f"Performance: {operation_name} failed after {execution_time:.2f}s: {e}")

                # Record failed operation
                if 'performance_metrics' not in st.session_state:
                    st.session_state.performance_metrics = []

                st.session_state.performance_metrics.append({
                    'operation': operation_name,
                    'duration': execution_time,
                    'timestamp': datetime.utcnow(),
                    'status': 'error',
                    'error': str(e)
                })

                raise

        return wrapper
    return decorator

# Usage example
@monitor_performance("PDF Processing")
def process_pdf(file_data: bytes) -> dict:
    # PDF processing logic
    pass

def display_performance_metrics():
    """Display performance dashboard"""
    if 'performance_metrics' not in st.session_state:
        return

    metrics = st.session_state.performance_metrics[-50:]  # Last 50 operations

    col1, col2, col3 = st.columns(3)

    with col1:
        success_count = sum(1 for m in metrics if m['status'] == 'success')
        st.metric("Success Rate", f"{success_count/len(metrics)*100:.1f}%" if metrics else "N/A")

    with col2:
        avg_duration = sum(m['duration'] for m in metrics) / len(metrics) if metrics else 0
        st.metric("Avg Duration", f"{avg_duration:.2f}s")

    with col3:
        error_count = sum(1 for m in metrics if m['status'] == 'error')
        st.metric("Error Count", error_count)

Deployment Configuration

Railway Configuration (railway.toml)

[build]
builder = "DOCKERFILE"

[deploy] 
startCommand = "streamlit run app.py --server.port=8501 --server.address=0.0.0.0 --server.headless=true --server.enableCORS=false --server.enableXsrfProtection=false"

# Health check endpoint
healthcheckPath = "/_stcore/health"
healthcheckTimeout = 10
restartPolicyType = "on-failure"
restartPolicyMaxRetries = 3

# Resource limits for production
[[resources]]
cpu = 1000  # 1 CPU
memory = 2048  # 2GB RAM

# Persistent volume for file storage
[[volumes]]
name = "margin-iq-production-data" 
mountPath = "/app/data"

# Environment variables (set via Railway dashboard)
# SUPABASE_URL
# SUPABASE_SERVICE_ROLE_KEY  
# STREAMLIT_SERVER_HEADLESS=true
# LOG_LEVEL=INFO

Dockerfile (Optional)

FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    tesseract-ocr \
    poppler-utils \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create data directory
RUN mkdir -p /app/data && chmod 755 /app/data

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8501/_stcore/health || exit 1

# Run application
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0", "--server.headless=true"]

Operations & Maintenance

Daily Operations Checklist

  • Check application health status (see the probe sketch after this list)
  • Review error logs for issues
  • Monitor performance metrics
  • Verify data quality scores
  • Check user feedback/issues
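
The health status check can be scripted against the Streamlit health endpoint used in the Railway and Docker configuration above; a minimal sketch, with APP_URL as a placeholder for your Railway deployment URL:

# check_health.py -- sketch of a daily health probe; APP_URL is a placeholder
import sys
import urllib.request

APP_URL = "https://your-app.up.railway.app"  # replace with your Railway URL

def main() -> int:
    try:
        with urllib.request.urlopen(f"{APP_URL}/_stcore/health", timeout=10) as resp:
            if resp.status == 200:
                print("OK: application healthy")
                return 0
            print(f"FAIL: unexpected status {resp.status}")
    except Exception as exc:
        print(f"FAIL: {exc}")
    return 1

if __name__ == "__main__":
    sys.exit(main())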

Weekly Operations

  • Review usage analytics
  • Update documentation
  • Performance optimization review
  • Security update check
  • Backup verification

Monthly Operations

  • User satisfaction survey
  • Cost analysis and optimization
  • Feature usage analysis
  • Architecture review
  • Consider Phase 3 graduation

Success Metrics for Phase 2

Reliability Metrics:
- Uptime > 99.5%
- Error rate < 1%
- Mean response time < 3 seconds

User Metrics:
- Daily active users > 10
- User satisfaction > 4.0/5.0
- Feature adoption rate > 60%

Business Metrics:
- Time savings quantified
- Process efficiency improvement
- User productivity increase

Common Phase 2 Enhancements

Authentication & Authorization

def check_user_access(required_role: str = None) -> bool:
    """Check if user has required access level"""
    user_info = st.session_state.get('user_info')

    if not user_info:
        return False

    if required_role and user_info.get('role') != required_role:
        return False

    return True

# Usage in pages
if not check_user_access('analyst'):
    st.error("Insufficient permissions")
    st.stop()

Data Export Capabilities

@monitor_performance("Data Export")
def export_data(data: pd.DataFrame, format: str) -> bytes:
    """Export data in various formats"""
    if format == 'csv':
        return data.to_csv(index=False).encode()
    elif format == 'excel':
        buffer = io.BytesIO()
        data.to_excel(buffer, index=False)
        return buffer.getvalue()
    elif format == 'parquet':
        buffer = io.BytesIO()
        data.to_parquet(buffer, index=False)
        return buffer.getvalue()
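
In a page, the export can be wired to a download button; a short usage sketch (assumes the DatabaseManager in session state from app.py, with an arbitrary file name):

# Usage in a page: offer the recent reports as a CSV download (sketch)
reports_df = pd.DataFrame(st.session_state.db.get_recent_reports())
st.download_button(
    label="⬇️ Download CSV",
    data=export_data(reports_df, "csv"),
    file_name="margin_reports.csv",
    mime="text/csv",
)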

Scheduled Background Jobs

# Background job for data refresh (runs as a separate Railway service)
import logging
import schedule
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def refresh_analytics_data():
    """Refresh materialized views and cache"""
    try:
        # Refresh logic here
        logger.info("Analytics data refreshed successfully")
    except Exception as e:
        logger.error(f"Failed to refresh analytics: {e}")

# Schedule jobs
schedule.every(1).hour.do(refresh_analytics_data)
schedule.every().day.at("06:00").do(refresh_analytics_data)

# Run scheduler
while True:
    schedule.run_pending()
    time.sleep(60)

This comprehensive Phase 2 guide transforms experiments into production-ready applications while maintaining development agility.