#!/usr/bin/env python3
""" Basic transform operations module. """
import dask.array as da
import numpy as np
from scipy import stats
try:
import cupy as cp
except ImportError: # pragma: no cover
pass
from dasf.ml.inference.loader.base import BaseLoader
from dasf.transforms.base import Fit, ReductionTransform, Transform
from dasf.utils.types import is_array, is_dataframe
class Reshape(Fit):
    """Reshape the input data to a target shape.

    The target shape is taken from the ``shape`` constructor argument
    when given, otherwise from ``y.shape`` at fit time.

    Parameters
    ----------
    shape : tuple, optional
        The shape the input should be reshaped to. When ``None``, the
        shape of ``y`` passed to :meth:`fit` is used instead.
    """

    def __init__(self, shape: tuple = None):
        self.shape = shape

    def fit(self, X, y=None):
        """Reshape ``X`` to the configured (or ``y``-derived) shape.

        Parameters
        ----------
        X : array-like or DataFrame
            Data to reshape. DataFrames are converted via ``.values``.
        y : object, optional
            Object providing a ``shape`` attribute, used as the target
            shape when ``self.shape`` is not set.

        Returns
        -------
        Array reshaped to the target shape.

        Raises
        ------
        ValueError
            If no target shape is available or ``X`` is not a known
            datatype.
        """
        # ``is not None`` (rather than truthiness) so an explicit empty
        # tuple shape is still honored.
        if self.shape is not None:
            cube_shape = self.shape
        elif y is not None and hasattr(y, "shape"):
            cube_shape = y.shape
        else:
            # ValueError subclasses Exception, so callers catching the
            # previous bare Exception keep working.
            raise ValueError("Missing shape input.")

        if is_array(X):
            slice_array = X
        elif is_dataframe(X):
            slice_array = X.values
        else:
            raise ValueError("X is not a known datatype.")

        return slice_array.reshape(cube_shape)
class SliceArray(Transform):
    """Transform configured with a target output size for slicing."""

    def __init__(self, output_size):
        # Keep the requested output size as a plain mutable list.
        self.x = [dim for dim in output_size]
class SliceArrayByPercent(Transform):
    """Transform configured with per-axis slicing percentages.

    Each percentage (e.g. ``100.0``) is stored as a fraction
    (e.g. ``1.0``) in the ``x``, ``y`` and ``z`` attributes.
    """

    def __init__(self, x=100.0, y=100.0, z=100.0):
        self.x, self.y, self.z = (float(p / 100.0) for p in (x, y, z))
class SliceArrayByPercentile(Transform):
    """Compute clip thresholds from positive and negative percentiles.

    The ``_internal_chunk_*`` / ``_internal_aggregate_*`` methods form a
    dask-style map/reduce pair: chunk functions compute the percentile of
    the strictly positive (resp. strictly negative) values of a chunk,
    and aggregate functions reduce the per-chunk results with max/min.

    Parameters
    ----------
    percentile : float
        Percentile (0-100) forwarded to ``xp.percentile``.
    """

    def __init__(self, percentile):
        self.p = percentile

    def _internal_chunk_array_positive(self, block, axis=None, keepdims=False, xp=np):
        """Percentile of the strictly positive values of a chunk."""
        # Filter into a new array instead of mutating the input chunk in
        # place (the original zeroed negatives with ``block[...] = 0``,
        # which corrupts the caller's data when ``block`` is a view),
        # and actually apply the non-zero filter (the original
        # ``block[block != 0]`` was a no-op expression statement).
        # NOTE(review): assumes each chunk has at least one positive
        # value, as the original code implicitly did — TODO confirm.
        positive = block[block > 0]
        return xp.array([xp.percentile(positive.flatten(), self.p)])

    def _internal_aggregate_array_positive(self, block, axis=None, keepdims=False, xp=np):
        """Reduce per-chunk positive percentiles to their maximum."""
        block = xp.array(block)
        return xp.array([xp.max(block)])

    def _internal_chunk_array_negative(self, block, axis=None, keepdims=False, xp=np):
        """Negated percentile of the strictly negative values of a chunk."""
        # Negate into a new array (the original ``block *= -1`` mutated
        # the caller's chunk in place) and keep only the values that
        # were strictly negative in the input.
        negated = -block
        negative = negated[negated > 0]
        return xp.array([-xp.percentile(negative.flatten(), self.p)])

    def _internal_aggregate_array_negative(self, block, axis=None, keepdims=False, xp=np):
        """Reduce per-chunk negative percentiles to their minimum."""
        block = xp.array(block)
        return xp.array([xp.min(block)])
