#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue May 1 15:58:22 2018
@author: prmiles
"""
from .MCMC import MCMC
from .chain import ChainStatistics as CS
from .chain import ChainProcessing as CP
from .samplers.utilities import is_sample_outside_bounds
from multiprocessing import Pool, cpu_count
import numpy as np
import sys
import os
import copy
import time
[docs]class ParallelMCMC:
'''
Run Parallel MCMC Simulations.
Attributes:
* :meth:`~setup_parallel_simulation`
* :meth:`~run_parallel_simulation`
* :meth:`~display_individual_chain_statistics`
'''
def __init__(self):
self.description = 'Run MCMC simulations in parallel'
[docs] def setup_parallel_simulation(self, mcset, initial_values = None, num_cores = 1, num_chain = 1):
'''
Setup simulation to run in parallel.
Settings defined in `mcset` object will be copied into different instances in
order to run parallel chains.
Args:
* **mcset** (:class:`~.MCMC`): Instance of MCMC object with serial simulation setup.
* **initial_values** (:class:`~numpy.ndarray`): Array of initial parameter values - [num_chain,npar].
* **num_cores** (:py:class:`int`): Number of cores designated by user.
* **num_chain** (:py:class:`int`): Number of sampling chains to be generated.
'''
# extract settings from mcset
data, options, model, parameters = unpack_mcmc_set(mcset = mcset)
npar, low_lim, upp_lim = get_parameter_features(parameters.parameters)
options = check_options_output(options)
# number of CPUs
self.num_cores = assign_number_of_cores(num_cores)
# check initial values and number of chains to be generated
self.initial_values, self.num_chain = check_initial_values(initial_values = initial_values, num_chain = num_chain, npar = npar, low_lim = low_lim, upp_lim = upp_lim)
# assign parallel log file directory
parallel_dir = options.savedir
check_directory(parallel_dir)
self.__parallel_dir = parallel_dir
# replicate features of mcstat object num_chain times
self.parmc = []
for ii in range(self.num_chain):
self.parmc.append(MCMC())
# replicate data
self.parmc[ii].data = copy.deepcopy(data)
# replicate model settings
self.parmc[ii].model_settings = copy.deepcopy(model)
# replicate simulation options and create log files for each
self.parmc[ii].simulation_options = copy.deepcopy(options)
chain_dir = str('chain_{}'.format(ii))
self.parmc[ii].simulation_options.savedir = str('{}{}{}'.format(self.__parallel_dir,os.sep,chain_dir))
self.parmc[ii].simulation_options.results_filename = str('{}.json'.format(chain_dir))
self.parmc[ii].simulation_options.json_restart_file = check_for_restart_file(options.json_restart_file, chain_dir)
# replicate parameter settings and assign distributed initial values
self.parmc[ii].parameters = copy.deepcopy(parameters)
for jj in range(npar):
self.parmc[ii].parameters.parameters[jj]['theta0'] = self.initial_values[ii,jj]
[docs] def run_parallel_simulation(self):
'''
Run MCMC simulations in parallel.
The code is run in parallel by using :class:`~.Pool`. While
running, you can expect a display similar to
::
Processing: <parallel_dir>/chain_1
Processing: <parallel_dir>/chain_0
Processing: <parallel_dir>/chain_2
The simulation is complete when you see the run time displayed.
::
Parallel simulation run time: 16.15234899520874 sec
'''
start = time.time()
mcpool = Pool(processes=self.num_cores) # depends on available cores
res = mcpool.map(run_serial_simulation, self.parmc)
mcpool.close() # not optimal! but easy
mcpool.join()
end = time.time()
self.__parsimutime = end - start
print('Parallel simulation run time: {} sec'.format(self.__parsimutime))
# assign results to invidual simulations
for ii in range(self.num_chain):
self.parmc[ii].simulation_results = res[ii]
# -------------------------
[docs] def display_individual_chain_statistics(self):
'''
Display chain statistics for different chains in parallel simulation.
Example display:
::
****************************************
Displaying results for chain 0
Files: <parallel_dir>/chain_0
---------------------
name : mean std MC_err tau geweke
m : 1.9869 0.1295 0.0107 320.0997 0.9259
b : 3.0076 0.2489 0.0132 138.1260 0.9413
---------------------
****************************************
Displaying results for chain 1
Files: <parallel_dir>/chain_1
---------------------
name : mean std MC_err tau geweke
m : 1.8945 0.4324 0.0982 2002.6361 0.3116
b : 3.2240 1.0484 0.2166 1734.0201 0.4161
---------------------
'''
for ii, mc in enumerate(self.parmc):
print('\n{}\nDisplaying results for chain {}\nFiles: {}'.format(40*'*',ii,mc.simulation_options.savedir))
CS.chainstats(mc.simulation_results.results['chain'],mc.simulation_results.results)
# -------------------------
[docs]def unpack_mcmc_set(mcset):
'''
Unpack attributes of MCMC object.
Args:
* **mcset** (:class:`~.MCMC`): MCMC object.
Returns:
* **data** (:class:`~.DataStructure`): MCMC data structure.
* **options** (:class:`~.SimulationOptions`): MCMC simulation options.
* **model** (:class:`~.ModelSettings`): MCMC model settings.
* **parameters** (:class:`~.ModelParameters`): MCMC model parameters.
'''
data = mcset.data
options = mcset.simulation_options
model = mcset.model_settings
parameters = mcset.parameters
return data, options, model, parameters
# -------------------------
[docs]def get_parameter_features(parameters):
'''
Get features of model parameters.
Args:
* **parameters** (:py:class:`list`): List of MCMC model parameter dictionaries.
Returns:
* **npar** (:py:class:`int`): Number of model parameters.
* **low_lim** (:class:`~numpy.ndarray`): Lower limits.
* **upp_lim** (:class:`~numpy.ndarray`): Upper limits.
'''
npar = len(parameters)
theta0 = np.zeros([npar])
low_lim = np.zeros([npar])
upp_lim = np.zeros([npar])
for jj in range(npar):
theta0[jj] = parameters[jj]['theta0']
low_lim[jj] = parameters[jj]['minimum']
upp_lim[jj] = parameters[jj]['maximum']
# check if infinity - needs to be finite for default mapping
if low_lim[jj] == -np.inf:
low_lim[jj] = theta0[jj] - 100*(np.abs(theta0[jj]))
print('Finite lower limit required - setting low_lim[{}] = {}'.format(jj,low_lim[jj]))
if upp_lim[jj] == np.inf:
upp_lim[jj] = theta0[jj] + 100*(np.abs(theta0[jj]))
print('Finite upper limit required - setting upp_lim[{}] = {}'.format(jj,upp_lim[jj]))
return npar, low_lim, upp_lim
# -------------------------
[docs]def check_initial_values(initial_values, num_chain, npar, low_lim, upp_lim):
'''
Check if initial values satisfy requirements.
Args:
* **initial_values** (:class:`~numpy.ndarray`): Array of initial parameter values - [num_chain,npar].
* **num_chain** (:py:class:`int`): Number of sampling chains to be generated.
* **npar** (:py:class:`int`): Number of model parameters.
* **low_lim** (:class:`~numpy.ndarray`): Lower limits.
* **upp_lim** (:class:`~numpy.ndarray`): Upper limits.
Returns:
* **initial_values** (:class:`~numpy.ndarray`): Array of initial parameter values - [num_chain,npar].
* **num_chain** (:py:class:`int`): Number of sampling chains to be generated.
'''
if initial_values is None:
initial_values = generate_initial_values(num_chain = num_chain, npar = npar, low_lim = low_lim, upp_lim = upp_lim)
else:
num_chain, initial_values = check_shape_of_users_initial_values(initial_values = initial_values, num_chain = num_chain, npar = npar)
initial_values = check_users_initial_values_wrt_limits(initial_values = initial_values, low_lim = low_lim, upp_lim = upp_lim)
return initial_values, num_chain
# -------------------------
[docs]def generate_initial_values(num_chain, npar, low_lim, upp_lim):
'''
Generate initial values by sampling from uniform distribution between limits
Args:
* **num_chain** (:py:class:`int`): Number of sampling chains to be generated.
* **npar** (:py:class:`int`): Number of model parameters.
* **low_lim** (:class:`~numpy.ndarray`): Lower limits.
* **upp_lim** (:class:`~numpy.ndarray`): Upper limits.
Returns:
* **initial_values** (:class:`~numpy.ndarray`): Array of initial parameter values - [num_chain,npar]
'''
u = np.random.random_sample(size = (num_chain, npar))
# map sampling between lower/upper limit
initial_values = low_lim + (upp_lim - low_lim)*u
return initial_values
# -------------------------
[docs]def check_shape_of_users_initial_values(initial_values, num_chain, npar):
'''
Check shape of users initial values
Args:
* **initial_values** (:class:`~numpy.ndarray`): Array of initial parameter values - expect [num_chain,npar]
* **num_chain** (:py:class:`int`): Number of sampling chains to be generated.
* **npar** (:py:class:`int`): Number of model parameters.
Returns:
* **num_chain** (:py:class:`int`): Number of sampling chains to be generated - equal to number of rows in initial values array.
* **initial_values**
'''
m,n = initial_values.shape
if m != num_chain:
print('Shape of initial values inconsistent with requested number of chains. \n num_chain = {}, initial_values.shape -> {},{}. Resizing num_chain to {}.'.format(num_chain, m, n, m))
num_chain = m
if n != npar:
print('Shape of initial values inconsistent with requested number of parameters. \n npar = {}, initial_values.shape -> {},{}. Only using first {} columns of initial_values.'.format(npar, m, n, npar))
initial_values = np.delete(initial_values, [range(npar,n+1)], axis = 1)
return num_chain, initial_values
# -------------------------
[docs]def check_users_initial_values_wrt_limits(initial_values, low_lim, upp_lim):
'''
Check users initial values wrt parameter limits
Args:
* **initial_values** (:class:`~numpy.ndarray`): Array of initial parameter values - expect [num_chain,npar]
* **low_lim** (:class:`~numpy.ndarray`): Lower limits.
* **upp_lim** (:class:`~numpy.ndarray`): Upper limits.
Returns:
* **initial_values** (:class:`~numpy.ndarray`): Array of initial parameter values - expect [num_chain,npar]
Raises:
* `SystemExit` if initial values are outside parameter bounds.
'''
outsidebounds = is_sample_outside_bounds(initial_values, low_lim, upp_lim)
if outsidebounds is True:
sys.exit(str('Initial values are not within parameter limits. Make sure they are within the following limits:\n\tLower: {}\n\tUpper: {}\nThe initial_values tested were:\n{}'.format(low_lim, upp_lim, initial_values)))
else:
return initial_values
# -------------------------
[docs]def check_options_output(options):
'''
Check output settings defined in options
Args:
* **options** (:class:`.SimulationOptions`): MCMC simulation options.
Returns:
* **options** (:class:`.SimulationOptions`): MCMC simulation options with at least binary save flag set to True.
'''
if options.save_to_txt == False and options.save_to_bin == False and options.save_to_json == False:
options.save_to_bin = True
return options
# -------------------------
[docs]def check_directory(directory):
'''
Check and make sure directory exists
Args:
* **directory** (:py:class:`str`): Folder/directory path name.
'''
if not os.path.exists(directory):
os.makedirs(directory)
# -------------------------
[docs]def run_serial_simulation(mcstat):
'''
Run serial MCMC simulation
Args:
* **mcstat** (:class:`MCMC.MCMC`): MCMC object.
Returns:
* **results** (:py:class:`dict`): Results dictionary for serial simulation.
'''
print('Processing: {}'.format(mcstat.simulation_options.savedir))
mcstat.run_simulation()
return mcstat.simulation_results
# -------------------------
[docs]def assign_number_of_cores(num_cores = 1):
'''
Assign number of cores to use in parallel process
Args:
* **num_cores** (:py:class:`int`): Number of cores designated by user.
Returns:
* **num_cores** (:py:class:`int`): Number of cores designated by user or maximum number of cores available on machine.
'''
tmp = cpu_count()
if num_cores > tmp:
return tmp
else:
return num_cores
# -------------------------
[docs]def load_parallel_simulation_results(savedir):
'''
Load results from parallel simulation directory json files.
Lists in json files are converted to numpy arrays.
Args:
* **savedir** (:py:class:`str`): String indicated path to parallel directory.
Returns:
* **pres** (:py:class:`list`): Each element of list is an MCMC result dictionary.
'''
pres = CP.read_in_parallel_json_results_files(savedir)
for ii, pr in enumerate(pres):
for key in pr.keys():
if isinstance(pres[ii][key], list):
pres[ii][key] = np.array(pres[ii][key])
return pres
# -------------------------
[docs]def check_for_restart_file(json_restart_file, chain_dir):
'''
Check if restart directory was specified.
Args:
* **json_restart_file** (:py:class:`str`): String specifying path to results directory.
* **chain_dir** (:py:class:`str`): String specifying which chain is being restarted.
'''
if json_restart_file is not None:
json_restart_file = str('{}{}{}{}{}.json'.format(json_restart_file,os.sep,chain_dir,os.sep,chain_dir))
if os.path.exists(json_restart_file):
return json_restart_file
else:
return None
else:
return None