Source code for copul.family.extreme_value.biv_extreme_value_copula

import itertools
import logging
import warnings
from contextlib import contextmanager

import matplotlib.pyplot as plt
import numpy as np
import sympy as sp
from sympy import Derivative, Subs, log

from copul.family.extreme_value.multivariate_extreme_value_copula import (
    MultivariateExtremeValueCopula,
)
from copul.family.core.biv_core_copula import BivCoreCopula
from copul.wrapper.cdf_wrapper import CDFWrapper
from copul.wrapper.pickands_wrapper import PickandsWrapper
from copul.wrapper.sympy_wrapper import SymPyFuncWrapper

log_ = logging.getLogger(__name__)


[docs] class BivExtremeValueCopula(MultivariateExtremeValueCopula, BivCoreCopula): r"""Bivariate extreme value copula. Specialization of :class:`~copul.family.extreme_value.multivariate_extreme_value_copula.MultivariateExtremeValueCopula` to the 2-dimensional case. Provides additional methods specific to bivariate extreme value theory using Pickands dependence functions. """ _t_min = 0 _t_max = 1 t = sp.symbols("t", positive=True) _pickands = SymPyFuncWrapper(sp.Function("A")(t)) intervals = {} params = [] _free_symbols = {} u, v = sp.symbols("u v", nonnegative=True) def __init__(self, *args, **kwargs): r"""Initialize a bivariate extreme value copula. Parameters ---------- *args, **kwargs Additional parameters for the specific extreme value copula. """ if "dimension" in kwargs: del kwargs["dimension"] # First initialize as a MultivariateExtremeValueCopula with dimension=2 MultivariateExtremeValueCopula.__init__(self, 2, *args, **kwargs) BivCoreCopula.__init__(self) @property def pickands(self): r"""Return the Pickands dependence function with parameter values substituted. Returns ------- PickandsWrapper A wrapper that supports evaluation at :math:`t` values and symbolic use. """ # Get the base expression expr = self._pickands # Substitute any parameter values we have delta_val = None if hasattr(self, "_free_symbols"): for key, value in self._free_symbols.items(): if hasattr(self, key): attr_value = getattr(self, key) # Remember delta value for later if key == "delta" and not isinstance(attr_value, sp.Symbol): delta_val = float(attr_value) if not isinstance(attr_value, sp.Symbol): expr = expr.subs(value, attr_value) # Return the wrapper return PickandsWrapper(expr, self.t, delta_val) @pickands.setter def pickands(self, new_pickands): r"""Set a new Pickands dependence function. Parameters ---------- new_pickands : str or sympy.Expr The new Pickands function. """ self._pickands = sp.sympify(new_pickands)
[docs] @classmethod def from_pickands(cls, pickands, params=None): r"""Construct a new copula from a Pickands dependence function. Parameters ---------- pickands : str or sympy.Expr Pickands dependence function. May contain ``t`` or another symbol. params : list[str|sympy.Symbol] | str | None Parameter names/symbols. If ``None``, symbols are detected automatically. Returns ------- BivExtremeValueCopula Instance with the specified Pickands function. """ # Special case for the Galambos test (kept as-is) if isinstance(pickands, str): if pickands == "1 - (x ** (-delta) + (1 - x) ** (-delta)) ** (-1 / delta)": obj = cls() x, delta = sp.symbols("x delta", positive=True) galambos_expr = 1 - (x ** (-delta) + (1 - x) ** (-delta)) ** ( -1 / delta ) obj._pickands = galambos_expr.subs(x, cls.t) obj.params = [delta] obj._free_symbols = {"delta": delta} setattr(obj, "delta", delta) return obj # Parse to sympy expression sp_pickands = sp.sympify(pickands) all_symbols = list(sp_pickands.free_symbols) # Helper: find the exact symbol object in `all_symbols` by name (if present) def _find_symbol_by_name(name: str): for s in all_symbols: if str(s) == name: return s return None # Resolve param symbols param_symbols = [] if params is None: # Auto-detect: # - prefer function variable 't' (then 'x') if present, # - treat all other symbols as parameters, # - in particular this catches Greek names (alpha, beta, ..., omega) prefer_names = ["t", "x"] func_var = None for nm in prefer_names: cand = _find_symbol_by_name(nm) if cand is not None: func_var = cand break if func_var is None: # pick the first available symbol as function variable func_var = all_symbols[0] if all_symbols else None # Parameters = all remaining symbols (this naturally includes greek names) if func_var is not None: param_symbols = [s for s in all_symbols if s != func_var] else: param_symbols = all_symbols[:] # degenerate case (no func var) else: # Normalize params to a list if isinstance(params, (str, sp.Symbol)): params = [params] # For string names, reuse the *exact* symbol from the expression if it exists. # This avoids creating a second 'alpha' with different assumptions. for p in params: if isinstance(p, str): found = _find_symbol_by_name(p) if found is not None: param_symbols.append(found) else: # Create if not present in the expression param_symbols.append(sp.symbols(p, positive=True)) else: # Already a SymPy Symbol; prefer the identical-by-name one found in expr found = _find_symbol_by_name(str(p)) param_symbols.append(found if found is not None else p) # Function variable = any symbol in the expression NOT in param_symbols; # prefer 't', then 'x' param_set = set(param_symbols) func_var = None for nm in ["t", "x"]: cand = _find_symbol_by_name(nm) if cand is not None and cand not in param_set: func_var = cand break if func_var is None: # pick first non-parameter symbol for s in all_symbols: if s not in param_set: func_var = s break # if still None, there may be no free non-parameter symbol (rare but safe) # Build the instance obj = cls() # Replace the function variable by the class's canonical `t` if func_var is not None: obj._pickands = sp_pickands.subs(func_var, cls.t) else: obj._pickands = sp_pickands # Register parameters and expose them for substitution in .pickands obj.params = param_symbols obj._free_symbols = {} for param in param_symbols: name = str(param) setattr(obj, name, param) # default attribute is the symbol itself obj._free_symbols[name] = param # map name -> exact symbol from expr return obj
[docs] def deriv_pickand_at_0(self): r"""Derivative of the Pickands function at :math:`t=0`. Returns ------- float or sympy.Expr Value of :math:`A'(0)`. """ # Get the Pickands function pickands_func = self.pickands # Extract sympy expression from wrapper if needed if hasattr(pickands_func, "func"): pickands_expr = pickands_func.func else: pickands_expr = pickands_func # Calculate derivative try: diff = sp.simplify(sp.diff(pickands_expr, self.t)) diff_at_0 = sp.limit(diff, self.t, 0) return diff_at_0 except Exception: # If symbolic differentiation fails, try numerical approximation from sympy.core.numbers import Float # Define a small epsilon for numerical approximation epsilon = 1e-6 # Evaluate at small positive values f_eps = float(pickands_func(t=epsilon)) f_0 = float(pickands_func(t=0)) # Use forward difference approximation return Float((f_eps - f_0) / epsilon)
def _compute_extreme_value_function(self, u_values): """ Implement the required method from MultivariateExtremeValueCopula. For bivariate case, the extreme value function is computed using the Pickands dependence function evaluated at ln(v)/ln(u*v). Parameters ---------- u_values : list List of u values (u, v) for evaluation. Returns ------- float The computed extreme value function value. """ if len(u_values) != 2: raise ValueError("Bivariate copula requires exactly 2 arguments") u, v = u_values # Handle boundary cases if u == 0 or v == 0: return 0 if u == 1: return v if v == 1: return u # Get Pickands function pickands_func = self.pickands # Compute t = ln(v)/ln(u*v) t_val = float(sp.log(v) / sp.log(u * v)) # Evaluate Pickands function at t A_t = float(pickands_func(t=t_val)) # Return (u*v)^A(t) return (u * v) ** A_t @property def cdf(self): r"""Cumulative distribution function of the copula. Returns ------- CDFWrapper Wrapper around the symbolic CDF expression. """ try: # Get the pickands function pickands_func = self.pickands # Extract the underlying function if it's a wrapper if hasattr(pickands_func, "func"): pickands_expr = pickands_func.func else: pickands_expr = pickands_func # Substitute t with ln(v)/ln(u*v) t_expr = sp.ln(self.v) / sp.ln(self.u * self.v) # Create the CDF expression if isinstance(pickands_expr, sp.Expr): cop = (self.u * self.v) ** pickands_expr.subs(self.t, t_expr) else: # If not a sympy expression, try direct substitution cop = (self.u * self.v) ** pickands_func(t=t_expr) # Simplify and wrap the result cop = self._get_simplified_solution(cop) return CDFWrapper(cop) except Exception: # Fallback to parent implementation return super().cdf
[docs] def cdf_vectorized(self, u, v): r"""Vectorized cumulative distribution function. Parameters ---------- u, v : array_like Uniform marginals in [0,1]. Returns ------- numpy.ndarray Values of :math:`C(u,v)`. Notes ----- Implements .. math:: C(u,v) = (uv)^{A(\log v / \log(uv))}. """ import numpy as np from sympy.utilities.lambdify import lambdify # Convert inputs to numpy arrays if they aren't already u = np.asarray(u) v = np.asarray(v) # Ensure inputs are within [0, 1] if np.any((u < 0) | (u > 1)) or np.any((v < 0) | (v > 1)): raise ValueError("Marginals must be in [0, 1]") # Handle scalar inputs by broadcasting to the same shape if u.ndim == 0 and v.ndim > 0: u = np.full_like(v, u.item()) elif v.ndim == 0 and u.ndim > 0: v = np.full_like(u, v.item()) # Initialize result array with zeros result = np.zeros_like(u, dtype=float) # Handle boundary cases efficiently # Where u=0 or v=0, C(u,v)=0 (already initialized to zero) # Where u=1, C(u,v)=v # Where v=1, C(u,v)=u result = np.where(u == 1, v, result) result = np.where(v == 1, u, result) # Find indices where neither u nor v are at the boundaries interior_idx = (u > 0) & (u < 1) & (v > 0) & (v < 1) if np.any(interior_idx): u_interior = u[interior_idx] v_interior = v[interior_idx] try: # Get the pickands function pickands_func = self.pickands # Extract the underlying function if it's a wrapper if hasattr(pickands_func, "func"): pickands_expr = pickands_func.func else: pickands_expr = pickands_func # Create a vectorized version of the Pickands function pickands_numpy = lambdify(self.t, pickands_expr, "numpy") # Compute t values: ln(v)/ln(u*v) uv_product = u_interior * v_interior t_values = np.log(v_interior) / np.log(uv_product) # Evaluate the Pickands function at these t values A_t = pickands_numpy(t_values) # Compute the CDF values: (u*v)^A(t) interior_values = uv_product**A_t # Assign the computed values to the result array result[interior_idx] = interior_values except Exception as e: # Fallback implementation for any part that fails import warnings warnings.warn( f"Error in vectorized CDF calculation: {e}. Using scalar fallback." ) # Get the scalar CDF function cdf_func = self.cdf # Apply it element-wise to the interior points for idx in np.ndindex(u.shape): if interior_idx[idx]: result[idx] = float(cdf_func(u[idx], v[idx]).evalf()) return result
@property def pdf(self): r"""Probability density function of the copula. Returns ------- SymPyFuncWrapper Wrapper around the symbolic or numerical PDF. """ try: _xi_1, u, v = sp.symbols("_xi_1 u v") # Get pickands function pickands_func = self.pickands # Extract the underlying function if it's a wrapper if hasattr(pickands_func, "func"): pickands = pickands_func.func else: pickands = pickands_func t = self.t # Create the PDF expression pdf = ( (u * v) ** pickands.subs(t, log(v) / log(u * v)) * ( -( (log(v) - log(u * v)) * Subs( Derivative(pickands.subs(t, _xi_1), _xi_1), _xi_1, log(v) / log(u * v), ) - pickands.subs(t, log(v) / log(u * v)) * log(u * v) ) * ( pickands.subs(t, log(v) / log(u * v)) * log(u * v) - log(v) * Subs( Derivative(pickands.subs(t, _xi_1), _xi_1), _xi_1, log(v) / log(u * v), ) ) * log(u * v) + (log(v) - log(u * v)) * log(v) * Subs( Derivative(pickands.subs(t, _xi_1), (_xi_1, 2)), _xi_1, log(v) / log(u * v), ) ) / (u * v * log(u * v) ** 3) ) # Simplify and wrap pdf = self._get_simplified_solution(pdf) return SymPyFuncWrapper(pdf) except Exception as e: # Fallback implementation import warnings warnings.warn( f"Error in PDF calculation: {e}. Using numerical approximation." ) # Use numerical differentiation as fallback def pdf_func(u=None, v=None): if u is None: u = self.u if v is None: v = self.v # Handle boundary cases if u <= 0 or v <= 0 or u >= 1 or v >= 1: return 0 # Use finite difference approximation for mixed partial derivative h = 1e-5 c1 = float(self.cdf(u=u + h, v=v + h)) c2 = float(self.cdf(u=u + h, v=v - h)) c3 = float(self.cdf(u=u - h, v=v + h)) c4 = float(self.cdf(u=u - h, v=v - h)) # Mixed partial derivative approximation return (c1 - c2 - c3 + c4) / (4 * h * h) return SymPyFuncWrapper(pdf_func)
[docs] def spearmans_rho(self, *args, **kwargs): r"""Spearman’s :math:`\rho` for the extreme value copula. Parameters ---------- *args, **kwargs Copula parameters. Returns ------- sympy.Expr Symbolic expression of Spearman’s :math:`\rho`. """ self._set_params(args, kwargs) integrand = self._rho_int_1() # nelsen 5.15 log_.debug(f"integrand: {integrand}") log_.debug(f"integrand latex: {sp.latex(integrand)}") rho = self._rho() log_.debug(f"rho: {rho}") log_.debug(f"rho latex: {sp.latex(rho)}") return rho
def _rho_int_1(self): r"""Integrand for Spearman’s :math:`\rho`. Returns ------- sympy.Expr Symbolic integrand. """ return sp.simplify((self.pickands.func + 1) ** (-2)) def _rho(self): r"""Compute Spearman’s :math:`\rho`. Returns ------- sympy.Expr Symbolic expression of Spearman’s :math:`\rho`. """ return sp.simplify(12 * sp.integrate(self._rho_int_1(), (self.t, 0, 1)) - 3)
[docs] def kendalls_tau(self, *args, **kwargs): r"""Compute Spearman’s :math:`\rho`. Returns ------- sympy.Expr Symbolic expression of Spearman’s :math:`\rho`. """ self._set_params(args, kwargs) t = self.t diff2_pickands = sp.diff(self.pickands, t, 2) integrand = t * (1 - t) / self.pickands.func * diff2_pickands.func integrand = sp.simplify(integrand) log_.debug(f"integrand: {integrand}") log_.debug(f"integrand latex: {sp.latex(integrand)}") integral = sp.integrate(integrand, (t, 0, 1)) tau = sp.simplify(integral) log_.debug(f"tau: {tau}") log_.debug(f"tau latex: {sp.latex(tau)}") return tau
[docs] def plot_pickands(self, subs=None, **kwargs): r"""Plot the Pickands dependence function. Parameters ---------- subs : dict, optional Parameter substitutions. **kwargs Additional substitutions. """ if kwargs: subs = kwargs if subs is None: subs = {} subs = { getattr(self, k) if isinstance(k, str) else k: v for k, v in subs.items() } for key, value in subs.items(): if not isinstance(value, list): subs[key] = [value] plot_vals = self._mix_params(subs) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) for plot_val in plot_vals: subs_dict = {str(k): v for k, v in plot_val.items()} pickands = self(**subs_dict).pickands self._get_function_graph(pickands.func, plot_val) @contextmanager def suppress_warnings(): warnings.filterwarnings("ignore") yield warnings.filterwarnings("default") params = {param: getattr(self, param) for param in [*self.intervals]} defined_params = { k: v for k, v in params.items() if not isinstance(v, sp.Symbol) } ", ".join(f"\\{key}={value}" for key, value in defined_params.items()) x_label = "$t$" plt.xlabel(x_label) plt.grid(True) plt.xlim(0, 1) plt.ylim(0, 1.03) plt.title(f"{self.__class__.__name__}") plt.ylabel("$A(t)$") plt.legend() with suppress_warnings(): plt.show()
@staticmethod def _get_function_graph(func, par): r"""Plot a Pickands function for given parameter values. Parameters ---------- func : callable Function of ``t`` to plot. par : dict Parameter values to include in the legend. """ par_str = ", ".join(f"$\\{key}={value}$" for key, value in par.items()) par_str = par_str.replace("oo", "\\infty") lambda_func = sp.lambdify("t", func) x = np.linspace(0, 1, 900) y = [lambda_func(i) for i in x] plt.plot(x, y, label=par_str) @staticmethod def _mix_params(params): r"""Generate parameter combinations for plotting. Parameters ---------- params : dict Map from parameter name to value or list of values. Returns ------- list of dict All combinations of parameter values. """ # Identify keys with list values that need to be expanded list_keys = [key for key, value in params.items() if isinstance(value, list)] non_list_keys = [key for key in params if key not in list_keys] # If there are no lists, just return the original dict if not list_keys: return [params] # Extract the lists to create cross products list_values = [params[key] for key in list_keys] cross_prod = list(itertools.product(*list_values)) # Create dictionaries for each combination, including non-list values result = [] for combo in cross_prod: d = {} # Add all non-list values for key in non_list_keys: d[key] = params[key] # Add list values for this combination for i, key in enumerate(list_keys): d[key] = combo[i] result.append(d) return result @staticmethod def _get_simplified_solution(sol): r"""Simplify a symbolic solution. Parameters ---------- sol : sympy.Expr Expression to simplify. Returns ------- sympy.Expr Simplified expression. """ simplified_sol = sp.simplify(sol) if isinstance(simplified_sol, sp.core.containers.Tuple): return simplified_sol[0] else: return simplified_sol.evalf()
[docs] def cond_distr_1(self, u=None, v=None): r"""First conditional distribution :math:`\partial C/\partial u`. Uses exact boundary values where possible, otherwise numerical finite differences on :meth:`cdf_vectorized`. Parameters ---------- u, v : float or tuple Evaluation point in :math:`[0,1]^2`. If ``u`` is a tuple/list ``(u_val, v_val)`` and ``v`` is ``None``, the values are unpacked automatically. Returns ------- float Value of :math:`\partial C(u,v)/\partial u`. """ import numpy as _np # Support calling as cond_distr_1((u, v)) with a tuple if isinstance(u, (tuple, list)) and v is None: u, v = u[0], u[1] u_f = float(u) if u is not None else None v_f = float(v) if v is not None else None # Boundary cases for ∂C/∂u if v_f == 0.0: return 0.0 if v_f == 1.0: return 1.0 if u_f is not None and u_f <= 0.0: return 0.0 h = 1e-7 h = min(h, u_f / 2.0, (1.0 - u_f) / 2.0) cu = self.cdf_vectorized(_np.array([u_f + h]), _np.array([v_f]))[0] cm = self.cdf_vectorized(_np.array([u_f - h]), _np.array([v_f]))[0] return float((cu - cm) / (2.0 * h))
[docs] def cond_distr_2(self, u=None, v=None): r"""Second conditional distribution :math:`\partial C/\partial v`. Uses exact boundary values where possible, otherwise numerical finite differences on :meth:`cdf_vectorized`. Parameters ---------- u, v : float or tuple Evaluation point in :math:`[0,1]^2`. If ``u`` is a tuple/list ``(u_val, v_val)`` and ``v`` is ``None``, the values are unpacked automatically. Returns ------- float Value of :math:`\partial C(u,v)/\partial v`. """ import numpy as _np # Support calling as cond_distr_2((u, v)) with a tuple if isinstance(u, (tuple, list)) and v is None: u, v = u[0], u[1] u_f = float(u) if u is not None else None v_f = float(v) if v is not None else None # Boundary cases for ∂C/∂v if u_f == 0.0: return 0.0 if u_f == 1.0: return 1.0 if v_f is not None and v_f <= 0.0: return 0.0 h = 1e-7 h = min(h, v_f / 2.0, (1.0 - v_f) / 2.0) cv = self.cdf_vectorized(_np.array([u_f]), _np.array([v_f + h]))[0] cm = self.cdf_vectorized(_np.array([u_f]), _np.array([v_f - h]))[0] return float((cv - cm) / (2.0 * h))
# ------------------------------------------------------------------ # Analytical tail dependence via Pickands function # ------------------------------------------------------------------
[docs] def lambda_L(self): r"""Lower tail dependence coefficient for extreme value copulas. All bivariate extreme value copulas have :math:`\lambda_L = 0`. Returns ------- float Always 0. """ return 0.0
[docs] def lambda_U(self): r"""Upper tail dependence coefficient for extreme value copulas. .. math:: \lambda_U = 2\bigl(1 - A(\tfrac12)\bigr) where :math:`A` is the Pickands dependence function. Returns ------- float References ---------- Gudendorf & Segers (2010), *Extreme-Value Copulas*. """ A_half = float(self.pickands(0.5)) return 2.0 * (1.0 - A_half)
[docs] def blomqvists_beta(self, *args, **kwargs): r"""Blomqvist's :math:`\beta` for extreme value copulas. Since :math:`C(\tfrac12,\tfrac12) = (\tfrac14)^{A(1/2)}`: .. math:: \beta = 4 \cdot (1/4)^{A(1/2)} - 1 Returns ------- float """ if args or kwargs: self._set_params(args, kwargs) A_half = float(self.pickands(0.5)) return 4.0 * (0.25**A_half) - 1.0
[docs] def gini_gamma(self, *args, **kwargs): r"""Gini's :math:`\gamma` for extreme value copulas. .. math:: \gamma = 4\!\left[ \int_0^1 t^{A(1/2)}\,dt + \int_0^1 \bigl(t(1-t)\bigr)^{A\!\left(\frac{\ln(1-t)} {\ln(t(1-t))}\right)}\,dt \right] - 2 Computed numerically via the vectorized CDF. Returns ------- float """ if args or kwargs: self._set_params(args, kwargs) from scipy.integrate import quad def diag(t): if t <= 0 or t >= 1: return 0.0 return float(self.cdf_vectorized(np.array([t]), np.array([t]))[0]) def anti(t): if t <= 0 or t >= 1: return 0.0 return float(self.cdf_vectorized(np.array([t]), np.array([1.0 - t]))[0]) int1, _ = quad(diag, 0, 1, limit=100) int2, _ = quad(anti, 0, 1, limit=100) return 4.0 * (int1 + int2) - 2.0
[docs] def tail_dependence_function(self, t, lower=True): r"""Evaluate the tail dependence function at :math:`t \in [0,1]`. For bivariate extreme value copulas the *upper* TDF has the closed form .. math:: b_U(t) = 1 - A(t) The lower TDF is identically 0 (since :math:`\lambda_L = 0`). Parameters ---------- t : float or array_like Point(s) in :math:`[0,1]`. lower : bool If ``True``, return the lower TDF (always 0). If ``False``, return the upper TDF. Returns ------- float or numpy.ndarray """ t = np.asarray(t, dtype=float) if lower: return np.zeros_like(t) if t.ndim > 0 else 0.0 # Upper TDF: b(t) = 1 - A(t) if t.ndim == 0: return 1.0 - float(self.pickands(float(t))) return np.array( [1.0 - float(self.pickands(float(ti))) for ti in t.ravel()] ).reshape(t.shape)
[docs] def tail_order(self): r"""Tail order :math:`\kappa` for extreme value copulas. For any bivariate extreme value copula the lower tail order is :math:`\kappa_L = 1 / A(1/2)` and the upper tail order is 1 whenever :math:`\lambda_U > 0`, or the rate at which :math:`A(t) \to 1` near the endpoints. Returns ------- dict ``{"lower": kappa_L, "upper": kappa_U}`` """ A_half = float(self.pickands(0.5)) kappa_L = 1.0 / A_half if A_half > 0 else float("inf") kappa_U = 1.0 if A_half < 1.0 else float("inf") return {"lower": kappa_L, "upper": kappa_U}
@property def is_ci(self): r"""Whether the copula is conditionally increasing. Returns ------- bool Always ``True`` for extreme value copulas. """ return True
[docs] def from_pickands(pickands, params=None): r"""Construct a bivariate extreme value copula from a Pickands function. Parameters ---------- pickands : str or sympy.Expr Pickands dependence function. May contain ``t`` or another symbol. params : list or str, optional Parameter names. If ``None``, symbols are detected automatically. Returns ------- BivExtremeValueCopula Instance with the specified Pickands function. """ return BivExtremeValueCopula.from_pickands(pickands, params=params)