Source code for minerva.models.nets.tnc

from typing import Literal, Optional
import lightning as L
import torch
import torch.nn.functional as F
import numpy as np
from minerva.models.nets.time_series.resnet import _ResNet1D

# RNN encoder used by Tonekaboni et al. (TNC)


class RnnEncoder(torch.nn.Module):
    def __init__(
        self,
        hidden_size: int,
        in_channel: int,
        encoding_size: int,
        cell_type: str = "GRU",
        num_layers: int = 1,
        device: str = "cpu",
        dropout: float = 0.0,
        bidirectional: bool = True,
        permute: bool = False,
        squeeze: bool = True,
    ):
        """Initializes an RnnEncoder instance.

        This encoder uses a recurrent neural network (RNN) to encode
        sequential data, such as accelerometer and gyroscope readings from
        human activity recognition tasks.

        Parameters
        ----------
        hidden_size : int
            Size of the hidden state in the RNN.
        in_channel : int
            Number of input channels (e.g., dimensions of accelerometer and
            gyroscope data).
        encoding_size : int
            Desired size of the output encoding.
        cell_type : str, optional
            Type of RNN cell to use (default is 'GRU'). Currently only 'GRU'
            is constructed; other values are stored but not otherwise used.
        num_layers : int, optional
            Number of RNN layers (default is 1).
        device : str, optional
            Device to run the model on (default is 'cpu'). Options include
            'cpu' and 'cuda'.
        dropout : float, optional
            Dropout probability (default is 0.0).
        bidirectional : bool, optional
            Whether the RNN is bidirectional (default is True).
        permute : bool, optional
            If `True`, the input data is permuted before being passed through
            the model, by default False.
        squeeze : bool, optional
            If `True`, the output of the RNN states is squeezed before being
            passed to the linear layer. By default True.

        Examples
        --------
        >>> device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        >>> encoder = RnnEncoder(hidden_size=100, in_channel=6, encoding_size=320,
        ...                      cell_type='GRU', num_layers=1, device=device,
        ...                      dropout=0.0, bidirectional=True).to(device)
        >>> element1 = torch.randn(32, 50, 6)  # Batch size: 32, Time steps: 50, Input channels: 6
        >>> encoding = encoder(element1.to(device))
        >>> print(encoding.shape)
        torch.Size([32, 320])

        Notes
        -----
        - The input tensor should have the shape (batch_size, time_steps, in_channel).
        - The output tensor will have the shape (batch_size, encoding_size).
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.in_channel = in_channel
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.encoding_size = encoding_size
        self.bidirectional = bidirectional
        self.device = device
        self.permute = permute
        self.squeeze = squeeze
        self.nn = torch.nn.Sequential(
            torch.nn.Linear(
                self.hidden_size * (int(self.bidirectional) + 1),
                self.encoding_size,
            )
        ).to(self.device)
        self.rnn = torch.nn.GRU(
            input_size=in_channel,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=False,
            dropout=dropout,
            bidirectional=bidirectional,
        ).to(self.device)

    def forward(self, x):
        """Forward pass for the RnnEncoder.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, time_steps, in_channel).

        Returns
        -------
        torch.Tensor
            Encoded tensor of shape (batch_size, encoding_size).
        """
        if self.permute:
            # (batch, channel, time) -> (time, batch, channel)
            x = x.permute(2, 0, 1)
        else:
            # (batch, time, channel) -> (time, batch, channel), as expected
            # by torch.nn.GRU with batch_first=False.
            x = x.permute(1, 0, 2)
        past = torch.zeros(
            self.num_layers * (int(self.bidirectional) + 1),
            x.shape[1],
            self.hidden_size,
        ).to(self.device)
        out, _ = self.rnn(x.to(self.device), past)
        # Project the last RNN state to the desired encoding size.
        if self.squeeze:
            encodings = self.nn(out[-1].squeeze(0))
        else:
            encodings = self.nn(out[-1])
        return encodings
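
# Usage sketch (illustrative; not part of the original module): with
# permute=True the encoder accepts channels-first input, i.e.
# (batch, channels, timesteps) instead of (batch, timesteps, channels).
#
#     encoder = RnnEncoder(hidden_size=100, in_channel=6, encoding_size=320, permute=True)
#     encoding = encoder(torch.randn(32, 6, 50))  # (batch, channels, timesteps)
#     assert encoding.shape == (32, 320)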

# TS2Vec encoder used by Xu
class ResNetEncoder(_ResNet1D):
    def forward(self, x):
        # _ResNet1D expects channels-first input (batch, channels, timesteps);
        # inputs arrive as (batch, timesteps, channels), so permute first.
        return super().forward(x.permute(0, 2, 1))


class DilatedConvEncoder(torch.nn.Module):
    def __init__(self, in_channels: int, channels: list, kernel_size: int):
        """A stack of dilated convolutional blocks for feature extraction
        from sequential data.

        Parameters
        ----------
        in_channels : int
            Number of input channels to the first convolutional layer.
        channels : list
            List of integers specifying the number of output channels for
            each convolutional block. Block ``i`` uses dilation ``2**i``.
        kernel_size : int
            Size of the convolutional kernel.
        """
        super().__init__()
        self.net = torch.nn.Sequential(
            *[
                ConvBlock(
                    channels[i - 1] if i > 0 else in_channels,
                    channels[i],
                    kernel_size=kernel_size,
                    dilation=2**i,
                    final=(i == len(channels) - 1),
                )
                for i in range(len(channels))
            ]
        )

    def forward(self, x):
        return self.net(x)
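
# Usage sketch (illustrative; not part of the original module). Each
# ConvBlock pads to keep the temporal length, so the output keeps the
# input's number of timesteps while the channel width follows `channels`:
#
#     enc = DilatedConvEncoder(in_channels=64, channels=[64] * 9 + [320], kernel_size=3)
#     out = enc(torch.randn(8, 64, 128))  # input is (batch, channels, timesteps)
#     assert out.shape == (8, 320, 128)   # temporal length preserved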


class ConvBlock(torch.nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        dilation: int,
        final: bool = False,
    ):
        """A single block of dilated convolutional layers followed by a
        residual connection and activation.

        Parameters
        ----------
        in_channels : int
            Number of input channels to the first convolutional layer.
        out_channels : int
            Number of output channels from the final convolutional layer.
        kernel_size : int
            Size of the convolutional kernel.
        dilation : int
            Dilation factor for the convolutional layers.
        final : bool, optional
            Whether this is the final block in the sequence (default is
            False).
        """
        super().__init__()
        self.conv1 = SamePadConv(
            in_channels, out_channels, kernel_size, dilation=dilation
        )
        self.conv2 = SamePadConv(
            out_channels, out_channels, kernel_size, dilation=dilation
        )
        self.projector = (
            torch.nn.Conv1d(in_channels, out_channels, 1)
            if in_channels != out_channels or final
            else None
        )

    def forward(self, x):
        residual = x if self.projector is None else self.projector(x)
        x = torch.nn.functional.gelu(x)
        x = self.conv1(x)
        x = torch.nn.functional.gelu(x)
        x = self.conv2(x)
        return x + residual
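
# Usage sketch (illustrative; not part of the original module): when
# in_channels == out_channels and final=False, the residual is the identity;
# otherwise a 1x1 convolution projects it to the output width.
#
#     block = ConvBlock(in_channels=64, out_channels=128, kernel_size=3, dilation=2)
#     out = block(torch.randn(4, 64, 50))
#     assert out.shape == (4, 128, 50)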


class SamePadConv(torch.nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        dilation: int = 1,
        groups: int = 1,
    ):
        """A 1D convolutional layer padded so that the output has the same
        temporal length as the input.

        Parameters
        ----------
        in_channels : int
            Number of input channels to the convolutional layer.
        out_channels : int
            Number of output channels from the convolutional layer.
        kernel_size : int
            Size of the convolutional kernel.
        dilation : int, optional
            Dilation factor for the convolutional layer (default is 1).
        groups : int, optional
            Number of blocked connections from input channels to output
            channels (default is 1).
        """
        super().__init__()
        self.receptive_field = (kernel_size - 1) * dilation + 1
        padding = self.receptive_field // 2
        self.conv = torch.nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size,
            padding=padding,
            dilation=dilation,
            groups=groups,
        )
        # With an even receptive field, symmetric padding yields one extra
        # timestep, which is trimmed in forward().
        self.remove = 1 if self.receptive_field % 2 == 0 else 0

    def forward(self, x):
        out = self.conv(x)
        if self.remove > 0:
            out = out[:, :, : -self.remove]
        return out
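
# Usage sketch (illustrative; not part of the original module): the output
# keeps the input's temporal length even with dilation, which is what lets
# DilatedConvEncoder stack blocks without tracking length changes.
#
#     conv = SamePadConv(in_channels=6, out_channels=16, kernel_size=3, dilation=4)
#     out = conv(torch.randn(2, 6, 100))
#     assert out.shape == (2, 16, 100)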


class TSEncoder(torch.nn.Module):
    def __init__(
        self,
        input_dims: int,
        output_dims: int,
        hidden_dims: int = 64,
        depth: int = 10,
        permute: bool = False,
        encoder_cls: type = DilatedConvEncoder,
        encoder_cls_kwargs: Optional[dict] = None,
    ):
        """Encoder utilizing dilated convolutional layers for encoding
        sequential data.

        Parameters
        ----------
        input_dims : int
            Dimensionality of the input features.
        output_dims : int
            Desired dimensionality of the output features.
        hidden_dims : int, optional
            Number of hidden dimensions in the convolutional layers (default
            is 64).
        depth : int, optional
            Number of hidden convolutional blocks (default is 10); one
            additional block maps to `output_dims`.
        permute : bool, optional
            If `True`, the input data is permuted before being passed through
            the model, by default False. This should be removed once the
            encoder receives data in the shape (batch_size, channels,
            timesteps).
        encoder_cls : type, optional
            Class used to build the feature extractor (default is
            DilatedConvEncoder). It is called as
            ``encoder_cls(hidden_dims, [hidden_dims] * depth + [output_dims],
            kernel_size=3, **encoder_cls_kwargs)``.
        encoder_cls_kwargs : dict, optional
            Extra keyword arguments forwarded to `encoder_cls` (default is
            None, meaning no extra arguments).

        Examples
        --------
        >>> device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        >>> encoder = TSEncoder(input_dims=6, output_dims=320, hidden_dims=64, depth=10).to(device)
        >>> element1 = torch.randn(12, 128, 6)  # Batch size: 12, Time steps: 128, Input channels: 6
        >>> encoded_features = encoder(element1.to(device))
        >>> print(encoded_features.shape)
        torch.Size([12, 128, 320])

        Notes
        -----
        - The input tensor should have the shape (batch_size, seq_len, input_dims).
        - The output tensor will have the shape (batch_size, seq_len, output_dims).
        - If the expected output tensor is of shape (batch_size, output_dims),
          consider using a pooling layer. One option is the
          `MaxPoolingTransposingSqueezingAdapter` adapter at
          minerva/models/adapters.py.
        """
        super().__init__()
        self.input_dims = input_dims
        self.output_dims = output_dims
        self.hidden_dims = hidden_dims
        self.input_fc = torch.nn.Linear(input_dims, hidden_dims)
        self.feature_extractor = encoder_cls(
            hidden_dims,
            [hidden_dims] * depth + [output_dims],
            kernel_size=3,
            **(encoder_cls_kwargs or {}),
        )
        self.repr_dropout = torch.nn.Dropout(p=0.1)
        self.permute = permute

    def forward(self, x, mask=None):
        """Forward pass of the encoder.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, seq_len, input_dims).
        mask : str, optional
            Type of masking to apply (default is None, meaning no masking).
            Currently only "binomial" is supported.

        Returns
        -------
        torch.Tensor
            Encoded features of shape (batch_size, seq_len, output_dims).
        """
        if self.permute:
            x = x.permute(0, 2, 1)
        # Zero out timesteps that contain NaNs in any channel.
        nan_mask = ~x.isnan().any(dim=-1)
        x[~nan_mask] = 0
        x = self.input_fc(x)
        if mask == "binomial":
            # Randomly drop roughly half of the timesteps (TS2Vec-style
            # masking). The mask must be boolean so that `~mask` is a
            # logical, not bitwise, negation.
            mask = (
                torch.from_numpy(
                    np.random.binomial(1, 0.5, size=(x.size(0), x.size(1)))
                )
                .to(x.device)
                .bool()
            )
            mask &= nan_mask
            x[~mask] = 0
        x = x.transpose(1, 2)  # B x Ch x T
        x = self.repr_dropout(self.feature_extractor(x))  # B x Co x T
        x = x.transpose(1, 2)  # B x T x Co
        return x
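
# Masking sketch (illustrative; not part of the original module): passing
# mask="binomial" zeroes roughly half of the timesteps before the feature
# extractor; the output shape is unchanged.
#
#     encoder = TSEncoder(input_dims=6, output_dims=320)
#     masked = encoder(torch.randn(4, 64, 6), mask="binomial")
#     assert masked.shape == (4, 64, 320)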


# Discriminator aka projection head
class Discriminator_TNC(torch.nn.Module):
    def __init__(self, input_size: int, max_pool: bool = False):
        """A discriminator model used for contrastive learning tasks,
        predicting whether two inputs belong to the same neighborhood in the
        feature space.

        Parameters
        ----------
        input_size : int
            Dimensionality of each input.
        max_pool : bool, optional
            Whether to apply max pooling over the time dimension before
            feeding into the projection head (default is False). If using the
            TS2Vec encoder, set to True; if using the RNN encoder, set to
            False.

        Examples
        --------
        >>> device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        >>> discriminator = Discriminator_TNC(input_size=320, max_pool=True).to(device)
        >>> forward_ts2vec1 = torch.randn(12, 128, 320)  # (batch_size, timesteps, encoding_size)
        >>> forward_ts2vec3 = torch.randn(12, 128, 320)  # (batch_size, timesteps, encoding_size)
        >>> output = discriminator(forward_ts2vec1.to(device), forward_ts2vec3.to(device))
        >>> print(output.shape)
        torch.Size([12])

        >>> # Example with RNN encoder
        >>> rnn_encoder = RnnEncoder(hidden_size=100, in_channel=6, encoding_size=320,
        ...                          cell_type='GRU', num_layers=1, device=device,
        ...                          dropout=0.0, bidirectional=True).to(device)
        >>> element1 = torch.randn(12, 128, 6)  # Batch size: 12, Time steps: 128, Input channels: 6
        >>> forward_rnn1 = rnn_encoder(element1.to(device))
        >>> forward_rnn2 = rnn_encoder(element1.to(device))
        >>> discriminator = Discriminator_TNC(input_size=320, max_pool=False).to(device)
        >>> output = discriminator(forward_rnn1, forward_rnn2)
        >>> print(output.shape)
        torch.Size([12])

        Notes
        -----
        - Without max pooling, the input tensors should have the shape
          (batch_size, input_size); with `max_pool=True`, the shape
          (batch_size, timesteps, input_size).
        - The output tensor will have the shape (batch_size,) holding the
          prediction scores as logits; apply a sigmoid to obtain
          probabilities.
        """
        super().__init__()
        self.input_size = input_size
        self.max_pool = max_pool
        self.model = torch.nn.Sequential(
            torch.nn.Linear(2 * self.input_size, 4 * self.input_size),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(4 * self.input_size, 1),
        )
        torch.nn.init.xavier_uniform_(self.model[0].weight)
        torch.nn.init.xavier_uniform_(self.model[3].weight)

    def forward(self, x, x_tild):
        """Predict whether the two inputs belong to the same neighborhood.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, input_size).
        x_tild : torch.Tensor
            Input tensor of shape (batch_size, input_size).

        Returns
        -------
        torch.Tensor
            Output tensor of shape (batch_size,) with the predicted logits.
        """
        x_all = torch.cat([x, x_tild], -1)
        if self.max_pool:
            # Pool (batch, timesteps, 2 * input_size) down to a single
            # timestep before the projection head.
            x_all = F.max_pool1d(
                x_all.transpose(1, 2).contiguous(), kernel_size=x_all.size(1)
            ).transpose(1, 2)
        p = self.model(x_all)
        return p.view((-1,))