# pylint: disable=relative-beyond-top-level
import itertools
import logging
import textwrap
from copy import deepcopy
import numpy as np
import pandas as pd
from .eval import EvalEnvironment
from .terms import Model, Intercept
from .model_description import model_description
from .utils import flatten_list
_log = logging.getLogger("formulae")
class DesignMatrices:
    """Bundle the response, common effects and group specific effects of a model.

    Each component is built and evaluated at construction time, but only when the
    model description contains the corresponding terms.

    Parameters
    ----------
    model : Model
        The model description, the result of calling ``model_description``.
    data : pandas.DataFrame
        The data frame where variables are taken from.
    eval_env : EvalEnvironment
        The environment where values and functions are taken from.

    Attributes
    ----------
    model : Model
        The model description passed at construction.
    data : pandas.DataFrame
        The data frame passed at construction.
    eval_env : EvalEnvironment
        The evaluation environment passed at construction.
    response : ResponseVector
        The response in the model. Access its values with
        ``self.response.design_vector``. It is ``None`` if there is no response term in
        the model.
    common : CommonEffectsMatrix
        The common effects (a.k.a. fixed effects) in the model. The design matrix can be
        accessed with ``self.common.design_matrix`` and the sub-matrix of a term with
        ``self.common[term_name]``. It is ``None`` if there are no common terms in the
        model.
    group : GroupEffectsMatrix
        The group specific effects (a.k.a. random effects) in the model. The design
        matrix can be accessed with ``self.group.design_matrix`` and the sub-matrix of
        a term with ``self.group[term_name]``. It is ``None`` if there are no group
        specific terms in the model.
    """

    def __init__(self, model, data, eval_env):
        self.data = data
        self.eval_env = eval_env
        self.model = model

        # Components default to None and are only replaced when the model has them.
        self.response = None
        self.common = None
        self.group = None

        response_term = self.model.response
        if response_term:
            self.response = ResponseVector(response_term)
            self.response._evaluate(data, eval_env)

        common_terms = self.model.common_terms
        if common_terms:
            # Common terms are wrapped in their own ``Model`` before evaluation.
            self.common = CommonEffectsMatrix(Model(*common_terms))
            self.common._evaluate(data, eval_env)

        group_terms = self.model.group_terms
        if group_terms:
            self.group = GroupEffectsMatrix(group_terms)
            self.group._evaluate(data, eval_env)
class ResponseVector:
    """Representation of the response vector of a model.

    Parameters
    ----------
    term : Response
        The term that represents the response in the model.

    Attributes
    ----------
    data : pandas.DataFrame
        The data frame where variables are taken from. Set by ``_evaluate``.
    eval_env : EvalEnvironment
        The environment where values and functions are taken from. Set by ``_evaluate``.
    design_vector : np.array
        A 1-dimensional numpy array containing the values of the response.
    name : string
        The name of the response term.
    type : string
        The type of the response as reported by the term metadata, e.g. ``"numeric"`` or
        ``"categoric"``.
    baseline : string
        The name of the class taken as reference if the response is categoric and
        not binary. ``None`` otherwise.
    success : string
        The name of the class taken as reference if the response is categoric and
        binary. ``None`` otherwise.
    levels : list
        The levels of the response if it is categoric. ``None`` otherwise.
    binary : bool
        Whether a categoric response has exactly two distinct values. ``None`` for
        non-categoric responses.
    """

    def __init__(self, term):
        self.term = term
        self.data = None
        self.eval_env = None
        self.design_vector = None
        self.name = None  # a string
        self.type = None  # either numeric or categorical
        self.baseline = None  # Not None for non-binary categorical variables
        self.success = None  # Not None for binary categorical variables
        self.levels = None  # Not None for categorical variables
        self.binary = None  # Not None for categorical variables (either True or False)

    def _evaluate(self, data, eval_env):
        """Evaluate ``self.term`` inside the data mask provided by ``data``.

        Updates ``self.design_vector``, ``self.name`` and ``self.type`` and, for
        categoric responses, ``self.binary``, ``self.levels`` and either
        ``self.success`` (binary) or ``self.baseline`` (non-binary).

        Parameters
        ----------
        data : pandas.DataFrame
            The data frame where variables are taken from.
        eval_env : EvalEnvironment
            The environment where values and functions are taken from.
        """
        self.data = data
        self.eval_env = eval_env
        self.term.set_type(self.data, self.eval_env)
        self.term.set_data()
        self.name = self.term.term.name
        self.design_vector = self.term.term.data
        self.type = self.term.term.metadata["type"]

        if self.type == "categoric":
            self.binary = len(np.unique(self.design_vector)) == 2
            self.levels = self.term.term.metadata["levels"]
            # The same "reference" entry plays the role of the success class for binary
            # responses and of the baseline class otherwise.
            if self.binary:
                self.success = self.term.term.metadata["reference"]
            else:
                self.baseline = self.term.term.metadata["reference"]

    def _evaluate_new_data(self, data):
        """Evaluate the response term with new data.

        Only responses of type ``"proportion"`` are supported.

        Raises
        ------
        ValueError
            If the type of the response is not ``"proportion"``.
        """
        if self.type == "proportion":
            return self.term.term.eval_new_data(data)
        raise ValueError("Can't evaluate response term with type different to 'proportion'")

    def as_dataframe(self):
        """Returns ``self.design_vector`` as a pandas.DataFrame.

        The single column is named after the response. For categoric responses the
        reference class is appended in brackets: the success class for binary responses
        and the baseline class otherwise.
        """
        data = pd.DataFrame(self.design_vector)
        if self.type == "categoric":
            # ``baseline`` is None for binary responses, where ``success`` holds the
            # reference class instead; pick whichever one was populated.
            reference = self.success if self.binary else self.baseline
            colname = f"{self.name}[{reference}]"
        else:
            colname = self.name
        data.columns = [colname]
        return data

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        # Keep the representation usable before ``_evaluate`` has populated the vector.
        length = None if self.design_vector is None else len(self.design_vector)
        string_list = [
            f"name: {self.name}",
            f"type: {self.type}",
            f"length: {length}",
        ]
        if self.type == "categoric":
            string_list += [f"levels: {self.levels}", f"binary: {self.binary}"]
            if self.binary:
                string_list += [f"success: {self.success}"]
            else:
                string_list += [f"baseline: {self.baseline}"]
        return f"ResponseVector({wrapify(spacify(multilinify(string_list)))}\n)"
