Pipeline#

class Pipeline(steps=None)[source]#

Implementation of a graph pipeline.

This class is a generalized graph pipeline. Generalized means that it can contain forecasters, classifiers, etc. Graph pipeline means that the structure is not linear: each element of the pipeline can be the input of multiple other steps, not only of a single successor.

fit(y, X, *args) - changes state by running fit on all sktime estimators and transformers in the pipeline. Note that depending on the sktime estimators and transformers that are added to the pipeline, different keyword arguments are required. E.g., if a forecaster is part of the pipeline, a forecast horizon (fh) should be provided.

predict(X, *args) - calls predict on the estimators in the pipeline, and transform or the specified method on the other skobjects in the pipeline. Depending on the skobjects added to the pipeline, you might need to pass additional parameters to predict.

predict_interval(X, fh), predict_quantiles(X, fh) - as predict(X, fh), with predict_interval or predict_quantiles substituted for predict.

predict_var, predict_proba - are currently not supported.

get_params, set_params - use the sklearn-compatible nesting interface.

add_step(skobject, name, edges, method, **kwargs) - adds a skobject to the pipeline, sets the name as identifier, and sets the steps specified with edges as input steps (predecessors). The method that should be called can be overridden using the method kwarg. Further provided kwargs are passed directly to the skobject when it is called.

Parameters:
steps : a list of dicts that specify the steps of the pipeline. Further steps can be added by using the add_step method. Each dict requires the following keys (a construction sketch follows the list of keys):

  • skobject: sktime object, the skobject that should be added to the pipeline

  • name: str, the name of the step that is created

  • edges: dict, a dict with string keys to string values, identifying the predecessors. The keys of the edges dict specify to which argument of fit/predict/.. the output of the predecessors (the value of the dict specifies the predecessor's name) should be passed.

  • method: str, an optional argument allowing to determine the method that should be executed when the pipeline calls the provided skobject. If not specified, the pipeline selects the method based on the method that is called on the pipeline (e.g., predict, transform, ..)

  • kwargs: additional kwargs are parameters that are provided to the skobject if fit/predict/.. is called.
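For concreteness, a minimal construction sketch via the steps argument, assuming each dict uses the keys above; it mirrors Example 1 below and is an untested illustration:

>>> from sktime.pipeline import Pipeline
>>> from sktime.transformations.series.boxcox import BoxCoxTransformer
>>> from sktime.transformations.series.exponent import ExponentTransformer
>>> pipe = Pipeline(steps=[
...     {"skobject": ExponentTransformer(), "name": "exp", "edges": {"X": "X"}},
...     {"skobject": BoxCoxTransformer(), "name": "box", "edges": {"X": "exp"}},
... ])  # doctest: +SKIP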

Attributes:
id_to_true_id : a dict with integer keys and values, mapping the python object id to skobject ids.

id_to_obj : a dict with integer keys and weak references of skobjects as values. The values are the weak references of the skobjects provided to the add_step method. We store the weak references to avoid that the id of the object is reassigned if the user deletes all of their references to the object.

model_dict : a dict with integer keys and skobject values. This is a mapping of the id of the skobjects provided to add_step to the cloned skobject.

counter : integer, counts the number of steps in the pipeline.

steps : a dict with string keys and step object values. The key is the name that is specified when a skobject is added to the pipeline.

kwargs : a dict with str keys and object values. Stores all kwargs that are specified and might be passed to skobjects in the pipeline.

References

[1] Heidrich, B., Bartschat, A., Turowski, M., Neumann, O., Phipps, K., Meisenbacher, S., Schmieder, K., Ludwig, N., Mikut, R., Hagenmeyer, V. (2021). "pyWATTS: Python workflow automation tool for time series." arXiv preprint arXiv:2106.10157.

Examples

>>> from sktime.classification.distance_based import KNeighborsTimeSeriesClassifier
>>> from sktime.datasets import load_arrow_head, load_longley
>>> from sktime.split import temporal_train_test_split
>>> from sktime.forecasting.naive import NaiveForecaster
>>> from sktime.pipeline import Pipeline
>>> from sktime.transformations.compose import Id
>>> from sktime.transformations.series.boxcox import BoxCoxTransformer
>>> from sktime.transformations.series.exponent import ExponentTransformer

Example 1: Simple sequential pipeline of transformers using the generalized non-sequential pipeline implementation

>>> y, X = load_longley()
>>> general_pipeline = Pipeline()
>>> for step in [
...     {"skobject": ExponentTransformer(), "name": "exp", "edges": {"X": "X"}},
...     {"skobject": BoxCoxTransformer(), "name": "box", "edges": {"X": "exp"}},
...     ]:
...     general_pipeline = general_pipeline.add_step(**step)
>>> general_pipeline.fit(X=X) 
>>> result_general = general_pipeline.transform(X) 

Example 2: Classification sequential pipeline using the generalized non-sequential pipeline implementation

>>> X, y = load_arrow_head(split="train", return_X_y=True)
>>> general_pipeline = Pipeline()
>>> for step in [
...     {"skobject": ExponentTransformer(), "name": "exp", "edges": {"X": "X"}},
...     {"skobject": KNeighborsTimeSeriesClassifier(),
...      "name": "knnclassifier",
...      "edges": {"X": "exp", "y": "y"}}]:
...     general_pipeline = general_pipeline.add_step(**step)
>>> general_pipeline.fit(X=X, y=y) 
>>> result_general = general_pipeline.predict(X) 

Example 3: Forecasting pipeline with exogenous features using the generalized non-sequential pipeline implementation

>>> y, X = load_longley()
>>> y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)
>>> general_pipeline = Pipeline()
>>> for step in [
...     {"skobject": ExponentTransformer(), "name": "exp", "edges": {"X": "X"}},
...     {"skobject": NaiveForecaster(),
...      "name": "forecaster",
...      "edges": {"X": "exp", "y": "y"}}]:
...     general_pipeline = general_pipeline.add_step(**step)
>>> general_pipeline.fit(y=y_train, X=X_train, fh=[1, 2, 3, 4]) 
>>> result_general = general_pipeline.predict(X=X_test) 

Acknowledgements

This graphical pipeline is inspired by pyWATTS, which is developed by the Institute for Automation and Applied Informatics (IAI) at the Karlsruhe Institute of Technology. The implementation is supported by IAI, and the author benHeid is funded by HelmholtzAI. Furthermore, we also want to credit @ViktorKaz for his independent pipeline design that is similar to this one.

Methods

add_step(skobject, name, edges[, method])

Add a new skobject to the pipeline.

check_is_fitted()

Check if the estimator has been fitted.

clone()

Obtain a clone of the object with same hyper-parameters.

clone_tags(estimator[, tag_names])

Clone tags from another estimator as dynamic override.

create_test_instance([parameter_set])

Construct Estimator instance if possible.

create_test_instances_and_names([parameter_set])

Create list of all test instances and a list of names for them.

fit([X, y])

Fit graph pipeline to training data.

fit_transform(X[, y])

Fit graph pipeline to training data and call transform afterward.

get_class_tag(tag_name[, tag_value_default])

Get a class tag's value.

get_class_tags()

Get class tags from the class and all its parent classes.

get_config()

Get config flags for self.

get_fitted_params([deep])

Get fitted parameters.

get_param_defaults()

Get object's parameter defaults.

get_param_names()

Get object's parameter names.

get_params([deep])

Get the parameters of the pipeline.

get_tag(tag_name[, tag_value_default, ...])

Get tag value from estimator class and dynamic tag overrides.

get_tags()

Get tags from estimator class and dynamic tag overrides.

get_test_params([parameter_set])

Return testing parameter settings for the estimator.

is_composite()

Check if the object is composed of other BaseObjects.

load_from_path(serial)

Load object from file location.

load_from_serial(serial)

Load object from serialized memory container.

predict([X, y])

Perform a prediction.

predict_interval(X[, y])

Perform an interval prediction.

predict_quantiles(X[, y])

Perform a quantile prediction.

predict_residuals(X[, y])

Perform a residuals prediction.

reset()

Reset the object to a clean post-init state.

save([path, serialization_format])

Save serialized self to bytes-like object or to (.zip) file.

set_config(**config_dict)

Set config flags to given values.

set_params(**params)

Set the parameters of this estimator.

set_random_state([random_state, deep, ...])

Set random_state pseudo-random seed parameters for self.

set_tags(**tag_dict)

Set dynamic tags to given values.

transform(X[, y])

Call transform on each element in the graph pipeline.

get_params(deep=True)[source]#

Get the parameters of the pipeline.

Parameters:
deep : boolean, optional, default=True

If True, will return the parameters for this estimator and contained sub-objects that are estimators.

Returns:
params : dict, parameter names mapped to their values.
set_params(**params)[source]#

Set the parameters of this estimator.

Valid parameter keys can be listed with get_params(). Note that if steps is provided, the other parameters are ignored.

Parameters:
params : dict, parameter names mapped to their values.
Returns:
self : Pipeline, this estimator
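As a sketch of the sklearn-compatible nesting interface: a get_params/set_params round trip leaves the pipeline unchanged. Untested illustration, using the same transformer as the Examples above:

>>> from sktime.pipeline import Pipeline
>>> from sktime.transformations.series.exponent import ExponentTransformer
>>> pipe = Pipeline().add_step(ExponentTransformer(), name="exp", edges={"X": "X"})
>>> params = pipe.get_params(deep=True)  # doctest: +SKIP
>>> pipe = pipe.set_params(**params)  # doctest: +SKIP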
add_step(skobject, name, edges, method=None, **kwargs)[source]#

Add a new skobject to the pipeline.

This method changes the structure of the pipeline. It checks whether a clone of the skobject already exists; if not, it clones the skobject. Afterwards, a new pipeline step is created from the cloned skobject, with the provided name. The input of this new step is specified by the edges dict.

Parameters:
skobject : sktime object, the skobject that should be added to the pipeline
name : str, the name of the step that is created
edges : dict, a dict with string keys to string values, identifying the predecessors. The keys of the edges dict specify to which argument of fit/predict/.. the output of the predecessors (the value of the dict specifies the predecessor's name) should be passed.
method : str, an optional argument allowing to determine the method that should be executed when the pipeline calls the provided skobject. If not specified, the pipeline selects the method based on the method that is called on the pipeline (e.g., predict, transform, ..)
kwargs : additional kwargs are parameters that are provided to the skobject if fit/predict/.. is called.
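A sketch of overriding the executed method via the method argument; whether predict_residuals is a sensible override depends on the skobject, so treat this as illustrative only:

>>> from sktime.forecasting.naive import NaiveForecaster
>>> from sktime.pipeline import Pipeline
>>> pipe = Pipeline()
>>> pipe = pipe.add_step(
...     NaiveForecaster(),
...     name="naive",
...     edges={"y": "y"},
...     method="predict_residuals",
... )  # doctest: +SKIP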

fit(X=None, y=None, **kwargs)[source]#

Fit graph pipeline to training data.

Parameters:
X : time series in sktime compatible format, optional (default=None)

Exogenous time series to fit to.

y : time series in sktime compatible data container format

Time series to which to fit the pipeline.

kwargs : additional keyword arguments that might be passed to skobjects in the pipeline if they have a parameter that corresponds to a key of kwargs.

Notes

Changes state by creating a fitted model that updates attributes ending in “_” in the skobjects of the pipeline and sets is_fitted flag to True.

fit_transform(X, y=None, **kwargs)[source]#

Fit graph pipeline to training data and call transform afterward.

Parameters:
X : time series in sktime compatible format, optional (default=None)

Exogenous time series to fit to.

y : time series in sktime compatible data container format

Time series to which to fit the pipeline.

kwargs : additional keyword arguments that might be passed to skobjects in the pipeline if they have a parameter that corresponds to a key of kwargs.

Raises:
MethodNotImplementedError if a step in the pipeline does not implement transform

Notes

Changes state by creating a fitted model that updates attributes ending in “_” in the skobjects of the pipeline and sets is_fitted flag to True.
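A minimal fit_transform sketch on a transformer-only pipeline (mirrors Example 1 above; untested illustration):

>>> from sktime.datasets import load_longley
>>> from sktime.pipeline import Pipeline
>>> from sktime.transformations.series.exponent import ExponentTransformer
>>> y, X = load_longley()
>>> pipe = Pipeline().add_step(ExponentTransformer(), name="exp", edges={"X": "X"})
>>> Xt = pipe.fit_transform(X=X)  # doctest: +SKIP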

transform(X, y=None, **kwargs)[source]#

Call transform on each element in the graph pipeline.

Parameters:
X : time series in sktime compatible format, optional (default=None)

Exogenous time series to fit to.

y : time series in sktime compatible data container format

Time series to which to fit the pipeline.

kwargs : additional keyword arguments that might be passed to skobjects in the pipeline if they have a parameter that corresponds to a key of kwargs.

Raises:
MethodNotImplementedError if a step in the pipeline does not implement transform

predict(X=None, y=None, **kwargs)[source]#

Perform a prediction.

I.e. calls predict or transform on each element in the graph pipeline.

Parameters:
X : time series in sktime compatible format, optional (default=None)

Exogenous time series to fit to.

y : time series in sktime compatible data container format

Time series to which to fit the pipeline.

kwargs : additional keyword arguments that might be passed to skobjects in the pipeline if they have a parameter that corresponds to a key of kwargs.

Raises:
MethodNotImplementedError if a step in the pipeline does not implement transform or predict

predict_interval(X, y=None, **kwargs)[source]#

Perform an interval prediction.

I.e. calls predict, predict_interval, or transform on each element in the graph pipeline.

Parameters:
X : time series in sktime compatible format, optional (default=None)

Exogenous time series to fit to.

y : time series in sktime compatible data container format

Time series to which to fit the pipeline.

kwargs : additional keyword arguments that might be passed to skobjects in the pipeline if they have a parameter that corresponds to a key of kwargs.

Raises:
MethodNotImplementedError if a step in the pipeline does not implement transform, predict, or predict_interval

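A hedged sketch of a probabilistic forecast through the pipeline; the coverage argument is assumed to be routed to the forecaster's predict_interval via kwargs. predict_quantiles and predict_residuals are used analogously:

>>> from sktime.datasets import load_longley
>>> from sktime.forecasting.naive import NaiveForecaster
>>> from sktime.pipeline import Pipeline
>>> y, X = load_longley()
>>> pipe = Pipeline().add_step(
...     NaiveForecaster(), name="naive", edges={"y": "y", "X": "X"}
... )
>>> pipe.fit(y=y, X=X, fh=[1, 2, 3])  # doctest: +SKIP
>>> intervals = pipe.predict_interval(X=X, coverage=0.9)  # doctest: +SKIP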
predict_quantiles(X, y=None, **kwargs)[source]#

Perform a quantile prediction.

I.e. calls predict, predict_quantiles, or transform on each element in the graph pipeline.

Parameters:
X : time series in sktime compatible format, optional (default=None)

Exogenous time series to fit to.

y : time series in sktime compatible data container format

Time series to which to fit the pipeline.

kwargs : additional keyword arguments that might be passed to skobjects in the pipeline if they have a parameter that corresponds to a key of kwargs.

Raises:
MethodNotImplementedError if a step in the pipeline does not implement transform, predict, or predict_quantiles

predict_residuals(X, y=None, **kwargs)[source]#

Perform a residuals prediction.

I.e. calls predict, predict_residuals, or transform on each element in the graph pipeline.

Parameters:
X : time series in sktime compatible format, optional (default=None)

Exogenous time series to fit to.

y : time series in sktime compatible data container format

Time series to which to fit the pipeline.

kwargs : additional keyword arguments that might be passed to skobjects in the pipeline if they have a parameter that corresponds to a key of kwargs.

Raises:
MethodNotImplementedError if a step in the pipeline does not implement transform, predict, or predict_residuals

classmethod get_test_params(parameter_set='default')[source]#

Return testing parameter settings for the estimator.

Parameters:
parameter_set : str, default="default"

Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return "default" set. There are currently no reserved values for forecasters.

Returns:
params : dict or list of dict, default = {}

Parameters to create testing instances of the class. Each dict contains parameters to construct an "interesting" test instance, i.e., MyClass(**params) or MyClass(**params[i]) creates a valid test instance. create_test_instance uses the first (or only) dictionary in params.

check_is_fitted()[source]#

Check if the estimator has been fitted.

Raises:
NotFittedError

If the estimator has not been fitted yet.

clone()[source]#

Obtain a clone of the object with same hyper-parameters.

A clone is a different object without shared references, in post-init state. This function is equivalent to returning sklearn.clone of self.

Raises:
RuntimeError if the clone is non-conforming, due to faulty __init__.

Notes

If successful, equal in value to type(self)(**self.get_params(deep=False)).

clone_tags(estimator, tag_names=None)[source]#

Clone tags from another estimator as dynamic override.

Parameters:
estimator : estimator inheriting from BaseEstimator
tag_names : str or list of str, default=None

Names of tags to clone. If None then all tags in estimator are used as tag_names.

Returns:
Self

Reference to self.

Notes

Changes object state by setting tag values in tag_names from estimator as dynamic tags in self.

classmethod create_test_instance(parameter_set='default')[source]#

Construct Estimator instance if possible.

Parameters:
parameter_set : str, default="default"

Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return "default" set.

Returns:
instance : instance of the class with default parameters

Notes

get_test_params can return dict or list of dict. This function takes the first or single dict that get_test_params returns, and constructs the object with that.

classmethod create_test_instances_and_names(parameter_set='default')[source]#

Create list of all test instances and a list of names for them.

Parameters:
parameter_set : str, default="default"

Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return "default" set.

Returns:
objs : list of instances of cls

i-th instance is cls(**cls.get_test_params()[i])

names : list of str, same length as objs

i-th element is the name of the i-th instance of obj in tests. The naming convention is {cls.__name__}-{i} if more than one instance, otherwise {cls.__name__}.

classmethod get_class_tag(tag_name, tag_value_default=None)[source]#

Get a class tag’s value.

Does not return information from dynamic tags (set via set_tags or clone_tags) that are defined on instances.

Parameters:
tag_name : str

Name of tag value.

tag_value_default : any

Default/fallback value if tag is not found.

Returns:
tag_value

Value of the tag_name tag in self. If not found, returns tag_value_default.

classmethod get_class_tags()[source]#

Get class tags from the class and all its parent classes.

Retrieves tag: value pairs from _tags class attribute. Does not return information from dynamic tags (set via set_tags or clone_tags) that are defined on instances.

Returns:
collected_tags : dict

Dictionary of class tag name: tag value pairs. Collected from _tags class attribute via nested inheritance.

get_config()[source]#

Get config flags for self.

Returns:
config_dict : dict

Dictionary of config name : config value pairs. Collected from _config class attribute via nested inheritance and then any overrides and new configs from _config_dynamic object attribute.

get_fitted_params(deep=True)[source]#

Get fitted parameters.

State required:

Requires state to be “fitted”.

Parameters:
deep : bool, default=True

Whether to return fitted parameters of components.

  • If True, will return a dict of parameter name : value for this object, including fitted parameters of fittable components (= BaseEstimator-valued parameters).

  • If False, will return a dict of parameter name : value for this object, but not include fitted parameters of components.

Returns:
fitted_params : dict with str-valued keys

Dictionary of fitted parameters, paramname : paramvalue key-value pairs include:

  • always: all fitted parameters of this object, as via get_param_names; values are fitted parameter values for that key, of this object

  • if deep=True, also contains key/value pairs of component parameters; parameters of components are indexed as [componentname]__[paramname]; all parameters of componentname appear as paramname with its value

  • if deep=True, also contains arbitrary levels of component recursion, e.g., [componentname]__[componentcomponentname]__[paramname], etc.
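For instance, building on the fitted general_pipeline from Example 1 above, component parameters appear under componentname__paramname keys; an untested sketch:

>>> fitted_params = general_pipeline.get_fitted_params(deep=True)  # doctest: +SKIP
>>> component_keys = [k for k in fitted_params if "__" in k]  # doctest: +SKIP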

classmethod get_param_defaults()[source]#

Get object’s parameter defaults.

Returns:
default_dict: dict[str, Any]

Keys are all parameters of cls that have a default defined in __init__; values are the defaults, as defined in __init__.

classmethod get_param_names()[source]#

Get object’s parameter names.

Returns:
param_names: list[str]

Alphabetically sorted list of parameter names of cls.

get_tag(tag_name, tag_value_default=None, raise_error=True)[source]#

Get tag value from estimator class and dynamic tag overrides.

Parameters:
tag_name : str

Name of tag to be retrieved.

tag_value_default : any type, optional; default=None

Default/fallback value if tag is not found.

raise_error : bool

whether a ValueError is raised when the tag is not found

Returns:
tag_value : Any

Value of the tag_name tag in self. If not found, raises an error if raise_error is True, otherwise returns tag_value_default.

Raises:
ValueError if raise_error is True, i.e., if tag_name is not in self.get_tags().keys()

get_tags()[source]#

Get tags from estimator class and dynamic tag overrides.

Returns:
collected_tags : dict

Dictionary of tag name : tag value pairs. Collected from _tags class attribute via nested inheritance and then any overrides and new tags from _tags_dynamic object attribute.

is_composite()[source]#

Check if the object is composed of other BaseObjects.

A composite object is an object which contains objects as parameters. Called on an instance, since this may differ by instance.

Returns:
composite: bool

Whether an object has any parameters whose values are BaseObjects.

property is_fitted[source]#

Whether fit has been called.

classmethod load_from_path(serial)[source]#

Load object from file location.

Parameters:
serial : result of ZipFile(path).open("object")
Returns:
deserialized self resulting in output at path, of cls.save(path)
classmethod load_from_serial(serial)[source]#

Load object from serialized memory container.

Parameters:
serial : 1st element of output of cls.save(None)
Returns:
deserialized self resulting in output serial, of cls.save(None)
reset()[source]#

Reset the object to a clean post-init state.

Calling reset runs __init__ with the current values of hyper-parameters (result of get_params). This removes any object attributes, except:

  • hyper-parameters = arguments of __init__

  • object attributes containing double-underscores, i.e., the string “__”

Class and object methods, and class attributes are also unaffected.

Returns:
self

Instance of class reset to a clean post-init state but retaining the current hyper-parameter values.

Notes

Equivalent to sklearn.clone, but overwrites self. After a self.reset() call, self is equal in value to type(self)(**self.get_params(deep=False)).

save(path=None, serialization_format='pickle')[source]#

Save serialized self to bytes-like object or to (.zip) file.

Behaviour: if path is None, returns an in-memory serialized self; if path is a file location, stores self at that location as a zip file.

Saved files are zip files with the following contents: _metadata - contains the class of self, i.e., type(self); _obj - serialized self. This class uses the default serialization (pickle).

Parameters:
path : None or file location (str or Path)

if None, self is saved to an in-memory object; if file location, self is saved to that file location. If:

path="estimator" then a zip file estimator.zip will be made at cwd. path="/home/stored/estimator" then a zip file estimator.zip will be stored in /home/stored/.

serialization_format: str, default = “pickle”

Module to use for serialization. The available options are “pickle” and “cloudpickle”. Note that non-default formats might require installation of other soft dependencies.

Returns:
if path is None - in-memory serialized self
if path is file location - ZipFile with reference to the file
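A serialization round-trip sketch, pairing save(None) with load_from_serial (documented above); the indexing of the save output follows the load_from_serial docstring. Untested illustration:

>>> from sktime.pipeline import Pipeline
>>> pipe = Pipeline()
>>> serial = pipe.save()  # path=None: in-memory serialized self
>>> pipe2 = Pipeline.load_from_serial(serial[1])  # doctest: +SKIP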
set_config(**config_dict)[source]#

Set config flags to given values.

Parameters:
config_dict : dict

Dictionary of config name : config value pairs. Valid configs, values, and their meanings are listed below:

display : str, "diagram" (default), or "text"

how jupyter kernels display instances of self

  • “diagram” = html box diagram representation

  • “text” = string printout

print_changed_only : bool, default=True

whether printing of self lists only self-parameters that differ from defaults (True), or all parameter names and values (False). Does not nest, i.e., only affects self and not component estimators.

warnings : str, "on" (default), or "off"

whether to raise warnings, affects warnings from sktime only

  • “on” = will raise warnings from sktime

  • “off” = will not raise warnings from sktime

backend:parallel : str, optional, default="None"

backend to use for parallelization when broadcasting/vectorizing, one of

  • "None": executes loop sequentially, simple list comprehension

  • “loky”, “multiprocessing” and “threading”: uses joblib.Parallel

  • “joblib”: custom and 3rd party joblib backends, e.g., spark

  • “dask”: uses dask, requires dask package in environment

backend:parallel:params : dict, optional, default={} (no parameters passed)

additional parameters passed to the parallelization backend as config. Valid keys depend on the value of backend:parallel:

  • “None”: no additional parameters, backend_params is ignored

  • “loky”, “multiprocessing” and “threading”: default joblib backends any valid keys for joblib.Parallel can be passed here, e.g., n_jobs, with the exception of backend which is directly controlled by backend. If n_jobs is not passed, it will default to -1, other parameters will default to joblib defaults.

  • “joblib”: custom and 3rd party joblib backends, e.g., spark. Any valid keys for joblib.Parallel can be passed here, e.g., n_jobs, backend must be passed as a key of backend_params in this case. If n_jobs is not passed, it will default to -1, other parameters will default to joblib defaults.

  • “dask”: any valid keys for dask.compute can be passed, e.g., scheduler

Returns:
self : reference to self.

Notes

Changes object state, copies configs in config_dict to self._config_dynamic.
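Since the parallelization config names contain colons, they are most easily passed by unpacking a dict. A hedged sketch; backend availability depends on the environment:

>>> from sktime.pipeline import Pipeline
>>> pipe = Pipeline().set_config(
...     warnings="off",
...     **{"backend:parallel": "loky", "backend:parallel:params": {"n_jobs": 2}},
... )  # doctest: +SKIP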

set_random_state(random_state=None, deep=True, self_policy='copy')[source]#

Set random_state pseudo-random seed parameters for self.

Finds random_state named parameters via estimator.get_params, and sets them to integers derived from random_state via set_params. These integers are sampled from chain hashing via sample_dependent_seed, and guarantee pseudo-random independence of seeded random generators.

Applies to random_state parameters in estimator depending on self_policy, and remaining component estimators if and only if deep=True.

Note: calls set_params even if self does not have a random_state, or none of the components have a random_state parameter. Therefore, set_random_state will reset any scikit-base estimator, even those without a random_state parameter.

Parameters:
random_state : int, RandomState instance or None, default=None

Pseudo-random number generator to control the generation of the random integers. Pass int for reproducible output across multiple function calls.

deep : bool, default=True

Whether to set the random state in sub-estimators. If False, will set only self's random_state parameter, if it exists. If True, will set random_state parameters in sub-estimators as well.

self_policy : str, one of {"copy", "keep", "new"}, default="copy"

  • "copy" : estimator.random_state is set to input random_state

  • "keep" : estimator.random_state is kept as is

  • "new" : estimator.random_state is set to a new random state, derived from input random_state, and in general different from it

Returns:
self : reference to self
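A minimal seeding sketch; with deep=True (the default), any random_state parameters in pipeline components are seeded as well:

>>> from sktime.pipeline import Pipeline
>>> pipe = Pipeline().set_random_state(42)  # doctest: +SKIP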
set_tags(**tag_dict)[source]#

Set dynamic tags to given values.

Parameters:
**tag_dict : dict

Dictionary of tag name: tag value pairs.

Returns:
Self

Reference to self.

Notes

Changes object state by setting tag values in tag_dict as dynamic tags in self.