Source code for copul.checkerboard.check_min

import itertools
import logging
import importlib  # Import at module level

import numpy as np

from copul.checkerboard.check import Check
from copul.exceptions import PropertyUnavailableException

log = logging.getLogger(__name__)


[docs] class CheckMin(Check): """ Checkerboard "Min" Copula This copula implements a "min-fraction" approach for computing the cumulative distribution function (CDF) across all dimensions, and a fully discrete method for calculating conditional distributions. Key features: - CDF Calculation: Uses a min-fraction partial coverage over all dimensions to aggregate the CDF. - Conditional Distribution (cond_distr): For any given dimension i and input vector u: 1. In dimension i, determine the cell index as floor(u[i] * dim[i]). The entire slice corresponding to this index constitutes the conditioning event (denominator). 2. In every other dimension j ≠ i, only include cells where the cell index c[j] is strictly less than floor(u[j] * dim[j]); this avoids any partial coverage in these dimensions. 3. Compute the conditional distribution as the ratio of the count of cells meeting the numerator condition (cells matching the target criteria) to the total count of cells in the conditioning event (denominator). Example: For a 2x2x2 grid and u = (0.5, 0.5, 0.5): - In dimension 0, we use floor(0.5 * 2) = 1, selecting the second layer. - Within that layer, for dimensions 1 and 2, only cells with indices less than floor(0.5 * 2) = 1 are considered (i.e., only cells with index 0). - This results in 1 favorable cell out of 4 in the conditioning event, so: cond_distr(1, (0.5, 0.5, 0.5)) = 1 / 4 = 0.25. """ def __new__(cls, matr, *args, **kwargs): """ Create a new CheckMin instance or a BivCheckMin instance if dimension is 2. Parameters ---------- matr : array-like Matrix of values that determine the copula's distribution. *args, **kwargs Additional arguments passed to the constructor. Returns ------- CheckMin or BivCheckMin A CheckMin instance, or a BivCheckMin instance if dimension is 2. """ # If this is the CheckMin class itself (not a subclass) if cls is CheckMin: # Convert matrix to numpy array to get its dimensionality matr_arr = np.asarray(matr) # Check if it's a 2D matrix (bivariate copula) if matr_arr.ndim == 2: # Import the BivCheckMin class here to avoid circular imports try: bcp_module = importlib.import_module( "copul.checkerboard.biv_check_min" ) BivCheckMin = getattr(bcp_module, "BivCheckMin") # Return a new BivCheckMin instance with the same arguments return BivCheckMin(matr, *args, **kwargs) except (ImportError, ModuleNotFoundError, AttributeError): # If the import fails, just continue with normal instantiation pass # Create a normal instance by directly calling the parent's __new__ # Using the actual class for clarity and to avoid MRO issues from copul.checkerboard.check import Check instance = Check.__new__(cls) return instance def __str__(self): return f"CheckMinCopula({self.matr.shape})" @property def is_absolutely_continuous(self) -> bool: # 'Min' copula is degenerate along lines, so not absolutely continuous in R^d return False # -------------------------------------------------------------------------- # 1) CDF with 'min fraction' partial coverage # ------------------------------------------>--------------------------------
[docs] def cdf(self, *args): """ Compute the CDF at one or multiple points. This method handles both single-point and multi-point CDF evaluation in an efficient vectorized manner, using the 'min-fraction' approach specific to CheckMin. Parameters ---------- *args : array-like or float Either: - Multiple separate coordinates (x, y, ...) of a single point - A single array-like object with coordinates of a single point - A 2D array where each row represents a separate point Returns ------- float or numpy.ndarray If a single point is provided, returns a float. If multiple points are provided, returns an array of shape (n_points,). Examples -------- # Single point as separate arguments value = copula.cdf(0.3, 0.7) # Single point as array value = copula.cdf([0.3, 0.7]) # Multiple points as 2D array values = copula.cdf(np.array([[0.1, 0.2], [0.3, 0.4]])) """ # Handle different input formats if len(args) == 0: raise ValueError("No arguments provided") elif len(args) == 1: # A single argument was provided - either a point or multiple points arg = args[0] if hasattr(arg, "ndim") and hasattr(arg, "shape"): # NumPy array or similar arr = np.asarray(arg, dtype=float) if arr.ndim == 1: # 1D array - single point if len(arr) != self.dim: raise ValueError( f"Expected point array of length {self.dim}, got {len(arr)}" ) return self._cdf_single_point(arr) elif arr.ndim == 2: # 2D array - multiple points if arr.shape[1] != self.dim: raise ValueError( f"Expected points with {self.dim} dimensions, got {arr.shape[1]}" ) return self._cdf_vectorized_impl(arr) else: raise ValueError(f"Expected 1D or 2D array, got {arr.ndim}D array") elif hasattr(arg, "__len__"): # List, tuple, or similar sequence if len(arg) == self.dim: # Single point as a sequence return self._cdf_single_point(np.array(arg, dtype=float)) else: raise ValueError( f"Expected point with {self.dim} dimensions, got {len(arg)}" ) else: # Single scalar value - only valid for 1D case if self.dim == 1: return self._cdf_single_point(np.array([arg], dtype=float)) else: raise ValueError( f"Single scalar provided but copula has {self.dim} dimensions" ) else: # Multiple arguments provided if len(args) == self.dim: # Separate coordinates for a single point return self._cdf_single_point(np.array(args, dtype=float)) else: raise ValueError(f"Expected {self.dim} coordinates, got {len(args)}")
def _cdf_single_point(self, u): """ Helper method to compute CDF for a single point using the min-fraction approach. Parameters ---------- u : numpy.ndarray 1D array of length dim representing a single point. Returns ------- float CDF value at the point. """ # Quick boundaries if np.any(u <= 0): return 0.0 if np.all(u >= 1): return 1.0 total = 0.0 for c in itertools.product(*(range(s) for s in self.matr.shape)): cell_mass = self.matr[c] if cell_mass <= 0: continue # min-fraction across dims frac_cell = 1.0 for k in range(self.dim): lower_k = c[k] / self.matr.shape[k] upper_k = (c[k] + 1) / self.matr.shape[k] overlap_k = max(0.0, min(u[k], upper_k) - lower_k) if overlap_k <= 0: frac_cell = 0.0 break width_k = 1.0 / self.matr.shape[k] frac_k = overlap_k / width_k if frac_k > 1.0: frac_k = 1.0 if frac_k < frac_cell: frac_cell = frac_k if frac_cell > 0: total += cell_mass * frac_cell return float(total) def _cdf_vectorized_impl(self, points): """ Implementation of vectorized CDF for multiple points using the min-fraction approach. Parameters ---------- points : numpy.ndarray Array of shape (n_points, dim) where each row is a point. Returns ------- numpy.ndarray Array of shape (n_points,) with CDF values. """ # Convert to numpy array points = np.asarray(points, dtype=float) n_points = points.shape[0] results = np.zeros(n_points) # Create mask arrays for special cases all_zeros_mask = np.any(points <= 0, axis=1) all_ones_mask = np.all(points >= 1, axis=1) # Set results for special cases results[all_zeros_mask] = 0.0 results[all_ones_mask] = 1.0 # Filter out points that need actual computation compute_mask = ~(all_zeros_mask | all_ones_mask) compute_points = points[compute_mask] if len(compute_points) == 0: return results # Precompute cell information for more efficient processing matr_shape = np.array(self.matr.shape) # Create a numpy meshgrid of indices for the cells indices = np.meshgrid(*[np.arange(s) for s in self.matr.shape], indexing="ij") indices = np.array([idx.ravel() for idx in indices]).T # Shape: (n_cells, dim) # Only process cells with positive mass cell_masses = self.matr.ravel() positive_mass_mask = cell_masses > 0 if not np.any(positive_mass_mask): return results # All cells have zero mass indices = indices[positive_mass_mask] cell_masses = cell_masses[positive_mass_mask] # Reshape for broadcasting indices = indices[:, np.newaxis, :] # Shape: (n_cells, 1, dim) compute_points = compute_points[ np.newaxis, :, : ] # Shape: (1, n_points_to_compute, dim) cell_masses = cell_masses[:, np.newaxis] # Shape: (n_cells, 1) # Calculate cell bounds lower_bounds = indices / matr_shape[np.newaxis, np.newaxis, :] upper_bounds = (indices + 1) / matr_shape[np.newaxis, np.newaxis, :] # Calculate overlap with [0, u] for each dimension and each point overlaps = np.maximum( 0.0, np.minimum(compute_points, upper_bounds) - lower_bounds ) # Convert to fraction of cell width cell_widths = 1.0 / matr_shape[np.newaxis, np.newaxis, :] fractions = overlaps / cell_widths fractions = np.clip(fractions, 0.0, 1.0) # Calculate the min fraction across dimensions for each cell and point # This is the key difference from CheckPi which uses a product of fractions min_fractions = np.min( fractions, axis=2 ) # Shape: (n_cells, n_points_to_compute) # Early termination for cells with zero overlap in any dimension zero_overlap_mask = np.any(overlaps <= 0, axis=2) min_fractions[zero_overlap_mask] = 0.0 # Calculate weighted sum for each point weighted_fractions = ( cell_masses * min_fractions ) # Shape: (n_cells, n_points_to_compute) point_results = np.sum( weighted_fractions, axis=0 ) # Shape: (n_points_to_compute,) # Put results back in the output array results[compute_mask] = point_results return results
[docs] def cond_distr(self, i, *args): """ Compute the conditional distribution for one or multiple points. Parameters ---------- i : int Dimension index (1-based) to condition on. *args : array-like or float Either: - Multiple separate coordinates (x, y, ...) of a single point - A single array-like object with coordinates of a single point - A 2D array where each row represents a separate point Returns ------- float or numpy.ndarray If a single point is provided, returns a float. If multiple points are provided, returns an array of shape (n_points,). Examples -------- # Single point as separate arguments value = copula.cond_distr(1, 0.3, 0.7) # Single point as array value = copula.cond_distr(1, [0.3, 0.7]) # Multiple points as 2D array values = copula.cond_distr(1, np.array([[0.1, 0.2], [0.3, 0.4]])) """ if i < 1 or i > self.dim: raise ValueError(f"Dimension {i} out of range 1..{self.dim}") # Handle different input formats if len(args) == 0: raise ValueError("No point coordinates provided") elif len(args) == 1: # A single argument was provided - either a point or multiple points arg = args[0] if hasattr(arg, "ndim") and hasattr(arg, "shape"): # NumPy array or similar arr = np.asarray(arg, dtype=float) if arr.ndim == 1: # 1D array - single point if len(arr) != self.dim: raise ValueError( f"Expected point array of length {self.dim}, got {len(arr)}" ) return self._cond_distr_single(i, arr) elif arr.ndim == 2: # 2D array - multiple points if arr.shape[1] != self.dim: raise ValueError( f"Expected points with {self.dim} dimensions, got {arr.shape[1]}" ) return self._cond_distr_vectorized(i, arr) else: raise ValueError(f"Expected 1D or 2D array, got {arr.ndim}D array") elif hasattr(arg, "__len__"): # List, tuple, or similar sequence if len(arg) == self.dim: # Single point as a sequence return self._cond_distr_single(i, np.array(arg, dtype=float)) else: raise ValueError( f"Expected point with {self.dim} dimensions, got {len(arg)}" ) else: # Single scalar value - only valid for 1D case if self.dim == 1: return self._cond_distr_single(i, np.array([arg], dtype=float)) else: raise ValueError( f"Single scalar provided but copula has {self.dim} dimensions" ) else: # Multiple arguments provided if len(args) == self.dim: # Separate coordinates for a single point return self._cond_distr_single(i, np.array(args, dtype=float)) else: raise ValueError(f"Expected {self.dim} coordinates, got {len(args)}")
def _cond_distr_single(self, i, u): """ Helper method for conditional distribution of a single point. This preserves the original implementation for CheckMin for a single point. Parameters ---------- i : int Dimension index (1-based) to condition on. u : numpy.ndarray Single point as a 1D array of length dim. Returns ------- float Conditional distribution value. """ i0 = i - 1 # Convert to 0-based index # Find which cell index along dim i0 the coordinate u[i0] falls into x_i = u[i0] if x_i < 0: return 0.0 # If 'conditioning coordinate' <0, prob is 0 elif x_i >= 1: # If 'conditioning coordinate' >=1, then we pick the last cell index i_idx = self.matr.shape[i0] - 1 else: i_idx = int(np.floor(x_i * self.matr.shape[i0])) # clamp (just in case) if i_idx < 0: i_idx = 0 elif i_idx >= self.matr.shape[i0]: i_idx = self.matr.shape[i0] - 1 # For safety, reset the intervals cache between calls self.intervals = {} # Cache key for the slice indices slice_key = (i, i_idx) # Check if we have cached slice indices for this dimension and index if slice_key in self.intervals: slice_indices = self.intervals[slice_key] # Calculate denominator - sum of all cells in the slice denom = 0.0 for c in slice_indices: denom += self.matr[c] else: # Create more efficient slice iteration by only iterating through relevant dimensions indices = [range(s) for s in self.matr.shape] indices[i0] = [i_idx] # Fix the i0 dimension # Collect all cells in the slice denom = 0.0 slice_indices = [] for c in itertools.product(*indices): cell_mass = self.matr[c] denom += cell_mass # Store all cells for CheckMin (even zero mass cells might be needed for boundary checks) slice_indices.append(c) # Store in cache for future use self.intervals[slice_key] = slice_indices if denom <= 0: return 0.0 # Precompute 1/dim values for faster calculations inv_dims = np.array([1.0 / dim for dim in self.matr.shape]) u_array = np.array(u) # Calculate the conditioning dimension's fraction once val_i0 = u_array[i0] lower_i0 = i_idx * inv_dims[i0] upper_i0 = (i_idx + 1) * inv_dims[i0] overlap_len_i0 = max(0.0, min(val_i0, upper_i0) - lower_i0) frac_i = ( overlap_len_i0 * self.matr.shape[i0] ) # Multiply by dim instead of dividing # Calculate numerator num = 0.0 for c in slice_indices: cell_mass = self.matr[c] if cell_mass <= 0: continue qualifies = True for j in range(self.dim): if j == i0: continue # Skip conditioning dimension val_j = u_array[j] lower_j = c[j] * inv_dims[j] # Early termination check - more efficient boundary comparison if val_j <= lower_j: qualifies = False break upper_j = (c[j] + 1) * inv_dims[j] # If val_j >= upper_j, entire cell dimension is included, so continue if val_j >= upper_j: continue # Partial overlap - calculate fraction overlap_len = ( val_j - lower_j ) # No need for max() since we know val_j > lower_j frac_j = ( overlap_len * self.matr.shape[j] ) # Multiply by dim instead of dividing # More efficient fraction comparison with numerical stability if frac_j < frac_i and abs(frac_j - frac_i) > 1e-10: qualifies = False break if qualifies: num += cell_mass return num / denom def _cond_distr_vectorized(self, i, points): """ Vectorized implementation of conditional distribution for multiple points. Adapted for the CheckMin-specific logic. Parameters ---------- i : int Dimension index (1-based) to condition on. points : numpy.ndarray Multiple points as a 2D array of shape (n_points, dim). Returns ------- numpy.ndarray Array of shape (n_points,) with conditional distribution values. """ # Convert to numpy array points = np.asarray(points, dtype=float) n_points = points.shape[0] results = np.zeros(n_points) # Convert to 0-based index i0 = i - 1 # Process each point separately - the CheckMin cond_distr algorithm isn't easily vectorizable # across multiple points at once due to its conditional logic for p_idx, u in enumerate(points): x_i = u[i0] # Special case: conditioning coordinate < 0 if x_i < 0: results[p_idx] = 0.0 continue # Determine the cell index along the conditioning dimension if x_i >= 1: i_idx = self.matr.shape[i0] - 1 else: i_idx = int(np.floor(x_i * self.matr.shape[i0])) # clamp (just in case) i_idx = max(0, min(i_idx, self.matr.shape[i0] - 1)) # Create a slice for efficient indexing slice_spec = [slice(None)] * self.dim slice_spec[i0] = i_idx # Get the slice of the matrix - all cells with index i_idx in dimension i0 slice_matr = self.matr[tuple(slice_spec)] # Calculate denominator - sum of all cells in the slice denom = np.sum(slice_matr) if denom <= 0: results[p_idx] = 0.0 continue # For numerator, we need to check each cell in the slice against CheckMin's criteria # Create indices for all cells in the slice remaining_dims = [d for d in range(self.dim) if d != i0] mesh_dims = [range(self.matr.shape[d]) for d in remaining_dims] if mesh_dims: # Only create meshgrid if we have remaining dimensions # Create meshgrid for remaining dimensions mesh = np.meshgrid(*mesh_dims, indexing="ij") indices = np.array( [idx.ravel() for idx in mesh] ).T # Shape: (n_cells_in_slice, len(remaining_dims)) # Expand indices to include the fixed i0 dimension full_indices = [] for idx in indices: full_idx = [] rem_dim_idx = 0 for d in range(self.dim): if d == i0: full_idx.append(i_idx) else: full_idx.append(idx[rem_dim_idx]) rem_dim_idx += 1 full_indices.append(tuple(full_idx)) # Precompute 1/dim values for faster calculations inv_dims = np.array([1.0 / dim for dim in self.matr.shape]) # Calculate the conditioning dimension's fraction once val_i0 = u[i0] lower_i0 = i_idx * inv_dims[i0] upper_i0 = (i_idx + 1) * inv_dims[i0] overlap_len_i0 = max(0.0, min(val_i0, upper_i0) - lower_i0) frac_i = overlap_len_i0 * self.matr.shape[i0] # Calculate numerator using CheckMin's specific logic numerator = 0.0 for c in full_indices: cell_mass = self.matr[c] if cell_mass <= 0: continue qualifies = True for j in range(self.dim): if j == i0: continue # Skip conditioning dimension val_j = u[j] lower_j = c[j] * inv_dims[j] # Early termination check if val_j <= lower_j: qualifies = False break upper_j = (c[j] + 1) * inv_dims[j] # If val_j >= upper_j, entire cell is included if val_j >= upper_j: continue # Partial overlap - calculate fraction overlap_len = val_j - lower_j frac_j = overlap_len * self.matr.shape[j] # CheckMin-specific criterion if frac_j < frac_i and abs(frac_j - frac_i) > 1e-10: qualifies = False break if qualifies: numerator += cell_mass results[p_idx] = numerator / denom else: # Special case: only one dimension results[p_idx] = 1.0 if denom > 0 else 0.0 return results @property def pdf(self): raise PropertyUnavailableException("PDF does not exist for CheckMin.")
[docs] def rvs(self, n=1, random_state=None, **kwargs): """ More efficient implementation of random variate sampling. """ if random_state is not None: np.random.seed(random_state) log.info(f"Generating {n} random variates for {self}...") # Get cell indices according to their probability weights _, idxs = self._weighted_random_selection(self.matr, n) # Generate n random numbers uniformly in [0, 1] randoms = np.random.uniform(size=n) # Pre-allocate the output array for better performance out = np.zeros((n, self.dim)) # Pre-compute inverse dimensions (1/dim) for faster division inv_dims = np.array([1.0 / dim for dim in self.matr.shape]) # Process each selected cell more efficiently for i, (c, u) in enumerate(zip(idxs, randoms)): # Vectorized computation of lower bounds lower_bounds = np.array([c[d] * inv_dims[d] for d in range(self.dim)]) # Vectorized computation of ranges (upper - lower) ranges = np.array( [(c[d] + 1) * inv_dims[d] - lower_bounds[d] for d in range(self.dim)] ) # Directly compute the interpolated point and store in output array out[i] = lower_bounds + u * ranges return out
@staticmethod def _weighted_random_selection(matrix, num_samples): arr = np.asarray(matrix, dtype=float).ravel() p = arr / arr.sum() flat_indices = np.random.choice(np.arange(arr.size), size=num_samples, p=p) shape = matrix.shape multi_idx = [np.unravel_index(ix, shape) for ix in flat_indices] selected_elements = matrix[tuple(np.array(multi_idx).T)] return selected_elements, multi_idx
if __name__ == "__main__": ccop = CheckMin([[1, 2], [2, 1]]) ccop.cdf((0.2, 0.2))