"""
filter.py
"""
from itertools import chain
from operator import attrgetter
from numpy import (
any as np_any,
float64,
nan,
nanpercentile,
uint8,
)
from zipline.errors import (
BadPercentileBounds,
NonExistentAssetInTimeFrame,
UnsupportedDataType,
)
from zipline.lib.labelarray import LabelArray
from zipline.lib.rank import is_missing, grouped_masked_is_maximal
from zipline.pipeline.dtypes import (
CLASSIFIER_DTYPES,
FACTOR_DTYPES,
FILTER_DTYPES,
)
from zipline.pipeline.expression import (
BadBinaryOperator,
FILTER_BINOPS,
method_name_for_op,
NumericalExpression,
)
from zipline.pipeline.mixins import (
AliasedMixin,
CustomTermMixin,
DownsampledMixin,
LatestMixin,
PositiveWindowLengthMixin,
RestrictedDTypeMixin,
SingleInputMixin,
StandardOutputs,
)
from zipline.pipeline.term import ComputableTerm, Term
from zipline.utils.input_validation import expect_types
from zipline.utils.memoize import classlazyval
from zipline.utils.numpy_utils import (
bool_dtype,
int64_dtype,
repeat_first_axis,
)
from ..sentinels import NotSpecified
def concat_tuples(*tuples):
    """
    Join any number of tuples end-to-end into a single tuple.

    Calling with no arguments yields an empty tuple.
    """
    flattened = chain.from_iterable(tuples)
    return tuple(flattened)
def binary_operator(op):
    """
    Build a binary-operator method (e.g. ``__and__`` / ``__or__``) for Filters.

    Parameters
    ----------
    op : str
        The operator symbol spliced into the generated numexpr expression.

    Returns
    -------
    binary_operator : function
        A ``(self, other)`` method returning a new ``NumExprFilter``, or
        raising ``BadBinaryOperator`` when ``other`` is not a supported
        operand.
    """
    # If the right-hand operand is a NumericalExpression, it already knows how
    # to merge its own inputs with ours, so we hand off to its commuted method.
    reflected_method = attrgetter(method_name_for_op(op, commute=True))

    def binary_operator(self, other):
        # NumericalExpression must be tested before Term: a NumericalExpression
        # presumably is also a Term, and it needs input merging.
        if isinstance(self, NumericalExpression):
            left, right, merged_inputs = self.build_binary_op(op, other)
            expression = "({left}) {op} ({right})".format(
                left=left,
                op=op,
                right=right,
            )
            return NumExprFilter.create(expression, merged_inputs)

        if isinstance(other, NumericalExpression):
            return reflected_method(other)(self)

        if isinstance(other, Term):
            # Only boolean-valued terms can be combined with a Filter.
            if other.dtype != bool_dtype:
                raise BadBinaryOperator(op, self, other)
            if self is other:
                # Bind the single term once rather than twice.
                return NumExprFilter.create(
                    "x_0 {op} x_0".format(op=op),
                    (self,),
                )
            return NumExprFilter.create(
                "x_0 {op} x_1".format(op=op),
                (self, other),
            )

        if isinstance(other, int):  # bool subclasses int, so it lands here too
            return NumExprFilter.create(
                "x_0 {op} {constant}".format(op=op, constant=int(other)),
                binds=(self,),
            )

        raise BadBinaryOperator(op, self, other)

    binary_operator.__doc__ = "Binary Operator: '%s'" % op
    return binary_operator
def unary_operator(op):
    """
    Build a unary-operator method for Filters.

    Only ``'~'`` (logical inversion) is supported.

    Raises
    ------
    ValueError
        If ``op`` is not a supported unary operator.
    """
    if op not in {'~'}:
        raise ValueError("Invalid unary operator %s." % op)

    def unary_operator(self):
        # ``NumExprFilter`` is defined later in this module. It is looked up
        # when the method is called, not when this factory runs at class
        # definition time.
        if not isinstance(self, NumericalExpression):
            return NumExprFilter.create("{op}x_0".format(op=op), (self,))
        # Wrap the existing expression so its inputs are reused unchanged.
        return NumExprFilter.create(
            "{op}({expr})".format(op=op, expr=self._expr),
            self.inputs,
        )

    unary_operator.__doc__ = "Unary Operator: '%s'" % op
    return unary_operator
