from abc import abstractmethod, ABCMeta
from bambi.external.six import string_types
from bambi import diagnostics as bmd
import re
import warnings
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from .utils import listify
    import pymc3 as pm
    if hasattr(pm.plots, 'kdeplot_op'):
        pma = pm.plots
        pma = pm.plots.artists
    pma = None

__all__ = ['MCMCResults', 'PyMC3ADVIResults']

class ModelResults(object):

    '''Base class for ModelResults hierarchy.

        model (Model): a bambi Model instance specifying the model.

    __metaclass__ = ABCMeta

    def __init__(self, model):

        self.model = model
        self.terms = list(model.terms.values())
        self.diagnostics = model._diagnostics \
            if hasattr(model, '_diagnostics') else None

    def plot(self):

    def summary(self):

[docs]class MCMCResults(ModelResults): '''Holds sampler results; provides slicing, plotting, and summarization tools. Attributes: model (Model): a bambi Model instance specifying the model. data (array-like): Raw storage of MCMC samples in array with dimensions 0, 1, 2 = samples, chains, variables names (list): Names of all Terms. dims (list): Numbers of levels for all Terms. levels (list): Names of all levels for all Terms. transformed (list): Optional list of variable names to treat as transformed--and hence, to exclude from the output by default. ''' def __init__(self, model, data, names, dims, levels, transformed_vars=None): # store the arguments = data self.names = names self.dims = dims self.levels = levels self.transformed_vars = transformed_vars # compute basic stuff to use later self.n_samples = data.shape[0] self.n_chains = data.shape[1] self.n_params = data.shape[2] self.n_terms = len(names) if transformed_vars is not None: utv = list(set(names) - set(transformed_vars)) else: utv = names self.untransformed_vars = utv # this keeps track of which columns in 'data' go with which terms self.index = np.cumsum( [0] + [x[0] if len(x) else 1 for x in dims][:-1]) # build level_dict: dictionary of lists containing levels of each Term level_dict = {} for i, name, dim in zip(self.index, names, dims): dim = dim[0] if len(dim) else 1 level_dict[name] = levels[i:(i+dim)] self.level_dict = level_dict super(MCMCResults, self).__init__(model) def __getitem__(self, idx): ''' If a variable name, return MCMCResults with only that variable e.g., fit['subject'] If a list of variable names, return MCMCResults with those variables e.g., fit[['subject','item]] If a slice, return MCMCResults with sliced samples e.g., fit[500:] If a tuple, return MCMCResults with those variables sliced e.g., fit[['subject','item'], 500:] OR fit[500:, ['subject','item']] ''' if isinstance(idx, slice): var = self.names vslice = idx elif isinstance(idx, string_types): var = [idx] vslice = slice(0, self.n_samples) elif isinstance(idx, list): if not all([isinstance(x, string_types) for x in idx]): raise ValueError("If passing a list, all elements must be " "parameter names.") var = idx vslice = slice(0, self.n_samples) elif isinstance(idx, tuple): if len(idx) > 2: raise ValueError("Only two arguments can be passed. If you " "want to select multiple parameters and a " "subset of samples, pass a slice and a list " "of parameter names.") vslice = [i for i, x in enumerate(idx) if isinstance(x, slice)] if not len(vslice): raise ValueError("At least one argument must be a slice. If " "you want to select multiple parameters by " "name, pass a list (not a tuple) of names.") if len(vslice) > 1: raise ValueError("Slices can only be applied " "over the samples dimension.") var = idx[1 - vslice[0]] vslice = idx[vslice[0]] if not isinstance(var, (list, tuple)): var = [var] else: raise ValueError("Unrecognized index type.") # do slicing/selection and return subsetted copy of MCMCResults levels = sum([self.level_dict[v] for v in var], []) level_iloc = [self.levels.index(x) for x in levels] var_iloc = [self.names.index(v) for v in var] return MCMCResults(model=self.model,[vslice, :, level_iloc], names=var, dims=[self.dims[x] for x in var_iloc], levels=levels, transformed_vars=self.transformed_vars) def get_chains(self, indices): # Return copy of self but only for chains with the passed indices if not isinstance(indices, (list, tuple)): indices = [indices] return MCMCResults(model=self.model,[:, indices, :], names=self.names, dims=self.dims, levels=self.levels, transformed_vars=self.transformed_vars) def _filter_names(self, varnames=None, ranefs=False, transformed=False): names = self.untransformed_vars if not transformed else self.names if varnames is not None: names = [n for n in listify(varnames) if n in names] if not ranefs: names = [x for x in names if re.sub(r'_offset$', '', x) not in self.model.random_terms] Intercept = [n for n in names if "Intercept" in n] std = [n for n in names if "_" in n] rand_eff = [n for n in names if "|" in n and n not in std] interac = [n for n in names if ":" in n and n not in rand_eff + std] main_eff = [n for n in names if n not in interac + std + rand_eff + Intercept] names = Intercept + sorted(main_eff, key=len) + sorted(interac, key=len) \ + sorted(rand_eff, key=len) + sorted(std, key=len) return names
[docs] def plot(self, varnames=None, ranefs=True, transformed=False, combined=False, hist=False, bins=20, kind='trace'): '''Plots posterior distributions and sample traces. Basically a wrapperfor pm.traceplot() plus some niceties, based partly on code from: Args: varnames (list): List of variable names to plot. If None, all eligible variables are plotted. ranefs (bool): If True (default), shows trace plots for individual random effects. transformed (bool): If False (default), excludes internally transformed variables from plotting. combined (bool): If True, concatenates all chains into one before plotting. If False (default), plots separately lines for each chain (on the same axes). hist (bool): If True, plots a histogram for each fixed effect, in addition to the kde plot. To prevent visual clutter, histograms are never plotted for random effects. bins (int): If hist is True, the number of bins in the histogram. Ignored if hist is False. kind (str): Either 'trace' (default) or 'priors'. If 'priors', this just internally calls Model.plot() ''' def _plot_row(data, row, title, hist=True): # density plot axes[row, 0].set_title(title) if pma is not None: arr = np.atleast_2d(data.values.T).T pma.kdeplot_op(axes[row, 0], arr, bw = 4.5) else: data.plot(kind='kde', ax=axes[row, 0], legend=False) # trace plot axes[row, 1].set_title(title) data.plot(kind='line', ax=axes[row, 1], legend=False) # histogram if hist: if pma is not None: pma.histplot_op(axes[row, 0], arr) else: data.plot(kind='hist', ax=axes[row, 0], legend=False, normed=True, bins=bins) if kind == 'priors': return self.model.plot(varnames) # count the total number of rows in the plot names = self._filter_names(varnames, ranefs, transformed) random = [re.sub(r'_offset$', '', x) in self.model.random_terms for x in names] rows = sum([len(self.level_dict[p]) if not r else 1 for p, r in zip(names, random)]) # make the plot! fig, axes = plt.subplots(rows, 2, figsize=(12, 2*rows)) if rows == 1: axes = np.array([axes]) # For consistent 2D indexing _select_args = {'ranefs': ranefs, 'transformed': transformed} # Create list of chains (for combined, just one list w/ all chains) chains = list(range(self.n_chains)) if combined: chains = [chains] for c in chains: row = 0 for p in names: df = self[p].to_df(chains=c, **_select_args) # if p == 'floor|county_sd': # fixed effects if re.sub(r'_offset$', '', p) not in self.model.random_terms: for lev in self.level_dict[p]: lev_df = df[lev] _plot_row(lev_df, row, lev, hist) row += 1 # random effects else: _plot_row(df, row, p, hist=False) # too much clutter row += 1 fig.tight_layout() # For bernoulli models, tell user which event is being modeled if == 'bernoulli': event = next(i for i, x in enumerate( if x > .99) warnings.warn('Modeling the probability that {}==\'{}\''.format(, str(self.model.clean_data[][event]))) return axes
def _hpd_interval(self, x, width): """ Code adapted from pymc3.stats.calc_min_interval: """ x = np.sort(x) n = len(x) interval_idx_inc = int(np.floor(width * n)) n_intervals = n - interval_idx_inc interval_width = x[interval_idx_inc:] - x[:n_intervals] if len(interval_width) == 0: raise ValueError('Too few elements for interval calculation') min_idx = np.argmin(interval_width) hdi_min = x[min_idx] hdi_max = x[min_idx + interval_idx_inc] index = ['hpd{}_{}'.format(width, x) for x in ['lower', 'upper']] return pd.Series([hdi_min, hdi_max], index=index)
[docs] def summary(self, varnames=None, ranefs=False, transformed=False, hpd=.95, quantiles=None, diagnostics=['effective_n', 'gelman_rubin']): '''Returns a DataFrame of summary/diagnostic statistics for the parameters. Args: varnames (list): List of variable names to include; if None (default), all eligible variables are included. ranefs (bool): Whether or not to include random effects in the summary. Default is False. transformed (bool): Whether or not to include internally transformed variables in the summary. Default is False. hpd (float, between 0 and 1): Show Highest Posterior Density (HPD) intervals with specified width/proportion for all parameters. If None, HPD intervals are suppressed. quantiles (float, list): Show specified quantiles of the marginal posterior distributions for all parameters. If list, must be a list of floats between 0 and 1. If None (default), no quantiles are shown. diagnostics (list): List of functions to use to compute convergence diagnostics for all parameters. Each element can be either a callable or a string giving the name of a function in the diagnostics module. Valid strings are 'gelman_rubin' and 'effective_n'. Functions must accept a MCMCResults object as the sole input, and return a DataFrame with one labeled row per parameter. If None, no convergence diagnostics are computed. ''' samples = self.to_df(varnames, ranefs, transformed) # build the basic DataFrame df = pd.DataFrame({'mean': samples.mean(0), 'sd': samples.std(0)}) # add user-specified quantiles if quantiles is not None: if not isinstance(quantiles, (list, tuple)): quantiles = [quantiles] qnames = ['q' + str(q) for q in quantiles] df = df.merge(samples.quantile(quantiles).set_index([qnames]).T, left_index=True, right_index=True) # add HPD intervals if hpd is not None: df = df.merge(samples.apply(self._hpd_interval, axis=0, width=hpd).T, left_index=True, right_index=True) # add convergence diagnostics if diagnostics is not None: _names = self._filter_names(ranefs=ranefs, transformed=transformed) _self = self[_names] if self.n_chains > 1: for diag in diagnostics: if isinstance(diag, string_types): diag = getattr(bmd, diag) df = df.merge(diag(_self), left_index=True, right_index=True) else: warnings.warn('Multiple MCMC chains are required in order ' 'to compute convergence diagnostics.') # For bernoulli models, tell user which event is being modeled if == 'bernoulli': event = next(i for i, x in enumerate( if x > .99) warnings.warn('Modeling the probability that {}==\'{}\''.format(, str(self.model.clean_data[][event]))) return df
[docs] def to_df(self, varnames=None, ranefs=False, transformed=False, chains=None): ''' Returns the MCMC samples in a nice, neat pandas DataFrame with all MCMC chains concatenated. Args: varnames (list): List of variable names to include; if None (default), all eligible variables are included. ranefs (bool): Whether or not to include random effects in the returned DataFrame. Default is True. transformed (bool): Whether or not to include internally transformed variables in the result. Default is False. chains (int, list): Index, or list of indexes, of chains to concatenate. E.g., [1, 3] would concatenate the first and third chains, and ignore any others. If None (default), concatenates all available chains. ''' # filter out unwanted variables names = self._filter_names(varnames, ranefs, transformed) # concatenate the (pre-sliced) chains if chains is None: chains = list(range(self.n_chains)) chains = listify(chains) data = [[:, i, :] for i in chains] data = np.concatenate(data, axis=0) # construct the trace DataFrame df = sum([self.level_dict[x] for x in names], []) df = pd.DataFrame({x: data[:, self.levels.index(x)] for x in df}) return df
[docs]class PyMC3ADVIResults(ModelResults): '''Holds PyMC3 ADVI results and provides plotting and summarization tools. Attributes: model (Model): A bambi Model instance specifying the model. params (MultiTrace): ADVI parameters returned by PyMC3. ''' def __init__(self, model, params): self.means = params['means'] self.sds = params['stds'] self.elbo_vals = params['elbo_vals'] super(PyMC3ADVIResults, self).__init__(model)