class CommonEffectsMatrix:
    """Representation of the design matrix for the common effects of a model.

    Parameters
    ----------
    model : Model
        A ``Model`` object containing only terms for the common effects of the model.

    Attributes
    ----------
    data : pandas.DataFrame
        The data frame where variables are taken from. Set by ``_evaluate``.
    eval_env : EvalEnvironment
        The environment where values and functions are taken from. Set by ``_evaluate``.
    design_matrix : np.array
        A 2-dimensional numpy array containing the values of the design matrix.
    evaluated : bool
        Indicates if the terms have been evaluated at least once. The terms must have
        been evaluated before calling ``self._evaluate_new_data()`` because we must know
        the type of each term to correctly handle the new data passed and the terms here.
    terms_info : dict
        A dictionary that holds information related to each of the common terms, such as
        ``"cols"``, ``"type"``, and ``"full_names"``. If ``"type"`` is ``"categoric"``, it
        also contains ``"groups"``, ``"encoding"``, ``"levels"``, and ``"reference"``.
        The keys are given by the term names.
    """

    def __init__(self, model):
        self.model = model
        self.data = None
        self.eval_env = None
        self.design_matrix = None
        self.terms_info = None
        self.evaluated = False

    def _evaluate(self, data, eval_env):
        """Obtain design matrix for common effects.

        Evaluates ``self.model`` inside the data mask provided by ``data`` and stores the
        column-stacked result in ``self.design_matrix``. ``self.data`` and
        ``self.eval_env`` are updated too.

        ``self.terms_info`` is rebuilt with one entry per term holding the term metadata
        plus the columns it occupies in the design matrix and the names of those columns.

        Parameters
        ----------
        data : pandas.DataFrame
            The data frame where variables are taken from
        eval_env : EvalEnvironment
            The environment where values and functions are taken from.
        """
        self.data = data
        self.eval_env = eval_env
        evaluated_terms = self.model.eval(self.data, self.eval_env)
        self.design_matrix = np.column_stack(list(evaluated_terms.values()))

        self.terms_info = {}
        offset = 0
        for term in self.model.terms:
            # The metadata dictionary of the term itself is stored and extended in place.
            info = term.metadata
            self.terms_info[term.name] = info
            width = evaluated_terms[term.name].shape[1]
            if term._type == "interaction":  # pylint: disable = protected-access
                info["levels"] = self._interaction_levels(term.name)
            info["full_names"] = self._term_full_names(term.name)
            info["cols"] = slice(offset, offset + width)
            offset += width
        self.evaluated = True

    def _evaluate_new_data(self, data):
        """Evaluate the common terms with new data and return a new ``CommonEffectsMatrix``.

        Intended to obtain design matrices for new data, e.g. for out of sample
        predictions. Each term is evaluated through its ``eval_new_data`` method and
        ``self.terms_info`` is deep-copied into the new instance.

        Parameters
        ----------
        data : pandas.DataFrame
            The data frame where variables are taken from

        Returns
        ----------
        new_instance : CommonEffectsMatrix
            A new instance of ``CommonEffectsMatrix`` whose design matrix is obtained with
            the values in the new data set.

        Raises
        ------
        ValueError
            If this matrix has not been evaluated yet.
        """
        if not self.evaluated:
            raise ValueError("Can't evaluate new data on unevaluated matrix.")

        fresh = self.__class__(self.model)
        fresh.data = data
        fresh.eval_env = self.eval_env
        fresh.terms_info = deepcopy(self.terms_info)
        blocks = [term.eval_new_data(data) for term in self.model.terms]
        fresh.design_matrix = np.column_stack(blocks)
        fresh.evaluated = True
        return fresh

    def as_dataframe(self):
        """Returns `self.design_matrix` as a pandas.DataFrame."""
        frame = pd.DataFrame(self.design_matrix)
        names_per_term = [self._term_full_names(name) for name in self.terms_info]
        frame.columns = list(flatten_list(names_per_term))
        return frame

    def _term_full_names(self, name):  # pylint: disable=inconsistent-return-statements
        """Return the list of column names of the term ``name``.

        Returns ``None`` when the type of the term is not one of the handled ones.
        """
        info = self.terms_info[name]
        kind = info["type"]
        if kind == "intercept":
            return ["Intercept"]
        if kind in ("numeric", "offset"):
            return [name]
        if kind == "interaction":
            return interaction_label(info)
        if kind == "categoric":
            # "levels" is present when we have dummy encoding (not just a vector of 0-1)
            if "levels" not in info:
                return [f"{name}[{info['reference']}]"]
            # Any encoding other than "full" drops the first level.
            levels = info["levels"]
            if info["encoding"] != "full":
                levels = levels[1:]
            return [f"{name}[{level}]" for level in levels]
        return None

    def _interaction_levels(self, name):
        """Return the combined level labels of the categoric components of an interaction.

        Returns ``None`` if the interaction has no categoric component.
        """
        components = self.terms_info[name]["terms"]
        labels_per_factor = []
        for component in components.values():
            if component["type"] != "categoric":
                continue
            if component["encoding"] == "full":
                levels = component["levels"]
            else:
                levels = component["levels"][1:]
            labels_per_factor.append([str(level) for level in levels])

        if not labels_per_factor:
            return None
        return [", ".join(combo) for combo in itertools.product(*labels_per_factor)]

    def __getitem__(self, term):
        """Get the sub-matrix that corresponds to a given term.

        Parameters
        ----------
        term : string
            The name of the term.

        Returns
        ----------
        matrix : np.array
            A 2-dimensional numpy array that represents the sub-matrix corresponding to
            the term passed.

        Raises
        ------
        ValueError
            If ``term`` is not the name of a term in ``self.terms_info``.
        """
        if term not in self.terms_info:
            raise ValueError(f"'{term}' is not a valid term name")
        columns = self.terms_info[term]["cols"]
        return self.design_matrix[:, columns]

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        entries = [
            f"'{key}': {{{spacify(term_str(info))}\n}}" for key, info in self.terms_info.items()
        ]
        terms_block = multilinify(entries)
        fields = [
            f"shape: {self.design_matrix.shape}",
            f"terms: {{{spacify(terms_block)}\n}}",
        ]
        return f"CommonEffectsMatrix({wrapify(spacify(multilinify(fields)))}\n)"
