Source code for minerva.data.datasets.har_xu_23

import os
from typing import List, Tuple

import numpy as np
import torch
from sklearn.metrics.pairwise import cosine_similarity
from statsmodels.tsa.stattools import adfuller
from torch.utils.data import Dataset

from minerva.utils.typing import PathLike


[docs] class TNCDataset(Dataset): def __init__( self, x: np.array, mc_sample_size: int = 5, window_size: int = 128, epsilon=3, adf: bool = True, ): """ This TNCDataset class is designed to handle time series data for the TNC (Temporal Neighborhood Coding) task. It includes methods to load data, find close neighbors using ADF testing or cosine similarity, and find distant non-neighbors. The dataset returns a tuple of the central window, close neighbors, and distant non-neighbors for each sample. The `time_series` input should have the shape (n_samples, n_channels, n_timesteps). The `__getitem__` method returns: - `central_window`: (n_channels, window_size) - `close_neighbors`: (mc_sample_size, n_channels, window_size) - `non_neighbors`: (mc_sample_size, n_channels, window_size) Parameters ---------- x : np.ndarray The time series data of shape (n_samples, n_channels, n_timesteps). mc_sample_size : int This value determines how many neighboring and non-neighboring windows are used per data sample. window_size : int The size of the window to be used for each sample. epsilon : int, optional This parameter controls the "spread" of neighboring windows. Higher values lead to more diverse neighbors within a larger search radius around the center window. adf : bool, optional A flag indicating whether to use ADF (Augmented Dickey-Fuller) testing for finding neighbors. Defaults to True. Neighbor Selection ------------------ The selection of neighbors and non-neighbors is crucial for TNC. Here's how it's done: 1. **Finding Close Neighbors**: - **ADF (Augmented Dickey-Fuller) Testing**: - The ADF test checks the stationarity of the time series segments. - For each time window of size `w_t` (ranging from `window_size` to `4 * window_size`), the ADF test is applied to determine the p-value. - The average p-value across all channels is calculated. - The neighborhood size `epsilon` is determined based on the p-values. If all p-values are below the threshold (0.01), `epsilon` is set to the length of `corr`, otherwise, it is set to the first index where the p-value exceeds 0.01. - The `delta` is then set to `5 * epsilon * window_size`. - Neighboring time steps are generated by adding a random value from a normal distribution scaled by `epsilon * window_size` to the current time step `t`. - These time steps are adjusted to ensure they are within valid bounds. - **Cosine Similarity**: - If ADF is not used, cosine similarity is employed to find close neighbors. - The target window (current segment) is flattened, and its cosine similarity with all other windows of the same size in the time series is calculated. - The top `mc_sample_size` windows with the highest cosine similarity are selected as neighbors. - The selected time steps are adjusted to ensure they are within valid bounds. 2. **Finding Distant Non-Neighbors**: - The method `_find_non_neighbors` generates non-neighbors by selecting time steps far from the current time step `t`. - Depending on whether `t` is in the first or second half of the time series, the non-neighbor time steps are selected to be either before or after the `delta` range. - A fallback mechanism ensures at least one non-neighbor segment is returned, even if the primary selection fails. Example Usage ------------- ```python # Example configuration from minerva.data.datasets.har_xu_23 import TNCDataset import numpy as np data = np.random.randn(100, 6, 1000) # (samples, channels, timesteps) # Instantiate the dataset tnc_dataset = TNCDataset( x=data, mc_sample_size=mc_sample_size, window_size=window_size, epsilon=epsilon, adf=adf ) # Retrieve a sample from the dataset central_window, close_neighbors, non_neighbors = tnc_dataset[0] print("Central Window Shape:", central_window.shape) # (window_size,n_channels) print("Close Neighbors Shape:", close_neighbors.shape) # (mc_sample_size,window_size, n_channels, ) print("Non-Neighbors Shape:", non_neighbors.shape) # (mc_sample_size, n_channels, window_size) ``` """ super(TNCDataset, self).__init__() self.time_series = x self.T = x.shape[-1] self.window_size = window_size self.mc_sample_size = mc_sample_size self.adf = adf if not self.adf: self.epsilon = epsilon self.delta = 5 * window_size * epsilon
[docs] def __len__(self): """ Returns the number of samples in the dataset. Returns ------- int The number of samples in the dataset. """ return self.time_series.shape[0]
[docs] def __getitem__(self, ind): """ Returns a sample from the dataset. Parameters ---------- ind : int The index of the sample to retrieve. Returns ------- tuple A tuple containing the central window, close neighbors, and distant non-neighbors. """ ind = ind % len(self.time_series) t = np.random.randint(2 * self.window_size, self.T - 2 * self.window_size) x_t = ( torch.from_numpy( self.time_series[ind][ :, t - self.window_size // 2 : t + self.window_size // 2 ] ) .to(torch.float) .transpose(-1, -2) ) X_close = ( torch.from_numpy(self._find_neighours(self.time_series[ind], t)) .to(torch.float) .transpose(-1, -2) ) X_distant = ( torch.from_numpy(self._find_non_neighours(self.time_series[ind], t)) .to(torch.float) .transpose(-1, -2) ) return x_t, X_close, X_distant
[docs] def _find_neighours(self, x, t): """ Finds close neighbors for a given time step. Parameters ---------- x : np.ndarray The time series data for a single sample. t : int The current time step. Returns ------- np.ndarray An array of close neighbors. """ T = self.time_series.shape[-1] if self.adf: gap = self.window_size corr = [] for w_t in range(self.window_size, 4 * self.window_size, gap): try: p_val = 0 for f in range(x.shape[-2]): p = adfuller( np.array( x[ f, max(0, t - w_t) : min(x.shape[-1], t + w_t), ].reshape( -1, ) ) )[1] p_val += 0.01 if np.isnan(p) else p corr.append(p_val / x.shape[-2]) except: corr.append(0.6) self.epsilon = ( len(corr) if len(np.where(np.array(corr) >= 0.01)[0]) == 0 else (np.where(np.array(corr) >= 0.01)[0][0] + 1) ) self.delta = 5 * self.epsilon * self.window_size t_p = [ int(t + np.random.randn() * self.epsilon * self.window_size) for _ in range(self.mc_sample_size) ] t_p = [ max( self.window_size // 2 + 1, min(t_pp, T - self.window_size // 2), ) for t_pp in t_p ] x_p = np.stack( [ x[ :, t_ind - self.window_size // 2 : t_ind + self.window_size // 2, ] for t_ind in t_p ] ) else: target_window = x[ :, t - self.window_size // 2 : t + self.window_size // 2 ].flatten() similarities = [] gap = self.window_size for w_t in range(self.window_size, T - self.window_size, gap): window = x[ :, w_t - self.window_size // 2 : w_t + self.window_size // 2 ].flatten() cos_sim = cosine_similarity([target_window], [window])[0][0] similarities.append((w_t, cos_sim)) similarities = sorted(similarities, key=lambda x: x[1], reverse=True) t_p = [w_t for w_t, _ in similarities[: self.mc_sample_size]] t_p = [ max( self.window_size // 2 + 1, min(t_pp, T - self.window_size // 2), ) for t_pp in t_p ] x_p = np.stack( [ x[ :, t_ind - self.window_size // 2 : t_ind + self.window_size // 2, ] for t_ind in t_p ] ) return x_p
[docs] def _find_non_neighours(self, x, t): """ Finds distant non-neighbors for a given time step. Parameters ---------- x : np.ndarray The time series data for a single sample. t : int The current time step. Returns ------- np.ndarray An array of distant non-neighbors. """ T = self.time_series.shape[-1] if t > T / 2: t_n = np.random.randint( self.window_size // 2, max((t - self.delta + 1), self.window_size // 2 + 1), self.mc_sample_size, ) else: t_n = np.random.randint( min((t + self.delta), (T - self.window_size - 1)), (T - self.window_size // 2), self.mc_sample_size, ) x_n = np.stack( [ x[ :, t_ind - self.window_size // 2 : t_ind + self.window_size // 2, ] for t_ind in t_n ] ) if len(x_n) == 0: rand_t = np.random.randint(0, self.window_size // 5) if t > T / 2: x_n = x[:, rand_t : rand_t + self.window_size].unsqueeze(0) else: x_n = x[:, T - rand_t - self.window_size : T - rand_t].unsqueeze(0) return x_n
[docs] class HarDataset(Dataset): def __init__( self, data_path: PathLike, annotate: str, feature_column_prefixes: List[str] = [ "accel-x", "accel-y", "accel-z", "gyro-x", "gyro-y", "gyro-z", ], target_column: str = "standard activity code", flatten: bool = False, ): """ Dataset class for human activity recognition (HAR) data. Loads and prepares data from `.npy` files and returns features and labels. Parameters ---------- data_path : PathLike Path to the directory containing dataset files. The directory should contain the following files: - train_data_subseq.npy - train_labels_subseq.npy - val_data.npy - val_labels_subseq.npy - test_data.npy - test_labels_subseq.npy These files should correspond to data segmented into subsequences and their labels. annotate : str Annotation type, indicating which subset of the data to load ('train', 'val', or 'test'). feature_column_prefixes : List[str], optional List of prefixes for feature columns. Defaults to: ["accel-x", "accel-y", "accel-z", "gyro-x", "gyro-y", "gyro-z"]. target_column : str, optional Name of the column for the target variable. Defaults to 'standard activity code'. flatten : bool, optional If True, flattens the input data. Defaults to False. Attributes ---------- data : numpy.ndarray Array of features with shape (num_samples, num_timesteps, num_features). - num_samples: Total number of samples in the dataset. - num_timesteps: Length of each subsequence (e.g., 128). - num_features: Number of features per timestep (e.g., 6 for accelerometer and gyroscope data). labels : numpy.ndarray Array of labels with shape (num_samples,). - num_samples: Total number of samples in the dataset. Methods ------- __len__() -> int Returns the number of samples in the dataset. __getitem__(idx: int) -> Tuple[torch.Tensor, int] Retrieves a sample from the dataset. - Features shape: [num_timesteps, num_features] if `flatten` is False, otherwise [num_timesteps * num_features]. - Label shape: Scalar. Examples -------- from minerva.data.datasets.har_xu_23 import HarDataset >>> dataset = HarDataset(data_path="/path/to/data", annotate="train") >>> len(dataset) 3178 >>> sample = dataset[0] >>> features, label = sample >>> features.shape torch.Size([128, 6]) >>> label tensor(4) """ super().__init__() self.data_path = data_path self.annotate = annotate self.feature_column_prefixes = feature_column_prefixes self.target_column = target_column self.flatten = flatten self.data = np.load( os.path.join(self.data_path, f"{self.annotate}_data_subseq.npy") ) self.labels = np.load( os.path.join(self.data_path, f"{self.annotate}_labels_subseq.npy") ) # self.labels = np.load(self.data_path / f"{self.annotate}_labels_subseq.npy") assert len(self.data) == len( self.labels ), "Data and labels must have the same length"
[docs] def __len__(self): return len(self.data)
[docs] def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]: """ Get a sample from the dataset. Parameters ---------- idx : int Index of the sample to retrieve. Returns ------- Tuple[torch.Tensor, int] Tuple containing the features and the target label. """ data = self.data[idx] if self.flatten: data = data.flatten() features = data target = self.labels[idx] # Convert to torch.FloatTensor and torch.LongTensor features = torch.FloatTensor(features) target = torch.tensor(target, dtype=torch.long) return features, target