import os
import random
from collections import defaultdict
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union
import lightning as L
from torch.utils.data import ConcatDataset, DataLoader, Subset
from minerva.data.datasets.series_dataset import (
MultiModalSeriesCSVDataset,
SeriesFolderCSVDataset,
)
from minerva.samplers.domain_sampler import RandomDomainSampler
from minerva.utils.typing import PathLike
[docs]
def parse_num_workers(num_workers: Optional[int] = None) -> int:
    """Resolve the number of worker processes used to load data.

    Parameters
    ----------
    num_workers : int, optional
        Requested number of workers. Any explicit integer is returned
        unchanged, including ``0`` (which ``torch.utils.data.DataLoader``
        interprets as "load data in the main process"). If None, the number
        of cores reported by ``os.cpu_count()`` is used.

    Returns
    -------
    int
        Number of workers to load data.
    """
    if num_workers is not None:
        # Honor the caller's choice verbatim; 0 is a meaningful value and
        # must not be coerced to 1.
        return num_workers
    # os.cpu_count() returns None when the core count cannot be determined;
    # fall back to a single worker in that case.
    return os.cpu_count() or 1
[docs]
class UserActivityFolderDataModule(L.LightningDataModule):
    """Lightning data module wrapping ``SeriesFolderCSVDataset`` per split."""

    def __init__(
        self,
        # Dataset Params
        data_path: PathLike,
        features: List[str] = (
            "accel-x",
            "accel-y",
            "accel-z",
            "gyro-x",
            "gyro-y",
            "gyro-z",
        ),  # type: ignore
        label: str = "standard activity code",
        pad: bool = False,
        transforms: Optional[Union[List[Callable], Dict[str, List[Callable]]]] = None,
        cast_to: str = "float32",
        # Loader params
        batch_size: int = 1,
        num_workers: Optional[int] = None,
    ):
        """Define the dataloaders for train, validation and test splits for
        HAR datasets.

        It is a wrapper around ``SeriesFolderCSVDataset`` dataset class. One
        dataset is created per split, reading from the sub-folder of
        ``data_path`` named after the split:

        data_path
        ├── train
        │   ├── user-1.csv
        │   └── user-2.csv
        ├── validation
        │   └── ...
        └── test
            └── ...

        The ``SeriesFolderCSVDataset`` class assumes that the data is in a
        folder with multiple CSV files. Each CSV file is a single sample that
        can be composed of multiple time steps (rows). Each column is a feature
        of the sample.

        For instance, if we have two samples, user-1.csv and user-2.csv,
        the data will look something like:

        - user-1.csv:

            +---------+---------+--------+
            | accel-x | accel-y | class  |
            +---------+---------+--------+
            | 0.502123| 0.02123 | 1      |
            | 0.682012| 0.02123 | 1      |
            | 0.498217| 0.00001 | 1      |
            +---------+---------+--------+

        - user-2.csv:

            +---------+---------+--------+
            | accel-x | accel-y | class  |
            +---------+---------+--------+
            | 0.502123| 0.02123 | 0      |
            | 0.682012| 0.02123 | 0      |
            | 0.498217| 0.00001 | 0      |
            | 3.141592| 1.414141| 0      |
            +---------+---------+--------+

        The ``features`` parameter is used to select the columns that will be
        used as features. For instance, if we want to use only the accel-x
        column, we can set ``features=["accel-x"]``. If we want to use both
        accel-x and accel-y, we can set ``features=["accel-x", "accel-y"]``.

        The label column is specified by the ``label`` parameter. Note that we
        have one label per time-step and not a single label per sample.

        The dataset will return a 2-element tuple with the data and the label,
        if the ``label`` parameter is specified, otherwise return only the data.

        Parameters
        ----------
        data_path : PathLike
            The directory holding one sub-folder of CSV files per split
            ("train", "validation" and "test").
        features: List[str]
            A list with column names that will be used as features. If None,
            all columns except the label will be used as features. A tuple is
            accepted and stored as a list.
        label: str, optional
            Specify the name of the column with the label of the data
        pad: bool, optional
            If True, the data will be padded to the length of the longest
            sample. Note that padding will be applied after the transforms,
            and also to the labels if specified.
        transforms : Union[List[Callable], Dict[str, List[Callable]]], optional
            This could be:
            - None: No transforms will be applied
            - List[Callable]: A list of transforms that will be applied to the
                data. The same transforms will be applied to all splits.
            - Dict[str, List[Callable]]: A dictionary with the split name as
                key and a list of transforms as value. The split name must be
                one of: "train", "validation", "test" or "predict".
        cast_to: str, optional
            Cast the numpy data to the specified type
        batch_size : int, optional
            The size of the batch
        num_workers : int, optional
            Number of workers to load data. If None, then use all cores
        """
        super().__init__()
        # ---- Dataset Parameters ----
        self.data_path = Path(data_path)
        # The default is a tuple (to avoid a mutable default argument); store
        # it as a list, as MultiModalHARSeriesDataModule does for its
        # feature prefixes, so downstream code always sees the annotated type.
        self.features = list(features) if isinstance(features, tuple) else features
        self.label = label
        self.pad = pad
        self.transforms = parse_transforms(transforms)
        self.cast_to = cast_to

        # ---- Loader Parameters ----
        self.batch_size = batch_size
        self.num_workers = parse_num_workers(num_workers)

        # ---- Class specific ----
        # Filled by ``setup``: split name -> dataset.
        self.datasets = {}

    def _load_dataset(self, split_name: str) -> SeriesFolderCSVDataset:
        """Create a ``SeriesFolderCSVDataset`` dataset with the given split.

        Parameters
        ----------
        split_name : str
            Name of the split (train, validation, test or predict). This is
            used as the name of the sub-folder of ``data_path`` to read and as
            the key to select the transforms. "predict" is treated as an alias
            of "test": the "test" folder and the "test" transforms are used.

        Returns
        -------
        SeriesFolderCSVDataset
            The dataset with the given split.
        """
        assert split_name in [
            "train",
            "validation",
            "test",
            "predict",
        ], f"Invalid split_name: {split_name}"

        # There is no dedicated predict folder; prediction reuses the test one.
        if split_name == "predict":
            split_name = "test"

        return SeriesFolderCSVDataset(
            self.data_path / split_name,
            features=self.features,
            label=self.label,
            pad=self.pad,
            transforms=self.transforms[split_name],
            cast_to=self.cast_to,
        )

    def setup(self, stage: str):
        """Assign the datasets to the corresponding split. ``self.datasets``
        will be a dictionary with the split name as key and the dataset as
        value.

        Parameters
        ----------
        stage : str
            The stage of the setup. This could be:
            - "fit": Load the train and validation datasets
            - "test": Load the test dataset
            - "predict": Load the predict dataset (read from the test split)

        Raises
        ------
        ValueError
            If the stage is not one of: "fit", "test" or "predict"
        """
        if stage == "fit":
            self.datasets["train"] = self._load_dataset("train")
            self.datasets["validation"] = self._load_dataset("validation")
        elif stage == "test":
            self.datasets["test"] = self._load_dataset("test")
        elif stage == "predict":
            self.datasets["predict"] = self._load_dataset("test")
        else:
            raise ValueError(f"Invalid setup stage: {stage}")

    def _get_loader(self, split_name: str, shuffle: bool) -> DataLoader:
        """Get a dataloader for the given split.

        Parameters
        ----------
        split_name : str
            The name of the split. This must be one of: "train", "validation",
            "test" or "predict", and the split must already have been loaded
            by ``setup``.
        shuffle : bool
            Shuffle the data or not.

        Returns
        -------
        DataLoader
            A dataloader for the given split.
        """
        return DataLoader(
            self.datasets[split_name],
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
            pin_memory=True,
        )

    def train_dataloader(self) -> DataLoader:
        """Return the (shuffled) dataloader of the train split."""
        return self._get_loader("train", shuffle=True)

    def val_dataloader(self) -> DataLoader:
        """Return the (non-shuffled) dataloader of the validation split."""
        return self._get_loader("validation", shuffle=False)

    def test_dataloader(self) -> DataLoader:
        """Return the (non-shuffled) dataloader of the test split."""
        return self._get_loader("test", shuffle=False)

    def predict_dataloader(self) -> DataLoader:
        """Return the (non-shuffled) dataloader of the predict split."""
        return self._get_loader("predict", shuffle=False)

    def __str__(self):
        return (
            f"UserActivityFolderDataModule(data_path={self.data_path}, "
            f"batch_size={self.batch_size})"
        )

    def __repr__(self) -> str:
        return str(self)
