Source code for dasf.transforms.transforms

#!/usr/bin/env python3

""" All the essential data transforms module. """

import math

import dask
import dask.dataframe as ddf
import h5py
import numpy as np
import pandas as pd
import zarr

from dasf.transforms.base import Transform
from dasf.utils.types import is_array, is_dask_array, is_dask_gpu_array

try:
    import cudf
    import cupy as cp
except ImportError: # pragma: no cover
    pass



class ExtractData(Transform):
    """ Extract data from a Dataset object """

    def transform(self, X):
        """ Extract data from datasets that contain internal data.

        Parameters
        ----------
        X : Dataset-like
            A dataset object that could be anything that contains an
            internal structure representing the raw data.

        Returns
        -------
        data : Any
            Any representation of the internal Dataset data.

        """
        if hasattr(X, "_data") and X._data is not None:
            return X._data
        raise ValueError("Data could not be extracted. "
                         "Dataset needs to be previously loaded.")
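
A minimal usage sketch for ExtractData; the LoadedDataset stand-in below is hypothetical and only illustrates the `_data` contract that the check above relies on.

class LoadedDataset:
    # Hypothetical stand-in for a DASF dataset whose load step has already
    # populated the internal `_data` attribute.
    def __init__(self, data):
        self._data = data

raw = ExtractData().transform(LoadedDataset(np.arange(10)))
assert (raw == np.arange(10)).all()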


class Normalize(Transform):
    """ Normalize data object """

    def transform(self, X):
        """ Normalize the input data based on mean() and std().

        Parameters
        ----------
        X : Any
            Any data that could be normalized based on mean and standard
            deviation.

        Returns
        -------
        data : Any
            Normalized data

        """
        return (X - X.mean()) / X.std(ddof=0)
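
For example, on a small NumPy array the result is the usual z-score using the population standard deviation (ddof=0):

x = np.array([1.0, 2.0, 3.0, 4.0])
z = Normalize().transform(x)
# mean is 2.5 and std (ddof=0) is ~1.118, so z is roughly [-1.34, -0.45, 0.45, 1.34]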


class ArrayToZarr(Transform):
    """ Convert an array-like input into a Zarr dataset. """

    def __init__(self, chunks=None, save=True, filename=None):
        self.chunks = chunks
        # TODO: implement the possibility of not saving
        self.save = True
        self.filename = filename

    @staticmethod
    def _convert_filename(url):
        if url.endswith(".npy"):
            return url.replace(".npy", ".zarr")
        return url + ".zarr"

    def _lazy_transform_generic_all(self, data):
        if self.filename:
            url = self.filename
        elif hasattr(data, '_root_file'):
            url = data._root_file
        else:
            raise Exception("Array requires a valid path to convert to Zarr.")

        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        url = self._convert_filename(url)

        # XXX: Workaround to avoid error with CuPy and Zarr library
        if is_dask_gpu_array(data):
            data = data.map_blocks(lambda x: x.get())

        data.to_zarr(url)

        return url

    def _transform_generic_all(self, data, chunks, **kwargs):
        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        if not chunks:
            raise Exception("Chunks need to be passed for non-lazy arrays.")

        if self.filename:
            url = self.filename
        else:
            raise Exception("Array requires a valid path to convert to Zarr.")

        url = self._convert_filename(url)

        z = zarr.open(store=url, mode='w', shape=data.shape,
                      chunks=chunks, dtype=data.dtype)
        # Write the data into the Zarr store (rebinding the name with
        # `z = data` would leave the store empty).
        z[:] = data

        return url

    def _lazy_transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetZarr

        name = None

        if isinstance(X, DatasetArray):
            name = X._name
            chunks = X._chunks

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._lazy_transform_generic_all(X._data)
        elif is_dask_array(X):
            chunks = X.chunks

            url = self._lazy_transform_generic_all(X)
        else:
            raise Exception("It is not an Array type.")

        return DatasetZarr(name=name, download=False, root=url, chunks=chunks)

    def _transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetZarr

        name = None
        url = None

        if hasattr(X, '_chunks') and \
           (X._chunks is not None and X._chunks != 'auto'):
            chunks = X._chunks
        else:
            chunks = self.chunks

        if chunks is None:
            raise Exception("Chunks need to be specified.")

        if isinstance(X, DatasetArray):
            name = X._name

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._transform_generic_all(X._data, chunks)
        elif is_array(X):
            url = self._transform_generic_all(X, chunks)
        else:
            raise Exception("It is not an Array type.")

        return DatasetZarr(name=name, download=False, root=url, chunks=chunks)

    def _lazy_transform_gpu(self, X, **kwargs):
        return self._lazy_transform_generic(X, **kwargs)

    def _lazy_transform_cpu(self, X, **kwargs):
        return self._lazy_transform_generic(X, **kwargs)

    def _transform_gpu(self, X, **kwargs):
        return self._transform_generic(X, **kwargs)

    def _transform_cpu(self, X, **kwargs):
        return self._transform_generic(X, **kwargs)
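
A sketch of the lazy CPU path for ArrayToZarr, calling _lazy_transform_cpu directly for illustration (in a DASF pipeline the lazy/eager and CPU/GPU variant is normally selected automatically); the target filename is illustrative.

import dask.array as da

darr = da.random.random((64, 64), chunks=(32, 32))
to_zarr = ArrayToZarr(filename="/tmp/example.npy")
zarr_dataset = to_zarr._lazy_transform_cpu(darr)  # writes /tmp/example.zarr and returns a DatasetZarr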


class ArrayToHDF5(Transform):
    """ Convert an array-like input into an HDF5 dataset. """

    def __init__(self, dataset_path, chunks=None, save=True, filename=None):
        self.dataset_path = dataset_path
        self.chunks = chunks
        # TODO: implement the possibility of not saving
        self.save = True
        self.filename = filename

    @staticmethod
    def _convert_filename(url):
        if url.endswith(".npy"):
            return url.replace(".npy", ".hdf5")
        return url + ".hdf5"

    def _lazy_transform_generic_all(self, data):
        if self.filename:
            url = self.filename
        elif hasattr(data, '_root_file'):
            url = data._root_file
        else:
            raise Exception("Array requires a valid path to convert to HDF5.")

        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        url = self._convert_filename(url)

        data.to_hdf5(url, self.dataset_path)

        return url

    def _transform_generic_all(self, data):
        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        if self.filename:
            url = self.filename
        else:
            raise Exception("Array requires a valid path to convert to HDF5.")

        url = self._convert_filename(url)

        h5f = h5py.File(url, 'w')
        h5f.create_dataset(self.dataset_path, data=data)
        h5f.close()

        return url

    def _lazy_transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetHDF5

        name = None
        chunks = None

        if isinstance(X, DatasetArray):
            name = X._name
            chunks = X._chunks

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._lazy_transform_generic_all(X._data)
        elif is_dask_array(X):
            chunks = X.chunks

            url = self._lazy_transform_generic_all(X)
        else:
            raise Exception("It is not an Array type.")

        return DatasetHDF5(name=name, download=False, root=url,
                           chunks=chunks, dataset_path=self.dataset_path)

    def _transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetHDF5

        name = None
        url = None

        if hasattr(X, '_chunks') and \
           (X._chunks is not None and X._chunks != 'auto'):
            chunks = X._chunks
        else:
            chunks = self.chunks

        if isinstance(X, DatasetArray):
            name = X._name

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._transform_generic_all(X._data)
        elif is_array(X):
            url = self._transform_generic_all(X)
        else:
            raise Exception("It is not an Array type.")

        return DatasetHDF5(name=name, download=False, root=url,
                           chunks=chunks, dataset_path=self.dataset_path)

    def _lazy_transform_gpu(self, X, **kwargs):
        return self._lazy_transform_generic(X, **kwargs)

    def _lazy_transform_cpu(self, X, **kwargs):
        return self._lazy_transform_generic(X, **kwargs)

    def _transform_gpu(self, X, **kwargs):
        return self._transform_generic(X, **kwargs)

    def _transform_cpu(self, X, **kwargs):
        return self._transform_generic(X, **kwargs)
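
A similar sketch for the eager CPU path of ArrayToHDF5, with an illustrative path; the array is written under dataset_path inside the resulting .hdf5 file.

arr = np.random.random((32, 32))
to_hdf5 = ArrayToHDF5(dataset_path="data", chunks=(16, 16), filename="/tmp/example.npy")
hdf5_dataset = to_hdf5._transform_cpu(arr)  # writes /tmp/example.hdf5 and returns a DatasetHDF5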


class ZarrToArray(Transform):
    """ Convert a Zarr dataset back into a NumPy array. """

    def __init__(self, chunks=None, save=True, filename=None):
        self.chunks = chunks
        self.save = save
        self.filename = filename

    @staticmethod
    def _convert_filename(url):
        if url.endswith(".zarr"):
            return url.replace(".zarr", ".npy")
        return url + ".npy"

    def transform(self, X):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetZarr

        if not issubclass(X.__class__, DatasetZarr):
            raise Exception("Input is not a Zarr dataset.")

        if self.save:
            if self.filename:
                url = self.filename
            elif hasattr(X, '_root_file'):
                url = X._root_file
            else:
                raise Exception("Array requires a valid path to convert to Array.")

            url = self._convert_filename(url)

            np.save(url, X._data)

        # This is just a placeholder
        return X._data
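
A usage sketch for ZarrToArray, assuming a DatasetZarr that has already been loaded (the load() call below is an assumption about the dataset API); the paths are illustrative.

from dasf.datasets.base import DatasetZarr

ds = DatasetZarr(name="example", download=False, root="/tmp/example.zarr")
ds.load()  # assumed to populate ds._data
array = ZarrToArray(filename="/tmp/example.zarr").transform(ds)  # also saves /tmp/example.npy since save=True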


class ArraysToDataFrame(Transform):
    """ Convert a set of arrays into a DataFrame, one column per array. """

    def _build_dataframe(self, data, columns, xp, df):
        data = [d.flatten() for d in data]
        stacked_data = xp.stack(data, axis=1)

        return df.DataFrame(
            stacked_data,
            columns=columns
        )

    def _lazy_transform(self, xp, df, **kwargs):
        X = list(kwargs.values())
        y = list(kwargs.keys())

        assert len(X) == len(y), "Data and labels should have the same length."

        meta = ddf.utils.make_meta(
            [(col, data.dtype) for col, data in zip(y, X)],
            parent_meta=None if df == pd else cudf.DataFrame
        )

        lazy_dataframe_build = dask.delayed(self._build_dataframe)

        data_chunks = [x.to_delayed().ravel() for x in X]

        partial_dataframes = [
            ddf.from_delayed(
                lazy_dataframe_build(data=mapped_chunks, columns=y, xp=xp, df=df),
                meta=meta
            )
            for mapped_chunks in zip(*data_chunks)
        ]

        return ddf.concat(partial_dataframes)

    def _lazy_transform_cpu(self, X=None, **kwargs):
        return self._lazy_transform(np, pd, **kwargs)

    def _lazy_transform_gpu(self, X=None, **kwargs):
        return self._lazy_transform(cp, cudf, **kwargs)

    def _transform(self, xp, df, **kwargs):
        X = list(kwargs.values())
        y = list(kwargs.keys())

        assert len(X) == len(y), "Data and labels should have the same length."

        return self._build_dataframe(data=X, columns=y, xp=xp, df=df)

    def _transform_cpu(self, X=None, **kwargs):
        return self._transform(np, pd, **kwargs)

    def _transform_gpu(self, X=None, **kwargs):
        return self._transform(cp, cudf, **kwargs)
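
A sketch of the eager CPU path of ArraysToDataFrame: each keyword argument becomes a DataFrame column named after the keyword, with the arrays flattened and stacked column-wise.

depth = np.arange(6).reshape(2, 3)
amplitude = np.random.random((2, 3))

frame = ArraysToDataFrame()._transform_cpu(depth=depth, amplitude=amplitude)
# frame is a 6-row pandas DataFrame with columns "depth" and "amplitude"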