class Filter(RestrictedDTypeMixin, ComputableTerm):
    """
    Pipeline expression computing a boolean output.

    Filters are most commonly useful for describing sets of assets to include
    or exclude for some particular purpose. Many Pipeline API functions accept
    a ``mask`` argument, which can be supplied a Filter indicating that only
    values passing the Filter should be considered when performing the
    requested computation. For example, :meth:`zipline.pipeline.Factor.top`
    accepts a mask indicating that ranks should be computed only on assets that
    passed the specified Filter.

    The most common way to construct a Filter is via one of the comparison
    operators (``<``, ``<=``, ``!=``, ``eq``, ``>``, ``>=``) of
    :class:`~zipline.pipeline.Factor`. For example, a natural way to construct
    a Filter for stocks with a 10-day VWAP less than or equal to $20.0 is to
    first construct a Factor computing 10-day VWAP and compare it to the scalar
    value 20.0::

        >>> from zipline.pipeline.factors import VWAP
        >>> vwap_10 = VWAP(window_length=10)
        >>> vwaps_under_20 = (vwap_10 <= 20)

    Filters can also be constructed via comparisons between two Factors. For
    example, to construct a Filter producing True for asset/date pairs where
    the asset's 10-day VWAP was greater than its 30-day VWAP::

        >>> short_vwap = VWAP(window_length=10)
        >>> long_vwap = VWAP(window_length=30)
        >>> higher_short_vwap = (short_vwap > long_vwap)

    Filters can be combined via the ``&`` (and) and ``|`` (or) operators.

    ``&``-ing together two filters produces a new Filter that produces True if
    **both** of the inputs produced True.

    ``|``-ing together two filters produces a new Filter that produces True if
    **either** of its inputs produced True.

    The ``~`` operator can be used to invert a Filter, swapping all True values
    with Falses and vice-versa.

    Filters may be set as the ``screen`` attribute of a Pipeline, indicating
    asset/date pairs for which the filter produces False should be excluded
    from the Pipeline's output. This is useful both for reducing noise in the
    output of a Pipeline and for reducing memory consumption of Pipeline
    results.
    """
    # Filters are window-safe by default, since a yes/no decision means the
    # same thing from all temporal perspectives.
    window_safe = True

    # Used by RestrictedDTypeMixin
    ALLOWED_DTYPES = FILTER_DTYPES
    dtype = bool_dtype

    # Install a method for every operator in FILTER_BINOPS, plus its reflected
    # (commuted) counterpart, directly into the class namespace. The reflected
    # methods reuse the same factory, which computes ``self <op> other``.
    # That is only correct for commutative operators such as ``&`` and ``|``.
    clsdict = locals()
    clsdict.update(
        {
            method_name_for_op(op): binary_operator(op)
            for op in FILTER_BINOPS
        }
    )
    clsdict.update(
        {
            method_name_for_op(op, commute=True): binary_operator(op)
            for op in FILTER_BINOPS
        }
    )
    # Don't leak the helper name as a (self-referencing) ``Filter.clsdict``
    # class attribute.
    del clsdict

    __invert__ = unary_operator('~')

    def _validate(self):
        # Run superclass validation first so that we handle `dtype not passed`
        # before this.
        retval = super(Filter, self)._validate()
        if self.dtype != bool_dtype:
            raise UnsupportedDataType(
                typename=type(self).__name__,
                dtype=self.dtype
            )
        return retval

    @classlazyval
    def _downsampled_type(self):
        # ``self`` here is the class, as passed by ``classlazyval``.
        return DownsampledMixin.make_downsampled_type(Filter)

    @classlazyval
    def _aliased_type(self):
        return AliasedMixin.make_aliased_type(Filter)
