Source code for dasf.transforms.transforms
#!/usr/bin/env python3
import math
import dask
import dask.dataframe as ddf
import h5py
import numpy as np
import pandas as pd
import zarr
from dasf.transforms.base import Transform
from dasf.utils.types import is_array, is_dask_array, is_dask_gpu_array
try:
import cudf
import cupy as cp
except ImportError: # pragma: no cover
pass
[docs]
class ExtractData(Transform):
"""Extract Data from Dataset Object
"""
[docs]
def transform(self, X):
if hasattr(X, "_data") and X._data is not None:
return X._data
raise ValueError("Data could not be extracted. Dataset needs to be previously loaded.")
[docs]
class ArrayToZarr(Transform):
def __init__(self, chunks=None, save=True, filename=None):
self.chunks = chunks
# TODO: implement the possibility of not saving
self.save = True
self.filename = filename
[docs]
@staticmethod
def _convert_filename(url):
if url.endswith(".npy"):
return url.replace(".npy", ".zarr")
return url + ".zarr"
[docs]
def _lazy_transform_generic_all(self, data):
if self.filename:
url = self.filename
elif hasattr(data, '_root_file'):
url = data._root_file
else:
raise Exception("Array requires a valid path to convert to Zarr.")
if data is None:
raise Exception("Dataset needs to be loaded first.")
url = self._convert_filename(url)
# XXX: Workaround to avoid error with CuPy and Zarr library
if is_dask_gpu_array(data):
data = data.map_blocks(lambda x: x.get())
data.to_zarr(url)
return url
[docs]
def _transform_generic_all(self, data, chunks, **kwargs):
if data is None:
raise Exception("Dataset needs to be loaded first.")
if not chunks:
raise Exception("Chunks needs to be passed for non lazy arrays.")
if self.filename:
url = self.filename
else:
raise Exception("Array requires a valid path to convert to Zarr.")
url = self._convert_filename(url)
z = zarr.open(store=url, mode='w', shape=data.shape,
chunks=chunks, dtype='i4')
z = data
return url
[docs]
def _lazy_transform_generic(self, X, **kwargs):
# XXX: Avoid circular dependency
from dasf.datasets.base import DatasetArray, DatasetZarr
name = None
if isinstance(X, DatasetArray):
name = X._name
chunks = X._chunks
if not self.filename and hasattr(X, '_root_file'):
self.filename = X._root_file
url = self._lazy_transform_generic_all(X._data)
elif is_dask_array(X):
chunks = X.chunks
url = self._lazy_transform_generic_all(X)
else:
raise Exception("It is not an Array type.")
return DatasetZarr(name=name, download=False, root=url, chunks=chunks)
[docs]
def _transform_generic(self, X, **kwargs):
# XXX: Avoid circular dependency
from dasf.datasets.base import DatasetArray, DatasetZarr
name = None
url = None
if hasattr(X, '_chunks') and \
(X._chunks is not None and X._chunks != 'auto'):
chunks = X._chunks
else:
chunks = self.chunks
if chunks is None:
raise Exception("Chunks needs to be specified.")
if isinstance(X, DatasetArray):
name = X._name
if not self.filename and hasattr(X, '_root_file'):
self.filename = X._root_file
url = self._transform_generic_all(X._data, chunks)
elif is_array(X):
url = self._transform_generic_all(X, chunks)
else:
raise Exception("It is not an Array type.")
return DatasetZarr(name=name, download=False, root=url, chunks=chunks)
[docs]
class ArrayToHDF5(Transform):
def __init__(self, dataset_path, chunks=None, save=True, filename=None):
# Avoid circular dependency
from dasf.datasets.base import DatasetArray, DatasetHDF5
self.dataset_path = dataset_path
self.chunks = chunks
# TODO: implement the possibility of not saving
self.save = True
self.filename = filename
[docs]
@staticmethod
def _convert_filename(url):
if url.endswith(".npy"):
return url.replace(".npy", ".hdf5")
return url + ".hdf5"
[docs]
def _lazy_transform_generic_all(self, data):
if self.filename:
url = self.filename
elif hasattr(data, '_root_file'):
url = data._root_file
else:
raise Exception("Array requires a valid path to convert to HDF5.")
if data is None:
raise Exception("Dataset needs to be loaded first.")
url = self._convert_filename(url)
data.to_hdf5(url, self.dataset_path)
return url
[docs]
def _transform_generic_all(self, data):
if data is None:
raise Exception("Dataset needs to be loaded first.")
if self.filename:
url = self.filename
else:
raise Exception("Array requires a valid path to convert to Zarr.")
url = self._convert_filename(url)
h5f = h5py.File(url, 'w')
h5f.create_dataset(self.dataset_path, data=data)
h5f.close()
return url
[docs]
def _lazy_transform_generic(self, X, **kwargs):
# XXX: Avoid circular dependency
from dasf.datasets.base import DatasetArray, DatasetHDF5
name = None
chunks = None
if isinstance(X, DatasetArray):
name = X._name
chunks = X._chunks
if not self.filename and hasattr(X, '_root_file'):
self.filename = X._root_file
url = self._lazy_transform_generic_all(X._data)
elif is_dask_array(X):
chunks = X.chunks
url = self._lazy_transform_generic_all(X)
else:
raise Exception("It is not an Array type.")
return DatasetHDF5(name=name, download=False, root=url, chunks=chunks,
dataset_path=self.dataset_path)
[docs]
def _transform_generic(self, X, **kwargs):
# XXX: Avoid circular dependency
from dasf.datasets.base import DatasetArray, DatasetHDF5
name = None
url = None
if hasattr(X, '_chunks') and \
(X._chunks is not None and X._chunks != 'auto'):
chunks = X._chunks
else:
chunks = self.chunks
if isinstance(X, DatasetArray):
name = X._name
if not self.filename and hasattr(X, '_root_file'):
self.filename = X._root_file
url = self._transform_generic_all(X._data)
elif is_array(X):
url = self._transform_generic_all(X)
else:
raise Exception("It is not an Array type.")
return DatasetHDF5(name=name, download=False, root=url, chunks=chunks,
dataset_path=self.dataset_path)
[docs]
class ZarrToArray(Transform):
def __init__(self, chunks=None, save=True, filename=None):
self.chunks = chunks
self.save = save
self.filename = filename
[docs]
@staticmethod
def _convert_filename(url):
if url.endswith(".zarr"):
return url.replace(".zarr", ".npy")
return url + ".npy"
[docs]
def transform(self, X):
# XXX: Avoid circular dependency
from dasf.datasets.base import DatasetZarr
if issubclass(X.__class__, DatasetZarr):
if self.save:
if self.filename:
url = self.filename
elif hasattr(X, '_root_file'):
url = X._root_file
else:
raise Exception("Array requires a valid path to convert to Array.")
url = self._convert_filename(url)
np.save(url, X._data)
# This is just a place holder
return X._data
else:
raise Exception("Input is not a Zarr dataset.")
[docs]
class ArraysToDataFrame(Transform):
[docs]
def _build_dataframe(self, data, columns, xp, df):
data = [d.flatten() for d in data]
stacked_data = xp.stack(data, axis=1)
return df.DataFrame(
stacked_data,
columns=columns
)
[docs]
def _lazy_transform(self, xp, df, **kwargs):
X = list(kwargs.values())
y = list(kwargs.keys())
assert len(X) == len(y), "Data and labels should have the same length."
meta = ddf.utils.make_meta([
(col, data.dtype)
for col, data in zip(y, X)
],
parent_meta=None if df == pd else cudf.DataFrame
)
lazy_dataframe_build = dask.delayed(self._build_dataframe)
data_chunks = [x.to_delayed().ravel() for x in X]
partial_dataframes = [
ddf.from_delayed(lazy_dataframe_build(data=mapped_chunks, columns=y, xp=xp, df=df), meta=meta)
for mapped_chunks in zip(*data_chunks)
]
return ddf.concat(partial_dataframes)
[docs]
def _lazy_transform_cpu(self, X=None, **kwargs):
return self._lazy_transform(np, pd, **kwargs)
[docs]
def _lazy_transform_gpu(self, X=None, **kwargs):
return self._lazy_transform(cp, cudf, **kwargs)
[docs]
def _transform(self, xp, df, **kwargs):
X = list(kwargs.values())
y = list(kwargs.keys())
assert len(X) == len(y), "Data and labels should have the same length."
return self._build_dataframe(data=X, columns=y, xp=xp, df=df)