Source code for copul.family.other.xi_rho_boundary_copula

# file: copul/families/diagonal_band_b_inverse_reflected.py
import sympy as sp
import numpy as np

from copul.family.core.biv_copula import BivCopula
from copul.family.frechet.biv_independence_copula import BivIndependenceCopula
from copul.family.frechet.lower_frechet import LowerFrechet
from copul.family.frechet.upper_frechet import UpperFrechet
from copul.wrapper.sympy_wrapper import SymPyFuncWrapper


[docs] class XiRhoBoundaryCopula(BivCopula): r""" Optimal–:math:`\rho` diagonal–band copula with dependence parameter :math:`b\in\mathbb{R}\setminus\{0\}`. For :math:`b>0`, the copula is the upper/right boundary copula. For :math:`b<0`, the lower/left boundary copula is obtained by the reflection .. math:: C_b(u,v) = v - C_{|b|}(1-u,v). **Formulas** 1. Maximal Spearman’s :math:`\rho` for :math:`b>0`: .. math:: M(b) = \begin{cases} b - \dfrac{3b^2}{10}, & 0<b\le 1,\\[1ex] 1 - \dfrac{1}{2b^2} + \dfrac{1}{5b^3}, & b\ge 1. \end{cases} 2. Shift :math:`s_v(b)` for :math:`b>0`: .. math:: s_v = \begin{cases} \sqrt{2v/b}, & v \le \frac{1}{2b}\wedge\frac{b}{2},\\[1ex] v + \frac{1}{2b}, & \frac{1}{2b} < v \le 1-\frac{1}{2b},\\[1ex] \frac{v}{b}+\frac12, & \frac{b}{2} < v \le 1-\frac{b}{2},\\[1ex] 1 + \frac1b - \sqrt{2(1-v)/b}, & v > 1 - \left(\frac{1}{2b}\wedge\frac{b}{2}\right). \end{cases} 3. Copula CDF for :math:`b>0`: .. math:: a_v = s_v - 1/b, \qquad C_b(u,v) = \begin{cases} u - \dfrac{b}{2}(u-a_v)^2 + \dfrac{b}{2}(a_v\wedge 0)^2, & a_v < u \le s_v,\\[1ex] \min\{u,v\}, & \text{else}. \end{cases} """ # symbolic parameter & admissible interval b = sp.symbols("b", real=True) params = [b] intervals = {"b": sp.Interval(-sp.oo, 0).union(sp.Interval(0, sp.oo))} special_cases = {0: BivIndependenceCopula} # convenience symbols u, v = sp.symbols("u v", nonnegative=True) def __new__(cls, *args, **kwargs): if args and len(args) == 1: kwargs["b"] = args[0] if "b" in kwargs and kwargs["b"] in cls.special_cases: special_case_cls = cls.special_cases[kwargs["b"]] del kwargs["b"] # Remove b before creating special case return special_case_cls() return super().__new__(cls) def __init__(self, *args, **kwargs): if args and len(args) == 1: kwargs["b"] = args[0] super().__init__(**kwargs) def __call__(self, *args, **kwargs): if args and len(args) == 1: kwargs["b"] = args[0] if "b" in kwargs and kwargs["b"] in self.special_cases: special_case_cls = self.special_cases[kwargs["b"]] del kwargs["b"] # Remove b before creating special case return special_case_cls() return super().__call__(**kwargs) # -------- Maximal Spearman’s rho M(b) -------- # @staticmethod def _M_expr(b): """Piecewise maximal Spearman’s ρ in terms of the original parameter b.""" b_abs = sp.Abs(b) M_when_abs_b_le_1 = b_abs - sp.Rational(3, 10) * b_abs**2 M_when_abs_b_ge_1 = 1 - 1 / (2 * b_abs**2) + 1 / (5 * b_abs**3) return sp.Piecewise( (M_when_abs_b_le_1, b_abs <= 1), (M_when_abs_b_ge_1, True), ) # -------- Shift s_v(b) -------- # @staticmethod def _s_expr(v, b): """ Compute s_v for the original parameter b, using b_inv = 1/|b|. """ b_inv = 1 / sp.Abs(b) # Region |b| >= 1, so min(1/(2|b|), |b|/2) = 1/(2|b|). v1_s_s = b_inv / 2 s1_s_s = sp.sqrt(2 * v * b_inv) s2_s_s = v + b_inv / 2 s3_s_s = 1 + b_inv - sp.sqrt(2 * b_inv * (1 - v)) s_small = sp.Piecewise( (s1_s_s, v <= v1_s_s), (s2_s_s, v <= 1 - v1_s_s), (s3_s_s, True), ) # Region |b| < 1, so min(1/(2|b|), |b|/2) = |b|/2. v1_s_L = 1 / (2 * b_inv) s1_s_L = sp.sqrt(2 * v * b_inv) s2_s_L = v * b_inv + sp.Rational(1, 2) s3_s_L = 1 + b_inv - sp.sqrt(2 * b_inv * (1 - v)) s_large = sp.Piecewise( (s1_s_L, v <= v1_s_L), (s2_s_L, v <= 1 - v1_s_L), (s3_s_L, True), ) return sp.Piecewise( (s_small, sp.Abs(b) >= 1), (s_large, True), ) # -------- Base‐CDF for b > 0 -------- # @staticmethod def _base_cdf_expr(u, v, b): r""" The theoretical CDF for the positive parameter case. """ s = XiRhoBoundaryCopula._s_expr(v, b) a = s - 1 / b quadratic_branch = u - b / 2 * (u - a) ** 2 + b / 2 * sp.Min(a, 0) ** 2 return sp.Piecewise( (quadratic_branch, (a < u) & (u <= s)), (sp.Min(u, v), True), ) # -------- CDF / PDF definitions -------- # @property def _cdf_expr(self): b, u, v = self.b, self.u, self.v # The “upright” expression for b > 0: C_pos = self._base_cdf_expr(u, v, b) # For b < 0, we reflect: C_neg(u,v) = v - C_pos(1-u, v) with b → |b| C_reflected = v - self._base_cdf_expr(1 - u, v, sp.Abs(b)) # Piecewise: choose C_pos if b > 0, else reflection C_full = sp.Piecewise( (C_pos, b > 0), (C_reflected, True), ) return C_full def _pdf_expr(self): r""" Explicit Joint density :math:`c(u,v)` derived from the property :math:`c(u,v) = \lvert b \rvert \cdot s_v'(v)` inside the diagonal band, and 0 outside. For :math:`\lvert b \rvert \ge 1`: Density is :math:`\sqrt{\lvert b \rvert / 2v}` near 0, :math:`\lvert b \rvert` in the middle, and :math:`\sqrt{\lvert b \rvert / 2(1-v)}` near 1. For :math:`\lvert b \rvert < 1`: Density is :math:`\sqrt{\lvert b \rvert / 2v}` near 0, :math:`1` in the middle, and :math:`\sqrt{\lvert b \rvert / 2(1-v)}` near 1. """ b, u, v = self.b, self.u, self.v b_abs = sp.Abs(b) # 1. Determine the transition threshold for v # If |b| >= 1, the linear section starts at v = 1/(2|b|) # If |b| < 1, the linear section starts at v = |b|/2 v_thresh = sp.Piecewise( (1 / (2 * b_abs), b_abs >= 1), (b_abs / 2, True), # covers |b| < 1 ) # 2. Determine density value in the middle linear section # If |b| >= 1, density = |b| # If |b| < 1, density = 1 mid_density_val = sp.Piecewise((b_abs, b_abs >= 1), (1, True)) # 3. Construct the explicit density value function (ignoring band boundaries for a moment) # s'(v)*|b| logic: # Near 0: sqrt(|b| / 2v) # Near 1: sqrt(|b| / 2(1-v)) density_on_band = sp.Piecewise( (sp.sqrt(b_abs / (2 * v)), v <= v_thresh), (sp.sqrt(b_abs / (2 * (1 - v))), v >= 1 - v_thresh), (mid_density_val, True), ) # 4. Define the diagonal band boundaries # We fetch s_v for the parameter |b| s = XiRhoBoundaryCopula._s_expr(v, b) # 5. Combine density with spatial indicator # For b > 0: band is (s - 1/b < u < s) # For b < 0: symmetry implies c(u,v) = c(|b|)(1-u, v). # We can implement this by swapping u -> 1-u in the indicator check if b < 0. # Note: s is computed based on b. If b < 0, _s_expr uses |b| internally. # So 's' represents the shift for C_|b|. # Condition for being inside the band (assuming positive parameter logic) # We use u_eff (effective u) to handle reflection u_eff = sp.Piecewise((u, b > 0), (1 - u, True)) # Check: 0 < |b|(s - u_eff) < 1 <=> s - 1/|b| < u_eff < s in_band = (u_eff > s - 1 / b_abs) & (u_eff < s) pdf_full = sp.Piecewise((density_on_band, in_band), (0, True)) return SymPyFuncWrapper(pdf_full)
[docs] @classmethod def from_xi(cls, x): """Instantiate from xi. Optimized to use float arithmetic if x is float.""" if x == 0: return cls(b=0.0) elif x == 1: return UpperFrechet() elif x == -1: return LowerFrechet() # Fast path for floats to avoid symbolic solver overhead if isinstance(x, (float, np.floating)): # Case 1: 0 < x <= 3/10 if 0 < x <= 0.3: denom = 2.0 * np.cos(np.arccos(-3.0 * np.sqrt(6.0 * x) / 5.0) / 3.0) b_val = np.sqrt(6.0 * x) / denom return cls(b=float(b_val)) # Case 2: 3/10 < x < 1 elif 0.3 < x < 1: numer = 5.0 + np.sqrt(5.0 * (6.0 * x - 1.0)) denom = 10.0 * (1.0 - x) b_val = numer / denom return cls(b=float(b_val)) # Fallback to symbolic for general expressions x_sym = sp.sympify(x) b_ge_1 = sp.sqrt(6 * x_sym) / ( 2 * sp.cos(sp.acos(-3 * sp.sqrt(6 * x_sym) / 5) / 3) ) b_lt_1 = (5 + sp.sqrt(5 * (6 * x_sym - 1))) / (10 * (1 - x_sym)) b_expr = sp.Piecewise( (b_ge_1, (x_sym > 0) & (x_sym <= sp.Rational(3, 10))), (b_lt_1, (x_sym > sp.Rational(3, 10)) & (x_sym < 1)), ) return cls(b=float(b_expr))
@staticmethod def _s_expr_numpy(v, b): """ Calculates shift s_v. Optimized: Uses boolean masks to compute only required branches, avoiding unnecessary ops. """ v = np.asarray(v) b = float(b) b_abs = abs(b) b_inv = 1.0 / b_abs s = np.empty_like(v, dtype=float) if b_abs >= 1.0: thresh_lower = b_inv / 2.0 thresh_upper = 1.0 - thresh_lower mask1 = v <= thresh_lower mask3 = v > thresh_upper mask2 = ~(mask1 | mask3) if np.any(mask1): s[mask1] = np.sqrt(2.0 * v[mask1] * b_inv) if np.any(mask2): s[mask2] = v[mask2] + b_inv / 2.0 if np.any(mask3): s[mask3] = 1.0 + b_inv - np.sqrt(2.0 * b_inv * (1.0 - v[mask3])) else: # |b| < 1 implies |b_inv| > 1 thresh_lower = 1.0 / (2.0 * b_inv) thresh_upper = 1.0 - thresh_lower mask1 = v <= thresh_lower mask3 = v > thresh_upper mask2 = ~(mask1 | mask3) if np.any(mask1): s[mask1] = np.sqrt(2.0 * v[mask1] * b_inv) if np.any(mask2): s[mask2] = v[mask2] * b_inv + 0.5 if np.any(mask3): s[mask3] = 1.0 + b_inv - np.sqrt(2.0 * b_inv * (1.0 - v[mask3])) return s @staticmethod def _base_cdf_numpy(u, v, b): """ Optimized base CDF calculation for b > 0 using the theoretical formula. Explicitly broadcasts inputs to handle scalar/array mixes. """ # Ensure inputs are arrays u = np.asarray(u) v = np.asarray(v) b = float(b) # Explicit broadcasting to ensure u and v have compatible shapes # for boolean indexing operations later. if u.shape != v.shape: u, v = np.broadcast_arrays(u, v) s = XiRhoBoundaryCopula._s_expr_numpy(v, b) a = s - 1.0 / b result = np.minimum(u, v).astype(float, copy=False) mask = (a < u) & (u <= s) if np.any(mask): result = np.array(result, dtype=float, copy=True) result[mask] = ( u[mask] - 0.5 * b * (u[mask] - a[mask]) ** 2 + 0.5 * b * np.minimum(a[mask], 0.0) ** 2 ) return result
[docs] def pdf_vectorized(self, u, v): """ Fully vectorized PDF implementation. Computes density c(u,v) = |b| * s'_v(v) inside the band, 0 outside. Replaces symbolic logic for high performance. """ u = np.asarray(u) v = np.asarray(v) b = self.b b_abs = abs(b) # Symmetry handling: if b < 0, c(u,v) = c(|b|)(1-u, v) u_eff = u if b > 0 else (1.0 - u) # 1. Calculate Shift s_v s = self._s_expr_numpy(v, b_abs) # 2. Calculate Derivative s'_v (ds/dv) # We need to replicate the thresholds from _s_expr_numpy to get derivatives ds_dv = np.empty_like(v, dtype=float) b_inv = 1.0 / b_abs if b_abs >= 1.0: thresh_lower = b_inv / 2.0 thresh_upper = 1.0 - thresh_lower mask1 = v <= thresh_lower mask3 = v > thresh_upper mask2 = ~(mask1 | mask3) if np.any(mask1): # Avoid div by zero at v=0 (density infinite there anyway) with np.errstate(divide="ignore"): ds_dv[mask1] = np.sqrt(b_inv / (2.0 * v[mask1])) if np.any(mask2): ds_dv[mask2] = 1.0 if np.any(mask3): with np.errstate(divide="ignore"): ds_dv[mask3] = np.sqrt(b_inv / (2.0 * (1.0 - v[mask3]))) else: thresh_lower = 1.0 / (2.0 * b_inv) thresh_upper = 1.0 - thresh_lower mask1 = v <= thresh_lower mask3 = v > thresh_upper mask2 = ~(mask1 | mask3) if np.any(mask1): with np.errstate(divide="ignore"): ds_dv[mask1] = np.sqrt(b_inv / (2.0 * v[mask1])) if np.any(mask2): ds_dv[mask2] = b_inv if np.any(mask3): with np.errstate(divide="ignore"): ds_dv[mask3] = np.sqrt(b_inv / (2.0 * (1.0 - v[mask3]))) # 3. Calculate Density # c(u,v) = |b| * s'(v) if a < u_eff < s else 0 # where a = s - 1/|b| density_val = b_abs * ds_dv lower_bound = s - (1.0 / b_abs) upper_bound = s in_band = (u_eff > lower_bound) & (u_eff < upper_bound) result = np.where(in_band, density_val, 0.0) return result
[docs] def cdf_vectorized(self, u, v): """ Vectorized implementation of the cumulative distribution function. This method allows for efficient computation of the CDF for arrays of points, which is detected by the `Checkerboarder` for fast approximation. """ b = self.b if b > 0: return self._base_cdf_numpy(u, v, b) else: # b < 0 u, v = np.asarray(u), np.asarray(v) # Apply the reflection identity: C_neg(u,v) = v - C_pos(1-u, v) with b -> |b| return v - self._base_cdf_numpy(1 - u, v, np.abs(b))
# =================================================================== # END: Vectorized CDF implementation # =================================================================== # -------- Metadata -------- # @property def is_absolutely_continuous(self) -> bool: return True @property def is_symmetric(self) -> bool: return True
[docs] def chatterjees_xi(self): r""" Closed-form :math:`\xi(C_b)` for the original parameter :math:`b`. """ b_abs = sp.Abs(self.b) xi_abs_b_le_1 = b_abs**2 * (5 - 2 * b_abs) / 10 xi_abs_b_ge_1 = 1 - 1 / b_abs + sp.Rational(3, 10) / b_abs**2 return sp.Piecewise( (xi_abs_b_le_1, b_abs <= 1), (xi_abs_b_ge_1, True), )
[docs] def spearmans_rho(self): r""" Closed-form Spearman’s :math:`\rho(C_b)` for the original parameter :math:`b`. """ b = self.b b_abs = sp.Abs(b) rho_abs_b_le_1 = b_abs - sp.Rational(3, 10) * b_abs**2 rho_abs_b_ge_1 = 1 - 1 / (2 * b_abs**2) + 1 / (5 * b_abs**3) return sp.sign(b) * sp.Piecewise( (rho_abs_b_le_1, b_abs <= 1), (rho_abs_b_ge_1, True), )
[docs] def kendalls_tau(self): r""" Closed-form Kendall’s :math:`\tau(C_b)` for the original parameter :math:`b`. """ b = self.b b_abs = sp.Abs(b) tau_abs_b_le_1 = 2 * b_abs / 3 - b_abs**2 / 6 tau_abs_b_ge_1 = 1 - 2 / (3 * b_abs) + 1 / (6 * b_abs**2) return sp.sign(b) * sp.Piecewise( (tau_abs_b_le_1, b_abs <= 1), (tau_abs_b_ge_1, True), )
[docs] def blests_nu(self, *args, **kwargs): return self.spearmans_rho()
# # def cond_distr_1(self, u=None, v=None): # """ # First conditional distribution function: ∂C(u,v)/∂u # # Parameters # ---------- # u, v : float or None, optional # Values to evaluate at. If None, returns the symbolic expression. # # Returns # ------- # CD1Wrapper # The conditional distribution # """ # b = self.b # b_abs = sp.Abs(b) # # # Symmetry: ∂₁C_b(u,v) = ∂₁C_|b|(1-u, v) for b < 0 # u_sym = self.u # u_eff = sp.Piecewise((u_sym, b > 0), (1 - u_sym, True)) # # # Get shift s_v using |b| # s = self._s_expr(self.v, b_abs) # # # The generator formula is h(t) = clamp(|b|*(s - t), 0, 1) # # where t is the effective u # val = b_abs * (s - u_eff) # # # Clamp between 0 and 1 # res = sp.Max(sp.Min(val, 1), 0) # # return CD1Wrapper(res)(u, v) # # def cond_distr_2(self, u=None, v=None): # """ # Second conditional distribution function: ∂C(u,v)/∂v # # Parameters # ---------- # u, v : float or None, optional # Values to evaluate at. If None, returns the symbolic expression. # # Returns # ------- # CD2Wrapper # The conditional distribution # """ # b = self.b # b_abs = sp.Abs(b) # # # 1. Construct the CD2 for the positive parameter case |b| # # The density inside the band is constant in u: c(u,v) = |b| * s'_v(v) # s = self._s_expr(self.v, b_abs) # ds_dv = s.diff(self.v) # density = b_abs * ds_dv # # # Support boundaries for |b| case # # Lower: max(0, s - 1/|b|) # # Upper: min(1, s) # b_inv = 1 / b_abs # L = sp.Max(0, s - b_inv) # R = sp.Min(1, s) # # # 2. Determine effective u based on symmetry # # For b < 0: C_b(u,v) = v - C_|b|(1-u, v) # # Therefore: ∂₂C_b(u,v) = 1 - ∂₂C_|b|(1-u, v) # u_sym = self.u # u_target = sp.Piecewise((u_sym, b > 0), (1 - u_sym, True)) # # # 3. Calculate CD2_|b|(u_target, v) # # Since density is constant in u, CD2 is a linear ramp from 0 to 1 # val_pos = density * (u_target - L) # cd2_pos_expr = sp.Piecewise( # (0, u_target <= L), # (1, u_target >= R), # (val_pos, True), # ) # # # 4. Apply reflection if b < 0 # result = sp.Piecewise((cd2_pos_expr, b > 0), (1 - cd2_pos_expr, True)) # # return CD2Wrapper(result)(u, v) if __name__ == "__main__": # Example usage # XiRhoBoundaryCopula(b=-0.5).plot_cond_distr_1(plot_type="contour") # XiRhoBoundaryCopula(b=-0.5).plot_cond_distr_2(plot_type="contour") # XiRhoBoundaryCopula(b=-1).plot_cond_distr_1(plot_type="contour") # XiRhoBoundaryCopula(b=-1).plot_cond_distr_2(plot_type="contour") XiRhoBoundaryCopula(b=-0.5).plot_pdf(plot_type="contour", levels=100, zlim=(0, 5)) XiRhoBoundaryCopula(b=-1).plot_pdf(plot_type="contour", levels=100, zlim=(0, 6.5)) XiRhoBoundaryCopula(b=-2).plot_pdf(plot_type="contour", levels=100, zlim=(0, 8))