Source code for copul.checkerboard.shuffle_min

from typing import Sequence, Union
import numpy as np

from copul.family.core.biv_core_copula import BivCoreCopula
from copul.family.core.copula_approximator_mixin import CopulaApproximatorMixin
from copul.family.core.copula_plotting_mixin import CopulaPlottingMixin


[docs] class ShuffleOfMin(BivCoreCopula, CopulaPlottingMixin, CopulaApproximatorMixin): r""" Straight shuffle-of-Min copula :math:`C_\pi` of order :math:`n`. Parameters ---------- pi : Sequence[int] A permutation of :math:`\{1,\dots,n\}` in **1-based** notation. ``pi[i]`` represents :math:`\pi(i+1)`. Notes ----- The support consists of :math:`n` diagonal line segments \[ S_i \;=\; \bigl\{\,((i-1+t)/n,\; (\pi(i)-1+t)/n) : 0 \le t \le 1 \,\bigr\}. \] The copula is **singular** (the density is 0 almost everywhere). A convenient closed form for the CDF is \[ C(u,v) \;=\; \frac{1}{n}\sum_{i=1}^n \min\!\Bigl( 1,\; \max\!\bigl(0,\; \min(nu-i+1,\; nv-\pi(i)+1)\bigr) \Bigr). \] For fixed :math:`u \in ((i-1)/n,\, i/n)`, the conditional CDF :math:`C_1(v\mid u)` is a step function: it jumps from 0 to 1 at \[ v_0 \;=\; \frac{\pi(i)-1 + t}{n}, \qquad t \;=\; nu - (i-1). \] At the boundaries, :math:`C_1(v\mid 0)=v` and :math:`C_1(v\mid 1)=v`. Similarly for :math:`C_2(u\mid v)` with :math:`C_2(u\mid 0)=u` and :math:`C_2(u\mid 1)=u`. """ def __init__(self, pi: Sequence[int]) -> None: self.pi = np.asarray(pi, dtype=int) if self.pi.ndim != 1: raise ValueError("pi must be a 1-D permutation array.") self.n = len(self.pi) if self.n == 0: raise ValueError("Permutation cannot be empty.") # Check if it's a valid permutation of 1..n if sorted(self.pi.tolist()) == list(range(0, self.n)): self.pi += 1 # Convert to 1-based permutation if sorted(self.pi.tolist()) != list(range(1, self.n + 1)): raise ValueError("pi must be a permutation of 1..n") # Pre-compute 0-based permutation and its inverse for efficiency self.pi0 = self.pi - 1 # 0-based permutation: pi0[i] = pi(i+1)-1 if self.n > 0: # 0-based inverse: pi0_inv[j] = k means pi0[k] = j self.pi0_inv = np.argsort(self.pi0) else: self.pi0_inv = np.array([], dtype=int) # Check if this is identity or reverse permutation (for optimized calculations) self.is_identity = np.array_equal(self.pi, np.arange(1, self.n + 1)) self.is_reverse = np.array_equal(self.pi, np.arange(self.n, 0, -1)) BivCoreCopula.__init__(self) # Sets self.dim = 2 # ---------- helper ------------------------------------------------------- def _process_args( self, args ) -> tuple[Union[float, np.ndarray], Union[float, np.ndarray]]: r""" Normalize positional inputs and extract ``u`` and ``v``. Accepted forms -------------- - ``_process_args((u, v))`` - ``_process_args(([u, v],))`` (single point as 1-D array) - ``_process_args(([[u1, v1], ...],))`` (multiple points as 2-D array) Returns ------- tuple[float | ndarray, float | ndarray] The two coordinates ``(u, v)`` with shapes broadcastable for vectorized use. Raises ------ ValueError If the input shape is invalid. """ if not args: raise ValueError("No arguments provided.") if len(args) == 2: # Direct u, v arguments return args[0], args[1] if len(args) == 1: arr = np.asarray(args[0], dtype=float) if arr.ndim == 1: if arr.size == 2: # Single point as 1D array [u, v] return arr[0], arr[1] else: raise ValueError("1D input must have length 2.") elif arr.ndim == 2: if arr.shape[1] == 2: # Multiple points as 2D array [[u1, v1], [u2, v2], ...] return arr[:, 0], arr[:, 1] else: raise ValueError("2D input must have 2 columns.") else: raise ValueError("Input must be 1D or 2D array.") raise ValueError(f"Expected 1 or 2 arguments, got {len(args)}.") # ---------- CDF ----------------------------------------------------------
[docs] def cdf(self, *args) -> Union[float, np.ndarray]: r""" Copula :math:`C_\pi(u,v)` (vectorized). Multiple call signatures are supported: - ``cdf(u, v)`` where ``u`` and ``v`` are scalars or arrays, - ``cdf([u, v])`` for a single point as a 1-D array, - ``cdf([[u1, v1], [u2, v2], ...])`` for multiple points as a 2-D array. Returns ------- float or numpy.ndarray The CDF values at the specified points. """ u, v = self._process_args(args) # Ensure inputs are arrays and broadcastable u_arr, v_arr = np.broadcast_arrays(u, v) # Check bounds if np.any((u_arr < 0) | (u_arr > 1) | (v_arr < 0) | (v_arr > 1)): raise ValueError("u, v must lie in [0,1].") # Store if the original input was scalar to return scalar at the end input_is_scalar = np.isscalar(u) and np.isscalar(v) # --- Optimization for identity permutation --- if self.is_identity: # For identity, CDF is simply min(u, v) result = np.minimum(u_arr, v_arr) return result.item() if input_is_scalar else result # Initialize output array with the same shape as broadcast inputs out = np.zeros_like(u_arr, dtype=float) # --- Handle boundary conditions using masks --- tol = 1e-9 # Tolerance for floating point comparisons mask_u0 = np.abs(u_arr) < tol mask_v0 = np.abs(v_arr) < tol mask_u1 = np.abs(u_arr - 1.0) < tol mask_v1 = np.abs(v_arr - 1.0) < tol # Condition: C(u,v) = 0 if u=0 or v=0 out[mask_u0 | mask_v0] = 0.0 # Condition: C(1,v) = v (apply where u=1 but v!=0) # Ensure C(1,0)=0 is preserved out[mask_u1 & ~mask_v0] = v_arr[mask_u1 & ~mask_v0] # Condition: C(u,1) = u (apply where v=1 but u!=0 and u!=1) # Ensure C(0,1)=0 and C(1,1)=1 are preserved out[mask_v1 & ~mask_u0 & ~mask_u1] = u_arr[mask_v1 & ~mask_u0 & ~mask_u1] # Identify points strictly inside (0,1)x(0,1) for the main calculation mask_in = ~(mask_u0 | mask_v0 | mask_u1 | mask_v1) # Perform calculation only if there are interior points if np.any(mask_in): # Select only interior points u_in = u_arr[mask_in] v_in = v_arr[mask_in] # --- Vectorized calculation for interior points --- nu = self.n * u_in[:, None] nv = self.n * v_in[:, None] i_idx = np.arange(self.n)[None, :] # Segment indices (0 to n-1) pi_i_0based = self.pi0[None, :] # 0-based permutation values # Calculate contribution from each segment: # min(max(0, min(nu-(i-1), nv-(pi(i)-1))), 1) t_u_comp = nu - i_idx t_v_comp = nv - pi_i_0based min_t = np.minimum(t_u_comp, t_v_comp) # Capped contribution: min( max(0, min_t), 1.0 ) capped_contribution = np.minimum(np.maximum(0.0, min_t), 1.0) # Sum contributions over segments and divide by n cdf_values_in = np.sum(capped_contribution, axis=1) / self.n # Assign results back to the output array out[mask_in] = cdf_values_in # Return scalar if input was scalar, otherwise return the array if input_is_scalar: return out.item() else: return out
# ---------- PDF ----------------------------------------------------------
[docs] def pdf(self, *args) -> Union[float, np.ndarray]: r""" The straight shuffle-of-Min copula is singular ⇒ the density is 0 a.e. """ raise NotImplementedError( "The straight shuffle-of-Min copula is singular. PDF does not exist." )
# ---------- Conditional Distribution -------------------------------------
[docs] def cond_distr_1(self, *args) -> Union[float, np.ndarray]: r""" Conditional distribution of :math:`V` given :math:`U=u`: :math:`C_1(v\mid u)=\mathbb{P}(V\le v\mid U=u)`. Same calling conventions as :meth:`cdf`. """ return self.cond_distr(1, *args)
[docs] def cond_distr_2(self, *args) -> Union[float, np.ndarray]: r""" Conditional distribution of :math:`U` given :math:`V=v`: :math:`C_2(u\mid v)=\mathbb{P}(U\le u\mid V=v)`. Same calling conventions as :meth:`cdf`. """ return self.cond_distr(2, *args)
[docs] def cond_distr(self, i: int, *args) -> Union[float, np.ndarray]: r""" Conditional distribution (vectorized). Computes - :math:`C_1(v\mid u)` if ``i=1`` (condition on :math:`U`) - :math:`C_2(u\mid v)` if ``i=2`` (condition on :math:`V`) For interior conditioning values the conditional CDF is a step function jumping from :math:`0` to :math:`1` at the point where the corresponding support segment is reached. At the boundaries :math:`u\in\{0,1\}` or :math:`v\in\{0,1\}` the conditionals are uniform (:math:`v` or :math:`u`, respectively). Parameters ---------- i : int ``1`` for :math:`C_1(v\mid u)`, ``2`` for :math:`C_2(u\mid v)`. *args Same calling conventions as :meth:`cdf`. Returns ------- float or numpy.ndarray Conditional CDF value(s). Raises ------ ValueError If ``i`` is outside ``{1, 2}`` or coordinates lie outside :math:`[0,1]`. """ if not (1 <= i <= self.dim): raise ValueError(f"i must be between 1 and {self.dim}") u, v = self._process_args(args) # Ensure inputs are arrays and broadcastable u_arr, v_arr = np.broadcast_arrays(u, v) # Check bounds if np.any((u_arr < 0) | (u_arr > 1) | (v_arr < 0) | (v_arr > 1)): raise ValueError("u, v must lie in [0,1].") # Store if the original input was scalar input_is_scalar = np.isscalar(u) and np.isscalar(v) # Initialize output array out = np.zeros_like(u_arr, dtype=float) # Use a small tolerance for floating point comparisons tol = 1e-9 # --- C_1(v|u): Conditional of V given U=u --- if i == 1: # Handle boundaries: C_1(v|0)=v, C_1(v|1)=v mask_u0 = np.abs(u_arr) < tol mask_u1 = np.abs(u_arr - 1.0) < tol mask_boundary = mask_u0 | mask_u1 mask_in = ~mask_boundary # Interior u values # Apply boundary condition out[mask_boundary] = v_arr[mask_boundary] # Process interior u values if np.any(mask_in): u_in = u_arr[mask_in] v_in = v_arr[mask_in] # Find segment index i (0-based) based on u: i/n <= u < (i+1)/n # Use floor(n*u - tol) to handle edge cases near boundaries like u=1/n i_idx = np.floor(self.n * u_in - tol).astype(int) # Clip to handle potential edge case u slightly less than 0 or exactly 1 i_idx = np.clip(i_idx, 0, self.n - 1) # Calculate parameter t = n*u - i t = self.n * u_in - i_idx # Find the corresponding v0 on the segment's diagonal # v0 = (pi(i+1)-1 + t) / n = (pi0[i_idx] + t) / n pi_i_0based = self.pi0[i_idx] # Get pi(i+1)-1 using 0-based index v0 = (pi_i_0based + t) / self.n # Conditional CDF is 1 if v >= v0, else 0 out[mask_in] = (v_in >= v0 - tol).astype( float ) # Add tol for comparison robustness # --- C_2(u|v): Conditional of U given V=v --- elif i == 2: # Handle boundaries: C_2(u|0)=u, C_2(u|1)=u mask_v0 = np.abs(v_arr) < tol mask_v1 = np.abs(v_arr - 1.0) < tol mask_boundary = mask_v0 | mask_v1 mask_in = ~mask_boundary # Interior v values # Apply boundary condition out[mask_boundary] = u_arr[mask_boundary] # Process interior v values if np.any(mask_in): u_in = u_arr[mask_in] v_in = v_arr[mask_in] # Find the segment index k (0-based) and parameter t corresponding to v # Find the rank j (0-based) of v: j/n <= v < (j+1)/n j_idx = np.floor(self.n * v_in - tol).astype(int) j_idx = np.clip(j_idx, 0, self.n - 1) # Find the segment index k (0-based) such that pi0[k] = j_idx # This is k = pi0_inv[j_idx] k_idx = self.pi0_inv[j_idx] # Calculate parameter t = n*v - j t = self.n * v_in - j_idx # Find the corresponding u0 on the segment's diagonal # u0 = (k + t) / n u0 = (k_idx + t) / self.n # Conditional CDF is 1 if u >= u0, else 0 out[mask_in] = (u_in >= u0 - tol).astype( float ) # Add tol for comparison robustness # Return scalar if input was scalar, otherwise return the array if input_is_scalar: return out.item() else: return out
# ---------- utilities ---------------------------------------------------- def __str__(self): return f"ShuffleOfMin(order={self.n}, pi={self.pi.tolist()})" # simple generators for simulation / plotting -----------------------------
[docs] def rvs(self, size: int, **kwargs) -> np.ndarray: r""" Generate :math:`\texttt{size}` i.i.d. samples from :math:`C_{\pi}`. Sampling picks a segment index uniformly from :math:`\{0,\dots,n-1\}` and a parameter :math:`t\sim U(0,1)`, then sets :math:`u=(i+t)/n`, :math:`v=(\pi(i+1)-1+t)/n`. Parameters ---------- size : int Number of samples. Returns ------- numpy.ndarray Array of shape ``(size, 2)`` with samples in :math:`[0,1]^2`. """ # Choose a random segment index (0 to n-1) for each sample seg_idx = np.random.randint(0, self.n, size=size) # Choose a random parameter t (0 to 1) along the segment diagonal t = np.random.random(size=size) # Calculate u and v based on the chosen segment and t # u = (i + t) / n where i = seg_idx u = (seg_idx + t) / self.n # v = (pi(i+1)-1 + t) / n = (pi0[i] + t) / n v = (self.pi0[seg_idx] + t) / self.n return np.column_stack([u, v])
# --- Association measures ------------------------------------------------
[docs] def kendall_tau(self) -> float: r""" Population Kendall’s :math:`\tau` via inversion count. Let :math:`N_{\mathrm{inv}}` be the number of inversions of the 0-based permutation ``pi0``. Then :math:`\tau = 1 - \dfrac{4\,N_{\mathrm{inv}}}{n^2}`. Returns ------- float Kendall’s :math:`\tau` (``nan`` if :math:`n\le 1`). """ # Handle n=1 case first if self.n <= 1: return np.nan if self.is_identity: return 1.0 # Correct calculation using 0-based indexing internally for pi0 pi0_temp = self.pi0 # Use precomputed 0-based perm N_inv = sum( 1 for i in range(self.n) for j in range(i + 1, self.n) if pi0_temp[i] > pi0_temp[j] ) # Denominator n*(n-1)/2 is the total number of pairs # Tau = 1 - 2 * (N_inv / (n*(n-1)/2)) = 1 - 4*N_inv/(n*(n-1)) tau = 1.0 - 4.0 * N_inv / (self.n**2) return tau
[docs] def spearman_rho(self) -> float: r""" Population Spearman’s :math:`\rho` via squared rank differences. With ranks :math:`1,\dots,n` and :math:`\pi(1),\dots,\pi(n)`, :math:`\rho = 1 - \dfrac{6\sum_{i=1}^n (i-\pi(i))^2}{n^3}`. Returns ------- float Spearman’s :math:`\rho` (``nan`` if :math:`n\le 1`). """ # Handle n=1 case first if self.n <= 1: return np.nan if self.is_identity: return 1.0 # Ranks for u are essentially 1, 2, ..., n based on segment index # Ranks for v are pi(1), pi(2), ..., pi(n) i_ranks = np.arange(1, self.n + 1) pi_ranks = self.pi # Use 1-based perm for rank difference calculation d_sq = np.sum((i_ranks - pi_ranks) ** 2) # Rho = 1 - 6 * sum(d^2) / (n * (n^2 - 1)) return 1.0 - 6.0 * d_sq / self.n**3
[docs] def chatterjee_xi(self) -> float: r""" Chatterjee’s :math:`\xi`. For any straight shuffle-of-Min (functional dependence along segments), :math:`\xi=1`. Returns ------- float Always ``1.0`` (``nan`` only if :math:`n=0`). """ # V is a piecewise linear function of U, so xi should be 1. # For n=1, dependence is perfect, so 1 seems reasonable, though ranks are trivial. if self.n == 0: return np.nan # Or raise error? return 1.0
[docs] def tail_lower(self) -> float: r""" Lower tail dependence coefficient :math:`\lambda_L`. It is positive (equal to 1) iff the first segment lies on the main diagonal, i.e. :math:`\pi(1)=1`; otherwise it is 0. Returns ------- float :math:`\lambda_L \in \{0,1\}` (``nan`` if :math:`n=0`). """ if self.n == 0: return np.nan return 1.0 if self.pi[0] == 1 else 0.0
[docs] def tail_upper(self) -> float: r""" Upper tail dependence coefficient :math:`\lambda_U`. It is positive (equal to 1) iff the last segment lies on the main diagonal, i.e. :math:`\pi(n)=n`; otherwise it is 0. Returns ------- float :math:`\lambda_U \in \{0,1\}` (``nan`` if :math:`n=0`). """ if self.n == 0: return np.nan return 1.0 if self.pi[-1] == self.n else 0.0
if __name__ == "__main__": # Example usage copula = ShuffleOfMin([1, 3, 2]) copula.plot_c_over_u() copula.plot_cdf() copula.plot_cond_distr_1() copula.plot_cond_distr_2()