Source code for heavyedge_distance.api.distmat

import os

import numpy as np
from heavyedge.wasserstein import quantile

from heavyedge_distance.dfd import dfd
from heavyedge_distance.wasserstein import wdist

__all__ = [
    "distmat_euclidean",
    "distmat_wasserstein",
    "distmat_frechet",
]


def _distmat(
    converter,
    distfunc,
    f1,
    f2=None,
    batch_size=None,
    n_jobs=None,
    logger=lambda x: None,
):
    if n_jobs is not None:
        pass
    else:
        n_jobs = os.environ.get("HEAVYEDGE_MAX_WORKERS")
        if n_jobs is not None:
            n_jobs = int(n_jobs)
        else:
            n_jobs = 1

    x1 = f1.x()
    if f2 is None:
        x2 = x1
    else:
        x2 = f2.x()

    if f2 is not None:
        if batch_size is None:
            fs1, Ls1, _ = f1[:]
            fs2, Ls2, _ = f2[:]
            val1 = converter(x1, fs1, Ls1)
            val2 = converter(x2, fs2, Ls2)
            D = distfunc(val1, val2, n_jobs)
            logger("1/1")
        else:
            N1, N2 = len(f1), len(f2) if f2 is not None else len(f1)
            num_batches_1 = (N1 // batch_size) + int(bool(N1 % batch_size))
            num_batches_2 = (N2 // batch_size) + int(bool(N2 % batch_size))

            D = np.empty((N1, N2), dtype=np.float64)

            for i in range(num_batches_1):
                fs1, Ls1, _ = f1[i * batch_size : (i + 1) * batch_size]
                val1 = converter(x1, fs1, Ls1)
                for j in range(num_batches_2):
                    fs2, Ls2, _ = f2[j * batch_size : (j + 1) * batch_size]
                    val2 = converter(x2, fs2, Ls2)
                    D[
                        i * batch_size : (i + 1) * batch_size,
                        j * batch_size : (j + 1) * batch_size,
                    ] = distfunc(val1, val2, n_jobs)
                    logger(
                        f"{i * num_batches_2 + j + 1}/{num_batches_1 * num_batches_2}"
                    )
    else:
        if batch_size is None:
            fs1, Ls1, _ = f1[:]
            val1 = converter(x1, fs1, Ls1)
            D = distfunc(val1, None, n_jobs)
            logger("1/1")
        else:
            N = len(f1)
            num_batches = (N // batch_size) + int(bool(N % batch_size))

            D = np.empty((N, N), dtype=np.float64)

            for i in range(num_batches):
                fs1, Ls1, _ = f1[i * batch_size : (i + 1) * batch_size]
                val1 = converter(x1, fs1, Ls1)
                # diagonal
                D[
                    i * batch_size : (i + 1) * batch_size,
                    i * batch_size : (i + 1) * batch_size,
                ] = distfunc(val1, None, n_jobs)
                # off-diagonal
                for j in range(i + 1, num_batches):
                    fs2, Ls2, _ = f1[j * batch_size : (j + 1) * batch_size]
                    val2 = converter(x2, fs2, Ls2)
                    dist = distfunc(val1, val2, n_jobs)
                    D[
                        i * batch_size : (i + 1) * batch_size,
                        j * batch_size : (j + 1) * batch_size,
                    ] = dist
                    D[
                        j * batch_size : (j + 1) * batch_size,
                        i * batch_size : (i + 1) * batch_size,
                    ] = dist.T
                    logger(f"{i * num_batches + j + 1}/{num_batches ** 2}")
    return D


def _euclidean_converter(x, fs, Ls):
    return (x, fs)


def _euclidean_distfunc(val1, val2, n_jobs):
    x, fs1 = val1
    if val2 is None:
        fs2 = fs1
    else:
        _, fs2 = val2
    # TODO: implement more efficient algorithm for fs2==fs1
    diff = fs1[:, np.newaxis, :] - fs2[np.newaxis, :, :]
    D = np.sqrt(np.trapezoid(diff**2, x, axis=2))
    return D


[docs] def distmat_euclidean(f1, f2=None, batch_size=None, logger=lambda x: None): """L2 distance matrix between profiles. Parameters ---------- f1 : heavyedge.ProfileData Open h5 file. f2 : heavyedge.ProfileData, optional Open h5 file. If not passed, it is set to *f1*. batch_size : int, optional Batch size to load data. If not passed, all data are loaded at once. logger : callable, optional Logger function which accepts a progress message string. Returns ------- (N1, N2) array Euclidean distance matrix. Notes ----- ``distmat_euclidean(f1)`` is faster than ``distmat_euclidean(f1, f1)``. Examples -------- >>> from heavyedge import ProfileData >>> from heavyedge_distance import get_sample_path >>> from heavyedge_distance.api import distmat_euclidean >>> with ProfileData(get_sample_path("MeanProfiles-AreaScaled.h5")) as data: ... D = distmat_euclidean(data) """ converter = _euclidean_converter distfunc = _euclidean_distfunc return _distmat(converter, distfunc, f1, f2, batch_size, logger)
def _wasserstein_converter(t): def converter(x, fs, Ls): Qs = quantile(x, fs, Ls, t) return Qs return converter def _wasserstein_distfunc(t): def distfunc(Qs1, Qs2, n_jobs): D = wdist(t, Qs1, Qs2) return D return distfunc
[docs] def distmat_wasserstein(t, f1, f2=None, batch_size=None, logger=lambda x: None): """Wasserstein distance matrix between area-scaled profiles. .. warning:: This function assumes that the profiles in *f1* and *f2* are area-scaled and heights outside the support are zero. Parameters ---------- t : (M,) ndarray Coordinates of grids over which the quantile functions will be measured. Must be strictly increasing from 0 to 1. f1 : heavyedge.ProfileData Open h5 file of area-scaled profiles. f2 : heavyedge.ProfileData, optional Open h5 file of area-scaled profiles. If not passed, it is set to *f1*. batch_size : int, optional Batch size to load data. If not passed, all data are loaded at once. logger : callable, optional Logger function which accepts a progress message string. Returns ------- (N1, N2) array Wasserstein distance matrix. Notes ----- ``distmat_wasserstein(f1)`` is faster than ``distmat_wasserstein(f1, f1)``. Examples -------- >>> import numpy as np >>> from heavyedge import ProfileData >>> from heavyedge_distance import get_sample_path >>> from heavyedge_distance.api import distmat_wasserstein >>> with ProfileData(get_sample_path("MeanProfiles-AreaScaled.h5")) as data: ... D = distmat_wasserstein(np.linspace(0, 1, 100), data) """ converter = _wasserstein_converter(t) distfunc = _wasserstein_distfunc(t) return _distmat(converter, distfunc, f1, f2, batch_size, logger)
def _dfd_converter(x, fs, Ls): return (fs, Ls) def _dfd_distfunc(val1, val2, n_jobs): return dfd(val1, val2, n_jobs)
[docs] def distmat_frechet(f1, f2=None, batch_size=None, n_jobs=None, logger=lambda x: None): """1-D discrete Fréchet distance matrix between profiles. Parameters ---------- f1 : heavyedge.ProfileData Open h5 file. f2 : heavyedge.ProfileData, optional Open h5 file. If not passed, it is set to *f1*. batch_size : int, optional Batch size to load data. If not passed, all data are loaded at once. n_jobs : int, optional Number of parallel workers. If not passed, `HEAVYEDGE_MAX_WORKERS` environment variable is used. If the environment variable is invalid, set to 1. logger : callable, optional Logger function which accepts a progress message string. Returns ------- (N1, N2) array Discrete Fréchet distance matrix. Notes ----- ``distmat_frechet(f1)`` is faster than ``distmat_frechet(f1, f1)``. Examples -------- >>> from heavyedge import ProfileData >>> from heavyedge_distance import get_sample_path >>> from heavyedge_distance.api import distmat_frechet >>> with ProfileData(get_sample_path("MeanProfiles-PlateauScaled.h5")) as data: ... D = distmat_frechet(data) """ return _distmat(_dfd_converter, _dfd_distfunc, f1, f2, batch_size, n_jobs, logger)