#!/usr/bin/env python3
""" All the essential data transforms module. """
import dask
import dask.dataframe as ddf
import h5py
import numpy as np
import pandas as pd
import zarr
from dasf.transforms.base import Transform
from dasf.utils.types import is_array, is_dask_array, is_dask_gpu_array
try:
    import cudf
    import cupy as cp
except ImportError:  # pragma: no cover
    pass


class ExtractData(Transform):
    """
    Extract data from a Dataset object.
    """
    def transform(self, X):
        """
        Extract data from datasets that contain internal data.

        Parameters
        ----------
        X : Dataset-like
            A dataset object that could be anything that contains an internal
            structure representing the raw data.

        Returns
        -------
        data : Any
            Any representation of the internal Dataset data.
        """
        if hasattr(X, "_data") and X._data is not None:
            return X._data

        raise ValueError("Data could not be extracted. Dataset needs to be "
                         "previously loaded.")
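
# Usage sketch (illustrative, not part of the original module): any object
# exposing a non-None ``_data`` attribute works here, e.g. a loaded DASF
# dataset. ``_LoadedDataset`` is a hypothetical stand-in.
#
#   >>> class _LoadedDataset:
#   ...     _data = np.arange(3)
#   >>> ExtractData().transform(_LoadedDataset())
#   array([0, 1, 2])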


class Normalize(Transform):
    """
    Normalize a data object.
    """
    def transform(self, X):
        """
        Normalize the input data based on its mean() and std().

        Parameters
        ----------
        X : Any
            Any data that could be normalized based on mean and standard
            deviation.

        Returns
        -------
        data : Any
            Normalized data.
        """
        return (X - X.mean()) / X.std(ddof=0)
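
# Usage sketch (illustrative): ``ddof=0`` means the population standard
# deviation, so a three-point array normalizes as follows.
#
#   >>> Normalize().transform(np.array([1.0, 2.0, 3.0]))
#   array([-1.22474487,  0.        ,  1.22474487])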


class ArrayToZarr(Transform):
    def __init__(self, chunks=None, save=True, filename=None):
        self.chunks = chunks
        # TODO: implement the possibility of not saving
        self.save = True
        self.filename = filename

    @staticmethod
    def _convert_filename(url):
        if url.endswith(".npy"):
            return url.replace(".npy", ".zarr")
        return url + ".zarr"

    def _lazy_transform_generic_all(self, data):
        # Check for data first: `hasattr(None, ...)` would otherwise raise a
        # misleading "valid path" error for unloaded datasets.
        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        if self.filename:
            url = self.filename
        elif hasattr(data, '_root_file'):
            url = data._root_file
        else:
            raise Exception("Array requires a valid path to convert to Zarr.")

        url = self._convert_filename(url)

        # XXX: Workaround to avoid error with CuPy and Zarr library
        if is_dask_gpu_array(data):
            data = data.map_blocks(lambda x: x.get())

        data.to_zarr(url)

        return url

    def _transform_generic_all(self, data, chunks, **kwargs):
        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        if not chunks:
            raise Exception("Chunks need to be passed for non-lazy arrays.")

        if self.filename:
            url = self.filename
        else:
            raise Exception("Array requires a valid path to convert to Zarr.")

        url = self._convert_filename(url)

        # Create the store with the input's dtype and copy the contents in;
        # merely rebinding `z` to `data` would not persist anything.
        z = zarr.open(store=url, mode='w', shape=data.shape,
                      chunks=chunks, dtype=data.dtype)
        z[...] = data

        return url

    def _lazy_transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetZarr

        name = None
        if isinstance(X, DatasetArray):
            name = X._name
            chunks = X._chunks

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._lazy_transform_generic_all(X._data)
        elif is_dask_array(X):
            chunks = X.chunks

            url = self._lazy_transform_generic_all(X)
        else:
            raise Exception("It is not an Array type.")

        return DatasetZarr(name=name, download=False, root=url, chunks=chunks)

    def _transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetZarr

        name = None
        url = None
        if hasattr(X, '_chunks') and \
           (X._chunks is not None and X._chunks != 'auto'):
            chunks = X._chunks
        else:
            chunks = self.chunks

        if chunks is None:
            raise Exception("Chunks need to be specified.")

        if isinstance(X, DatasetArray):
            name = X._name

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._transform_generic_all(X._data, chunks)
        elif is_array(X):
            url = self._transform_generic_all(X, chunks)
        else:
            raise Exception("It is not an Array type.")

        return DatasetZarr(name=name, download=False, root=url, chunks=chunks)
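
# Usage sketch (hypothetical paths): converting an eager NumPy array requires
# explicit ``chunks`` and a ``filename``; the ``.npy`` suffix is rewritten to
# ``.zarr`` and a DatasetZarr pointing at the new store is returned.
#
#   >>> to_zarr = ArrayToZarr(chunks=(2, 2), filename="/tmp/example.npy")
#   >>> dataset = to_zarr._transform_generic(np.zeros((4, 4)))  # doctest: +SKIP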


class ArrayToHDF5(Transform):
    def __init__(self, dataset_path, chunks=None, save=True, filename=None):
        self.dataset_path = dataset_path
        self.chunks = chunks
        # TODO: implement the possibility of not saving
        self.save = True
        self.filename = filename

    @staticmethod
    def _convert_filename(url):
        if url.endswith(".npy"):
            return url.replace(".npy", ".hdf5")
        return url + ".hdf5"

    def _lazy_transform_generic_all(self, data):
        # Check for data first: `hasattr(None, ...)` would otherwise raise a
        # misleading "valid path" error for unloaded datasets.
        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        if self.filename:
            url = self.filename
        elif hasattr(data, '_root_file'):
            url = data._root_file
        else:
            raise Exception("Array requires a valid path to convert to HDF5.")

        url = self._convert_filename(url)

        data.to_hdf5(url, self.dataset_path)

        return url

    def _transform_generic_all(self, data):
        if data is None:
            raise Exception("Dataset needs to be loaded first.")

        if self.filename:
            url = self.filename
        else:
            raise Exception("Array requires a valid path to convert to HDF5.")

        url = self._convert_filename(url)

        with h5py.File(url, 'w') as h5f:
            h5f.create_dataset(self.dataset_path, data=data)

        return url

    def _lazy_transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetHDF5

        name = None
        chunks = None
        if isinstance(X, DatasetArray):
            name = X._name
            chunks = X._chunks

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._lazy_transform_generic_all(X._data)
        elif is_dask_array(X):
            chunks = X.chunks

            url = self._lazy_transform_generic_all(X)
        else:
            raise Exception("It is not an Array type.")

        return DatasetHDF5(name=name, download=False, root=url, chunks=chunks,
                           dataset_path=self.dataset_path)

    def _transform_generic(self, X, **kwargs):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetArray, DatasetHDF5

        name = None
        url = None
        if hasattr(X, '_chunks') and \
           (X._chunks is not None and X._chunks != 'auto'):
            chunks = X._chunks
        else:
            chunks = self.chunks

        if isinstance(X, DatasetArray):
            name = X._name

            if not self.filename and hasattr(X, '_root_file'):
                self.filename = X._root_file

            url = self._transform_generic_all(X._data)
        elif is_array(X):
            url = self._transform_generic_all(X)
        else:
            raise Exception("It is not an Array type.")

        return DatasetHDF5(name=name, download=False, root=url, chunks=chunks,
                           dataset_path=self.dataset_path)
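
# Usage sketch (hypothetical path): Dask arrays are written lazily through
# ``to_hdf5``; the returned DatasetHDF5 references the new ``.hdf5`` file and
# the dataset path given at construction time.
#
#   >>> import dask.array as da
#   >>> to_hdf5 = ArrayToHDF5(dataset_path="data", filename="/tmp/example")
#   >>> ds = to_hdf5._lazy_transform_generic(da.zeros((4, 4), chunks=2))  # doctest: +SKIP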


class ZarrToArray(Transform):
    def __init__(self, chunks=None, save=True, filename=None):
        self.chunks = chunks
        self.save = save
        self.filename = filename

    @staticmethod
    def _convert_filename(url):
        if url.endswith(".zarr"):
            return url.replace(".zarr", ".npy")
        return url + ".npy"

    def transform(self, X):
        # XXX: Avoid circular dependency
        from dasf.datasets.base import DatasetZarr

        if not isinstance(X, DatasetZarr):
            raise Exception("Input is not a Zarr dataset.")

        if self.save:
            if self.filename:
                url = self.filename
            elif hasattr(X, '_root_file'):
                url = X._root_file
            else:
                raise Exception("Dataset requires a valid path to convert to Array.")

            url = self._convert_filename(url)

            np.save(url, X._data)

        # This is just a placeholder
        return X._data


class ArraysToDataFrame(Transform):
    def _build_dataframe(self, data, columns, xp, df):
        # Flatten each input block and stack them column-wise into a single
        # 2-D array that backs the resulting DataFrame.
        data = [d.flatten() for d in data]
        stacked_data = xp.stack(data, axis=1)
        return df.DataFrame(
            stacked_data,
            columns=columns
        )

    def _lazy_transform(self, xp, df, **kwargs):
        X = list(kwargs.values())
        y = list(kwargs.keys())

        assert len(X) == len(y), "Data and labels should have the same length."

        # Describe the expected output schema so Dask can build the graph
        # without materializing any data.
        meta = ddf.utils.make_meta(
            [(col, data.dtype) for col, data in zip(y, X)],
            parent_meta=None if df == pd else cudf.DataFrame
        )

        lazy_dataframe_build = dask.delayed(self._build_dataframe)

        # One list of delayed blocks per input array; zip pairs the matching
        # blocks of every input so each partition is built consistently.
        data_chunks = [x.to_delayed().ravel() for x in X]

        partial_dataframes = [
            ddf.from_delayed(
                lazy_dataframe_build(data=mapped_chunks, columns=y, xp=xp, df=df),
                meta=meta
            )
            for mapped_chunks in zip(*data_chunks)
        ]

        return ddf.concat(partial_dataframes)

    def _lazy_transform_cpu(self, X=None, **kwargs):
        return self._lazy_transform(np, pd, **kwargs)

    def _lazy_transform_gpu(self, X=None, **kwargs):
        return self._lazy_transform(cp, cudf, **kwargs)

    def _transform(self, xp, df, **kwargs):
        X = list(kwargs.values())
        y = list(kwargs.keys())

        assert len(X) == len(y), "Data and labels should have the same length."

        return self._build_dataframe(data=X, columns=y, xp=xp, df=df)
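
# Usage sketch (illustrative): keyword names become column names; the eager
# CPU path stacks the flattened arrays into a pandas DataFrame.
#
#   >>> adf = ArraysToDataFrame()
#   >>> adf._transform(np, pd, a=np.ones(2), b=np.zeros(2))
#        a    b
#   0  1.0  0.0
#   1  1.0  0.0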