Pipeline#
- class Pipeline(steps=None)[source]#
Implementation of a Graphpipeline.
This class is a generalized graph pipeline. Generalized means that it can contain forecasters, classifiers, etc. The graph pipeline mean that the structure is not linear. I.e., the each element of the pipeline can be the input of multiple other steps and not only one successors.
fit(y, X, *args)
- changes state by runningfit
on all sktime estimators andtransformers in the pipeline. Note that depending on the sktime estimators and transformers that are added to the pipeline, different keywords are required. E.g., if a forecaster is part of the pipeline, a forecast horizon (fh) should be provided.
predict(X, *args)
- Results in calling predict on the estimators in the pipelineand transform or the specified method on the other skobjects in the pipeline. Depending on the skobject added to the pipeline, you might need to pass additional parameters to predict.
predict_interval(X, fh)
,predict_quantiles(X, fh)
- aspredict(X, fh)
,with
predict_interval
orpredict_quantiles
substituted forpredict
.
predict_var
,predict_proba
- are currently not supportedget_params
,set_params
usessklearn
compatible nesting interfaceadd_step(skobject, name, edges, method, **kwargs)
- adds a skobject to thepipeline and setting the name as identifier and the steps specified with edges as input steps (predecessors). Thereby the method that should be called can be overridden using the method kwarg. Further provided kwargs are directly provided to the skobject if it is called.
- Parameters:
- param stepsA list of dicts that specify the steps of the pipeline. Further
steps can be added by using add_step method. The dict requires the following keys:
- skobject:
sktime
object, the skobject that should be added to the pipeline
- skobject:
name: str, the name of the step that is created
- edges: dict, a dict with string keys to string values. Identifying the
predcessors. The keys of the edges dict specify to which argument of fit/predict/.. the output of the predecessors (the value of the dict specifies the predecessors name) should be passed.
- method: str, an optional argument allowing to determine the method that
should be executed when the pipeline calls the provided skobject. If not specified, the pipeline selects the method based on the method that is called on the pipeline (e.g., predict, transform, ..)
- kwargs: additional kwargs are parameters that are provided to the
skobject if fit/predict/.. is called.
- Attributes:
- attribute id_to_true_ida dict with integer keys and values,
mapping the python object id to skobject ids.
- attribute id_to_obja dict with integer keys and weak references of
skobjects as values. The values are the weak references of the skobjects provided to the
add_step
method. We store the weak references to avoid that the id of the object is reassigned if the user deletes all it references to the object.- attribute model_dicta dict with integer keys and skobject values.
This is a mapping of the id of the skobjects provided to
add_step
to the cloned skobject.- attribute counterinteger, counts the number of steps in the pipeline.
- attribute stepsa dict with string keys and step object values.
The key is the name that is specified if a skobject is added to the pipeline.
- attribute kwargsa dict with str keys and object values. Stores all kwargs
that are specified and might be passed to skobjects in the pipeline.
References
[1]@article{heidrich2021pywatts, title={pyWATTS: Python workflow automation tool for time series}, author={Heidrich, Benedikt and Bartschat, Andreas and Turowski, Marian and
Neumann, Oliver and Phipps, Kaleb and Meisenbacher, Stefan and Schmieder, Kai and Ludwig, Nicole and Mikut, Ralf and Hagenmeyer, Veit},
journal={arXiv preprint arXiv:2106.10157}, year={2021}
}
Examples
>>> from sktime.classification.distance_based import KNeighborsTimeSeriesClassifier >>> from sktime.datasets import load_arrow_head, load_longley >>> from sktime.split import temporal_train_test_split >>> from sktime.forecasting.naive import NaiveForecaster >>> from sktime.pipeline import Pipeline >>> from sktime.transformations.compose import Id >>> from sktime.transformations.series.boxcox import BoxCoxTransformer >>> from sktime.transformations.series.exponent import ExponentTransformer
Example 1: Simple sequential pipeline of transformers using the generalized non-sequential pipeline implementation
>>> y, X = load_longley() >>> general_pipeline = Pipeline() >>> for step in [ ... {"skobject": ExponentTransformer(), "name": "exp", "edges": {"X": "X"}}, ... {"skobject": BoxCoxTransformer(), "name": "box", "edges": {"X": "exp"}}, ... ]: ... general_pipeline = general_pipeline.add_step(**step) >>> general_pipeline.fit(X=X) >>> result_general = general_pipeline.transform(X)
- Example 2: Classification sequential pipeline using the generalized
non-sequential pipeline implementation
>>> X, y = load_arrow_head(split="train", return_X_y=True) >>> general_pipeline = Pipeline() >>> for step in [ ... {"skobject": ExponentTransformer(), "name": "exp", "edges": {"X": "X"}}, ... {"skobject": KNeighborsTimeSeriesClassifier(), ... "name": "knnclassifier", ... "edges": {"X": "exp", "y": "y"}}]: ... general_pipeline = general_pipeline.add_step(**step) >>> general_pipeline.fit(X=X, y=y) >>> result_general = general_pipeline.predict(X)
Example 3: Forecasting pipeline with exogenous features using the generalized non-sequential pipeline implementation
>>> y, X = load_longley() >>> y_train, y_test, X_train, X_test = temporal_train_test_split(y, X) >>> general_pipeline = Pipeline() >>> for step in [ ... {"skobject": ExponentTransformer(), "name": "exp", "edges": {"X": "X"}}, ... {"skobject": NaiveForecaster(), ... "name": "SARIMAX", ... "edges": {"X": "exp", "y": "y"}}]: ... general_pipeline = general_pipeline.add_step(**step) >>> general_pipeline.fit(y=y_train, X=X_train, fh=[1, 2, 3, 4]) >>> result_general = general_pipeline.predict(X=X_test)
Acknowledgements This graphical pipeline is inspired by pyWATTS that is developed by the Institute for Automation and Applied Informatics (IAI) at Karlsruhe Institute of Technology. The implementation is supported by IAI and the author benHeid is funded by HelmholtzAI. Furthermore, we also want to credit @ViktorKaz for his independent pipeline design that is similar to this one.
Methods
add_step
(skobject, name, edges[, method])Add a new skobject to the pipeline.
Check if the estimator has been fitted.
clone
()Obtain a clone of the object with same hyper-parameters.
clone_tags
(estimator[, tag_names])Clone tags from another estimator as dynamic override.
create_test_instance
([parameter_set])Construct Estimator instance if possible.
create_test_instances_and_names
([parameter_set])Create list of all test instances and a list of names for them.
fit
([X, y])Fit graph pipeline to training data.
fit_transform
(X[, y])Fit graph pipeline to training data and call transform afterward.
get_class_tag
(tag_name[, tag_value_default])Get a class tag's value.
Get class tags from the class and all its parent classes.
Get config flags for self.
get_fitted_params
([deep])Get fitted parameters.
Get object's parameter defaults.
Get object's parameter names.
get_params
([deep])Get the parameters of the pipeline.
get_tag
(tag_name[, tag_value_default, ...])Get tag value from estimator class and dynamic tag overrides.
get_tags
()Get tags from estimator class and dynamic tag overrides.
get_test_params
([parameter_set])Return testing parameter settings for the estimator.
Check if the object is composed of other BaseObjects.
load_from_path
(serial)Load object from file location.
load_from_serial
(serial)Load object from serialized memory container.
predict
([X, y])Perform a prediction.
predict_interval
(X[, y])Perform an interval prediction.
predict_quantiles
(X[, y])Perform a quantile prediction.
predict_residuals
(X[, y])Perform a residuals prediction.
reset
()Reset the object to a clean post-init state.
save
([path, serialization_format])Save serialized self to bytes-like object or to (.zip) file.
set_config
(**config_dict)Set config flags to given values.
set_params
(**params)Set the parameters of this estimator.
set_random_state
([random_state, deep, ...])Set random_state pseudo-random seed parameters for self.
set_tags
(**tag_dict)Set dynamic tags to given values.
transform
(X[, y])Call transform on each element in the graph pipeline.
- get_params(deep=True)[source]#
Get the parameters of the pipeline.
- Parameters:
- deepboolean, optional, default=True
If True, will return the parameters for this estimator and contained sub-objects that are estimators
- Returns:
- paramsdict, parameter names mapped to their values.
- set_params(**params)[source]#
Set the parameters of this estimator.
Valid parameter keys can be listed with get_params(). Note if steps is provided the other parameters are ignored.
- Parameters:
- paramsdict, parameter names mapped to their values.
- Returns:
- selfPipeline, this estimator
- add_step(skobject, name, edges, method=None, **kwargs)[source]#
Add a new skobject to the pipeline.
This method changes the structure of the pipeline. It looks up if the cloned skobject already exists. If not it clones the skobject. Atfterwards, with the cloned skobject a new pipeline step is created, with the provided name. The input of this new step is specified by the edges dict.
- Parameters:
- skobject: ``sktime`` object, the skobject that should be added to the pipeline
- name: str, the name of the step that is created
- edges: dict, a dict with string keys to string values. Identifying the
- predcessors. The keys of the edges dict specify to which argument
of fit/predict/.. the output of the predecessors (the value of the dict specifies the predecessors name) should be passed.
- method: str, an optional argument allowing to determine the method that
should be executed when the pipeline calls the provided skobject. If not specified, the pipeline selects the method based on the method that is called on the pipeline (e.g., predict, transform, ..)
- kwargs: additional kwargs are parameters that are provided to the
skobject if fit/predict/.. is called.
- fit(X=None, y=None, **kwargs)[source]#
Fit graph pipeline to training data.
- Parameters:
- Xtime series in sktime compatible format, optional (default=None)
Exogeneous time series to fit to
- ytime series in sktime compatible data container format
Time series to which to fit the pipeline.
- kwargsadditional key word arguments that might be passed to skobjects in the
pipeline if they have a parameter that corresponds to a key of kwargs.
Notes
Changes state by creating a fitted model that updates attributes ending in “_” in the skobjects of the pipeline and sets is_fitted flag to True.
- fit_transform(X, y=None, **kwargs)[source]#
Fit graph pipeline to training data and call transform afterward.
- Parameters:
- Xtime series in sktime compatible format, optional (default=None)
Exogeneous time series to fit to
- ytime series in sktime compatible data container format
Time series to which to fit the pipeline.
- kwargsadditional key word arguments that might be passed to skobjects in the
pipeline if they have a parameter that corresponds to a key of kwargs.
- Raises:
- MethodNotImplementedError if a step in the pipeline does not implement
transform
Notes
Changes state by creating a fitted model that updates attributes ending in “_” in the skobjects of the pipeline and sets is_fitted flag to True.
- transform(X, y=None, **kwargs)[source]#
Call transform on each element in the graph pipeline.
- Parameters:
- Xtime series in sktime compatible format, optional (default=None)
Exogeneous time series to fit to
- ytime series in sktime compatible data container format
Time series to which to fit the pipeline.
- kwargsadditional key word arguments that might be passed to skobjects in the
pipeline if they have a parameter that corresponds to a key of kwargs.
- Raises:
- MethodNotImplementedError if a step in the pipeline does not implement
transform
- predict(X=None, y=None, **kwargs)[source]#
Perform a prediction.
I.e. calls predict or transform on each element in the graph pipeline.
- Parameters:
- Xtime series in sktime compatible format, optional (default=None)
Exogeneous time series to fit to
- ytime series in sktime compatible data container format
Time series to which to fit the pipeline.
- kwargsadditional key word arguments that might be passed to skobjects in the
pipeline if they have a parameter that corresponds to a key of kwargs.
- Raises:
- MethodNotImplementedError if a step in the pipeline does not implement
transform
orpredict
- predict_interval(X, y=None, **kwargs)[source]#
Perform an interval prediction.
I.e. calls predict, predict_interval, or transform on each element in the graph pipeline.
- Parameters:
- Xtime series in sktime compatible format, optional (default=None)
Exogeneous time series to fit to
- ytime series in sktime compatible data container format
Time series to which to fit the pipeline.
- kwargsadditional key word arguments that might be passed to skobjects in the
pipeline if they have a parameter that corresponds to a key of kwargs.
- Raises:
- MethodNotImplementedError if a step in the pipeline does not implement
transform
,predict
, orpredict_interval
- predict_quantiles(X, y=None, **kwargs)[source]#
Perform a quantile prediction.
I.e. calls predict, predict_quantiles, or transform on each element in the graph pipeline.
- Parameters:
- Xtime series in sktime compatible format, optional (default=None)
Exogeneous time series to fit to
- ytime series in sktime compatible data container format
Time series to which to fit the pipeline.
- kwargsadditional key word arguments that might be passed to skobjects in the
pipeline if they have a parameter that corresponds to a key of kwargs.
- Raises:
- MethodNotImplementedError if a step in the pipeline does not implement
transform
,predict
, orpredict_quantiles
- predict_residuals(X, y=None, **kwargs)[source]#
Perform a residuals prediction.
I.e. calls predict, predict_residuals, or transform on each element in the graph pipeline.
- Parameters:
- Xtime series in sktime compatible format, optional (default=None)
Exogeneous time series to fit to
- ytime series in sktime compatible data container format
Time series to which to fit the pipeline.
- kwargsadditional key word arguments that might be passed to skobjects in the
pipeline if they have a parameter that corresponds to a key of kwargs.
- Raises:
- MethodNotImplementedError if a step in the pipeline does not implement
transform
,predict
, orpredict_residuals
- classmethod get_test_params(parameter_set='default')[source]#
Return testing parameter settings for the estimator.
- Parameters:
- parameter_setstr, default=”default”
Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return
"default"
set. There are currently no reserved values for forecasters.
- Returns:
- paramsdict or list of dict, default = {}
Parameters to create testing instances of the class Each dict are parameters to construct an “interesting” test instance, i.e.,
MyClass(**params)
orMyClass(**params[i])
creates a valid test instance.create_test_instance
uses the first (or only) dictionary inparams
- check_is_fitted()[source]#
Check if the estimator has been fitted.
- Raises:
- NotFittedError
If the estimator has not been fitted yet.
- clone()[source]#
Obtain a clone of the object with same hyper-parameters.
A clone is a different object without shared references, in post-init state. This function is equivalent to returning sklearn.clone of self.
- Raises:
- RuntimeError if the clone is non-conforming, due to faulty
__init__
.
- RuntimeError if the clone is non-conforming, due to faulty
Notes
If successful, equal in value to
type(self)(**self.get_params(deep=False))
.
- clone_tags(estimator, tag_names=None)[source]#
Clone tags from another estimator as dynamic override.
- Parameters:
- estimatorestimator inheriting from :class:BaseEstimator
- tag_namesstr or list of str, default = None
Names of tags to clone. If None then all tags in estimator are used as tag_names.
- Returns:
- Self
Reference to self.
Notes
Changes object state by setting tag values in tag_set from estimator as dynamic tags in self.
- classmethod create_test_instance(parameter_set='default')[source]#
Construct Estimator instance if possible.
- Parameters:
- parameter_setstr, default=”default”
Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return “default” set.
- Returns:
- instanceinstance of the class with default parameters
Notes
get_test_params can return dict or list of dict. This function takes first or single dict that get_test_params returns, and constructs the object with that.
- classmethod create_test_instances_and_names(parameter_set='default')[source]#
Create list of all test instances and a list of names for them.
- Parameters:
- parameter_setstr, default=”default”
Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return “default” set.
- Returns:
- objslist of instances of cls
i-th instance is cls(**cls.get_test_params()[i])
- nameslist of str, same length as objs
i-th element is name of i-th instance of obj in tests convention is {cls.__name__}-{i} if more than one instance otherwise {cls.__name__}
- classmethod get_class_tag(tag_name, tag_value_default=None)[source]#
Get a class tag’s value.
Does not return information from dynamic tags (set via set_tags or clone_tags) that are defined on instances.
- Parameters:
- tag_namestr
Name of tag value.
- tag_value_defaultany
Default/fallback value if tag is not found.
- Returns:
- tag_value
Value of the tag_name tag in self. If not found, returns tag_value_default.
- classmethod get_class_tags()[source]#
Get class tags from the class and all its parent classes.
Retrieves tag: value pairs from _tags class attribute. Does not return information from dynamic tags (set via set_tags or clone_tags) that are defined on instances.
- Returns:
- collected_tagsdict
Dictionary of class tag name: tag value pairs. Collected from _tags class attribute via nested inheritance.
- get_config()[source]#
Get config flags for self.
- Returns:
- config_dictdict
Dictionary of config name : config value pairs. Collected from _config class attribute via nested inheritance and then any overrides and new tags from _onfig_dynamic object attribute.
- get_fitted_params(deep=True)[source]#
Get fitted parameters.
- State required:
Requires state to be “fitted”.
- Parameters:
- deepbool, default=True
Whether to return fitted parameters of components.
If True, will return a dict of parameter name : value for this object, including fitted parameters of fittable components (= BaseEstimator-valued parameters).
If False, will return a dict of parameter name : value for this object, but not include fitted parameters of components.
- Returns:
- fitted_paramsdict with str-valued keys
Dictionary of fitted parameters, paramname : paramvalue keys-value pairs include:
always: all fitted parameters of this object, as via
get_param_names
values are fitted parameter value for that key, of this objectif
deep=True
, also contains keys/value pairs of component parameters parameters of components are indexed as[componentname]__[paramname]
all parameters ofcomponentname
appear asparamname
with its valueif
deep=True
, also contains arbitrary levels of component recursion, e.g.,[componentname]__[componentcomponentname]__[paramname]
, etc
- classmethod get_param_defaults()[source]#
Get object’s parameter defaults.
- Returns:
- default_dict: dict[str, Any]
Keys are all parameters of cls that have a default defined in __init__ values are the defaults, as defined in __init__.
- classmethod get_param_names()[source]#
Get object’s parameter names.
- Returns:
- param_names: list[str]
Alphabetically sorted list of parameter names of cls.
- get_tag(tag_name, tag_value_default=None, raise_error=True)[source]#
Get tag value from estimator class and dynamic tag overrides.
- Parameters:
- tag_namestr
Name of tag to be retrieved
- tag_value_defaultany type, optional; default=None
Default/fallback value if tag is not found
- raise_errorbool
whether a ValueError is raised when the tag is not found
- Returns:
- tag_valueAny
Value of the tag_name tag in self. If not found, returns an error if raise_error is True, otherwise it returns tag_value_default.
- Raises:
- ValueError if raise_error is True i.e. if tag_name is not in
- self.get_tags().keys()
- get_tags()[source]#
Get tags from estimator class and dynamic tag overrides.
- Returns:
- collected_tagsdict
Dictionary of tag name : tag value pairs. Collected from _tags class attribute via nested inheritance and then any overrides and new tags from _tags_dynamic object attribute.
- is_composite()[source]#
Check if the object is composed of other BaseObjects.
A composite object is an object which contains objects, as parameters. Called on an instance, since this may differ by instance.
- Returns:
- composite: bool
Whether an object has any parameters whose values are BaseObjects.
- classmethod load_from_path(serial)[source]#
Load object from file location.
- Parameters:
- serialresult of ZipFile(path).open(“object)
- Returns:
- deserialized self resulting in output at
path
, ofcls.save(path)
- deserialized self resulting in output at
- classmethod load_from_serial(serial)[source]#
Load object from serialized memory container.
- Parameters:
- serial1st element of output of
cls.save(None)
- serial1st element of output of
- Returns:
- deserialized self resulting in output
serial
, ofcls.save(None)
- deserialized self resulting in output
- reset()[source]#
Reset the object to a clean post-init state.
Using reset, runs __init__ with current values of hyper-parameters (result of get_params). This Removes any object attributes, except:
hyper-parameters = arguments of __init__
object attributes containing double-underscores, i.e., the string “__”
Class and object methods, and class attributes are also unaffected.
- Returns:
- self
Instance of class reset to a clean post-init state but retaining the current hyper-parameter values.
Notes
Equivalent to sklearn.clone but overwrites self. After self.reset() call, self is equal in value to type(self)(**self.get_params(deep=False))
- save(path=None, serialization_format='pickle')[source]#
Save serialized self to bytes-like object or to (.zip) file.
Behaviour: if
path
is None, returns an in-memory serialized self ifpath
is a file location, stores self at that location as a zip filesaved files are zip files with following contents: _metadata - contains class of self, i.e., type(self) _obj - serialized self. This class uses the default serialization (pickle).
- Parameters:
- pathNone or file location (str or Path)
if None, self is saved to an in-memory object if file location, self is saved to that file location. If:
path=”estimator” then a zip file
estimator.zip
will be made at cwd. path=”/home/stored/estimator” then a zip fileestimator.zip
will be stored in/home/stored/
.- serialization_format: str, default = “pickle”
Module to use for serialization. The available options are “pickle” and “cloudpickle”. Note that non-default formats might require installation of other soft dependencies.
- Returns:
- if
path
is None - in-memory serialized self - if
path
is file location - ZipFile with reference to the file
- if
- set_config(**config_dict)[source]#
Set config flags to given values.
- Parameters:
- config_dictdict
Dictionary of config name : config value pairs. Valid configs, values, and their meaning is listed below:
- displaystr, “diagram” (default), or “text”
how jupyter kernels display instances of self
“diagram” = html box diagram representation
“text” = string printout
- print_changed_onlybool, default=True
whether printing of self lists only self-parameters that differ from defaults (False), or all parameter names and values (False). Does not nest, i.e., only affects self and not component estimators.
- warningsstr, “on” (default), or “off”
whether to raise warnings, affects warnings from sktime only
“on” = will raise warnings from sktime
“off” = will not raise warnings from sktime
- backend:parallelstr, optional, default=”None”
backend to use for parallelization when broadcasting/vectorizing, one of
“None”: executes loop sequentally, simple list comprehension
“loky”, “multiprocessing” and “threading”: uses
joblib.Parallel
“joblib”: custom and 3rd party
joblib
backends, e.g.,spark
“dask”: uses
dask
, requiresdask
package in environment
- backend:parallel:paramsdict, optional, default={} (no parameters passed)
additional parameters passed to the parallelization backend as config. Valid keys depend on the value of
backend:parallel
:“None”: no additional parameters,
backend_params
is ignored“loky”, “multiprocessing” and “threading”: default
joblib
backends any valid keys forjoblib.Parallel
can be passed here, e.g.,n_jobs
, with the exception ofbackend
which is directly controlled bybackend
. Ifn_jobs
is not passed, it will default to-1
, other parameters will default tojoblib
defaults.“joblib”: custom and 3rd party
joblib
backends, e.g.,spark
. Any valid keys forjoblib.Parallel
can be passed here, e.g.,n_jobs
,backend
must be passed as a key ofbackend_params
in this case. Ifn_jobs
is not passed, it will default to-1
, other parameters will default tojoblib
defaults.“dask”: any valid keys for
dask.compute
can be passed, e.g.,scheduler
- Returns:
- selfreference to self.
Notes
Changes object state, copies configs in config_dict to self._config_dynamic.
- set_random_state(random_state=None, deep=True, self_policy='copy')[source]#
Set random_state pseudo-random seed parameters for self.
Finds
random_state
named parameters viaestimator.get_params
, and sets them to integers derived fromrandom_state
viaset_params
. These integers are sampled from chain hashing viasample_dependent_seed
, and guarantee pseudo-random independence of seeded random generators.Applies to
random_state
parameters inestimator
depending onself_policy
, and remaining component estimators if and only ifdeep=True
.Note: calls
set_params
even ifself
does not have arandom_state
, or none of the components have arandom_state
parameter. Therefore,set_random_state
will reset anyscikit-base
estimator, even those without arandom_state
parameter.- Parameters:
- random_stateint, RandomState instance or None, default=None
Pseudo-random number generator to control the generation of the random integers. Pass int for reproducible output across multiple function calls.
- deepbool, default=True
Whether to set the random state in sub-estimators. If False, will set only
self
’srandom_state
parameter, if exists. If True, will setrandom_state
parameters in sub-estimators as well.- self_policystr, one of {“copy”, “keep”, “new”}, default=”copy”
“copy” :
estimator.random_state
is set to inputrandom_state
“keep” :
estimator.random_state
is kept as is“new” :
estimator.random_state
is set to a new random state,
derived from input
random_state
, and in general different from it
- Returns:
- selfreference to self