Source code for copul.family.other.clamped_parabola_copula

from typing import TypeAlias

import sympy as sp
import numpy as np
from scipy.optimize import brentq
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import types

from copul.family.core.biv_copula import BivCopula
from copul.family.frechet.biv_independence_copula import BivIndependenceCopula
from copul.family.frechet.upper_frechet import UpperFrechet

import warnings
from scipy.integrate import IntegrationWarning

warnings.filterwarnings("ignore", category=IntegrationWarning)


[docs] class XiNuBoundaryCopula(BivCopula): r""" Clamped–parabola copula parameterized by :math:`b=1/\mu>0`: .. math:: h_v(t) \;=\; \mathrm{clamp}\!\left(b\big((1-t)^2 - q(v)\big),\,0,\,1\right), where :math:`q(v)` is uniquely determined by the marginal constraint :math:`\int_0^1 h_v(t)\,dt=v`. """ # Parameter now: b > 0 b = sp.symbols("b", positive=True) params = [b] intervals = {"b": sp.Interval.open(0, sp.oo)} # Limits: b->∞ gives M, b->0+ gives Π special_cases = {sp.oo: UpperFrechet, 0: BivIndependenceCopula} # Convenience symbols u, v = sp.symbols("u v", nonnegative=True) def __init__(self, *args, **kwargs): if args and len(args) == 1: kwargs["b"] = args[0] super().__init__(**kwargs) self._q_cache = {} # --------------------------- # Core: solve q(v) # --------------------------- @staticmethod def _marginal_integral_residual(q, v_target, b): """ Residual F(q) = (∫_0^1 h_v(t) dt) - v for a given q, with h_v(t)=clamp(b((1-t)^2-q),0,1). Valid q ∈ [-1/b, 1]. """ if q > 1 or q < -1.0 / b: return 1e6 s_v = 1.0 if q < 0 else 1.0 - np.sqrt(q) # right zero of 0-clamp a_v = max(0.0, 1.0 - np.sqrt(q + 1.0 / b)) # right end of plateau (h=1) # ∫ h_v = plateaulen + b * ∫(x^2 - q) on the middle arc, with primitive T(x;q) = x^3/3 - q x integral = a_v val_at_s = -((1.0 - s_v) ** 3) / 3.0 - q * s_v val_at_a = -((1.0 - a_v) ** 3) / 3.0 - q * a_v integral += b * (val_at_s - val_at_a) return integral - v_target def _get_q_v(self, v_val, b_val): """ Solve q(v) for a single v ∈ [0,1]. """ # exact endpoints if v_val == 0.0: return 1.0 if v_val == 1.0: return -1.0 / b_val cache_key = (v_val, b_val) if cache_key in self._q_cache: return self._q_cache[cache_key] lo, hi = -1.0 / b_val, 1.0 try: q_val = brentq( self._marginal_integral_residual, lo, hi, args=(v_val, b_val) ) self._q_cache[cache_key] = q_val return q_val except ValueError: # check boundaries rl = self._marginal_integral_residual(lo, v_val, b_val) if np.isclose(rl, 0.0): return lo rh = self._marginal_integral_residual(hi, v_val, b_val) if np.isclose(rh, 0.0): return hi raise RuntimeError( f"Failed to find q for v={v_val}, b={b_val}. " f"Residuals: F(-1/b)={rl:.3g}, F(1)={rh:.3g}" ) def _get_q_v_vec(self, v_arr, b_val): v_arr = np.asarray(v_arr) shp = v_arr.shape q_flat = np.array([self._get_q_v(v, b_val) for v in v_arr.ravel()]) return q_flat.reshape(shp) # --------------------------- # Vectorized CDF / PDF # --------------------------- @property def cdf(self): return self._cdf_expr # --- base (|b|) helpers used by both signs --- def _base_cdf_vectorized(self, u, v): """CDF for the upright copula with parameter b_abs = |b| > 0.""" u, v = np.asarray(u), np.asarray(v) b_abs = float(abs(self.b)) q = self._get_q_v_vec(v, b_abs) s = np.empty_like(q, dtype=float) mask = q >= 0.0 s[~mask] = 1.0 s[mask] = 1.0 - np.sqrt(q[mask]) a = np.maximum(0.0, 1.0 - np.sqrt(q + 1.0 / b_abs)) val_at_u = -((1.0 - u) ** 3) / 3.0 - q * u val_at_a = -((1.0 - a) ** 3) / 3.0 - q * a middle = a + b_abs * (val_at_u - val_at_a) return np.select([u <= a, u <= s], [u, middle], default=v) def _base_pdf_vectorized(self, u, v): """PDF for the upright copula with parameter b_abs = |b| > 0.""" u = np.asarray(u, dtype=float) v = np.asarray(v, dtype=float) out = np.zeros_like(u, dtype=float) b_abs = float(abs(self.b)) q = self._get_q_v_vec(v, b_abs) a, s = self._switch_points(q, b_abs) vprime = self._vprime_of_q(q, b_abs) eps = 1e-14 qprime = 1.0 / np.where(np.abs(vprime) < eps, np.sign(vprime) * eps, vprime) height = -b_abs * qprime mask = (u > a) & (u < s) out[mask] = height[mask] return out.reshape(u.shape)
[docs] def cdf_vectorized(self, u, v): """ If b >= 0: C(u,v) = C_base(u,v; |b|). If b < 0 : C(u,v) = u - C_base(u, 1-v; |b|) (σ2 reflection). """ if float(self.b) >= 0: return self._base_cdf_vectorized(u, v) else: u_arr, v_arr = np.asarray(u), np.asarray(v) return u_arr - self._base_cdf_vectorized(u_arr, 1.0 - v_arr)
[docs] def pdf_vectorized(self, u, v): """ If b >= 0: c(u,v) = c_base(u,v; |b|). If b < 0 : c(u,v) = c_base(u, 1-v; |b|) (chain rule for σ2). """ if float(self.b) >= 0: return self._base_pdf_vectorized(u, v) else: u_arr, v_arr = np.asarray(u), np.asarray(v) return self._base_pdf_vectorized(u_arr, 1.0 - v_arr)
def _switch_points(self, q, b): """Return a(v), s(v) for given q and b.""" a = np.maximum(0.0, 1.0 - np.sqrt(q + 1.0 / b)) s = np.empty_like(q, dtype=float) mask = q >= 0.0 s[~mask] = 1.0 s[mask] = 1.0 - np.sqrt(q[mask]) return a, s def _vprime_of_q(self, q, b): """ Piecewise derivative v'(q) with μ = 1/b. Returns an array matching q. """ q = np.asarray(q, dtype=float) mu = 1.0 / float(b) R = np.sqrt(np.maximum(q + mu, 0.0)) # defined for q > -mu r = np.sqrt(np.maximum(q, 0.0)) # defined for q >= 0 vprime = np.empty_like(q) # Regime A1: q < 0 and R < 1 mask_A1 = (q < 0.0) & (R < 1.0 - 1e-14) vprime[mask_A1] = -R[mask_A1] / mu # Regime A2: q < 0 and R >= 1 (only if mu >= 1) mask_A2 = (q < 0.0) & ~mask_A1 vprime[mask_A2] = -1.0 / mu # Regime B: 0 <= q < 1 - mu (only if mu < 1) mask_B = (q >= 0.0) & (q < 1.0 - mu - 1e-14) Delta = R - r vprime[mask_B] = -(1.0 / (2.0 * R[mask_B])) * (1.0 + (Delta[mask_B] ** 2) / mu) # Regime C: 1 - mu <= q <= 1 mask_C = ~(mask_A1 | mask_A2 | mask_B) vprime[mask_C] = (r[mask_C] - 1.0) / mu return vprime
[docs] def cond_distr_1(self): """ h(u,v) = ∂_u C(u,v). If b >= 0: h = clamp(|b|((1-u)^2 - q(v)),0,1). If b < 0 : h = 1 - clamp(|b|((1-u)^2 - q(1-v)),0,1). (since C^σ2) NOTE: q(·) will be evaluated with |b| in the lambdify bridge. """ b = sp.Abs(self.b) q = sp.Function("q") h_base = sp.Min(sp.Max(0, b * ((1 - self.u) ** 2 - q(self.v))), 1) h_ref = 1 - sp.Min(sp.Max(0, b * ((1 - self.u) ** 2 - q(1 - self.v))), 1) return sp.Piecewise((h_base, self.b >= 0), (h_ref, True))
@property def _cdf_expr(self): # C(u,v) = ∫_0^u h(x,v) dx — this matches the σ2 wrapper as well. return sp.Integral(self.cond_distr_1(), (self.u, 0, self.u)) def _pdf_expr(self): raise NotImplementedError("Symbolic PDF not available; use pdf_vectorized.") # --------------------------- # Build from target xi # ---------------------------
[docs] @classmethod def from_xi(cls, x_target): """ Solve for b from target ξ. Since ξ(μ) is strictly decreasing in μ, we solve for μ and return b=1/μ. """ if not (0.0 < x_target < 1.0): raise ValueError("Target xi must be in (0, 1).") # use temporary instances to compute ξ(μ) def xi_of_mu(mu): tmp = cls(b=1.0 / mu) return tmp.chatterjees_xi() xi_at_1 = xi_of_mu(1.0) if x_target < xi_at_1: lo, hi = 1.0, 2.0 while xi_of_mu(hi) > x_target: hi *= 2.0 if hi > 1e12: raise RuntimeError( "from_xi bracketing failed (upper bound exploded)." ) else: hi, lo = 1.0, 0.5 while xi_of_mu(lo) < x_target: lo *= 0.5 if lo < 1e-14: return cls(b=1.0 / lo) # effectively b→∞ def f(mu): return xi_of_mu(mu) - x_target mu_val = brentq(f, lo, hi, maxiter=200, xtol=1e-14, rtol=1e-12) return cls(b=1.0 / mu_val)
[docs] def chatterjees_xi(self): import numpy as _np b_abs = float(abs(self.b)) if b_abs <= 0.0: raise ValueError("b must be nonzero.") mu = 1.0 / b_abs s = _np.sqrt(mu) if mu < 1.0: t = _np.sqrt(1.0 - mu) A = _np.asinh(t / s) num = ( -105 * s**8 * A + 183 * s**6 * t - 38 * s**4 * t - 88 * s**2 * t + 112 * s**2 + 48 * t - 48 ) den = 210 * s**6 return num / den else: return 8.0 * (7.0 * mu - 3.0) / (105.0 * mu**3)
[docs] def blests_nu(self): """ Signed Blest's ν(b). Under the σ₂ reflection (b<0) the dependence reverses, so ν must satisfy ν(-b) = -ν(b). We therefore return sign(b) * ν(|b|), where ν(|b|) is the closed form in terms of μ = 1/|b|. """ import numpy as _np b = float(self.b) if b == 0.0: return 0.0 # limit = independence sgn = 1.0 if b > 0.0 else -1.0 mu = 1.0 / abs(b) s = _np.sqrt(mu) if mu < 1.0: t = _np.sqrt(1.0 - mu) A = _np.asinh(t / s) num = ( -105 * s**8 * A + 87 * s**6 * t + 250 * s**4 * t - 376 * s**2 * t + 448 * s**2 + 144 * t - 144 ) den = 420 * s**4 return sgn * (num / den) else: return sgn * (4.0 * (28.0 * mu - 9.0) / (105.0 * mu**2))
# =================================================================== # START: Rich plotting capabilities (restored, adapted to b) # =================================================================== def _plot3d(self, func, title, zlabel, zlim=None, **kwargs): r""" Internal 3D surface plot using either a numpy-callable or a SymPy expr. If 'func' is a SymPy expression, we lambdify it with q(v) injected. """ if hasattr(func, "__name__") and func.__name__ in ( "cdf_vectorized", "pdf_vectorized", ): # Already a numpy-callable f = func else: # func is a SymPy expression or bound method returning an expr if isinstance(func, types.MethodType): func = func() def q_func(v_val): return self._get_q_v_vec(v_val, float(self.b)) f = sp.lambdify( (self.u, self.v), func, modules=[{"q": q_func, "Min": np.minimum, "Max": np.maximum}, "numpy"], ) fig = plt.figure(figsize=(8, 6)) ax = fig.add_subplot(111, projection="3d") u_vals = np.linspace(0.01, 0.99, 50) v_vals = np.linspace(0.01, 0.99, 50) U, V = np.meshgrid(u_vals, v_vals) Z = f(U, V) ax.plot_surface(U, V, Z, cmap="viridis", edgecolor="none") ax.set_xlabel("u") ax.set_ylabel("v") ax.set_zlabel(zlabel) ax.set_title(title) if zlim: ax.set_zlim(*zlim) else: ax.set_zlim(0, None) plt.show() return fig, ax def _plot_contour( self, func, title, zlabel, *, levels=200, zlim=None, log_z=False, **kwargs ): """ Internal contour plot using either a numpy-callable or a SymPy expr. If 'func' is a SymPy expression, we lambdify it with q(v) injected. """ if hasattr(func, "__name__") and func.__name__ in ( "cdf_vectorized", "pdf_vectorized", ): f = func else: if isinstance(func, types.MethodType): func = func() def q_func(v_val): return self._get_q_v_vec(v_val, float(self.b)) f = sp.lambdify( (self.u, self.v), func, modules=[{"q": q_func, "Min": np.minimum, "Max": np.maximum}, "numpy"], ) grid_size = kwargs.pop("grid_size", 100) x = np.linspace(0.005, 0.995, grid_size) y = np.linspace(0.005, 0.995, grid_size) X, Y = np.meshgrid(x, y) Z = f(X, Y) if zlim: Z = np.clip(Z, zlim[0], zlim[1]) fig, ax = plt.subplots() if log_z: # avoid zeros for LogNorm Zp = np.array(Z, copy=True) if np.any(Zp > 0): Zp[Zp <= 0] = np.min(Zp[Zp > 0]) norm = mcolors.LogNorm(vmin=Zp.min(), vmax=Zp.max()) cf = ax.contourf(X, Y, Zp, levels=levels, cmap="viridis", norm=norm) else: # fallback if everything is 0 cf = ax.contourf(X, Y, Z, levels=levels, cmap="viridis") else: cf = ax.contourf(X, Y, Z, levels=levels, cmap="viridis") cbar = fig.colorbar(cf, ax=ax) cbar.set_label(zlabel) ax.set_xlabel("u") ax.set_ylabel("v") ax.set_title(title) plt.tight_layout() plt.savefig(f"images/xi_nu_boundary_{self.b}.png") plt.show() return fig def _plot_functions(self, func, title, zlabel, xlabel="u", **kwargs): """ Internal line plots (slices) using either a numpy-callable or a SymPy expr. If 'func' is a SymPy expression, we lambdify it with q(v) injected. """ if hasattr(func, "__name__") and func.__name__ in ( "cdf_vectorized", "pdf_vectorized", ): f = func else: if isinstance(func, types.MethodType): func = func() def q_func(v_val): return self._get_q_v_vec(v_val, float(self.b)) f = sp.lambdify( (self.u, self.v), func, modules=[{"q": q_func, "Min": np.minimum, "Max": np.maximum}, "numpy"], ) u_vals = np.linspace(0.01, 0.99, 200) v_vals = np.linspace(0.1, 0.9, 9) fig, ax = plt.subplots(figsize=(6, 6)) for v_i in v_vals: y_vals = f(u_vals, v_i) ax.plot(u_vals, y_vals, label=f"$v = {v_i:.1f}$", linewidth=2.5) ax.set_xlabel(xlabel) ax.set_ylabel(zlabel) ax.set_title(f"{title}{zlabel}") ax.grid(True) ax.legend(loc="best") fig.tight_layout() plt.show() return fig
[docs] def plot_cdf(self, *, plot_type="3d", log_z=False, **kwargs): """Plot the CDF using the numerical :meth:`cdf_vectorized` implementation.""" title = kwargs.pop("title", "Cumulative Distribution Function") zlabel = kwargs.pop("zlabel", "CDF") if plot_type == "3d": return self._plot3d( self.cdf_vectorized, title, zlabel, zlim=(0, 1), **kwargs ) elif plot_type == "contour": return self._plot_contour( self.cdf_vectorized, title, zlabel, zlim=(0, 1), log_z=log_z, **kwargs ) else: raise ValueError(f"plot_type must be '3d' or 'contour', not {plot_type}")
[docs] def plot_pdf(self, *, plot_type="3d", log_z=False, **kwargs): """Plot the PDF using the numerical :meth:`pdf_vectorized` implementation.""" title = kwargs.pop("title", "XiNuBoundaryCopula") zlabel = kwargs.pop("zlabel", "PDF") if plot_type == "3d": return self._plot3d(self.pdf_vectorized, title, zlabel, **kwargs) elif plot_type == "contour": return self._plot_contour( self.pdf_vectorized, title, zlabel, log_z=log_z, **kwargs ) else: raise ValueError(f"plot_type must be '3d' or 'contour', not {plot_type}")
[docs] def plot_cond_distr_1(self, *, plot_type="3d", log_z=False, **kwargs): """ Plot h(u,v) = ∂_u C(u,v). Uses the symbolic expression and injects q(v) so the base lambdify has a valid mapping. """ title = kwargs.pop("title", "Conditional Distribution h(u,v)") zlabel = kwargs.pop("zlabel", "h(u,v)") expr = self.cond_distr_1() # SymPy expression depending on q(v) if plot_type == "3d": return self._plot3d(expr, title, zlabel, zlim=(0, 1), **kwargs) elif plot_type == "contour": return self._plot_contour( expr, title, zlabel, zlim=(0, 1), log_z=log_z, **kwargs ) elif plot_type == "slices": return self._plot_functions(expr, title, zlabel, **kwargs) else: raise ValueError("plot_type must be '3d', 'contour', or 'slices'.")
@property def is_absolutely_continuous(self) -> bool: """XiNuBoundaryCopula has a density everywhere on (0,1)^2.""" return True
[docs] def plot_cond_distr_2(self, *, plot_type="3d", log_z=False, **kwargs): """Not available: q(v) is implicit and prevents a closed form.""" raise NotImplementedError( "cond_distr_2 is not available due to the implicit function q(v)." )
ClampedParabolaCopula: TypeAlias = XiNuBoundaryCopula if __name__ == "__main__": b_values = [0.5, 1, 2] # corresponds to mu = 0.2, 0.5, 1.0, 2.0 for b in b_values: copula = XiNuBoundaryCopula(b=b) tp2 = copula.is_tp2() print(f"{b}: {tp2}") # copula.plot_pdf(plot_type="contour", levels=999, grid_size=2000, zlim=(0, 8)) # copula.plot_cond_distr_1(plot_type="contour", levels=999, grid_size=999)