
Open Source Econometric Models
Portfolio risk modeling using GARCH ensembles, CCC, and DCC methodologies - freely available for academic and commercial use
GARCH Modeling
Multiple GARCH specifications with ensemble optimization
Correlation Models
CCC and DCC frameworks for portfolio correlation
Risk Measurement
Value-at-Risk with rigorous backtesting
Complete Technical Documentation
The econometric models showcased below are documented in our technical report. This paper provides detailed mathematical derivations, model specifications, validation procedures, and empirical results for all GARCH ensemble, CCC, and DCC methodologies implemented in RiskOptimix.
- Mathematical foundations
- Model specifications
- Estimation procedures
- Backtesting methodologies
- Empirical validation
- Performance comparisons
PDF documentation
Table of Contents
- Introduction & Overview
- CCC Ensemble Model Class
- GARCH Model Specifications
- Value-at-Risk Calculation
- Backtesting Framework
- Ensemble Weight Optimization
- Fit Ensemble Models
- Correlation Matrix Estimation and Forecasting
- Portfolio Optimization
- DCC Model Class
- DCC Estimation and Forecasting
Source File: portfolio_modeling.py (3,923 lines)
Introduction & Overview
Imports and global configuration for the portfolio modeling framework
# Import required libraries
import numpy as np
import pandas as pd
import yfinance as yf
from arch import arch_model
from scipy.optimize import minimize
from scipy import stats
import warnings
import json
from datetime import datetime
import logging
from typing import List, Dict, Tuple, Optional
import gc
import matplotlib.pyplot as plt
import os
# Filter out all warnings
warnings.filterwarnings('ignore')
# Set up logging to track the progress of the portfolio modeling
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Set the number of days between rebalancing (refitting) the individual GARCH models.
# Defined as a constant because this value is changed frequently when testing the models.
REBALANCE_FREQUENCY = 50 # Number of days between rebalancing
CCC Ensemble Model Class
Core CCC (Constant Conditional Correlation) ensemble modeling class
class PortfolioCCCEnsembleModel:
"""
CCC-GARCH ensemble model for portfolio analysis.
Implements the complete modeling pipeline for user-submitted portfolios:
1. Data acquisition for the portfolio's stocks
2. Ensemble GARCH model fitting for each stock
3. CCC correlation matrix estimation
4. Portfolio optimization and risk assessment
5. Results generation
"""
def __init__(self, portfolio_data, start_date='2014-01-01', end_date=None, confidence_level=0.05, train_ratio=0.8):
"""
Initialize the portfolio modeling system
Args:
portfolio_data: List of (ticker, amount) tuples
start_date: Start date for historical data
end_date: End date for historical data (None = today)
confidence_level: VaR confidence level
train_ratio: Training/testing split ratio
"""
# Set the initial parameters for the portfolio modeling
self.portfolio_data = portfolio_data
self.start_date = start_date
self.end_date = end_date or datetime.now().strftime('%Y-%m-%d')
self.confidence_level = confidence_level
self.train_ratio = train_ratio
# Portfolio composition
self.tickers = [ticker for ticker, amount in portfolio_data]
self.amounts = [float(amount) for ticker, amount in portfolio_data]
self.total_value = sum(self.amounts)
self.weights = [amount / self.total_value for amount in self.amounts]
# Data storage
self.returns = None
self.prices = None
self.ensemble_models = {}
self.conditional_volatilities = {}
self.correlation_matrix = None
self.standardized_residuals = None
# Results storage
self.modeling_results = {}
self.optimization_results = {}
logger.info(f"Initialized portfolio model with {len(self.tickers)} stocks")
logger.info(f"Portfolio value: ${self.total_value:,.2f}")
logger.info(f"Data period: {self.start_date} to {self.end_date}")
GARCH Model Specifications
Definition of GARCH model variants for ensemble modeling
def create_garch_model_specifications(self) -> List[Dict]:
"""
Create the GARCH model specifications for the individual stocks in the portfolio.
Returns:
List of GARCH model specifications
"""
# Initialize the list of GARCH model specifications
models = []
# Define the GARCH model configurations
# In the ensemble, we use GARCH, EGARCH, and TGARCH models with different parameter orders
model_configs = [
('GARCH', [(1,1), (1,2), (2,1), (2,2)]),
('EGARCH', [(1,1), (1,2), (2,1)]),
('TGARCH', [(1,1), (1,2), (2,1)])
]
# For each model, we use three different distributions: normal, Student's t, and GED
distributions = ['normal', 't', 'ged']
# We iterate over all the model configurations and distributions
for model_type, param_combos in model_configs:
for p, q in param_combos:
for dist in distributions:
# Create the model specification for the current model configuration and distribution
model_spec = {
'model_type': model_type,
'p': p,
'q': q,
'distribution': dist,
'name': f"{model_type}({p},{q})_{dist}"
}
# Set the volatility type for the current model configuration
if model_type == 'EGARCH':
model_spec['vol_type'] = 'EGARCH'
elif model_type == 'TGARCH':
model_spec['vol_type'] = 'GARCH'
model_spec['power'] = 2.0
# Asymmetry order o = 1 adds the threshold (GJR) term; without it,
# this specification would reduce to a plain GARCH model
model_spec['o'] = 1
else:
model_spec['vol_type'] = 'Garch'
# Add the model specification to the list of model specifications
models.append(model_spec)
return models
def fit_garch_model(self, returns, model_spec, verbose=False):
"""
Fit the GARCH model for a given stock and model specification.
Args:
returns: The returns of the stock
model_spec: The model specification
verbose: Whether to log the progress
Returns:
The fitted GARCH model result, or None if fitting fails
"""
# Try to fit the GARCH model
try:
# If the model type is GARCH, we use the GARCH model
if model_spec['model_type'] == 'GARCH':
model = arch_model(
returns,
vol='Garch',
p=model_spec['p'],
q=model_spec['q'],
dist=model_spec['distribution']
)
# If the model type is EGARCH, we use the EGARCH model
elif model_spec['model_type'] == 'EGARCH':
model = arch_model(
returns,
vol='EGARCH',
p=model_spec['p'],
q=model_spec['q'],
dist=model_spec['distribution']
)
# If the model type is TGARCH, we fit a GJR-GARCH specification (asymmetry order o)
elif model_spec['model_type'] == 'TGARCH':
model = arch_model(
returns,
vol='GARCH',
p=model_spec['p'],
o=model_spec.get('o', 1),
q=model_spec['q'],
power=model_spec.get('power', 2.0),
dist=model_spec['distribution']
)
# Fit the model
result = model.fit(disp='off', show_warning=False)
if verbose:
logger.info(f"Successfully fitted {model_spec['name']}")
return result
except Exception as e:
if verbose:
logger.error(f"Failed to fit {model_spec['name']}: {str(e)}")
return None
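Taken together, the specification grid above yields 30 candidate models per stock: (4 GARCH + 3 EGARCH + 3 TGARCH) parameter orders, each under three distributions. A minimal usage sketch on simulated returns, using the imports from the top of the file (the tickers, amounts, and seed are placeholders):
# Usage sketch on simulated data (placeholder portfolio and seed)
rng = np.random.default_rng(42)
sim_returns = pd.Series(rng.normal(0.0, 1.0, 1500))  # 1500 simulated daily returns
model = PortfolioCCCEnsembleModel([('AAPL', 5000.0), ('MSFT', 5000.0)])
specs = model.create_garch_model_specifications()
print(len(specs))  # 30 = (4 + 3 + 3) parameter orders x 3 distributions
fitted = model.fit_garch_model(sim_returns, specs[0], verbose=True)
if fitted is not None:
    print(fitted.params)  # mu, omega, alpha[1], beta[1] for GARCH(1,1)_normal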
Value-at-Risk Calculation
Risk measurement using Value-at-Risk methodology
def calculate_var(self, returns_forecast, volatility_forecast, confidence_level=0.05, distribution='normal', dist_params=None) -> float:
"""
Calculate Value at Risk using GARCH forecasts
Args:
returns_forecast: The forecasted returns
volatility_forecast: The forecasted volatility
confidence_level: The VaR tail probability (e.g., 0.05 for 95% VaR)
distribution: The distribution of the returns
dist_params: The distribution parameters
Returns:
The calculated VaR
"""
# Calculate the VaR using the GARCH forecasts
if distribution == 'normal':
z_score = stats.norm.ppf(confidence_level)
elif distribution == 't':
if dist_params is not None and 'nu' in dist_params:
nu = dist_params['nu']
z_score = stats.t.ppf(confidence_level, df=nu)
else:
z_score = stats.norm.ppf(confidence_level)
elif distribution == 'ged':
if dist_params is not None and 'nu' in dist_params:
nu = dist_params['nu']
z_score = stats.gennorm.ppf(confidence_level, beta=nu)
else:
z_score = stats.norm.ppf(confidence_level)
else:
z_score = stats.norm.ppf(confidence_level)
# Calculate the VaR using the GARCH forecasts and the calculated z-score
var = -(returns_forecast + z_score * volatility_forecast)
return var
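A worked example of the formula above: with a forecasted one-day return of 0.05% and a forecasted volatility of 2%, the 95% normal VaR is roughly 3.24% of portfolio value. A standalone sketch of the normal case:
# Worked example of the VaR formula (normal case, standalone)
mu = 0.0005               # forecasted one-day return (0.05%)
sigma = 0.02              # forecasted one-day volatility (2%)
z = stats.norm.ppf(0.05)  # about -1.6449
var_95 = -(mu + z * sigma)  # about 0.0324, i.e. a 3.24% one-day loss
print(f"95% one-day VaR: {var_95:.4%}")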
Backtesting Framework
Rolling window backtesting for model validation
def rolling_var_backtest_single_model(self, returns, model_spec, min_train_window=500, rebalance_freq=REBALANCE_FREQUENCY) -> pd.DataFrame:
"""
Compute a rolling-window VaR backtest for a single model. At each rebalancing date the GARCH model is refitted and a one-step VaR forecast is produced;
the forecast is then compared with the realized returns to compute exceedances and losses.
Args:
returns: The returns of the stock
model_spec: The model specification
min_train_window: The minimum training window
rebalance_freq: The rebalancing frequency
Returns:
The results of the rolling window VaR backtest
"""
# Get the number of observations and the initial training window
n_obs = len(returns)
initial_train_size = max(int(n_obs * self.train_ratio), min_train_window)
# Initialize the list of results
results = []
# Iterate over the training window and rebalance the model
for i in range(initial_train_size, n_obs, rebalance_freq):
train_start = 0
train_end = i
train_returns = returns.iloc[train_start:train_end]
try:
# Fit the GARCH model
garch_result = self.fit_garch_model(train_returns, model_spec, verbose=False)
if garch_result is None:
continue
# Compute the forecasted volatility and expected return
forecast = garch_result.forecast(horizon=1)
vol_forecast = float(np.sqrt(forecast.variance.iloc[-1, 0]))
expected_return = float(train_returns.mean())
# Compute the distribution parameters
dist_params = {}
if hasattr(garch_result, 'params'):
if 'nu' in garch_result.params.index:
dist_params['nu'] = garch_result.params['nu']
# Compute the VaR forecast
var_forecast = self.calculate_var(expected_return, vol_forecast, self.confidence_level, model_spec['distribution'], dist_params)
# Compute the VaR backtest
for j in range(min(rebalance_freq, n_obs - i)):
if i + j < n_obs:
# Compute the actual return
actual_return = float(returns.iloc[i + j])
actual_loss = -actual_return
# Compute the exceedance
exceedance = actual_loss > var_forecast
# Compute the asymmetric quantile loss
loss_asym = self.asymmetric_quantile_loss(actual_return, var_forecast, self.confidence_level)
# Compute the regulatory-style loss
loss_reg = self.regulatory_loss_function(actual_return, var_forecast, self.confidence_level)
results.append({
'date': returns.index[i + j],
'var_forecast': var_forecast,
'actual_return': actual_return,
'exceedance': bool(exceedance),
'loss_asymmetric': loss_asym,
'loss_regulatory': loss_reg
})
except Exception as e:
# Log the error
logger.error(f"Error fitting GARCH model: {e}")
continue
return pd.DataFrame(results)
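The christoffersen_test, asymmetric_quantile_loss, and regulatory_loss_function helpers referenced above are defined elsewhere in portfolio_modeling.py and are not shown in this excerpt. As a rough guide to what the coverage test checks, here is a minimal sketch of its unconditional-coverage (Kupiec) component; the full Christoffersen test adds an independence component based on exceedance transition counts, and the project's implementation may differ in detail:
# Hypothetical standalone sketch of the Kupiec unconditional-coverage test;
# the project's christoffersen_test may differ in detail.
def kupiec_uc_test(exceedances, confidence_level):
    T = len(exceedances)
    x = int(np.sum(exceedances))
    pi_hat = x / T if T > 0 else 0.0
    if pi_hat in (0.0, 1.0):
        log_lik_hat = 0.0  # degenerate case: the observed rate fits perfectly
    else:
        log_lik_hat = (T - x) * np.log(1 - pi_hat) + x * np.log(pi_hat)
    log_lik_null = (T - x) * np.log(1 - confidence_level) + x * np.log(confidence_level)
    lr_uc = -2.0 * (log_lik_null - log_lik_hat)
    p_value = 1.0 - stats.chi2.cdf(lr_uc, df=1)  # LR statistic is chi-squared with 1 df
    return lr_uc, p_value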
Ensemble Weight Optimization
Multi-criteria optimization for model ensemble weights
def calculate_ensemble_weights_multicriteria(self, acceptable_models, alpha=0.4, beta=0.3, gamma=0.2, delta=0.1) -> Tuple[Dict, Dict]:
"""
Calculate the ensemble weights for a given set of models using a multi-criteria approach.
Args:
acceptable_models (dict): Dictionary of acceptable models
alpha (float): Weight for the statistical component
beta (float): Weight for the regulatory component
gamma (float): Weight for the coverage component
delta (float): Weight for the asymmetric component
Returns:
Tuple[Dict, Dict]: Tuple containing the ensemble weights and the weight breakdown
"""
# Initialize the lists to store the model data
model_names = list(acceptable_models.keys())
uc_pvalues = []
cc_pvalues = []
reg_losses = []
exc_rates = []
asym_losses = []
# For each model, collect the Christoffersen p-values, regulatory losses, exceedance rates, and asymmetric losses
for model_name in model_names:
model_data = acceptable_models[model_name]
uc_pvalues.append(model_data['christoffersen']['p_value_uc'])
cc_pvalues.append(model_data['christoffersen']['p_value_cc'])
reg_losses.append(model_data['avg_loss_regulatory'])
exc_rates.append(abs(model_data['exceedance_rate'] - self.confidence_level))
asym_losses.append(model_data['avg_loss_asymmetric'])
uc_pvalues = np.array(uc_pvalues)
cc_pvalues = np.array(cc_pvalues)
reg_losses = np.array(reg_losses)
exc_rates = np.array(exc_rates)
asym_losses = np.array(asym_losses)
uc_pvalues = np.where(np.isnan(uc_pvalues), 1e-6, uc_pvalues)
cc_pvalues = np.where(np.isnan(cc_pvalues), 1e-6, cc_pvalues)
# Calculate component weights
stat_scores = np.sqrt(np.maximum(uc_pvalues, 1e-6) * np.maximum(cc_pvalues, 1e-6))
stat_weights = stat_scores / np.sum(stat_scores)
reg_scores = 1.0 / (reg_losses + 1e-10)
reg_weights = reg_scores / np.sum(reg_scores)
coverage_scores = 1.0 / (exc_rates + 1e-10)
coverage_weights = coverage_scores / np.sum(coverage_scores)
asym_scores = 1.0 / (asym_losses + 1e-10)
asym_weights = asym_scores / np.sum(asym_scores)
# Combine weights
final_weights = (alpha * stat_weights + beta * reg_weights +
gamma * coverage_weights + delta * asym_weights)
final_weights = final_weights / np.sum(final_weights)
weight_breakdown = {
'statistical': dict(zip(model_names, stat_weights)),
'regulatory': dict(zip(model_names, reg_weights)),
'coverage': dict(zip(model_names, coverage_weights)),
'asymmetric': dict(zip(model_names, asym_weights)),
'final': dict(zip(model_names, final_weights))
}
return dict(zip(model_names, final_weights)), weight_breakdown
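A small numeric illustration of the blend with two hypothetical models and the default alpha/beta/gamma/delta weights:
# Numeric illustration with two hypothetical models (already-normalized component weights)
stat_w = np.array([0.70, 0.30])  # from geometric mean of UC/CC p-values
reg_w = np.array([0.40, 0.60])   # from inverse regulatory loss
cov_w = np.array([0.50, 0.50])   # from inverse |exceedance rate - 0.05|
asym_w = np.array([0.45, 0.55])  # from inverse asymmetric loss
final = 0.4 * stat_w + 0.3 * reg_w + 0.2 * cov_w + 0.1 * asym_w
final = final / final.sum()
print(final)  # [0.545 0.455]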
Fit Ensemble Models
Fit the GARCH ensemble for a single stock and for all portfolio stocks
def fit_ensemble_univariate_model(self, returns, ticker) -> Dict:
"""
Fit ensemble of GARCH models for a single stock
Args:
returns (pd.Series): The returns of the stock
ticker (str): The ticker symbol of the stock
Returns:
Dict: Dictionary containing the ensemble information
"""
logger.info(f"Fitting ensemble models for {ticker}...")
# Create all model specifications
model_specs = self.create_garch_model_specifications()
all_model_results = {}
# Backtest each model specification, using the configured rebalancing frequency to keep portfolio analysis fast
for i, model_spec in enumerate(model_specs):
try:
model_results = self.rolling_var_backtest_single_model(
returns, model_spec, min_train_window=500, rebalance_freq=REBALANCE_FREQUENCY
)
if len(model_results) > 0:
exceedances = model_results['exceedance'].values
total_obs = len(model_results)
total_exceedances = np.sum(exceedances)
exceedance_rate = total_exceedances / total_obs
avg_loss_asym = model_results['loss_asymmetric'].mean()
avg_loss_reg = model_results['loss_regulatory'].mean()
christoffersen_results = self.christoffersen_test(exceedances, self.confidence_level)
all_model_results[model_spec['name']] = {
'model_spec': model_spec,
'results_df': model_results,
'total_obs': total_obs,
'total_exceedances': total_exceedances,
'exceedance_rate': exceedance_rate,
'avg_loss_asymmetric': avg_loss_asym,
'avg_loss_regulatory': avg_loss_reg,
'christoffersen': christoffersen_results
}
# Memory cleanup every 10 models
if i % 10 == 0:
gc.collect()
except Exception as e:
logger.error(f"Error fitting model {model_spec['name']}: {e}")
continue
logger.info(f"Successfully tested {len(all_model_results)} models for {ticker}")
# Filter acceptable models
acceptable_models = {}
for model_name, results in all_model_results.items():
uc_pvalue = results['christoffersen']['p_value_uc']
cc_pvalue = results['christoffersen']['p_value_cc']
# Only add the model if it passes the coverage tests with a significance level of 0.05
if (not np.isnan(uc_pvalue) and not np.isnan(cc_pvalue) and
uc_pvalue > 0.05 and cc_pvalue > 0.05):
acceptable_models[model_name] = results
if len(acceptable_models) == 0:
logger.warning(f"No models pass coverage tests for {ticker} - selecting best 5 models based on quality scores")
# Calculate quality scores for all models to select the best ones
best_models = self.select_best_models_by_quality(all_model_results, max_models=5)
acceptable_models = best_models
logger.info(f"Using {len(acceptable_models)} models for {ticker} ensemble")
# Calculate ensemble weights
weight_dict, weight_breakdown = self.calculate_ensemble_weights_multicriteria(acceptable_models)
# Fit final models on full data
final_fitted_models = {}
for model_name in acceptable_models.keys():
model_spec = acceptable_models[model_name]['model_spec']
try:
fitted_model = self.fit_garch_model(returns, model_spec, verbose=False)
if fitted_model is not None:
final_fitted_models[model_name] = fitted_model
except Exception:
# Skip models that fail to refit on the full sample
continue
ensemble_info = {
'acceptable_models': acceptable_models,
'fitted_models': final_fitted_models,
'weights': weight_dict,
'weight_breakdown': weight_breakdown,
'ticker': ticker,
'validation_summary': {
'total_models_tested': len(model_specs),
'successful_models': len(all_model_results),
'acceptable_models': len(acceptable_models),
'final_fitted_models': len(final_fitted_models)
}
}
return ensemble_info
def fit_all_ensemble_models(self) -> None:
"""
Fit ensemble models for all portfolio stocks
"""
logger.info("Fitting ensemble models for all portfolio stocks...")
for ticker in self.tickers:
logger.info(f"Processing {ticker}...")
ensemble_info = self.fit_ensemble_univariate_model(self.returns[ticker], ticker)
self.ensemble_models[ticker] = ensemble_info
# Get conditional volatility
conditional_vol = self.get_ensemble_conditional_volatility(ensemble_info, self.returns[ticker])
self.conditional_volatilities[ticker] = conditional_vol
# Get standardized residuals for correlation estimation
self.standardized_residuals = pd.DataFrame(index=self.returns.index)
for ticker in self.tickers:
std_resid = self.get_ensemble_standardized_residuals(self.ensemble_models[ticker], self.returns[ticker])
self.standardized_residuals[ticker] = std_resid
self.standardized_residuals = self.standardized_residuals.dropna()
logger.info("Completed ensemble model fitting for all stocks")
Correlation Matrix Estimation and Forecasting
Correlation estimation and forecasting for the CCC model
def estimate_correlation_matrix(self) -> None:
"""
Estimate constant conditional correlation matrix
using the standardized residuals.
"""
logger.info("Estimating correlation matrix...")
if self.standardized_residuals is None:
raise ValueError("Must fit ensemble models first")
self.correlation_matrix = self.standardized_residuals.corr().values
logger.info("Correlation Matrix:")
corr_df = pd.DataFrame(self.correlation_matrix, columns=self.tickers, index=self.tickers)
logger.info(f"\n{corr_df.round(4)}")
def forecast_ensemble_volatility(self, ensemble_info, horizon=1) -> float:
"""
Generate volatility forecast from ensemble
Args:
ensemble_info (dict): Dictionary containing the ensemble information
horizon (int): The horizon for the forecast
Returns:
float: The weighted volatility forecast
"""
# Initialize the weighted forecast
weighted_forecast = 0.0
total_weight = 0.0
# Accumulate the weighted volatility forecast across the ensemble models
for model_name, fitted_model in ensemble_info['fitted_models'].items():
try:
weight = ensemble_info['weights'][model_name]
forecast = fitted_model.forecast(horizon=horizon)
vol_forecast = np.sqrt(forecast.variance.iloc[-1, 0])
weighted_forecast += weight * vol_forecast
total_weight += weight
except Exception:
# Skip models whose forecast fails
continue
if total_weight > 0:
weighted_forecast = weighted_forecast / total_weight
return weighted_forecast
def forecast_covariance_matrix(self, horizon=1) -> Tuple[np.ndarray, List[float]]:
"""
Forecast covariance matrix using ensemble models
Args:
horizon (int): The horizon for the forecast
Returns:
Tuple[np.ndarray, List[float]]: The forecasted covariance matrix and the volatilities
"""
logger.info(f"Forecasting covariance matrix for {horizon} period(s) ahead...")
# Initialize the volatilities
vol_forecasts = []
# For each ticker, forecast the volatility
for ticker in self.tickers:
vol_forecast = self.forecast_ensemble_volatility(self.ensemble_models[ticker], horizon)
vol_forecasts.append(vol_forecast)
logger.info(f"{ticker} forecasted volatility: {vol_forecast:.4f}")
# Construct covariance matrix
vol_matrix = np.diag(vol_forecasts)
forecast_cov = vol_matrix @ self.correlation_matrix @ vol_matrix
return forecast_cov, vol_forecasts
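The CCC covariance forecast is H = D R D, where D is the diagonal matrix of forecasted volatilities and R the constant correlation matrix. A two-asset numeric check:
# Two-asset numeric check of H = D R D
D = np.diag([0.01, 0.02])               # forecasted daily volatilities: 1% and 2%
R = np.array([[1.0, 0.5], [0.5, 1.0]])  # constant conditional correlation
H = D @ R @ D
print(H)  # [[1e-4, 1e-4], [1e-4, 4e-4]]; off-diagonal = 0.01 * 0.02 * 0.5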
Portfolio Optimization
Portfolio optimization using the CCC model
def optimize_portfolio(self) -> Dict:
"""
Portfolio optimization using CCC ensemble model
"""
logger.info("Optimizing portfolio...")
# Get forecasted covariance matrix
forecast_cov, vol_forecasts = self.forecast_covariance_matrix()
logger.info("Forecasted volatilities:")
for i, ticker in enumerate(self.tickers):
logger.info(f"{ticker}: {vol_forecasts[i]:.4f}")
print("\n")
# Minimum variance portfolio
n_assets = len(self.tickers)
def portfolio_variance(weights, cov_matrix):
return weights.T @ cov_matrix @ weights
# Constraints and bounds
constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
bounds = [(0, 1) for _ in range(n_assets)]
initial_weights = np.array([1/n_assets] * n_assets)
# Optimize
result = minimize(portfolio_variance, initial_weights,
args=(forecast_cov,), method='SLSQP',
bounds=bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-10})
if result.success:
optimal_weights = result.x
min_variance = result.fun
logger.info("Portfolio optimization successful!")
logger.info("Optimal weights:")
for i, ticker in enumerate(self.tickers):
logger.info(f" {ticker}: {optimal_weights[i]:.4f}")
logger.info(f"Portfolio Variance: {min_variance:.6f}")
logger.info(f"Portfolio Volatility: {np.sqrt(min_variance):.4f}")
# Store optimization results
self.optimization_results = {
'optimal_weights': optimal_weights,
'portfolio_variance': min_variance,
'portfolio_volatility': np.sqrt(min_variance),
'forecasted_volatilities': vol_forecasts,
'forecasted_covariance': forecast_cov,
'current_weights': self.weights,
'optimization_success': True
}
else:
logger.error("Portfolio optimization failed!")
logger.error(f"Reason: {result.message}")
# Use current weights as fallback
optimal_weights = np.array(self.weights)
min_variance = portfolio_variance(optimal_weights, forecast_cov)
self.optimization_results = {
'optimal_weights': optimal_weights,
'portfolio_variance': min_variance,
'portfolio_volatility': np.sqrt(min_variance),
'forecasted_volatilities': vol_forecasts,
'forecasted_covariance': forecast_cov,
'current_weights': self.weights,
'optimization_success': False
}
return self.optimization_results
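Putting the CCC pipeline together end to end. The data-download step is defined elsewhere in portfolio_modeling.py and is not shown in this excerpt, so this sketch assigns simulated returns directly; tickers, amounts, and seed are placeholders:
# End-to-end CCC sketch on simulated returns (placeholder portfolio)
rng = np.random.default_rng(7)
idx = pd.bdate_range('2014-01-01', periods=1500)
model = PortfolioCCCEnsembleModel([('AAPL', 6000.0), ('MSFT', 4000.0)])
model.returns = pd.DataFrame(
    rng.multivariate_normal([0.0, 0.0], [[1.0, 0.4], [0.4, 1.0]], size=1500),
    index=idx, columns=model.tickers)  # stands in for the data-download step
model.fit_all_ensemble_models()       # per-stock GARCH ensembles
model.estimate_correlation_matrix()   # constant conditional correlation
results = model.optimize_portfolio()  # minimum-variance weights
print(results['optimal_weights'])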
DCC Model Class
Dynamic Conditional Correlation (DCC) modeling framework
class PortfolioDCCEnsembleModel:
"""
This class implements Dynamic Conditional Correlation (DCC) GARCH modeling for portfolios.
Unlike CCC, DCC allows correlations to change over time, providing more flexible modeling
of time-varying dependencies between assets.
"""
def __init__(self, portfolio_data, start_date='2014-01-01', end_date=None, confidence_level=0.05, train_ratio=0.8) -> None:
"""
Initialize the DCC portfolio modeling system
Args:
portfolio_data: List of (ticker, amount) tuples
start_date: Start date for historical data
end_date: End date for historical data (None = today)
confidence_level: VaR confidence level
train_ratio: Training/testing split ratio
"""
# Initialize the portfolio data
self.portfolio_data = portfolio_data
self.start_date = start_date
self.end_date = end_date or datetime.now().strftime('%Y-%m-%d')
self.confidence_level = confidence_level
self.train_ratio = train_ratio
# Portfolio composition
self.tickers = [ticker for ticker, amount in portfolio_data]
self.amounts = [float(amount) for ticker, amount in portfolio_data]
self.total_value = sum(self.amounts)
self.weights = [amount / self.total_value for amount in self.amounts]
# Data storage
self.returns = None
self.prices = None
self.ensemble_models = {}
self.conditional_volatilities = {}
self.standardized_residuals = None
# DCC-specific parameters
self.dcc_params = None # alpha, beta parameters
self.dynamic_correlations = None # Time-varying correlation matrices
self.unconditional_correlation = None # Long-run correlation matrix
self.q_matrices = None # DCC Q matrices
# Results storage
self.modeling_results = {}
self.optimization_results = {}
logger.info(f"Initialized DCC portfolio model with {len(self.tickers)} stocks")
logger.info(f"Portfolio value: ${self.total_value:,.2f}")
logger.info(f"Data period: {self.start_date} to {self.end_date}")
# The basic data-handling and ensemble-fitting methods mirror the CCC model class (not repeated in this excerpt)
DCC Estimation and Forecasting
Time-varying correlation estimation using DCC methodology
def estimate_dcc_parameters(self, max_iter=1000, tolerance=1e-6) -> Dict:
"""
Estimate DCC parameters using maximum likelihood estimation
This implements the two-step DCC estimation procedure:
1. Estimate univariate GARCH models (already done in ensemble fitting)
2. Estimate DCC parameters (alpha, beta) using standardized residuals
Args:
max_iter (int): Maximum number of iterations
tolerance (float): Tolerance for convergence
Returns:
Dict: Dictionary containing the DCC parameters
"""
logger.info("Estimating DCC parameters...")
if self.standardized_residuals is None:
raise ValueError("Must fit ensemble models first")
# Calculate unconditional correlation matrix
self.unconditional_correlation = self.standardized_residuals.corr().values
logger.info("Unconditional correlation matrix estimated")
# Initial parameter values
initial_params = [0.01, 0.95] # alpha, beta
# Bounds for parameters
bounds = [(1e-6, 1-1e-6), (1e-6, 1-1e-6)] # alpha and beta must each lie in (0, 1); the sum constraint is enforced separately below
# Constraint: alpha + beta < 1
def constraint_func(params):
return 1 - (params[0] + params[1])
constraint = {'type': 'ineq', 'fun': constraint_func}
# Objective function: negative log-likelihood
def negative_log_likelihood(params):
try:
alpha, beta = params
return -self.dcc_log_likelihood(alpha, beta)
except Exception:
return 1e10 # Return a large value if the likelihood calculation fails
# Optimize
result = minimize(negative_log_likelihood, initial_params,
method='SLSQP', bounds=bounds, constraints=constraint,
options={'maxiter': max_iter, 'ftol': tolerance})
if result.success:
self.dcc_params = {'alpha': result.x[0], 'beta': result.x[1]}
logger.info(f"DCC parameters estimated successfully:")
logger.info(f"Alpha: {self.dcc_params['alpha']:.6f}")
logger.info(f"Beta: {self.dcc_params['beta']:.6f}")
logger.info(f"Alpha + Beta: {sum(result.x):.6f}")
# Generate dynamic correlation matrices
self.generate_dynamic_correlations()
else:
logger.error("DCC parameter estimation failed!")
logger.error(f"Reason: {result.message}")
# Use simple fallback
self.dcc_params = {'alpha': 0.01, 'beta': 0.95}
self.generate_dynamic_correlations()
return self.dcc_params
def dcc_log_likelihood(self, alpha, beta) -> float:
"""
Calculate DCC log-likelihood for given parameters
Args:
alpha (float): Alpha parameter
beta (float): Beta parameter
Returns:
float: The DCC log-likelihood
"""
n_obs = len(self.standardized_residuals)
# Initialize Q matrix with unconditional correlation
Q_bar = self.unconditional_correlation.copy()
Q_t = Q_bar.copy()
log_likelihood = 0.0
for t in range(n_obs):
# Get standardized residuals for time t
z_t = self.standardized_residuals.iloc[t].values.reshape(-1, 1)
# Update Q matrix
if t > 0:
z_prev = self.standardized_residuals.iloc[t-1].values.reshape(-1, 1)
Q_t = (1 - alpha - beta) * Q_bar + alpha * (z_prev @ z_prev.T) + beta * Q_t
# Calculate correlation matrix
D_t = np.sqrt(np.diag(Q_t))
D_t_inv = np.diag(1.0 / D_t)
R_t = D_t_inv @ Q_t @ D_t_inv
# Ensure positive definiteness
try:
R_t_inv = np.linalg.inv(R_t)
det_R_t = np.linalg.det(R_t)
if det_R_t <= 0:
return -1e10
# Add to log-likelihood
log_likelihood += -0.5 * (np.log(det_R_t) + z_t.T @ R_t_inv @ z_t)
except np.linalg.LinAlgError:
return -1e10
return float(log_likelihood)
def generate_dynamic_correlations(self) -> None:
"""
Generate time-varying correlation matrices using estimated DCC parameters
"""
logger.info("Generating dynamic correlation matrices...")
alpha = self.dcc_params['alpha']
beta = self.dcc_params['beta']
n_assets = len(self.tickers)
n_obs = len(self.standardized_residuals)
# Initialize storage
self.q_matrices = np.zeros((n_obs, n_assets, n_assets))
self.dynamic_correlations = np.zeros((n_obs, n_assets, n_assets))
# Initialize Q matrix
Q_bar = self.unconditional_correlation.copy()
Q_t = Q_bar.copy()
for t in range(n_obs):
# Get standardized residuals for time t
z_t = self.standardized_residuals.iloc[t].values.reshape(-1, 1)
# Update Q matrix
if t > 0:
z_prev = self.standardized_residuals.iloc[t-1].values.reshape(-1, 1)
Q_t = (1 - alpha - beta) * Q_bar + alpha * (z_prev @ z_prev.T) + beta * Q_t
# Store Q matrix
self.q_matrices[t] = Q_t.copy()
# Calculate correlation matrix
D_t = np.sqrt(np.diag(Q_t))
D_t_inv = np.diag(1.0 / np.maximum(D_t, 1e-8)) # Avoid division by zero
R_t = D_t_inv @ Q_t @ D_t_inv
# Ensure correlation matrix properties
np.fill_diagonal(R_t, 1.0) # Diagonal elements = 1
# Store correlation matrix
self.dynamic_correlations[t] = R_t.copy()
logger.info(f"Generated {n_obs} dynamic correlation matrices")
# Log some statistics
if n_assets > 1:
avg_corr = np.mean([self.dynamic_correlations[t][0, 1] for t in range(n_obs)])
logger.info(f"Average correlation (first pair): {avg_corr:.4f}")
def forecast_dcc_correlation(self, horizon=1) -> np.ndarray:
"""
Forecast correlation matrix using DCC model
Args:
horizon (int): The horizon for the forecast
Returns:
np.ndarray: The forecasted correlation matrix
"""
if self.dynamic_correlations is None:
raise ValueError("Must estimate DCC parameters first")
alpha = self.dcc_params['alpha']
beta = self.dcc_params['beta']
# Use last Q matrix as starting point
Q_t = self.q_matrices[-1].copy()
# Get last standardized residuals
z_last = self.standardized_residuals.iloc[-1].values.reshape(-1, 1)
# Forecast Q matrix
Q_forecast = (1 - alpha - beta) * self.unconditional_correlation + \
alpha * (z_last @ z_last.T) + beta * Q_t
# For multi-step forecasts, Q mean-reverts to the unconditional matrix at rate (alpha + beta)
if horizon > 1:
persistence = (alpha + beta) ** (horizon - 1)
Q_forecast = (1 - persistence) * self.unconditional_correlation + persistence * Q_forecast
# Convert to correlation matrix
D_forecast = np.sqrt(np.diag(Q_forecast))
D_forecast_inv = np.diag(1.0 / np.maximum(D_forecast, 1e-8))
R_forecast = D_forecast_inv @ Q_forecast @ D_forecast_inv
# Ensure correlation matrix properties
np.fill_diagonal(R_forecast, 1.0)
return R_forecast
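As the horizon grows, the forecast reverts from the current state toward the unconditional correlation, which the following standalone check illustrates with hypothetical two-asset numbers:
# Standalone check of long-horizon mean reversion (hypothetical numbers)
alpha, beta = 0.02, 0.95
Q_bar = np.array([[1.0, 0.4], [0.4, 1.0]])  # long-run correlation
Q_1 = np.array([[1.0, 0.8], [0.8, 1.0]])    # elevated one-step forecast
for h in (1, 5, 20, 250):
    persistence = (alpha + beta) ** (h - 1)
    Q_h = (1 - persistence) * Q_bar + persistence * Q_1
    print(h, round(Q_h[0, 1], 4))  # 0.8 at h=1, decaying toward 0.4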
Download Complete Source Code
Get the complete portfolio_modeling.py file containing all econometric models and analysis frameworks used in RiskOptimix.
MIT License - Free for academic and commercial use
Contact
Get in touch with our portfolio analysis experts
riskoptimix@gmail.com
Response Time
We typically respond within 24 hours
Support
Technical support and portfolio analysis questions