Source code for copul.checkerboard.check_pi

import itertools

import numpy as np

from copul.checkerboard.check import Check
from copul.family.core.copula_plotting_mixin import CopulaPlottingMixin
from copul.family.core.copula_approximator_mixin import CopulaApproximatorMixin


[docs] class CheckPi(Check, CopulaPlottingMixin, CopulaApproximatorMixin): def __new__(cls, matr, *args, **kwargs): """ Create a new CheckPi instance or a BivCheckPi instance if dimension is 2. Parameters ---------- matr : array-like Matrix of values that determine the copula's distribution. *args, **kwargs Additional arguments passed to the constructor. Returns ------- CheckPi or BivCheckPi A CheckPi instance, or a BivCheckPi instance if dimension is 2. """ # If this is the CheckPi class itself (not a subclass) if cls is CheckPi: # Convert matrix to numpy array to get its dimensionality matr_arr = np.asarray(matr) # Check if it's a 2D matrix (bivariate copula) if matr_arr.ndim == 2: # Import the BivCheckPi class here to avoid circular imports try: # Use importlib approach for better testability import importlib bcp_module = importlib.import_module( "copul.checkerboard.biv_check_pi" ) BivCheckPi = getattr(bcp_module, "BivCheckPi") # Return a new BivCheckPi instance with the same arguments return BivCheckPi(matr, *args, **kwargs) except (ImportError, ModuleNotFoundError, AttributeError): # If the import fails, just continue with normal instantiation pass # Otherwise, create a normal instance of the class instance = super().__new__(cls) return instance def __str__(self): return f"CheckPiCopula({self.matr.shape})" @property def is_absolutely_continuous(self) -> bool: return True @property def is_symmetric(self) -> bool: return np.allclose(self.matr, self.matr.T)
[docs] def cdf(self, *args): """ Compute the CDF at one or multiple points. This method handles both single-point and multi-point CDF evaluation in an efficient vectorized manner. Parameters ---------- *args : array-like or float Either: - Multiple separate coordinates (x, y, ...) of a single point - A single array-like object with coordinates of a single point - A 2D array where each row represents a separate point Returns ------- float or numpy.ndarray If a single point is provided, returns a float. If multiple points are provided, returns an array of shape (n_points,). Examples -------- # Single point as separate arguments value = copula.cdf(0.3, 0.7) # Single point as array value = copula.cdf([0.3, 0.7]) # Multiple points as 2D array values = copula.cdf(np.array([[0.1, 0.2], [0.3, 0.4]])) """ # Handle different input formats if len(args) == 0: raise ValueError("No arguments provided") elif len(args) == 1: # A single argument was provided - either a point or multiple points arg = args[0] if hasattr(arg, "ndim") and hasattr(arg, "shape"): # NumPy array or similar arr = np.asarray(arg, dtype=float) if arr.ndim == 1: # 1D array - single point if len(arr) != self.dim: raise ValueError( f"Expected point array of length {self.dim}, got {len(arr)}" ) return self._cdf_single_point(arr) elif arr.ndim == 2: # 2D array - multiple points if arr.shape[1] != self.dim: raise ValueError( f"Expected points with {self.dim} dimensions, got {arr.shape[1]}" ) return self._cdf_vectorized_impl(arr) else: raise ValueError(f"Expected 1D or 2D array, got {arr.ndim}D array") elif hasattr(arg, "__len__"): # List, tuple, or similar sequence if len(arg) == self.dim: # Single point as a sequence return self._cdf_single_point(np.array(arg, dtype=float)) else: raise ValueError( f"Expected point with {self.dim} dimensions, got {len(arg)}" ) else: # Single scalar value - only valid for 1D case if self.dim == 1: return self._cdf_single_point(np.array([arg], dtype=float)) else: raise ValueError( f"Single scalar provided but copula has {self.dim} dimensions" ) else: # Multiple arguments provided if len(args) == self.dim: # Separate coordinates for a single point return self._cdf_single_point(np.array(args, dtype=float)) else: raise ValueError(f"Expected {self.dim} coordinates, got {len(args)}")
def _cdf_single_point(self, u): """ Helper method to compute CDF for a single point using vectorized operations. Parameters ---------- u : numpy.ndarray 1D array of length dim representing a single point. Returns ------- float CDF value at the point. """ # Short-circuit checks if np.any(u <= 0): return 0.0 if np.all(u >= 1): return 1.0 # Create a numpy meshgrid of indices for vectorized computation indices = np.meshgrid(*[np.arange(s) for s in self.matr.shape], indexing="ij") # Calculate the fraction matrix - vectorized approach fraction_matrix = np.ones_like(self.matr, dtype=float) for d in range(self.dim): # Get indices array for this dimension idx_d = indices[d] # Calculate lower and upper bounds lower_d = idx_d / self.matr.shape[d] upper_d = (idx_d + 1) / self.matr.shape[d] # Calculate overlap with [0, u[d]] overlap_d = np.maximum(0.0, np.minimum(u[d], upper_d) - lower_d) # Convert to fraction of cell width cell_width_d = 1.0 / self.matr.shape[d] frac_d = overlap_d / cell_width_d # Clip to ensure fractions are in [0, 1] frac_d = np.clip(frac_d, 0.0, 1.0) # Multiply into the fraction matrix fraction_matrix *= frac_d # Multiply by cell masses and sum result = np.sum(self.matr * fraction_matrix) return float(result) def _cdf_vectorized_impl(self, points): """ Implementation of vectorized CDF for multiple points. Parameters ---------- points : numpy.ndarray Array of shape (n_points, dim) where each row is a point. Returns ------- numpy.ndarray Array of shape (n_points,) with CDF values. """ # Convert to numpy array points = np.asarray(points, dtype=float) n_points = points.shape[0] results = np.zeros(n_points) # Create mask arrays for special cases all_zeros_mask = np.any(points <= 0, axis=1) all_ones_mask = np.all(points >= 1, axis=1) # Set results for special cases results[all_zeros_mask] = 0.0 results[all_ones_mask] = 1.0 # Filter out points that need actual computation compute_mask = ~(all_zeros_mask | all_ones_mask) compute_points = points[compute_mask] if len(compute_points) == 0: return results # Precompute cell information matr_shape = np.array(self.matr.shape) # Create a numpy meshgrid of indices for the cells indices = np.meshgrid(*[np.arange(s) for s in self.matr.shape], indexing="ij") indices = np.array([idx.ravel() for idx in indices]).T # Shape: (n_cells, dim) # Reshape for broadcasting against points indices = indices[:, np.newaxis, :] # Shape: (n_cells, 1, dim) compute_points = compute_points[ np.newaxis, :, : ] # Shape: (1, n_points_to_compute, dim) # Calculate cell bounds lower_bounds = indices / matr_shape[np.newaxis, np.newaxis, :] upper_bounds = (indices + 1) / matr_shape[np.newaxis, np.newaxis, :] # Calculate overlap with [0, u] for each dimension and each point overlaps = np.maximum( 0.0, np.minimum(compute_points, upper_bounds) - lower_bounds ) # Convert to fraction of cell width cell_widths = 1.0 / matr_shape[np.newaxis, np.newaxis, :] fractions = overlaps / cell_widths fractions = np.clip(fractions, 0.0, 1.0) # Calculate the total fraction for each cell and point cell_fractions = np.prod( fractions, axis=2 ) # Shape: (n_cells, n_points_to_compute) # Get cell masses cell_masses = self.matr.ravel()[:, np.newaxis] # Shape: (n_cells, 1) # Calculate weighted sum for each point weighted_fractions = ( cell_masses * cell_fractions ) # Shape: (n_cells, n_points_to_compute) point_results = np.sum( weighted_fractions, axis=0 ) # Shape: (n_points_to_compute,) # Put results back in the output array results[compute_mask] = point_results return results
[docs] def cond_distr(self, i, *args): """ Compute the conditional distribution for one or multiple points. Parameters ---------- i : int Dimension index (1-based) to condition on. *args : array-like or float Either: - Multiple separate coordinates (x, y, ...) of a single point - A single array-like object with coordinates of a single point - A 2D array where each row represents a separate point Returns ------- float or numpy.ndarray If a single point is provided, returns a float. If multiple points are provided, returns an array of shape (n_points,). Examples -------- # Single point as separate arguments value = copula.cond_distr(1, 0.3, 0.7) # Single point as array value = copula.cond_distr(1, [0.3, 0.7]) # Multiple points as 2D array values = copula.cond_distr(1, np.array([[0.1, 0.2], [0.3, 0.4]])) """ if i < 1 or i > self.dim: raise ValueError(f"Dimension {i} out of range 1..{self.dim}") # Handle different input formats if len(args) == 0: raise ValueError("No point coordinates provided") elif len(args) == 1: # A single argument was provided - either a point or multiple points arg = args[0] if hasattr(arg, "ndim") and hasattr(arg, "shape"): # NumPy array or similar arr = np.asarray(arg, dtype=float) if arr.ndim == 1: # 1D array - single point if len(arr) != self.dim: raise ValueError( f"Expected point array of length {self.dim}, got {len(arr)}" ) return self._cond_distr_single(i, arr) elif arr.ndim == 2: # 2D array - multiple points if arr.shape[1] != self.dim: raise ValueError( f"Expected points with {self.dim} dimensions, got {arr.shape[1]}" ) return self._cond_distr_vectorized(i, arr) else: raise ValueError(f"Expected 1D or 2D array, got {arr.ndim}D array") elif hasattr(arg, "__len__"): # List, tuple, or similar sequence if len(arg) == self.dim: # Single point as a sequence return self._cond_distr_single(i, np.array(arg, dtype=float)) else: raise ValueError( f"Expected point with {self.dim} dimensions, got {len(arg)}" ) else: # Single scalar value - only valid for 1D case if self.dim == 1: return self._cond_distr_single(i, np.array([arg], dtype=float)) else: raise ValueError( f"Single scalar provided but copula has {self.dim} dimensions" ) else: # Multiple arguments provided if len(args) == self.dim: # Separate coordinates for a single point return self._cond_distr_single(i, np.array(args, dtype=float)) else: raise ValueError(f"Expected {self.dim} coordinates, got {len(args)}")
[docs] def cond_distr_1(self, u): """F_{U_{-1}|U_1}(u_{-1} | u_1).""" return self.cond_distr(1, u)
[docs] def cond_distr_2(self, u): """F_{U_{-2}|U_2}(u_{-2} | u_2).""" return self.cond_distr(2, u)
def _cond_distr_single(self, i, u): """ Helper method for conditional distribution of a single point. This preserves the original implementation for a single point. Parameters ---------- i : int Dimension index (1-based) to condition on. u : numpy.ndarray Single point as a 1D array of length dim. Returns ------- float Conditional distribution value. """ i0 = i - 1 # Convert to 0-based index # Find which cell index along dim i0 the coordinate u[i0] falls into x_i = u[i0] if x_i < 0: return 0.0 # If 'conditioning coordinate' <0, prob is 0 elif x_i >= 1: # If 'conditioning coordinate' >=1, then we pick the last cell index i_idx = self.matr.shape[i0] - 1 else: i_idx = int(np.floor(x_i * self.matr.shape[i0])) # clamp (just in case) if i_idx < 0: i_idx = 0 elif i_idx >= self.matr.shape[i0]: i_idx = self.matr.shape[i0] - 1 # Clear cached results from previous calls to avoid test interference self.intervals = {} # Cache key for the slice indices only slice_key = (i, i_idx) # Check if we've cached the slice indices for this dimension and index if slice_key in self.intervals: slice_indices = self.intervals[slice_key] # Recalculate denominator (the sum of all cells in the slice) denom = sum(self.matr[c] for c in slice_indices) else: # 1) DENOMINATOR: sum of all cells in "slice" c[i0] = i_idx denom = 0.0 slice_indices = [] # This is more efficient than using itertools.product for the whole space # when we're only interested in a specific slice indices = [range(s) for s in self.matr.shape] indices[i0] = [i_idx] # Fix the i0 dimension for c in itertools.product(*indices): cell_mass = self.matr[c] denom += cell_mass if cell_mass > 0: # Only track positive mass cells for numerator slice_indices.append(c) # Cache the slice indices for future use self.intervals[slice_key] = slice_indices if denom <= 0: return 0.0 # 2) NUMERATOR: Among that same slice, we see how much is below u[j] in each j != i0 num = 0.0 for c in slice_indices: cell_mass = self.matr[c] fraction = 1.0 for j in range(self.dim): if j == i0: # No partial coverage in the conditioning dimension continue # Use exactly the same calculation method as the original lower_j = c[j] / self.matr.shape[j] upper_j = (c[j] + 1) / self.matr.shape[j] val_j = u[j] # Overlap with [0, val_j] in this dimension if val_j <= 0: fraction = 0.0 break if val_j >= 1: # entire cell dimension is included continue # Calculate exactly as in the original implementation overlap_len = max(0.0, min(val_j, upper_j) - lower_j) cell_width = 1.0 / self.matr.shape[j] frac_j = overlap_len / cell_width # fraction in [0,1] if frac_j <= 0: fraction = 0.0 break if frac_j > 1.0: frac_j = 1.0 fraction *= frac_j if fraction == 0.0: break if fraction > 0.0: num += cell_mass * fraction return num / denom def _cond_distr_vectorized(self, i, points): """ Vectorized implementation of conditional distribution for multiple points. Parameters ---------- i : int Dimension index (1-based) to condition on. points : numpy.ndarray Multiple points as a 2D array of shape (n_points, dim). Returns ------- numpy.ndarray Array of shape (n_points,) with conditional distribution values. """ # Convert to numpy array points = np.asarray(points, dtype=float) n_points = points.shape[0] results = np.zeros(n_points) # Convert to 0-based index i0 = i - 1 # Process each point separately (vectorizing all calculations for a single point) # This is more efficient than trying to vectorize across all points at once for cond_distr for p_idx, u in enumerate(points): x_i = u[i0] # Special case: conditioning coordinate < 0 if x_i < 0: results[p_idx] = 0.0 continue # Determine the cell index along the conditioning dimension if x_i >= 1: i_idx = self.matr.shape[i0] - 1 else: i_idx = int(np.floor(x_i * self.matr.shape[i0])) # clamp (just in case) i_idx = max(0, min(i_idx, self.matr.shape[i0] - 1)) # Create a slice for efficient indexing slice_spec = [slice(None)] * self.dim slice_spec[i0] = i_idx # Get the slice of the matrix - all cells with index i_idx in dimension i0 slice_matr = self.matr[tuple(slice_spec)] # Calculate denominator - sum of all cells in the slice denom = np.sum(slice_matr) if denom <= 0: results[p_idx] = 0.0 continue # For numerator, we need to compute fractions for each cell in the slice # First, create indices for all cells in the slice remaining_dims = [d for d in range(self.dim) if d != i0] mesh_dims = [range(self.matr.shape[d]) for d in remaining_dims] if mesh_dims: # Only create meshgrid if we have remaining dimensions # Create meshgrid for remaining dimensions mesh = np.meshgrid(*mesh_dims, indexing="ij") indices = np.array( [idx.ravel() for idx in mesh] ).T # Shape: (n_cells_in_slice, len(remaining_dims)) # Expand indices to include the fixed i0 dimension full_indices = [] for idx in indices: full_idx = [] rem_dim_idx = 0 for d in range(self.dim): if d == i0: full_idx.append(i_idx) else: full_idx.append(idx[rem_dim_idx]) rem_dim_idx += 1 full_indices.append(tuple(full_idx)) # Calculate fractions for each cell in the slice numerator = 0.0 for c in full_indices: cell_mass = self.matr[c] if cell_mass <= 0: continue fraction = 1.0 for j in range(self.dim): if j == i0: continue # Skip conditioning dimension lower_j = c[j] / self.matr.shape[j] upper_j = (c[j] + 1) / self.matr.shape[j] val_j = u[j] if val_j <= 0: fraction = 0.0 break if val_j >= 1: continue overlap_len = max(0.0, min(val_j, upper_j) - lower_j) cell_width = 1.0 / self.matr.shape[j] frac_j = overlap_len / cell_width frac_j = min(1.0, max(0.0, frac_j)) fraction *= frac_j if fraction <= 0: break if fraction > 0: numerator += cell_mass * fraction results[p_idx] = numerator / denom else: # Special case: only one dimension results[p_idx] = 1.0 if denom > 0 else 0.0 return results
[docs] def pdf(self, *args): """ Evaluate the piecewise PDF at one or multiple points. Parameters ---------- *args : array-like or float Either: - Multiple separate coordinates (x, y, ...) of a single point - A single array-like object with coordinates of a single point - A 2D array where each row represents a separate point Returns ------- float or numpy.ndarray If a single point is provided, returns a float. If multiple points are provided, returns an array of shape (n_points,). Examples -------- # Single point as separate arguments value = copula.pdf(0.3, 0.7) # Single point as array value = copula.pdf([0.3, 0.7]) # Multiple points as 2D array values = copula.pdf(np.array([[0.1, 0.2], [0.3, 0.4]])) """ # Handle different input formats if len(args) == 0: raise ValueError("No arguments provided") elif len(args) == 1: # A single argument was provided - either a point or multiple points arg = args[0] if hasattr(arg, "ndim") and hasattr(arg, "shape"): # NumPy array or similar arr = np.asarray(arg, dtype=float) if arr.ndim == 1: # 1D array - single point if len(arr) != self.dim: raise ValueError( f"Expected point array of length {self.dim}, got {len(arr)}" ) return self._pdf_single_point(arr) elif arr.ndim == 2: # 2D array - multiple points if arr.shape[1] != self.dim: raise ValueError( f"Expected points with {self.dim} dimensions, got {arr.shape[1]}" ) return self._pdf_vectorized(arr) else: raise ValueError(f"Expected 1D or 2D array, got {arr.ndim}D array") elif hasattr(arg, "__len__"): # List, tuple, or similar sequence if len(arg) == self.dim: # Single point as a sequence return self._pdf_single_point(np.array(arg, dtype=float)) else: raise ValueError( f"Expected point with {self.dim} dimensions, got {len(arg)}" ) else: # Single scalar value - only valid for 1D case if self.dim == 1: return self._pdf_single_point(np.array([arg], dtype=float)) else: raise ValueError( f"Single scalar provided but copula has {self.dim} dimensions" ) else: # Multiple arguments provided if len(args) == self.dim: # Separate coordinates for a single point return self._pdf_single_point(np.array(args, dtype=float)) else: raise ValueError(f"Expected {self.dim} coordinates, got {len(args)}")
def _pdf_single_point(self, u): """ Helper method to compute PDF for a single point. Parameters ---------- u : numpy.ndarray 1D array of length dim representing a single point. Returns ------- float PDF value at the point. """ if np.any(u < 0) or np.any(u > 1): return 0.0 # Identify which cell the point falls into cell_idx = [] for k, val in enumerate(u): ix = int(np.floor(val * self.matr.shape[k])) ix = max(0, min(ix, self.matr.shape[k] - 1)) cell_idx.append(ix) # Return the cell's mass return float(self.matr[tuple(cell_idx)]) * np.prod(self.matr.shape) def _pdf_vectorized(self, points): """ Vectorized implementation of PDF for multiple points. Parameters ---------- points : numpy.ndarray Array of shape (n_points, dim) where each row is a point. Returns ------- numpy.ndarray Array of shape (n_points,) with PDF values. """ prod = np.prod(self.matr.shape) # Convert to numpy array points = np.asarray(points, dtype=float) n_points = points.shape[0] results = np.zeros(n_points) # Filter out points outside [0,1]^d valid_mask = np.all((points >= 0) & (points <= 1), axis=1) if not np.any(valid_mask): return results * prod # Process only valid points valid_points = points[valid_mask] # Calculate cell indices for each point - this is the key vectorized operation indices = np.floor(valid_points * np.array(self.matr.shape)).astype(int) # Clip indices to valid ranges for d in range(self.dim): indices[:, d] = np.clip(indices[:, d], 0, self.matr.shape[d] - 1) # Get PDF values using advanced indexing for i, idx in enumerate(indices): results[valid_mask][i] = self.matr[tuple(idx)] return results * prod
[docs] def rvs(self, n=1, random_state=None, **kwargs): """ Draw random variates from the d-dimensional checkerboard copula efficiently. Parameters ---------- n : int Number of samples to generate. random_state : int, optional Seed for random number generator. Returns ------- np.ndarray Array of shape (n, d) containing n samples in d dimensions. """ if random_state is not None: np.random.seed(random_state) # Flatten the matrix and create probability distribution flat_matrix = np.asarray(self.matr, dtype=float).ravel() total = flat_matrix.sum() if total <= 0: raise ValueError("Matrix contains no positive values, cannot sample") probs = flat_matrix / total # Sample flat indices according to cell probabilities flat_indices = np.random.choice(np.arange(len(probs)), size=n, p=probs) # Convert flat indices to multi-indices as a (d, n) array indices_arrays = np.unravel_index(flat_indices, self.matr.shape) # Transform indices to a (n, d) array indices = np.column_stack(indices_arrays) # Generate uniform jitter for each dimension jitter = np.random.rand(n, self.dim) # Combine indices and jitter to get final coordinates return (indices + jitter) / np.array(self.matr.shape)
@staticmethod def _weighted_random_selection(matrix, num_samples): """ Select elements from 'matrix' with probability proportional to matrix entries. Return (selected_values, selected_multi_indices). """ arr = np.asarray(matrix, dtype=float).ravel() p = arr / arr.sum() flat_indices = np.random.choice(np.arange(arr.size), size=num_samples, p=p) shape = matrix.shape multi_idx = [np.unravel_index(ix, shape) for ix in flat_indices] selected_elements = matrix[tuple(np.array(multi_idx).T)] return selected_elements, multi_idx
[docs] def lambda_L(self): return 0
[docs] def lambda_U(self): return 0