see function: dagrunner.utils.data_polling
see function: dagrunner.utils.process_path
see function: dagrunner.utils.stage_to_dir
DataPolling(timeout=120, polling=1, file_count=None, verbose=False)
A trigger plugin that completes only when data is successfully polled.
Remote file paths using <hostname>:<path> syntax are supported as well as
local and remote glob patterns.
__call__(self, *args)
Poll for data until it is available or the timeout is reached.
Args:
- *args: File paths or glob patterns to poll for.
Returns:
- None
__init__(self, timeout=120, polling=1, file_count=None, verbose=False)
Initialize the DataPolling plugin.
Args:
- timeout (int): Maximum time to wait for data in seconds.
- polling (int): Polling interval in seconds.
- file_count (int or None): Expected number of files. If None, an input or glob pattern is not considered missing as long as it matches at least one file.
- verbose (bool): Whether to print verbose output.
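The polling behaviour described above can be sketched with the standard library. This is a hypothetical, local-only stand-in (poll_for_files is not part of dagrunner): it ignores <hostname>:<path> paths and the verbose flag, applies file_count to each pattern separately, and raises TimeoutError when the timeout expires. All three choices are assumptions, not documented behaviour.

```python
import glob
import time


def poll_for_files(*patterns, timeout=120, polling=1, file_count=None):
    """Wait until every pattern matches enough local files.

    Hypothetical sketch of DataPolling: local paths only, no remote support.
    """
    # Assumption: with file_count=None a single match per pattern is enough.
    required = 1 if file_count is None else file_count
    deadline = time.monotonic() + timeout
    while True:
        # Collect the patterns that do not yet match enough files.
        missing = [p for p in patterns if len(glob.glob(p)) < required]
        if not missing:
            return None
        if time.monotonic() >= deadline:
            # Assumption: a timeout is reported as TimeoutError.
            raise TimeoutError(f"Timed out waiting for: {missing}")
        time.sleep(polling)
```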
Input()
A plugin to expand filepaths using keyword arguments and environment variables.
__call__(self, filepath, node_properties=None, **kwargs)
Expand a filepath.
Expand the provided string (typically representing a filepath) using the keyword arguments and environment variables. Note that this plugin is 'node aware' since it derives from NodeAwarePlugin.
Args:
- filepath (str): The filepath to be expanded.
- node_properties: Node properties passed by the plugin executor.
- **kwargs: Keyword arguments to be used in the expansion.
Returns:
- str: The expanded filepath.
Raises:
- ValueError: If positional arguments are provided.
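A minimal sketch of this kind of expansion, assuming environment variables are expanded with os.path.expandvars before {placeholder} fields are filled from the keyword arguments with str.format. The function name expand_filepath and the expansion order are hypothetical, and node_properties handling is omitted.

```python
import os


def expand_filepath(filepath, *args, **kwargs):
    """Hypothetical sketch of Input-style expansion (not dagrunner's code)."""
    if args:
        # Mirrors the documented ValueError for positional arguments.
        raise ValueError("Only keyword arguments may be used for expansion.")
    # Assumption: $VAR references first, then {placeholder} fields from kwargs.
    return os.path.expandvars(filepath).format(**kwargs)


os.environ["DEMO_DATA_ROOT"] = "/data"
print(expand_filepath("$DEMO_DATA_ROOT/{model}/{lead:03d}.nc", model="demo", lead=6))
# prints /data/demo/006.nc
```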
Load(staging_dir=None, on_missing='error', verbose=False)
Abstract data loader.
The load method must be implemented by the subclass.
This abstract class handles staging of files from remote hosts,
handling of missing files according to the on_missing parameter, and
globbing of file paths (local or remote).
__call__(self, *args, **kwargs)
Load data from a file or list of files.
Args:
- *args: List of filepaths to load.
<hostname>:<path> syntax is supported for loading files from a remote host.
- **kwargs: Keyword arguments to pass to the load method.
Returns:
- Any: The return value of the user-implemented load method, or
events.IGNORE or events.SKIP if files are missing and on_missing is set to 'ignore' or 'skip' respectively.
Raises:
- FileNotFoundError: If any of the files do not exist and
on_missing is set to 'error'.
__init__(self, staging_dir=None, on_missing='error', verbose=False)
Initialize the data loader.
Args:
- staging_dir: Local directory in which to stage files.
Remote files, whose filepaths use <hostname>:<path> syntax, are staged here.
A staging directory must be specified when loading remote files.
- on_missing: Action to take when files are missing. Accepted values: 'error',
'ignore' and 'skip'.
'ignore' and 'skip' return events.IGNORE and events.SKIP respectively,
whilst 'error' raises a FileNotFoundError. See dagrunner.events.
- verbose: Print verbose output.
load(self, *args, **kwargs)
Load data from a file.
Args:
- *args: Positional arguments.
- **kwargs: Keyword arguments.
Returns:
- Any: The loaded data.
Raises:
- NotImplementedError: If the method is not implemented.
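The structure above (an abstract load method wrapped by a __call__ that globs paths and applies on_missing) can be sketched as follows. This is a simplified, local-only illustration: SketchLoad and LoadText are hypothetical names, remote staging and verbose are omitted, and the IGNORE and SKIP sentinels only stand in for dagrunner.events.IGNORE and dagrunner.events.SKIP.

```python
import glob
from abc import ABC, abstractmethod

# Hypothetical stand-ins for dagrunner.events.IGNORE and dagrunner.events.SKIP.
IGNORE = object()
SKIP = object()


class SketchLoad(ABC):
    """Minimal local-only sketch of the Load pattern (no remote staging)."""

    def __init__(self, on_missing="error"):
        if on_missing not in ("error", "ignore", "skip"):
            raise ValueError(f"Invalid on_missing value: {on_missing!r}")
        self._on_missing = on_missing

    def __call__(self, *args, **kwargs):
        paths, missing = [], []
        for pattern in args:
            # Expand each glob; a pattern with no matches counts as missing.
            matches = sorted(glob.glob(pattern))
            if matches:
                paths.extend(matches)
            else:
                missing.append(pattern)
        if missing:
            if self._on_missing == "ignore":
                return IGNORE
            if self._on_missing == "skip":
                return SKIP
            raise FileNotFoundError(f"Missing files: {missing}")
        # Keyword arguments are forwarded to the subclass's load method.
        return self.load(*paths, **kwargs)

    @abstractmethod
    def load(self, *args, **kwargs):
        raise NotImplementedError


class LoadText(SketchLoad):
    """Hypothetical concrete loader that reads text files."""

    def load(self, *args, **kwargs):
        contents = []
        for path in args:
            with open(path, encoding="utf-8") as f:
                contents.append(f.read())
        return contents
```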
LoadJson(staging_dir=None, on_missing='error', verbose=False)
JSON file loader.
__call__(self, *args, **kwargs)
Load data from a file or list of files.
Args:
- *args: List of filepaths to load.
<hostname>:<path> syntax is supported for loading files from a remote host.
- **kwargs: Keyword arguments to pass to the load method.
Returns:
- Any: The return value of the user-implemented load method, or
events.IGNORE or events.SKIP if files are missing and on_missing is set to 'ignore' or 'skip' respectively.
Raises:
- FileNotFoundError: If any of the files do not exist and
on_missing is set to 'error'.
__init__(self, staging_dir=None, on_missing='error', verbose=False)
Initialize the data loader.
Args:
- staging_dir: Local directory in which to stage files.
Remote files, whose filepaths use <hostname>:<path> syntax, are staged here.
A staging directory must be specified when loading remote files.
- on_missing: Action to take when files are missing. Accepted values: 'error',
'ignore' and 'skip'.
'ignore' and 'skip' return events.IGNORE and events.SKIP respectively,
whilst 'error' raises a FileNotFoundError. See dagrunner.events.
- verbose: Print verbose output.
load(self, *args)
Load data from a file.
Args:
- *args: Positional arguments.
Returns:
- Any: The loaded data.
Raises:
- NotImplementedError: If the method is not implemented.
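A concrete JSON load method might look like the following sketch. JsonLoaderSketch is a hypothetical name, and returning a single object for one path but a list for several is an assumption; the real LoadJson may behave differently.

```python
import json


class JsonLoaderSketch:
    """Hypothetical stand-in for a concrete LoadJson.load (local files only)."""

    def load(self, *args):
        # Parse each file in the order given.
        data = []
        for path in args:
            with open(path, encoding="utf-8") as f:
                data.append(json.load(f))
        # Assumption: one path returns its object, several return a list.
        return data[0] if len(data) == 1 else data
```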
LoadPickle(staging_dir=None, on_missing='error', verbose=False)
Pickle file loader.
__call__(self, *args, **kwargs)
Load data from a file or list of files.
Args:
- *args: List of filepaths to load.
<hostname>:<path> syntax is supported for loading files from a remote host.
- **kwargs: Keyword arguments to pass to the load method.
Returns:
- Any: The return value of the user-implemented load method, or
events.IGNORE or events.SKIP if files are missing and on_missing is set to 'ignore' or 'skip' respectively.
Raises:
- FileNotFoundError: If any of the files do not exist and
on_missing is set to 'error'.
__init__(self, staging_dir=None, on_missing='error', verbose=False)
Initialize the data loader.
Args:
- staging_dir: Local directory in which to stage files.
Remote files, whose filepaths use <hostname>:<path> syntax, are staged here.
A staging directory must be specified when loading remote files.
- on_missing: Action to take when files are missing. Accepted values: 'error',
'ignore' and 'skip'.
'ignore' and 'skip' return events.IGNORE and events.SKIP respectively,
whilst 'error' raises a FileNotFoundError. See dagrunner.events.
- verbose: Print verbose output.
load(self, *args)
Load data from a file.
Args:
- *args: Positional arguments.
Returns:
- Any: The loaded data.
Raises:
- NotImplementedError: If the method is not implemented.
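The pickle equivalent differs mainly in reading the file in binary mode. The sketch below is hypothetical (load_pickles is not a dagrunner function), and returning a single object for one path but a list for several is an assumption. The Python documentation warns that unpickling can execute arbitrary code, so only load pickle files from trusted sources.

```python
import pickle


def load_pickles(*paths):
    """Hypothetical sketch of a pickle load step for local files."""
    objects = []
    for path in paths:
        # Pickle data is binary, so the file is opened in "rb" mode.
        with open(path, "rb") as f:
            objects.append(pickle.load(f))
    # Assumption: one path returns its object, several return a list.
    return objects[0] if len(objects) == 1 else objects
```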
NodeAwarePlugin()
An abstract base class for plugins that instructs the plugin executor to pass them node properties. This enables the definition of 'node aware' plugins.
__call__(self, *args, **kwargs)
The main method of the plugin (abstract method).
Positional arguments represent the plugin's inputs (dependencies), while keyword arguments represent the plugin's parameters.
Args:
- *args: Positional arguments.
- **kwargs: Keyword arguments.
Returns:
- Any: The output of the plugin.
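One way an executor could honour the node-aware marker is an isinstance check before calling the plugin. The sketch below is hypothetical: PluginSketch, NodeAwareSketch, execute, Total and LabelledTotal are illustrative names, and passing node_properties as a keyword argument is an assumption based on the node_properties parameter of the node-aware plugins documented here.

```python
from abc import ABC, abstractmethod


class PluginSketch(ABC):
    """Simplified stand-in for an abstract plugin base class."""

    @abstractmethod
    def __call__(self, *args, **kwargs):
        """Positional args are inputs; keyword args are parameters."""


class NodeAwareSketch(PluginSketch):
    """Marker subclass: the executor passes node properties to these plugins."""


def execute(plugin, inputs, params, node_properties):
    """Hypothetical executor step that forwards node_properties selectively."""
    if isinstance(plugin, NodeAwareSketch):
        return plugin(*inputs, node_properties=node_properties, **params)
    return plugin(*inputs, **params)


class Total(PluginSketch):
    def __call__(self, *args, **kwargs):
        return sum(args)


class LabelledTotal(NodeAwareSketch):
    def __call__(self, *args, node_properties=None, **kwargs):
        return f"{node_properties['name']}: {sum(args)}"
```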
Plugin()
Abstract base class defining the plugin interface.
__call__(self, *args, **kwargs)
The main method of the plugin (abstract method).
Positional arguments represent the plugin's inputs (dependencies), while keyword arguments represent the plugin's parameters.
Args:
- *args: Positional arguments.
- **kwargs: Keyword arguments.
Returns:
- Any: The output of the plugin.
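To illustrate the convention that positional arguments are inputs and keyword arguments are parameters, here is a hypothetical subclass of a simplified stand-in base class. PluginSketch and Scale are illustrative names, not dagrunner classes.

```python
from abc import ABC, abstractmethod


class PluginSketch(ABC):
    """Simplified stand-in for an abstract plugin base class."""

    @abstractmethod
    def __call__(self, *args, **kwargs):
        """Positional args are inputs; keyword args are parameters."""


class Scale(PluginSketch):
    """Hypothetical plugin: multiplies each upstream input by a factor."""

    def __call__(self, *args, factor=1.0, **kwargs):
        # args carry the outputs of dependencies; factor is a parameter.
        return [value * factor for value in args]
```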
SaveJson()
Save data to a JSON file.
__call__(self, *args, filepath, node_properties=None, **kwargs)
Save data to a JSON file.
Save the provided data to a JSON file at the specified filepath. The filepath is expanded using the keyword arguments and environment variables. Note that this plugin is 'node aware' since it derives from NodeAwarePlugin.
Args:
- *args: Positional arguments (data) to be saved.
- filepath: The filepath to save the data to.
- node_properties: Node properties passed by the plugin executor.
- **kwargs: Keyword arguments to be used in the expansion.
Returns:
- None
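A minimal sketch of the save step, assuming environment variables are expanded with os.path.expandvars and {placeholder} fields are then filled from the keyword arguments with str.format. It also assumes a single positional argument is written as-is while several are written as a JSON list. save_json is a hypothetical name, and node_properties is omitted.

```python
import json
import os


def save_json(*args, filepath, **kwargs):
    """Hypothetical sketch of SaveJson-style saving (not dagrunner's code)."""
    # Assumption: expand $VAR references, then {placeholders} from kwargs.
    path = os.path.expandvars(filepath).format(**kwargs)
    # Assumption: one positional argument is saved as-is, several as a list.
    data = args[0] if len(args) == 1 else list(args)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f)
```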
SavePickle()
Save data to a Pickle file.
__call__(self, *args, filepath, node_properties=None, **kwargs)
Save data to a Pickle file.
Save the provided data to a pickle file at the specified filepath. The filepath is expanded using the keyword arguments and environment variables. Note that this plugin is 'node aware' since it derives from NodeAwarePlugin.
Args:
- *args: Positional arguments (data) to be saved.
- filepath: The filepath to save the data to.
- node_properties: Node properties passed by the plugin executor.
- **kwargs: Keyword arguments to be used in the expansion.
Returns:
- None
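The pickle variant differs mainly in the serializer and in writing the file in binary mode. This sketch assumes environment variables are expanded with os.path.expandvars before {placeholder} fields are filled from the keyword arguments with str.format; save_pickle is a hypothetical name, and bundling several positional arguments into a tuple is an assumption.

```python
import os
import pickle


def save_pickle(*args, filepath, **kwargs):
    """Hypothetical sketch of SavePickle-style saving (not dagrunner's code)."""
    # Assumption: expand $VAR references, then {placeholders} from kwargs.
    path = os.path.expandvars(filepath).format(**kwargs)
    # Assumption: one positional argument is saved as-is, several as a tuple.
    data = args[0] if len(args) == 1 else args
    # Pickle output is binary, so the file is opened in "wb" mode.
    with open(path, "wb") as f:
        pickle.dump(data, f)
```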
Shell()
A plugin that executes a subprocess command.
__call__(self, *args, **kwargs)
Execute a subprocess command.
Args:
- *args: The command to be executed.
- **kwargs: Additional keyword arguments to be passed to subprocess.run.
Returns:
- CompletedProcess: An object representing the completed process.
Raises:
- CalledProcessError: If the command returns a non-zero exit status.
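The documented behaviour (returning a CompletedProcess and raising CalledProcessError on a non-zero exit status) matches subprocess.run with check=True. The sketch below assumes the positional arguments form the argument list passed to subprocess.run without shell=True; run_command is a hypothetical name, not dagrunner's implementation.

```python
import subprocess
import sys


def run_command(*args, **kwargs):
    """Hypothetical sketch of Shell-style execution (not dagrunner's code)."""
    # Assumption: positional arguments form the argv list (no shell=True).
    # check=True makes a non-zero exit status raise CalledProcessError.
    return subprocess.run(list(args), check=True, **kwargs)


result = run_command(sys.executable, "-c", "print('hi')", capture_output=True, text=True)
print(result.returncode, result.stdout.strip())  # prints: 0 hi
```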