class NumExprFilter(NumericalExpression, Filter):
    """
    A Filter computed from a numexpr expression.
    """

    @classmethod
    def create(cls, expr, binds):
        """
        Helper for creating new NumExprFilters.

        This is just a wrapper around NumericalExpression.__new__ that always
        forwards `bool` as the dtype, since Filters can only be of boolean
        dtype.

        Parameters
        ----------
        expr : str
            The numexpr expression, referencing inputs as ``x_0``, ``x_1``, ...
        binds : tuple
            The terms bound to the expression's ``x_i`` placeholders.
        """
        return cls(expr=expr, binds=binds, dtype=bool_dtype)

    def _compute(self, arrays, dates, assets, mask):
        """
        Compute our result with numexpr, then re-apply `mask`.
        """
        return super(NumExprFilter, self)._compute(
            arrays,
            dates,
            assets,
            mask,
        ) & mask
class NullFilter(SingleInputMixin, Filter):
    """
    A Filter indicating whether input values are missing from an input.

    Parameters
    ----------
    term : zipline.pipeline.Term
        The term whose values are compared against its ``missing_value``.

    Notes
    -----
    The result is not intersected with ``mask``.
    """
    window_length = 0

    def __new__(cls, term):
        return super(NullFilter, cls).__new__(
            cls,
            inputs=(term,),
        )

    def _compute(self, arrays, dates, assets, mask):
        data = arrays[0]
        # LabelArray tracks its own missing entries.
        if isinstance(data, LabelArray):
            return data.is_missing()
        return is_missing(data, self.inputs[0].missing_value)
class NotNullFilter(SingleInputMixin, Filter):
    """
    A Filter indicating whether input values are **not** missing from an input.

    Parameters
    ----------
    term : zipline.pipeline.Term
        The term whose values are compared against its ``missing_value``.

    Notes
    -----
    The result is not intersected with ``mask``.
    """
    window_length = 0

    def __new__(cls, term):
        return super(NotNullFilter, cls).__new__(
            cls,
            inputs=(term,),
        )

    def _compute(self, arrays, dates, assets, mask):
        data = arrays[0]
        # LabelArray tracks its own missing entries.
        if isinstance(data, LabelArray):
            return ~data.is_missing()
        return ~is_missing(data, self.inputs[0].missing_value)
class PercentileFilter(SingleInputMixin, Filter):
    """
    A Filter representing assets falling between percentile bounds of a Factor.

    Parameters
    ----------
    factor : zipline.pipeline.factor.Factor
        The factor over which to compute percentile bounds.
    min_percentile : float [0.0, 100.0]
        The minimum percentile rank of an asset that will pass the filter.
        Must be strictly less than ``max_percentile``.
    max_percentile : float [0.0, 100.0]
        The maximum percentile rank of an asset that will pass the filter.
    mask : zipline.pipeline.Filter
        Passed through to the base Term. Entries where the mask is False are
        ignored when computing the bounds and never pass the filter.
    """
    window_length = 0

    def __new__(cls, factor, min_percentile, max_percentile, mask):
        return super(PercentileFilter, cls).__new__(
            cls,
            inputs=(factor,),
            mask=mask,
            min_percentile=min_percentile,
            max_percentile=max_percentile,
        )

    def _init(self, min_percentile, max_percentile, *args, **kwargs):
        self._min_percentile = min_percentile
        self._max_percentile = max_percentile
        return super(PercentileFilter, self)._init(*args, **kwargs)

    @classmethod
    def _static_identity(cls, min_percentile, max_percentile, *args, **kwargs):
        return (
            super(PercentileFilter, cls)._static_identity(*args, **kwargs),
            min_percentile,
            max_percentile,
        )

    def _validate(self):
        """
        Ensure that our percentile bounds are well-formed.

        Raises
        ------
        BadPercentileBounds
            Unless ``0 <= min_percentile < max_percentile <= 100``.
        """
        if not 0.0 <= self._min_percentile < self._max_percentile <= 100.0:
            raise BadPercentileBounds(
                min_percentile=self._min_percentile,
                max_percentile=self._max_percentile,
                upper_bound=100.0
            )
        return super(PercentileFilter, self)._validate()

    def _compute(self, arrays, dates, assets, mask):
        """
        For each row in the input, compute a mask of all values falling between
        the given percentiles.
        """
        # TODO: Review whether there's a better way of handling small numbers
        # of columns.
        # ``astype`` allocates a new array by default, so it is safe to write
        # into it without touching the engine's input buffer.
        data = arrays[0].astype(float64)
        # Masked-out entries become NaN: nanpercentile skips them, and NaN
        # comparisons below are False, so they can never pass.
        data[~mask] = nan

        # FIXME: np.nanpercentile **should** support computing multiple bounds
        # at once, but there's a bug in the logic for multiple bounds in numpy
        # 1.9.2.  It will be fixed in 1.10.
        # c.f. https://github.com/numpy/numpy/pull/5981
        lower_bounds = nanpercentile(
            data,
            self._min_percentile,
            axis=1,
            keepdims=True,
        )
        upper_bounds = nanpercentile(
            data,
            self._max_percentile,
            axis=1,
            keepdims=True,
        )
        return (lower_bounds <= data) & (data <= upper_bounds)

    def graph_repr(self):
        # ``\l`` is a graphviz left-justified line break. A raw string keeps
        # the backslash without relying on an invalid Python escape sequence.
        return r"{}:\l  min: {}, max: {}\l".format(
            type(self).__name__,
            self._min_percentile,
            self._max_percentile,
        )