[docs]
class MultiModalHARSeriesDataModule(L.LightningDataModule):
    """Lightning data module wrapping ``MultiModalSeriesCSVDataset`` per split."""

    def __init__(
        self,
        # Dataset params
        data_path: PathLike | List[PathLike],
        feature_prefixes: List[str] = (
            "accel-x",
            "accel-y",
            "accel-z",
            "gyro-x",
            "gyro-y",
            "gyro-z",
        ),  # type: ignore
        label: str = "standard activity code",
        features_as_channels: bool = True,
        transforms: Optional[Union[List[Callable], Dict[str, List[Callable]]]] = None,
        cast_to: str = "float32",
        # Loader params
        batch_size: int = 1,
        num_workers: Optional[int] = None,
        data_percentage: float = 1.0,
        use_train_as_validation: bool = False,
        use_val_with_train: bool = False,
        map_labels: Optional[Dict[int, int]] = None,
        drop_last: bool = True,
        n_domains_per_sample: Optional[int] = None,
        samples_per_class: Optional[int] = None,
        seed: Optional[int] = None,
        predict_split: str = "test",
        shuffle_train: bool = True,
    ):
        """Define the dataloaders for train, validation and test splits for
        HAR datasets. This datasets assumes that the data is in a single CSV
        file with series of data. Each row is a single sample that can be
        composed of multiple modalities (series). Each column is a feature of
        some series with the prefix indicating the series. The suffix may
        indicate the time step. For instance, if we have two series, accel-x
        and accel-y, the data will look something like:

        +-----------+-----------+-----------+-----------+--------+
        | accel-x-0 | accel-x-1 | accel-y-0 | accel-y-1 |  class |
        +-----------+-----------+-----------+-----------+--------+
        | 0.502123  | 0.02123   | 0.502123  | 0.502123  |  0     |
        | 0.6820123 | 0.02123   | 0.502123  | 0.502123  |  1     |
        | 0.498217  | 0.00001   | 1.414141  | 3.141592  |  2     |
        +-----------+-----------+-----------+-----------+--------+

        The ``feature_prefixes`` parameter is used to select the columns that
        will be used as features. For instance, if we want to use only the
        accel-x series, we can set ``feature_prefixes=["accel-x"]``. If we want
        to use both accel-x and accel-y, we can set
        ``feature_prefixes=["accel-x", "accel-y"]``. If None is passed, all
        columns will be used as features, except the label column.
        The label column is specified by the ``label`` parameter.

        The dataset will return a 2-element tuple with the data and the label,
        if the ``label`` parameter is specified, otherwise return only the data.

        If ``features_as_channels`` is ``True``, the data will be returned as a
        vector of shape `(C, T)`, where C is the number of channels (features)
        and `T` is the number of time steps. Else, the data will be returned as
        a vector of shape T*C (a single vector with all the features).

        Parameters
        ----------
        data_path : PathLike or List[PathLike]
            The path to the folder with "train.csv", "validation.csv" and
            "test.csv" files inside it. A list (or tuple) of such folders may
            be given; each one is treated as a separate domain, numbered by
            its position.
        feature_prefixes : Union[str, List[str]], optional
            The prefix of the column names in the dataframe that will be used
            to become features. If None, all columns except the label will be
            used as features.
        label : str, optional
            The name of the column that will be used as label
        features_as_channels : bool, optional
            If True, the data will be returned as a vector of shape (C, T),
            else the data will be returned as a vector of shape T*C.
        transforms : Union[List[Callable], Dict[str, List[Callable]]], optional
            This could be:
            - None: No transforms will be applied
            - List[Callable]: A list of transforms that will be applied to the
                data. The same transforms will be applied to all splits.
            - Dict[str, List[Callable]]: A dictionary with the split name as
                key and a list of transforms as value. The split name must be
                one of: "train", "validation", "test" or "predict".
        cast_to: str, optional
            Cast the numpy data to the specified type
        batch_size : int, optional
            The size of the batch
        num_workers : int, optional
            Number of workers to load data. If None, then use all cores
        data_percentage : float, optional
            The percentage of the data that will be used. This is useful to
            create a small datasets. Only applied to the train split.
        use_train_as_validation : bool, optional
            If True, the train dataset will be used as validation dataset.
        use_val_with_train: bool, optional
            If True, the validation and train sets will be concatenated in
            order to create a large train set. By default, this is False.
        map_labels : Dict[int, int], optional
            A dictionary to map the labels to a new label. The key is the
            original label and the value is the new label.
        drop_last : bool, optional
            Drop the last batch if it is not complete. Not passed to the
            DataLoader when ``n_domains_per_sample`` is set.
        n_domains_per_sample : int, optional
            This is only useful when using multiple domains (`data_path`). It
            will allow creating batches with same number of samples from
            multiple domains. When set, every split is loaded through a
            ``RandomDomainSampler`` batch sampler and the ``shuffle`` option
            is not used. If None, it will just concatenate all datasets and
            sample in a non-stratified way. By default, None.
        samples_per_class : int, optional
            If not None, use this number of samples per class for the train
            split. This will override the data_percentage parameter.
        seed: int, optional
            Seed for sampling the dataset. If None, no seed is set.
        predict_split: str
            The name of the split to use for prediction. This will be used to
            load the dataset for prediction. By default, this is "test".
        shuffle_train: bool
            If True, the train dataset will be shuffled.

        Notes
        -----
        - If `data_percentage` is set to a value less than 1.0, a random subset
            of the dataset will be used, containing approximately the specified
            percentage of the total data. This sampling is not stratified.
        - If `samples_per_class` is specified, the train split will contain an
            equal number of samples for each class, as defined by this
            parameter. This option is mutually exclusive with data_percentage;
            both cannot be used at the same time.
        - The `seed` parameter controls the randomness of sampling: If `seed` is
            set (i.e., an integer), sampling becomes deterministic, ensuring
            the same subset is selected on each run. This improves
            reproducibility and supports cumulative sampling—for example,
            progressively increasing `samples_per_class` will retain consistency
            across runs by sampling the same initial elements. If seed is
            `None`, sampling is non-deterministic, and different subsets may
            be chosen each time.

        Raises
        ------
        ValueError
            If `samples_per_class` is set together with a `data_percentage`
            below 1.0, or if `data_percentage` is outside ``[0.0, 1.0]``.
        """
        super().__init__()

        # Always work with a list of domains. A tuple of paths is a sequence
        # of domains too, not a single path (Path(tuple) would raise).
        if isinstance(data_path, (list, tuple)):
            domain_paths = list(data_path)
        else:
            domain_paths = [data_path]
        self.data_path = [Path(domain) for domain in domain_paths]

        # The default is a tuple (to avoid a mutable default); store a list.
        self.feature_prefixes = feature_prefixes
        if isinstance(self.feature_prefixes, tuple):
            self.feature_prefixes = list(self.feature_prefixes)

        self.label = label
        self.features_as_channels = features_as_channels
        self.transforms = parse_transforms(transforms)
        self.cast_to = cast_to
        self.batch_size = batch_size
        self.num_workers = parse_num_workers(num_workers)
        self.data_percentage = data_percentage
        # Filled by ``setup``: split name -> (dataset, domain_labels).
        self.datasets = {}
        self.use_train_as_validation = use_train_as_validation
        self.use_val_with_train = use_val_with_train
        self.map_labels = map_labels
        self.drop_last = drop_last
        self.n_domains_per_sample = n_domains_per_sample
        self.samples_per_class = samples_per_class

        if self.data_percentage < 1.0 and self.samples_per_class is not None:
            raise ValueError("Cannot use both data_percentage and samples_per_class")
        if self.data_percentage > 1.0 or self.data_percentage < 0.0:
            raise ValueError("data_percentage must be between 0 and 1.0, inclusive.")

        self.seed = seed
        # Dedicated RNG so that sampling does not disturb (nor depend on) the
        # global ``random`` state.
        self.rng = random.Random(seed)
        self.predict_split = predict_split
        self.shuffle_train = shuffle_train

    def _sample_dataset(self, dataset):
        """Sample the dataset based on the specified parameters.

        If `samples_per_class` is specified, a subset will be created
        containing the specified number of samples for each class. If
        `data_percentage` is specified, a random subset of the dataset
        will be created containing approximately the specified percentage
        of the total data. If neither is specified, the entire dataset
        will be returned.

        Note
        -----
        The `seed` parameter controls the randomness of sampling:
        If `seed` is set (i.e., an integer), sampling becomes deterministic,
        ensuring the same subset is selected on each run and allowing for
        cumulative sampling (e.g., progressively increasing `samples_per_class`
        will retain consistency across runs by sampling the same initial
        elements). If `seed` is `None`, sampling is non-deterministic, and
        different subsets may be chosen each time.

        Parameters
        ----------
        dataset: Dataset
            A map-like dataset to sample from: it must support ``len()`` and
            integer indexing, and (for per-class sampling) each item must be
            a sequence whose second element is the label.

        Returns
        -------
        Dataset
            A sampled dataset (a ``Subset``), or ``dataset`` itself when no
            sampling is configured.

        Raises
        ------
        ValueError
            If `samples_per_class` is not positive, or if it is specified and
            a class has fewer samples than the specified number.
        """
        if self.samples_per_class is not None:
            if self.samples_per_class <= 0:
                raise ValueError("samples_per_class must be a positive integer.")

            # Shuffle first, then bucket by class: each bucket ends up holding
            # the indices of its class in shuffled order, so taking a prefix of
            # a bucket is a random (and, with a seed, cumulative) draw.
            dset_indices = list(range(len(dataset)))
            self.rng.shuffle(dset_indices)

            class_indices = defaultdict(list)
            for idx in dset_indices:
                label = dataset[idx][1]
                # Labels may come as numpy/torch scalars or 0-d arrays
                # (assumption: depends on the dataset and its transforms).
                # Convert them to plain Python values so they are hashable and
                # grouped by value rather than by object identity.
                to_python = getattr(label, "item", None)
                if callable(to_python):
                    label = to_python()
                class_indices[label].append(idx)

            sampled_indices = []
            for label, indices in class_indices.items():
                if len(indices) < self.samples_per_class:
                    raise ValueError(
                        f"Class {label} has only {len(indices)} samples, "
                        f"but {self.samples_per_class} were requested."
                    )
                sampled_indices.extend(indices[: self.samples_per_class])
            return Subset(dataset, sampled_indices)

        if self.data_percentage < 1.0:
            # Non-stratified sampling: keep a random prefix of the shuffled
            # indices (the size is truncated towards zero).
            indices = list(range(len(dataset)))
            self.rng.shuffle(indices)
            n_kept = int(self.data_percentage * len(dataset))
            return Subset(dataset, indices[:n_kept])

        return dataset

    def _load_dataset(
        self, split_name: str
    ) -> Tuple[Union[MultiModalSeriesCSVDataset, Subset, ConcatDataset], List[int]]:
        """Create a ``MultiModalSeriesCSVDataset`` dataset with the given split.

        One dataset is created per entry of ``self.data_path`` (domain), read
        from ``<domain>/<split_name>.csv``. Sampling (``samples_per_class`` /
        ``data_percentage``) is applied per domain and to the train split only.

        Parameters
        ----------
        split_name : str
            The name of the split. This must be one of: "train", "validation",
            "test" or "predict". "predict" is replaced by ``predict_split``.

        Returns
        -------
        Tuple[Dataset, List[int]]
            The dataset of the given split (the single domain dataset, or a
            ``ConcatDataset`` of all domains) and, for each of its samples in
            order, the index of the domain it comes from.
        """
        if split_name == "predict":
            split_name = self.predict_split

        assert split_name in [
            "train",
            "validation",
            "test",
            "predict",
        ], f"Invalid split_name: {split_name}"

        datasets = []
        domain_labels = []
        for domain_idx, domain_path in enumerate(self.data_path):
            dataset = MultiModalSeriesCSVDataset(
                Path(domain_path) / f"{split_name}.csv",
                feature_prefixes=self.feature_prefixes,
                label=self.label,
                features_as_channels=self.features_as_channels,
                cast_to=self.cast_to,
                transforms=self.transforms[split_name],
                map_labels=self.map_labels,
            )
            # Apply sampling only to the train split
            if split_name == "train":
                dataset = self._sample_dataset(dataset)

            # Use the length after sampling so labels stay aligned with samples.
            domain_labels += [domain_idx] * len(dataset)
            datasets.append(dataset)

        if len(datasets) == 1:
            return datasets[0], domain_labels
        return ConcatDataset(datasets), domain_labels

    def setup(self, stage: str):
        """Assign the datasets to the corresponding split. ``self.datasets``
        will be a dictionary with the split name as key and a
        ``(dataset, domain_labels)`` tuple as value.

        Parameters
        ----------
        stage : str
            The stage of the setup. This could be:
            - "fit": Load the train and validation datasets
            - "test": Load the test dataset
            - "predict": Load the predict dataset

        Raises
        ------
        ValueError
            If the stage is not one of: "fit", "test" or "predict"
        """
        if stage == "fit":
            train_dataset, train_domains = self._load_dataset("train")
            if self.use_val_with_train:
                val_dataset, val_domains = self._load_dataset("validation")
                train_dataset = ConcatDataset([train_dataset, val_dataset])
                # Build a new list instead of extending the loaded one in place.
                train_domains = train_domains + val_domains
            self.datasets["train"] = (train_dataset, train_domains)

            if self.use_train_as_validation:
                self.datasets["validation"] = self.datasets["train"]
            else:
                self.datasets["validation"] = self._load_dataset("validation")
        elif stage == "test":
            self.datasets["test"] = self._load_dataset("test")
        elif stage == "predict":
            self.datasets["predict"] = self._load_dataset("predict")
        else:
            raise ValueError(f"Invalid setup stage: {stage}")

    def _get_loader(self, split_name: str, shuffle: bool) -> DataLoader:
        """Get a dataloader for the given split.

        Parameters
        ----------
        split_name : str
            The name of the split. This must be one of: "train", "validation",
            "test" or "predict", and the split must already have been loaded
            by ``setup``.
        shuffle : bool
            Shuffle the data or not. Not used when ``n_domains_per_sample``
            is set, since batches are then produced by the domain sampler.

        Returns
        -------
        DataLoader
            A dataloader for the given split.
        """
        dataset, domain_labels = self.datasets[split_name]

        if self.n_domains_per_sample is not None:
            print(
                f"Using DataLoader with RandomDomainSampler with n_domains_per_sample={self.n_domains_per_sample}"
            )
            # The sampler yields whole batches, so batch_size/shuffle/drop_last
            # are handed to it (or not used) rather than to the DataLoader.
            sampler = RandomDomainSampler(
                dataset,
                domain_labels,
                batch_size=self.batch_size,
                consistent_iterating=False,
                n_domains_per_sample=self.n_domains_per_sample,
            )
            return DataLoader(
                dataset,
                batch_sampler=sampler,
                num_workers=self.num_workers,
            )

        print(f"Using DataLoader with shuffle={shuffle}")
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
            pin_memory=True,
            drop_last=self.drop_last,
        )

    def train_dataloader(self) -> DataLoader:
        """Return the dataloader of the train split (shuffled if configured)."""
        return self._get_loader("train", shuffle=self.shuffle_train)

    def val_dataloader(self) -> DataLoader:
        """Return the (non-shuffled) dataloader of the validation split."""
        return self._get_loader("validation", shuffle=False)

    def test_dataloader(self) -> DataLoader:
        """Return the (non-shuffled) dataloader of the test split."""
        return self._get_loader("test", shuffle=False)

    def predict_dataloader(self) -> DataLoader:
        """Return the (non-shuffled) dataloader of the predict split."""
        return self._get_loader("predict", shuffle=False)

    def __str__(self):
        joined_paths = ", ".join([str(d) for d in self.data_path])
        return (
            f"MultiModalHARSeriesDataModule(data_path={joined_paths}, "
            f"batch_size={self.batch_size})"
        )

    def __repr__(self) -> str:
        return str(self)