Source code for coniferest.evaluator

import math

import numpy as np
from scipy.sparse import csr_array

from .calc_trees import calc_apply, calc_feature_delta_sum, calc_paths_sum, selector_dtype  # noqa
from .utils import average_path_length

__all__ = ["ForestEvaluator"]


class ForestEvaluator:
    """Evaluate a forest stored as flat arrays of tree nodes ("selectors")."""

    selector_dtype = selector_dtype

    def __init__(self, samples, selectors, node_offsets, leaf_offsets, *, num_threads, sampletrees_per_batch):
        """
        Base class for the forest evaluators. Does the trivial job:
        * runs calc_paths_sum written in Rust,
        * helps to combine several trees' representations into two arrays.

        Parameters
        ----------
        samples
            Number of samples in every tree.

        selectors
            Array with all the nodes of all the trees.

        node_offsets
            Offsets for tree indices for node-sized arrays. For example for
            two trees of node-length `len1` and `len2` it would be:
            [0, len1, len1 + len2]

        leaf_offsets
            Offsets for tree indices for leaf-sized arrays.

        num_threads : int or None
            Number of threads to use for calculations. If None or negative,
            zero is stored, which asks the Rust backend (rayon) to use all
            available threads.

        sampletrees_per_batch : int
            Batch budget from which `batch_size` is derived as
            ``ceil(sampletrees_per_batch / n_trees)``. Presumably the number
            of (sample, tree) pairs handled per batch by the backend.
        """
        self.samples = samples

        self.selectors = selectors
        self.node_offsets = node_offsets
        self.leaf_offsets = leaf_offsets

        if num_threads is None or num_threads < 0:
            # Ask Rust's rayon to use all available threads
            self.num_threads = 0
        else:
            self.num_threads = num_threads

        self.sampletrees_per_batch = sampletrees_per_batch

    @property
    def batch_size(self):
        """Number of samples per batch: ``ceil(sampletrees_per_batch / n_trees)``."""
        return math.ceil(self.sampletrees_per_batch / self.n_trees)

    @classmethod
    def combine_selectors(cls, selectors_list):
        """
        Combine several node arrays into one array of nodes and one array of
        start node_offsets.

        Parameters
        ----------
        selectors_list
            List of node arrays to combine.

        Returns
        -------
        np.ndarray of selectors
            Node array with all the nodes from all the trees. The "left"
            field of every leaf node (``feature < 0``) is overwritten with
            the leaf's sequential index across the whole forest.
        np.ndarray of int
            Array of tree offsets for node-arrays.
        np.ndarray of int
            Array of tree offsets for leaf-arrays.
        """
        lens = [len(sels) for sels in selectors_list]
        full_len = sum(lens)

        selectors = np.empty((full_len,), dtype=cls.selector_dtype)

        node_offsets = np.zeros((len(selectors_list) + 1,), dtype=np.uintp)
        node_offsets[1:] = np.add.accumulate(lens)

        # Copy every tree into its own slice of the combined array
        for tree_selectors, start, stop in zip(selectors_list, node_offsets[:-1], node_offsets[1:]):
            selectors[start:stop] = tree_selectors

        # Assign a unique sequential index to every leaf
        # The index is used for weighted scores
        leaf_mask = selectors["feature"] < 0

        # Each offset tells how many leafs are in all previous trees.
        # The cumulative sum is prefixed with zero so that it can be indexed
        # by node_offsets directly: `node_offsets - 1` would wrap around on
        # the unsigned dtype for a tree without nodes.
        leaf_prefix = np.concatenate(([0], np.cumsum(leaf_mask)))
        leaf_offsets = np.zeros_like(node_offsets)
        leaf_offsets[:] = leaf_prefix[node_offsets]

        leaf_count = leaf_offsets[-1]
        selectors["left"][leaf_mask] = np.arange(0, leaf_count)

        return selectors, node_offsets, leaf_offsets

    @property
    def n_trees(self):
        """Number of trees, i.e. one less than the number of node offsets."""
        return self.node_offsets.shape[0] - 1

    @property
    def n_leaves(self):
        """Total number of leaves in the forest (the last leaf offset)."""
        return self.leaf_offsets[-1]

    @staticmethod
    def _as_c_contiguous(x):
        """Return `x` itself if it is C-contiguous, a C-contiguous version otherwise."""
        if not x.flags["C_CONTIGUOUS"]:
            x = np.ascontiguousarray(x)
        return x

    def score_samples(self, x):
        """
        Perform the computations.

        Parameters
        ----------
        x
            Features to calculate scores of. Should be C-contiguous for performance.

        Returns
        -------
        Array of scores.
        """
        x = self._as_c_contiguous(x)

        paths_sum = calc_paths_sum(
            self.selectors,
            self.node_offsets,
            x,
            num_threads=self.num_threads,
            batch_size=self.batch_size,
        )
        normalization = self.average_path_length(self.samples) * self.n_trees

        return -(2 ** (-paths_sum / normalization))

    def _feature_delta_sum(self, x):
        """Run the backend `calc_feature_delta_sum` on `x` and return its result."""
        x = self._as_c_contiguous(x)

        return calc_feature_delta_sum(
            self.selectors,
            self.node_offsets,
            x,
            num_threads=self.num_threads,
            batch_size=self.batch_size,
        )

    def feature_signature(self, x):
        """
        Ratio of the backend's delta sums to its hit counts, normalized by
        the average path length for `samples`.

        Parameters
        ----------
        x
            Features to calculate the signature of.

        Returns
        -------
        ``delta_sum / hit_count / average_path_length(samples)``, computed
        element-wise on the arrays returned by the backend.
        """
        delta_sum, hit_count = self._feature_delta_sum(x)

        return delta_sum / hit_count / self.average_path_length(self.samples)

    def feature_importance(self, x):
        """
        Same ratio as `feature_signature`, but with delta sums and hit counts
        summed over the first axis (presumably the samples axis) before the
        division.

        Parameters
        ----------
        x
            Features to calculate the importance from.

        Returns
        -------
        ``sum(delta_sum, axis=0) / sum(hit_count, axis=0) / average_path_length(samples)``.
        """
        delta_sum, hit_count = self._feature_delta_sum(x)

        return np.sum(delta_sum, axis=0) / np.sum(hit_count, axis=0) / self.average_path_length(self.samples)

    def _dense_apply(self, x):
        """Run the backend `calc_apply` on `x`; `_sparse_apply` unpacks its shape as (n_samples, n_estimators)."""
        x = self._as_c_contiguous(x)

        return calc_apply(
            self.selectors,
            self.node_offsets,
            x,
            num_threads=self.num_threads,
            batch_size=self.batch_size,
        )

    def _sparse_apply(self, x):
        """Build a CSR array of shape (n_samples, n_leaves) with 1.0 at every leaf index from `_dense_apply`."""
        dense = self._dense_apply(x)
        n_samples, n_estimators = dense.shape
        n_leaves = self.n_leaves

        # Every row holds exactly n_estimators entries, one per tree
        data = np.ones(n_samples * n_estimators, dtype=np.float32)
        indices = dense.ravel()
        indptr = np.arange(0, n_samples * n_estimators + 1, n_estimators)

        return csr_array((data, indices, indptr), shape=(n_samples, n_leaves))

    def apply(self, x, output=None):
        """
        Apply the forest to X, return leaf indices.

        Parameters
        ----------
        x : ndarray shape (n_samples, n_features)
            2-d array with features.
        output : {"dense", "sparse"}, default="dense"
            If "dense", returns a dense array of leaf indices per tree.
            If "sparse", returns a sparse CSR array of shape (n_samples, n_leaves)
            where each row has non-zero entries for leaves reached by the sample.

        Returns
        -------
        x_leafs : ndarray of shape (n_samples, n_estimators) or csr_array of shape (n_samples, n_leaves)
            For each datapoint x in X and for each tree in the forest,
            return the index of the leaf x ends up in (dense format).
            If output="sparse", returns a sparse array with 1.0 in entries
            where sample reaches the leaf.

        Raises
        ------
        ValueError
            If output is neither "dense" nor "sparse".
        """
        if output is None:
            output = "dense"

        if output == "dense":
            return self._dense_apply(x)
        if output == "sparse":
            return self._sparse_apply(x)

        raise ValueError("output is neither dense nor sparse")

    def _common_leaf_ratio_distance(self, x, y):
        """Return 1 minus the fraction of trees in which samples of `x` and `y` (or `x` itself if `y` is None) share a leaf."""
        n_trees = self.n_trees

        x = np.atleast_2d(x)
        x_leaves = self._sparse_apply(x)

        if y is not None:
            y = np.atleast_2d(y)
            y_leaves = self._sparse_apply(y)
        else:
            y_leaves = x_leaves

        # Entry (i, j) of the product counts the trees where both samples hit the same leaf
        return 1 - (x_leaves @ y_leaves.T).toarray() / n_trees

    def distance(self, x, y=None, *, method=None):
        """
        Compute distance matrix between samples based on leaf co-occurrence.

        The distance is defined as 1 minus the fraction of trees where two samples
        land in the same leaf. This gives a measure of dissimilarity between samples
        based on their paths through the forest.

        Parameters
        ----------
        x : ndarray shape (n_samples_x, n_features) or (n_features,)
            Input samples. If 1-D, treated as a single sample.
        y : ndarray shape (n_samples_y, n_features) or (n_features,), optional
            Second set of samples for pairwise distance. If None (default),
            computes distances between all pairs in x.
        method : {"common_leaf_ratio"}, default="common_leaf_ratio"
            Distance computation method. Currently only "common_leaf_ratio"
            is supported.

        Returns
        -------
        distances : ndarray shape (n_samples_x, n_samples_y)
            Distance matrix where distances[i, j] is the distance between
            the i-th sample in x and j-th sample in y. If y is None,
            returns a square symmetric matrix of shape (n_samples_x, n_samples_x).

        Raises
        ------
        ValueError
            If method is not one of the known methods.
        """
        KNOWN_METHODS = ["common_leaf_ratio"]

        if method is None:
            method = "common_leaf_ratio"

        if method not in KNOWN_METHODS:
            raise ValueError(f"method is not one of {', '.join(KNOWN_METHODS)}.")

        return self._common_leaf_ratio_distance(x, y)

    @classmethod
    def average_path_length(cls, n_nodes):
        """
        Average path length is abstracted because in different cases we may want to
        use a bit different formulas to make the exact match with other software.

        By default we use our own implementation.
        """
        return average_path_length(n_nodes)