from zipline.errors import UnsupportedPipelineOutput
from zipline.utils.input_validation import (
expect_element,
expect_types,
optional,
)
from .graph import ExecutionPlan, TermGraph
from .filters import Filter
from .term import AssetExists, ComputableTerm, Term
class Pipeline(object):
    """
    A Pipeline object represents a collection of named expressions to be
    compiled and executed by a PipelineEngine.

    A Pipeline has two important attributes: 'columns', a dictionary of named
    `Term` instances, and 'screen', a Filter representing criteria for
    including an asset in the results of a Pipeline.

    To compute a pipeline in the context of a TradingAlgorithm, users must call
    ``attach_pipeline`` in their ``initialize`` function to register that the
    pipeline should be computed each trading day. The outputs of a pipeline on
    a given day can be accessed by calling ``pipeline_output`` in
    ``handle_data`` or ``before_trading_start``.

    Parameters
    ----------
    columns : dict, optional
        Initial columns, keyed by column name. The mapping is shallow-copied,
        so the caller's dict is never modified by this Pipeline.
    screen : zipline.pipeline.Filter, optional
        Initial screen.

    Raises
    ------
    TypeError
        If any value in `columns` is not a ``ComputableTerm``.
    UnsupportedPipelineOutput
        If any value in `columns` is rejected by ``validate_column``.
    """
    __slots__ = ('_columns', '_screen', '__weakref__')

    @expect_types(
        columns=optional(dict),
        screen=optional(Filter),
    )
    def __init__(self, columns=None, screen=None):
        if columns is None:
            columns = {}

        validate_column = self.validate_column
        for column_name, term in columns.items():
            validate_column(column_name, term)
            if not isinstance(term, ComputableTerm):
                raise TypeError(
                    "Column {column_name!r} contains an invalid pipeline term "
                    "({term}). Did you mean to append '.latest'?".format(
                        column_name=column_name, term=term,
                    )
                )

        # Keep a private shallow copy: `add` and `remove` mutate this dict in
        # place, and aliasing the caller's dict would both leak those
        # mutations outward and let outside code insert unvalidated columns.
        self._columns = columns.copy()
        self._screen = screen

    @property
    def columns(self):
        """
        The columns registered with this pipeline.
        """
        return self._columns

    @property
    def screen(self):
        """
        The screen applied to the rows of this pipeline.
        """
        return self._screen

    @expect_types(term=Term, name=str)
    def add(self, term, name, overwrite=False):
        """
        Add a column.

        The results of computing `term` will show up as a column in the
        DataFrame produced by running this pipeline.

        Parameters
        ----------
        term : zipline.pipeline.Term
            A Filter, Factor, or Classifier to add to the pipeline.
        name : str
            Name of the column to add.
        overwrite : bool
            Whether to overwrite the existing entry if we already have a column
            named `name`.

        Raises
        ------
        UnsupportedPipelineOutput
            If `term` is rejected by ``validate_column``.
        KeyError
            If a column named `name` already exists and `overwrite` is falsy.
        TypeError
            If `term` is not a ``ComputableTerm``.
        """
        self.validate_column(name, term)

        already_present = name in self.columns
        if already_present and not overwrite:
            raise KeyError("Column '{}' already exists.".format(name))

        # Finish every validation step before touching the stored columns, so
        # that a rejected term cannot leave the pipeline with the previous
        # column already removed.
        if not isinstance(term, ComputableTerm):
            raise TypeError(
                "{term} is not a valid pipeline column. Did you mean to "
                "append '.latest'?".format(term=term)
            )

        if already_present:
            # Remove-then-insert (rather than assigning in place) so the
            # replaced column ends up in the same position as before this fix.
            self.remove(name)

        self._columns[name] = term

    @expect_types(name=str)
    def remove(self, name):
        """
        Remove a column.

        Parameters
        ----------
        name : str
            The name of the column to remove.

        Raises
        ------
        KeyError
            If `name` is not in self.columns.

        Returns
        -------
        removed : zipline.pipeline.term.Term
            The removed term.
        """
        return self.columns.pop(name)

    @expect_types(screen=Filter, overwrite=(bool, int))
    def set_screen(self, screen, overwrite=False):
        """
        Set a screen on this Pipeline.

        Parameters
        ----------
        screen : zipline.pipeline.Filter
            The filter to apply as a screen.
        overwrite : bool
            Whether to overwrite any existing screen.  If overwrite is False
            and self.screen is not None, we raise an error.

        Raises
        ------
        ValueError
            If a screen is already set and `overwrite` is falsy.
        """
        if self._screen is not None and not overwrite:
            raise ValueError(
                "set_screen() called with overwrite=False and screen already "
                "set.\n"
                "If you want to apply multiple filters as a screen use "
                "set_screen(filter1 & filter2 & ...).\n"
                "If you want to replace the previous screen with a new one, "
                "use set_screen(new_filter, overwrite=True)."
            )
        self._screen = screen

    def to_execution_plan(self,
                          screen_name,
                          default_screen,
                          all_dates,
                          start_date,
                          end_date):
        """
        Compile into an ExecutionPlan.

        Parameters
        ----------
        screen_name : str
            Name to supply for self.screen.
        default_screen : zipline.pipeline.term.Term
            Term to use as a screen if self.screen is None.
        all_dates : pd.DatetimeIndex
            A calendar of dates to use to calculate starts and ends for each
            term.
        start_date : pd.Timestamp
            The first date of requested output.
        end_date : pd.Timestamp
            The last date of requested output.

        Returns
        -------
        plan : ExecutionPlan
            Plan built from this pipeline's columns plus its screen.
        """
        return ExecutionPlan(
            self._prepare_graph_terms(screen_name, default_screen),
            all_dates,
            start_date,
            end_date,
        )

    def to_simple_graph(self, screen_name, default_screen):
        """
        Compile into a simple TermGraph with no extra row metadata.

        Parameters
        ----------
        screen_name : str
            Name to supply for self.screen.
        default_screen : zipline.pipeline.term.Term
            Term to use as a screen if self.screen is None.

        Returns
        -------
        graph : TermGraph
            Graph built from this pipeline's columns plus its screen.
        """
        return TermGraph(
            self._prepare_graph_terms(screen_name, default_screen)
        )

    def _prepare_graph_terms(self, screen_name, default_screen):
        """
        Helper for to_simple_graph and to_execution_plan.

        Returns a copy of self.columns with one extra entry, keyed by
        `screen_name`, holding self.screen (or `default_screen` when no
        screen has been set). The pipeline's own columns are not modified.
        """
        columns = self.columns.copy()
        screen = self.screen
        if screen is None:
            screen = default_screen
        # NOTE(review): a column already named `screen_name` is replaced by
        # the screen in the returned copy -- confirm callers pick a name that
        # cannot collide with a user column.
        columns[screen_name] = screen
        return columns

    @expect_element(format=('svg', 'png', 'jpeg'))
    def show_graph(self, format='svg'):
        """
        Render this Pipeline as a DAG.

        Parameters
        ----------
        format : {'svg', 'png', 'jpeg'}
            Image format to render with.  Default is 'svg'.

        Returns
        -------
        The ``svg``, ``png`` or ``jpeg`` attribute of the TermGraph built
        from this pipeline, according to `format`.
        """
        # The screen is registered under the empty name, with AssetExists()
        # standing in when no screen has been set.
        g = self.to_simple_graph('', AssetExists())
        if format == 'svg':
            return g.svg
        elif format == 'png':
            return g.png
        elif format == 'jpeg':
            return g.jpeg
        else:
            # We should never get here because of the expect_element decorator
            # above.
            raise AssertionError("Unknown graph format %r." % format)

    @staticmethod
    @expect_types(term=Term, column_name=str)
    def validate_column(column_name, term):
        """
        Check that `term` is allowed to be used as a pipeline column.

        Parameters
        ----------
        column_name : str
            Name of the column; only used to build the error.
        term : zipline.pipeline.term.Term
            The term to validate.

        Raises
        ------
        UnsupportedPipelineOutput
            If ``term.ndim == 1``.
        """
        if term.ndim == 1:
            raise UnsupportedPipelineOutput(column_name=column_name, term=term)