Source code for copul.copula_sampler

import inspect
import logging
import random
import warnings
from typing import Any, Callable, Tuple, Optional

import numpy as np
import scipy.optimize as opt
import sympy
from copul.checkerboard.checkerboarder import Checkerboarder
from copul.checkerboard.check_pi import CheckPi

# Set up module logger
log = logging.getLogger(__name__)


[docs] class CopulaSampler: """ Sampler for generating random variates from copula distributions. This class provides methods to sample from arbitrary copula distributions using conditional distribution methods and numerical root-finding. """ # Class-level error counter err_counter = 0 def __init__( self, copul: Any, precision: int = 3, random_state: Optional[int] = None ): """ Initialize a CopulaSampler instance. Args: copul: The copula object to sample from. precision: Precision level for numerical methods (default: 3). random_state: Random seed for reproducibility (default: None). """ self._copul = copul self._precision = precision self._random_state = random_state
[docs] def rvs(self, n: int = 1, approximate=False) -> np.ndarray: """ Sample n random variates from the copula. Args: n: Number of samples to generate (default: 1). approximate: Use approximate sampling method (default: True). Returns: np.ndarray: Array of shape (n, 2) containing the sampled (u, v) pairs. """ if not approximate and self._copul.dim > 2: raise ValueError( "Sampling from copula with dimension > 2 requires approximate=True" ) if approximate: grid_partitions = np.ceil(n ** (1 / self._copul.dim)).astype(int) checkerboarder = Checkerboarder(grid_partitions, dim=self._copul.dim) ccop = checkerboarder.get_checkerboard_copula(self._copul) return ccop.rvs(n) # Set random seed if specified if self._random_state is not None: random.seed(self._random_state) # Get the conditional distribution function cond_distr = self._copul.cond_distr_2 # Determine if conditional distribution has parameters in common with copula intervals sig = inspect.signature(cond_distr) params = set(sig.parameters.keys()) & set(self._copul.intervals) # Use direct function or create a lambda function based on symbolic expression if params or isinstance(self._copul, CheckPi): # Use the conditional distribution directly sampling_func = cond_distr else: # Get symbolic expression and convert to a callable function try: func_expr = cond_distr().func sampling_func = sympy.lambdify( self._copul.u_symbols, func_expr, ["numpy"] ) except Exception as e: log.error(f"Error creating lambda function: {e}") raise ValueError(f"Could not create sampling function: {e}") # Generate samples results = self._sample_val(sampling_func, n) return results
def _sample_val(self, function: Callable, n: int = 1) -> np.ndarray: """ Generate multiple samples using the provided function. Args: function: The sampling function. n: Number of samples to generate. Returns: np.ndarray: Array of shape (n, 2) containing the sampled (u, v) pairs. """ # Generate n independent samples samples = [self.sample_val(function) for _ in range(n)] # Convert to numpy array result = np.array(samples) # Log error counter if debugging is enabled log.debug(f"Error counter: {self.err_counter}") return result
[docs] def sample_val(self, function: Callable) -> Tuple[float, float]: """ Generate a single sample from the copula using inverse CDF method. Args: function: The conditional distribution function. Returns: Tuple[float, float]: A (u, v) pair from the copula distribution. """ # Generate uniform random variables v = random.uniform(0, 1) t = random.uniform(0, 1) # Create function to find root: F(u|v) = t def func2(u: float) -> float: try: return function(u, v) - t except Exception as e: # Handle potential errors in the function evaluation log.debug(f"Function evaluation error at u={u}, v={v}: {e}") # Return a large value to guide search away from problematic areas return 1000.0 * (0.5 - u) # Suppress numerical warnings with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) # Try to find the root using numerical methods try: # Use robust root-finding method result = opt.root_scalar( func2, x0=0.5, bracket=[0.000000001, 0.999999999], method="brentq", # Use Brent's method for reliable root finding ) except (ZeroDivisionError, ValueError, TypeError) as e: # Log the error and fall back to visual solution log.debug(f"{self._copul.__class__.__name__}; {type(e).__name__}: {e}") self.err_counter += 1 return self._get_visual_solution(func2), v # Check if root finding converged if not result.converged: if not result.iterations: log.warning(f"{self._copul.__class__.__name__}; {result}") self.err_counter += 1 return self._get_visual_solution(func2), v # Return the found root and the randomly generated v return result.root, v
def _get_visual_solution(self, func: Callable) -> float: """ Find an approximate root by grid search when numerical methods fail. Args: func: The function to find the root for. Returns: float: Approximate root where the function value is minimized. """ # Define search grid based on precision start = 10 ** (-self._precision) end = 1 - 10 ** (-self._precision) x = np.linspace(start, end, 10**self._precision) # Evaluate function on grid points y = np.zeros_like(x) for i, x_i in enumerate(x): try: y[i] = func(x_i) except Exception: # Assign large value on error y[i] = 1e10 # Find point where function is closest to zero min_idx = np.abs(y).argmin() return x[min_idx]