dasf.profile.plugins

Classes

WorkerTaskPlugin

Interface to extend the Worker

ResourceMonitor

GPUAnnotationPlugin

Interface to extend the Worker

Module Contents

class dasf.profile.plugins.WorkerTaskPlugin(name='TracePlugin')[source]

Bases: distributed.diagnostics.plugin.WorkerPlugin

Interface to extend the Worker

A worker plugin enables custom code to run at different stages of the Worker's lifecycle.

Whenever a lifecycle event happens, the corresponding method on this class is called. Note that the user code always runs within the Worker's main thread.

To implement a plugin, override some of the methods of this class and register the plugin with your client using Client.register_worker_plugin. The plugin is then attached to all existing and future workers.

Examples

>>> from distributed.diagnostics.plugin import WorkerPlugin
>>> class ErrorLogger(WorkerPlugin):
...     def __init__(self, logger):
...         self.logger = logger
...
...     def setup(self, worker):
...         self.worker = worker
...
...     def transition(self, key, start, finish, *args, **kwargs):
...         if finish == 'error':
...             ts = self.worker.tasks[key]
...             exc_info = (type(ts.exception), ts.exception, ts.traceback)
...             self.logger.error(
...                 "Error during computation of '%s'.", key,
...                 exc_info=exc_info
...             )
>>> import logging
>>> plugin = ErrorLogger(logging)
>>> client.register_worker_plugin(plugin)

setup(worker)[source]

Run when the plugin is attached to a worker. This happens when the plugin is registered and attached to existing workers, or when a worker is created after the plugin has been registered.

transition(key, start, finish, *args, **kwargs)[source]

Throughout the lifecycle of a task (see Worker State), Workers are instructed by the scheduler to compute certain tasks, resulting in transitions in the state of each task. The Worker owning the task is then notified of this state transition.

Whenever a task changes its state, this method will be called.

Warning

This is an advanced feature. The transition mechanism and the details of task states are subject to change without a deprecation cycle.

Parameters

key : string

start : string

Start state of the transition. One of waiting, ready, executing, long-running, memory, error.

finish : string

Final state of the transition.

kwargs

More options passed when transitioning.

Parameters:

name (str)
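The setup and transition hooks described above can be exercised without a cluster. The following is a minimal sketch, not the dasf implementation: TransitionCounter is a hypothetical plugin name, and the fallback to object when distributed is not installed is an assumption made only so the sketch runs anywhere.

```python
from collections import Counter

try:
    from distributed.diagnostics.plugin import WorkerPlugin
except ImportError:
    # Assumption: fall back to a plain base class so the sketch can be
    # exercised without distributed installed.
    WorkerPlugin = object


class TransitionCounter(WorkerPlugin):
    """Hypothetical plugin: count task state transitions seen by one worker."""

    def setup(self, worker):
        # Called once when the plugin is attached to a worker.
        self.worker = worker
        self.counts = Counter()

    def transition(self, key, start, finish, *args, **kwargs):
        # Called on every task state change. This runs in the worker's
        # main thread, so the body is kept cheap.
        self.counts[(start, finish)] += 1


# Drive the hooks by hand to show the call order. On a real cluster the
# worker calls them after the plugin has been registered with the client.
plugin = TransitionCounter()
plugin.setup(worker=None)
plugin.transition("task-1", "waiting", "ready")
plugin.transition("task-1", "ready", "executing")
plugin.transition("task-2", "waiting", "ready")
```

After these calls, plugin.counts holds one entry per (start, finish) pair, which is a convenient shape for later aggregation across workers.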

class dasf.profile.plugins.ResourceMonitor(time=100, autostart=True, name='ResourceMonitor', **monitor_kwargs)[source]

Parameters:

  • autostart (bool)

  • name (str)

__del__()[source]

update()[source]

start()[source]

stop()[source]

class dasf.profile.plugins.GPUAnnotationPlugin(name='GPUAnnotationPlugin')[source]

Bases: distributed.diagnostics.plugin.WorkerPlugin

Interface to extend the Worker

A worker plugin enables custom code to run at different stages of the Worker's lifecycle.

Whenever a lifecycle event happens, the corresponding method on this class is called. Note that the user code always runs within the Worker's main thread.

To implement a plugin, override some of the methods of this class and register the plugin with your client using Client.register_worker_plugin. The plugin is then attached to all existing and future workers.

Examples

>>> from distributed.diagnostics.plugin import WorkerPlugin
>>> class ErrorLogger(WorkerPlugin):
...     def __init__(self, logger):
...         self.logger = logger
...
...     def setup(self, worker):
...         self.worker = worker
...
...     def transition(self, key, start, finish, *args, **kwargs):
...         if finish == 'error':
...             ts = self.worker.tasks[key]
...             exc_info = (type(ts.exception), ts.exception, ts.traceback)
...             self.logger.error(
...                 "Error during computation of '%s'.", key,
...                 exc_info=exc_info
...             )
>>> import logging
>>> plugin = ErrorLogger(logging)
>>> client.register_worker_plugin(plugin)

setup(worker)[source]

Run when the plugin is attached to a worker. This happens when the plugin is registered and attached to existing workers, or when a worker is created after the plugin has been registered.

transition(key, start, finish, *args, **kwargs)[source]

Throughout the lifecycle of a task (see Worker State), Workers are instructed by the scheduler to compute certain tasks, resulting in transitions in the state of each task. The Worker owning the task is then notified of this state transition.

Whenever a task changes its state, this method will be called.

Warning

This is an advanced feature. The transition mechanism and the details of task states are subject to change without a deprecation cycle.

Parameters

key : string

start : string

Start state of the transition. One of waiting, ready, executing, long-running, memory, error.

finish : string

Final state of the transition.

kwargs

More options passed when transitioning.

Parameters:

name (str)
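The page does not describe what GPUAnnotationPlugin does inside its hooks, so the following sketch illustrates only a general pattern that transition makes possible: opening an interval when a task enters the executing state and closing it when the task leaves that state. ExecutionTimer, its clock parameter, and the fallback base class are hypothetical names and assumptions, not part of dasf.

```python
import time

try:
    from distributed.diagnostics.plugin import WorkerPlugin
except ImportError:
    # Assumption: fall back to a plain base class so the sketch can be
    # exercised without distributed installed.
    WorkerPlugin = object


class ExecutionTimer(WorkerPlugin):
    """Hypothetical plugin: measure how long each task stays in 'executing'."""

    def __init__(self, clock=time.perf_counter):
        self.clock = clock      # injectable so the sketch can be tested
        self.started = {}       # key -> time the task entered 'executing'
        self.durations = {}     # key -> seconds spent in 'executing'

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == "executing":
            # Open the interval when the task starts executing.
            self.started[key] = self.clock()
        elif start == "executing" and key in self.started:
            # Any transition out of 'executing' closes the interval,
            # including a move to 'long-running'.
            self.durations[key] = self.clock() - self.started.pop(key)


# Exercise the hook by hand with a fake clock that returns fixed readings.
ticks = iter([10.0, 12.5])
timer = ExecutionTimer(clock=lambda: next(ticks))
timer.transition("task-1", "ready", "executing")
timer.transition("task-1", "executing", "memory")
```

With the fake clock above, the interval for task-1 is the difference between the two readings. A real annotation plugin would replace the dictionary bookkeeping with whatever marker its profiler expects.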