Source code for copul.schur_order.cis_rearranger

"""
CISRearranger module for rearranging copulas to be conditionally increasing in sequence.

This module implements the rearrangement algorithm from:
Strothmann, Dette, Siburg (2022) - "Rearranged dependence measures"
"""

import logging
from typing import Union, Optional, Any, List

import numpy as np
import sympy
from numpy.typing import NDArray

from copul.checkerboard.biv_check_pi import BivCheckPi
from copul.checkerboard.checkerboarder import Checkerboarder

# Set up logger
log = logging.getLogger(__name__)


[docs] class CISRearranger: """ Class for rearranging copulas to be conditionally increasing in sequence (CIS). The rearrangement preserves the checkerboard approximation's margins while creating an ordering such that the conditional distribution functions are ordered decreasingly with respect to the conditioning value. Attributes: _checkerboard_size: Size of the checkerboard grid for approximating copulas """ def __init__(self, checkerboard_size: Optional[int] = None): """ Initialize a CISRearranger. Args: checkerboard_size: Size of checkerboard grid for approximation. If None, uses the default size in Checkerboarder. """ self._checkerboard_size = checkerboard_size def __str__(self) -> str: """Return string representation of the rearranger.""" return f"CISRearranger(checkerboard_size={self._checkerboard_size})"
[docs] def rearrange_copula(self, copula: Any) -> sympy.Matrix: """ Rearrange a copula to be conditionally increasing in sequence. Args: copula: A copula object or BivCheckPi object to rearrange Returns: A sympy Matrix representing the rearranged copula's density """ # Create checkerboarder with specified grid size checkerboarder = Checkerboarder(self._checkerboard_size) # If input is already a checkerboard copula, use it directly if isinstance(copula, BivCheckPi): ccop = copula else: # Otherwise convert to checkerboard approximation log.debug( f"Converting copula to checkerboard approximation with grid size {self._checkerboard_size}" ) ccop = checkerboarder.get_checkerboard_copula(copula) # Perform the rearrangement return self.rearrange_checkerboard(ccop)
[docs] @staticmethod def rearrange_checkerboard( ccop: Union[BivCheckPi, List[List[float]], NDArray, sympy.Matrix, Any], ) -> sympy.Matrix: """ Rearrange a checkerboard copula to be conditionally increasing in sequence (CIS), using numeric (NumPy) operations for speed. Implements Algorithm 1 from Strothmann, Dette, Siburg (2022). Parameters ---------- ccop : Union[BivCheckPi, list, np.ndarray, sympy.Matrix, Any] The checkerboard copula to rearrange. Can be: - a BivCheckPi instance, - a 2D list of floats, - a 2D numpy.ndarray, - a sympy.Matrix, - or any object with a `.matr` attribute containing one of the above. Returns ------- sympy.Matrix The density matrix of the rearranged copula, with shape (n_rows, n_cols). Each entry is a sympy-compatible expression (usually float). """ log.debug("Rearranging checkerboard...") # ------------------------------------------------------ # 1. Extract matrix from BivCheckPi or direct input # ------------------------------------------------------ if isinstance(ccop, BivCheckPi): matr = ccop.matr else: matr = ccop # assumed to be array-like or sympy.Matrix # Convert Python lists to np array if isinstance(matr, list): matr = np.array(matr, dtype=float) elif isinstance(matr, sympy.Matrix): # Convert sympy Matrix to np array for faster numeric ops matr = np.array(matr.tolist(), dtype=float) # Ensure matr is now a NumPy 2D array if not isinstance(matr, np.ndarray): raise TypeError( f"Expected a BivCheckPi, list, np.ndarray, or sympy.Matrix. Got: {type(matr)}" ) if matr.ndim != 2: raise ValueError(f"Expected a 2D matrix, got {matr.ndim}D array.") n_rows, n_cols = matr.shape matr_sum = matr.sum() if matr_sum == 0: raise ValueError("Input matrix has sum zero; cannot rearrange.") # ------------------------------------------------------ # 2. Scale the matrix (Condition 3.2 in Strothmann et al.) # => multiply by n_rows / matr_sum # ------------------------------------------------------ matr_scaled = (n_rows / matr_sum) * matr # shape: (n_rows, n_cols) # ------------------------------------------------------ # 3. Step 1 of the algorithm: # Build partial sums for each row => matrix B # B[k, i] = sum_{j=0..i} matr_scaled[k, j] # # We'll add a left "zero" column to B => shape is (n_rows, n_cols+1) # B[:, 1:] = row-wise cumsum of matr_scaled # B[:, 0] = 0 # ------------------------------------------------------ partial_sums = np.cumsum(matr_scaled, axis=1) # shape (n_rows, n_cols) B = np.zeros((n_rows, n_cols + 1), dtype=float) B[:, 1:] = partial_sums # ------------------------------------------------------ # 4. Step 2: Sort each column of B in descending order => B_tilde # ------------------------------------------------------ B_tilde = np.zeros_like(B) for col_idx in range(n_cols + 1): col_vals = B[:, col_idx] sorted_col = np.sort(col_vals)[::-1] # descending B_tilde[:, col_idx] = sorted_col # ------------------------------------------------------ # 5. Step 3: Compute differences between adjacent columns # => a_arrow = B_tilde[:, 1:] - B_tilde[:, :-1] # ------------------------------------------------------ a_arrow = B_tilde[:, 1:] - B_tilde[:, :-1] # shape (n_rows, n_cols) # ------------------------------------------------------ # 6. Normalize by (n_rows * n_cols) # ------------------------------------------------------ rearranged_np = a_arrow / (n_rows * n_cols) # ------------------------------------------------------ # 7. Convert to a Sympy Matrix for final return # ------------------------------------------------------ rearranged_sp = sympy.Matrix(rearranged_np) log.debug("Rearrangement complete.") return rearranged_sp
[docs] @staticmethod def verify_cis_property(matrix: Union[np.ndarray, Any]) -> bool: """ Verify that a matrix has the conditionally increasing in sequence property. Args: matrix: The matrix to check Returns: bool: True if the matrix has the CIS property, False otherwise """ # Convert sympy matrix to numpy array for easier processing if hasattr(matrix, "tolist") and not isinstance(matrix, np.ndarray): matrix_np = np.array(matrix.tolist(), dtype=float) else: matrix_np = matrix n_rows, n_cols = matrix_np.shape # Compute cumulative sums for each row cum_sums = np.zeros((n_rows, n_cols + 1)) for k in range(n_rows): for i in range(n_cols): cum_sums[k, i + 1] = cum_sums[k, i] + matrix_np[k, i] # Check if each column is in decreasing order for i in range(cum_sums.shape[1]): col = cum_sums[:, i] if not all(col[j] >= col[j + 1] for j in range(len(col) - 1)): return False return True
[docs] def apply_cis_rearrangement(copula: Any, grid_size: Optional[int] = None) -> BivCheckPi: """ Apply CIS rearrangement to a copula and return as a BivCheckPi object. This convenience function rearranges a copula and returns it as a BivCheckPi object for easy use in further computations. Args: copula: The copula to rearrange grid_size: Size of the checkerboard grid (optional) Returns: BivCheckPi: A checkerboard copula with the CIS property """ rearranger = CISRearranger(grid_size) rearranged_matrix = rearranger.rearrange_copula(copula) # Convert sympy matrix to numpy array if hasattr(rearranged_matrix, "tolist") and not isinstance( rearranged_matrix, np.ndarray ): rearranged_np = np.array(rearranged_matrix.tolist(), dtype=float) else: rearranged_np = rearranged_matrix # Create BivCheckPi from rearranged matrix rearranged_copula = BivCheckPi(rearranged_np) return rearranged_copula