class GroupEffectsMatrix:
    """Representation of the design matrix for the group specific effects of a model.

    The sub-matrix that corresponds to a specific group effect can be accessed by
    ``self[term_name]``, for example ``self["1|g"]``.

    Parameters
    ----------
    terms : list
        A list of ``GroupSpecificTerm`` objects.

    Attributes
    ----------
    data : pandas.DataFrame
        The data frame where variables are taken from. Set by ``_evaluate``.
    eval_env : EvalEnvironment
        The environment where values and functions are taken from. Set by ``_evaluate``.
    design_matrix : np.array
        A 2 dimensional numpy array with the values of the design matrix. It is an empty
        ``(0, 0)`` array until terms are evaluated.
    evaluated : bool
        Indicates if the terms have been evaluated at least once. The terms must have
        been evaluated before calling ``self._evaluate_new_data()`` because we must know
        the type of each term to correctly handle the new data passed and the terms here.
    terms_info : dict
        A dictionary that holds information related to each of the group specific terms,
        such as the matrices ``"Xi"`` and ``"Ji"``, ``"cols"``, ``"type"``, and
        ``"full_names"``. If ``"type"`` is ``"categoric"``, it also contains
        ``"groups"``, ``"encoding"``, ``"levels"``, and ``"reference"``. The keys are
        given by the term names.
    """

    def __init__(self, terms):
        self.terms = terms
        self.data = None
        self.eval_env = None
        self.design_matrix = np.zeros((0, 0))
        self.terms_info = {}
        self.evaluated = False

    def _evaluate(self, data, eval_env):
        """Evaluate group specific terms.

        This evaluates ``self.terms`` inside the data mask provided by ``data`` and the
        environment ``eval_env``. It updates ``self.design_matrix`` with the result from
        the evaluation of each term.

        This method also sets the values of ``self.data`` and ``self.eval_env``. It also
        populates the dictionary ``self.terms_info`` with information related to each
        term, such as the type, the columns they occupy in the design matrix and the
        names of the columns.

        Parameters
        ----------
        data : pandas.DataFrame
            The data frame where variables are taken from
        eval_env : EvalEnvironment
            The environment where values and functions are taken from.
        """
        self.data = data
        self.eval_env = eval_env
        start = 0
        Z = []
        self.terms_info = {}
        for term in self.terms:
            encoding = True
            # If both (1|g) and (x|g) are in the model, then the encoding for x is False.
            if not isinstance(term.expr, Intercept):
                encoding = not any(
                    other.factor == term.factor and isinstance(other.expr, Intercept)
                    for other in self.terms
                )
            d = term.eval(self.data, self.eval_env, encoding)

            # Grab subcomponent of Z that corresponds to this term
            Zi = d["Zi"]
            delta = Zi.shape[1]
            Z.append(Zi)

            # Everything but the sub-matrix itself is kept as information of the term.
            name = term.get_name()
            self.terms_info[name] = {k: v for k, v in d.items() if k != "Zi"}
            if self.terms_info[name]["type"] == "interaction":
                self.terms_info[name]["levels"] = self._interaction_levels(name)

            # Generate term names
            self.terms_info[name]["full_names"] = self._term_full_names(name, term.expr.name)
            self.terms_info[name]["cols"] = slice(start, start + delta)
            start += delta

        if Z:
            self.design_matrix = np.column_stack(Z)
        self.evaluated = True

    def _evaluate_new_data(self, data):
        """Evaluate group specific terms with new data and return a new
        ``GroupEffectsMatrix``.

        Intended to obtain design matrices for new data, e.g. for out of sample
        predictions. Each term is evaluated through its ``eval_new_data`` method, the
        information of each term is deep-copied into the new instance and its ``"cols"``
        entry is recomputed from the new sub-matrices.

        Parameters
        ----------
        data : pandas.DataFrame
            The data frame where variables are taken from

        Returns
        ----------
        new_instance : GroupEffectsMatrix
            A new instance of ``GroupEffectsMatrix`` whose design matrix is obtained with
            the values in the new data set. It is flagged as evaluated, so it can be used
            to evaluate further new data.

        Raises
        ------
        ValueError
            If this matrix has not been evaluated yet.
        """
        if not self.evaluated:
            raise ValueError("Can't evaluate new data on unevaluated matrix.")

        new_instance = self.__class__(self.terms)
        start = 0
        Z = []
        for term in self.terms:
            d = term.eval_new_data(data)

            # Grab subcomponent of Z that corresponds to this term
            Zi = d["Zi"]
            delta = Zi.shape[1]
            Z.append(Zi)

            name = term.get_name()
            new_instance.terms_info[name] = deepcopy(self.terms_info[name])
            new_instance.terms_info[name]["cols"] = slice(start, start + delta)
            start += delta

        new_instance.data = data
        new_instance.eval_env = self.eval_env
        if Z:
            new_instance.design_matrix = np.column_stack(Z)
        # The new instance carries complete term information, so mark it as evaluated
        # just like ``CommonEffectsMatrix._evaluate_new_data`` does.
        new_instance.evaluated = True
        return new_instance

    def _term_full_names(self, name, expr):
        """Return the list of column names of the group specific term ``name``.

        Parameters
        ----------
        name : string
            The name of the term, a key of ``self.terms_info``.
        expr : string
            The name of the expression of the term, used as prefix of the labels of
            categoric terms.

        Raises
        ------
        ValueError
            If the type of the term is not one of ``"intercept"``, ``"numeric"``,
            ``"interaction"`` or ``"categoric"``.
        """
        term = self.terms_info[name]
        groups = term["groups"]
        if term["type"] in ["intercept", "numeric"]:
            names = [f"{name}[{group}]" for group in groups]
        elif term["type"] == "interaction":
            levels = interaction_label(term)
            names = [f"{level}|{group}" for group in groups for level in levels]
        elif term["type"] == "categoric":
            if "levels" in term.keys():
                # Ask if encoding is "full" or "reduced"
                levels = term["levels"] if term["encoding"] == "full" else term["levels"][1:]
            else:
                levels = [term["reference"]]
            names = [f"{expr}[{level}]|{group}" for group in groups for level in levels]
        else:
            # Fail with a clear message instead of an UnboundLocalError on ``names``.
            raise ValueError(f"Can't generate names for term '{name}' of type '{term['type']}'")
        return names

    def _interaction_levels(self, name):
        """Return the combined level labels of the categoric components of an interaction.

        Returns ``None`` if the interaction has no categoric component.
        """
        terms = self.terms_info[name]["terms"]
        colnames = []
        for v in terms.values():
            if v["type"] == "categoric":
                levels = v["levels"] if v["encoding"] == "full" else v["levels"][1:]
                colnames.append([str(level) for level in levels])
        if colnames:
            return [", ".join(str_tuple) for str_tuple in itertools.product(*colnames)]
        return None

    def __getitem__(self, term):
        """Get the sub-matrix that corresponds to a given term.

        Parameters
        ----------
        term : string
            The name of a group specific term.

        Returns
        ----------
        matrix : np.array
            A 2-dimensional numpy array that represents the sub-matrix corresponding to
            the term passed.

        Raises
        ------
        ValueError
            If ``term`` is not the name of a term in ``self.terms_info``.
        """
        if term not in self.terms_info:
            raise ValueError(f"'{term}' is not a valid term name")
        return self.design_matrix[:, self.terms_info[term]["cols"]]

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        string = [f"'{k}': {{{spacify(term_str(v))}\n}}" for k, v in self.terms_info.items()]
        string = multilinify(string)
        string = [
            f"shape: {self.design_matrix.shape}",
            f"terms: {{{spacify(string)}\n}}",
        ]
        return f"GroupEffectsMatrix({wrapify(spacify(multilinify(string)))}\n)"
