Source code for minerva.data.readers.patched_array_reader

from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import warnings

import numpy as np
from numpy.typing import ArrayLike

from minerva.data.readers.reader import _Reader
from minerva.utils.typing import PathLike


class PatchedArrayReader(_Reader):
    """Read fixed-size patches from a NumPy-like array.

    The reader computes, once at construction time, the corner coordinates of
    every patch that fits inside the (optionally padded) array and then gives
    sequential, index-based access to those patches. It can serve as a base
    class for other readers.

    Assumptions:

    - The input data is a NumPy-like array, that is, it exposes ``shape`` and
      supports NumPy-like indexing with a tuple of slices.
    - Patches are fixed-size subarrays of the data.
    - Patches can overlap (when the stride is smaller than the patch size).
    """

    def __init__(
        self,
        data: ArrayLike,
        data_shape: Tuple[int, ...],
        stride: Optional[Tuple[int, ...]] = None,
        pad_width: Optional[Tuple[Tuple[int, int], ...]] = None,
        pad_mode: str = "constant",
        pad_kwargs: Optional[Dict] = None,
    ):
        """Read data from a NumPy array and generate patches from it.

        Parameters
        ----------
        data : ArrayLike
            The input array from which patches are generated.
        data_shape : Tuple[int, ...]
            The shape of the patches to be extracted. This will be the shape
            of the subarray that is returned when a patch is accessed using
            ``__getitem__``.
        stride : Tuple[int, ...], optional
            The stride between consecutive patches. If `None`, the stride will
            be the same as `data_shape`. By default None
        pad_width : Tuple[Tuple[int, int], ...], optional
            The width of padding to be applied to the data array. By default
            `None`, that is, no padding is applied. Check the documentation of
            `numpy.pad` for more information.
        pad_mode : str, optional
            The padding mode, by default "constant". Check the documentation
            of `numpy.pad` for more information.
        pad_kwargs : dict, optional
            Additional keyword arguments for padding, by default None

        Raises
        ------
        AssertionError
            If `data_shape`, `stride` or `pad_width` do not have one entry per
            dimension of `data`.

        Examples
        --------
        >>> import numpy as np
        >>> # Generate a 10x10 array
        >>> data = np.arange(100).reshape(10, 10)
        >>> # Create a reader that generates 5x5 patches with a stride of 2 in
        >>> # the first dimension and 5 in the second dimension.
        >>> reader = PatchedArrayReader(
        ...     data,
        ...     data_shape=(5, 5),
        ...     stride=(2, 5),
        ... )
        >>> # Number of patches that can be extracted from the data
        >>> print(len(reader))
        6
        >>> # Corner indices of the patches
        >>> print(reader.indices)
        [(0, 0), (0, 5), (2, 0), (2, 5), (4, 0), (4, 5)]
        >>> # Fetch the first patch and print its shape
        >>> print(reader[0].shape)
        (5, 5)
        >>> # Fetch the third patch and print its content
        >>> print(reader[2])
        [[20 21 22 23 24]
         [30 31 32 33 34]
         [40 41 42 43 44]
         [50 51 52 53 54]
         [60 61 62 63 64]]
        """
        self.data = data
        self.shape = data.shape
        self.data_shape = data_shape

        # NOTE: validation is done with ``assert`` and is therefore skipped
        # when Python runs with ``-O``. It is kept as-is so that callers that
        # expect ``AssertionError`` keep working.
        assert len(self.data.shape) == len(
            self.data_shape
        ), "Data shape and Patch shape must have same number of dimensions"

        # Without an explicit stride, patches are laid out side by side.
        self.stride = stride or self.data_shape
        assert len(self.stride) == len(
            self.data_shape
        ), "Stride shape and Patch shape must have same number of dimensions"

        self.pad_width = pad_width
        self.pad_mode = pad_mode
        self.pad_kwargs = pad_kwargs or {}

        # Should we pad the data?
        if self.pad_width:
            assert len(self.pad_width) == len(
                self.data.shape
            ), "Padding width and data shape must have same length"
            self._pad_data(self.pad_width, self.pad_mode, **self.pad_kwargs)

        # Corner coordinates are computed last so they account for padding.
        self.indices = self._get_patches()

    def _pad_data(
        self, pad_width: Tuple[Tuple[int, int], ...], mode="constant", **kwargs
    ):
        """Apply padding to the data array.

        The padded array replaces ``self.data`` and ``self.shape`` is updated
        to the padded shape.

        Parameters
        ----------
        pad_width : Tuple[Tuple[int, int], ...]
            The width of padding to be applied to the data array
        mode : str, optional
            The padding mode, by default "constant"
        **kwargs
            Extra keyword arguments forwarded to `numpy.pad`.
        """
        self.data = np.pad(self.data, pad_width=pad_width, mode=mode, **kwargs)
        self.shape = self.data.shape

    def _get_patches(self) -> List[Tuple[int, ...]]:
        """Compute the left upper corner indices of all patches.

        Patches are laid out on a regular grid whose step along each dimension
        is the stride. Only patches that fit entirely inside the data array
        are generated.

        Returns
        -------
        List[Tuple[int, ...]]
            A list of indices (coordinates) representing the left upper corner
            of the patches. The list is empty when the patch does not fit in
            the data along some dimension.
        """
        # Number of patches that fit along each dimension. The value is
        # clamped to zero: when the data is much smaller than the patch the
        # floor division goes below -1 and ``np.ndindex`` would reject the
        # resulting negative count instead of yielding no patches.
        patches_per_dim = tuple(
            max((size - patch) // step + 1, 0)
            for size, patch, step in zip(
                self.data.shape, self.data_shape, self.stride
            )
        )

        # Scale every grid position by the stride to get array coordinates.
        return [
            tuple(pos * step for pos, step in zip(grid_index, self.stride))
            for grid_index in np.ndindex(*patches_per_dim)
        ]

    def __len__(self) -> int:
        """Return the number of patches that can be extracted from the data.

        Returns
        -------
        int
            The number of patches that can be extracted from the data array.
        """
        return len(self.indices)

    def __getitem__(self, idx: int) -> np.ndarray:
        """Fetch a patch from the data array.

        Parameters
        ----------
        idx : int
            The index of the patch to be fetched (position in
            ``self.indices``).

        Returns
        -------
        np.ndarray
            The patch that was fetched from the data array with shape
            `data_shape`
        """
        left_upper_corner = self.indices[idx]
        # One slice per dimension, starting at the corner and spanning the
        # patch size along that dimension.
        slice_obj = tuple(
            slice(start, start + size)
            for start, size in zip(left_upper_corner, self.data_shape)
        )
        return self.data[slice_obj]

    def __str__(self) -> str:
        """Return a short summary: class name, patch count, shape and dtype."""
        return f"{self.__class__.__name__}(samples={len(self.indices)}, shape={self.data_shape}, dtype={self.data.dtype})"
class NumpyArrayReader(PatchedArrayReader):
    """Patch reader that accepts an array or a path to a ``.npy``/``.npz`` file.

    When `data` is a path, the file is loaded with `numpy.load` and the
    resulting array is handed to `PatchedArrayReader`. When `data` is already
    an array it is passed through unchanged.
    """

    def __init__(
        self,
        data: Union[ArrayLike, PathLike],
        data_shape: Tuple[int, ...],
        stride: Optional[Tuple[int, ...]] = None,
        pad_width: Optional[Tuple[Tuple[int, int], ...]] = None,
        pad_mode: str = "constant",
        pad_kwargs: Optional[Dict] = None,
        allow_pickle: bool = True,
        npz_key: Optional[str] = None,
    ):
        """Load the array (if a path is given) and set up patch extraction.

        Parameters
        ----------
        data : Union[ArrayLike, PathLike]
            The input array, or the path of a ``.npy`` / ``.npz`` file that
            contains it.
        data_shape : Tuple[int, ...]
            The shape of the patches to be extracted.
        stride : Tuple[int, ...], optional
            The stride between consecutive patches. If `None`, the stride is
            the same as `data_shape`. By default None
        pad_width : Tuple[Tuple[int, int], ...], optional
            The width of padding to be applied to the data array, by default
            None (no padding).
        pad_mode : str, optional
            The padding mode, by default "constant".
        pad_kwargs : dict, optional
            Additional keyword arguments for padding, by default None
        allow_pickle : bool, optional
            Forwarded to `numpy.load`, by default True.
        npz_key : str, optional
            Name of the array to read from a ``.npz`` archive. May be omitted
            only when the archive holds exactly one array. By default None

        Raises
        ------
        FileNotFoundError
            If `data` is a path that does not point to an existing file.
        ValueError
            If the file extension is neither ``.npy`` nor ``.npz``.
        KeyError
            If `npz_key` is not in the archive, or if it is omitted and the
            archive does not contain exactly one array.
        """
        # NOTE(review): this relies on ``PathLike`` being usable with
        # ``isinstance`` -- confirm against minerva.utils.typing.
        if isinstance(data, PathLike):
            data = self._load_array(Path(data), allow_pickle, npz_key)

        super().__init__(
            data=data,
            data_shape=data_shape,
            stride=stride,
            pad_width=pad_width,
            pad_mode=pad_mode,
            pad_kwargs=pad_kwargs,
        )

    @staticmethod
    def _load_array(path: Path, allow_pickle: bool, npz_key: Optional[str]):
        """Load a single array from a ``.npy`` or ``.npz`` file."""
        if not path.is_file():
            raise FileNotFoundError(f"File not found: {path}")

        # SECURITY: ``allow_pickle=True`` (this class's historical default)
        # lets ``numpy.load`` unpickle objects, which can execute arbitrary
        # code. Pass ``allow_pickle=False`` when the file is not trusted.
        if path.suffix == ".npy":
            return np.load(path, allow_pickle=allow_pickle)

        if path.suffix == ".npz":
            # The archive object keeps the file open until closed, so read
            # the requested array inside a ``with`` block.
            with np.load(path, allow_pickle=allow_pickle) as archive:
                if npz_key is None:
                    if len(archive.files) != 1:
                        raise KeyError(
                            f"npz_key is required to select one of "
                            f"{archive.files} in {path}"
                        )
                    # Unambiguous: the archive holds a single array.
                    npz_key = archive.files[0]
                return archive[npz_key]

        raise ValueError(f"Unsupported file format: {path.suffix}")
class LazyPaddedPatchedArrayReader(PatchedArrayReader):
    """Read patches from a NumPy array, padding lazily.

    This class is a subclass of `PatchedArrayReader` and is designed to
    perform padding only when the patch consumed by `__getitem__` is in a
    region that uses the padding (boundary regions). The data array itself is
    never padded; only the individual boundary patches are.

    If no padding is necessary, use `PatchedArrayReader`.
    """

    def _pad_data(
        self, pad_width: Tuple[Tuple[int, int], ...], mode="constant", **kwargs
    ):
        """Record the padded shape without padding the data array.

        Parameters
        ----------
        pad_width : Tuple[Tuple[int, int], ...]
            The width of padding to be applied to the data array
        mode : str, optional
            The padding mode, by default "constant"

        Raises
        ------
        NotImplementedError
            If `mode` is one of "maximum", "mean", "median", "minimum" or
            "wrap".
        """
        if mode in [
            "maximum",
            "mean",
            "median",
            "minimum",
            "wrap",
        ]:
            # TODO: add support if necessary
            raise NotImplementedError(f"Pad mode not supported: {mode}")

        # Only the virtual (padded) shape is stored; ``self.data`` is left
        # untouched and padded patch by patch in ``__getitem__``.
        self.shape = tuple(
            size + before_after[0] + before_after[1]
            for size, before_after in zip(self.data.shape, pad_width)
        )

    def _get_patches(self) -> List[Tuple[str, Tuple[int, ...]]]:
        """Compute the patch corners and where each patch needs padding.

        Corners are expressed in the coordinates of the (virtual) padded
        array. Each corner is paired with a string holding one flag per
        dimension:

        - ``"n"``: no padding necessary
        - ``"l"``: lower padding
        - ``"u"``: upper padding
        - ``"b"``: both lower and upper paddings

        When no padding was requested the flag string is simply ``"n"``.

        Returns
        -------
        List[Tuple[str, Tuple[int, ...]]]
            A list of ``(pad_loc, corner_index)`` pairs, one per patch.
        """
        indices = []

        # Number of patches that fit along each dimension of the padded
        # shape, clamped to zero so that a patch larger than the array yields
        # no patches instead of a negative count.
        max_indices = tuple(
            max((size - patch) // step + 1, 0)
            for size, patch, step in zip(
                self.shape, self.data_shape, self.stride
            )
        )

        # Indexed by: 1 if lower padding is needed + 2 if upper padding is.
        pad_loc_opt = [
            "n",  # no padding necessary
            "l",  # lower padding
            "u",  # upper padding
            "b",  # both lower and upper paddings
        ]

        # Generate indices for left upper corner of patches
        for grid_index in np.ndindex(*max_indices):
            corner_index = tuple(
                pos * step for pos, step in zip(grid_index, self.stride)
            )

            if self.pad_width:
                flags = []
                for corner, pad, data_size, patch_size in zip(
                    corner_index,
                    self.pad_width,
                    self.data.shape,
                    self.data_shape,
                ):
                    # Patch start in the coordinates of the unpadded data.
                    start = corner - pad[0]
                    cur = 0
                    if start < 0:  # starts inside the lower padding
                        cur += 1
                    # A patch ending exactly at the data edge lies fully
                    # inside the data, hence the strict comparison.
                    if start + patch_size > data_size:
                        cur += 2
                    flags.append(pad_loc_opt[cur])
                pad_loc = "".join(flags)
            else:
                pad_loc = "n"

            indices.append((pad_loc, corner_index))

        return indices

    def __getitem__(self, idx: int) -> np.ndarray:
        """Fetch a patch from the data array, padding it only if needed.

        Parameters
        ----------
        idx : int
            The index of the patch to be fetched.

        Returns
        -------
        np.ndarray
            The patch that was fetched from the data array with shape
            `data_shape`

        Raises
        ------
        ValueError
            If the patch needs padding and `pad_mode` is not one of
            "constant", "edge", "linear_ramp", "empty", "reflect" or
            "symmetric".
        """
        pad_loc, padded_left_upper_corner = self.indices[idx]

        if self.pad_width:
            data_pad_width = self.pad_width
        else:
            warnings.warn("Padding is not being used! Non-LazyPadded class is recommended, e.g., PatchedArrayReader")
            data_pad_width = [(0, 0)] * len(self.data_shape)

        # Translate the corner to unpadded coordinates; a corner that falls
        # in the lower padding is clipped to the start of the data.
        original_left_upper_corner = tuple(
            max(corner - pad[0], 0)
            for corner, pad in zip(padded_left_upper_corner, data_pad_width)
        )
        slice_base = tuple(
            slice(start, start + size)
            for start, size in zip(original_left_upper_corner, self.data_shape)
        )
        # May be smaller than ``data_shape`` when the patch crosses the upper
        # edge of the data (slicing truncates at the array end).
        base_patch = self.data[slice_base]

        # no padding necessary
        if all(flag == "n" for flag in pad_loc):
            return base_patch

        if self.pad_mode not in [
            "constant",
            "edge",
            "linear_ramp",
            "empty",
            "reflect",
            "symmetric",
        ]:
            raise ValueError(f"Invalid Value for pad_mode: {self.pad_mode}")

        # Pad the base patch with the full pad width, but only on the sides
        # flagged for this patch.
        # NOTE(review): for "reflect"/"symmetric" the result presumably only
        # matches padding the whole array while the pad width does not exceed
        # the data available in ``base_patch`` -- confirm if large pads are
        # used.
        pad_width = [
            (
                pad[0] if flag in ["l", "b"] else 0,
                pad[1] if flag in ["u", "b"] else 0,
            )
            for flag, pad in zip(pad_loc, data_pad_width)
        ]
        padded_patch = np.pad(
            base_patch,
            pad_width=pad_width,
            mode=self.pad_mode,
            **self.pad_kwargs,
        )

        # Where lower padding was added, the padded corner is already the
        # offset into ``padded_patch``; elsewhere the patch starts at 0.
        adjusted_left_upper_corner = tuple(
            (0 if pad[0] == 0 else corner)
            for corner, pad in zip(padded_left_upper_corner, pad_width)
        )
        slice_obj = tuple(
            slice(start, start + size)
            for start, size in zip(adjusted_left_upper_corner, self.data_shape)
        )
        return padded_patch[slice_obj]