Source code for AstronomyCalc.mcmc

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
from joblib import Parallel, delayed
from tqdm import tqdm
import corner

[docs] class ImportanceSampling: """ Simple Importance Sampling for estimating expectations with MCMC. """ def __init__(self, target_distribution, proposal_distribution=None, proposal_sampler=None, n_jobs=1, proposal_mean=None, proposal_cov=None): """ Initialize the Importance Sampling class. Parameters: - target_distribution (function): The target probability distribution (unnormalized) from which we want to sample. - proposal_distribution (function, optional): The proposal probability distribution we can sample from. Default is a Gaussian distribution. - proposal_sampler (function, optional): A function that generates samples from the proposal distribution. Default is a Gaussian sampler. - n_jobs (int): Number of parallel jobs for sampling. Default is 1. - proposal_mean (array-like, optional): The mean of the Gaussian proposal distribution. Default is a zero vector. - proposal_cov (array-like, optional): The covariance matrix of the Gaussian proposal distribution. Default is the identity matrix. """ self.target_distribution = target_distribution self.n_jobs = n_jobs # Set up default proposal distribution as Gaussian if not provided self.proposal_mean = proposal_mean if proposal_mean is not None else np.zeros(2) self.proposal_cov = proposal_cov if proposal_cov is not None else np.eye(2) if proposal_distribution is None: self.proposal_distribution = self._gaussian_proposal_distribution else: self.proposal_distribution = proposal_distribution if proposal_sampler is None: self.proposal_sampler = self._gaussian_proposal_sampler else: self.proposal_sampler = proposal_sampler def _gaussian_proposal_sampler(self, n_samples): """ Default Gaussian proposal sampler. Parameters: - n_samples (int): The number of samples to generate. Returns: - samples (np.ndarray): The generated samples from a Gaussian distribution. """ return np.random.multivariate_normal(self.proposal_mean, self.proposal_cov, size=n_samples) def _gaussian_proposal_distribution(self, x): """ Default Gaussian proposal distribution (pdf). Parameters: - x (np.ndarray): The points at which to evaluate the proposal distribution. Returns: - prob (np.ndarray): The probability density of the points under the Gaussian distribution. """ size = len(self.proposal_mean) det = np.linalg.det(self.proposal_cov) norm_const = 1.0 / (np.power((2 * np.pi), float(size) / 2) * np.power(det, 1.0 / 2)) x_mu = x - self.proposal_mean inv = np.linalg.inv(self.proposal_cov) result = np.einsum('...k,kl,...l->...', x_mu, inv, x_mu) return norm_const * np.exp(-0.5 * result)
[docs] def sample(self, n_samples=10000): samples = self.proposal_sampler(n_samples) target_probs = self.target_distribution(samples) proposal_probs = self.proposal_distribution(samples) # Compute the importance weights weights = target_probs / proposal_probs weights /= np.sum(weights) # Normalize weights to sum to 1 return samples, weights
[docs] def estimate_expectation(self, function): samples, weights = self.sample() expectations = function(samples) return np.sum(weights * expectations)
[docs] class MetropolisHastings: ''' Metropolis-Hastings Algorithm for Monte Carlo Markov Chains. ''' def __init__(self, nwalkers, ndim, log_probability, proposal='Normal', proposal_cov=None, n_jobs=1): """ Initialize the Metropolis-Hastings sampler. Parameters: - nwalkers (int): Number of walkers (independent chains) to run in parallel. - ndim (int): Number of dimensions in the parameter space. - log_probability (callable): The log-probability function to be sampled. This function should take a parameter array of shape (ndim,) and return the log-probability. - proposal (str, optional): The type of proposal distribution. Currently, only 'Normal' is supported. Default is 'Normal'. - proposal_cov (ndarray, optional): The covariance matrix of the proposal distribution. If None, a default covariance matrix must be set later, or adaptive covariance will be used. - n_jobs (int, optional): The number of parallel jobs to run. Default is 1, which runs the chains sequentially. Setting this to a higher number can speed up the sampling process. Attributes: - proposal (str): The type of proposal distribution. - proposal_cov (ndarray): The covariance matrix for the proposal distribution. - n_jobs (int): The number of parallel jobs to run. - nwalkers (int): Number of independent chains to run. - ndim (int): Dimensionality of the parameter space. - log_probability (callable): The log-probability function used in sampling. - samples (ndarray): Array to store the samples generated by the MCMC sampler. Initialized with zeros and will be filled as sampling progresses. Raises: - AssertionError: If the proposal distribution is not 'Normal'. """ assert proposal.lower() == 'normal', "Currently only 'Normal' proposal is supported." self.proposal = proposal self.proposal_cov = proposal_cov self.n_jobs = n_jobs self.nwalkers = nwalkers self.ndim = ndim self.log_probability = log_probability # Initialize samples self.samples = np.zeros((nwalkers, 1, ndim))
[docs] def draw_from_proposal(self, mean, cov, size=None): if self.proposal.lower() == 'normal': next_samples = np.random.multivariate_normal(mean, cov, size=size) q_next_mean = 1 # Symmetric proposal, so q(next | mean) = q(mean | next) q_mean_next = 1 else: raise NotImplementedError("Proposal type not supported.") return next_samples, q_next_mean, q_mean_next
[docs] def acceptance_probability(self, mean, next_sample, q_next_mean, q_mean_next): logp_next = self.log_probability(next_sample) logp_mean = self.log_probability(mean) np.seterr(over='ignore') r = np.exp(logp_next-logp_mean, dtype=np.float64)*q_mean_next/q_next_mean np.seterr(over='warn') A = min(1, r) return A
[docs] def adapt_proposal_covariance(self, n_step, adapt_window=100, scale_factor=100): if adapt_window is None: return self.proposal_cov if n_step % adapt_window != 0 or n_step == 0: return self.proposal_cov # Flatten samples for covariance estimation samples_flattened = self.samples[:, -adapt_window:, :].reshape(-1, self.ndim) if len(samples_flattened) == 0: return self.proposal_cov cov_matrix = np.cov(samples_flattened, rowvar=False) self.proposal_cov = cov_matrix*scale_factor return self.proposal_cov
[docs] def initialise(self, initial_value): n_start = self.samples.shape[1] if initial_value is None and n_start == 1: print('Provide initial position values for the walkers.') return None elif initial_value is not None: self.samples[:, 0, :] = initial_value
def _walk(self, walker_num, proposal_cov): mean = self.samples[walker_num, -1, :] next_sample, q_next_mean, q_mean_next = self.draw_from_proposal(mean, proposal_cov) p_acc = self.acceptance_probability(mean, next_sample, q_next_mean, q_mean_next) u_acc = np.random.uniform(0, 1) if u_acc <= p_acc: new_samples = next_sample else: new_samples = mean # print(new_samples, u_acc, p_acc) return np.array(new_samples)
[docs] def walk(self, proposal_cov): backend = 'threading' # 'loky' can be used for better performance in some cases new_samples = Parallel(n_jobs=self.n_jobs, backend=backend)( delayed(self._walk)(walker_num, proposal_cov) for walker_num in range(self.nwalkers) ) # new_samples = np.array([self.walk(walker_num, proposal_cov) for walker_num in range(self.nwalkers)]) self.samples = np.concatenate((self.samples, np.array(new_samples)[:, None, :]), axis=1) return new_samples
[docs] def run_sampler(self, n_samples, initial_value=None, adapt_window=None): n_start = self.samples.shape[1] self.initialise(initial_value) for n_step in tqdm(range(n_start, n_samples+1)): if n_step==n_start: pass else: proposal_cov = self.adapt_proposal_covariance(n_step, adapt_window=adapt_window) new_samples = self.walk(proposal_cov) # self.samples = np.concatenate((self.samples, np.array(new_samples)[:, None, :]), axis=1) return self.samples
[docs] def get_flat_samples(self, burn_in=0): samples_flattened = self.samples[:,burn_in:,:].reshape(-1, self.ndim) return samples_flattened
[docs] def plot_posterior_corner(flat_samples, labels, truths=None, weights=None, confidence_levels=None, smooth=0.5, smooth1d=0.5): """ Plots the posterior distributions with corner plots, including 1-sigma and 2-sigma contours. Parameters: - flat_samples (dict or array-like): Dictionary with dataset names as keys and MCMC samples as values, or just the samples array if only one dataset is present. - labels (list of str, optional): Labels for the parameters. - truths (array-like, optional): True parameter values for comparison. - weights (array-like, optional): Weights for the samples. - confidence_levels (list of float, optional): Confidence levels for the contours (e.g., [0.68, 0.95, 0.99]). - smooth (float, optional): Smoothing factor for 2D contours. Default is 0.5. - smooth1d (float, optional): Smoothing factor for 1D histograms. Default is 0.5. Returns: - None """ if not isinstance(flat_samples, dict): flat_samples = {None: flat_samples} if confidence_levels is None: confidence_levels = [0.68, 0.95, 0.99] # Default to 1-sigma, 2-sigma and 3-sigma levels fig = None for ii, name in enumerate(flat_samples.keys()): flats = flat_samples[name] print(flats.shape) fig = corner.corner( flats, weights=weights, fig=fig, labels=labels, truths=truths, levels=confidence_levels, smooth=smooth, smooth1d=smooth1d, plot_density=True, plot_contours=True, fill_contours=True, show_titles=True, title_fmt=".2f", title_kwargs={"fontsize": 12}, label_kwargs={"fontsize": 14}, truth_color="black", color=f'C{ii}', contour_kwargs={"linewidths": 1.5}, hist_kwargs={"color": f'C{ii}'}, ) # Add dataset name to the plots if provided if name is not None: ax = fig.axes[flats.shape[1]-1] ax.text(0.95, 0.95-ii*0.1, name, transform=ax.transAxes, fontsize=14, verticalalignment='top', horizontalalignment='right') plt.show()
if __name__ == '__main__': import AstronomyCalc import numpy as np import matplotlib.pyplot as plt true_m = 2 true_b = 1 x_obs, y_obs, y_err = AstronomyCalc.line_data(true_m=true_m, true_b=true_b, sigma=2, error_sigma=0.5).T def model(param): m, b = param return m*x_obs+b true_param = [true_m,true_b] plt.title('Observation') plt.errorbar(x_obs, y_obs, yerr=y_err, color='k', ls=' ', marker='o') plt.plot(x_obs, model(true_param), color='r', ls='--') plt.xlabel('X') plt.ylabel('Y') plt.show() def log_likelihood(param): y_mod = model(param) # y_err = 0.5*y_obs logL = -np.sum((y_obs-y_mod)**2/2/y_err**2) return logL mins = np.array([-2,-3]) maxs = np.array([6,5]) def log_prior(param): m, b = param if mins[0]<=m<=maxs[0] and mins[1]<=b<=maxs[1]: return 0 else: return -np.inf def log_probability(param): lp = log_prior(param) if np.isfinite(lp): return lp+log_likelihood(param) return -np.inf print(log_probability(true_param)) print(log_probability([0,1])) print(log_probability([10,1])) # Example usage nwalkers = 16 ndim = 2 n_samples = 2000 initial_value = np.random.uniform(size=(nwalkers, ndim))*(maxs-mins)+mins mh_sampler = MetropolisHastings(nwalkers=nwalkers, ndim=ndim, log_probability=log_probability, proposal_cov=np.eye(ndim), n_jobs=2) samples = mh_sampler.run_sampler(n_samples=n_samples, initial_value=initial_value) print("Samples shape:", samples.shape) flat_samples = mh_sampler.get_flat_samples(burn_in=100) print('Flat samples shape:', flat_samples.shape) labels = ['$m$', '$b$'] fig, axs = plt.subplots(ndim, 1, figsize=(5,ndim*3)) for i in range(ndim): axs[i].set_ylabel(labels[i], fontsize=16) axs[i].set_xlabel('step number', fontsize=16) for j in range(nwalkers): axs[i].plot(samples[j,:,i], c=f'C{j}') plt.tight_layout() plt.show() plot_posterior_corner(flat_samples, labels, truths=true_param) def target_distribution(param): if np.array(param).ndim==1: return np.exp(log_probability(param)) else: return np.array([np.exp(log_probability(par)) for par in param]) print(target_distribution(true_param)) print(target_distribution([0,1])) print(target_distribution([10,1])) # Set up the proposal distribution mean and covariance proposal_mean = [2.5, 1.5] proposal_cov = [[2.0, 1.5], [1.5, 2.0]] # Initialize the Importance Sampling with default Gaussian proposal is_sampler = ImportanceSampling(target_distribution, proposal_mean=proposal_mean, proposal_cov=proposal_cov) # Draw samples and compute expectation samples, weights = is_sampler.sample(n_samples=1000) # Plot samples plt.scatter(samples[:, 0], samples[:, 1], c=weights, cmap='viridis', alpha=0.2) plt.scatter(true_m, true_b, marker='X', color='red') plt.xlabel('m') plt.ylabel('b') plt.colorbar(label='Importance Weights') plt.show() plot_posterior_corner(samples, labels, truths=true_param, weights=weights)