def design_matrices(formula, data, na_action="drop", eval_env=0):
    """Build a ``DesignMatrices`` object from a model formula and a data frame.

    The formula is parsed into a model description, rows with missing values in the
    variables used by the formula are handled according to ``na_action``, and the
    response and the design matrices for the common and group specific effects are
    built from the remaining data.

    Parameters
    ----------
    formula : string
        A model formula.
    data : pandas.DataFrame
        The data frame where variables in the formula are taken from.
    na_action : string
        Describes what to do with missing values in ``data``. ``"drop"`` means to drop
        all rows with a missing value, ``"error"`` means to raise an error. Defaults
        to ``"drop"``.
    eval_env : integer
        The number of environments we walk up in the stack starting from the function's
        caller to capture the environment where formula is evaluated. Defaults to 0 which
        means the evaluation environment is the environment where ``design_matrices`` is
        called.

    Returns
    ----------
    design : DesignMatrices
        An instance of DesignMatrices that contains the design matrice(s) described by
        ``formula``.

    Raises
    ------
    ValueError
        If any argument is invalid, or if ``na_action`` is ``"error"`` and there are
        incomplete rows in ``data``.
    """
    # Argument validation. The order of the checks determines which error is reported.
    if not isinstance(formula, str):
        raise ValueError("'formula' must be a string.")
    if not formula:
        raise ValueError("'formula' cannot be an empty string.")
    if not isinstance(data, pd.DataFrame):
        raise ValueError("'data' must be a pandas.DataFrame.")
    n_rows, n_cols = data.shape
    if n_rows == 0:
        raise ValueError("'data' does not contain any observation.")
    if n_cols == 0:
        raise ValueError("'data' does not contain any variable.")
    if na_action not in ("drop", "error"):
        raise ValueError("'na_action' must be either 'drop' or 'error'")

    # Must be captured from this very frame: ``reference=1`` is relative to it.
    eval_env = EvalEnvironment.capture(eval_env, reference=1)
    description = model_description(formula)

    # Incomplete rows are calculated using columns involved in model formula only
    used_columns = description.var_names.intersection(set(data.columns))
    data = data[list(used_columns)]
    has_missing = data.isna().any(axis=1)
    n_missing = has_missing.sum()

    if n_missing > 0:
        # ``na_action`` was validated above, so anything but "drop" is "error".
        if na_action != "drop":
            raise ValueError(f"'data' contains {n_missing} incomplete rows.")
        _log.info(
            "Automatically removing %s/%s rows from the dataset.",
            n_missing,
            data.shape[0],
        )
        data = data[~has_missing]

    return DesignMatrices(description, data, eval_env)
