Margin ELT Loading System¶
Overview¶
The Margin ELT (Extract-Load-Transform) Loading System implements a modern data pipeline architecture that separates raw data extraction from business logic transformations. This design enables flexible data processing, improved performance, and better data governance for margin report analysis.
ELT vs ETL Architecture¶
Traditional ETL Problems¶
graph LR
A[PDF] --> B[Extract] --> C[Transform] --> D[Load]
C -.->|If transform fails| E[Lost raw data]
C -.->|Business logic changes| F[Re-extract required]
ELT Solution Benefits¶
graph LR
A[PDF] --> B[Extract] --> C[Load Raw] --> D[Transform On-Demand]
C --> E[Multiple Transforms]
C --> F[Data Recovery]
C --> G[Reprocessing]
Key Advantages:
- Raw Data Preservation: Original OCR data is never lost
- Flexible Transformations: Different business logic for different use cases
- Better Performance: Lazy loading and on-demand processing
- Data Recovery: Ability to reprocess without re-extraction
Pipeline Stages¶
Stage 1: Extract (E) 🔍¶
Purpose: Raw data extraction with no business logic applied
def extract_stage(self, pdf_file, job_id: str) -> bool:
    """
    Stage 1: Extract raw data from PDF using OCR
    No business logic or data enhancement
    """
    # Pure OCR extraction
    extractor = MarginReportExtractor(log_level='INFO')
    raw_result = extractor.extract_from_pdf(pdf_file)

    # Create raw positions DataFrame (minimal processing)
    raw_positions = pd.DataFrame([{
        'symbol': pos.symbol,
        'quantity': pos.quantity,
        'closing_price': pos.closing_price,
        'market_value': pos.market_value,
        'reg_t_requirement': pos.reg_t_requirement,
        'option_price': pos.option_price,
        'company_name': pos.company_name,
        'strategy_description': pos.strategy_description,
        'position_type': pos.position_type,
        'raw_line': getattr(pos, 'raw_line', None)
    } for pos in raw_result.positions])
What Gets Extracted:
- Symbol identifiers
- Quantity values
- Pricing information
- Raw text lines from PDF
- Account metadata
- Processing timestamps
What's NOT Applied:
- Position classification logic
- Business rule calculations
- Data validation and cleanup
- Portfolio-level aggregations
Stage 2: Load (L) 💾¶
Purpose: Store raw data in queryable storage format
def load_stage(self, job_id: str, raw_positions: pd.DataFrame, metadata: Dict) -> bool:
    """
    Stage 2: Load raw data into queryable storage format
    No transformations, pure data persistence
    """
    # Save raw positions (untransformed)
    raw_positions_path = PARQUET_DIR / f"raw_positions_{job_id}.parquet"
    raw_positions.to_parquet(raw_positions_path, index=False)

    # Save extraction metadata
    metadata_path = PARQUET_DIR / f"extraction_metadata_{job_id}.json"
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f, indent=2)
Storage Strategy:
- Raw Positions: raw_positions_{job_id}.parquet
- Metadata: extraction_metadata_{job_id}.json
- Job Tracking: Central jobs table with status tracking
- Data Integrity: Atomic writes, rollback on failure
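The "atomic writes, rollback on failure" guarantee can be sketched as writing to a temporary file and only renaming it into place once the write succeeds. The helper below is a hypothetical illustration of that pattern, not the pipeline's actual implementation:
import pandas as pd
from pathlib import Path

def atomic_save_parquet(df: pd.DataFrame, final_path: Path) -> None:
    """Illustrative sketch: write to a temp file, then rename so readers
    never observe a half-written parquet file."""
    tmp_path = final_path.with_suffix('.parquet.tmp')
    try:
        df.to_parquet(tmp_path, index=False)
        tmp_path.replace(final_path)      # rename is atomic on the same filesystem
    except Exception:
        tmp_path.unlink(missing_ok=True)  # roll back the partial write
        raise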
Stage 3: Transform (T) 🔄¶
Purpose: Apply business logic transformations at consumption time
def transform_stage(self, job_id: str, transform_type: str = "standard") -> Dict[str, Any]:
    """
    Stage 3: Apply business logic transformations at consumption time
    Different transformations for different use cases
    """
    # Load raw data
    raw_data = self.load_raw_data(job_id)

    # Apply transformation based on use case
    if transform_type == "analysis":
        return self.transform_for_analysis(raw_data)
    elif transform_type == "risk_dashboard":
        return self.transform_for_risk(raw_data)
    elif transform_type == "qa":
        return self.transform_for_qa(raw_data)
    else:
        return self.transform_standard(raw_data)
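Because transformations run at consumption time, each consumer simply asks for the view it needs from the same raw data. A minimal usage sketch, assuming an ELTProcessor instance as in the caching example later in this page:
elt_processor = ELTProcessor()

# Same raw data, different on-demand views
analysis_view = elt_processor.transform_stage(job_id, transform_type="analysis")
risk_view = elt_processor.transform_stage(job_id, transform_type="risk_dashboard")
qa_view = elt_processor.transform_stage(job_id, transform_type="qa")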
Transform Types¶
Standard Transform¶
Use Case: General position analysis and reporting
def transform_standard(self, raw_data: Dict) -> Dict:
    """Standard transformations for general analysis"""
    # Raw positions as returned by load_raw_data (key name assumed)
    positions_df = raw_data['positions']

    # Apply standard enhancements
    enhanced_positions = []
    for _, row in positions_df.iterrows():
        enhanced = enhance_position_data(row, raw_data['metadata'])
        enhanced_positions.append(enhanced)

    enhanced_df = pd.DataFrame(enhanced_positions)
    portfolio_totals = self.calculate_portfolio_totals(enhanced_df)

    return {
        'positions': enhanced_df,
        'portfolio_totals': portfolio_totals,
        'metadata': raw_data['metadata']
    }
Enhancements Applied:
- Asset class classification (equity vs. option)
- Position type determination (long vs. short)
- Option details parsing (strike, expiration, type)
- Contract multiplier calculations
- Market value adjustments
Risk Transform¶
Use Case: Risk dashboard and portfolio monitoring
def transform_for_risk(self, raw_data: Dict) -> Dict:
    """Transformations optimized for risk dashboard"""
    result = self.transform_standard(raw_data)

    # Add risk-specific calculations
    positions_df = result['positions']

    # Strategy classification
    positions_df['strategy_type'] = positions_df.apply(self.classify_strategy, axis=1)

    # Concentration calculations
    total_mv = positions_df['market_value'].sum()
    positions_df['concentration_pct'] = (positions_df['market_value'] / total_mv) * 100

    return result
Risk-Specific Enhancements:
- Strategy type classification
- Concentration percentage calculations
- Risk metric aggregations (see the sketch below)
- Portfolio-level statistics
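The risk metric aggregations and portfolio-level statistics can be produced directly from the transformed frame. A minimal sketch; the column names follow the transforms above, while the particular grouping is illustrative:
# Gross exposure per strategy bucket, as a share of total market value
strategy_exposure = (
    positions_df.groupby('strategy_type')['market_value']
    .agg(['sum', 'count'])
    .rename(columns={'sum': 'exposure', 'count': 'num_positions'})
)
strategy_exposure['exposure_pct'] = (
    strategy_exposure['exposure'] / positions_df['market_value'].sum() * 100
)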
QA Transform¶
Use Case: Data quality analysis and validation
def transform_for_qa(self, raw_data: Dict) -> Dict:
    """Transformations optimized for data quality analysis"""
    result = self.transform_standard(raw_data)

    # Add QA-specific fields
    positions_df = result['positions']

    # Data quality flags
    positions_df['has_negative_long'] = (
        (positions_df['position_type'] == 'long') &
        (positions_df['market_value'] < 0)
    )
    positions_df['has_positive_short'] = (
        (positions_df['position_type'] == 'short') &
        (positions_df['market_value'] > 0)
    )

    return result
QA-Specific Validations:
- Inconsistent position signs
- Missing option details
- Zero price/quantity flags
- Data quality scoring (see the sketch below)
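The data quality scoring step is not shown above. One hedged way to derive a per-position score is to count how many quality flags are raised; the flag columns mirror the QA transform, while the scoring rule itself is an assumption:
# Hypothetical scoring rule: start at 1.0 and subtract a penalty per raised flag
qa_flags = ['has_negative_long', 'has_positive_short']
positions_df['quality_score'] = 1.0 - positions_df[qa_flags].sum(axis=1) * 0.5
positions_df['needs_review'] = positions_df['quality_score'] < 1.0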
Data Enhancement Engine¶
Position Classification¶
import re

def enhance_position_data(pos: Union[pd.Series, Dict], metadata: Dict) -> Dict:
    """Transform raw position data with business logic enhancements"""
    pos_dict = pos.to_dict() if isinstance(pos, pd.Series) else dict(pos)

    # Parse symbol to determine position characteristics
    symbol = str(pos_dict.get('symbol', '')).upper()
    is_option = bool(pos_dict.get('option_price')) or 'CALL' in symbol or 'PUT' in symbol

    # Option details parsing
    if is_option:
        # Try to parse option symbol format (e.g., AAPL240115C00150000)
        option_match = re.match(r'([A-Z]+)(\d{6})([CP])(\d{8})', symbol)
        if option_match:
            underlying_symbol = option_match.group(1)
            option_type = "call" if option_match.group(3) == 'C' else "put"
            option_strike = int(option_match.group(4)) / 1000.0
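Worked example: for AAPL240115C00150000 the regex groups are AAPL (underlying), 240115 (expiration in YYMMDD form, i.e. 2024-01-15), C (call), and 00150000, so the strike is 150000 / 1000 = 150.0.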
Business Rule Engine¶
Contract Multipliers
contract_multiplier = 100 if is_option else 1
market_value = quantity * closing_price * contract_multiplier
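Worked example: 5 option contracts priced at $3.50 carry a market value of 5 × 3.50 × 100 = $1,750, whereas 5 shares at $3.50 would be 5 × 3.50 × 1 = $17.50.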
Position Types
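The original listing for this rule is not reproduced here; a minimal sketch, assuming the sign of the extracted quantity drives the long/short classification:
# Hypothetical rule: negative quantities are treated as short positions
position_type = 'long' if quantity >= 0 else 'short'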
Risk Classifications
def classify_strategy(row) -> str:
    if row.get('asset_class') == 'option':
        if row.get('option_type') == 'call':
            return 'long_call' if row.get('position_type') == 'long' else 'short_call'
        elif row.get('option_type') == 'put':
            return 'long_put' if row.get('position_type') == 'long' else 'short_put'
    elif row.get('position_type') == 'short':
        return 'short_equity'
    else:
        return 'long_equity'
Job Management System¶
Job Lifecycle¶
graph TD
A[Create Job] --> B[IN_PROGRESS]
B --> C[Extract Stage]
C --> D[Load Stage]
D --> E[Raw Data Stored]
E --> F{Auto Transform?}
F -->|Yes| G[Transform Stage]
F -->|No| H[Ready for On-Demand]
G --> I[SUCCESS]
H --> J[LOADED]
C -->|Error| K[FAILED]
D -->|Error| K
G -->|Error| K
Job Status Tracking¶
class JobManager:
    def create_job(self, source_file: str, created_by: str = "streamlit_user") -> str:
        """Create a new job entry and return job_id"""
        job_id = str(uuid.uuid4())  # unique identifier (generation scheme shown here is illustrative)
        new_job = {
            'job_id': job_id,
            'job_started_at': datetime.now(),
            'status': 'IN_PROGRESS',
            'source_file': source_file,
            'created_by': created_by
        }
        return job_id

    def complete_job(self, job_id: str, positions_df: pd.DataFrame,
                     portfolio_totals: Dict, metadata: Dict):
        """Complete a job by saving all data files and updating job status"""
        # Save all data files
        # Update status to 'SUCCESS'

    def fail_job(self, job_id: str, error_message: str):
        """Mark job as failed"""
        # Update status to 'FAILED'
        # Log error message
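Tying the lifecycle together, a typical caller creates the job, runs the pipeline stages, and closes the job out on success or failure. A minimal sketch; the wiring between the stages is illustrative, and the source file name is a placeholder:
job_manager = JobManager()
job_id = job_manager.create_job(source_file="margin_report_2024_01.pdf")

try:
    elt_processor.extract_stage(pdf_file, job_id)        # Stage 1: raw OCR extraction
    result = elt_processor.transform_stage(job_id)       # Stage 3: on-demand transform
    job_manager.complete_job(job_id, result['positions'],
                             result['portfolio_totals'], result['metadata'])
except Exception as exc:
    job_manager.fail_job(job_id, str(exc))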
Performance Optimizations¶
Lazy Loading Strategy¶
Problem: Loading all job data upfront is expensive
Solution: Load data only when needed
@dataclass
class JobData:
    """Container for job-related data files with lazy loading"""
    job_id: str
    positions: Optional[pd.DataFrame] = None
    portfolio_totals: Optional[pd.DataFrame] = None
    metadata: Optional[Dict] = None

    def materialize_positions(self):
        """Load positions data if not already loaded"""
        if self.positions is None:
            positions_path = PARQUET_DIR / f"positions_{self.job_id}.parquet"
            if positions_path.exists():
                self.positions = pd.read_parquet(positions_path)
        return self.positions
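Usage is then a matter of constructing the lightweight container and materializing only the pieces a view actually needs. A brief illustrative example (selected_job_id is a placeholder):
job = JobData(job_id=selected_job_id)    # cheap: no files read yet
positions = job.materialize_positions()  # parquet is read on first access
if positions is not None:
    print(f"{len(positions)} positions loaded for job {job.job_id}")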
Caching Strategy¶
Transform Results Caching
@st.cache_data
def load_job_with_transform(job_id: str, transform_type: str = "standard"):
    """Load job data with caching for transformed results"""
    elt_processor = ELTProcessor()
    return elt_processor.transform_stage(job_id, transform_type)
Parquet Storage Benefits¶
- Columnar Format: Efficient for analytical queries
- Compression: Reduced storage footprint
- Schema Evolution: Support for adding new fields
- Fast I/O: Optimized for pandas operations
Error Handling & Recovery¶
Extraction Failures¶
try:
    extract_success = elt_processor.extract_stage(temp_path, job_id)
    if not extract_success:
        st.error("❌ Extraction stage failed")
        return
except Exception as e:
    job_manager.fail_job(job_id, str(e))
    st.error(f"❌ ELT Pipeline Failed: {str(e)}")
Data Recovery Scenarios¶
Scenario 1: Transform logic changes
# Raw data preserved - can reprocess without re-extraction
old_data = load_raw_data(job_id)
new_result = apply_new_transform_logic(old_data)
Scenario 2: Extraction improvements
# Can compare old vs new extraction results
old_extraction = load_raw_data(old_job_id)
new_extraction = extract_stage(same_pdf, new_job_id)
diff_analysis = compare_extractions(old_extraction, new_extraction)
Best Practices¶
🎯 Development Workflow¶
ELT Development Pattern
- Test extraction first - Verify raw data quality
- Develop transforms incrementally - Start with standard, add specialized
- Preserve backward compatibility - Don't break existing transforms
- Use type hints - Ensure data contracts are clear
📊 Data Quality Assurance¶
Validation at Each Stage
# Stage 1: Extraction validation
assert len(raw_positions) > 0, "No positions extracted"
assert 'symbol' in raw_positions.columns, "Missing symbol column"
# Stage 2: Load validation
assert raw_positions_path.exists(), "Raw data not saved"
# Stage 3: Transform validation
assert 'asset_class' in enhanced_df.columns, "Missing asset_class"
assert enhanced_df['asset_class'].isin(['equity', 'option']).all()
🔒 Security & Compliance¶
Data Isolation
- Each job gets unique storage files
- No cross-job data contamination
- Clean separation of concerns
Audit Trail
- Complete processing history
- Error logging and tracking
- Metadata preservation
Common Use Cases¶
Reprocessing Historical Data¶
# Get all jobs from last month
historical_jobs = job_manager.get_jobs_by_date_range(start_date, end_date)

# Apply new transform logic to all jobs
for job_id in historical_jobs['job_id']:
    raw_data = elt_processor.load_raw_data(job_id)
    updated_result = elt_processor.transform_standard(raw_data)
    save_updated_results(job_id, updated_result)
A/B Testing Transform Logic¶
# Test new risk calculation logic
control_result = transform_for_risk_v1(raw_data)
experiment_result = transform_for_risk_v2(raw_data)
# Compare results
risk_diff = compare_risk_calculations(control_result, experiment_result)
Data Migration¶
# Migrate old ETL results to ELT format
old_etl_files = get_legacy_files()
for file in old_etl_files:
    # Extract raw data from legacy format
    raw_data = reverse_engineer_raw_data(file)

    # Save in new ELT format
    job_id = job_manager.create_job(file.name, "migration")
    elt_processor.load_stage(job_id, raw_data['positions'], raw_data['metadata'])
Monitoring & Troubleshooting¶
Performance Monitoring¶
- Extract Stage: OCR processing time
- Load Stage: File I/O performance
- Transform Stage: Business logic execution time
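A lightweight way to capture these per-stage timings is to wrap each stage call with a timer and log the elapsed time. A minimal sketch; the logger name matches the logging strategy below, while the helper itself is illustrative:
import time
import logging

logger = logging.getLogger('elt_pipeline')

def timed_stage(stage_name, stage_fn, *args, **kwargs):
    """Run a pipeline stage and log how long it took."""
    start = time.perf_counter()
    result = stage_fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    logger.info(f"{stage_name} finished in {elapsed:.2f}s")
    return result

# e.g. timed_stage("extract", elt_processor.extract_stage, pdf_file, job_id)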
Common Issues¶
Troubleshooting Guide
- Import Errors: Check ELT processor imports and paths
- Job Status Issues: Verify JobManager method calls
- Transform Failures: Check raw data format and business logic
- Memory Issues: Use lazy loading for large datasets
Logging Strategy¶
import logging
logger = logging.getLogger('elt_pipeline')
# Stage-specific logging
logger.info(f"Starting extraction for job {job_id}")
logger.info(f"Raw data loaded: {len(raw_positions)} positions")
logger.info(f"Transform complete: {transform_type}")
For operational guidance, see the main Margin Risk Management Guide.