Source code for copul.family.rank_correlation_plotter

#!/usr/bin/env python3
"""
Minimal, extensible rank-correlation plotting for copulas.

How to add a new measure:
-------------------------
1) Define a function with signature: f(x, y, rank_x, rank_y) -> float
2) Decorate it with @measure("Nice Name")
   (that's all; it will show up automatically in compute/plot/export)

Built-ins: xi, rho, tau, footrule, gini_gamma, blomqvists_beta,
           schweizer_wolff_sigma, hoeffdings_d
"""

from __future__ import annotations

import logging
import pathlib
import pickle
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

import numpy as np
import scipy.stats as st
from matplotlib import pyplot as plt
from scipy.interpolate import CubicSpline
import sympy as sp

# If you have it in your project:
from copul.chatterjee import xi_ncalculate

log = logging.getLogger(__name__)

# ------------------------ Measure Registry ------------------------

MeasureFn = Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray], float]
MEASURES: Dict[str, MeasureFn] = {}


[docs] def measure(name: str) -> Callable[[MeasureFn], MeasureFn]: """Decorator to register a rank-correlation-like measure.""" def _wrap(fn: MeasureFn) -> MeasureFn: MEASURES[name] = fn return fn return _wrap
# ------------------------ Built-in measures ------------------------
[docs] @measure("chatterjees_xi") def m_xi(x, y, rx, ry) -> float: return float(xi_ncalculate(x, y))
[docs] @measure("spearmans_rho") def m_rho(x, y, rx, ry) -> float: # Pearson correlation of ranks = Spearman's rho r, _ = st.pearsonr(rx, ry) return float(r)
[docs] @measure("kendalls_tau") def m_tau(x, y, rx, ry) -> float: t, _ = st.kendalltau(x, y) return float(t)
[docs] @measure("spearmans_footrule") def m_footrule(x, y, rx, ry) -> float: n = len(rx) d = np.abs(rx - ry).sum() return 1.0 + 3.0 / n - (3.0 / (n * n)) * d
[docs] @measure("gini_gamma") def m_gini_gamma(x, y, rx, ry) -> float: n = len(rx) u, v = rx / n, ry / n integral_1 = np.mean(1 - np.maximum(u, v)) integral_2 = np.mean(np.maximum(0.0, 1.0 - u - v)) return 4.0 * (integral_1 + integral_2) - 2.0
[docs] @measure("blomqvists_beta") def m_blomqvists_beta(x, y, rx, ry) -> float: n = len(x) med_x, med_y = np.median(x), np.median(y) agree = np.sum(((x <= med_x) & (y <= med_y)) | ((x > med_x) & (y > med_y))) return 2.0 * agree / n - 1.0
[docs] @measure("schweizer_wolff_sigma") def m_schweizer_wolff(x, y, rx, ry) -> float: """ Schweizer–Wolff sigma via the empirical copula. σ̂ = 12 * mean(|Ĉ_n(u_i, v_i) - u_i * v_i|) where u_i = R_i/n, v_i = S_i/n and Ĉ_n is the empirical copula. The O(n²) exact sum is replaced by a fast rank-based estimator using the same identity that yields Spearman's rho. """ n = len(rx) u = rx / (n + 1) # pseudo-observations (Hazen plotting position) v = ry / (n + 1) # Empirical copula value at each pseudo-observation: C_n(u_i,v_i) # = (1/n) * #{j : U_j <= u_i AND V_j <= v_i} # Direct O(n^2) is too slow for large n; use rank-based approach. # For moderate n, the midpoint-grid approximation is accurate: n_grid = min(200, n) h = 1.0 / n_grid mid = np.linspace(h / 2, 1 - h / 2, n_grid) uu, vv = np.meshgrid(mid, mid, indexing="ij") # Empirical CDF on the grid C_emp = np.empty_like(uu) for i in range(n_grid): for j in range(n_grid): C_emp[i, j] = np.mean((u <= mid[i]) & (v <= mid[j])) Pi = uu * vv return float(12 * np.mean(np.abs(C_emp - Pi)))
[docs] @measure("hoeffdings_d") def m_hoeffdings_d(x, y, rx, ry) -> float: """ Hoeffding's D (dependence index) via empirical copula grid. D̂ = 90 * mean((Ĉ_n(u,v) - u*v)^2) """ n = len(rx) u = rx / (n + 1) v = ry / (n + 1) n_grid = min(200, n) h = 1.0 / n_grid mid = np.linspace(h / 2, 1 - h / 2, n_grid) uu, vv = np.meshgrid(mid, mid, indexing="ij") C_emp = np.empty_like(uu) for i in range(n_grid): for j in range(n_grid): C_emp[i, j] = np.mean((u <= mid[i]) & (v <= mid[j])) Pi = uu * vv return float(90 * np.mean((C_emp - Pi) ** 2))
[docs] @measure("blests_nu") def m_nu(x, y, rx, ry) -> float: """ Blest's rank correlation ν via the empirical copula plug-in. Using C_n(u,v) = (1/n) Σ 1{R_j/n ≤ u, S_j/n ≤ v} with ranks R,S∈{1,…,n}, we get ∫∫ (1-u) C_n(u,v) du dv = (1/n) Σ [ ∫_{u=R_j/n}^1 (1-u) du ] [ ∫_{v=S_j/n}^1 dv ] = (1/n) Σ [ (1 - R_j/n)^2 / 2 ] [ 1 - S_j/n ]. Hence the estimator: ν̂ = 24 * ( (1/n) Σ ((1 - R_j/n)^2 / 2) * (1 - S_j/n) ) - 2. """ n = len(rx) u = rx / n # rx,ry are 1..n (from scipy.stats.rankdata) v = ry / n II = np.mean(((1.0 - u) ** 2) * (1.0 - v) / 2.0) return float(24.0 * II - 2.0)
# ------------------------ Data container ------------------------
[docs] @dataclass class CorrelationData: params: np.ndarray values: Dict[str, np.ndarray] # measure_name -> values (aligned with params)
# ------------------------ Runner ------------------------
[docs] class RankCorrelationPlotter: """ Minimal runner: - param grid - sample once per param - compute registered measures - plot & export """ def __init__( self, copula: Any, *, measures: Optional[Iterable[str]] = None, images_dir: pathlib.Path | str = "images", save_pickles: bool = True, ): self.copula = copula self.measures = list(measures) if measures else list(MEASURES) self.images_dir = pathlib.Path(images_dir) self.images_dir.mkdir(parents=True, exist_ok=True) self._data_dir = self.images_dir / "data" self._data_dir.mkdir(parents=True, exist_ok=True) self._splines_dir = self.images_dir / "splines" self._splines_dir.mkdir(parents=True, exist_ok=True) self.save_pickles = save_pickles
[docs] def param_grid( self, n_params: int = 20, *, log_cut_off: Optional[Tuple[float, float]] = None, xlim: Optional[Tuple[float, float]] = None, ) -> np.ndarray: """ Build a grid over the primary parameter interval in the copula. If log_cut_off is provided, use a log grid *shifted by the left boundary (inf)*, matching the behavior of the previous implementation. """ # Prefer a family-provided helper if it exists if hasattr(self.copula, "get_params"): return self.copula.get_params(n_params, log_scale=bool(log_cut_off)) # Fallback: use interval of the first parameter p = str(self.copula.params[0]) interval = self.copula.intervals[p] # Support FiniteSet (discrete parameter sets) if isinstance(interval, sp.FiniteSet): return np.array([float(val) for val in interval]) inf = float(interval.inf) sup = float(interval.sup) # Respect open bounds left = inf + (1e-2 if getattr(interval, "left_open", False) else 0.0) right = sup - (1e-2 if getattr(interval, "right_open", False) else 0.0) if log_cut_off is not None: # log grid shifted by inf: inf + 10^a ... inf + 10^b (or symmetric if scalar) if isinstance(log_cut_off, tuple): lo, hi = log_cut_off return np.logspace(lo, hi, n_params) + inf else: s = float(log_cut_off) return np.logspace(-s, s, n_params) + inf # Linear grid (with optional numeric crop) if xlim is not None: left = max(left, xlim[0]) right = min(right, xlim[1]) return np.linspace(left, right, n_params)
# ---- compute ----
[docs] def compute( self, params: np.ndarray, n_obs: int = 1_000_000, *, approximate: bool = False, ) -> CorrelationData: """ For each parameter: - sample (x,y) - compute ranks once - evaluate each registered measure """ vals: Dict[str, List[float]] = {m: [] for m in self.measures} pname = str(self.copula.params[0]) for theta in params: try: c = self.copula(**{pname: theta}) data = c.rvs(n_obs, approximate=approximate) x, y = data[:, 0], data[:, 1] # ranks once rx = st.rankdata(x) ry = st.rankdata(y) # measures for m in self.measures: fn = MEASURES[m] vals[m].append(fn(x, y, rx, ry)) except Exception as e: log.warning("Param %g failed: %s", theta, e) for m in self.measures: vals[m].append(np.nan) return CorrelationData( params=params, values={k: np.array(v) for k, v in vals.items()} )
[docs] def plot( self, data: "CorrelationData", *, title: Optional[str] = None, ylim: Tuple[float, float] = (-1, 1), log_x: bool = False, log_cut_off: Optional[Tuple[float, float]] = None, # <— NEW ) -> Dict[str, CubicSpline]: """ Scatter + CubicSpline for each measure. If log_x=True, we plot against x - inf, set log scale, and clamp the axis to the exact range implied by log_cut_off (if provided). """ if title is None: from copul.family.copula_graphs import CopulaGraphs title = CopulaGraphs(self.copula, False).get_copula_title() # interval and left boundary (inf) of the first parameter p = str(self.copula.params[0]) interval = self.copula.intervals[p] inf = float(getattr(interval, "inf", 0.0)) splines: Dict[str, CubicSpline] = {} x = data.params for m, y in data.values.items(): mask = ~np.isnan(y) if mask.sum() < 2: continue x_masked = x[mask] y_masked = y[mask] # Fit spline in ORIGINAL x-domain (not shifted) cs = CubicSpline(x_masked, y_masked) xs_dense = np.linspace(x_masked.min(), x_masked.max(), 600) ys_dense = cs(xs_dense) if log_x: x_plot_pts = x_masked - inf xs_plot = xs_dense - inf plt.scatter(x_plot_pts, y_masked, s=16, label=m) plt.plot(xs_plot, ys_dense) else: plt.scatter(x_masked, y_masked, s=16, label=m) plt.plot(xs_dense, ys_dense) splines[m] = cs plt.title(title) plt.xlabel(f"${{{self.copula.params[0]}}}$") plt.ylabel("Correlation") plt.ylim(*ylim) if log_x: self._format_log_x_axis( inf, x, log_cut_off ) # <— pass both x and the cutoffs ax = plt.gca() # after drawing the curves and before saving/showing: if log_x: # ensure the data were plotted against (x - inf) on a log axis self._set_log_ticks(ax, inf, log_cut_off) plt.grid(True) plt.legend() out = self.images_dir / f"{title}_rank_correlations.png" plt.savefig(out, dpi=150, bbox_inches="tight") plt.show() return splines
def _format_log_x_axis( self, inf: float, x_original: np.ndarray, log_cut_off: Optional[Tuple[float, float]] = None, ) -> None: """ Apply log x-scale to the *shifted* axis (x - inf). If log_cut_off=(a,b) is given, clamp to [10^a, 10^b] exactly. Format ticks as 'inf + 10^{k}' when inf != 0. """ plt.xscale("log") # Set limits precisely from log_cut_off if provided if log_cut_off is not None: a, b = ( log_cut_off if isinstance(log_cut_off, tuple) else (-log_cut_off, log_cut_off) ) xmin, xmax = 10.0**a, 10.0**b plt.xlim(xmin, xmax) else: # fallback to data-driven limits in shifted domain x_plot = x_original - inf x_plot = x_plot[np.isfinite(x_plot) & (x_plot > 0)] if x_plot.size: plt.xlim(x_plot.min(), x_plot.max()) @staticmethod def _set_log_ticks( ax, inf: float, log_cut_off: Optional[Tuple[float, float]] = None ): """ On a shifted-log x-axis (x' = x - inf), put ticks at 10^k and label them as 'inf + 10^{k}'. This never creates ticks outside the visible range. """ ax.set_xscale("log") # 1) Determine visible range on the shifted axis if log_cut_off is not None: a, b = ( log_cut_off if isinstance(log_cut_off, tuple) else (-log_cut_off, log_cut_off) ) xmin, xmax = 10.0**a, 10.0**b ax.set_xlim(xmin, xmax) else: xmin, xmax = ax.get_xlim() # 2) Choose integer exponents fully inside [xmin, xmax] k_lo = int(np.ceil(np.log10(max(xmin, np.finfo(float).tiny)))) k_hi = int(np.floor(np.log10(xmax))) if k_lo > k_hi: # degenerate range; just bail with defaults return ticks = 10.0 ** np.arange(k_lo, k_hi + 1) # 3) Set ticks and labels without changing limits if inf == 0.0: labels = [rf"$10^{{{k}}}$" for k in range(k_lo, k_hi + 1)] else: inf_str = f"{int(inf)}" if float(inf).is_integer() else f"{inf:.2f}" labels = [rf"${inf_str} + 10^{{{k}}}$" for k in range(k_lo, k_hi + 1)] ax.set_xticks(ticks) ax.set_xticklabels(labels) # ---- save ----
[docs] def save( self, base_name: str, data: CorrelationData, splines: Dict[str, CubicSpline] ) -> None: if not self.save_pickles: return try: with open(self._data_dir / f"{base_name}_data.pkl", "wb") as f: pickle.dump(data, f) with open(self._splines_dir / f"{base_name}_splines.pkl", "wb") as f: pickle.dump(splines, f) except Exception as e: log.warning("Failed to save pickles: %s", e)
# ------------------------ Convenience API ------------------------
[docs] def run_plot( copula: Any, *, measures: Optional[Iterable[str]] = None, n_obs: int = 10_000, n_params: int = 20, log_cut_off: Optional[Tuple[float, float]] = None, xlim: Optional[Tuple[float, float]] = None, ylim: Tuple[float, float] = (-1, 1), approximate: bool = False, images_dir: str | pathlib.Path = "images", save_pickles: bool = True, ): runner = RankCorrelationPlotter( copula, measures=measures, images_dir=images_dir, save_pickles=save_pickles ) params = runner.param_grid( n_params=n_params, log_cut_off=log_cut_off, xlim=xlim, ) data = runner.compute(params, n_obs=n_obs, approximate=approximate) splines = runner.plot( data, title=None, ylim=ylim, log_x=bool(log_cut_off), log_cut_off=log_cut_off, # <— add this ) # Title again (to name files consistently) from copul.family.copula_graphs import CopulaGraphs base = CopulaGraphs(copula, False).get_copula_name() runner.save(base, data, splines)
# ------------------------ Example ------------------------ if __name__ == "__main__": import copul from pathlib import Path # Families we want to run main_families = ["NELSEN1", "FRANK", "GUMBEL_HOUGAARD", "JOE", "GAUSSIAN"] # main_families = ["JOE"] # Per-family settings (log grids, x-lims, fixed params, etc.) params_dict = { "CLAYTON": {"log_cut_off": (-1.5, 1.5)}, "NELSEN1": {"log_cut_off": (-1.5, 1.5)}, "BIV_CLAYTON": {"log_cut_off": (-1.5, 1.5)}, "NELSEN2": {"log_cut_off": (-1.5, 1.5)}, "FRANK": {"xlim": (-20, 20)}, "JOE": {"log_cut_off": (-1.5, 1.5)}, "NELSEN8": {"log_cut_off": (-2, 3)}, "NELSEN13": {"log_cut_off": (-2, 2)}, "GENEST_GHOUDI": {"log_cut_off": (-2, 1)}, "NELSEN16": {"log_cut_off": (-3.5, 3.5)}, "NELSEN18": {"log_cut_off": (-2, 2)}, "NELSEN21": {"log_cut_off": (-2, 1)}, "BB5": {"params": {"theta": 2}, "log_cut_off": (-1.5, 1.5), "ylim": (0, 1)}, "GALAMBOS": {"log_cut_off": (-1, 1)}, "GUMBEL_HOUGAARD": {"log_cut_off": (-1, 1)}, "HUESLER_REISS": {"log_cut_off": (-1.5, 1.5)}, "JOEEV": {"params": {"alpha_1": 0.9, "alpha_2": 0.9}, "log_cut_off": (-1, 2)}, "TAWN": {"params": {"alpha_1": 0.9, "alpha_2": 0.9}, "log_cut_off": (-2, 2)}, "PLACKETT": {"log_cut_off": (-3, 3)}, "MARSHALL_OLKIN": {"params": {"alpha_2": 0.8}}, } # Measures to show (registered via @measure); include ν ("nu") measures = [ "chatterjees_xi", "blests_nu", "spearmans_rho", "kendalls_tau", "spearmans_footrule", "gini_gamma", "blomqvists_beta", ] images_dir = Path("images") images_dir.mkdir(parents=True, exist_ok=True) for family in main_families: print(f"Plotting rank correlations for {family} copula...") copula_class = copul.family_list.Families.create(family) # Pull per-family settings run_params = dict(params_dict.get(family, {})) # shallow copy # Instantiate with any fixed constructor params, if given ctor_params = run_params.pop("params", None) copula_instance = copula_class(**ctor_params) if ctor_params else copula_class() # Hand remaining plotting/grid kwargs to the new API run_plot( copula=copula_instance, measures=measures, n_obs=2_000_000, n_params=200, approximate=False, images_dir=images_dir, save_pickles=True, **run_params, # e.g. log_cut_off, xlim, ylim )