# Utils
def term_str(term):
    """Return a multi-line string describing the information dictionary of a term.

    The matrices ``"Xi"`` and ``"Ji"`` are never printed. For interaction terms, the
    components under ``"terms"`` are rendered recursively inside a ``vars`` entry.
    """
    if term["type"] != "interaction":
        fields = [f"{k}: {v}" for k, v in term.items() if k not in ["Xi", "Ji"]]
        return multilinify(fields)

    fields = [f"{k}: {v}" for k, v in term.items() if k not in ["terms", "Xi", "Ji"]]
    components = [
        f"'{k}': {{{spacify(term_str(v))}\n}}" for k, v in term["terms"].items()
    ]
    vars_entry = f"vars: {{{spacify(multilinify(components))}\n}}"
    return multilinify(fields + [vars_entry])
def interaction_label(x):
    """Return the column labels of an interaction term.

    Parameters
    ----------
    x : dict
        The information dictionary of the interaction. Its ``"terms"`` entry maps the
        name of each component to the information dictionary of that component.

    Returns
    -------
    list
        One label per combination of the labels of the numeric and categoric
        components, joined with ``":"``. Components of any other type are ignored.
    """
    labels_per_component = []
    for var_name, info in x["terms"].items():
        kind = info["type"]
        if kind == "numeric":
            labels_per_component.append([var_name])
        elif kind == "categoric":
            if "levels" not in info:
                # Without "levels" the component is labelled by its reference class only.
                labels_per_component.append([f"{var_name}[{info['reference']}]"])
                continue
            # Any encoding other than "full" drops the first level.
            levels = info["levels"] if info["encoding"] == "full" else info["levels"][1:]
            labels_per_component.append([f"{var_name}[{level}]" for level in levels])
    return [":".join(combo) for combo in itertools.product(*labels_per_component)]
