{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### Tutorial 7 - Trainig a PyTorch Lightning model\n", "\n", "In this tutorial, we will train a simple U-Net model for regression using PyTorch Lightning. For sake of simplicity, we generate random data and label in order to train.\n", "The data is a 1-channel 2D image, with shape `(1, 128, 128)`. As it is a regression task, the label is a 1-channel 2D image with the same shape.\n", "\n", "Thus, in this tutorial, we will:\n", "1. Generate 16 samples of random data and label, with shape `(1, 128, 128)`.\n", "2. Create a DASF map-style like dataset, named `LabeledDataset`. This dataset implements the `__getitem__` and `__len__` methods. The `__getitem__` method returns 2-element tuple with the data and label. The return data may be in numpy, cupy or dask array format. Note that we can create complex pipelines of data using DASF operator. For now, we use only the `DatasetArray` operator, but we can chain other Dasd operator. It is important that, the input to `LightningTrainer` is a map-style dataset that returns a 2-element tuple for each index.\n", "3. Create a U-Net model.\n", "4. Train the model using PyTorch Lightning and Dasf.\n", "\n", "\n", "#### Considerations and still going work\n", "\n", "- We still cannot use the GPU in this notebook environments, but it works fine in python scripts.\n", "- This tutorial is a work in progress and will be updated with more details and explanations." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1715635051.633686] [d78f44b0045f:277372:f] vfs_fuse.c:281 UCX ERROR inotify_add_watch(/tmp) failed: No space left on device\n" ] } ], "source": [ "from minerva.models.nets.unet import UNet\n", "from dasf.datasets import Dataset, DatasetArray\n", "from dasf.pipeline import Pipeline\n", "from dasf.pipeline.executors import DaskPipelineExecutor\n", "import lightning as L\n", "from dasf.ml.dl import LightningTrainer\n", "import numpy as np\n", "from typing import List, Tuple" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's generate random data. The label will be data with a random noise." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(16, 1, 128, 128) (16, 1, 128, 128)\n" ] } ], "source": [ "num_samples = 16\n", "data_path = \"data.npy\"\n", "labels_path = \"labels.npy\"\n", "\n", "data = np.random.rand(num_samples, 1, 128, 128)\n", "noise = np.random.normal(0,1,size=(num_samples, 1, 128, 128))\n", "label = data + noise\n", "\n", "print(data.shape, label.shape)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Data saved to data.npy. Labels saved to labels.npy\n" ] } ], "source": [ "np.save(data_path, data)\n", "np.save(labels_path, label)\n", "print(f\"Data saved to {data_path}. Labels saved to {labels_path}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now going to create a Dataset using DASF. The dataset class will load the data and label from the numpy array using the `DatasetArray` operator. The `__getitem__` method will return a 2-element tuple with the data and label." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "class LabeledDataset(Dataset):\n", " \"\"\"A Dasf dataset that loads data and labels from numpy files, using the\n", " DatasetArray class. This class implements the __getitem__ method to return\n", " a tuple of (data, label) for a given index.\n", " \"\"\"\n", "\n", " def __init__(\n", " self,\n", " original_path: str,\n", " label_path: str,\n", " chunks: Tuple[int, int, int] = (1, -1, -1, -1),\n", " ):\n", " \"\"\"Create a Dasf dataset that loads data and labels from numpy files\n", " using the DatasetArray class.\n", "\n", " Parameters\n", " ----------\n", " original_path : str\n", " The path to the numpy file containing the data.\n", " label_path : str\n", " The path to the numpy file containing the labels.\n", " chunks : Tuple[int, int, int], optional\n", " Chunk size. We will operate over a single sample \n", " (1-channel 2D image), by default (1, -1, -1, -1).\n", " \"\"\"\n", " self.original = DatasetArray(\n", " name=\"input\", root=original_path, chunks=chunks\n", " )\n", " self.label = DatasetArray(name=\"label\", root=label_path, chunks=chunks)\n", "\n", " def load(self):\n", " self.original.load()\n", " self.label.load()\n", " return self\n", "\n", " def _lazy_load_cpu(self):\n", " return self.load()\n", "\n", " def _load_cpu(self):\n", " return self.load()\n", "\n", " def _lazy_load_gpu(self):\n", " return self.load()\n", "\n", " def _load_gpu(self):\n", " return self.load()\n", "\n", " def __len__(self):\n", " return len(self.original)\n", "\n", " def __getitem__(self, idx):\n", " return self.original[idx], self.label[idx]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's create the UNet class for regression. We will train using the Mean Squared Error loss and the Adam optimizer.\n", "\n", "The foward method receives a batch of (data, label), both of same shape, and returns the regression output." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "\"\"\" Full assembly of the parts to form the complete network \"\"\"\n", "\n", "import time\n", "from typing import Dict, Optional\n", "\n", "import lightning as L\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "from torch.optim.lr_scheduler import CyclicLR, StepLR\n", "\n", "\"\"\" -------------- Parts of the U-Net model --------------\"\"\"\n", "class _DoubleConv(nn.Module):\n", " \"\"\"(convolution => [BN] => ReLU) * 2\"\"\"\n", "\n", " \"\"\"\n", " Performs two convolutions with the same number\n", " of input and output channels, followed by batch normalization and ReLU activation\n", " \"\"\"\n", "\n", " def __init__(self, in_channels, out_channels, mid_channels=None):\n", " \"\"\"\n", "\n", " Parameters\n", " ----------\n", " in_channels : int\n", " Number of input channels, i.e. the number of channels in the input image (1 for grayscale, 3 for RGB)\n", " out_channels : int\n", " Number of output channels, i.e. the number of channels produced by the convolution\n", " mid_channels : int, optional\n", " Number of channels in the middle, by default None\n", "\n", " \"\"\"\n", " super().__init__()\n", " if not mid_channels:\n", " mid_channels = out_channels\n", " self.double_conv = nn.Sequential(\n", " nn.Conv2d(\n", " in_channels, mid_channels, kernel_size=3, padding=1, bias=False\n", " ), # no need to add bias since BatchNorm2d will do that\n", " nn.BatchNorm2d(mid_channels), # normalize the output of the previous layer\n", " nn.ReLU(\n", " inplace=True\n", " ), # inplace=True will modify the input directly instead of allocating new memory\n", " nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),\n", " nn.BatchNorm2d(out_channels),\n", " nn.ReLU(inplace=True),\n", " )\n", "\n", " def forward(self, x):\n", " return self.double_conv(x)\n", "\n", "\n", "class _Down(nn.Module):\n", " \"\"\"Downscaling with maxpool then double conv\"\"\"\n", "\n", " def __init__(self, in_channels, out_channels):\n", " super().__init__()\n", " self.maxpool_conv = nn.Sequential(\n", " nn.MaxPool2d(2), _DoubleConv(in_channels, out_channels)\n", " )\n", "\n", " def forward(self, x):\n", " return self.maxpool_conv(x)\n", "\n", "\n", "class _Up(nn.Module):\n", " \"\"\"Upscaling then double conv\"\"\"\n", "\n", " def __init__(self, in_channels, out_channels, bilinear=True):\n", " super().__init__()\n", "\n", " # if bilinear, use the normal convolutions to reduce the number of channels\n", " if bilinear:\n", " self.up = nn.Upsample(scale_factor=2, mode=\"bilinear\", align_corners=True)\n", " self.conv = _DoubleConv(in_channels, out_channels, in_channels // 2)\n", " else:\n", " self.up = nn.ConvTranspose2d(\n", " in_channels, in_channels // 2, kernel_size=2, stride=2\n", " )\n", " self.conv = _DoubleConv(in_channels, out_channels)\n", "\n", " def forward(self, x1, x2):\n", " x1 = self.up(x1)\n", " # input is CHW (channel, height, width)\n", " diffY = x2.size()[2] - x1.size()[2]\n", " diffX = x2.size()[3] - x1.size()[3]\n", "\n", " # pad the input tensor on all sides with the given \"pad\" value\n", " x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2])\n", " # if you have padding issues, see\n", " # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a\n", " # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd\n", " x = torch.cat([x2, x1], dim=1)\n", " return self.conv(x)\n", "\n", "\n", "class _OutConv(nn.Module):\n", " def __init__(self, in_channels, out_channels):\n", " super(_OutConv, self).__init__()\n", " self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)\n", "\n", " def forward(self, x):\n", " return self.conv(x)\n", "\n", "\n", "\"\"\" -------------- The U-Net model --------------\"\"\"\n", "class _UNet(torch.nn.Module):\n", " \"\"\"Implementation of U-Net model.\"\"\"\n", "\n", " def __init__(\n", " self,\n", " n_channels: int = 1,\n", " bilinear: bool = False,\n", " ):\n", " \"\"\"Implementation of U-Net model.\n", "\n", " Parameters\n", " ----------\n", " n_channels : int, optional\n", " Number of input channels, by default 1\n", " bilinear : bool, optional\n", " If `True` use bilinear interpolation for upsampling, by default\n", " False.\n", " \"\"\"\n", " super().__init__()\n", " factor = 2 if bilinear else 1\n", "\n", " self.n_channels = n_channels\n", " self.bilinear = bilinear\n", "\n", " self.inc = _DoubleConv(n_channels, 64)\n", " self.down1 = _Down(64, 128)\n", " self.down2 = _Down(128, 256)\n", " self.down3 = _Down(256, 512)\n", " self.down4 = _Down(512, 1024 // factor)\n", " self.up1 = _Up(1024, 512 // factor, bilinear)\n", " self.up2 = _Up(512, 256 // factor, bilinear)\n", " self.up3 = _Up(256, 128 // factor, bilinear)\n", " self.up4 = _Up(128, 64, bilinear)\n", " # self.outc = (OutConv(64, n_classes))\n", " self.outc = _OutConv(64, 1)\n", "\n", " def forward(self, x):\n", " x1 = self.inc(x)\n", " x2 = self.down1(x1)\n", " x3 = self.down2(x2)\n", " x4 = self.down3(x3)\n", " x5 = self.down4(x4)\n", " x = self.up1(x5, x4)\n", " x = self.up2(x, x3)\n", " x = self.up3(x, x2)\n", " x = self.up4(x, x1)\n", " logits = self.outc(x)\n", " return logits\n", " \n", "\"\"\" -------------- The U-Net Lightning model --------------\"\"\"\n", "class UNet(L.LightningModule):\n", " \"\"\"This class is a simple implementation of the U-Net model, which is a\n", " convolutional neural network used for image segmentation. The model consists\n", " of a contracting path (encoder) and an expansive path (decoder). The\n", " contracting path follows the typical architecture of a convolutional neural\n", " network, with repeated applications of convolutions and max pooling layers.\n", " The expansive path consists of up-convolutions and concatenation of feature\n", " maps from the contracting path. The model also has skip connections, which\n", " allows the expansive path to use information from the contracting path at\n", " multiple resolutions. The U-Net model was originally proposed by\n", " Ronneberger, Fischer, and Brox in 2015.\n", "\n", " This architecture, handles arbitrary input sizes, and returns an output of\n", " the same size as the input. The expected input size is (N, C, H, W), where N\n", " is the batch size, C is the number of channels, H is the height of the input\n", " image, and W is the width of the input image.\n", "\n", " Note that, for this implementation, the input batch is a single tensor and\n", " not a tuple of tensors (e.g., data and label).\n", "\n", " Note that this class wrappers the `_UNet` class, which is the actual\n", " implementation of the U-Net model, into a `SimpleReconstructionNet` class,\n", " which is a simple autoencoder pipeline for reconstruction tasks.\n", "\n", " References\n", " ----------\n", " Ronneberger, Olaf, Philipp Fischer, and Thomas Brox. \"U-net: Convolutional\n", " networks for biomedical image segmentation.\" Medical Image Computing and\n", " Computer-Assisted Intervention-MICCAI 2015: 18th International Conference,\n", " Munich, Germany, October 5-9, 2015, Proceedings, Part III 18. Springer\n", " International Publishing, 2015.\n", " \"\"\"\n", "\n", " def __init__(\n", " self,\n", " n_channels: int = 1,\n", " bilinear: bool = False,\n", " learning_rate: float = 1e-3,\n", " ):\n", " \"\"\"Wrapper implementation of the U-Net model.\n", "\n", " Parameters\n", " ----------\n", " n_channels : int, optional\n", " The number of channels of the input, by default 1\n", " bilinear : bool, optional\n", " If `True` use bilinear interpolation for upsampling, by default\n", " False.\n", " \"\"\"\n", " super().__init__()\n", " self.model = _UNet(n_channels, bilinear)\n", " self.loss_fn = nn.MSELoss() \n", " self.learning_rate = learning_rate\n", " \n", " def forward(self, x: torch.Tensor) -> torch.Tensor:\n", " \"\"\"Perform a forward pass with the input data on the backbone model.\n", "\n", " Parameters\n", " ----------\n", " x : torch.Tensor\n", " The input data.\n", "\n", " Returns\n", " -------\n", " torch.Tensor\n", " The output data from the forward pass.\n", " \"\"\"\n", " x = self.model(x)\n", " x = x.view(x.size(0), -1)\n", " return x\n", " \n", " def training_step(self, batch: torch.Tensor, batch_idx: int):\n", " x, y = batch\n", " y_hat = self.forward(x)\n", " loss = self.loss_fn(y_hat, y)\n", " self.log(\"train_loss\", loss)\n", " return loss\n", "\n", " def validation_step(self, batch: torch.Tensor, batch_idx: int):\n", " x, y = batch\n", " y_hat = self.forward(x)\n", " loss = self.loss_fn(y_hat, y)\n", " self.log(\"val_loss\", loss)\n", " return loss\n", "\n", " def test_step(self, batch: torch.Tensor, batch_idx: int):\n", " x, y = batch\n", " y_hat = self.forward(x)\n", " loss = self.loss_fn(y_hat, y)\n", " self.log(\"test_loss\", loss)\n", " return loss\n", " \n", " def configure_optimizers(self):\n", " optimizer = torch.optim.Adam(\n", " self.parameters(),\n", " lr=self.learning_rate,\n", " )\n", " return optimizer" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Instantiate the model, operators and construct the pipeline" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/lib/python3.10/dist-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n", "Perhaps you already have a cluster running?\n", "Hosting the HTTP server on port 44311 instead\n", " warnings.warn(\n", "2024-05-13 21:17:38,140 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-si4r7jcw', purging\n", "2024-05-13 21:17:38,140 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-h6b2kafg', purging\n", "2024-05-13 21:17:38,140 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-e8sr3pya', purging\n", "2024-05-13 21:17:38,140 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-3lb3gqoz', purging\n", "2024-05-13 21:17:38,141 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-4970vq0z', purging\n", "2024-05-13 21:17:38,141 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-lvnvnaul', purging\n", "2024-05-13 21:17:38,141 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-hvaldupk', purging\n", "2024-05-13 21:17:38,141 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-tl32k0yc', purging\n" ] }, { "data": { "image/svg+xml": [ "\n", "\n", "\n", "\n", "\n", "\n", "pipeline\n", "\n", "\n", "\n", "25397\n", "\n", "LightningTrainer.fit\n", "\n", "\n", "\n", "25260\n", "\n", "LabeledDataset.load\n", "\n", "\n", "\n", "25260->25397\n", "\n", "\n", "train_data\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Instantiate U-Net model\n", "model = UNet()\n", "\n", "# ------------ DASF OPERATORS ------------ \n", "dataset = LabeledDataset(data_path, labels_path)\n", "trainer = LightningTrainer(\n", " model=model,\n", " use_gpu=False, # Do not use GPUs (does not working on jupyter notebook)\n", " max_epochs=1,\n", " strategy=\"ddp_notebook\", # Use DDP strategy (needed only when using jupyter notebooks)\n", " batch_size=8,\n", ")\n", "\n", "# ------------ DASF PIPELINE ------------\n", "executor = DaskPipelineExecutor(\n", " local=True, use_gpu=False # Do not use GPUs (does not working on jupyter notebook)\n", ")\n", "pipeline = Pipeline(\n", " name=\"pipeline\",\n", " executor=executor,\n", " verbose=True,\n", ")\n", "pipeline.add(trainer.fit, train_data=dataset)\n", "\n", "pipeline.visualize()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, run it!" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[2024-05-13 21:17:38+0000] INFO - Beginning pipeline run for 'pipeline'\n", "[2024-05-13 21:17:38+0000] INFO - Task 'LabeledDataset.load': Starting task run...\n", "[2024-05-13 21:17:38+0000] INFO - Task 'LabeledDataset.load': Finished task run\n", "[2024-05-13 21:17:38+0000] INFO - Task 'LightningTrainer.fit': Starting task run...\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "GPU available: False, used: False\n", "TPU available: False, using: 0 TPU cores\n", "IPU available: False, using: 0 IPUs\n", "HPU available: False, using: 0 HPUs\n", "/usr/local/lib/python3.10/dist-packages/lightning/pytorch/trainer/configuration_validator.py:74: You defined a `validation_step` but have no `val_dataloader`. Skipping val loop.\n", "Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/1\n", "----------------------------------------------------------------------------------------------------\n", "distributed_backend=gloo\n", "All distributed processes registered. Starting with 1 processes\n", "----------------------------------------------------------------------------------------------------\n", "\n", "\n", " | Name | Type | Params\n", "------------------------------------\n", "0 | model | _UNet | 31.0 M\n", "1 | loss_fn | MSELoss | 0 \n", "------------------------------------\n", "31.0 M Trainable params\n", "0 Non-trainable params\n", "31.0 M Total params\n", "124.146 Total estimated model params size (MB)\n", "/usr/local/lib/python3.10/dist-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=47` in the `DataLoader` to improve performance.\n", "/usr/local/lib/python3.10/dist-packages/lightning/pytorch/loops/fit_loop.py:298: The number of training batches (2) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "f18df0fdcea54746974cc05af1f82b27", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Training: | | 0/? [00:00