Source code for bambi.priors.scaler

```import numpy as np

from bambi.families.univariate import Gaussian, StudentT, VonMises

from .prior import Prior

[docs]class PriorScaler:
"""Scale prior distributions parameters."""

# Standard deviation multiplier.
STD = 2.5

def __init__(self, model):
self.model = model
self.has_intercept = model.intercept_term is not None
self.priors = {}

# Compute mean and std of the response
if isinstance(self.model.family, (Gaussian, StudentT)):
self.response_mean = np.mean(model.response.data)
self.response_std = np.std(self.model.response.data)
else:
self.response_mean = 0
self.response_std = 1

def get_intercept_stats(self):
mu = self.response_mean
sigma = self.STD * self.response_std

# Only adjust mu and sigma if there is at least one Normal prior for a common term.
if self.priors:
sigmas = np.hstack([prior["sigma"] for prior in self.priors.values()])
x_mean = np.hstack([self.model.terms[term].data.mean(axis=0) for term in self.priors])
sigma = (sigma**2 + np.dot(sigmas**2, x_mean**2)) ** 0.5

return mu, sigma

def get_slope_sigma(self, x):
return self.STD * (self.response_std / np.std(x))

def scale_response(self):
# Add cases for other families
priors = self.model.family.likelihood.priors
if isinstance(self.model.family, (Gaussian, StudentT)):
if priors["sigma"].auto_scale:
priors["sigma"] = Prior("HalfStudentT", nu=4, sigma=self.response_std)
elif isinstance(self.model.family, VonMises):
if priors["kappa"].auto_scale:
priors["kappa"] = Prior("HalfStudentT", nu=4, sigma=self.response_std)

def scale_intercept(self, term):
if term.prior.name != "Normal":
return
mu, sigma = self.get_intercept_stats()
term.prior.update(mu=mu, sigma=sigma)

def scale_common(self, term):
if term.prior.name != "Normal":
return

# As many zeros as columns in the data. It can be greater than 1 for categorical variables
mu = np.zeros(term.data.shape[1])
sigma = np.zeros(term.data.shape[1])

# Iterate over columns in the data
for i, x in enumerate(term.data.T):
sigma[i] = self.get_slope_sigma(x)

# Save and set prior
self.priors.update({term.name: {"mu": mu, "sigma": sigma}})
term.prior.update(mu=mu, sigma=sigma)

def scale_group_specific(self, term):
if term.prior.args["sigma"].name != "HalfNormal":
return

# Handle intercepts
if term.kind == "intercept":
_, sigma = self.get_intercept_stats()
# Handle slopes
else:
# Recreate the corresponding common effect data
if len(term.predictor.shape) == 2:
data_as_common = term.predictor
else:
data_as_common = term.predictor[:, None]
sigma = np.zeros(data_as_common.shape[1])
for i, x in enumerate(data_as_common.T):
sigma[i] = self.get_slope_sigma(x)
term.prior.args["sigma"].update(sigma=np.squeeze(np.atleast_1d(sigma)))

def scale(self):

# Scale response
self.scale_response()

# Scale common terms
for term in self.model.common_terms.values():
if term.prior.auto_scale:
self.scale_common(term)

# Scale intercept
if self.has_intercept:
term = self.model.intercept_term
if term.prior.auto_scale:
self.scale_intercept(term)

# Scale group-specific terms
for term in self.model.group_specific_terms.values():
if term.prior.auto_scale:
self.scale_group_specific(term)
```