Open Source Econometric Models

Portfolio risk modeling using GARCH ensembles, CCC, and DCC methodologies - freely available for academic and commercial use

GARCH Modeling

Multiple GARCH specifications with ensemble optimization

Correlation Models

CCC and DCC frameworks for portfolio correlation

Risk Measurement

Value-at-Risk with rigorous backtesting

Complete Technical Documentation

The econometric models showcased below are documented in our technical report. The report provides detailed mathematical derivations, model specifications, validation procedures, and empirical results for all GARCH ensemble, CCC, and DCC methodologies implemented in RiskOptimix.

  • Mathematical foundations
  • Model specifications
  • Estimation procedures
  • Backtesting methodologies
  • Empirical validation
  • Performance comparisons
Download Technical Report

PDF documentation

Introduction & Overview

Imports and global configuration for the portfolio modeling framework

# Import required libraries
import numpy as np
import pandas as pd
import yfinance as yf
from arch import arch_model
from arch.univariate.base import ARCHModelResult
from scipy.optimize import minimize
from scipy import stats
from scipy.special import gamma as sp_gamma  # used to standardize the GED quantile
import warnings
import json
from datetime import datetime
import logging
from typing import List, Dict, Tuple, Optional
import gc
import matplotlib.pyplot as plt
import os

# Filter out all warnings
warnings.filterwarnings('ignore')

# Set up logging to track the progress of the portfolio modeling
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Number of trading days between refits of the individual GARCH models.
# Defined as a module-level constant because it is changed frequently during testing.
REBALANCE_FREQUENCY = 50

CCC Ensemble Model Class

Core CCC (Constant Conditional Correlation) ensemble modeling class

class PortfolioCCCEnsembleModel:
    """
    CCC-GARCH ensemble model for portfolio analysis.
    
    The class implements the complete modeling pipeline for user-submitted portfolios:
    1. Data retrieval for the portfolio stocks
    2. Ensemble GARCH model fitting for each stock
    3. CCC correlation matrix estimation
    4. Portfolio optimization and risk assessment
    5. Results generation
    """
    
    def __init__(self, portfolio_data, start_date='2014-01-01', end_date=None, confidence_level=0.05, train_ratio=0.8):
        """
        Initialize the portfolio modeling system
        
        Args:
            portfolio_data: List of (ticker, amount) tuples
            start_date: Start date for historical data
            end_date: End date for historical data (None = today)
            confidence_level: VaR confidence level
            train_ratio: Training/testing split ratio
        """

        # Set the initial parameters for the portfolio modeling
        self.portfolio_data = portfolio_data
        self.start_date = start_date
        self.end_date = end_date or datetime.now().strftime('%Y-%m-%d')
        self.confidence_level = confidence_level
        self.train_ratio = train_ratio
        
        # Portfolio composition
        self.tickers = [ticker for ticker, amount in portfolio_data]
        self.amounts = [float(amount) for ticker, amount in portfolio_data]
        self.total_value = sum(self.amounts)
        self.weights = [amount / self.total_value for amount in self.amounts]
        
        # Data storage
        self.returns = None
        self.prices = None
        self.ensemble_models = {}
        self.conditional_volatilities = {}
        self.correlation_matrix = None
        self.standardized_residuals = None
        
        # Results storage
        self.modeling_results = {}
        self.optimization_results = {}
        
        logger.info(f"Initialized portfolio model with {len(self.tickers)} stocks")
        logger.info(f"Portfolio value: ${self.total_value:,.2f}")
        logger.info(f"Data period: {self.start_date} to {self.end_date}")

GARCH Model Specifications

Definition of GARCH model variants for ensemble modeling

    def create_garch_model_specifications(self) -> List[Dict]:
        """
        Create the GARCH model specifications for the individual stocks in the portfolio.

        Returns:
            List of GARCH model specifications
        """

        # Initialize the list of GARCH model specifications
        models = []
        
        # Define the GARCH model configurations.
        # The ensemble uses GARCH, EGARCH, and TGARCH variants with different
        # (p, q) orders; combined with the three distributions below, this
        # yields 10 x 3 = 30 candidate specifications per stock.
        model_configs = [
            ('GARCH', [(1,1), (1,2), (2,1), (2,2)]),
            ('EGARCH', [(1,1), (1,2), (2,1)]),
            ('TGARCH', [(1,1), (1,2), (2,1)])
        ]
        
        # For each model, we use three different distributions: normal, t and ged
        distributions = ['normal', 't', 'ged']
        
        # We iterate over all the model configurations and distributions
        for model_type, param_combos in model_configs:
            for p, q in param_combos:
                for dist in distributions:
                    # Create the model specification for the current model configuration and distribution
                    model_spec = {
                        'model_type': model_type,
                        'p': p,
                        'q': q,
                        'distribution': dist,
                        'name': f"{model_type}({p},{q})_{dist}"
                    }
                    
                    # Record how the arch library should be configured for this variant.
                    # Without an asymmetry (threshold) order 'o', the TGARCH spec would
                    # reduce to a plain GARCH model, so o=1 is included here.
                    if model_type == 'EGARCH':
                        model_spec['vol_type'] = 'EGARCH'
                    elif model_type == 'TGARCH':
                        model_spec['vol_type'] = 'GARCH'
                        model_spec['o'] = 1
                        model_spec['power'] = 2.0
                    else:
                        model_spec['vol_type'] = 'GARCH'
                    
                    # Add the model specification to the list of model specifications
                    models.append(model_spec)
        
        return models

    def fit_garch_model(self, returns, model_spec, verbose=False) -> Optional[ARCHModelResult]:
        """
        Fit a GARCH model for a given stock and model specification.

        Args:
            returns: The returns of the stock
            model_spec: The model specification
            verbose: Whether to log progress

        Returns:
            The fitted model result, or None if estimation fails
        """

        # Try to fit the GARCH model
        try:
            # If the model type is GARCH, we use the GARCH model
            if model_spec['model_type'] == 'GARCH':
                model = arch_model(
                    returns, 
                    vol='GARCH',
                    p=model_spec['p'], 
                    q=model_spec['q'], 
                    dist=model_spec['distribution']
                )
            # If the model type is EGARCH, we use the EGARCH model
            elif model_spec['model_type'] == 'EGARCH':
                model = arch_model(
                    returns, 
                    vol='EGARCH', 
                    p=model_spec['p'], 
                    q=model_spec['q'], 
                    dist=model_spec['distribution']
                )
            # If the model type is TGARCH, include the asymmetry (threshold) order 'o'
            elif model_spec['model_type'] == 'TGARCH':
                model = arch_model(
                    returns,
                    vol='GARCH',
                    p=model_spec['p'],
                    o=model_spec.get('o', 1),
                    q=model_spec['q'],
                    power=model_spec.get('power', 2.0),
                    dist=model_spec['distribution']
                )
            
            # Fit the model
            result = model.fit(disp='off', show_warning=False)
            
            if verbose:
                logger.info(f"Successfully fitted {model_spec['name']}")
            
            return result
            
        except Exception as e:
            if verbose:
                logger.error(f"Failed to fit {model_spec['name']}: {str(e)}")
            return None
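
As a quick sanity check of a single fit, independent of the class, the arch API can be run directly on simulated heavy-tailed returns (illustrative only, not part of the source file):

# Sanity check (illustrative): fit one GARCH(1,1)-t spec on simulated data
np.random.seed(42)
sim_returns = pd.Series(np.random.standard_t(df=6, size=1500))
res = arch_model(sim_returns, vol='GARCH', p=1, q=1, dist='t').fit(disp='off')
print(res.params['nu'])  # estimated degrees of freedom, typically near 6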

Value-at-Risk Calculation

Risk measurement using Value-at-Risk methodology

    def calculate_var(self, returns_forecast, volatility_forecast, confidence_level=0.05,
                      distribution='normal', dist_params=None) -> float:
        """
        Calculate Value at Risk using GARCH forecasts

        Args:
            returns_forecast: The forecasted returns
            volatility_forecast: The forecasted volatility
            confidence_level: The confidence level for the VaR
            distribution: The distribution of the returns
            dist_params: The distribution parameters

        Returns:
            The calculated VaR
        """

        # Map the confidence level to the quantile (z-score) of the assumed
        # innovation distribution. The arch library estimates *standardized*
        # (unit-variance) t and GED distributions, so their quantiles are
        # rescaled to unit variance before use.
        if distribution == 'normal':
            z_score = stats.norm.ppf(confidence_level)
        elif distribution == 't':
            if dist_params is not None and 'nu' in dist_params:
                nu = dist_params['nu']
                # Standardized Student's t: the variance of t(nu) is nu/(nu-2)
                z_score = stats.t.ppf(confidence_level, df=nu) * np.sqrt((nu - 2) / nu)
            else:
                z_score = stats.norm.ppf(confidence_level)
        elif distribution == 'ged':
            if dist_params is not None and 'nu' in dist_params:
                nu = dist_params['nu']
                # Standardized GED: the variance of gennorm(beta) is Gamma(3/beta)/Gamma(1/beta)
                scale = np.sqrt(sp_gamma(1.0 / nu) / sp_gamma(3.0 / nu))
                z_score = stats.gennorm.ppf(confidence_level, beta=nu) * scale
            else:
                z_score = stats.norm.ppf(confidence_level)
        else:
            z_score = stats.norm.ppf(confidence_level)
        
        # Report VaR as a positive loss number: VaR = -(mu + z * sigma)
        var = -(returns_forecast + z_score * volatility_forecast)

        return var
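
A worked one-day example under normality, with illustrative numbers: given an expected return of 0.05% and a volatility forecast of 1.5%, the 95% VaR comes out to roughly 2.42% of portfolio value.

# Worked example (illustrative numbers): 1-day 95% VaR under normality
mu, sigma = 0.0005, 0.015
z = stats.norm.ppf(0.05)       # approx. -1.645
var_95 = -(mu + z * sigma)     # approx. 0.0242, i.e. a 2.42% one-day loss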

Backtesting Framework

Rolling window backtesting for model validation

    def rolling_var_backtest_single_model(self, returns, model_spec, min_train_window=500,
                                          rebalance_freq=REBALANCE_FREQUENCY) -> pd.DataFrame:
        """
        Run a rolling-window VaR backtest for a single model. At each rebalance date the
        GARCH model is refit on all data up to that date, a one-step VaR is forecast, and
        that forecast is held fixed and compared against realized returns until the next refit.

        Args:
            returns: The returns of the stock
            model_spec: The model specification
            min_train_window: The minimum training window
            rebalance_freq: The rebalancing frequency
        
        Returns:
            The results of the rolling window VaR backtest
        """

        # Get the number of observations and the initial training window
        n_obs = len(returns)
        initial_train_size = max(int(n_obs * self.train_ratio), min_train_window)
        
        # Initialize the list of results
        results = []
        
        # Expanding-window loop: refit the model every rebalance_freq observations,
        # training on all data from the start of the sample up to the refit date
        for i in range(initial_train_size, n_obs, rebalance_freq):
            train_returns = returns.iloc[:i]
            
            try:
                # Fit the GARCH model
                garch_result = self.fit_garch_model(train_returns, model_spec, verbose=False)
                
                if garch_result is None:
                    continue
                
                # Compute the forecasted volatility and expected return
                forecast = garch_result.forecast(horizon=1)
                vol_forecast = float(np.sqrt(forecast.variance.iloc[-1, 0]))
                expected_return = float(train_returns.mean())
                
                # Compute the distribution parameters
                dist_params = {}
                if hasattr(garch_result, 'params'):
                    if 'nu' in garch_result.params.index:
                        dist_params['nu'] = garch_result.params['nu']
                
                # Compute the VaR forecast
                var_forecast = self.calculate_var(expected_return, vol_forecast, self.confidence_level, model_spec['distribution'], dist_params)
                
                # Compute the VaR backtest
                for j in range(min(rebalance_freq, n_obs - i)):
                    if i + j < n_obs:
                        # Compute the actual return
                        actual_return = float(returns.iloc[i + j])
                        actual_loss = -actual_return
                        # Compute the exceedance
                        exceedance = actual_loss > var_forecast
                        # Compute the asymmetric quantile loss
                        loss_asym = self.asymmetric_quantile_loss(actual_return, var_forecast, self.confidence_level)
                        # Compute the regulatory-style loss
                        loss_reg = self.regulatory_loss_function(actual_return, var_forecast, self.confidence_level)
                        
                        results.append({
                            'date': returns.index[i + j],
                            'var_forecast': var_forecast,
                            'actual_return': actual_return,
                            'exceedance': bool(exceedance),
                            'loss_asymmetric': loss_asym,
                            'loss_regulatory': loss_reg
                        })
            
            except Exception as e:
                # Log the error
                logger.error(f"Error fitting GARCH model: {e}")
                continue
        
        return pd.DataFrame(results)
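
The backtest relies on three helper methods that are not reproduced on this page: asymmetric_quantile_loss, regulatory_loss_function, and christoffersen_test. Below is a minimal sketch of plausible implementations, assuming the standard definitions (quantile/pinball loss, a Lopez-style magnitude loss, and the Kupiec/Christoffersen likelihood-ratio tests); the shipped portfolio_modeling.py may differ in details.

    def asymmetric_quantile_loss(self, actual_return, var_forecast, alpha):
        # Pinball (quantile) loss at level alpha; VaR is stored as a positive
        # loss, so the implied return quantile is -var_forecast
        q = -var_forecast
        u = actual_return - q
        return float((alpha - (u < 0)) * u)

    def regulatory_loss_function(self, actual_return, var_forecast, alpha):
        # Lopez-style magnitude loss: zero when covered, 1 + squared excess when
        # the realized loss exceeds the VaR (alpha kept for interface parity)
        excess = -actual_return - var_forecast
        return float(1.0 + excess ** 2) if excess > 0 else 0.0

    def christoffersen_test(self, exceedances, alpha):
        # Kupiec unconditional-coverage and Christoffersen conditional-coverage
        # likelihood-ratio tests on the exceedance indicator series
        e = np.asarray(exceedances, dtype=int)
        n, x = len(e), int(e.sum())
        if n == 0 or x == 0 or x == n:
            return {'p_value_uc': np.nan, 'p_value_cc': np.nan}
        pi = x / n
        lr_uc = -2 * (x * np.log(alpha) + (n - x) * np.log(1 - alpha)
                      - x * np.log(pi) - (n - x) * np.log(1 - pi))
        # First-order Markov transition counts for the independence test
        n00 = int(np.sum((e[:-1] == 0) & (e[1:] == 0)))
        n01 = int(np.sum((e[:-1] == 0) & (e[1:] == 1)))
        n10 = int(np.sum((e[:-1] == 1) & (e[1:] == 0)))
        n11 = int(np.sum((e[:-1] == 1) & (e[1:] == 1)))
        eps = 1e-12
        pi01 = n01 / max(n00 + n01, 1)
        pi11 = n11 / max(n10 + n11, 1)
        pi1 = (n01 + n11) / max(n - 1, 1)
        lr_ind = -2 * ((n00 + n10) * np.log(1 - pi1 + eps) + (n01 + n11) * np.log(pi1 + eps)
                       - n00 * np.log(1 - pi01 + eps) - n01 * np.log(pi01 + eps)
                       - n10 * np.log(1 - pi11 + eps) - n11 * np.log(pi11 + eps))
        lr_cc = lr_uc + lr_ind
        return {'p_value_uc': float(1 - stats.chi2.cdf(lr_uc, df=1)),
                'p_value_cc': float(1 - stats.chi2.cdf(lr_cc, df=2))}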

Ensemble Weight Optimization

Multi-criteria optimization for model ensemble weights

    def calculate_ensemble_weights_multicriteria(self, acceptable_models, alpha=0.4, beta=0.3,
                                                 gamma=0.2, delta=0.1) -> Tuple[Dict, Dict]:
        """
        Calculate the ensemble weights for a given set of models using a multi-criteria approach.

        Args:
            acceptable_models (dict): Dictionary of acceptable models
            alpha (float): Weight for the statistical component
            beta (float): Weight for the regulatory component
            gamma (float): Weight for the coverage component
            delta (float): Weight for the asymmetric component

        Returns:
            Tuple[Dict, Dict]: Tuple containing the ensemble weights and the weight breakdown
        """

        # Initialize the lists to store the model data
        model_names = list(acceptable_models.keys())
        uc_pvalues = []
        cc_pvalues = []
        reg_losses = []
        exc_rates = []
        asym_losses = []
        
        # Collect each model's Christoffersen p-values, regulatory losses,
        # coverage errors (|exceedance rate - alpha|), and asymmetric losses
        for model_name in model_names:
            model_data = acceptable_models[model_name]
            uc_pvalues.append(model_data['christoffersen']['p_value_uc'])
            cc_pvalues.append(model_data['christoffersen']['p_value_cc'])
            reg_losses.append(model_data['avg_loss_regulatory'])
            exc_rates.append(abs(model_data['exceedance_rate'] - self.confidence_level))
            asym_losses.append(model_data['avg_loss_asymmetric'])
        
        uc_pvalues = np.array(uc_pvalues)
        cc_pvalues = np.array(cc_pvalues)
        reg_losses = np.array(reg_losses)
        exc_rates = np.array(exc_rates)
        asym_losses = np.array(asym_losses)
        
        uc_pvalues = np.where(np.isnan(uc_pvalues), 1e-6, uc_pvalues)
        cc_pvalues = np.where(np.isnan(cc_pvalues), 1e-6, cc_pvalues)
        
        # Calculate component weights
        stat_scores = np.sqrt(np.maximum(uc_pvalues, 1e-6) * np.maximum(cc_pvalues, 1e-6))
        stat_weights = stat_scores / np.sum(stat_scores)
        
        reg_scores = 1.0 / (reg_losses + 1e-10)
        reg_weights = reg_scores / np.sum(reg_scores)
        
        coverage_scores = 1.0 / (exc_rates + 1e-10)
        coverage_weights = coverage_scores / np.sum(coverage_scores)
        
        asym_scores = 1.0 / (asym_losses + 1e-10)
        asym_weights = asym_scores / np.sum(asym_scores)
        
        # Combine weights
        final_weights = (alpha * stat_weights + beta * reg_weights + 
                        gamma * coverage_weights + delta * asym_weights)
        final_weights = final_weights / np.sum(final_weights)
        
        weight_breakdown = {
            'statistical': dict(zip(model_names, stat_weights)),
            'regulatory': dict(zip(model_names, reg_weights)),
            'coverage': dict(zip(model_names, coverage_weights)),
            'asymmetric': dict(zip(model_names, asym_weights)),
            'final': dict(zip(model_names, final_weights))
        }
        
        return dict(zip(model_names, final_weights)), weight_breakdown
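
A toy two-model calculation with made-up numbers, illustrating how the four criteria blend into the final weights:

# Toy example (made-up numbers): blend criteria for two candidate models
uc = np.array([0.60, 0.20]); cc = np.array([0.50, 0.10])
stat = np.sqrt(uc * cc); stat /= stat.sum()            # geometric mean of p-values
reg = 1 / np.array([0.012, 0.015]); reg /= reg.sum()   # inverse regulatory loss
cov = 1 / np.array([0.004, 0.010]); cov /= cov.sum()   # inverse coverage error
asym = 1 / np.array([0.020, 0.025]); asym /= asym.sum()
w = 0.4 * stat + 0.3 * reg + 0.2 * cov + 0.1 * asym
w /= w.sum()   # final ensemble weights, approximately [0.68, 0.32]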

Fit Ensemble Models

Fit GARCH ensembles for each portfolio stock, then for the portfolio as a whole

    def fit_ensemble_univariate_model(self, returns, ticker) -> Dict:
        """
        Fit ensemble of GARCH models for a single stock

        Args:
            returns (pd.Series): The returns of the stock
            ticker (str): The ticker symbol of the stock

        Returns:
            Dict: Dictionary containing the ensemble information
        """

        logger.info(f"Fitting ensemble models for {ticker}...")
        
        # Create all model specifications
        model_specs = self.create_garch_model_specifications()
        all_model_results = {}
                
        # Backtest each candidate specification using the configured rebalancing frequency
        for i, model_spec in enumerate(model_specs):
            try:
                model_results = self.rolling_var_backtest_single_model(
                    returns, model_spec, min_train_window=500, rebalance_freq=REBALANCE_FREQUENCY
                )
                
                if len(model_results) > 0:
                    exceedances = model_results['exceedance'].values
                    
                    total_obs = len(model_results)
                    total_exceedances = np.sum(exceedances)
                    exceedance_rate = total_exceedances / total_obs
                    
                    avg_loss_asym = model_results['loss_asymmetric'].mean()
                    avg_loss_reg = model_results['loss_regulatory'].mean()
                    
                    christoffersen_results = self.christoffersen_test(exceedances, self.confidence_level)
                    
                    all_model_results[model_spec['name']] = {
                        'model_spec': model_spec,
                        'results_df': model_results,
                        'total_obs': total_obs,
                        'total_exceedances': total_exceedances,
                        'exceedance_rate': exceedance_rate,
                        'avg_loss_asymmetric': avg_loss_asym,
                        'avg_loss_regulatory': avg_loss_reg,
                        'christoffersen': christoffersen_results
                    }
                    
                # Memory cleanup every 10 models
                if i % 10 == 0:
                    gc.collect()
                    
            except Exception as e:
                logger.error(f"Error fitting model {model_spec['name']}: {e}")
                continue
        
        logger.info(f"Successfully tested {len(all_model_results)} models for {ticker}")
        
        # Filter acceptable models
        acceptable_models = {}
        for model_name, results in all_model_results.items():
            uc_pvalue = results['christoffersen']['p_value_uc']
            cc_pvalue = results['christoffersen']['p_value_cc']
            
            # Only add the model if it passes the coverage tests with a significance level of 0.05
            if (not np.isnan(uc_pvalue) and not np.isnan(cc_pvalue) and 
                uc_pvalue > 0.05 and cc_pvalue > 0.05):
                acceptable_models[model_name] = results
        
        if len(acceptable_models) == 0:
            logger.warning(f"No models pass coverage tests for {ticker} - selecting best 5 models based on quality scores")
            # Calculate quality scores for all models to select the best ones
            best_models = self.select_best_models_by_quality(all_model_results, max_models=5)
            acceptable_models = best_models
        
        logger.info(f"Using {len(acceptable_models)} models for {ticker} ensemble")
        
        # Calculate ensemble weights
        weight_dict, weight_breakdown = self.calculate_ensemble_weights_multicriteria(acceptable_models)
        
        # Fit final models on full data
        final_fitted_models = {}
        for model_name in acceptable_models.keys():
            model_spec = acceptable_models[model_name]['model_spec']
            
            try:
                fitted_model = self.fit_garch_model(returns, model_spec, verbose=False)
                if fitted_model is not None:
                    final_fitted_models[model_name] = fitted_model
            except Exception as e:
                continue
        
        ensemble_info = {
            'acceptable_models': acceptable_models,
            'fitted_models': final_fitted_models,
            'weights': weight_dict,
            'weight_breakdown': weight_breakdown,
            'ticker': ticker,
            'validation_summary': {
                'total_models_tested': len(model_specs),
                'successful_models': len(all_model_results),
                'acceptable_models': len(acceptable_models),
                'final_fitted_models': len(final_fitted_models)
            }
        }
        
        return ensemble_info

    def fit_all_ensemble_models(self) -> None:
        """
        Fit ensemble models for all portfolio stocks
        """

        logger.info("Fitting ensemble models for all portfolio stocks...")
        
        for ticker in self.tickers:
            logger.info(f"Processing {ticker}...")
            
            ensemble_info = self.fit_ensemble_univariate_model(self.returns[ticker], ticker)
            self.ensemble_models[ticker] = ensemble_info
            
            # Get conditional volatility
            conditional_vol = self.get_ensemble_conditional_volatility(ensemble_info, self.returns[ticker])
            self.conditional_volatilities[ticker] = conditional_vol
        
        # Get standardized residuals for correlation estimation
        self.standardized_residuals = pd.DataFrame(index=self.returns.index)
        for ticker in self.tickers:
            std_resid = self.get_ensemble_standardized_residuals(self.ensemble_models[ticker], self.returns[ticker])
            self.standardized_residuals[ticker] = std_resid
        
        self.standardized_residuals = self.standardized_residuals.dropna()
        
        logger.info("Completed ensemble model fitting for all stocks")

Correlation Matrix Estimation and Forecasting

Correlation estimation and forecasting for the CCC model

    def estimate_correlation_matrix(self) -> None:
        """
        Estimate constant conditional correlation matrix
        using the standardized residuals.
        """

        logger.info("Estimating correlation matrix...")
        
        if self.standardized_residuals is None:
            raise ValueError("Must fit ensemble models first")
        
        self.correlation_matrix = self.standardized_residuals.corr().values
        
        logger.info("Correlation Matrix:")
        corr_df = pd.DataFrame(self.correlation_matrix, columns=self.tickers, index=self.tickers)
        logger.info(f"\n{corr_df.round(4)}")

    def forecast_ensemble_volatility(self, ensemble_info, horizon=1) -> float:
        """
        Generate volatility forecast from ensemble

        Args:
            ensemble_info (dict): Dictionary containing the ensemble information
            horizon (int): The horizon for the forecast

        Returns:
            float: The weighted ensemble volatility forecast
        """

        # Initialize the weighted forecast
        weighted_forecast = 0.0
        total_weight = 0.0
        
        # Accumulate the weighted volatility forecasts across ensemble models
        for model_name, fitted_model in ensemble_info['fitted_models'].items():
            try:
                weight = ensemble_info['weights'][model_name]
                
                forecast = fitted_model.forecast(horizon=horizon)
                vol_forecast = np.sqrt(forecast.variance.iloc[-1, 0])
                
                weighted_forecast += weight * vol_forecast
                total_weight += weight
                
            except Exception as e:
                continue
        
        if total_weight > 0:
            weighted_forecast = weighted_forecast / total_weight
        
        return weighted_forecast

    def forecast_covariance_matrix(self, horizon=1) -> Tuple[np.ndarray, List[float]]:
        """
        Forecast covariance matrix using ensemble models

        Args:
            horizon (int): The horizon for the forecast

        Returns:
            Tuple[np.ndarray, List[float]]: The forecasted covariance matrix and the volatilities
        """
        logger.info(f"Forecasting covariance matrix for {horizon} period(s) ahead...")
        
        # Initialize the volatilities
        vol_forecasts = []

        # For each ticker, forecast the volatility
        for ticker in self.tickers:
            vol_forecast = self.forecast_ensemble_volatility(self.ensemble_models[ticker], horizon)
            vol_forecasts.append(vol_forecast)
            logger.info(f"{ticker} forecasted volatility: {vol_forecast:.4f}")
        
        # Construct covariance matrix
        vol_matrix = np.diag(vol_forecasts)
        forecast_cov = vol_matrix @ self.correlation_matrix @ vol_matrix
        
        return forecast_cov, vol_forecasts
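
A small numeric check of the CCC construction H = D R D, where D is the diagonal matrix of volatility forecasts and R the correlation matrix (illustrative numbers):

# Sanity check of H = D R D with two assets (illustrative numbers)
vols = [0.01, 0.02]
R = np.array([[1.0, 0.5],
              [0.5, 1.0]])
D = np.diag(vols)
H = D @ R @ D
# Off-diagonal: 0.01 * 0.02 * 0.5 = 1e-4; diagonal entries: 1e-4 and 4e-4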

Portfolio Optimization

Portfolio optimization using the CCC model

    def optimize_portfolio(self) -> Dict:
        """
        Minimum-variance portfolio optimization using the CCC ensemble covariance forecast

        Returns:
            Dict: The optimization results
        """

        logger.info("Optimizing portfolio...")
        
        # Get forecasted covariance matrix
        forecast_cov, vol_forecasts = self.forecast_covariance_matrix()
        
        logger.info("Forecasted volatilities:")
        for i, ticker in enumerate(self.tickers):
            logger.info(f"{ticker}: {vol_forecasts[i]:.4f}")
        print("\n")
        
        # Minimum variance portfolio
        n_assets = len(self.tickers)
        
        def portfolio_variance(weights, cov_matrix):
            return weights.T @ cov_matrix @ weights
        
        # Constraints and bounds
        constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
        bounds = [(0, 1) for _ in range(n_assets)]
        initial_weights = np.array([1/n_assets] * n_assets)
        
        # Optimize
        result = minimize(portfolio_variance, initial_weights,
                         args=(forecast_cov,), method='SLSQP',
                         bounds=bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-10})
        
        if result.success:
            optimal_weights = result.x
            min_variance = result.fun
            
            logger.info("Portfolio optimization successful!")
            logger.info("Optimal weights:")
            for i, ticker in enumerate(self.tickers):
                logger.info(f"  {ticker}: {optimal_weights[i]:.4f}")
            logger.info(f"Portfolio Variance: {min_variance:.6f}")
            logger.info(f"Portfolio Volatility: {np.sqrt(min_variance):.4f}")
            
            # Store optimization results
            self.optimization_results = {
                'optimal_weights': optimal_weights,
                'portfolio_variance': min_variance,
                'portfolio_volatility': np.sqrt(min_variance),
                'forecasted_volatilities': vol_forecasts,
                'forecasted_covariance': forecast_cov,
                'current_weights': self.weights,
                'optimization_success': True
            }
            
        else:
            logger.error("Portfolio optimization failed!")
            logger.error(f"Reason: {result.message}")
            
            # Use current weights as fallback
            optimal_weights = np.array(self.weights)
            min_variance = portfolio_variance(optimal_weights, forecast_cov)
            
            self.optimization_results = {
                'optimal_weights': optimal_weights,
                'portfolio_variance': min_variance,
                'portfolio_volatility': np.sqrt(min_variance),
                'forecasted_volatilities': vol_forecasts,
                'forecasted_covariance': forecast_cov,
                'current_weights': self.weights,
                'optimization_success': False
            }
        
        return self.optimization_results
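
When none of the box bounds bind, SLSQP should agree with the closed-form minimum-variance solution w = H⁻¹1 / (1'H⁻¹1), which gives a quick cross-check (illustrative covariance matrix):

# Cross-check against the closed-form minimum-variance weights (illustrative H)
H = np.array([[1.0e-4, 0.6e-4],
              [0.6e-4, 4.0e-4]])
w = np.linalg.solve(H, np.ones(2))
w /= w.sum()   # approx. [0.895, 0.105]; SLSQP should match when bounds are inactive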

DCC Model Class

Dynamic Conditional Correlation (DCC) modeling framework

class PortfolioDCCEnsembleModel:
    """
    This class implements Dynamic Conditional Correlation (DCC) GARCH modeling for portfolios.
    Unlike CCC, DCC allows correlations to change over time, providing more flexible modeling
    of time-varying dependencies between assets.
    """
    
    def __init__(self, portfolio_data, start_date='2014-01-01', end_date=None, confidence_level=0.05, train_ratio=0.8) -> None:
        """
        Initialize the DCC portfolio modeling system
        
        Args:
            portfolio_data: List of (ticker, amount) tuples
            start_date: Start date for historical data
            end_date: End date for historical data (None = today)
            confidence_level: VaR confidence level
            train_ratio: Training/testing split ratio
        """

        # Initialize the portfolio data
        self.portfolio_data = portfolio_data
        self.start_date = start_date
        self.end_date = end_date or datetime.now().strftime('%Y-%m-%d')
        self.confidence_level = confidence_level
        self.train_ratio = train_ratio
        
        # Portfolio composition
        self.tickers = [ticker for ticker, amount in portfolio_data]
        self.amounts = [float(amount) for ticker, amount in portfolio_data]
        self.total_value = sum(self.amounts)
        self.weights = [amount / self.total_value for amount in self.amounts]
        
        # Data storage
        self.returns = None
        self.prices = None
        self.ensemble_models = {}
        self.conditional_volatilities = {}
        self.standardized_residuals = None
        
        # DCC-specific parameters
        self.dcc_params = None  # alpha, beta parameters
        self.dynamic_correlations = None  # Time-varying correlation matrices
        self.unconditional_correlation = None  # Long-run correlation matrix
        self.q_matrices = None  # DCC Q matrices
        
        # Results storage
        self.modeling_results = {}
        self.optimization_results = {}
        
        logger.info(f"Initialized DCC portfolio model with {len(self.tickers)} stocks")
        logger.info(f"Portfolio value: ${self.total_value:,.2f}")
        logger.info(f"Data period: {self.start_date} to {self.end_date}")

    # The data-loading and univariate ensemble-fitting methods are shared with
    # the CCC class above and are omitted here.
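
One possible way to wire up that sharing (an assumption about the full file, which is not shown in this excerpt) is to reuse the CCC methods directly after both classes are defined:

# Sketch (assumption, not shown in this excerpt): reuse the CCC machinery
PortfolioDCCEnsembleModel.create_garch_model_specifications = \
    PortfolioCCCEnsembleModel.create_garch_model_specifications
PortfolioDCCEnsembleModel.fit_garch_model = PortfolioCCCEnsembleModel.fit_garch_model
PortfolioDCCEnsembleModel.fit_ensemble_univariate_model = \
    PortfolioCCCEnsembleModel.fit_ensemble_univariate_model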

DCC Estimation and Forecasting

Time-varying correlation estimation using DCC methodology

    def estimate_dcc_parameters(self, max_iter=1000, tolerance=1e-6) -> Dict:
        """
        Estimate DCC parameters using maximum likelihood estimation
        
        This implements the two-step DCC estimation procedure:
        1. Estimate univariate GARCH models (already done in ensemble fitting)
        2. Estimate DCC parameters (alpha, beta) using standardized residuals

        Args:
            max_iter (int): Maximum number of iterations
            tolerance (float): Tolerance for convergence

        Returns:
            Dict: Dictionary containing the DCC parameters
        """

        logger.info("Estimating DCC parameters...")
        
        if self.standardized_residuals is None:
            raise ValueError("Must fit ensemble models first")
        
        # Calculate unconditional correlation matrix
        self.unconditional_correlation = self.standardized_residuals.corr().values
        logger.info("Unconditional correlation matrix estimated")
        
        # Initial parameter values
        initial_params = [0.01, 0.95]  # alpha, beta
        
        # Bounds: each parameter must lie in (0, 1); the stationarity condition
        # alpha + beta < 1 is enforced by the inequality constraint below
        bounds = [(1e-6, 1-1e-6), (1e-6, 1-1e-6)]
        
        # Constraint: alpha + beta < 1
        def constraint_func(params):
            return 1 - (params[0] + params[1])
        
        constraint = {'type': 'ineq', 'fun': constraint_func}
        
        # Objective function: negative log-likelihood
        def negative_log_likelihood(params):
            try:
                alpha, beta = params
                return -self.dcc_log_likelihood(alpha, beta)
            except Exception:
                return 1e10  # Return a large value if the calculation fails
        
        # Optimize
        result = minimize(negative_log_likelihood, initial_params, 
                         method='SLSQP', bounds=bounds, constraints=constraint,
                         options={'maxiter': max_iter, 'ftol': tolerance})
        
        if result.success:
            self.dcc_params = {'alpha': result.x[0], 'beta': result.x[1]}
            logger.info(f"DCC parameters estimated successfully:")
            logger.info(f"Alpha: {self.dcc_params['alpha']:.6f}")
            logger.info(f"Beta: {self.dcc_params['beta']:.6f}")
            logger.info(f"Alpha + Beta: {sum(result.x):.6f}")
            
            # Generate dynamic correlation matrices
            self.generate_dynamic_correlations()
            
        else:
            logger.error("DCC parameter estimation failed!")
            logger.error(f"Reason: {result.message}")
            # Use simple fallback
            self.dcc_params = {'alpha': 0.01, 'beta': 0.95}
            self.generate_dynamic_correlations()
        
        return self.dcc_params

    def dcc_log_likelihood(self, alpha, beta) -> float:
        """
        Calculate DCC log-likelihood for given parameters

        Args:
            alpha (float): Alpha parameter
            beta (float): Beta parameter

        Returns:
            float: The DCC log-likelihood
        """

        n_obs = len(self.standardized_residuals)
        
        # Initialize Q matrix with unconditional correlation
        Q_bar = self.unconditional_correlation.copy()
        Q_t = Q_bar.copy()
        
        log_likelihood = 0.0
        
        for t in range(n_obs):
            # Get standardized residuals for time t
            z_t = self.standardized_residuals.iloc[t].values.reshape(-1, 1)
            
            # Update Q matrix
            if t > 0:
                z_prev = self.standardized_residuals.iloc[t-1].values.reshape(-1, 1)
                Q_t = (1 - alpha - beta) * Q_bar + alpha * (z_prev @ z_prev.T) + beta * Q_t
            
            # Convert Q to a correlation matrix (guard against division by zero)
            D_t = np.sqrt(np.diag(Q_t))
            D_t_inv = np.diag(1.0 / np.maximum(D_t, 1e-8))
            R_t = D_t_inv @ Q_t @ D_t_inv
            
            # Ensure positive definiteness
            try:
                R_t_inv = np.linalg.inv(R_t)
                det_R_t = np.linalg.det(R_t)
                
                if det_R_t <= 0:
                    return -1e10
                
                # Second-stage DCC quasi-likelihood contribution; the -z_t'z_t
                # term is omitted because it does not depend on (alpha, beta)
                log_likelihood += -0.5 * (np.log(det_R_t) + z_t.T @ R_t_inv @ z_t)
                
            except np.linalg.LinAlgError:
                return -1e10
        
        return float(log_likelihood)

    def generate_dynamic_correlations(self) -> None:
        """
        Generate time-varying correlation matrices using estimated DCC parameters
        """

        logger.info("Generating dynamic correlation matrices...")
        
        alpha = self.dcc_params['alpha']
        beta = self.dcc_params['beta']
        n_assets = len(self.tickers)
        n_obs = len(self.standardized_residuals)
        
        # Initialize storage
        self.q_matrices = np.zeros((n_obs, n_assets, n_assets))
        self.dynamic_correlations = np.zeros((n_obs, n_assets, n_assets))
        
        # Initialize Q matrix
        Q_bar = self.unconditional_correlation.copy()
        Q_t = Q_bar.copy()
        
        for t in range(n_obs):
            # Get standardized residuals for time t
            z_t = self.standardized_residuals.iloc[t].values.reshape(-1, 1)
            
            # Update Q matrix
            if t > 0:
                z_prev = self.standardized_residuals.iloc[t-1].values.reshape(-1, 1)
                Q_t = (1 - alpha - beta) * Q_bar + alpha * (z_prev @ z_prev.T) + beta * Q_t
            
            # Store Q matrix
            self.q_matrices[t] = Q_t.copy()
            
            # Calculate correlation matrix
            D_t = np.sqrt(np.diag(Q_t))
            D_t_inv = np.diag(1.0 / np.maximum(D_t, 1e-8))  # Avoid division by zero
            R_t = D_t_inv @ Q_t @ D_t_inv
            
            # Ensure correlation matrix properties
            np.fill_diagonal(R_t, 1.0)  # Diagonal elements = 1
            
            # Store correlation matrix
            self.dynamic_correlations[t] = R_t.copy()
        
        logger.info(f"Generated {n_obs} dynamic correlation matrices")
        
        # Log a summary statistic for the first asset pair
        if n_assets > 1:
            avg_corr = float(np.mean(self.dynamic_correlations[:, 0, 1]))
            logger.info(f"Average correlation (first pair): {avg_corr:.4f}")

    def forecast_dcc_correlation(self, horizon=1) -> np.ndarray:
        """
        Forecast correlation matrix using DCC model

        Args:
            horizon (int): The horizon for the forecast

        Returns:
            np.ndarray: The forecasted correlation matrix
        """

        if self.dynamic_correlations is None:
            raise ValueError("Must estimate DCC parameters first")
        
        alpha = self.dcc_params['alpha']
        beta = self.dcc_params['beta']
        
        # Use last Q matrix as starting point
        Q_t = self.q_matrices[-1].copy()
        
        # Get last standardized residuals
        z_last = self.standardized_residuals.iloc[-1].values.reshape(-1, 1)
        
        # Forecast Q matrix
        Q_forecast = (1 - alpha - beta) * self.unconditional_correlation + \
                    alpha * (z_last @ z_last.T) + beta * Q_t
        
        # For multi-step horizons, shrink toward the unconditional matrix.
        # (The standard DCC approximation uses (alpha + beta)**(horizon - 1) as the
        # persistence factor; beta**horizon decays faster toward the long-run matrix.)
        if horizon > 1:
            Q_forecast = (1 - beta**horizon) * self.unconditional_correlation + beta**horizon * Q_forecast
        
        # Convert to correlation matrix
        D_forecast = np.sqrt(np.diag(Q_forecast))
        D_forecast_inv = np.diag(1.0 / np.maximum(D_forecast, 1e-8))
        R_forecast = D_forecast_inv @ Q_forecast @ D_forecast_inv
        
        # Ensure correlation matrix properties
        np.fill_diagonal(R_forecast, 1.0)
        
        return R_forecast
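
For intuition, multi-step DCC correlation forecasts decay geometrically toward the long-run level. Under the standard approximation with persistence kappa = alpha + beta, applied to a single correlation pair with made-up numbers:

# Illustration (made-up numbers): forecasts shrink toward the long-run correlation
r_bar, r_next = 0.40, 0.70          # long-run and one-step-ahead correlation
kappa = 0.01 + 0.95                 # alpha + beta
for h in (1, 5, 20, 60):
    r_h = r_bar + kappa ** (h - 1) * (r_next - r_bar)
    print(h, round(r_h, 3))         # 0.7, 0.655, 0.538, 0.427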

Download Complete Source Code

Get the complete portfolio_modeling.py file containing all econometric models and analysis frameworks used in RiskOptimix.

Download portfolio_modeling.py

MIT License - Free for academic and commercial use

Contact

Get in touch with our portfolio analysis experts

Email

riskoptimix@gmail.com

Response Time

We typically respond within 24 hours

Support

Technical support and portfolio analysis questions