#!/usr/bin/env python3
""" Base module for most of the DASF Datasets. """
import json
import os
from numbers import Number
from pathlib import Path
import dask
import dask.array as da
import dask.dataframe as ddf
import h5py
import numpy as np
import numpy.lib.format
import pandas as pd
import xarray as xr
import zarr
try:
import cudf
import cupy as cp
# This is just to enable Xarray CuPy capabilities
import cupy_xarray as cx  # noqa
import dask_cudf as dcudf
except ImportError: # pragma: no cover
pass
try:
import numcodecs # noqa
from kvikio.nvcomp_codec import NvCompBatchCodec
from kvikio.zarr import GDSStore
except ImportError: # pragma: no cover
pass
from dasf.transforms.base import TargeteredTransform
from dasf.utils.decorators import task_handler
from dasf.utils.funcs import (
human_readable_size,
is_gds_supported,
is_kvikio_compat_mode,
is_kvikio_supported,
is_nvcomp_codec_supported,
)
from dasf.utils.types import is_array, is_dask_array
class Dataset(TargeteredTransform):
"""
Class representing a generic dataset based on a TargeteredTransform
object.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is False).
root : str
Root download directory (the default is None).
*args : type
Additional positional arguments.
**kwargs : type
Additional keyword arguments.
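Examples
--------
A minimal sketch; the name below is illustrative and nothing is
downloaded because `download` defaults to False:
>>> ds = Dataset(name="my-dataset")  # doctest: +SKIP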
"""
def __init__(self,
name: str,
download: bool = False,
root: str = None,
*args,
**kwargs):
"""
Constructor of the object Dataset.
"""
super().__init__(*args, **kwargs)
# Dataset internals
self._name = name
self._download = download
self._root = root
self._metadata = {}
self._data = None
self._chunks = None
self.__set_dataset_cache_dir()
self.download()
def __set_dataset_cache_dir(self):
"""
Create the cache directory under $HOME to store dataset(s).
"""
self._cache_dir = os.path.join(str(Path.home()), ".cache", "dasf", "datasets")
os.makedirs(self._cache_dir, exist_ok=True)
if self._root is None:
self._root = self._cache_dir
def download(self):
"""
Skeleton of the download method.
"""
if self._download:
raise NotImplementedError("Function download() needs to be defined")
def __len__(self) -> int:
"""
Return internal data length.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
return len(self._data)
def __getitem__(self, idx):
"""
Generic __getitem__() function based on internal data.
Parameters
----------
idx : Any
Key of the fetched data. It can be an integer or a tuple.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
return self._data.__getitem__(idx)
class DatasetArray(Dataset):
"""
Class representing a dataset which is defined as an array of a defined
shape.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is False).
root : str
Root download directory (the default is None).
chunks : Any
Chunk layout of the array (the default is "auto").
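Examples
--------
A hedged sketch, assuming a local NumPy file ``data.npy`` exists (the
path is illustrative, not part of the API):
>>> ds = DatasetArray(name="array", root="data.npy")  # doctest: +SKIP
>>> ds._load_cpu()  # doctest: +SKIP
>>> ds.shape  # doctest: +SKIP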
"""
def __init__(self,
name: str,
download: bool = False,
root: str = None,
chunks="auto"):
"""
Constructor of the object DatasetArray.
"""
Dataset.__init__(self, name, download, root)
self._chunks = chunks
self._root_file = root
if root is not None:
if not os.path.isfile(root):
raise Exception("Array requires a root=filename.")
self._root = os.path.dirname(root)
def __operator_check__(self, other):
"""
Check which type of data we are handling.
Examples:
DatasetArray with array-like; or
DatasetArray with DatasetArray
Parameters
----------
other : Any
Array-like or DatasetArray used in the operation.
Returns
-------
data : Any
Data representing the internal array or the class itself.
"""
assert self._data is not None, "Data is not loaded yet."
if isinstance(other, DatasetArray):
return other._data
return other
def __repr__(self):
"""
Return a class representation based on internal array.
"""
return repr(self._data)
def __array__(self, dtype=None):
"""
Array interface is required to support most of the array functions.
Parameters
----------
dtype : Any
Type of the internal array, default=None (not used)
Returns
-------
data : Any
Data representing the internal array or the class itself.
"""
assert self._data is not None, "Data is not loaded yet."
return self._data
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
"""
Any class, array subclass or not, can define this method or set
it to None in order to override the behavior of NumPy's ufuncs.
Parameters
----------
ufunc : Callable
The ufunc object that was called.
method : Str
A string indicating which ufunc method was called (one of
"__call__", "reduce", "reduceat", "accumulate", "outer", "at").
inputs : Any
A tuple of the input arguments to the ufunc.
kwargs : Any
A dictionary containing the optional input arguments of the ufunc.
If given, any out arguments, both positional and keyword, are
passed as a tuple in kwargs. See the discussion in Universal
functions (ufunc) for details.
Returns
-------
array : array-like
The result of the operation.
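Examples
--------
A hedged sketch of the dispatch path; ``ds`` stands for an already
loaded DatasetArray (illustrative only). Note that the result is
written back into ``ds``, which is returned:
>>> result = np.negative(ds)  # doctest: +SKIP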
"""
assert self._data is not None, "Data is not loaded yet."
if method == '__call__':
scalars = []
for inp in inputs:
if isinstance(inp, Number):
scalars.append(inp)
elif isinstance(inp, self.__class__):
scalars.append(inp._data)
else:
return NotImplemented
self._data = ufunc(*scalars, **kwargs)
return self
return NotImplemented
def __check_op_input(self, in_data):
"""
Return the proper type of data for the operation:
>>> Result = DatasetArray + Numpy; or
>>> Result = DatasetArray + DatasetArray
Parameters
----------
in_data : Any
Input data to be analyzed.
Returns
-------
data : Any
Data representing the internal array or the class itself.
"""
if is_array(in_data) or is_dask_array(in_data):
return in_data
if isinstance(in_data, self.__class__):
return in_data._data
raise TypeError("Data is incompatible with Array")
def __add__(self, other):
"""
Internal function of adding two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetArray.
Returns
-------
array-like
The sum of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data + data
def __sub__(self, other):
"""
Internal function of subtracting two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetArray.
Returns
-------
array-like
The difference of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data - data
def __mul__(self, other):
"""
Internal function of multiplying two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetArray.
Returns
-------
array-like
The product of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data * data
def __truediv__(self, other):
"""
Internal function of dividing two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetArray.
Returns
-------
array-like
The quotient of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data / data
def __copy_attrs_from_data(self):
"""
Extends metadata to new transformed object (after operations).
"""
self._metadata["type"] = type(self._data)
attrs = dir(self._data)
for attr in attrs:
if not attr.startswith("__") and callable(getattr(self._data, attr)):
if not hasattr(self, attr):
self.__dict__[attr] = getattr(self._data, attr)
def __npy_header(self):
"""
Read the NPY array header from the dataset file.
"""
with open(self._root_file, 'rb') as fobj:
version = numpy.lib.format.read_magic(fobj)
# Dispatch to the reader matching the format version,
# e.g. read_array_header_1_0.
func_name = "read_array_header_" + "_".join(str(v) for v in version)
func = getattr(numpy.lib.format, func_name)
# Returns a (shape, fortran_order, dtype) tuple.
return func(fobj)
def _lazy_load(self, xp, **kwargs):
"""
Lazy load the dataset using a Dask container.
Parameters
----------
xp : type
Library used to load the file. It must follow the NumPy API.
**kwargs : type
Additional keyword arguments to the load.
Returns
-------
Any
The data (or a Future load object, for `_lazy` operations).
"""
npy_shape = self.shape
local_data = dask.delayed(xp.load)(self._root_file, **kwargs)
local_data = da.from_delayed(local_data, shape=npy_shape,
dtype=xp.float32, meta=xp.array(()))
if isinstance(self._chunks, tuple):
local_data = local_data.rechunk(self._chunks)
return local_data
def _load(self, xp, **kwargs):
"""
Load data using the `xp` container.
Parameters
----------
xp : Module
A module that loads data (it must implement a `load` function).
**kwargs : type
Additional `kwargs` to the `xp.load` function.
"""
return xp.load(self._root_file, **kwargs)
def _lazy_load_gpu(self):
"""
Load data with a GPU container + Dask. (It does not load immediately.)
"""
self._metadata = self._load_meta()
self.from_array(self._lazy_load(cp))
return self
def _lazy_load_cpu(self):
"""
Load data with a CPU container + Dask. (It does not load immediately.)
"""
self._metadata = self._load_meta()
self.from_array(self._lazy_load(np))
return self
def _load_gpu(self):
"""
Load data with a GPU container (e.g. CuPy).
"""
self._metadata = self._load_meta()
self.from_array(self._load(cp))
return self
def _load_cpu(self):
"""
Load data with a CPU container (e.g. NumPy).
"""
self._metadata = self._load_meta()
self.from_array(self._load(np))
return self
def from_array(self, array):
"""
Load data from an existing array.
Parameters
----------
array : array-like
Input data to be initialized.
"""
self._data = array
self.__copy_attrs_from_data()
@task_handler
def load(self):
"""
Placeholder for load function.
"""
...
@property
def shape(self) -> tuple:
"""
Returns the shape of an array.
Returns
-------
tuple
A tuple with the shape.
"""
if self._data is not None:
return self._data.shape
return self.__npy_header()[0]
class DatasetZarr(Dataset):
"""
Class representing a dataset which is defined as a Zarr array of a
defined shape.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is False).
root : str
Root download directory (the default is None).
backend : str
Backend used to load the file, e.g. "kvikio" (the default is None).
chunks : Any
Chunk layout of the array (the default is None).
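Examples
--------
A hedged sketch, assuming a Zarr array stored at ``data.zarr`` (the
path is illustrative, not part of the API):
>>> ds = DatasetZarr(name="zarr", root="data.zarr")  # doctest: +SKIP
>>> ds._lazy_load_cpu()  # doctest: +SKIP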
"""
def __init__(self,
name: str,
download: bool = False,
root: str = None,
backend: str = None,
chunks=None):
""" Constructor of the object DatasetZarr. """
Dataset.__init__(self, name, download, root)
self._backend = backend
self._chunks = chunks
self._root_file = root
if root is not None:
if not os.path.isfile(root):
self._root = root
else:
self._root = os.path.dirname(root)
def _lazy_load(self, xp, **kwargs):
"""
Lazy load the dataset using a Dask container.
Parameters
----------
xp : type
Library used to load the file. It must follow the NumPy API.
**kwargs : type
Additional keyword arguments to the load.
Returns
-------
Any
The data (or a Future load object, for `_lazy` operations).
"""
if (self._backend == "kvikio" and is_kvikio_supported() and
(is_gds_supported() or is_kvikio_compat_mode()) and
is_nvcomp_codec_supported()):
# Read through GPUDirect Storage and swap the compressor for the
# GPU-accelerated nvCOMP LZ4 codec before handing the store to Zarr.
store = GDSStore(self._root_file)
meta = json.loads(store[".zarray"])
meta["compressor"] = NvCompBatchCodec("lz4").get_config()
store[".zarray"] = json.dumps(meta).encode()
array = zarr.open_array(store, meta_array=xp.empty(()))
return da.from_zarr(array, chunks=array.chunks).map_blocks(xp.asarray)
return da.from_zarr(self._root_file, chunks=self._chunks).map_blocks(xp.asarray)
def _load(self, xp, **kwargs):
"""
Load data using the `xp` container.
Parameters
----------
xp : Module
A module used as the array backend (via `meta_array`).
**kwargs : type
Additional `kwargs` (currently unused).
"""
return zarr.open(self._root_file, mode='r', meta_array=xp.empty(()))
def _lazy_load_cpu(self):
"""Load data with CPU container + DASK. (It does not load immediattly)
"""
self._metadata = self._load_meta()
self._data = self._lazy_load(np)
self.__copy_attrs_from_data()
return self
def _lazy_load_gpu(self):
"""Load data with GPU container + DASK. (It does not load immediattly)
"""
self._metadata = self._load_meta()
self._data = self._lazy_load(cp)
self.__copy_attrs_from_data()
return self
def _load_cpu(self):
"""Load data with CPU container (e.g. numpy).
"""
self._metadata = self._load_meta()
self._data = self._load(np)
self.__copy_attrs_from_data()
return self
def _load_gpu(self):
"""Load data with GPU container (e.g. cupy).
"""
self._metadata = self._load_meta()
self._data = self._load(cp)
self.__copy_attrs_from_data()
return self
@task_handler
def load(self):
"""Placeholder for load function.
"""
...
def __read_zarray(self, key):
"""Return the value of a key from the Zarr `.zarray` JSON metadata.
"""
if self._root_file and os.path.isdir(self._root_file):
zarray = os.path.join(self._root_file, ".zarray")
if os.path.exists(zarray):
try:
with open(zarray) as fz:
meta = json.load(fz)
return meta[key]
except Exception:
pass
return None
@property
def shape(self) -> tuple:
"""
Returns the shape of an array.
Returns
-------
tuple
A tuple with the shape.
"""
if self._data is None:
shape = self.__read_zarray("shape")
if shape is not None:
return tuple(shape)
return tuple()
return self._data.shape
@property
def chunksize(self):
"""
Returns the chunksize of an array.
Returns
-------
tuple
A tuple with the chunksize.
"""
if self._data is None:
chunks = self.__read_zarray("chunks")
if chunks is not None:
return tuple(chunks)
return tuple()
return self._data.chunksize
def __repr__(self):
"""
Return a class representation based on internal array.
"""
return repr(self._data)
def __check_op_input(self, in_data):
"""
Return the proper type of data for the operation:
>>> Result = DatasetZarr + Numpy; or
>>> Result = DatasetZarr + DatasetZarr
Parameters
----------
in_data : Any
Input data to be analyzed.
Returns
-------
data : Any
Data representing the internal array or the class itself.
"""
if is_array(in_data) or is_dask_array(in_data):
return in_data
elif isinstance(in_data, self.__class__):
return in_data._data
raise TypeError("Data is incompatible with Array")
def __add__(self, other):
"""
Internal function of adding two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetZarr.
Returns
-------
array-like
The sum of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data + data
def __sub__(self, other):
"""
Internal function of subtracting two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetZarr.
Returns
-------
array-like
The difference of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data - data
def __mul__(self, other):
"""
Internal function of multiplying two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetZarr.
Returns
-------
array-like
The product of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data * data
def __truediv__(self, other):
"""
Internal function of dividing two array datasets.
Parameters
----------
other : Any
Data representing an array or a DatasetZarr.
Returns
-------
array-like
The quotient of the two arrays.
"""
assert self._data is not None, "Data is not loaded yet."
data = self.__check_op_input(other)
return self._data / data
def __copy_attrs_from_data(self):
"""
Extends metadata to new transformed object (after operations).
"""
self._metadata["type"] = type(self._data)
attrs = dir(self._data)
for attr in attrs:
if not attr.startswith("__") and callable(getattr(self._data, attr)):
if not hasattr(self, attr):
self.__dict__[attr] = getattr(self._data, attr)
class DatasetHDF5(Dataset):
"""
Class representing a dataset which is defined as an HDF5 dataset of a
defined shape.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is False).
root : str
Root download directory (the default is None).
chunks : Any
Chunk layout of the array (the default is "auto").
dataset_path : str
Relative path of the internal HDF5 dataset (the default is None).
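Examples
--------
A hedged sketch, assuming ``data.h5`` contains a dataset at ``/values``
(both names are illustrative):
>>> ds = DatasetHDF5(name="hdf5", root="data.h5",
...                  dataset_path="/values")  # doctest: +SKIP
>>> ds._load_cpu()  # doctest: +SKIP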
"""
def __init__(self,
name: str,
download: bool = False,
root: str = None,
chunks="auto",
dataset_path: str = None):
"""
Constructor of the object DatasetHDF5.
"""
Dataset.__init__(self, name, download, root)
self._chunks = chunks
self._root_file = root
self._dataset_path = dataset_path
if root is not None:
if not os.path.isfile(root):
raise Exception("HDF5 requires a root=filename.")
self._root = os.path.dirname(root)
if dataset_path is None:
raise Exception("HDF5 requires a path.")
def _lazy_load(self, xp, **kwargs):
"""
Lazy load the dataset using a Dask container.
Parameters
----------
xp : type
Library used to load the file. It must follow the NumPy API.
**kwargs : type
Additional keyword arguments to the load.
Returns
-------
Any
The data (or a Future load object, for `_lazy` operations).
"""
f = h5py.File(self._root_file, mode="r")
data = f[self._dataset_path]
return da.from_array(data, chunks=self._chunks, meta=xp.array(()))
def _load(self, xp=None, **kwargs):
"""
Load data directly from the HDF5 file.
Parameters
----------
xp : Module
A module that loads data (placeholder, not used here).
**kwargs : type
Additional `kwargs` (currently unused).
"""
f = h5py.File(self._root_file, mode="r")
return f[self._dataset_path]
def _lazy_load_cpu(self):
"""
Load data with a CPU container + Dask. (It does not load immediately.)
"""
self._metadata = self._load_meta()
self._data = self._lazy_load(np)
return self
def _lazy_load_gpu(self):
"""
Load data with a GPU container + Dask. (It does not load immediately.)
"""
self._metadata = self._load_meta()
self._data = self._lazy_load(cp)
return self
def _load_cpu(self):
"""
Load data with a CPU container (e.g. NumPy).
"""
self._metadata = self._load_meta()
self._data = self._load()
return self
def _load_gpu(self):
"""
Load data with a GPU container (e.g. CuPy).
"""
self._metadata = self._load_meta()
self._data = cp.asarray(self._load())
return self
@task_handler
def load(self):
"""
Placeholder for load function.
"""
...
class DatasetXarray(Dataset):
"""
Class representing a dataset which is defined as an Xarray dataset of a
defined shape.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is False).
root : str
Root download directory (the default is None).
chunks : Any
Chunk layout of the array (the default is None).
data_var : Any
Key (or index) of the internal Xarray dataset (the default is None).
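Examples
--------
A hedged sketch, assuming ``data.nc`` holds a variable named
``temperature`` (both names are illustrative):
>>> ds = DatasetXarray(name="xr", root="data.nc",
...                    chunks={"time": 10},
...                    data_var="temperature")  # doctest: +SKIP
>>> ds._lazy_load_cpu()  # doctest: +SKIP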
"""
def __init__(self,
name: str,
download: bool = False,
root: str = None,
chunks=None,
data_var=None):
"""
Constructor of the object DatasetXarray.
"""
Dataset.__init__(self, name, download, root)
self._chunks = chunks
self._root_file = root
self._data_var = data_var
if chunks and not isinstance(chunks, dict):
raise Exception("Chunks should be a dict.")
if root is not None:
if not os.path.isfile(root):
raise Exception("Xarray requires a root=filename.")
self._root = os.path.dirname(root)
def _lazy_load_cpu(self):
"""
Load data with a CPU container + Dask. (It does not load immediately.)
"""
assert self._chunks is not None, "Lazy operations require chunks"
if self._data_var:
self._data = xr.open_dataset(self._root_file,
chunks=self._chunks)
else:
self._data = xr.open_dataarray(self._root_file,
chunks=self._chunks)
self._metadata = self._load_meta()
def _lazy_load_gpu(self):
"""
Load data with a GPU container + Dask. (It does not load immediately.)
"""
assert self._chunks is not None, "Lazy operations require chunks"
if self._data_var:
self._data = xr.open_dataset(self._root_file,
chunks=self._chunks).as_cupy()
else:
self._data = xr.open_dataarray(self._root_file,
chunks=self._chunks).as_cupy()
self._metadata = self._load_meta()
def _load_cpu(self):
"""
Load data with a CPU container (e.g. NumPy).
"""
if self._data_var:
self._data = xr.open_dataset(self._root_file)
else:
self._data = xr.open_dataarray(self._root_file)
self._data.load()
self._metadata = self._load_meta()
def _load_gpu(self):
"""
Load data with a GPU container (e.g. CuPy).
"""
if self._data_var:
self._data = xr.open_dataset(self._root_file).as_cupy()
else:
self._data = xr.open_dataarray(self._root_file).as_cupy()
self._data.load()
self._metadata = self._load_meta()
@task_handler
def load(self):
"""
Placeholder for load function.
"""
...
def __len__(self) -> int:
"""
Return internal data length.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
if self._data_var:
return len(self._data[self._data_var])
return len(self._data)
def __getitem__(self, idx):
"""
A __getitem__() function based on internal Xarray data.
Parameters
----------
idx : Any
Key of the fetched data. It can be an integer or a tuple.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
# Always slice a DataArray
if self._data_var:
return self._data[self._data_var].data[idx]
return self._data.data[idx]
class DatasetLabeled(Dataset):
"""
A class representing a labeled dataset. Each item is a 2-element tuple,
where the first element is an array of data and the second element is
the respective label. The items can be accessed from `dataset[x]`.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is False).
root : str
Root download directory (the default is None).
chunks : Any
Chunk layout of the array (the default is "auto").
Attributes
----------
_chunks : Any
Chunk layout applied to both the data and the labels.
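Examples
--------
A hedged sketch; subclasses are expected to provide `_train` and `_val`
datasets before loading (all names below are illustrative):
>>> class MyLabeled(DatasetLabeled):  # doctest: +SKIP
...     def __init__(self):
...         super().__init__(name="labeled")
...         self._train = DatasetArray(name="x", root="x.npy")
...         self._val = DatasetArray(name="y", root="y.npy")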
"""
def __init__(self,
name: str,
download: bool = False,
root: str = None,
chunks="auto"):
"""
Constructor of the object DatasetLabeled.
"""
Dataset.__init__(self, name, download, root)
self._chunks = chunks
def download(self):
"""
Download the dataset.
"""
if hasattr(self, "_train") and hasattr(self._train, "download"):
self._train.download()
if hasattr(self, "_val") and hasattr(self._val, "download"):
self._val.download()
def _lazy_load(self, xp, **kwargs) -> tuple:
"""
Lazy load the dataset using a Dask container.
Parameters
----------
xp : type
Library used to load the file. It must follow the NumPy API.
**kwargs : type
Additional keyword arguments to the load.
Returns
-------
Tuple
A Future object that will return a tuple: (data, label).
"""
local_data = self._train._lazy_load(xp)
local_labels = self._val._lazy_load(xp)
return (local_data, local_labels)
def _load(self, xp, **kwargs) -> tuple:
"""
Load data using the `xp` container.
Parameters
----------
xp : Module
A module that loads data (it must implement a `load` function).
**kwargs : type
Additional `kwargs` to the `xp.load` function.
Returns
-------
Tuple
A 2-element tuple: (data, label)
"""
local_data = self._train._load(xp)
local_labels = self._val._load(xp)
return (local_data, local_labels)
def _lazy_load_gpu(self):
"""
Load data with a GPU container + Dask. (It does not load immediately.)
"""
self._metadata = self._load_meta()
self._data, self._labels = self._lazy_load(cp)
def _lazy_load_cpu(self):
"""
Load data with a CPU container + Dask. (It does not load immediately.)
"""
self._metadata = self._load_meta()
self._data, self._labels = self._lazy_load(np)
def _load_gpu(self):
"""
Load data with a GPU container (e.g. CuPy).
"""
self._metadata = self._load_meta()
self._data, self._labels = self._load(cp)
def _load_cpu(self):
"""
Load data with a CPU container (e.g. NumPy).
"""
self._metadata = self._load_meta()
self._data, self._labels = self._load(np)
@task_handler
def load(self):
"""
Placeholder for load function.
"""
...
def __getitem__(self, idx):
"""
A __getitem__() function for data and labeled data together.
Parameters
----------
idx : Any
Key of the fetched data. It can be an integer or a tuple.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
return (self._data.__getitem__(idx), self._labels.__getitem__(idx))
class DatasetDataFrame(Dataset):
"""
Class representing a dataset which is defined as a dataframe.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is True).
root : str
Root download directory (the default is None).
chunks : Any
Chunk layout of the dataframe (the default is "auto").
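Examples
--------
A hedged sketch, assuming a local CSV file ``table.csv`` exists (the
path is illustrative):
>>> ds = DatasetDataFrame(name="df", download=False,
...                       root="table.csv")  # doctest: +SKIP
>>> ds._load_cpu()  # doctest: +SKIP
>>> len(ds)  # doctest: +SKIP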
"""
def __init__(self,
name: str,
download: bool = True,
root: str = None,
chunks="auto"):
"""
Constructor of the object DatasetDataFrame.
"""
Dataset.__init__(self, name, download, root)
self._chunks = chunks
self._root_file = root
if root is not None:
if not os.path.isfile(root):
raise Exception("DataFrame requires a root=filename.")
self._root = os.path.dirname(root)
def _lazy_load_gpu(self):
"""
Load data with a GPU container + Dask. (It does not load immediately.)
"""
self._data = dcudf.read_csv(self._root_file)
self._metadata = self._load_meta()
return self
def _lazy_load_cpu(self):
"""
Load data with a CPU container + Dask. (It does not load immediately.)
"""
self._data = ddf.read_csv(self._root_file)
self._metadata = self._load_meta()
return self
def _load_gpu(self):
"""
Load data with a GPU container (e.g. cuDF).
"""
self._data = cudf.read_csv(self._root_file)
self._metadata = self._load_meta()
return self
def _load_cpu(self):
"""
Load data with a CPU container (e.g. pandas).
"""
self._data = pd.read_csv(self._root_file)
self._metadata = self._load_meta()
return self
@task_handler
def load(self):
"""
Placeholder for load function.
"""
...
@property
def shape(self) -> tuple:
"""
Returns the shape of the dataframe.
Returns
-------
tuple
A tuple with the shape.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
return self._data.shape
def __len__(self) -> int:
"""
Return internal data length.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
return len(self._data)
def __getitem__(self, idx):
"""
A __getitem__() function based on internal dataframe.
Parameters
----------
idx : Any
Key of the fetched data. It can be an integer or a tuple.
"""
if self._data is None:
raise Exception("Data is not loaded yet")
return self._data.iloc[idx]
class DatasetParquet(DatasetDataFrame):
"""
Class representing a dataset which is defined as a Parquet file.
Parameters
----------
name : str
Symbolic name of the dataset.
download : bool
If the dataset must be downloaded (the default is True).
root : str
Root download directory (the default is None).
chunks : Any
Chunk layout of the dataframe (the default is "auto").
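Examples
--------
A hedged sketch, assuming a local Parquet file ``table.parquet`` exists
(the path is illustrative):
>>> ds = DatasetParquet(name="pq", download=False,
...                     root="table.parquet")  # doctest: +SKIP
>>> ds._load_cpu()  # doctest: +SKIP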
"""
def __init__(self,
name: str,
download: bool = True,
root: str = None,
chunks="auto"):
"""
Constructor of the object DatasetParquet.
"""
DatasetDataFrame.__init__(self, name, download, root, chunks)
def _lazy_load_gpu(self):
"""
Load data with a GPU container + Dask. (It does not load immediately.)
"""
self._data = dcudf.read_parquet(self._root_file)
self._metadata = self._load_meta()
return self
def _lazy_load_cpu(self):
"""
Load data with a CPU container + Dask. (It does not load immediately.)
"""
self._data = ddf.read_parquet(self._root_file)
self._metadata = self._load_meta()
return self
def _load_gpu(self):
"""
Load data with a GPU container (e.g. cuDF).
"""
self._data = cudf.read_parquet(self._root_file)
self._metadata = self._load_meta()
return self
def _load_cpu(self):
"""
Load data with a CPU container (e.g. pandas).
"""
self._data = pd.read_parquet(self._root_file)
self._metadata = self._load_meta()
return self