Source code for minerva.data.datasets.har_rodrigues_24

import os
from pathlib import Path
from typing import Iterable, List, Optional, Union

import numpy as np
import pandas as pd
import torch
from numpy.lib.stride_tricks import as_strided as ast
from torch.utils.data import Dataset

from minerva.utils.typing import PathLike


def norm_shape(shape):
    """
    Normalize numpy array shapes so they're always expressed as a tuple,
    even for one-dimensional shapes.

    Parameters
    ----------
    shape : int, numpy integer, tuple, list, or numpy.ndarray
        The shape to be normalized.

    Returns
    -------
    Tuple[int, ...]
        The normalized shape. A scalar becomes a one-element tuple, a tuple
        is returned unchanged, and a list or array is converted to a tuple
        of its elements.

    Raises
    ------
    TypeError
        If ``shape`` is none of the supported types (e.g. a float or str).
    """
    if isinstance(shape, int):
        return (shape,)
    # NumPy integer scalars (e.g. the result of np.prod or of indexing an
    # integer array) are not instances of the builtin int.
    if isinstance(shape, np.integer):
        return (int(shape),)
    if isinstance(shape, tuple):
        return shape
    if isinstance(shape, list):
        return tuple(shape)
    if isinstance(shape, np.ndarray):
        # atleast_1d lets a 0-d array normalize to a one-element tuple
        # instead of failing inside tuple().
        return tuple(np.atleast_1d(shape).tolist())
    raise TypeError("shape must be an int, a tuple of ints, or a numpy array")
