Source code for dasf.utils.funcs

""" Generic and regular functions. """
#!/usr/bin/env python3

import inspect
import os
import threading
import time
from pathlib import Path

import dask
import dask.delayed as dd
import gdown
import GPUtil
import numpy as np
import pandas
import psutil
from dask.distributed import Client
from distributed.client import FIRST_COMPLETED, wait
from distributed.utils import TimeoutError as DistributedTimeoutError
from IPython import display as disp
from IPython import get_ipython
from ipywidgets import FloatProgress, HBox, Label

from dasf.pipeline.types import TaskExecutorType

try:
    import cupy as cp
    GPU_SUPPORTED = isinstance(cp.__version__, str)
except ImportError: # pragma: no cover
    GPU_SUPPORTED = False

try:
    import jax.numpy as jnp
    JAX_SUPPORTED = isinstance(jnp.__name__, str)
except ImportError: # pragma: no cover
    JAX_SUPPORTED = False

try:
    import kvikio
    import kvikio.defaults
    KVIKIO_SUPPORTED = True
except ImportError: # pragma: no cover
    KVIKIO_SUPPORTED = False

try:
    from kvikio.nvcomp_codec import NvCompBatchCodec
    NV_COMP_BATCH_CODEC_SUPPORTED = True
except ImportError: # pragma: no cover
    NV_COMP_BATCH_CODEC_SUPPORTED = False


def human_readable_size(size, decimal=3) -> str:
    """ Convert a data size into the proper measurement unit. """
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size < 1024.0:
            break
        size /= 1024.0
    return f"{size:.{decimal}f} {unit}"

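# Illustrative doctest-style sketch (not part of the original module):
#
#   >>> human_readable_size(2048)
#   '2.000 KB'
#   >>> human_readable_size(1536, decimal=1)
#   '1.5 KB'
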
def get_worker_info(client) -> list:
    """
    Return a sorted list of workers and the DNS name of the master host.

    The master is the host of the first (0th) worker.
    """
    info = client.scheduler_info()

    if "workers" not in info:
        return []

    workers = info["workers"]
    worker_keys = sorted(workers.keys())
    workers_by_host = {}

    for key in worker_keys:
        worker = workers[key]
        host = worker["host"]
        workers_by_host.setdefault(host, []).append(key)

    all_workers = []

    if len(worker_keys) == 0:
        return all_workers

    global_rank = 0
    world_size = len(workers_by_host)
    hosts = sorted(workers_by_host.keys())

    for host in hosts:
        local_rank = 0
        for worker in workers_by_host[host]:
            all_workers.append(
                dict(
                    master=hosts[0],
                    worker=worker,
                    nthreads=workers[worker]["nthreads"],
                    local_rank=local_rank,
                    global_rank=global_rank,
                    host=host,
                    world_size=world_size,
                )
            )
            local_rank += 1
            global_rank += 1

    return all_workers

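# Shape of each entry returned above (keys come from the dict literal in
# get_worker_info; the values shown are hypothetical):
#
#   {'master': '10.0.0.1', 'worker': 'tcp://10.0.0.1:34567', 'nthreads': 8,
#    'local_rank': 0, 'global_rank': 0, 'host': '10.0.0.1', 'world_size': 1}
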
def sync_future_loop(futures):
    """ Synchronize all futures submitted to workers. """
    while True:
        if not futures:
            break

        try:
            result = wait(futures, 0.1, FIRST_COMPLETED)
        except DistributedTimeoutError:
            continue

        for fut in result.done:
            try:
                fut.result(timeout=7200)
            except Exception as exc:  # pylint: disable=broad-except
                print(str(exc))
                raise

        futures = result.not_done

class NotebookProgressBar(threading.Thread):
    """ ProgressBar representation for IPython notebooks only. """
    MIN_CUR = -2
    MIN_TOTAL = -1

    def __init__(self):
        """ Constructor of the Progress Bar. """
        threading.Thread.__init__(self)

        # pylint: disable=disallowed-name
        self.bar = None
        self.percentage = None
        self.data = None

        self.__lock = threading.Lock()
        self.__current = self.MIN_CUR
        self.__total = self.MIN_TOTAL
        self.__error = False

    def show(self):
        """ Display the HTML representation of the ProgressBar. """
        self.bar = FloatProgress(value=0, min=0, max=100)
        self.percentage = Label(value='0 %')
        self.data = Label(value='')

        box = HBox((self.percentage, self.bar, self.data))
        disp.display(box)

    def set_current(self, current, total):
        """ Set the current value of the bar progress. """
        with self.__lock:
            self.__current = current
            self.__total = total

    def set_error(self, error):
        """ Set an error if it exists. """
        self.__error = error

    def run(self):
        """ Thread main loop that updates the bar progress. """
        while not self.__error and self.__current < self.__total:
            time.sleep(1)
            if self.__current != self.MIN_CUR and self.__total != self.MIN_TOTAL:
                progress = (self.__current / self.__total) * 100
                self.bar.value = progress
                self.percentage.value = f"{int(self.bar.value)} %"
                self.data.value = f"{int(self.__current)} / {int(self.__total)}"

        if not self.__error:
            self.bar.style.bar_color = '#03c04a'
        else:
            self.bar.style.bar_color = '#ff0000'

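# Minimal usage sketch for NotebookProgressBar (assumes an IPython notebook
# frontend; values are illustrative):
#
#   >>> bar = NotebookProgressBar()
#   >>> bar.show()                # render the widget
#   >>> bar.start()               # spawn the update thread
#   >>> bar.set_current(50, 100)  # the bar eventually displays "50 %"
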
def download_file(url, filename=None, directory=None):
    """ Download a generic file and save it. """
    if directory is not None:
        os.makedirs(directory, exist_ok=True)

    progressbar = None

    if is_notebook():
        progressbar = NotebookProgressBar()

        def update_notebook_bar(current, total):
            """ Update progress bar with the current download size. """
            progressbar.set_current(current, total)

    try:
        if filename and directory:
            output = os.path.abspath(os.path.join(directory, filename))

            if not os.path.exists(output):
                if is_notebook():
                    # Activate the notebook progress bar
                    progressbar.show()
                    progressbar.start()

                # TODO: use pbar=update_notebook_bar once gdown supports it
                gdown.download(url, output=output)
        elif filename:
            output = os.path.abspath(os.path.join(os.getcwd(), filename))

            if not os.path.exists(output):
                if is_notebook():
                    # Activate the notebook progress bar
                    progressbar.show()
                    progressbar.start()

                # TODO: use pbar=update_notebook_bar once gdown supports it
                gdown.download(url, output=output)
        elif directory:
            if is_notebook():
                # Activate the notebook progress bar
                progressbar.show()
                progressbar.start()

            # TODO: use pbar=update_notebook_bar once gdown supports it
            output = os.path.abspath(os.path.join(directory,
                                                  gdown.download(url)))
        else:
            if is_notebook():
                # Activate the notebook progress bar
                progressbar.show()
                progressbar.start()

            # TODO: use pbar=update_notebook_bar once gdown supports it
            output = os.path.abspath(os.path.join(os.getcwd(),
                                                  gdown.download(url)))
    except Exception:  # pylint: disable=broad-except
        if progressbar:
            progressbar.set_error(True)

    # XXX: workaround to fix thread starvation while gdown does not support
    # external pbar's. See [1].
    # [1] https://github.com/wkentaro/gdown/pull/241
    if progressbar is not None:
        progressbar.set_current(100, 100)

    return output

def download_file_from_gdrive(file_id, filename=None, directory=None):
    """ Download a file from Google Drive using a gdrive file id. """
    url = f"https://drive.google.com/uc?export=download&confirm=9iBg&id={file_id}"

    return download_file(url, filename=filename, directory=directory)

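# Hypothetical usage (the file id below is a placeholder, not a real file):
#
#   >>> path = download_file_from_gdrive("FILE_ID", filename="data.npy",
#   ...                                  directory="/tmp/dasf")
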
def get_machine_memory_avail():
    """ Return free memory available from a single machine. """
    return psutil.virtual_memory().free

def set_executor_default():
    """ Return executor as a CPU (default) instance. """
    return TaskExecutorType.single_cpu


def set_executor_cpu():
    """ Return executor as a CPU instance. """
    return set_executor_default()


def set_executor_gpu():
    """ Return executor as a GPU instance. """
    return TaskExecutorType.single_gpu


def set_executor_multi_cpu():
    """ Return executor as a Multi CPU instance. """
    return TaskExecutorType.multi_cpu


def set_executor_multi_gpu():
    """ Return executor as a Multi GPU instance. """
    return TaskExecutorType.multi_gpu

def is_executor_single(dtype) -> bool:
    """ Return if the executor is a single machine instance. """
    return dtype in (TaskExecutorType.single_cpu, TaskExecutorType.single_gpu)


def is_executor_cluster(dtype) -> bool:
    """ Return if the executor is a cluster instance. """
    return dtype in (TaskExecutorType.multi_cpu, TaskExecutorType.multi_gpu)


def is_executor_cpu(dtype) -> bool:
    """ Return if the executor is a CPU instance. """
    return dtype in (TaskExecutorType.single_cpu, TaskExecutorType.multi_cpu)


def is_executor_gpu(dtype) -> bool:
    """ Return if the executor is a GPU instance. """
    return dtype in (TaskExecutorType.single_gpu, TaskExecutorType.multi_gpu)


def executor_to_string(dtype) -> str:
    """ Return the executor type as a string. """
    prefix = 'Multi ' if is_executor_cluster(dtype) else ''
    suffix = 'GPU' if is_executor_gpu(dtype) else 'CPU'
    return prefix + suffix

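# Illustrative round trip through the executor helpers above:
#
#   >>> dtype = set_executor_multi_gpu()
#   >>> is_executor_gpu(dtype), is_executor_cluster(dtype)
#   (True, True)
#   >>> executor_to_string(dtype)
#   'Multi GPU'
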
def is_gpu_supported() -> bool:
    """ Return if GPU is supported. """
    return GPU_SUPPORTED and get_gpu_count() >= 1


def is_kvikio_supported() -> bool:
    """ Return if kvikio is supported (installed). """
    return KVIKIO_SUPPORTED


def is_gds_supported() -> bool:
    """ Return if GPU Direct Storage is supported. """
    if is_kvikio_supported():
        props = kvikio.DriverProperties()
        return props.is_gds_available
    return False


def is_kvikio_compat_mode() -> bool:
    """ Return if kvikio is running in compatibility mode. """
    return kvikio.defaults.compat_mode()


def is_nvcomp_codec_supported() -> bool:
    """ Return if NVIDIA Compressor Codecs are supported. """
    return NV_COMP_BATCH_CODEC_SUPPORTED


def is_jax_supported() -> bool:
    """ Return if JAX is supported. """
    return JAX_SUPPORTED

def is_dask_local_supported() -> bool:
    """ Return if Dask is supported locally by the executor. """
    try:
        scheduler = dask.config.get(key="scheduler")
        return scheduler is not None
    except Exception:  # pylint: disable=broad-except
        return False

def get_dask_running_client():
    """ Return the currently running Dask client, or None if there is none. """
    try:
        return Client.current()
    except Exception:  # pylint: disable=broad-except
        return None

def get_backend_supported(func):
    """ Return whether ``func`` accepts a ``backend`` parameter. """
    par = inspect.signature(func)
    return "backend" in par.parameters

def is_dask_supported() -> bool:
    """ Return if Dask is supported by the executor. """
    try:
        if is_dask_local_supported():
            return True

        cur = get_dask_running_client()
        if hasattr(cur, 'dtype'):
            return is_executor_cluster(cur.dtype)
        return cur is not None
    except Exception:  # pylint: disable=broad-except
        return False

def is_dask_gpu_supported() -> bool:
    """ Return if any node supports GPU. """
    if is_dask_supported():
        if get_gpu_from_workers():
            return True
        if get_dask_gpu_count() > 0:
            return True
    return False

def get_gpu_from_workers() -> bool:
    """ Return if any worker has a GPU available. """
    try:
        cur = get_dask_running_client()
        workers = cur.cluster.scheduler_info["workers"]
        for worker_meta in workers.values():
            if 'gpu' in worker_meta:
                return True
    except Exception:  # pylint: disable=broad-except
        pass
    return False

def get_gpu_count() -> int:
    """ Get single node GPU count. """
    return len(GPUtil.getGPUs())

def get_dask_gpu_count(fetch=True) -> int:
    """ Get how many GPUs are available in each worker. """
    # pylint: disable=not-callable
    ret = dd(GPUtil.getGPUs)()
    if fetch:
        return len(ret.compute())
    return ret


def get_dask_gpu_names(fetch=True) -> list:
    """ Get all GPU names of each worker. """
    # pylint: disable=not-callable
    ret = dd(GPUtil.getGPUs)()
    if fetch:
        return [x.name + f", {x.memoryTotal} MB" for x in ret.compute()]
    return ret

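# Illustrative calls (results depend on the machine; with fetch=False the
# dask.delayed task itself is returned instead of the computed value):
#
#   >>> get_dask_gpu_count()              # e.g. 1 on a single-GPU node
#   >>> get_dask_gpu_names(fetch=False)   # a dask.delayed object
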
def block_chunk_reduce(dask_data, output_chunk):
    """ Reduce the chunk according to the new output size. """
    drop_axis = np.array([])
    new_axis = None

    if output_chunk is None or not isinstance(output_chunk, tuple):
        return drop_axis.tolist(), new_axis

    data_chunk = dask_data.chunksize

    drop_axis = np.isin(data_chunk, output_chunk)
    new_axis = np.isin(output_chunk, data_chunk)

    drop_axis = np.where(~drop_axis)
    new_axis = np.where(~new_axis)

    return drop_axis[0].tolist(), new_axis[0].tolist()

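# Illustrative sketch: for a dask array chunked as (10, 20, 30) and an output
# chunk of (10, 20), axis 2 is dropped and no axis is added:
#
#   >>> import dask.array as da
#   >>> data = da.zeros((10, 20, 30), chunks=(10, 20, 30))
#   >>> block_chunk_reduce(data, (10, 20))
#   ([2], [])
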
def trim_chunk_location(block_info, depth, index=0):
    """ Trim an overlapped chunk to the exact size of the chunk. """
    if 'array-location' not in block_info[index]:
        raise IndexError("Key 'array-location' was not found in block-info.")

    if 'chunk-location' not in block_info[index]:
        raise IndexError("Key 'chunk-location' was not found in block-info.")

    loc = block_info[index]['array-location']
    chunks = block_info[index]['chunk-location']

    if len(depth) != len(loc) or len(depth) != len(chunks):
        raise ValueError(f"Depth {len(depth)}, location {len(loc)} and/or "
                         f"chunks {len(chunks)} do not match.")

    loc_orig = []
    for i in range(len(depth)):
        loc_orig.append((loc[i][0] - 2 * depth[i] * chunks[i],
                         loc[i][1] - 2 * depth[i] -
                         (loc[i][0] - 2 * depth[i] * chunks[i])))

    return loc_orig

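# Illustrative sketch with a single overlapped axis (depth 2, chunk 0); the
# block_info dict mimics what dask passes to map_blocks/map_overlap:
#
#   >>> info = [{'array-location': [(0, 14)], 'chunk-location': (0,)}]
#   >>> trim_chunk_location(info, (2,))
#   [(0, 10)]
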
def get_dask_mem_usage(profiler):
    """ Get Dask memory usage profile. """
    profiler_dir = os.path.abspath(str(Path.home()) + "/.cache/dasf/profiler/")

    if profiler == "memusage":
        os.makedirs(profiler_dir, exist_ok=True)

        mem = pandas.read_csv(os.path.abspath(profiler_dir + "/dask-memusage"))

        column = mem["max_memory_mb"]
        max_index = column.idxmax()

        return mem["max_memory_mb"][max_index]
    return 0.0

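# Hypothetical call (assumes the dask-memusage plugin already wrote its CSV
# to ~/.cache/dasf/profiler/dask-memusage):
#
#   >>> get_dask_mem_usage("memusage")   # peak worker memory, in MB
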
def is_notebook() -> bool:
    """ Return if the code is being executed in an IPython notebook. """
    try:
        shell = get_ipython().__class__.__name__
        if shell == "ZMQInteractiveShell":
            return True
    except NameError:
        pass
    return False

def weight_gaussian(shape):
    """
    Produce an NDArray of the given shape with a Gaussian distribution
    in all directions, starting from the center.
    """
    center = np.array(shape) / 2

    distances = np.zeros(shape)
    for idx in np.ndindex(shape):
        distances[idx] = np.linalg.norm(np.array(idx) - center)

    distances = distances / np.max(distances)

    return np.exp(-2 * (distances**2)) / (np.sqrt(2 * np.pi) / 2)

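# Illustrative check: the weight peaks near the center and decays toward the
# corners of the volume:
#
#   >>> w = weight_gaussian((5, 5))
#   >>> bool(w[2, 2] > w[0, 0])
#   True
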
def weight_radial(shape):
    """
    Produce an NDArray of the given shape with a rate that decreases
    radially from the center.
    """
    center = np.array(shape) / 2

    distances = np.zeros(shape)
    for idx in np.ndindex(shape):
        distances[idx] = np.linalg.norm(np.array(idx) - center)

    return 1 / (1 + distances)