# pylint: disable = too-many-lines
import itertools
import logging
from copy import deepcopy
from functools import reduce
from itertools import combinations, product
import numpy as np
from scipy import linalg
from formulae.utils import get_interaction_matrix
from formulae.contrasts import pick_contrasts
from formulae.terms.call import Call
from formulae.terms.variable import Variable
_log = logging.getLogger("formulae")
[docs]class Intercept:
"""Internal representation of a model intercept."""
def __init__(self):
self.name = "Intercept"
self._type = "Intercept"
self.data = None
self.len = None
self.metadata = {"type": "intercept"}
def __eq__(self, other):
return isinstance(other, type(self))
def __hash__(self):
return hash(self._type)
[docs] def __add__(self, other):
"""Addition operator.
Generally this operator is used to explicitly add an intercept to a model. However, there
may be cases where the result is not a ``Model``, or does not contain an intercept.
* ``"1 + 0"`` and ``"1 + (-1)"`` return an empty model.
* ``"1 + 1"`` returns an intercept.
* ``"1 + x"`` and ``"1 + (x|g)"`` returns a model with both the term and the intercept.
* ``"1 + (x + y)"`` adds an intercept to the model given by ``x`` and ``y``.
"""
if isinstance(other, NegatedIntercept):
return Model()
elif isinstance(other, type(self)):
return self
elif isinstance(other, (Term, GroupSpecificTerm)):
return Model(self, other)
elif isinstance(other, Model):
return Model(self) + other
else:
return NotImplemented
[docs] def __sub__(self, other):
"""Subtraction operator.
This operator removes an intercept from a model if the given model has an intercept.
* ``"1 - 1"`` returns an empty model.
* ``"1 - 0"`` and ``"1 - (-1)"`` return an intercept.
* ``"1 - (x + y)"`` returns the model given by ``x`` and ``y`` unchanged.
* ``"1 - (1 + x + y)"`` returns the model given by ``x`` and ``y``, removing the intercept.
"""
if isinstance(other, type(self)):
return Model()
elif isinstance(other, NegatedIntercept):
return self
elif isinstance(other, Model):
if self in other.common_terms:
return Model()
else:
return self
else:
return NotImplemented
[docs] def __or__(self, other):
"""Group-specific operator. Creates group-specific intercept.
This operation is usually surrounded by parenthesis. It is not actually required. They
are always used because ``|`` has lower precedence that the other common operators.
This operator is distributed over the right-hand side, which means ``(1|g + h)`` is
equivalent to ``(1|g) + (1|h)``.
"""
if isinstance(other, Term):
return GroupSpecificTerm(self, other)
elif isinstance(other, Model):
products = product([self], other.common_terms)
terms = [GroupSpecificTerm(p[0], p[1]) for p in products]
return Model(*terms)
else:
return NotImplemented
def __repr__(self):
return self.__str__()
def __str__(self):
return f"{self.__class__.__name__}()"
@property
def var_names(self):
"""Returns empty set, no variables are used in the intercept."""
return set()
[docs] def set_type(self, data, eval_env): # pylint: disable = unused-argument
"""Sets length of the intercept."""
# Nothing goes here as the type is given by the class.
# Only works with DataFrames or Series so far
self.len = data.shape[0]
[docs] def set_data(self, encoding): # pylint: disable = unused-argument
"""Creates data for the intercept.
It sets ``self.data`` equal to a numpy array of ones of length ``(self.len, 1)``.
"""
self.data = np.ones((self.len, 1))
[docs] def eval_new_data(self, data):
"""Returns data for a new intercept.
The length of the new intercept is given by the number of rows in ``data``.
"""
# it assumes data is a pandas DataFrame now
return np.ones((data.shape[0], 1))
[docs]class NegatedIntercept:
"""Internal representation of the opposite of a model intercept.
This object is created whenever we use ``"0"`` or ``"-1"`` in a model formula. It is not
expected to appear in a final model. It's here to help us make operations using the
``Intercept`` and deciding when to keep it and when to drop it.
"""
def __init__(self):
self.name = "NegatedIntercept"
self._type = "Intercept"
[docs] def __add__(self, other):
"""Addition operator.
Generally this operator is used to explicitly remove an from a model.
* ``"0 + 1"`` returns an empty model.
* ``"0 + 0"`` returns a negated intercept
* ``"0 + x"`` returns a model that includes the negated intercept.
* ``"0 + (x + y)"`` adds an the negated intercept to the model given by ``x`` and ``y``.
No matter the final result contains the negated intercept, for example if we do something
like ``"y ~ 0 + x + y + 0"``, the ``Model`` that is obtained removes any negated intercepts
thay may have been left. They just don't make sense in a model.
"""
if isinstance(other, type(self)):
return self
elif isinstance(other, Intercept):
return Model()
elif isinstance(other, (Term, GroupSpecificTerm)):
return Model(self, other)
elif isinstance(other, Model):
return Model(self) + other
else:
return NotImplemented
def __eq__(self, other):
return isinstance(other, type(self))
def __or__(self, other):
raise ValueError("At least include an intercept in '|' operation")
def __repr__(self):
return self.__str__()
def __str__(self):
return f"{self.__class__.__name__}()"
@property
def var_names(self):
# This method should never be called. Leaving a set() to avoid harmless error.
return set()
def set_type(self, *args, **kwargs):
# This method should never be called. Leaving a pass to avoid harmless error.
pass
def set_data(self, *args, **kwargs):
# This method should never be called. Leaving a pass to avoid harmless error.
pass
[docs]class Term:
"""Representation of a model term.
Terms are made of one or more components. Components are instances of :class:`.Variable` or
:class:`.Call`. Terms with only one component are known as main effects and terms with more than
one component are known as interaction effects. The order of the interaction is given by the
number of components in the term.
Parameters
----------
components: :class:`.Variable` or :class:`.Call`
Atomic components of a term.
Attributes
----------
data: dict
The values associated with the term as they go into the design matrix.
metadata: dict
Metadata associated with the term. If ``"numeric"`` or ``"categoric"`` it holds additional
information in the component ``.data`` attribute. If ``"interaction"``, the keys are
the name of the components and the values are dictionaries holding the metadata.
_type: string
Indicates the type of the term. Can be one of ``"numeric"``, ``"categoric"``, or
``"interaction"``.
name: string
The name of the term as it was originally written in the model formula.
"""
def __init__(self, *components):
self.data = None
self.metadata = {}
self._type = None
self.components = []
self.component_types = None
for component in components:
if component not in self.components:
self.components.append(component)
self.name = ":".join([str(c.name) for c in self.components])
def __hash__(self):
return hash(tuple(self.components))
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
else:
return self.components == other.components
[docs] def __add__(self, other):
"""Addition operator. Analogous to set union.
* ``"x + x"`` is equal to just ``"x"``
* ``"x + y"`` is equal to a model with both ``x`` and ``y``.
* ``"x + (y + z)"`` adds ``x`` to model already containing ``y`` and ``z``.
"""
# x + x -> x
# x + y -> x + y
# x:y + u -> x:y + u
# x:y + u:v -> x:y + u:v
# x:y + (u + v) -> x:y + u + v
# f(x) + y -> f(x) + y
# f(x) + (y + z) -> f(x) + y + z
if self == other:
return self
elif isinstance(other, type(self)):
return Model(self, other)
elif isinstance(other, Model):
return Model(self) + other
else:
return NotImplemented
[docs] def __sub__(self, other):
"""Subtraction operator. Analogous to set difference.
* ``"x - x"`` returns empty model.
* ``"x - y"`` returns the term ``"x"``.
* ``"x - (y + z)"`` returns the term ``"x"``.
"""
# x:y - u -> x:y
# x:y - u:v -> x:y
# x:y - (u + v) -> x:y
# f(x) - y -> f(x)
# f(x) - (y + z) -> f(x)
if isinstance(other, type(self)):
if self.components == other.components:
return Model()
else:
return self
elif isinstance(other, Model):
if self in other.terms:
return Model()
else:
return self
else:
return NotImplemented
[docs] def __mul__(self, other):
"""Full interaction operator.
This operator includes both the interaction as well as the main effects involved in the
interaction. It is a shortcut for ``x + y + x:y``.
* ``"x * x"`` equals to ``"x"``
* ``"x * y"`` equals to``"x + y + x:y"``
* ``"x:y * u"`` equals to ``"x:y + u + x:y:u"``
* ``"x:y * u:v"`` equals to ``"x:y + u:v + x:y:u:v"``
* ``"x:y * (u + v)"`` equals to ``"x:y + u + v + x:y:u + x:y:v"``
"""
if self == other:
return self
elif isinstance(other, type(self)):
if len(other.components) == 1 and isinstance(other.components[0].name, (int, float)):
raise TypeError("Interaction with numeric does not make sense.")
return Model(self, other, Term(*deepcopy(self.components), *deepcopy(other.components)))
elif isinstance(other, Model):
products = product([self], other.common_terms)
terms = [self] + other.common_terms
iterms = [
Term(*deepcopy(p[0].components), *deepcopy(p[1].components)) for p in products
]
return Model(*terms) + Model(*iterms)
else:
return NotImplemented
[docs] def __matmul__(self, other):
"""Simple interaction operator.
This operator is actually invoked as ``:`` but internally passed as ``@`` because there
is no ``:`` operator in Python.
* ``"x : x"`` equals to ``"x"``
* ``"x : y"`` is the interaction between ``"x"`` and ``"y"``
* ``x:(y:z)"`` equals to just ``"x:y:z"``
* ``(x:y):u"`` equals to just ``"x:y:u"``
* ``"(x:y):(u + v)"`` equals to ``"x:y:u + x:y:v"``
"""
if self == other:
return self
elif isinstance(other, type(self)):
if len(other.components) == 1 and isinstance(other.components[0].name, (int, float)):
raise TypeError("Interaction with numeric does not make sense.")
return Term(*self.components, *other.components)
elif isinstance(other, Model):
products = product([self], other.common_terms)
iterms = [Term(*p[0].components, *p[1].components) for p in products]
return Model(*iterms)
else:
return NotImplemented
[docs] def __truediv__(self, other):
"""Division interaction operator.
* ``"x / x"`` equals to just ``"x"``
* ``"x / y"`` equals to ``"x + x:y"``
* ``"x / z:y"`` equals to ``"x + x:z:y"``
* ``"x / (z + y)"`` equals to ``"x + x:z + x:y"``
* ``"x:y / u:v"`` equals to ``"x:y + x:y:u:v"``
* ``"x:y / (u + v)"`` equals to ``"x:y + x:y:u + x:y:v"``
"""
if self == other:
return self
elif isinstance(other, type(self)):
if len(other.components) == 1 and isinstance(other.components[0].name, (int, float)):
raise TypeError("Interaction with numeric does not make sense.")
return Model(self, Term(*self.components, *other.components))
elif isinstance(other, Model):
products = product([self], other.common_terms)
iterms = [Term(*p[0].components, *p[1].components) for p in products]
return self + Model(*iterms)
else:
return NotImplemented
[docs] def __or__(self, other):
"""Group-specific operator. Creates group-specific intercept.
Intercepts are implicitly added.
* ``"x|g"`` equals to ``"(1|g) + (x|g)"``
Distributive over right hand side
* ``"(x|g + h)"`` equals to ``"(1|g) + (1|h) + (x|g) + (x|h)"``
"""
if isinstance(other, Term):
# Only accepts terms, call terms and interactions.
# Adds implicit intercept.
terms = [GroupSpecificTerm(Intercept(), other), GroupSpecificTerm(self, other)]
return Model(*terms)
elif isinstance(other, Model):
intercepts = [
GroupSpecificTerm(Intercept(), p[1]) for p in product([self], other.common_terms)
]
slopes = [GroupSpecificTerm(p[0], p[1]) for p in product([self], other.common_terms)]
return Model(*intercepts, *slopes)
else:
return NotImplemented
[docs] def __pow__(self, other):
"""Power operator.
It leaves the term as it is. For a power in the math sense do ``I(x ** n)`` or ``{x ** n}``.
"""
c = other.components
if len(c) == 1 and isinstance(c[0].name, int) and c[0].name >= 1:
_log.warning(
"Exponentiation on an individual variable returns the variable as it is.\n"
"Use {%s**%s} or I(%s**%s) to compute the math power.",
self.name,
c[0].name,
self.name,
c[0].name,
)
return self
else:
return NotImplemented
def __repr__(self):
return self.__str__()
def __str__(self):
string = "[" + ", ".join([str(component) for component in self.components]) + "]"
return f"{self.__class__.__name__}({string})"
[docs] def set_type(self, data, eval_env):
"""Set type of the components in the term.
Calls ``.set_type()`` method on each component in the term. For those components of class
:class:`.Variable`` it only passes the data mask. For `:class:`.Call` objects it also passes
the evaluation environment.
Parameters
----------
data: pd.DataFrame
The data frame where variables are taken from
eval_env: EvalEnvironment
The environment where values and functions are taken from.
"""
# Set the type of the components by calling their set_type method.
for component in self.components:
if isinstance(component, Variable):
component.set_type(data)
elif isinstance(component, Call):
component.set_type(data, eval_env)
else:
raise ValueError(
"Can't set type on Term because at least one of the components "
f"is of the unexpected type {type(component)}."
)
# Store the type of the components
self.component_types = {
component.name: component._type # pylint: disable = protected-access
for component in self.components
}
# Determine whether this term is numeric, categoric, or an interaction.
if len(self.components) > 1:
self._type = "interaction" # pylint: disable = protected-access
else:
self._type = self.components[0]._type # pylint: disable = protected-access
[docs] def set_data(self, encoding):
"""Obtains and stores the final data object related to this term.
Calls ``.set_data()`` method on each component in the term. Then, it uses the ``.data``
attribute on each of them to build ``self.data`` and ``self.metadata``.
Parameters
----------
encoding: list or dict
Indicates if it uses full or reduced encoding when the type of the variable is
categoric.
"""
if isinstance(encoding, list) and len(encoding) == 1:
encoding = encoding[0]
else:
ValueError("encoding is a list of len > 1")
for component in self.components:
encoding_ = False
if isinstance(encoding, dict):
encoding_ = encoding.get(component.name, False)
elif isinstance(encoding, bool):
encoding_ = encoding
component.set_data(encoding_)
if self._type == "interaction":
self.data = reduce(get_interaction_matrix, [c.data["value"] for c in self.components])
self.metadata["type"] = "interaction"
self.metadata["terms"] = {
c.name: {k: v for k, v in c.data.items() if k != "value"} for c in self.components
}
else:
component = self.components[0]
self.data = component.data["value"]
self.metadata = {k: v for k, v in component.data.items() if k != "value"}
[docs] def eval_new_data(self, data):
"""Evaluates the term with new data.
Calls ``.eval_new_data()`` method on each component in the term and combines the results
appropiately.
Parameters
----------
data: pd.DataFrame
The data frame where variables are taken from
Returns
----------
result: np.array
The values resulting from evaluating this term using the new data.
"""
if self._type == "interaction":
result = reduce(
get_interaction_matrix, [c.eval_new_data(data) for c in self.components]
)
else:
result = self.components[0].eval_new_data(data)
return result
@property
def var_names(self):
"""Returns the name of the variables in the term as a set.
Loops through each component and updates the set with the ``.var_names`` of each component.
Returns
----------
var_names: set
The names of the variables involved in the term.
"""
var_names = set()
for component in self.components:
var_names.update(component.var_names)
return var_names
[docs] def get_component(self, name): # pylint: disable = inconsistent-return-statements
"""Returns a component by name.
Parameters
----------
name: string
The name of the component to return.
Returns
-------
component: `:class:`.Variable` or `:class:`.Call`
The component with name ``name``.
"""
for component in self.components:
if component.name == name:
return component
[docs]class GroupSpecificTerm:
"""Representation of a group specific term.
Group specific terms are of the form ``(expr | factor)``. The expression ``expr`` is evaluated
as a model formula with only common effects and produces a model matrix following the rules
for common terms. ``factor`` is inspired on factors in R, but here it is evaluated as an ordered
pandas.CategoricalDtype object.
The operator ``|`` works as in R package lme4. As its authors say: "One way to think about the
vertical bar operator is as a special kind of interaction between the model matrix and the
grouping factor. This interaction ensures that the columns of the model matrix have different
effects for each level of the grouping factor"
Parameters
----------
expr: :class:`.Intercept` or :class:`.Term`
The term for which we want to have a group specific term.
factor: :class:`.Term`
The factor that determines the groups in the group specific term.
Attributes
----------
factor_type: pandas.core.dtypes.dtypes.CategoricalDtype
The type assigned to the grouping factor ``factor``. This is useful for when we need to
create a design matrix for new a new data set.
"""
def __init__(self, expr, factor):
self.expr = expr
self.factor = factor
self.groups = None
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.expr == other.expr and self.factor == other.factor
def __hash__(self):
return hash((self.expr, self.factor))
def __repr__(self):
return self.__str__()
def __str__(self):
strlist = [
f"expr= {' '.join(str(self.expr).splitlines(True))}",
f"factor= {' '.join(str(self.factor).splitlines(True))}",
]
return self.__class__.__name__ + "(\n " + ",\n ".join(strlist) + "\n)"
[docs] def eval(self, data, eval_env, encoding):
"""Evaluates term.
First, it evaluates the variable in ``self.factor``, creates an oredered categorical data
type using its levels, and stores it in ``self.factor_type``. Then, it obtains the
design matrix for ``self.expr`` to finally produce the matrix for the group specific
effect.
The output contains the following information
* ``"type"``: The type of the ``expr`` term.
* ``"Xi"``: The design matrix for the ``expr`` term.
* ``"Ji"``: The design matrix for the ``factor`` term.
* ``"Zi"``: The design matrix for the group specific term.
* ``"groups"``: The groups present in ``factor``.
If ``"type"`` is ``"categoric"``, the output dictionary also contains
* ``"levels"``: Levels of the term in ``expr``.
* ``"reference"``: The level taken as baseline.
* ``"encoding"``: The encoding of the term, either ``"full"`` or ``"reduced"``
If ``"type"`` is ``"interaction"``, the output dictionary also contains
* ``"terms"``: Metadata for each of the components in the interaction in ``expr``.
Parameters
----------
data: pandas.DataFrame
The data frame where variables are taken from.
eval_env: EvalEnvironment
The environment where values and functions are taken from.
encoding: bool
Whether to use full or reduced rank encoding when ``expr`` is categoric.
Returns
-------
out: dict
See above.
"""
# Factor must be considered categorical, and with full encoding. We set type and obtain
# data for the factor term manually.
# Set type on each component to check data is behaved as expected and then
# manually set type of the components to categoric.
for comp in self.factor.components:
if isinstance(comp, Variable):
comp.set_type(data)
elif isinstance(comp, Call):
comp.set_type(data, eval_env)
else:
raise ValueError(
"Can't set type on Term because at least one of the components "
f"is of the unexpected type {type(comp)}."
)
comp._type = "categoric" # pylint: disable = protected-access
# Store the type of the components.
# We know they are categoric.
self.factor.component_types = {comp.name: "categoric" for comp in self.factor.components}
if len(self.factor.components) > 1:
self.factor._type = "interaction" # pylint: disable = protected-access
else:
self.factor._type = "categoric" # pylint: disable = protected-access
# Pass encoding=True when setting data.
self.factor.set_data(True)
# Obtain group names
groups = []
for comp in self.factor.components:
# We're certain they are all categoric with full encoding.
groups.append([str(lvl) for lvl in comp.data["levels"]])
self.groups = [":".join(s) for s in list(itertools.product(*groups))]
self.expr.set_type(data, eval_env)
self.expr.set_data(encoding)
Xi = self.expr.data
Ji = self.factor.data
Zi = linalg.khatri_rao(Ji.T, Xi.T).T
out = {
"type": self.expr.metadata["type"],
"Xi": Xi,
"Ji": Ji,
"Zi": Zi,
"groups": self.groups,
}
if self.expr._type == "categoric": # pylint: disable = protected-access
out["levels"] = self.expr.metadata["levels"]
out["reference"] = self.expr.metadata["reference"]
out["encoding"] = self.expr.metadata["encoding"]
elif self.expr._type == "interaction": # pylint: disable = protected-access
out["terms"] = self.expr.metadata["terms"]
return out
[docs] def eval_new_data(self, data):
"""Evaluates the term with new data.
Converts the variable in ``factor`` to the type remembered from the first evaluation and
produces the design matrix for this grouping, calls ``.eval_new_data()`` on ``self.expr``
to obtain the design matrix for the ``expr`` side, then computes the design matrix
corresponding to the group specific effect.
Parameters
----------
data: pd.DataFrame
The data frame where variables are taken from.
Returns
----------
out: dict
Same rules as in :meth:`eval <GroupSpecificTerm.eval>`.
"""
Xi = self.expr.eval_new_data(data)
Ji = self.factor.eval_new_data(data)
Zi = linalg.khatri_rao(Ji.T, Xi.T).T
out = {
"type": self.expr.metadata["type"],
"Xi": Xi,
"Ji": Ji,
"Zi": Zi,
"groups": self.groups,
}
if self.expr._type == "categoric": # pylint: disable = protected-access
out["levels"] = self.expr.metadata["levels"]
out["reference"] = self.expr.metadata["reference"]
out["encoding"] = self.expr.metadata["encoding"]
elif self.expr._type == "interaction": # pylint: disable = protected-access
out["terms"] = self.expr.metadata["terms"]
return out
@property
def var_names(self):
"""Returns the name of the variables in the term as a set.
Obtains both the variables in the ``expr`` as well as the variables in ``factor``.
Returns
----------
var_names: set
The names of the variables involved in the term.
"""
expr_names = self.expr.var_names.copy()
factor_names = self.factor.var_names.copy()
return expr_names.union(factor_names)
[docs] def get_name(self):
"""Obtain string representation of the name of the term.
Returns
----------
name: str
The name of the term, such as ``1|g`` or ``var|g``.
"""
name = ""
if isinstance(self.expr, Intercept):
name += "1|"
elif isinstance(self.expr, Term):
name += f"{self.expr.name}|"
else:
raise ValueError("Invalid LHS expression for group specific term")
if isinstance(self.factor, Term):
name += self.factor.name
else:
raise ValueError("Invalid RHS expression for group specific term")
return name
[docs]class Response:
"""Representation of a response term.
It is mostly a wrapper around :class:`.Term`.
Parameters
----------
term: :class:`.Term`
The term we want to take as response in the model. Must contain only one component.
"""
def __init__(self, term):
if isinstance(term, Term):
n = len(term.components)
if n == 1:
self.term = term
self.term.components[0].is_response = True
else:
raise ValueError(f"The response term must contain only one component, not {n}.")
else:
raise ValueError(f"The response term must be of class Term, not {type(term)}.")
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
else:
return self.term == other.term
[docs] def __add__(self, other):
"""Modelled as operator.
The operator is ``~``, but since it is not an operator in Python, we internally replace it
with ``+``. It means the LHS is taken as the response, and the RHS as the predictor.
"""
if isinstance(other, (Term, GroupSpecificTerm, Intercept)):
return Model(other, response=self)
elif isinstance(other, Model):
return other.add_response(self)
else:
return NotImplemented
def __repr__(self):
return self.__str__()
def __str__(self):
return f"{self.__class__.__name__}({self.term})"
@property
def var_names(self):
"""Returns the name of the variables in the response as a set."""
return self.term.var_names
[docs] def set_type(self, data, eval_env):
"""Set type of the response term."""
self.term.set_type(data, eval_env)
[docs] def set_data(self, encoding=False):
"""Set data of the response term."""
self.term.set_data(encoding)
ACCEPTED_TERMS = (Term, GroupSpecificTerm, Intercept, NegatedIntercept)
[docs]class Model:
"""Representation of a model.
Parameters
----------
terms: :class:`.Term`
This object can be instantiated with one or many terms.
response::class:`.Response`
The response term. Defaults to ``None`` which means there is no response.
"""
def __init__(self, *terms, response=None):
if isinstance(response, Response) or response is None:
self.response = response
else:
raise ValueError("Response must be of class Response.")
if all(isinstance(term, ACCEPTED_TERMS) for term in terms):
self.common_terms = [term for term in terms if not isinstance(term, GroupSpecificTerm)]
self.group_terms = [term for term in terms if isinstance(term, GroupSpecificTerm)]
else:
raise ValueError("There is a least one term of an unexpected class.")
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
equal_terms = set(self.terms) == set(other.terms)
equal_response = self.response == other.response
return equal_terms and equal_response
[docs] def __add__(self, other):
"""Addition operator. Analogous to set union.
Adds terms to the model and returns the model.
Returns
-------
self: :class:`.Model`
The same model object with the added term(s).
"""
if isinstance(other, NegatedIntercept):
return self - Intercept()
elif isinstance(other, (Term, GroupSpecificTerm, Intercept)):
return self.add_term(other)
elif isinstance(other, type(self)):
for term in other.terms:
self.add_term(term)
return self
else:
return NotImplemented
[docs] def __sub__(self, other):
"""Subtraction operator. Analogous to set difference.
* ``"(x + y) - (x + u)"`` equals to ``"y + u"``..
* ``"(x + y) - x"`` equals to ``"y"``.
* ``"(x + y + (1 | g)) - (1 | g)"`` equals to ``"x + y"``.
Returns
-------
self: :class:`.Model`
The same model object with the removed term(s).
"""
if isinstance(other, type(self)):
for term in other.terms:
if term in self.common_terms:
self.common_terms.remove(term)
if term in self.group_terms:
self.group_terms.remove(term)
return self
elif isinstance(other, (Term, Intercept)):
if other in self.common_terms:
self.common_terms.remove(other)
return self
elif isinstance(other, GroupSpecificTerm):
if other in self.group_terms:
self.group_terms.remove(other)
return self
else:
return NotImplemented
[docs] def __matmul__(self, other):
"""Simple interaction operator.
* ``"(x + y) : (u + v)"`` equals to ``"x:u + x:v + y:u + y:v"``.
* ``"(x + y) : u"`` equals to ``"x:u + y:u"``.
* ``"(x + y) : f(u)"`` equals to ``"x:f(u) + y:f(u)"``.
Returns
-------
model: :class:`.Model`
A new instance of the model with all the interaction terms computed.
"""
if isinstance(other, type(self)):
products = product(self.common_terms, other.common_terms)
iterms = [Term(*p[0].components, *p[1].components) for p in products]
return Model(*iterms)
elif isinstance(other, Term):
products = product(self.common_terms, [other])
iterms = [Term(*p[0].components, *p[1].components) for p in products]
return Model(*iterms)
else:
return NotImplemented
[docs] def __mul__(self, other):
"""Full interaction operator.
* ``"(x + y) * (u + v)"`` equals to ``"x + y + u + v + x:u + x:v + y:u + y:v"``.
* ``"(x + y) * u"`` equals to ``"x + y + u + x:u + y:u"``.
Returns
-------
model: :class:`.Model`
A new instance of the model with all the interaction terms computed.
"""
if self == other:
return self
elif isinstance(other, type(self)):
if len(other.common_terms) == 1:
components = other.common_terms[0].components
if len(components) == 1 and isinstance(components, (int, float)):
raise TypeError("Interaction with numeric does not make sense.")
products = product(self.common_terms, other.common_terms)
terms = self.common_terms + other.common_terms
iterms = [Term(*p[0].components, *p[1].components) for p in products]
return Model(*terms) + Model(*iterms)
elif isinstance(other, Term):
if len(other.components) == 1 and isinstance(other.components[0].name, (int, float)):
raise TypeError("Interaction with numeric does not make sense.")
products = product(self.common_terms, [other])
terms = self.common_terms + [other]
iterms = [Term(*p[0].components, *p[1].components) for p in products]
return Model(*terms) + Model(*iterms)
else:
return NotImplemented
[docs] def __pow__(self, other):
"""Power of a set made of :class:`.Term`
Computes all interactions up to order ``n`` between the terms in the set.
* ``"(x + y + z) ** 2"`` equals to ``"x + y + z + x:y + x:z + y:z"``.
Returns
-------
model: :class:`.Model`
A new instance of the model with all the terms computed.
"""
if isinstance(other, Term) and len(other.components) == 1:
value = other.components[0].name
if isinstance(value, int) and value >= 1:
comb = [
list(p) for i in range(2, value + 1) for p in combinations(self.common_terms, i)
]
iterms = [Term(*[comp for term in terms for comp in term.components]) for terms in comb]
return self + Model(*iterms)
else:
raise ValueError("Power must be a positive integer.")
[docs] def __truediv__(self, other):
"""Division interaction operator.
* ``"(x + y) / z"`` equals to ``"x + y + x:y:z"``.
* ``"(x + y) / (u + v)"`` equals to ``"x + y + x:y:u + x:y:v"``.
Returns
-------
model: :class:`.Model`
A new instance of the model with all the terms computed.
"""
if isinstance(other, Term):
return self.add_term(Term(*self.common_components + other.components))
elif isinstance(other, Model):
iterms = [Term(*self.common_components, comp) for comp in other.common_components]
return self + Model(*iterms)
else:
return NotImplemented
[docs] def __or__(self, other):
"""Group specific term operator.
Only _models_ ``"0 + x"`` arrive here.
* ``"(0 + x | g)"`` equals to ``"(x|g)"``.
* ``"(0 + x | g + y)"`` equals to ``"(x|g) + (x|y)"``.
There are several edge cases to handle here. See in-line comments.
Returns
-------
model: :class:`.Model`
A new instance of the model with all the terms computed.
"""
# If only one term in the expr, resolve according to the type of the term.
if len(self.common_terms) == 1:
return self.common_terms[0] | other
# Handle intercept
if Intercept() in self.common_terms and NegatedIntercept() in self.common_terms:
# Explicit addition and negation -> remove both -> no intercept
self.common_terms.remove(Intercept())
self.common_terms.remove(NegatedIntercept())
elif NegatedIntercept() in self.common_terms:
# Negation -> remove negation and do not add intercept
self.common_terms.remove(NegatedIntercept())
elif Intercept() not in self.common_terms:
# No negation and no explicit intercept -> implicit intercept
self.common_terms.insert(0, Intercept())
if isinstance(other, Term):
products = product(self.common_terms, [other])
terms = [GroupSpecificTerm(p[0], p[1]) for p in products]
return Model(*terms)
elif isinstance(other, type(self)):
products = product(self.common_terms, other.common_terms)
terms = [GroupSpecificTerm(p[0], p[1]) for p in products]
return Model(*terms)
else:
return NotImplemented
def __repr__(self):
return self.__str__()
def __str__(self):
terms = [str(term) for term in self.common_terms]
if self.response is not None:
terms.insert(0, str(self.response))
string = ",\n ".join([str(term) for term in terms])
if self.group_terms:
group_terms = ",\n".join([str(term) for term in self.group_terms])
if len(string) > 0:
string += ",\n "
string += " ".join(group_terms.splitlines(True))
return f"{self.__class__.__name__}(\n {string}\n)"
[docs] def add_response(self, term):
"""Add response term to model description.
This method is called when something like ``"y ~ x + z"`` appears in a model formula.
This method is called via special methods such as :meth:`Response.__add__`.
Returns
-------
self: :class:`.Model`
The same model object but now with a reponse term.
"""
if isinstance(term, Response):
self.response = term
return self
else:
raise ValueError("not Response")
[docs] def add_term(self, term):
"""Add term to model description.
The term added can be of class :class:`.Intercept` :class:`.Term`, or
:class:`.GroupSpecificTerm`. It appends the new term object to the list of common terms or
group specific terms as appropriate.
This method is called via special methods such as :meth:`__add__`.
Returns
-------
self: :class:`.Model`
The same model object but now containing the new term.
"""
if isinstance(term, GroupSpecificTerm):
if term not in self.group_terms:
self.group_terms.append(term)
return self
elif isinstance(term, (Term, Intercept)):
if term not in self.common_terms:
self.common_terms.append(term)
return self
else:
raise ValueError(f"Can't add an object of class {type(term)} to Model.")
@property
def terms(self):
"""Terms in the model.
Returns
-------
terms: list
A list containing both common and group specific terms.
"""
return self.common_terms + self.group_terms
@property
def common_components(self):
"""Components in common terms in the model.
Returns
-------
components: list
A list containing all components from common terms in the model.
"""
# Note: Check whether this method is really necessary.
return [
comp for term in self.common_terms if isinstance(term, Term) for comp in term.components
]
@property
def var_names(self):
"""Get the name of the variables in the model.
Returns
-------
var_names: set
The names of all variables in the model.
"""
var_names = set()
for term in self.terms:
var_names.update(term.var_names)
if self.response is not None:
var_names.update(self.response.var_names)
return var_names
[docs] def set_types(self, data, eval_env):
"""Set the type of the common terms in the model.
Calls ``.set_type()`` method on term in the model.
Parameters
----------
data: pd.DataFrame
The data frame where variables are taken from
eval_env: EvalEnvironment
The environment where values and functions are taken from.
"""
for term in self.common_terms:
term.set_type(data, eval_env)
def _encoding_groups(self):
components = {}
for term in self.common_terms:
if term._type == "interaction": # pylint: disable = protected-access
components[term.name] = {
c.name: c._type for c in term.components # pylint: disable = protected-access
}
else:
components[term.name] = term._type # pylint: disable = protected-access
# First, group with only categoric terms
categoric_group = dict()
for k, v in components.items():
if v == "categoric":
categoric_group[k] = [k]
elif v == "Intercept":
categoric_group[k] = []
elif isinstance(v, dict): # interaction
# If all categoric terms in the interaction
if all(v_ == "categoric" for v_ in v.values()):
categoric_group[k] = list(v.keys())
# Determine groups of numerics
numeric_group_sets = []
numeric_groups = []
for k, v in components.items():
# v is dict when interaction, otherwise is string.
if isinstance(v, dict):
categoric = [k_ for k_, v_ in v.items() if v_ == "categoric"]
numeric = [k_ for k_, v_ in v.items() if v_ == "numeric"]
# if it is an interaction with both categoric and numeric terms
if categoric and numeric:
numeric_set = set(numeric)
numeric_part = ":".join(numeric)
if numeric_set not in numeric_group_sets:
numeric_group_sets.append(numeric_set)
numeric_groups.append(dict())
idx = numeric_group_sets.index(numeric_set)
# Prevent full encoding when numeric part is present outside
# this numeric-categoric interaction
if numeric_part in components.keys():
numeric_groups[idx][numeric_part] = []
numeric_groups[idx][k] = categoric
return [categoric_group] + numeric_groups
def _encoding_bools(self):
"""Determine encodings for terms containing at least one categorical variable.
This method returns dictionaries with ``True``/``False`` values.
``True`` means the categorical variable uses 'levels' dummies.
``False`` means the categorial variable uses 'levels - 1' dummies.
"""
groups = self._encoding_groups()
l = [pick_contrasts(group) for group in groups]
result = dict()
for d in l:
result.update(d)
return result
[docs] def eval(self, data, eval_env):
"""Evaluates terms in the model.
Only common effects are evaluated here. Group specific terms are evaluated individually
in :class:`GroupEffectsMatrix <formulae.matrices.GroupEffectsMatrix>`.
Parameters
----------
data: pd.DataFrame
The data frame where variables are taken from
eval_env: EvalEnvironment
The environment where values and functions are taken from.
Returns
-------
result: dict
A dictionary where keys are the name of the terms and the values are their ``.data``
attribute.
"""
self.set_types(data, eval_env)
encodings = self._encoding_bools()
result = dict()
# First, we have to add terms if the encoding implies so.
# Group specific effects aren't evaluated here -- this may change
common_terms = self.common_terms.copy()
for term in common_terms:
term_encoding = False
if term.name in encodings.keys():
term_encoding = encodings[term.name]
if hasattr(term_encoding, "__len__") and len(term_encoding) > 1:
# we're in an interaction that added terms.
# we need to create and evaluate these extra terms.
# i.e. "y ~ g1:g2", both g1 and g2 categoric, is equivalent to "y ~ g2 + g1:g2"
# Possibly an interaction adds LOWER order terms, but NEVER HIGHER order terms.
for (idx, encoding) in enumerate(term_encoding):
# Last term never adds any new term, it corresponds to the outer `term`.
if idx == len(term_encoding) - 1:
term.set_type(data, eval_env)
term.set_data(encoding)
result[term.name] = term.data
else:
extra_term = _create_and_eval_extra_term(term, encoding, data, eval_env)
result[extra_term.name] = extra_term.data
# Finally, add term to self.common_terms object, right before the term
# that causes its addition.
self.common_terms.insert(self.common_terms.index(term), extra_term)
else:
# This term does not add any lower order term, so we just evaluate it as it is.
term.set_type(data, eval_env)
term.set_data(term_encoding)
result[term.name] = term.data
return result
def _create_and_eval_extra_term(term, encoding, data, eval_env):
if len(encoding) == 1:
component_name = list(encoding.keys())[0]
encoding_ = list(encoding.values())[0]
component = term.get_component(component_name)
extra_term = Term(component)
else:
component_names = [c.name for c in term.components]
encoding_ = encoding
components = [
term.get_component(name) for name in component_names if name in encoding.keys()
]
extra_term = Term(*components)
extra_term.set_type(data, eval_env)
extra_term.set_data(encoding_)
return extra_term