class ApplyPatchesBase(Transform):
    """
    Base class for ApplyPatches functionality.

    Splits a chunk into fixed-size patches (optionally at several
    overlapping offsets), applies a function or model to every patch,
    and recombines the per-patch results. How results are combined is
    defined by subclasses via :meth:`_combine_patches`.
    """

    def __init__(self, function, weight_function, input_size, overlap, offsets):
        """
        Parameters
        ----------
        function : callable or BaseLoader
            Function to be applied to each patch; either a plain Python
            callable or a model loader (``BaseLoader``).
        weight_function : callable or None
            Weight attribution function; must receive a shape and
            produce an NDArray with the respective weight for each
            array position. ``None`` means uniform weights.
        input_size : tuple
            Size of the input patches fed to ``function``.
        overlap : dict or None
            Overlapping/padding configuration to use with ``np.pad`` or
            ``dask.overlap.overlap``. It is important that for the base
            patch set the whole "chunk core" is covered by the patches.
        offsets : list or None
            List of offsets for overlapping patch extraction.
        """
        self._function = function
        self._weight_function = weight_function
        self._input_size = input_size
        self._offsets = offsets if offsets is not None else []
        overlap = overlap if overlap is not None else {}
        # Defaults: zero padding on every axis, constant-0 boundary.
        self._overlap_config = {
            "padding": overlap.get("padding", tuple(len(input_size) * [0])),
            "boundary": overlap.get("boundary", 0),
        }

    def _apply_patches(self, patch_set):
        """
        Apply the configured function to each patch in a patch set.

        Plain callables are mapped patch-by-patch; a ``BaseLoader`` gets
        the whole patch set via ``predict``.
        """
        if callable(self._function):
            return np.array(list(map(self._function, patch_set)))
        if isinstance(self._function, BaseLoader):
            return self._function.predict(patch_set)
        raise NotImplementedError("Requested Apply Method not supported")

    def _reconstruct_patches(self, patches, index, weights, inner_dim=None):
        """
        Rearrange patches to reconstruct the area of interest from
        patches and weights.

        Parameters
        ----------
        patches : iterable
            Patches in ``np.ndindex(index)`` order.
        index : tuple
            Number of patches along each spatial axis.
        weights : bool
            When True, also build the matching weight volume from the
            weight function (or uniform ones when no function is set).
        inner_dim : int, optional
            Trailing per-position dimension (e.g. number of classes);
            appended to the reconstruction shape but not weighted.

        Returns
        -------
        tuple
            ``(reconstruct, weight)``; ``weight`` is ``None`` when
            ``weights`` is False.
        """
        # Output spatial shape: patches-per-axis times patch size.
        reconstruct_shape = np.array(self._input_size) * np.array(index)
        if weights:
            weight = np.zeros(reconstruct_shape)
            base_weight = (
                self._weight_function(self._input_size)
                if self._weight_function
                else np.ones(self._input_size)
            )
        else:
            weight = None
        if inner_dim is not None:
            reconstruct_shape = np.append(reconstruct_shape, inner_dim)
        reconstruct = np.zeros(reconstruct_shape)
        for patch_index, patch in zip(np.ndindex(index), patches):
            # Destination slice of this patch inside the reconstruction.
            sl = [
                slice(idx * patch_len, (idx + 1) * patch_len, None)
                for idx, patch_len in zip(patch_index, self._input_size)
            ]
            # Weights cover the spatial axes only, so assign them before
            # the inner-dimension slice is appended.
            if weights:
                weight[tuple(sl)] = base_weight
            if inner_dim is not None:
                sl.append(slice(None, None, None))
            reconstruct[tuple(sl)] = patch
        return reconstruct, weight

    def _adjust_patches(self, arrays, ref_shape, offset, pad_value=0):
        """
        Pad reconstructed patches with ``pad_value`` so they have the
        same shape as the reference shape from the base patch set.

        Positive offsets pad at the front of an axis; non-positive
        offsets instead crop ``abs(offset)`` from the front.
        """
        pad_width = []
        sl = []
        ref_shape = list(ref_shape)
        arr_shape = list(arrays[0].shape)
        if len(offset) < len(ref_shape):
            # Reference carries a trailing inner dimension that offsets
            # do not cover; adjust spatial axes only.
            ref_shape = ref_shape[:-1]
            arr_shape = arr_shape[:-1]
        for idx, lenght, ref in zip(offset, arr_shape, ref_shape):
            if idx > 0:
                sl.append(slice(0, min(lenght, ref), None))
                pad_width.append((idx, max(ref - lenght - idx, 0)))
            else:
                sl.append(slice(np.abs(idx), min(lenght, ref - idx), None))
                pad_width.append((0, max(ref - lenght - idx, 0)))
        # Arrays with an extra inner dimension keep it un-padded via an
        # additional full slice and a (0, 0) pad entry.
        adjusted = [
            np.pad(
                arr[tuple([*sl, slice(None, None, None)])],
                pad_width=[*pad_width, (0, 0)],
                mode="constant",
                constant_values=pad_value,
            )
            if len(offset) < len(arr.shape)
            else np.pad(
                arr[tuple(sl)],
                pad_width=pad_width,
                mode="constant",
                constant_values=pad_value,
            )
            for arr in arrays
        ]
        return adjusted

    def _combine_patches(self, results, offsets, indexes):
        """
        Combine per-offset results; subclasses define the strategy.

        How results are combined is dependent on what is being combined:
        ApplyPatchesWeightedAvg uses Weighted Average;
        ApplyPatchesVoting uses Voting (hard or soft).
        """
        raise NotImplementedError("Combine patches method must be implemented")

    def _operation(self, chunk):
        """
        Operation to be performed on each chunk: extract a patch set per
        offset, apply the function, combine, then trim the padding.
        """
        offsets = list(self._offsets)
        base = self._overlap_config["padding"]
        # The base (zero-offset) patch set always comes first.
        offsets.insert(0, tuple([0] * len(base)))
        # NOTE(review): the comprehension variable ``base`` shadows the
        # outer padding tuple inside the comprehension scope only.
        slices = [
            tuple([slice(i + base, None) for i, base in zip(offset, base)])
            for offset in offsets
        ]
        results = []
        indexes = []
        for sl in slices:
            # ``_extract_patches`` is not defined in this class —
            # presumably provided by a subclass or mixin; TODO confirm.
            patch_set, patch_idx = self._extract_patches(chunk[sl], self._input_size)
            results.append(self._apply_patches(patch_set))
            indexes.append(patch_idx)
        # Trim the overlap padding (pad on both ends) from the result.
        output_slice = tuple(
            [slice(0, lenght - 2 * pad) for lenght, pad in zip(chunk.shape, base)]
        )
        return self._combine_patches(results, offsets, indexes)[output_slice]