def sliding_window(a, ws, ss=None, flatten=True):
    """
    Return a sliding window over ``a`` in any number of dimensions.

    Parameters
    ----------
    a : numpy.ndarray
        An n-dimensional numpy array.
    ws : int or tuple
        An int (``a`` is 1D) or tuple (``a`` is 2D or greater) representing
        the size of each dimension of the window.
    ss : int or tuple, optional
        An int (``a`` is 1D) or tuple (``a`` is 2D or greater) representing
        the amount to slide the window in each dimension. If not specified,
        it defaults to ``ws`` (non-overlapping windows).
    flatten : bool
        If True, all slices are flattened into a single leading dimension;
        otherwise there is an extra dimension for each dimension of the
        input.

    Returns
    -------
    numpy.ndarray
        An array containing each n-dimensional window from ``a``. With
        ``flatten=False`` this is the strided view itself.

    Raises
    ------
    ValueError
        If any entry of ``ws`` or ``ss`` is smaller than 1, if ``a.shape``,
        ``ws`` and ``ss`` do not have the same length, or if ``ws`` is larger
        than ``a`` in any dimension.
    TypeError
        Propagated from ``norm_shape`` when ``ws`` or ``ss`` has an
        unsupported type.
    """
    if ss is None:
        # ss was not provided: the windows will not overlap in any direction.
        ss = ws

    if isinstance(ws, int) and ws < 1:
        raise ValueError("ws must be at least 1")
    if isinstance(ss, int) and ss < 1:
        raise ValueError("ss must be at least 1")

    # Express ws, ss and a.shape as arrays so the arithmetic below is done in
    # every dimension at once.
    ws = np.array(norm_shape(ws))
    ss = np.array(norm_shape(ss))
    shape = np.array(a.shape)

    # The scalar checks above do not cover tuple arguments; a zero step
    # would divide by zero below and a non-positive window yields nonsense
    # shapes, so validate every dimension here.
    if np.any(ws < 1):
        raise ValueError("ws must be at least 1")
    if np.any(ss < 1):
        raise ValueError("ss must be at least 1")

    # ensure that ws, ss, and a.shape all have the same number of dimensions
    ls = [len(shape), len(ws), len(ss)]
    if len(set(ls)) != 1:
        raise ValueError(
            "a.shape, ws and ss must all have the same length. They were %s"
            % str(ls)
        )

    # ensure that ws is smaller than a in every dimension
    if np.any(ws > shape):
        raise ValueError(
            "ws cannot be larger than a in any dimension. "
            "a.shape was %s and ws was %s" % (str(a.shape), str(ws))
        )

    # Number of slices in each dimension, followed by the shape of the window
    # itself (tuple addition).
    newshape = norm_shape(((shape - ws) // ss) + 1)
    newshape += norm_shape(ws)

    # Strides between consecutive windows (array strides scaled by the step
    # size), followed by the array's own strides for moving inside a window.
    newstrides = norm_shape(np.array(a.strides) * ss) + a.strides
    strided = ast(a, shape=newshape, strides=newstrides)
    if not flatten:
        return strided

    # Collapse strided so that it has one more dimension than the window,
    # i.e. the new array is a flat list of slices.
    meat = len(ws)
    firstdim = (np.prod(newshape[:-meat]),)
    dim = firstdim + newshape[-meat:]
    return strided.reshape(dim)
def opp_sliding_window(data_x, data_y, ws, ss):
    """
    Segment a 2-D signal and its per-sample labels into windows.

    Parameters
    ----------
    data_x : numpy.ndarray
        Two-dimensional array; windows span ``ws`` rows and every column.
    data_y : array-like
        One label per row of ``data_x``; it is reshaped to a flat vector.
    ws : int
        Number of rows in each window.
    ss : int
        Number of rows to advance between consecutive windows.

    Returns
    -------
    Tuple[numpy.ndarray, numpy.ndarray]
        The windows cast to ``float32`` and, for each window, the label of
        its last row cast to ``uint8``.
    """
    n_columns = data_x.shape[1]
    windows_x = sliding_window(data_x, (ws, n_columns), (ss, 1))

    flat_labels = np.reshape(data_y, (len(data_y),))
    windows_y = sliding_window(flat_labels, ws, ss)
    # Each window is labelled with the label of its final sample.
    last_labels = windows_y[:, -1]

    return windows_x.astype(np.float32), last_labels.astype(np.uint8)
class HARDatasetCPC(Dataset):
    """
    Windowed dataset built from per-phase CSV files.

    Each path in ``data_path`` is expected to contain ``train``, ``val`` and
    ``test`` sub-directories holding ``*.csv`` files. The files of the
    selected phase are concatenated and cut into windows with
    ``opp_sliding_window``.
    """

    def __init__(
        self,
        data_path: Union[PathLike, List[PathLike]],
        input_size: int,
        window: int,
        overlap: int,
        phase: str = "train",
        use_train_as_val: bool = False,
        use_val_with_train: bool = True,
        columns: Optional[List[str]] = None,
        label: Optional[str] = "standard activity code",
        transpose_data: bool = True,
    ):
        """
        Initializes the dataset by loading the dataset from CSV files,
        segmenting the data into windows, and preparing it for training or
        evaluation.

        Parameters
        ----------
        data_path : Union[PathLike, List[PathLike]]
            The path to the directory containing the dataset files. If a list
            of paths is provided, the datasets will be concatenated, in the
            order provided, into a single dataset.
        input_size : int
            The expected number of input features (columns) of the data.
        window : int
            The size of the sliding window used to segment the data.
        overlap : int
            Passed to ``opp_sliding_window`` as the step size, i.e. the number
            of samples the window advances between consecutive windows.
        phase : str
            The phase of the dataset ('train', 'val', or 'test').
        use_train_as_val : bool
            Whether to use the training set as the validation set. When True
            the ``val`` directory is not read.
        use_val_with_train : bool
            Whether to append the validation set to the training set. Ignored
            when ``use_train_as_val`` is True, since the validation set is
            then the training set itself.
        columns : Optional[List[str]]
            The columns to be used as input features. If None, the default
            columns ['accel-x', 'accel-y', 'accel-z', 'gyro-x', 'gyro-y',
            'gyro-z'] will be used.
        label : Optional[str]
            The column to be used as the label. If None, no labels will be
            returned. If 'return_index_as_label', a random permutation of the
            window indices is used as the labels.
        transpose_data : bool
            If True, each sample is returned with shape (C, T), else with
            shape (T, C).
        """
        # Create a list of paths if only one path is provided
        self.paths = data_path if isinstance(data_path, list) else [data_path]
        self.use_train_as_val = use_train_as_val
        self.use_val_with_train = use_val_with_train
        self.label = label
        self.transpose_data = transpose_data
        self.input_size = input_size
        self.columns = (
            columns
            if columns is not None
            else [
                "accel-x",
                "accel-y",
                "accel-z",
                "gyro-x",
                "gyro-y",
                "gyro-z",
            ]
        )

        self.data_raw = self.load_dataset()
        n_features = self.data_raw[phase]["data"].shape[1]
        assert input_size == n_features, (
            f"input_size ({input_size}) does not match the number of "
            f"loaded feature columns ({n_features})"
        )

        # Obtaining the segmented data
        self.data, self.labels = opp_sliding_window(
            self.data_raw[phase]["data"],
            self.data_raw[phase]["labels"],
            window,
            overlap,
        )

        if self.label and self.label == "return_index_as_label":
            # Replace the labels by a shuffled arange over the windows.
            datum_index = np.arange(len(self.data))
            np.random.shuffle(datum_index)
            self.labels = datum_index

        # Transpose the data if required
        if self.transpose_data:
            if self.data.ndim == 2:
                self.data = self.data.T
            elif self.data.ndim == 3:
                self.data = self.data.transpose(0, 2, 1)

    def load_dataset(self):
        """
        Loads the dataset from CSV files, concatenates them into numpy
        arrays, and converts them to the appropriate data types.

        Files inside each phase directory are read in sorted order so the
        concatenation order does not depend on the filesystem.

        Returns
        -------
        dict
            A dictionary containing 'data' and 'labels' for 'train', 'val',
            and 'test' phases, where 'data' is a float32 numpy array of
            concatenated data and 'labels' is a uint8 numpy array of
            concatenated labels.

        Raises
        ------
        ValueError
            If no CSV file is found for a phase that has to be loaded.
        """
        datasets = {}
        for phase in ["train", "val", "test"]:
            if phase == "val" and self.use_train_as_val:
                # Shallow copy: same arrays, but a separate dict so later
                # changes to one split's entries cannot leak into the other.
                datasets[phase] = dict(datasets["train"])
                continue

            data_x = []
            data_y = []
            for path in self.paths:
                phase_path = Path(path) / phase
                for f in sorted(phase_path.glob("*.csv")):
                    data = pd.read_csv(f)
                    x = data[self.columns].values
                    if self.label and self.label != "return_index_as_label":
                        y = data[self.label].values
                    else:
                        # Placeholder labels; not taken from the file.
                        y = np.arange(len(x))
                    data_x.append(x)
                    data_y.append(y)

            if not data_x:
                raise ValueError(
                    f"No CSV files found for phase '{phase}' in "
                    f"{[str(p) for p in self.paths]}"
                )

            datasets[phase] = {
                "data": np.concatenate(data_x).astype(np.float32),
                "labels": np.concatenate(data_y).astype(np.uint8),
            }

        # If use_val_with_train is True, concatenate the training and
        # validation datasets. Skipped when the validation set *is* the
        # training set, otherwise the training data would be duplicated.
        if self.use_val_with_train and not self.use_train_as_val:
            datasets["train"]["data"] = np.concatenate(
                [datasets["train"]["data"], datasets["val"]["data"]]
            )
            datasets["train"]["labels"] = np.concatenate(
                [datasets["train"]["labels"], datasets["val"]["labels"]]
            )

        return datasets

    def __len__(self):
        """Return the number of windows in the selected phase."""
        return len(self.data)

    def __getitem__(self, index):
        """
        Return the window at ``index``, together with its label when a
        label was configured.
        """
        data = self.data[index]
        if self.label:
            return data, self.labels[index]
        return data