class CustomFilter(PositiveWindowLengthMixin, CustomTermMixin, Filter):
    """
    Base class for user-defined Filters.

    Parameters
    ----------
    inputs : iterable, optional
        An iterable of `BoundColumn` instances (e.g. USEquityPricing.close),
        describing the data to load and pass to `self.compute`.  If this
        argument is not passed to the CustomFilter constructor, we look for a
        class-level attribute named `inputs`.
    window_length : int, optional
        Number of rows to pass for each input.  If this argument is not passed
        to the CustomFilter constructor, we look for a class-level attribute
        named `window_length`.

    Notes
    -----
    Users implementing their own Filters should subclass CustomFilter and
    implement a method named `compute` with the following signature:

    .. code-block:: python

        def compute(self, today, assets, out, *inputs):
           ...

    On each simulation date, ``compute`` will be called with the current date,
    an array of sids, an output array, and an input array for each expression
    passed as inputs to the CustomFilter constructor.

    The specific types of the values passed to `compute` are as follows::

        today : np.datetime64[ns]
            Row label for the last row of all arrays passed as `inputs`.
        assets : np.array[int64, ndim=1]
            Column labels for `out` and `inputs`.
        out : np.array[bool, ndim=1]
            Output array of the same shape as `assets`.  `compute` should write
            its desired return values into `out`.
        *inputs : tuple of np.array
            Raw data arrays corresponding to the values of `self.inputs`.

    See the documentation for
    :class:`~zipline.pipeline.factors.factor.CustomFactor` for more details on
    implementing a custom ``compute`` method.

    See Also
    --------
    zipline.pipeline.factors.factor.CustomFactor
    """

    def _validate(self):
        """
        Validate like Filter, but add a hint when the dtype looks like the
        user meant to write a CustomClassifier or CustomFactor.

        Returns whatever the superclass validation returns, matching the
        other ``_validate`` implementations in this module.
        """
        try:
            return super(CustomFilter, self)._validate()
        except UnsupportedDataType:
            if self.dtype in CLASSIFIER_DTYPES:
                raise UnsupportedDataType(
                    typename=type(self).__name__,
                    dtype=self.dtype,
                    hint='Did you mean to create a CustomClassifier?',
                )
            elif self.dtype in FACTOR_DTYPES:
                raise UnsupportedDataType(
                    typename=type(self).__name__,
                    dtype=self.dtype,
                    hint='Did you mean to create a CustomFactor?',
                )
            raise