class ApplyPatchesWeightedAvg(ApplyPatchesBase):
    """
    ApplyPatches with Weighted Average combination function.
    """

    def _combine_patches(self, results, offsets, indexes):
        """Combine patch sets with a per-position weighted average."""
        volumes = []
        volume_weights = []
        for patch_set, offset, patch_counts in zip(results, offsets, indexes):
            volume, weight = self._reconstruct_patches(
                patch_set, patch_counts, weights=True
            )
            if volumes:
                # Align every non-base patch set with the base volume.
                volume, weight = self._adjust_patches(
                    [volume, weight], volumes[0].shape, offset
                )
            volumes.append(volume)
            volume_weights.append(weight)
        stacked = np.stack(volumes, axis=0)
        stacked_weights = np.stack(volume_weights, axis=0)
        weighted_sum = np.sum(stacked * stacked_weights, axis=0)
        return weighted_sum / np.sum(stacked_weights, axis=0)
class ApplyPatchesVoting(ApplyPatchesBase):
    """
    ApplyPatches with Voting combination function.
    """

    def __init__(
        self,
        function,
        weight_function,
        input_size,
        overlap,
        offsets,
        voting,
        num_classes,
    ):
        """
        Parameters
        ----------
        function : callable or ModelLoader
            Function to be applied to each patch.
        weight_function : callable or None
            Weight attribution function; must receive a shape and
            produce an NDArray with the respective weight for each
            array position.
        input_size : tuple
            Size of the input patches fed to ``function``.
        overlap : dict or None
            Overlapping/padding configuration to use with ``np.pad`` or
            ``dask.overlap.overlap``. It is important that for the base
            patch set the whole "chunk core" is covered by the patches.
        offsets : list or None
            List of offsets for overlapping patch extraction.
        voting : str
            Voting method, either ``"hard"`` or ``"soft"``.
        num_classes : int
            Number of classes possible.
        """
        super().__init__(function, weight_function, input_size, overlap, offsets)
        self._voting = voting  # Types: Hard Voting, Soft Voting
        self._num_classes = num_classes

    def _combine_patches(self, results, offsets, indexes):
        """Dispatch to hard or soft voting based on configuration."""
        if self._voting == "hard":
            return self._hard_voting(results, offsets, indexes)
        if self._voting == "soft":
            return self._soft_voting(results, offsets, indexes)
        raise ValueError("Invalid Voting Type. Should be either soft or hard.")

    def _hard_voting(self, results, offsets, indexes):
        """
        Hard (majority-label) voting combination function.
        """
        volumes = []
        for patch_set, offset, patch_counts in zip(results, offsets, indexes):
            volume, _ = self._reconstruct_patches(
                patch_set, patch_counts, weights=False, inner_dim=self._num_classes
            )
            # Per-position predicted class, as float so that NaN padding
            # is representable.
            volume = np.argmax(volume, axis=-1).astype(np.float32)
            if volumes:
                volume = self._adjust_patches(
                    [volume], volumes[0].shape, offset, pad_value=np.nan
                )[0]
            volumes.append(volume)
        stacked = np.stack(volumes, axis=0)
        # Most frequent class across patch sets, ignoring NaN padding.
        return stats.mode(stacked, axis=0, nan_policy="omit", keepdims=False)[0]

    def _soft_voting(self, results, offsets, indexes):
        """
        Soft voting combination function: class scores are summed over
        all patch sets before taking the argmax.
        """
        volumes = []
        for patch_set, offset, patch_counts in zip(results, offsets, indexes):
            volume, _ = self._reconstruct_patches(
                patch_set, patch_counts, weights=False, inner_dim=self._num_classes
            )
            if volumes:
                volume = self._adjust_patches(
                    [volume], volumes[0].shape, offset
                )[0]
            volumes.append(volume)
        return np.argmax(np.sum(np.stack(volumes, axis=0), axis=0), axis=-1)