from solver_base import *
from mean_field_solver import *
from catmap import string2symbols
try:
from scipy.optimize import fmin_powell as fmin
except ImportError:
fmin = None
from catmap.functions import numerical_jacobian
import math
from string import Template
import random
import re
[docs]class SteadyStateSolver(MeanFieldSolver):
[docs] def __init__(self,reaction_model=ReactionModel()):
MeanFieldSolver.__init__(self,reaction_model)
defaults = dict(
max_rootfinding_iterations = 50,
internally_constrain_coverages = True,
residual_threshold = 0.9,
analytical_jacobian = True,
optimize_analytical_expressions = False,
)
self._rxm.update(defaults)
self._rate_constant_memoize = {}
self._steady_state_memoize = {}
self._required = {'max_rootfinding_iterations':int,
'internally_constrain_coverages':None,
'residual_threshold':float,
'analytical_jacobian':bool,
}
self._log_strings = {'rootfinding_fail':
"stagnated or diverging (residual = ${resid})",
'rootfinding_maxiter':
"exceeded maximum iterations (residual = ${resid})",
'rootfinding_cancel':
"stationary point or singular jacobian (residual = ${resid})",
'rootfinding_success':
"found solution at point ${pt}",
'rootfinding_status':
"converging (residual = ${resid})"}
[docs] def get_rate_constants(self, rxn_parameters, coverages=None):
"""Return rate constants for given sequence of reaction parameters and coverages.
If no coverages are supplied assume limit of zero coverage
which corresponds to no interaction regardless of which interaction
model is used.
If no coverages are supplied assume limit of zero coverage which corresponds to no interaction regardless of which interaction model is used.
:param rxn_parameters: Sequence of reaction parameters.
:type rxn_parameters: [float]
:param coverages: Sequence of coverages.
:type coverages: [float]
"""
if coverages is None:
coverages = [0.] * len(self.adsorbate_names)
if self.adsorbate_interaction_model not in [None,'ideal']:
memo = tuple(rxn_parameters) + tuple(coverages) + tuple(self._gas_energies)
else:
memo = tuple(rxn_parameters) + tuple(self._gas_energies)
if memo in self._rate_constant_memoize:
kf, kr = self._rate_constant_memoize[memo]
self._kf = kf
self._kr = kr
return kf+kr
kfs, krs, dkfs, dkrs = self.rate_constants(rxn_parameters,coverages,
self._gas_energies,self._site_energies,
self.temperature,self.interaction_response_function,
self._mpfloat,self._matrix,self._math.exp)
self._kf = kfs
self._kr = krs
self._rate_constant_memoize[memo] = [kfs,krs]
return kfs + krs
[docs] def get_coverage(self,rxn_parameters,c0=None,findrootArgs={}):
"""Return coverages for given reaction parameters and coverage constraints.
:param rxn_parameters: Sequence of rxn_parameters
:type rxn_parameters: [float]
:param c0: Coverage constraints.
:type c0: **TODO**
:param findrootArgs: *deprecated*
"""
if self.adsorbate_interaction_model in [None,'ideal'] or self.interaction_strength == 0:
return self.get_ideal_coverages(rxn_parameters,c0,True,findrootArgs)
else:
return self.get_interacting_coverages(rxn_parameters,c0,1.0,findrootArgs)
[docs] def get_steady_state_coverage(self,rxn_parameters,steady_state_fn, jacobian_fn,
c0=None,findrootArgs={}):
"""Return steady-state coverages using catmap.solvers.solver_base.NewtonRoot .
:param rxn_parameters: Sequence of reaction parameters.
:type rxn_parameters: [float]
:param steady_state_fn: **TODO**
:type steady_state_fn: **TODO**
:param jacobian_fn: **TODO**
:type jacobian_fn: **TODO**
:param c0: Coverage constraints
:type c0: **TODO**
:param findrootArgs: *deprecated*
"""
n_tot = len(self.adsorbate_names) + len(self.transition_state_names)
if c0 is None:
raise ValueError("No initial coverage supplied. Mapper must supply initial guess")
c0 = self.constrain_coverages(c0)
self.steady_state_function = steady_state_fn
self.steady_state_jacobian = jacobian_fn
self._coverage = [self._mpfloat(ci) for ci in c0]
self._rxn_parameters = rxn_parameters
#Enter root finding algorithm
f = steady_state_fn
f_resid = lambda x: self.get_residual(x,True,False)
norm = self._math.infnorm
if self.internally_constrain_coverages == True:
constraint = self.constrain_coverages
else:
constraint = lambda x: x
solver = NewtonRoot
if f_resid(c0) <= self.tolerance:
self._coverage = c0
return c0
solver_kwargs = dict(
norm = norm,
verbose = self.verbose,
constraint = constraint,
)
if self.analytical_jacobian == True:
solver_kwargs['J'] = jacobian_fn
else:
def J(x):
return numerical_jacobian(f,x,self._matrix)
solver_kwargs['J'] = J
iterations = solver(f,c0, self._matrix, self._mpfloat,
self._Axb_solver, **solver_kwargs)
old_error = 1e99
coverages = None
maxiter = self.max_rootfinding_iterations
iterations.maxiter = maxiter
i = 0
x = c0
for x,error in iterations:
self.log('rootfinding_status',
n_iter=i,
resid=float(error),
priority=1)
i+=1
if error < self.tolerance:
if f_resid(x) < self.tolerance:
coverages = self.constrain_coverages(x)
self.log('rootfinding_success',
n_iter = i,
priority = 1)
break
else:
x = self.constrain_coverages(x)
error = f_resid(x)
elif error >= self.residual_threshold*old_error:
self.log('rootfinding_fail',
n_iter=i,
resid = float(f_resid(x)),
)
raise ValueError('Stagnated or diverging residual (resid='+\
str(float(f_resid(x)))+')')
old_error = error
if i >= maxiter:
self.log('rootfinding_maxiter',
n_iter=i,
resid = float(f_resid(x)))
raise ValueError('Out of iterations (resid='+\
str(float(f_resid(x)))+')')
self._coverage = x
self._error = error
if coverages:
self._coverage = [c for c in coverages]
return [c for c in coverages]
else:
if f_resid(x) < self.tolerance:
coverages = self.constrain_coverages(x)
else:
self.log('rootfinding_cancel',
n_iter=i,
resid=float(f_resid(x)))
raise ValueError('Solver cancellation. (resid='+\
str(float(f_resid(x)))+')')
[docs] def get_ideal_coverages(self,rxn_parameters,c0=None,
refresh_rate_constants=True,findrootArgs={}):
"""Return
:TODO:
"""
if refresh_rate_constants:
self.get_rate_constants(rxn_parameters,[0]*len(self.adsorbate_names))
return self.get_steady_state_coverage(rxn_parameters,self.ideal_steady_state_function,
self.ideal_steady_state_jacobian,c0,findrootArgs)
[docs] def get_interacting_coverages(self,rxn_parameters,c0=None,
interaction_strength=1.0,findrootArgs={}):
""":TODO:
"""
#weight interaction parameters - useful for using non-interacting systems as guess
n_tot = len(self.adsorbate_names +self.transition_state_names)
rxn_parameters = list(rxn_parameters)
if interaction_strength is not None:
if len(rxn_parameters) == n_tot+n_tot**2:
rxn_parameters = rxn_parameters[:n_tot] + [pi*interaction_strength for pi in rxn_parameters[-n_tot**2:]]
elif len(rxn_parameters) == n_tot and interaction_strength == 0:
pass
else:
raise ValueError('System does not have enough parameters for interactions')
cvgs = self.get_steady_state_coverage(rxn_parameters,self.interacting_steady_state_function,
self.interacting_steady_state_jacobian,c0,findrootArgs)
return cvgs
[docs] def bisect_interaction_strength(self,rxn_parameters,valid_strength,valid_coverages,target_strength,max_bisections,findrootArgs={}):
"""
:TODO:
"""
n_tot = len(self.adsorbate_names+self.transition_state_names)
bisect_iter = 0
n_bisects = 0
while n_bisects <= max_bisections:
new_strength = valid_strength + (target_strength-valid_strength)/(2**n_bisects)
new_params = rxn_parameters[:n_tot] + [pi*new_strength for pi in rxn_parameters[-n_tot**2:]]
try:
valid_coverages = self.get_steady_state_coverage(new_params,self.interacting_steady_state_function,
self.interacting_steady_state_jacobian,valid_coverages,findrootArgs)
print 'Successfully bisected with strength: ',new_strength
valid_strength = new_strength
n_bisects = 0
if valid_strength > 0.95*target_strength:
return valid_coverages
except ValueError:
print 'Failed to bisect with strength: ',new_strength
n_bisects += 1
return valid_coverages
[docs] def get_initial_coverage(self,rxn_parameters):
"""Return coverages based on probabilties according to the Boltzmann distribution
and the adsorption energies for a given sequence of rxn_parameters.
:param rxn_parameters: Sequence of reaction parameters
:type rxn_parameter: [float]
"""
energy_dict = {}
for ads,E in zip(self.adsorbate_names,rxn_parameters):
energy_dict[ads] = E
for gas,E in zip(self.gas_names,self._gas_energies):
energy_dict[gas] = E
if not self.atomic_reservoir_dict:
#check all possibilities, return min residual
min_resid = 1e99
boltz_cvgs = [[0]*len(self.adsorbate_names)] #include empty coverage as possible guess
for ref_dict in self.atomic_reservoir_list:
self.atomic_reservoir_dict = ref_dict
cvgs = self.thermodynamics.boltzmann_coverages(energy_dict)
boltz_cvgs.append(cvgs)
else:
boltz_cvgs = [self.thermodynamics.boltzmann_coverages(energy_dict)]
return boltz_cvgs
[docs] def get_residual(self, coverages,
validate_coverages = True, refresh_rate_constants = True):
"""
:TODO:
"""
if validate_coverages == True:
coverages = self.constrain_coverages(coverages)
self._coverage = coverages
if refresh_rate_constants == True:
self._rxn_parameters = self.scaler.get_rxn_parameters(
self._descriptors)
self.get_rate_constants(self._rxn_parameters,coverages)
# cvg_rates = self.steady_state_function(None)
cvg_rates = self.steady_state_function(coverages)
residual = max([abs(r) for r in cvg_rates])
return residual
[docs] def interacting_steady_state_function(self, coverages):
"""
:TODO:
"""
memo = tuple(self._rxn_parameters) + tuple(self._gas_energies) + \
tuple(self._site_energies) + tuple(coverages) + tuple(self.gas_pressures+[self.temperature])
if memo in self._steady_state_memoize:
return self._steady_state_memoize[memo]
else:
c = self.interacting_mean_field_steady_state(
self._rxn_parameters,coverages,self.gas_pressures,
self._gas_energies, self._site_energies,
self.temperature,self.interaction_response_function,
self._mpfloat, self._matrix,self._math.exp)
self._steady_state_memoize[memo] = c
return c
[docs] def ideal_steady_state_function(self,coverages):
"""
:TODO:
"""
memo = tuple(self._kf) + tuple(self._kr) + tuple(coverages) + tuple(self.gas_pressures+[self.temperature])
if memo in self._steady_state_memoize:
return self._steady_state_memoize[memo]
else:
c = self.ideal_mean_field_steady_state(
self._kf,self._kr,coverages,self.gas_pressures,
self._mpfloat, self._matrix)
self._steady_state_memoize[memo] = c
return c
[docs] def interacting_steady_state_jacobian(self,coverages):
"""
:TODO:
"""
J = self.interacting_mean_field_jacobian(
self._rxn_parameters,coverages,self.gas_pressures,
self._gas_energies,self._site_energies,
self.temperature,self.interaction_response_function,
self._mpfloat, self._matrix,self._math.exp)
return J
[docs] def ideal_steady_state_jacobian(self,coverages):
"""
:TODO:
"""
J = self.ideal_mean_field_jacobian(
self._kf,self._kr,coverages,self.gas_pressures,
self._mpfloat, self._matrix)
return J
[docs] def constrain_coverages(self,cvgs):
"""
:TODO:
"""
min_cvg = self._mpfloat(10**(-(self.decimal_precision)))
cvgs = self.constrain_coverage_function(list(cvgs),self._mpfloat,min_cvg)
return cvgs
[docs] def compile(self):
"""
:TODO:
"""
if not self._compiled:
intermediate_subs = {}
self._function_substitutions.update(
self.substitutions_dict())
#make 2 versions of rate-constants function
energy_expressions_noderivs = '\n '.join(self.reaction_energy_equations(adsorbate_interactions = False))
energy_expressions_derivs = '\n '.join(self.reaction_energy_equations(adsorbate_interactions = True))
templates['rate_constants_no_derivatives'] = Template(templates['rate_constants']).safe_substitute({'elementary_step_energetics':energy_expressions_noderivs})
templates['rate_constants_with_derivatives'] = Template(templates['rate_constants']).safe_substitute({'elementary_step_energetics':energy_expressions_derivs})
#make steady-state expressions
ss_eqs = self.rate_equations()
self._function_substitutions['steady_state_expressions'] = '\n '.join(ss_eqs)
#make jacobian expressions
jac_eqs = self.jacobian_equations(adsorbate_interactions=True)
self._function_substitutions['jacobian_expressions'] = '\n '.join(jac_eqs)
jac_eqs_nd = self.jacobian_equations(adsorbate_interactions=False)
self._function_substitutions['jacobian_expressions_no_derivatives'] = '\n '.join(jac_eqs_nd)
def indent_string(string,levels):
lines = string.split('\n')
indention = '\n'+' '*levels
return indention.join(lines)
#pre-substitute the interaction function into rate_constants (needed because its nested 2 levels)
indented = indent_string(templates[self.adsorbate_interaction_model+'_interaction_function'],1)
indented = Template(indented).substitute(self._function_substitutions)
self._function_substitutions['interaction_function'] = indented
for f in ['rate_constants_no_derivatives','rate_constants_with_derivatives']:
templates[f] = Template(templates[f]).substitute(self._function_substitutions)
#indent rate_constant functions because they are nested 1 level
indented_funcs = [
['rate_constants_no_derivatives',''],
['rate_constants_with_derivatives',''],
]
for func,tempname in indented_funcs:
if not tempname:
tempname = func
self._function_substitutions[func] = indent_string(templates[tempname],1)
compiled_funcs = [
['rate_constants','rate_constants_with_derivatives'],
['interaction_function',self.adsorbate_interaction_model+'_interaction_function'],
['interacting_mean_field_steady_state',''],
['ideal_mean_field_steady_state',''],
['interacting_mean_field_jacobian',''],
['ideal_mean_field_jacobian',''],
['constrain_coverage_function','constrain_coverages'],
['elementary_rates',''],
]
for func,tempname in compiled_funcs:
if not tempname:
tempname = func
self._function_templates[func] = templates[tempname]
self.generate_static_functions()
if self.optimize_analytical_expressions:
test_theta = [self._mpfloat(random.random()) for a in self.adsorbate_names]
test_params = [self._mpfloat(random.random()) for a in self.adsorbate_names+self.transition_state_names]
test_kfs = [self._mpfloat(random.random()) for a in self.elementary_rxns]
test_krs = [self._mpfloat(random.random()) for a in self.elementary_rxns]
test_p = [self._mpfloat(random.random()) for a in self.gas_names]
test_gas_E = [self._mpfloat(random.random()) for a in self.gas_names]
test_site_E = [self._mpfloat(random.random()) for a in self.site_names]
test_T = 500
test_smearing = 0.02
arg_dict = {
'interacting_mean_field_steady_state':[
test_params,test_theta,test_p,test_gas_E,test_site_E,
test_T,self.interaction_response_function,
self._mpfloat,self._matrix,self._math.exp],
'ideal_mean_field_steady_state':[
test_kfs, test_krs, test_theta, test_p,
self._mpfloat, self._matrix],
'interacting_mean_field_jacobian':[
test_params,test_theta,test_p,test_gas_E,test_site_E,
test_T,self.interaction_response_function,
self._mpfloat, self._matrix, self._math.exp],
'ideal_mean_field_jacobian':[
test_kfs, test_krs, test_theta, test_p,
self._mpfloat, self._matrix]
}
for func in arg_dict:
args= arg_dict[func]
old_str = self._function_strings[func]
insertion_line = None
for i,li in enumerate(old_str.split('\n')):
if ' s['+str(len(self.site_names)-1)+']' in li:
insertion_line = i+1
if insertion_line is None:
raise ValueError('Could not find line for inserting optimizations')
#optimize string
func_string = self.optimize_analytical_function(func,old_str,
insertion_line,1,*args)
#re-compile optimized function
self._function_strings[func] = func_string
locs = {}
exec func_string in globals(), locs
setattr(self,func,locs[func])
self._compiled = True
if self.adsorbate_interaction_model not in [None,'ideal']:
self.steady_state_function = self.interacting_steady_state_function
else:
self.steady_state_function = self.ideal_steady_state_function
[docs] def optimize_analytical_function(self,func_name,func_string,insertion_line,indention_level,*test_args):
"""Replace some common multiplication terms to speed up functions.
"""
locs = {}
exec func_string in globals(), locs
unoptimized = locs[func_name]
#replace common multiplications with substitution
mult_regex = '((?:(?:k(?:f|r)\[\d\]|theta\[\d\]|p\[\d\]|s\[\d\])\*?)+ +)'
opt_strs = []
match = re.findall(mult_regex,func_string)
matches = []
for m in match:
if m.count('*') and func_string.count(m) > 1:
matches.append((-1*m.count('*'),func_string.count(m),m))
matches = list(set(matches))
matches = sorted(matches)
for i,m in enumerate(matches):
opt_strs.append(m[-1].strip())
func_string = func_string.replace(m[-1].strip(),'subs['+str(i)+'] ')
sub_dict = {
'':r'( +1\*|\-1\*\-1\*| +0 +\+| +1\.0\*)+',
' + ':r'( +\- *\-1\*| +\+ *1\*| +\+ *\+ +)+',
' - ':r'( +\+ *\-1\*| +\- *1\*| +\+ *\- +| +0 +\-|\-1\*|\-1\*1\*)',
}
opt_str = '\n'+' '*indention_level+'subs = [0]*'+str(len(opt_strs))
for i,op in enumerate(opt_strs):
opt_str += '\n'+' '*indention_level+ 'subs['+str(i)+'] = '+op
func_lines = func_string.split('\n')
func_lines.insert(insertion_line, opt_str)
func_string = '\n'.join(func_lines)
mtch = 1
while mtch:
new_fn = func_string
mtch = None
for sub in sub_dict:
rgx = sub_dict[sub]
mtch = re.findall(rgx,func_string)
for m in list(set(mtch)):
func_string = func_string.replace(m,sub)
locs = {}
exec func_string in globals(), locs
optimized = locs[func_name]
delta = np.array((optimized(*test_args) - unoptimized(*test_args)).tolist()).max()
if delta > 10**(-(self.decimal_precision-1)):
print('Warning: Function optimization failed for '+\
func_name+'. Using unoptimized functions')
func_string = '\n'.join(func_lines)
return func_string
else:
return func_string