class ArrayPredicate(SingleInputMixin, Filter):
    """
    A filter applying a function from (ndarray, *args) -> ndarray[bool].

    Parameters
    ----------
    term : zipline.pipeline.Term
        Term producing the array over which the predicate will be computed.
    op : function(ndarray, *args) -> ndarray[bool]
        Function to apply to the result of `term`.
    opargs : tuple[hashable]
        Additional positional arguments to pass to ``op`` after the data.
        Must be hashable, because they form part of the term's identity.
    """
    params = ('op', 'opargs')
    window_length = 0

    @expect_types(term=Term, opargs=tuple)
    def __new__(cls, term, op, opargs):
        hash(opargs)  # fail fast if opargs isn't hashable.
        # Construct ``cls`` (not a hard-coded ArrayPredicate) so that
        # subclasses produce instances of themselves.
        return super(ArrayPredicate, cls).__new__(
            cls,
            op=op,
            opargs=opargs,
            inputs=(term,),
            mask=term.mask,
        )

    def _compute(self, arrays, dates, assets, mask):
        params = self.params
        data = arrays[0]
        return params['op'](data, *params['opargs']) & mask

    def graph_repr(self):
        # Raw string: ``\l`` is a graphviz line break, not a Python escape.
        return r"{}:\l  op: {}.{}()".format(
            type(self).__name__,
            self.params['op'].__module__,
            self.params['op'].__name__,
        )
class Latest(LatestMixin, CustomFilter):
    """
    Filter producing the most recently-known value of `inputs[0]` on each day.

    All of the behavior comes from ``LatestMixin``; this class only binds that
    mixin to ``CustomFilter``.
    """
class SingleAsset(Filter):
    """
    A Filter that computes to True only for the given asset.

    Parameters
    ----------
    asset : Asset
        The asset to select. Its ``sid`` is matched against the pipeline's
        asset columns.

    Raises
    ------
    NonExistentAssetInTimeFrame
        At compute time, if the asset is not present (and unmasked) for every
        date being computed.
    """
    # A tuple, for consistency with StaticSids and to avoid a mutable class
    # attribute. The base Term normalizes inputs to a tuple anyway.
    inputs = ()
    window_length = 1

    def __new__(cls, asset):
        return super(SingleAsset, cls).__new__(cls, asset=asset)

    def _init(self, asset, *args, **kwargs):
        self._asset = asset
        return super(SingleAsset, self)._init(*args, **kwargs)

    @classmethod
    def _static_identity(cls, asset, *args, **kwargs):
        return (
            super(SingleAsset, cls)._static_identity(*args, **kwargs), asset,
        )

    def _compute(self, arrays, dates, assets, mask):
        is_my_asset = (assets == self._asset.sid)
        out = repeat_first_axis(is_my_asset, len(mask))
        # Raise an exception if `self._asset` does not exist for the entirety
        # of the timeframe over which we are computing. Exactly one column must
        # match, and that column must be unmasked on every row.
        if (is_my_asset.sum() != 1) or ((out & mask).sum() != len(mask)):
            raise NonExistentAssetInTimeFrame(
                asset=self._asset, start_date=dates[0], end_date=dates[-1],
            )
        return out

    def graph_repr(self):
        # Raw string: ``\l`` is a graphviz line break, not a Python escape.
        return r"SingleAsset:\l  asset: {!r}\l".format(self._asset)
class StaticSids(Filter):
    """
    A Filter that is True for a fixed, predetermined collection of sids.

    ``StaticSids`` is mostly useful for debugging, or for interactively
    computing pipeline terms over a set of sids known ahead of time.

    Parameters
    ----------
    sids : iterable[int]
        The sids that should pass the filter. They are stored as a frozenset,
        so order and duplicates are irrelevant.
    """
    inputs = ()
    window_length = 0
    params = ('sids',)

    def __new__(cls, sids):
        return super(StaticSids, cls).__new__(cls, sids=frozenset(sids))

    def _compute(self, arrays, dates, sids, mask):
        wanted = self.params['sids']
        column_selected = sids.isin(wanted)
        # Broadcast the per-column answer down every date, then drop masked
        # entries.
        per_day = repeat_first_axis(column_selected, len(mask))
        return per_day & mask
