Source code for copul.family.tp2_verifier

import itertools
import logging
from typing import Any, Dict, List, Optional
from dataclasses import dataclass

import numpy as np
import sympy
from sympy.core.expr import Expr
from sympy.core.symbol import Symbol
from sympy.logic.boolalg import BooleanFalse, BooleanTrue
from sympy.utilities.exceptions import SymPyDeprecationWarning
import warnings

# Set up logger
log = logging.getLogger(__name__)


[docs] @dataclass class VerificationResult: """ Class to represent TP2 verification results. """ is_tp2: bool violations: List[Dict[str, float]] tested_params: List[Dict[str, float]]
[docs] class TP2Verifier: """ Class for verifying if a copula satisfies the TP2 property. The TP2 (totally positive of order 2) property is an important mathematical property for copulas, indicating that the copula's density satisfies certain log-supermodularity conditions. """ def __init__( self, range_min: Optional[float] = None, range_max: Optional[float] = None ): """ Initialize a TP2Verifier. Args: range_min: Minimum value for parameter range (default: None, uses copula's lower bound) range_max: Maximum value for parameter range (default: None, uses copula's upper bound) """ self.range_min = range_min self.range_max = range_max
[docs] def is_tp2(self, copula: Any) -> bool: """ Check if a copula satisfies the TP2 property. Args: copula: Copula instance or class to check Returns: True if the copula is TP2, False otherwise """ result = self.verify_tp2(copula) return result.is_tp2
[docs] def verify_tp2(self, copula: Any) -> VerificationResult: """ Verify the TP2 property for a copula and return detailed results. Args: copula: Copula class or factory to check Returns: VerificationResult object containing verification details """ log.info(f"Checking if {type(copula).__name__} copula is TP2") # If the copula is not absolutely continuous, it cannot be TP2 if ( hasattr(copula, "is_absolutely_continuous") and not copula.is_absolutely_continuous ): log.info("Copula is not absolutely continuous, therefore not TP2") return VerificationResult(False, [], []) # Determine parameter ranges parameter_ranges = self._get_parameter_ranges(copula) if not parameter_ranges: log.debug( "No parameter ranges detected—treating as a single 'unique' copula" ) # Grid of evaluation points test_points = np.linspace(0.0001, 0.9999, 20) violations: List[Dict[str, float]] = [] tested_params: List[Dict[str, float]] = [] # Iterate through all parameter combinations (empty dict → one iteration) for param_values in itertools.product(*parameter_ranges.values()): # Build a simple dict of { 'theta': 0.5, ... } keys = [str(k) for k in parameter_ranges.keys()] param_dict = dict(zip(keys, param_values)) tested_params.append(param_dict) # Instantiate with warnings.catch_warnings(): warnings.simplefilter("ignore", category=SymPyDeprecationWarning) try: copula_instance = copula(**param_dict) except Exception as e: log.warning(f"Error creating copula with params {param_dict}: {e}") continue # Skip if no density try: _inst_abs_cont = copula_instance.is_absolutely_continuous except NotImplementedError: _inst_abs_cont = None # treat as unknown — proceed if _inst_abs_cont is not None and not _inst_abs_cont: log.info(f"No density for params: {param_dict}") continue # Try symbolic log-pdf first; fall back to numerical if unavailable. log_pdf = None try: pdf_attr = copula_instance.pdf # CoreCopula.pdf is a regular method (due to MRO taking precedence # over BivCoreCopula.pdf property). Detect that and skip to the # numerical path instead of letting sympy.log raise SympifyError. if callable(pdf_attr) and not isinstance(pdf_attr, sympy.Basic): # It might be a SymPyFuncWrapper — unwrap it. inner = getattr(pdf_attr, "func", None) if isinstance(inner, sympy.Basic): pdf_attr = inner else: raise TypeError( f"pdf is a callable, not a sympy expression: {type(pdf_attr)}" ) log_pdf = sympy.log(pdf_attr) except Exception as e: log.debug(f"Symbolic pdf unavailable for params {param_dict}: {e}") # Check TP2 on the grid violation_found = False for i in range(len(test_points) - 1): if violation_found: break for j in range(len(test_points) - 1): x1, x2 = test_points[i], test_points[i + 1] y1, y2 = test_points[j], test_points[j + 1] if log_pdf is not None: violated = self.check_violation( copula_instance, log_pdf, x1, x2, y1, y2 ) elif hasattr(copula_instance, "pdf_vectorized"): violated = self._check_violation_numerical( copula_instance, x1, x2, y1, y2 ) else: log.warning( f"No symbolic or numerical pdf for params {param_dict}; skipping" ) violation_found = None # sentinel: could not check break if violated: log.info( f"TP2 violation at params: {param_dict}, " f"points: ({x1}, {y1}), ({x2}, {y2})" ) violations.append(param_dict) violation_found = True break if violation_found is None: # Could not check this parameter combination — remove from tested tested_params.pop() elif not violation_found: log.info(f"No TP2 violations for params: {param_dict}") # Final verdict is_tp2 = len(violations) == 0 and len(tested_params) > 0 return VerificationResult(is_tp2, violations, tested_params)
def _get_parameter_ranges(self, copula: Any) -> Dict[Symbol, np.ndarray]: """ Get parameter ranges for testing. Args: copula: Copula class or instance with `.params` and `.intervals` Returns: Dictionary mapping parameter symbols to arrays of test values """ # Defaults if none provided range_min = -10 if self.range_min is None else self.range_min range_max = 10 if self.range_max is None else self.range_max ranges: Dict[Symbol, np.ndarray] = {} num_params = len(getattr(copula, "params", [])) # Choose how many grid points if num_params == 1: n_interpolate = 20 elif num_params == 2: n_interpolate = 10 else: n_interpolate = 6 for param in getattr(copula, "params", []): param_str = str(param) if param_str not in copula.intervals: log.warning(f"Parameter {param_str} not found in intervals") continue interval = copula.intervals[param_str] pmin = float(max(interval.inf, range_min)) if interval.left_open: pmin += 0.01 # **Use interval.end here (not .sup)** pmax = float(min(interval.end, range_max)) if interval.right_open: pmax -= 0.01 ranges[param] = np.linspace(pmin, pmax, n_interpolate) return ranges
[docs] def check_violation( self, copula: Any, log_pdf: Expr, x1: float, x2: float, y1: float, y2: float ) -> bool: """ Check if the TP2 property is violated at specific points. """ u, v = copula.u, copula.v try: return self._check_extreme_mixed_term(copula, log_pdf, u, v, x1, x2, y1, y2) except Exception as e: log.warning(f"Error checking TP2 at points ({x1},{y1}),({x2},{y2}): {e}") return False
def _check_violation_numerical( self, copula: Any, x1: float, x2: float, y1: float, y2: float ) -> bool: """ Numerical fallback for copulas without a symbolic PDF. Checks the TP2 inequality directly: f(x1,y1) * f(x2,y2) >= f(x1,y2) * f(x2,y1) A small tolerance (1 - 1e-13) is applied to mirror the symbolic path. """ try: f11 = float(np.squeeze(copula.pdf_vectorized(x1, y1))) f22 = float(np.squeeze(copula.pdf_vectorized(x2, y2))) f12 = float(np.squeeze(copula.pdf_vectorized(x1, y2))) f21 = float(np.squeeze(copula.pdf_vectorized(x2, y1))) lhs = f11 * f22 rhs = f12 * f21 violated = lhs * 0.9999999999999 < rhs if violated: log.debug( f"Numerical TP2 violation at ({x1},{y1}),({x2},{y2}): " f"lhs={lhs:.6g}, rhs={rhs:.6g}" ) return violated except Exception as e: log.warning(f"Error in numerical TP2 check at ({x1},{y1}),({x2},{y2}): {e}") return False def _check_extreme_mixed_term( self, copula: Any, log_pdf: Expr, u: Symbol, v: Symbol, x1: float, x2: float, y1: float, y2: float, ) -> bool: """ Check TP2 inequality on the log-pdf: log f(x1,y1) + log f(x2,y2) ≥ log f(x1,y2) + log f(x2,y1) """ # Substitutions min_term = log_pdf.subs(u, x1).subs(v, y1) max_term = log_pdf.subs(u, x2).subs(v, y2) mix1 = log_pdf.subs(u, x1).subs(v, y2) mix2 = log_pdf.subs(u, x2).subs(v, y1) extreme = min_term + max_term mixed = mix1 + mix2 # Direct numeric/symbolic comparison try: comp = extreme * 0.9999999999999 < mixed except TypeError: # Fallback to real parts if complex try: extreme = extreme.as_real_imag()[0] mixed = mixed.as_real_imag()[0] comp = extreme * 0.9999999999999 < mixed except Exception: # Last resort: retry with fresh symbols return self._check_extreme_mixed_term( copula, log_pdf, copula.u, copula.v, x1, x2, y1, y2 ) # Ensure a bool if not isinstance(comp, (bool, BooleanFalse, BooleanTrue)): comp = comp.evalf() if not isinstance(comp, (bool, BooleanFalse, BooleanTrue)): return self._check_extreme_mixed_term( copula, log_pdf, copula.u, copula.v, x1, x2, y1, y2 ) if comp: log.debug( f"TP2 violation at ({x1},{y1}),({x2},{y2}): extreme={extreme}, mixed={mixed}" ) return bool(comp)
[docs] def verify_copula_tp2( copula: Any, range_min: Optional[float] = None, range_max: Optional[float] = None ) -> VerificationResult: """ Convenience function to verify if a copula satisfies the TP2 property. """ verifier = TP2Verifier(range_min, range_max) return verifier.verify_tp2(copula)