from typing import Tuple, TypeAlias
import numpy as np
import sympy as sp
from copul.family.core.biv_copula import BivCopula
from copul.family.frechet.biv_independence_copula import BivIndependenceCopula
[docs]
class NumericWrapper:
"""
A simple wrapper for numeric functions to make them compatible
with the plotting utilities that expect a callable object.
"""
def __init__(self, func):
self.func = func
def __call__(self, u, v):
# Delegate directly to the numeric function
return self.func(u, v)
[docs]
class XiPsiApproxLowerBoundaryCopula(BivCopula):
r"""
Two-parameter "diagonal strip" copula with a rectangular hole.
The copula is defined by a transformation variable T and a hole geometry H.
C(u,v) = (u*t - Area(Intersection)) / (1 - beta)
where t = F_T^{-1}(v).
Parameters
----------
alpha : float
Controls the horizontal start of the diagonal slope. alpha in [0, 0.5).
beta : float
Controls the vertical thickness of the hole. beta in (0, 1).
"""
# Parameters and domains
alpha, beta = sp.symbols("alpha beta", real=True)
params = [alpha, beta]
intervals = {
# Update: include 1/2 in the domain (Lopen = Left-open, Right-closed ideally,
# or simply use a closed bound if supported by the framework)
"alpha": sp.Interval(0, sp.Rational(1, 2), left_open=True, right_open=False),
"beta": sp.Interval.open(0, 1),
}
special_cases = {0: BivIndependenceCopula}
# Convenience symbols
u, v = sp.symbols("u v", nonnegative=True)
def __init__(self, *args, **kwargs):
if args:
if len(args) == 1:
kwargs["alpha"] = args[0]
elif len(args) == 2:
kwargs["alpha"], kwargs["beta"] = args
else:
raise ValueError("Provide at most two positional args: alpha, beta.")
super().__init__(**kwargs)
def _get_constants(self) -> Tuple[float, float, float, float, float, float, float]:
"""Computes structural constants A, K, C, u1, k."""
a = float(self.alpha)
b = float(self.beta)
denom = 1.0 - b
denom2 = denom * denom
# Determine if we are in the limiting case alpha = 0.5
is_limit = np.isclose(a, 0.5)
A = (1.0 - a) / denom
if is_limit:
# Limits as alpha -> 0.5
K = 0.0
# C_val = 1 / (1-b)
C_val = 1.0 / denom
# k_slope is infinite (vertical jump), handled via logic
k_slope = np.inf
else:
K = (1.0 - 2.0 * a) / denom2
C_val = (1.0 - 2.0 * b + 2.0 * a * b) / denom2
k_slope = denom / (1.0 - 2.0 * a)
# Threshold CDF value at v=beta
# If K=0, this simplifies to A*b
u1 = A * b - 0.5 * K * (b**2)
return a, b, A, K, C_val, u1, k_slope
@staticmethod
def _psi_vec(s, alpha, beta, k_slope):
"""Vectorized ψ(s). Handles step function at alpha=0.5."""
s = np.asarray(s, dtype=float)
res = np.zeros_like(s)
# Case alpha = 0.5: Step function
if np.isclose(alpha, 0.5):
# psi(s) = 1 - beta if s > 0.5 else 0
# We use 0.5 as the threshold
mask_up = s > 0.5
if np.any(mask_up):
res[mask_up] = 1.0 - beta
return res
# Standard Case alpha < 0.5
# Region 2 (middle)
mask_mid = (s > alpha) & (s < 1.0 - alpha)
if np.any(mask_mid):
res[mask_mid] = k_slope * (s[mask_mid] - alpha)
# Region 3 (upper)
mask_up = s >= 1.0 - alpha
if np.any(mask_up):
res[mask_up] = 1.0 - beta
return res
def _quantile_t(self, v):
"""Computes t = F_T^{-1}(v). Handles K=0 (Linear segments)."""
v = np.asarray(v, dtype=float)
a, b, A, K, C_val, u1, _ = self._get_constants()
u2 = 1.0 - u1
t_vals = np.zeros_like(v)
# Check for linear limit (alpha = 0.5 implies K = 0)
is_linear = np.isclose(K, 0.0)
# 1. Lower Region (v < u1)
mask1 = v < u1
if np.any(mask1):
if is_linear:
# F(t) = A*t => t = v/A
t_vals[mask1] = v[mask1] / A
else:
disc = A**2 - 2.0 * K * v[mask1]
t_vals[mask1] = (A - np.sqrt(np.maximum(0, disc))) / K
# 2. Middle Region (u1 <= v <= u2)
# This region is always linear with slope 1/C_val
mask2 = (v >= u1) & (v <= u2)
if np.any(mask2):
t_vals[mask2] = b + (v[mask2] - u1) / C_val
# 3. Upper Region (v > u2)
mask3 = v > u2
if np.any(mask3):
if is_linear:
# Symmetric to lower: t = 1 - (1-v)/A
t_vals[mask3] = 1.0 - (1.0 - v[mask3]) / A
else:
disc = A**2 - 2.0 * K * (1.0 - v[mask3])
t_vals[mask3] = 1.0 - (A - np.sqrt(np.maximum(0, disc))) / K
return np.clip(t_vals, 0.0, 1.0)
def _calc_H(self, u, t):
"""
Calculates H(u, t).
Handles the split cases for alpha=0.5 (two disjoint rectangles) vs alpha<0.5 (connected ramp).
"""
a, b, _, _, _, _, k = self._get_constants()
u = np.asarray(u, dtype=float)
t = np.asarray(t, dtype=float)
if u.shape != t.shape:
u, t = np.broadcast_arrays(u, t)
total_area = np.zeros_like(u)
# --- Case: alpha = 0.5 (Disjoint Rectangles) ---
if np.isclose(a, 0.5):
# Hole 1: [0, 0.5] x [0, beta]
len1 = np.clip(u, 0, 0.5)
total_area += len1 * np.minimum(t, b)
# Hole 2: (0.5, 1.0] x [1-beta, 1.0]
# Contribution only if u > 0.5
len2 = np.maximum(0, u - 0.5)
height2 = np.maximum(0, t - (1.0 - b))
total_area += len2 * height2
return total_area
# --- Case: alpha < 0.5 (Ramp Connection) ---
# Region 1: [0, alpha] -> Rect height min(t, beta)
len1 = np.clip(u, 0, a)
total_area += len1 * np.minimum(t, b)
# Region 2: [alpha, 1-alpha] -> Trapezoid/Triangle logic
U_eff = np.clip(u - a, 0, 1.0 - 2.0 * a)
mask_r2 = U_eff > 0
if np.any(mask_r2):
Ue = U_eff[mask_r2]
tv = t[mask_r2]
inv_k = 1.0 / k
x0 = tv * inv_k
xb = (tv - b) * inv_k
area_r2 = np.zeros_like(Ue)
# Sub-case A: t <= beta
mask_no_sat = tv <= b
if np.any(mask_no_sat):
u_ns = Ue[mask_no_sat]
t_ns = tv[mask_no_sat]
x0_ns = x0[mask_no_sat]
limit = np.minimum(u_ns, np.maximum(0, x0_ns))
area_ns = t_ns * limit - 0.5 * k * (limit**2)
area_r2[mask_no_sat] = area_ns
# Sub-case B: t > beta
mask_sat = tv > b
if np.any(mask_sat):
u_s = Ue[mask_sat]
t_s = tv[mask_sat]
xb_s = xb[mask_sat]
x0_s = x0[mask_sat]
# 1. Saturation part (beta * x)
sat_lim = np.minimum(u_s, xb_s)
area_s = b * sat_lim
# 2. Ramp part
ramp_start = xb_s
ramp_end = np.minimum(u_s, x0_s)
valid_ramp = ramp_end > ramp_start
if np.any(valid_ramp):
rs = ramp_start[valid_ramp]
re = ramp_end[valid_ramp]
ts = t_s[valid_ramp]
val_end = ts * re - 0.5 * k * (re**2)
val_start = ts * rs - 0.5 * k * (rs**2)
area_s[valid_ramp] += val_end - val_start
area_r2[mask_sat] = area_s
total_area[mask_r2] += area_r2
# Region 3: [1-alpha, 1] -> Rect height max(0, t - (1-beta))
len3 = np.maximum(0, u - (1.0 - a))
height3 = np.maximum(0, t - (1.0 - b))
total_area += len3 * height3
return total_area
def _calc_hole_width_at_t(self, u, t):
"""
Calculates partial derivative of H w.r.t t (dh/dt).
This represents the horizontal width of the hole at height t,
intersected with [0, u].
"""
a, b, _, _, _, _, k = self._get_constants()
width = np.zeros_like(u)
# --- Case: alpha = 0.5 ---
if np.isclose(a, 0.5):
# Hole 1 width: t in (0, beta) => width is min(u, 0.5)
in_reg1 = (t > 0) & (t < b)
width += np.where(in_reg1, np.clip(u, 0, 0.5), 0.0)
# Hole 2 width: t in (1-beta, 1) => width is max(0, u - 0.5)
in_reg2 = (t > 1.0 - b) & (t < 1.0)
width += np.where(in_reg2, np.maximum(0, u - 0.5), 0.0)
return width
# --- Case: alpha < 0.5 ---
# Region 1 check (Lower hole)
in_reg1 = (t > 0) & (t < b)
width += np.where(in_reg1, np.clip(u, 0, a), 0.0)
# Region 2 check (Ramp)
inv_k = 1.0 / k
s_min = a + (t - b) * inv_k
s_max = a + t * inv_k
s_start = np.clip(s_min, a, 1.0 - a)
s_end = np.clip(s_max, a, 1.0 - a)
w_start = np.minimum(s_start, u)
w_end = np.minimum(s_end, u)
width += np.maximum(0, w_end - w_start)
# Region 3 check (Upper hole)
in_reg3 = (t > 1.0 - b) & (t < 1.0)
len3 = np.maximum(0, u - (1.0 - a))
width += np.where(in_reg3, len3, 0.0)
return width
def _density_t(self, t):
"""Computes f_T(t)."""
t = np.asarray(t, dtype=float)
a, b, A, K, C_val, _, _ = self._get_constants()
res = np.zeros_like(t)
m1 = t < b
res[m1] = A - K * t[m1]
m2 = (t >= b) & (t <= 1.0 - b)
res[m2] = C_val
m3 = t > 1.0 - b
res[m3] = A - K * (1.0 - t[m3])
return res
def _calc_h_u(self, u, t):
"""h(u, t) = min(beta, max(0, t - psi(u)))."""
a, b, _, _, _, _, k = self._get_constants()
psi_val = self._psi_vec(u, a, b, k)
return np.minimum(b, np.maximum(0, t - psi_val))
[docs]
def cdf(self, u=None, v=None):
"""
Cumulative distribution function C(u,v).
"""
# Return the NotImplemented error if accessed as property/no-args
# (per user constraint on _cdf_expr)
if u is None and v is None:
return self._cdf_expr
return self.cdf_vectorized(u, v)
[docs]
def pdf(self, u=None, v=None):
"""
Probability Density Function c(u,v).
"""
# If no args, return a wrapper that can be called by plotting tools
if u is None and v is None:
return NumericWrapper(self.pdf_vectorized)
return self.pdf_vectorized(u, v)
[docs]
def cond_distr_1(self, u=None, v=None):
"""P(V <= v | U = u) = dC/du."""
if u is None and v is None:
return NumericWrapper(self.cond_distr_1)
u = np.asarray(u, dtype=float)
v = np.asarray(v, dtype=float)
_, b, _, _, _, _, _ = self._get_constants()
t_vals = self._quantile_t(v)
h_u_vals = self._calc_h_u(u, t_vals)
res = (t_vals - h_u_vals) / (1.0 - b)
return np.clip(res, 0.0, 1.0)
[docs]
def cond_distr_2(self, u=None, v=None):
"""P(U <= u | V = v) = dC/dv."""
if u is None and v is None:
return NumericWrapper(self.cond_distr_2)
u = np.asarray(u, dtype=float)
v = np.asarray(v, dtype=float)
_, b, _, _, _, _, _ = self._get_constants()
t_vals = self._quantile_t(v)
ft_vals = self._density_t(t_vals)
dh_dt = self._calc_hole_width_at_t(u, t_vals)
# Guard small density
safe_ft = np.where(ft_vals < 1e-12, 1e-12, ft_vals)
term1 = (u - dh_dt) / (1.0 - b)
res = term1 * (1.0 / safe_ft)
return np.clip(res, 0.0, 1.0)
[docs]
def cdf_vectorized(self, u, v):
"""Exact numeric Copula CDF."""
u = np.asarray(u, dtype=float)
v = np.asarray(v, dtype=float)
_, b, _, _, _, _, _ = self._get_constants()
t_vals = self._quantile_t(v)
h_val = self._calc_H(u, t_vals)
u_cl = np.clip(u, 0, 1)
t_cl = np.clip(t_vals, 0, 1)
res = (u_cl * t_cl - h_val) / (1.0 - b)
return np.clip(res, 0.0, 1.0)
[docs]
def pdf_vectorized(self, u, v):
"""Exact numeric Copula PDF."""
u = np.asarray(u, dtype=float)
v = np.asarray(v, dtype=float)
a, b, _, _, _, _, k = self._get_constants()
t_vals = self._quantile_t(v)
ft_vals = self._density_t(t_vals)
psi_u = self._psi_vec(u, a, b, k)
in_hole = (t_vals > psi_u) & (t_vals < psi_u + b)
res = np.zeros_like(t_vals)
scale = 1.0 / (1.0 - b)
mask_valid = ~in_hole
if np.any(mask_valid):
safe_ft = np.where(ft_vals[mask_valid] < 1e-12, 1e-12, ft_vals[mask_valid])
res[mask_valid] = scale * (1.0 / safe_ft)
return res
@property
def _cdf_expr(self):
"""
Symbolic expression not available.
"""
raise NotImplementedError(
"The analytical CDF for this density is implemented in cdf_vectorized."
)
@property
def is_absolutely_continuous(self) -> bool:
return True
@property
def is_symmetric(self) -> bool:
return False
# Type alias
DiagonalStripCopula: TypeAlias = XiPsiApproxLowerBoundaryCopula
if __name__ == "__main__":
# quick smoke test & plots
pairs = [(0.2, 0.30), (0.3, 0.50), (0.40, 0.50)]
for a, b in pairs:
cop = XiPsiApproxLowerBoundaryCopula(alpha=a, beta=b)
cop.plot_pdf(plot_type="contour", levels=999, grid_size=100)
print(cop.cdf(0.4, 0.6))
print(cop.cond_distr_1(0.4, 0.6))
print(cop.cond_distr_2(0.4, 0.6))
print(cop.pdf(0.4, 0.6))