class StaticAssets(StaticSids):
    """
    A Filter that is True for a fixed, predetermined collection of assets.

    ``StaticAssets`` is mostly useful for debugging, or for interactively
    computing pipeline terms over a set of assets known ahead of time.

    Parameters
    ----------
    assets : iterable[Asset]
        The assets that should pass the filter. Only their ``sid`` attributes
        are kept.
    """

    def __new__(cls, assets):
        asset_sids = frozenset(a.sid for a in assets)
        return super(StaticAssets, cls).__new__(cls, asset_sids)
class AllPresent(CustomFilter, SingleInputMixin, StandardOutputs):
    """Pipeline filter indicating input term has data for a given window.

    An asset passes on a given day only if none of the values in its lookback
    window are missing.
    """

    def _validate(self):
        # Filter inputs are rejected outright.
        if not isinstance(self.inputs[0], Filter):
            return super(AllPresent, self)._validate()
        raise TypeError(
            "Input to filter `AllPresent` cannot be a Filter."
        )

    def compute(self, today, assets, out, value):
        if isinstance(value, LabelArray):
            missing = value.is_missing()
        else:
            missing = is_missing(value, self.inputs[0].missing_value)
        # Collapse the window axis: any missing row disqualifies the asset.
        out[:] = ~np_any(missing, axis=0)
def _as_order_preserving_int64(data):
    """
    Reinterpret ``data`` as int64 so that integer order matches value order.

    Integer-backed data is returned as a plain int64 view. float64 needs a fix:
    reinterpreting IEEE-754 bits keeps every negative value below every
    positive one, but *reverses* the order among negative values (e.g. -2.0
    maps to a larger integer than -1.0). Flipping the 63 non-sign bits of
    negative values restores the correct ordering.
    """
    as_int = data.view(int64_dtype)
    if data.dtype != float64:
        return as_int
    # ``as_int >> 63`` is -1 (all ones) for negatives and 0 otherwise. The XOR
    # therefore only touches negative values and leaves their sign bit set.
    # This produces a new array; ``data`` is not modified.
    return as_int ^ ((as_int >> 63) & 0x7FFFFFFFFFFFFFFF)


class MaximumFilter(Filter, StandardOutputs):
    """Pipeline filter that selects the top asset, possibly grouped and masked.

    Parameters
    ----------
    factor : zipline.pipeline.Term
        The term whose maximal value is selected.
    groupby : zipline.pipeline.Classifier or NotSpecified
        Grouping to take the maximum within. If NotSpecified, a single group
        (``Everything()``) is used.
    mask : zipline.pipeline.Filter
        Entries where the mask is False are not eligible.
    """
    window_length = 0

    def __new__(cls, factor, groupby, mask):
        if groupby is NotSpecified:
            from zipline.pipeline.classifiers import Everything
            groupby = Everything()

        return super(MaximumFilter, cls).__new__(
            cls,
            inputs=(factor, groupby),
            mask=mask,
        )

    def _compute(self, arrays, dates, assets, mask):
        # XXX: We're doing a lot of unnecessary work here if `groupby` isn't
        # specified.
        data = arrays[0]

        group_labels, null_label = self.inputs[1]._to_integral(arrays[1])
        # An entry is eligible only if it is unmasked, belongs to a non-null
        # group, and has non-missing data.
        effective_mask = (
            mask
            & (group_labels != null_label)
            & ~is_missing(data, self.inputs[0].missing_value)
        ).view(uint8)

        return grouped_masked_is_maximal(
            # A raw ``data.view(int64)`` is NOT order-preserving for negative
            # floats, so map the data to order-preserving int64 keys first.
            _as_order_preserving_int64(data),
            # PERF: Consider supporting different sizes of group labels.
            group_labels.astype(int64_dtype),
            effective_mask,
        )

    def __repr__(self):
        return "Maximum({!r}, groupby={!r}, mask={!r})".format(
            self.inputs[0], self.inputs[1], self.mask,
        )

    def graph_repr(self):
        # Raw string: ``\l`` is a graphviz line break, not a Python escape.
        return r"Maximum:\l  groupby: {}\l  mask: {}\l".format(
            type(self.inputs[1]).__name__,
            type(self.mask).__name__,
        )