def spacify(string):
    """Prefix every line of ``string`` with the indentation used when nesting output."""
    indent = " "
    pieces = string.splitlines(keepends=True)
    return indent + indent.join(pieces)
def multilinify(l, sep=","):
    """Join the strings in ``l`` one per line, each but the last followed by ``sep``.

    The result always starts with a newline.
    """
    line_separator = f"{sep}\n"
    return "\n" + line_separator.join(l)
def wrapify(string, width=100):
    """Wrap every line of ``string`` that is longer than ``width`` characters.

    Continuation lines are indented two spaces deeper than the leading spaces of the
    line they come from. Lines that fit within ``width`` are left untouched.

    Parameters
    ----------
    string : string
        The text to wrap. It may contain several lines.
    width : integer
        The maximum length of a line. Defaults to 100.

    Returns
    -------
    string
        The wrapped text. A wrapped line ends with a newline only if the original line
        did, so no trailing newline is introduced after the last line.
    """
    lines = string.splitlines(True)
    wrapper = textwrap.TextWrapper(width=width)
    for idx, line in enumerate(lines):
        if len(line) > width:
            # Only lines that had a terminator get one back; the last line of the text
            # usually has none and must not gain a spurious newline.
            content = line.splitlines()[0]
            terminator = "\n" if len(content) < len(line) else ""
            leading_spaces = len(line) - len(line.lstrip(" "))
            wrapper.subsequent_indent = " " * (leading_spaces + 2)
            lines[idx] = "\n".join(wrapper.wrap(line)) + terminator
    return "".join(lines)