Margin ELT Loading System

Overview

The Margin ELT (Extract-Load-Transform) Loading System implements a modern data pipeline architecture that separates raw data extraction from business logic transformations. This design enables flexible data processing, improved performance, and better data governance for margin report analysis.

ELT vs ETL Architecture

Traditional ETL Problems

graph LR
    A[PDF] --> B[Extract] --> C[Transform] --> D[Load]
    C -.->|If transform fails| E[Lost raw data]
    C -.->|Business logic changes| F[Re-extract required]

ELT Solution Benefits

graph LR
    A[PDF] --> B[Extract] --> C[Load Raw] --> D[Transform On-Demand]
    C --> E[Multiple Transforms]
    C --> F[Data Recovery]
    C --> G[Reprocessing]

Key Advantages:

  • Raw Data Preservation: Original OCR data never lost
  • Flexible Transformations: Different business logic for different use cases
  • Better Performance: Lazy loading and on-demand processing
  • Data Recovery: Ability to reprocess without re-extraction

Pipeline Stages

Stage 1: Extract (E) 🔍

Purpose: Raw data extraction with no business logic applied

def extract_stage(self, pdf_file, job_id: str) -> bool:
    """
    Stage 1: Extract raw data from PDF using OCR
    No business logic or data enhancement
    """
    # Pure OCR extraction
    extractor = MarginReportExtractor(log_level='INFO')
    raw_result = extractor.extract_from_pdf(pdf_file)

    # Create raw positions DataFrame (minimal processing)
    raw_positions = pd.DataFrame([{
        'symbol': pos.symbol,
        'quantity': pos.quantity, 
        'closing_price': pos.closing_price,
        'market_value': pos.market_value,
        'reg_t_requirement': pos.reg_t_requirement,
        'option_price': pos.option_price,
        'company_name': pos.company_name,
        'strategy_description': pos.strategy_description,
        'position_type': pos.position_type,
        'raw_line': getattr(pos, 'raw_line', None)
    } for pos in raw_result.positions])

What Gets Extracted:

  • Symbol identifiers
  • Quantity values
  • Pricing information
  • Raw text lines from PDF
  • Account metadata
  • Processing timestamps

What's NOT Applied:

  • Position classification logic
  • Business rule calculations
  • Data validation and cleanup
  • Portfolio-level aggregations

Stage 2: Load (L) 💾

Purpose: Store raw data in queryable storage format

def load_stage(self, job_id: str, raw_positions: pd.DataFrame, metadata: Dict) -> bool:
    """
    Stage 2: Load raw data into queryable storage format
    No transformations, pure data persistence
    """
    # Save raw positions (untransformed)
    raw_positions_path = PARQUET_DIR / f"raw_positions_{job_id}.parquet"
    raw_positions.to_parquet(raw_positions_path, index=False)

    # Save extraction metadata
    metadata_path = PARQUET_DIR / f"extraction_metadata_{job_id}.json"
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f, indent=2)

Storage Strategy:

  • Raw Positions: raw_positions_{job_id}.parquet
  • Metadata: extraction_metadata_{job_id}.json
  • Job Tracking: Central jobs table with status tracking
  • Data Integrity: Atomic writes, rollback on failure
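The load_stage excerpt does not show how the atomic writes and rollback listed under Data Integrity are achieved. One common approach is to write to a temporary file in the target directory and rename it into place. The sketch below assumes that approach; the helper name atomic_write_json is hypothetical and not part of the pipeline.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict


def atomic_write_json(path: Path, payload: Dict[str, Any]) -> None:
    """Write JSON so readers never see a half-written file (hypothetical helper)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to the target so the rename stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
        # Rename into place; an existing file is replaced in a single step
        os.replace(tmp_name, path)
    except BaseException:
        # Roll back: drop the partial temp file and leave any old target untouched
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

The same pattern could be applied to the Parquet file by writing the DataFrame to the temporary path first and renaming it afterwards.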

Stage 3: Transform (T) 🔄

Purpose: Apply business logic transformations at consumption time

def transform_stage(self, job_id: str, transform_type: str = "standard") -> Dict[str, Any]:
    """
    Stage 3: Apply business logic transformations at consumption time
    Different transformations for different use cases
    """
    # Load raw data
    raw_data = self.load_raw_data(job_id)

    # Apply transformation based on use case
    if transform_type == "analysis":
        return self.transform_for_analysis(raw_data)
    elif transform_type == "risk_dashboard":
        return self.transform_for_risk(raw_data)
    elif transform_type == "qa":
        return self.transform_for_qa(raw_data)
    else:
        return self.transform_standard(raw_data)
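The if/elif chain above can also be written as a lookup table, so that registering a new transform is a one-line change. This is an illustrative sketch only: TransformRouter, its route method, and the stub transforms are hypothetical stand-ins. Unlike transform_stage, it takes already-loaded raw data rather than a job_id.

```python
from typing import Any, Callable, Dict

RawData = Dict[str, Any]


class TransformRouter:
    """Hypothetical stand-in that mirrors the transform_stage dispatch."""

    def __init__(self) -> None:
        # Map each transform_type string to the method that handles it
        self._handlers: Dict[str, Callable[[RawData], RawData]] = {
            "analysis": self.transform_for_analysis,
            "risk_dashboard": self.transform_for_risk,
            "qa": self.transform_for_qa,
        }

    def route(self, raw_data: RawData, transform_type: str = "standard") -> RawData:
        # Unknown types fall back to the standard transform, as in the original else branch
        handler = self._handlers.get(transform_type, self.transform_standard)
        return handler(raw_data)

    # Stub transforms: each tags its output so the routing is observable
    def transform_standard(self, raw_data: RawData) -> RawData:
        return {**raw_data, "transform": "standard"}

    def transform_for_analysis(self, raw_data: RawData) -> RawData:
        return {**raw_data, "transform": "analysis"}

    def transform_for_risk(self, raw_data: RawData) -> RawData:
        return {**raw_data, "transform": "risk_dashboard"}

    def transform_for_qa(self, raw_data: RawData) -> RawData:
        return {**raw_data, "transform": "qa"}
```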

Transform Types

Standard Transform

Use Case: General position analysis and reporting

def transform_standard(self, raw_data: Dict) -> Dict:
    """Standard transformations for general analysis"""
    # Raw positions as returned by load_raw_data
    # (assumes a 'positions' key alongside 'metadata')
    positions_df = raw_data['positions']

    # Apply standard enhancements row by row
    enhanced_positions = []
    for _, row in positions_df.iterrows():
        enhanced = enhance_position_data(row, raw_data['metadata'])
        enhanced_positions.append(enhanced)

    enhanced_df = pd.DataFrame(enhanced_positions)
    portfolio_totals = self.calculate_portfolio_totals(enhanced_df)

    return {
        'positions': enhanced_df,
        'portfolio_totals': portfolio_totals,
        'metadata': raw_data['metadata']
    }

Enhancements Applied:

  • Asset class classification (equity vs. option)
  • Position type determination (long vs. short)
  • Option details parsing (strike, expiration, type)
  • Contract multiplier calculations
  • Market value adjustments

Risk Transform

Use Case: Risk dashboard and portfolio monitoring

def transform_for_risk(self, raw_data: Dict) -> Dict:
    """Transformations optimized for risk dashboard"""
    result = self.transform_standard(raw_data)

    # Add risk-specific calculations
    positions_df = result['positions']

    # Strategy classification
    positions_df['strategy_type'] = positions_df.apply(self.classify_strategy, axis=1)

    # Concentration calculations
    total_mv = positions_df['market_value'].sum()
    positions_df['concentration_pct'] = (positions_df['market_value'] / total_mv) * 100

    return result

Risk-Specific Enhancements:

  • Strategy type classification
  • Concentration percentage calculations
  • Risk metric aggregations
  • Portfolio-level statistics
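The concentration calculation in transform_for_risk divides by the net sum of market values. That sum can be zero or close to zero when long and short positions offset each other. The sketch below shows one possible variant that uses gross (absolute) exposure as the denominator and guards against an empty portfolio. This is an assumption for illustration, not the pipeline's own formula, and concentration_pct is a hypothetical helper working on plain dictionaries rather than a DataFrame.

```python
from typing import Dict


def concentration_pct(market_values: Dict[str, float]) -> Dict[str, float]:
    """Share of gross exposure per symbol, in percent (illustrative helper)."""
    # Gross exposure: shorts count by magnitude instead of cancelling longs
    gross = sum(abs(value) for value in market_values.values())
    if gross == 0:
        # Empty or all-zero portfolio: report zero rather than divide by zero
        return {symbol: 0.0 for symbol in market_values}
    return {
        symbol: abs(value) * 100 / gross
        for symbol, value in market_values.items()
    }
```

With a long position worth 6000 and a short position worth -4000, this variant reports 60% and 40%. Dividing by the net sum of 2000 would instead give 300% and -200%.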

QA Transform

Use Case: Data quality analysis and validation

def transform_for_qa(self, raw_data: Dict) -> Dict:
    """Transformations optimized for data quality analysis"""
    result = self.transform_standard(raw_data)

    # Add QA-specific fields
    positions_df = result['positions']

    # Data quality flags
    positions_df['has_negative_long'] = (
        (positions_df['position_type'] == 'long') & 
        (positions_df['market_value'] < 0)
    )

    positions_df['has_positive_short'] = (
        (positions_df['position_type'] == 'short') & 
        (positions_df['market_value'] > 0)
    )

    return result

QA-Specific Validations:

  • Inconsistent position signs
  • Missing option details
  • Zero price/quantity flags
  • Data quality scoring
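The transform_for_qa excerpt covers only the sign checks. The remaining validations could look roughly like the sketch below. The flag names zero_quantity, zero_price, and missing_option_details are hypothetical, as is the scoring rule (the fraction of positions with no flags). Positions are plain dictionaries here, for example rows from DataFrame.to_dict('records').

```python
from typing import Any, Dict, List


def qa_flags(position: Dict[str, Any]) -> List[str]:
    """Return the names of all QA checks this position fails (illustrative)."""
    flags: List[str] = []
    quantity = position.get("quantity") or 0
    price = position.get("closing_price") or 0
    market_value = position.get("market_value") or 0
    position_type = position.get("position_type")

    if quantity == 0:
        flags.append("zero_quantity")
    if price == 0:
        flags.append("zero_price")
    # Sign checks, same conditions as in transform_for_qa
    if position_type == "long" and market_value < 0:
        flags.append("has_negative_long")
    if position_type == "short" and market_value > 0:
        flags.append("has_positive_short")
    # An option without a parsed strike is treated as missing its details
    if position.get("asset_class") == "option" and position.get("option_strike") is None:
        flags.append("missing_option_details")
    return flags


def quality_score(positions: List[Dict[str, Any]]) -> float:
    """Fraction of positions with no flags; 0.0 for an empty list (assumed rule)."""
    if not positions:
        return 0.0
    clean = sum(1 for position in positions if not qa_flags(position))
    return clean / len(positions)
```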

Data Enhancement Engine

Position Classification

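import re
from typing import Dict, Union

import pandas as pd


def enhance_position_data(pos: Union[pd.Series, Dict], metadata: Dict) -> Dict:
    """Transform raw position data with business logic enhancements"""

    # Normalize the input so both DataFrame rows and plain dicts are supported
    pos_dict = pos.to_dict() if isinstance(pos, pd.Series) else dict(pos)

    # Parse symbol to determine position characteristics
    symbol = str(pos_dict.get('symbol', '')).upper()
    # NaN is truthy in Python, so check for missing values before testing the price
    option_price = pos_dict.get('option_price')
    has_option_price = bool(pd.notna(option_price) and option_price)
    is_option = has_option_price or 'CALL' in symbol or 'PUT' in symbol

    # Option details parsing
    if is_option:
        # Try to parse option symbol format (e.g., AAPL240115C00150000)
        option_match = re.match(r'([A-Z]+)(\d{6})([CP])(\d{8})', symbol)
        if option_match:
            underlying_symbol = option_match.group(1)
            option_type = "call" if option_match.group(3) == 'C' else "put"
            # Strike is encoded in thousandths (00150000 -> 150.0)
            option_strike = int(option_match.group(4)) / 1000.0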
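The enhancement list mentions strike, expiration, and type, but the excerpt above stops before the expiration is parsed. The following standalone sketch parses all four parts of a symbol such as AAPL240115C00150000. It assumes the six digits are the expiration date in YYMMDD form, which matches the example symbol. The names parse_option_symbol and OptionDetails are hypothetical. The pattern is anchored at both ends, so it is slightly stricter than the re.match call in the excerpt.

```python
import re
from datetime import date, datetime
from typing import NamedTuple, Optional

# Underlying letters, YYMMDD expiration, C or P, strike in thousandths
OPTION_SYMBOL_RE = re.compile(r"^([A-Z]+)(\d{6})([CP])(\d{8})$")


class OptionDetails(NamedTuple):
    underlying: str
    expiration: date
    option_type: str
    strike: float


def parse_option_symbol(symbol: str) -> Optional[OptionDetails]:
    """Parse a compact option symbol; return None if it does not match."""
    match = OPTION_SYMBOL_RE.match(symbol.strip().upper())
    if match is None:
        return None
    underlying, yymmdd, call_or_put, strike_raw = match.groups()
    try:
        expiration = datetime.strptime(yymmdd, "%y%m%d").date()
    except ValueError:
        # Six digits that do not form a valid calendar date
        return None
    option_type = "call" if call_or_put == "C" else "put"
    return OptionDetails(underlying, expiration, option_type, int(strike_raw) / 1000.0)
```

Returning None for anything that does not parse lets the caller fall back to treating the row as an option with missing details instead of raising inside a row-wise transform.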

Business Rule Engine

Contract Multipliers

contract_multiplier = 100 if is_option else 1
market_value = quantity * closing_price * contract_multiplier

Position Types

position_type = "short" if quantity < 0 else "long"
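As a worked example, the two rules above (contract multiplier and position type) can be combined into one small function. The wrapper market_value_and_type is hypothetical; the arithmetic is exactly the two expressions shown.

```python
from typing import Tuple


def market_value_and_type(quantity: float, closing_price: float,
                          is_option: bool) -> Tuple[float, str]:
    """Apply the contract multiplier and sign rules from the business rule engine."""
    contract_multiplier = 100 if is_option else 1
    market_value = quantity * closing_price * contract_multiplier
    position_type = "short" if quantity < 0 else "long"
    return market_value, position_type


# Two short option contracts priced at 3.50: -2 * 3.5 * 100
print(market_value_and_type(-2, 3.5, True))    # (-700.0, 'short')
# Fifty shares priced at 10.00: 50 * 10.0 * 1
print(market_value_and_type(50, 10.0, False))  # (500.0, 'long')
```

Note that a quantity of zero is classified as long under this rule.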

Risk Classifications

def classify_strategy(row) -> str:
    if row.get('asset_class') == 'option':
        if row.get('option_type') == 'call':
            return 'long_call' if row.get('position_type') == 'long' else 'short_call'
        elif row.get('option_type') == 'put':
            return 'long_put' if row.get('position_type') == 'long' else 'short_put'
        # Options whose type could not be parsed would otherwise return None;
        # 'unknown_option' is a placeholder label, not part of the original rules
        return 'unknown_option'
    elif row.get('position_type') == 'short':
        return 'short_equity'
    else:
        return 'long_equity'

Job Management System

Job Lifecycle

graph TD
    A[Create Job] --> B[IN_PROGRESS]
    B --> C[Extract Stage]
    C --> D[Load Stage] 
    D --> E[Raw Data Stored]
    E --> F{Auto Transform?}
    F -->|Yes| G[Transform Stage]
    F -->|No| H[Ready for On-Demand]
    G --> I[SUCCESS]
    H --> J[LOADED]
    C -->|Error| K[FAILED]
    D -->|Error| K
    G -->|Error| K
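The status values in the diagram can be modelled as a small state machine that rejects transitions the diagram does not allow. This is a sketch under assumptions: JobStatus, ALLOWED_TRANSITIONS, and advance are hypothetical names, and LOADED is treated as a final status because the diagram shows no edge leaving it.

```python
from enum import Enum
from typing import Dict, Set


class JobStatus(Enum):
    IN_PROGRESS = "IN_PROGRESS"
    LOADED = "LOADED"      # raw data stored, transform left for on-demand use
    SUCCESS = "SUCCESS"    # raw data stored and auto transform completed
    FAILED = "FAILED"


# Transitions read off the lifecycle diagram; final statuses have no successors
ALLOWED_TRANSITIONS: Dict[JobStatus, Set[JobStatus]] = {
    JobStatus.IN_PROGRESS: {JobStatus.LOADED, JobStatus.SUCCESS, JobStatus.FAILED},
    JobStatus.LOADED: set(),
    JobStatus.SUCCESS: set(),
    JobStatus.FAILED: set(),
}


def advance(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return the new status, or raise if the diagram has no such transition."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal job transition: {current.value} -> {new.value}")
    return new
```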

Job Status Tracking

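import uuid
from datetime import datetime


class JobManager:
    def create_job(self, source_file: str, created_by: str = "streamlit_user") -> str:
        """Create a new job entry and return job_id"""
        # Assumption: the job_id scheme is not shown in this guide; a random UUID is used here
        job_id = uuid.uuid4().hex
        new_job = {
            'job_id': job_id,
            'job_started_at': datetime.now(),
            'status': 'IN_PROGRESS',
            'source_file': source_file,
            'created_by': created_by
        }
        # Writing new_job to the central jobs table is omitted here
        return job_id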

    def complete_job(self, job_id: str, positions_df: pd.DataFrame, 
                     portfolio_totals: Dict, metadata: Dict):
        """Complete a job by saving all data files and updating job status"""
        # Save all data files
        # Update status to 'SUCCESS'

    def fail_job(self, job_id: str, error_message: str):
        """Mark job as failed"""
        # Update status to 'FAILED'
        # Log error message

Performance Optimizations

Lazy Loading Strategy

Problem: Loading all job data upfront is expensive

Solution: Load data only when needed

@dataclass
class JobData:
    """Container for job-related data files with lazy loading"""
    job_id: str
    positions: Optional[pd.DataFrame] = None
    portfolio_totals: Optional[pd.DataFrame] = None
    metadata: Optional[Dict] = None

    def materialize_positions(self):
        """Load positions data if not already loaded"""
        if self.positions is None:
            positions_path = PARQUET_DIR / f"positions_{self.job_id}.parquet"
            if positions_path.exists():
                self.positions = pd.read_parquet(positions_path)
        return self.positions

Caching Strategy

Transform Results Caching

@st.cache_data
def load_job_with_transform(job_id: str, transform_type: str = "standard"):
    """Load job data with caching for transformed results"""
    elt_processor = ELTProcessor()
    return elt_processor.transform_stage(job_id, transform_type)

Parquet Storage Benefits

  • Columnar Format: Efficient for analytical queries
  • Compression: Reduced storage footprint
  • Schema Evolution: Support for adding new fields
  • Fast I/O: Optimized for pandas operations

Error Handling & Recovery

Extraction Failures

try:
    extract_success = elt_processor.extract_stage(temp_path, job_id)
    if not extract_success:
        st.error("❌ Extraction stage failed")
        return
except Exception as e:
    job_manager.fail_job(job_id, str(e))
    st.error(f"❌ ELT Pipeline Failed: {str(e)}")

Data Recovery Scenarios

Scenario 1: Transform logic changes

# Raw data preserved - can reprocess without re-extraction
old_data = load_raw_data(job_id)
new_result = apply_new_transform_logic(old_data)

Scenario 2: Extraction improvements

# Can compare old vs new extraction results
old_extraction = load_raw_data(old_job_id)
new_extraction = extract_stage(same_pdf, new_job_id)
diff_analysis = compare_extractions(old_extraction, new_extraction)
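The compare_extractions helper is not defined in this guide. A minimal sketch of what it might do is shown below, assuming each extraction is a list of row dictionaries (for example from DataFrame.to_dict('records')) and that symbols are unique within one extraction. Duplicate symbols would collapse to the last row. The fields compared by default are an illustrative choice.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def compare_extractions(old: List[Row], new: List[Row],
                        fields: Sequence[str] = ("quantity", "market_value")) -> Dict[str, Any]:
    """Report symbols added, removed, or changed between two extractions."""
    old_by_symbol = {row["symbol"]: row for row in old}
    new_by_symbol = {row["symbol"]: row for row in new}

    changed: Dict[str, Dict[str, tuple]] = {}
    for symbol in old_by_symbol.keys() & new_by_symbol.keys():
        # Collect (old, new) value pairs for every compared field that differs
        diffs = {
            field: (old_by_symbol[symbol].get(field), new_by_symbol[symbol].get(field))
            for field in fields
            if old_by_symbol[symbol].get(field) != new_by_symbol[symbol].get(field)
        }
        if diffs:
            changed[symbol] = diffs

    return {
        "added": sorted(new_by_symbol.keys() - old_by_symbol.keys()),
        "removed": sorted(old_by_symbol.keys() - new_by_symbol.keys()),
        "changed": changed,
    }
```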

Best Practices

🎯 Development Workflow

ELT Development Pattern

  1. Test extraction first - Verify raw data quality
  2. Develop transforms incrementally - Start with standard, add specialized
  3. Preserve backward compatibility - Don't break existing transforms
  4. Use type hints - Ensure data contracts are clear
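For the type-hint practice in step 4, one way to make the raw-data contract explicit is a TypedDict that lists the columns produced by extract_stage. The field names below are taken from that stage; the field types are assumptions, and RawPosition and missing_columns are hypothetical names.

```python
from typing import Iterable, List, Optional, TypedDict


class RawPosition(TypedDict):
    """One row of the raw positions table (field types are assumed)."""
    symbol: str
    quantity: float
    closing_price: float
    market_value: float
    reg_t_requirement: Optional[float]
    option_price: Optional[float]
    company_name: Optional[str]
    strategy_description: Optional[str]
    position_type: Optional[str]
    raw_line: Optional[str]


# Column names derived from the contract, so the two cannot drift apart
REQUIRED_COLUMNS = tuple(RawPosition.__annotations__)


def missing_columns(columns: Iterable[str]) -> List[str]:
    """Return the contract columns that are absent from the given column names."""
    present = set(columns)
    return [name for name in REQUIRED_COLUMNS if name not in present]
```

A transform could call missing_columns(raw_positions.columns) before running and fail early with a clear message if the extraction output has changed shape.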

📊 Data Quality Assurance

Validation at Each Stage

# Stage 1: Extraction validation
assert len(raw_positions) > 0, "No positions extracted"
assert 'symbol' in raw_positions.columns, "Missing symbol column"

# Stage 2: Load validation  
assert raw_positions_path.exists(), "Raw data not saved"

# Stage 3: Transform validation
assert 'asset_class' in enhanced_df.columns, "Missing asset_class"
assert enhanced_df['asset_class'].isin(['equity', 'option']).all()

🔒 Security & Compliance

Data Isolation

  • Each job gets unique storage files
  • No cross-job data contamination
  • Clean separation of concerns

Audit Trail

  • Complete processing history
  • Error logging and tracking
  • Metadata preservation

Common Use Cases

Reprocessing Historical Data

# Get all jobs from last month
historical_jobs = job_manager.get_jobs_by_date_range(start_date, end_date)

# Apply new transform logic to all jobs
for job_id in historical_jobs['job_id']:
    raw_data = elt_processor.load_raw_data(job_id)
    updated_result = elt_processor.transform_standard(raw_data)
    save_updated_results(job_id, updated_result)

A/B Testing Transform Logic

# Test new risk calculation logic
control_result = transform_for_risk_v1(raw_data)
experiment_result = transform_for_risk_v2(raw_data)

# Compare results
risk_diff = compare_risk_calculations(control_result, experiment_result)

Data Migration

# Migrate old ETL results to ELT format
old_etl_files = get_legacy_files()
for file in old_etl_files:
    # Extract raw data from legacy format
    raw_data = reverse_engineer_raw_data(file)

    # Save in new ELT format
    job_id = job_manager.create_job(file.name, "migration")
    elt_processor.load_stage(job_id, raw_data['positions'], raw_data['metadata'])

Monitoring & Troubleshooting

Performance Monitoring

  • Extract Stage: OCR processing time
  • Load Stage: File I/O performance
  • Transform Stage: Business logic execution time
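The per-stage timings listed above could be collected with a small context manager wrapped around each stage call. This is a sketch, not the pipeline's own instrumentation. timed_stage is a hypothetical helper, and the logger name 'elt_pipeline' is the one used in the logging section of this guide.

```python
import logging
import time
from contextlib import contextmanager
from typing import Dict, Iterator

logger = logging.getLogger("elt_pipeline")


@contextmanager
def timed_stage(stage: str, job_id: str, timings: Dict[str, float]) -> Iterator[None]:
    """Record wall-clock seconds for one pipeline stage, even if it raises."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        timings[stage] = elapsed
        logger.info("job %s: %s stage took %.3fs", job_id, stage, elapsed)


# Usage: wrap each stage call; time.sleep stands in for real work here
timings: Dict[str, float] = {}
with timed_stage("extract", "job-1", timings):
    time.sleep(0.01)
with timed_stage("load", "job-1", timings):
    time.sleep(0.01)
```

Because the timing is recorded in a finally block, a stage that fails still reports how long it ran before the exception propagates.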

Common Issues

Troubleshooting Guide

  • Import Errors: Check ELT processor imports and paths
  • Job Status Issues: Verify JobManager method calls
  • Transform Failures: Check raw data format and business logic
  • Memory Issues: Use lazy loading for large datasets

Logging Strategy

import logging
logger = logging.getLogger('elt_pipeline')

# Stage-specific logging
logger.info(f"Starting extraction for job {job_id}")
logger.info(f"Raw data loaded: {len(raw_positions)} positions")
logger.info(f"Transform complete: {transform_type}")

For operational guidance, see the main Margin Risk Management Guide.