Source code for fitpy.fitpy

General data fitting facilities.

In order to interpret an EPR spectrum fitting the spectrum can be helpful.


from copy import copy, deepcopy
import collections
import datetime
import importlib

import matplotlib.pyplot as plt
import numpy as np
import pyDOE
import scipy.optimize
import yaml
import pywt

import spinpy.parameter_classes.parameters as prmt

[docs]class FitOpt: """Include all necessary fitting options attributes. The :class:`fitpy.fitpy.FitOpt` class concerns the fitting options. Attributes ---------- thetaoff : :class:`list` List containing the offset angles in degree with respect to theta for every fitted dataset. weight : :class:`list` List containing the weighting of the datasets. maxiter : :class:`int` Number of maximum iterations per sampling point. Default is 100. nLHS : :class:`int` Number of Latin hypercube samplings (LHS). Default is 10. algorithm : :class:`str` Algorithm used to fit the spectrum. Possible values: ``Nelder-Mead``, ``TRF`` Default is Nelder-Mead. outputfilename : :class:`str`, optional Filename of the output file. normalise : :class:`str`, optional Specifies whether the area or the maximum is to be normalised. Default is area. Possible values: ``area``, ``maximum`` """ def __init__(self): # public properties self.thetaoff = None self.weight = None self.maxiter = 100 self.nLHS = 10 self.algorithm = 'Nelder-Mead' self.outputfilename = str() self.normalise = str()
[docs]class FitPy: """ General fitting facilities. Attributes ---------- spc: :class:`numpy.ndarray` y_data xdata: :class:`numpy.ndarray` x_data sys : :class:`list` List of spin system definitions exp : {:class:`list`, :obj:`fitpy.fitpy.Exp`} Either a list of objects of the :class:`fitpy.fitpy.Exp` or only an object of the :class:`fitpy.fitpy.Exp` depending on whether one or several datasets will be fitted. opt : :obj:`fitpy.fitpy.Opt` All necessary simulation options. vary : :class:`list` List with whatever fit_opt : :obj:`fitpy.fitpy.FitOpt` all necessary fitting options fitting_routine : :class:`str` Most probably the name of the simulation routine to be used Therefore, it seems to be a misnomer... result Whatever type, but somehow the result of the fitting process... Parameters ---------- :param y_data: :class:`numpy.ndarray` Array containing the y data. :param sys: :class:`list` List consisting of one or more objects of the :class:`fitpy.fitpy.Sys` class depending on the number of species to be fitted. :param vary: :class:`list` List consisting of one or more dictionaries with the fitting parameters to vary depending on the number of species to be fitted. :param exp: {:class:`list`, :obj:`fitpy.fitpy.Exp`} Either a list of objects of the :class:`fitpy.fitpy.Exp` or only an object of the :class:`fitpy.fitpy.Exp` depending on whether one or several datasets will be fitted. :param opt: :obj:`fitpy.fitpy.Opt` Object of the :class:`fitpy.fitpy.Opt` class containing all necessary simulation options. :param fit_opt: :obj:`fitpy.fitpy.FitOpt` Object of the :class:`fitpy.fitpy.FitOpt` class containing all necessary fitting options. :param input_dict: :class:`dict` Dictionary containing all input parameters. """ def __init__(self, y_data=None, sys=None, vary=None, exp=None, opt=None, fit_opt=FitOpt(), fitting_routine=None, input_dict=None): # public properties self.spc = y_data self.xdata = None self.sys = sys self.exp = exp self.opt = opt self.vary = vary self.fit_opt = fit_opt self.fitting_routine = fitting_routine self.result = None # protected properties self._fig = plt.figure() self._axes = list() self._input_dict = input_dict self._n_datasets = int() self._exp_points = np.zeros(0) self._exp_range = np.zeros((0, 0)) self._exp_mwfreq = np.zeros(0) self._sys_tmp = None self._opt_tmp = None self._initvec = np.zeros(0) self._lower_bound = None self._upper_bound = None self._boundaries = None self._lhs = None self._fitting_routine_object = None self._initvec_tmp = None self._least_square_sum = None self._args2pass = None self._best_least_square_sum = None self._best_result = None self._best_fit = None self._result = None self._now = None
[docs] def fit(self): """Perform the actual fitting.""" self._check_vary_values() self._get_current_date() self._infer_number_of_datasets() self._get_fitting_routine() self._create_zero_arrays() self._calculate_auto_weights() self._set_weights() self._concatenate() self._make_copy() self._create_initvec() self._create_boundaries() self._init_lhs() self._collect_args2pass() self._iterate_fit() self._normalise_populations() self._split_results_vector() self._make_output_file()
[docs] def _split_results_vector(self): if self._n_datasets > 1: split = list() value = 0 for i in self._exp_points[0:-1]: value = value + i split.append(int(value)) self.result[1] = \ np.split(self.result[1], split)
[docs] def _infer_number_of_datasets(self): if isinstance(self.spc, tuple): self._n_datasets = len(self.spc) else: self._n_datasets = 1 self.spc.reshape((-1, 1)) print("\nNumber of dataset(s): " + str(self._n_datasets))
[docs] def _check_vary_values(self): for i in range(len(self.vary)): for key in self.vary[i].keys(): if hasattr(self.sys[i], key): if self.vary[i][key] > getattr(self.sys[i], key): raise ValueError( 'Vary value of ' + key + ' is too large.')
[docs] def _create_zero_arrays(self): self._exp_points = np.zeros(self._n_datasets) self._exp_range = np.zeros((self._n_datasets, 2)) self._exp_mwfreq = np.zeros(self._n_datasets)
[docs] def _calculate_auto_weights(self): if self.fit_opt.weight == 'auto': if self._n_datasets > 1: self.fit_opt.weight = np.zeros(self._n_datasets) for i in range(self._n_datasets): (_, detail_coeff) = pywt.dwt(self.spc[i][:], 'coif4') weight = 1 / (np.std(detail_coeff) * len(self.spc[i][:])) self.fit_opt.weight[i] = weight else: self.fit_opt.weight = np.ones(self._n_datasets)
[docs] def _set_weights(self): if not all(self.fit_opt.weight): self.fit_opt.weight = np.ones(self._n_datasets)
[docs] def _concatenate(self): if self._n_datasets > 1: spec = np.zeros(0) xdata = np.zeros(0) for i in range(self._n_datasets): self._exp_points[i] = len(self.spc[i][:]) self._exp_range[i, :] = self.exp[i].field_range self._exp_mwfreq[i] = self.exp[i].microwave_frequency spec = np.append(spec, self.fit_opt.weight[i] * np.asarray(self._normalise( self.spc[i], self.fit_opt.normalise))) xdata = \ np.append(xdata, np.linspace(self._exp_range[i, 0], self._exp_range[i, 1], int(self._exp_points[i]))) self.spc = spec self.xdata = xdata else: self._exp_points = np.asarray([len(self.spc), ]) self._exp_range = [self.exp.field_range, ] self._exp_mwfreq = [self.exp.microwave_frequency, ] self.spc = self._normalise(self.spc, self.fit_opt.normalise) self.xdata = np.linspace(self._exp_range[0][0], self._exp_range[0][1], int(self._exp_points))
[docs] @staticmethod def _normalise(data, norm='area'): if norm == "area": normalised = data / sum(abs(data)) elif norm == "maximum": normalised = data / max(abs(data)) else: normalised = data / sum(abs(data)) return normalised
[docs] def _make_copy(self): self._sys_tmp = copy(self.sys) self._opt_tmp = copy(self.opt)
[docs] def _create_initvec(self): for i in range(len(self.vary)): for key in self.vary[i].keys(): if hasattr(self.sys[i], key): self._initvec = \ np.append(self._initvec, np.asarray(getattr(self.sys[i], key))) if hasattr(self.opt, key): self._initvec = np.append(self._initvec, getattr(self.opt, key))
[docs] def _create_boundaries(self): amplitude = np.zeros(0) for i in range(len(self.vary)): for key in self.vary[i].keys(): amplitude = np.append(amplitude, self.vary[i][key]) self._lower_bound = self._initvec - amplitude self._upper_bound = self._initvec + amplitude self._boundaries = np.array([self._lower_bound, self._upper_bound])
[docs] def _init_lhs(self): self._lhs = pyDOE.lhs(len(self._initvec), samples=self.fit_opt.nLHS) print("\nNumber of LHS samplings: " + str(self.fit_opt.nLHS))
[docs] def _collect_args2pass(self): args2pass = \ collections.namedtuple('args2pass', ['n_datasets', 'xdata', 'spc', 'exp_points', 'exp_range', 'exp_mwfreq', 'sys_tmp', 'opt_tmp', 'vary', 'lower_bound', 'upper_bound', 'fit_opt', 'fitting_routine_object']) self._args2pass = args2pass(self._n_datasets, self.xdata, self.spc, self._exp_points, self._exp_range, self._exp_mwfreq, self._sys_tmp, self._opt_tmp, self.vary, self._lower_bound, self._upper_bound, self.fit_opt, self._fitting_routine_object)
[docs] def _iterate_fit(self): for fit in range(self.fit_opt.nLHS + 1): if fit == 0: self._initvec_tmp = self._initvec else: self._create_initvec_tmp(fit) self._fit_spc() self._least_square_sum = \ self._objective_function(self._result, *self._args2pass) if fit == 0: self._set_parameters() else: if self._least_square_sum < self._best_least_square_sum: self._set_parameters() result = self._kernel_function(self._args2pass, *self._result) fit_result = result[-1] r = 0 for l in range(0, self._n_datasets): try: point = int(self._exp_points[l]) except: point = int(self._exp_points) if fit == 0: self._axes.append( self._fig.add_subplot(self._n_datasets, 1, l + 1)) else: self._axes[l].clear() self._axes[l].plot(self.xdata[r:point + r], self.spc[r:point + r]) self._axes[l].plot(self.xdata[r:point + r], fit_result[r:point + r], 'r') self._axes[l].plot(self.xdata[r:point + r], self._best_fit[r:point + r], 'g') diff = (max(self.spc[r:point + r]) - min( self.spc[r:point + r])) / 10 self._axes[l].set_ylim( [min(self.spc[r:point + r]) - diff, max(self.spc[r:point + r]) + diff]) self._axes[l].set_xlim( [min(self.xdata[r:point + r]), max(self.xdata[r:point + r])]) r += int(point) self._axes[self._n_datasets - 1].set_xlabel( r'$magnetic\ field\, / \, \rm{B}_0$') self._axes[self._n_datasets - 1].set_ylabel( r'$Normalised\ Intensity$') plt.ylabel('Intensity') plt.xlabel('$B_0$ / mT') plt.pause(0.2) print("\n\nFinal least-squares: " + str(self._best_least_square_sum)) self._kernel_function(self._args2pass, *self._best_result) self.sys, self.opt = \ self._vector_to_class(self.sys, self.opt, self.vary, self._best_result, self._lower_bound, self._upper_bound)
[docs] def _create_initvec_tmp(self, fit): self._initvec_tmp = np.zeros(len(self._initvec)) for k in range(len(self._initvec)): self._initvec_tmp[k] = self._lower_bound[k] + 2 * \ self._lhs[fit - 1, k] * \ (self._initvec[k] - self._lower_bound[k])
[docs] def _fit_spc(self): if self.fit_opt.algorithm == 'TRF': fit_parameter2, conv_info = \ scipy.optimize.curve_fit( self._kernel_function_interface, self._args2pass, self.spc, self._initvec_tmp, method='trf', bounds=self._boundaries, verbose=1, max_nfev=800, ftol=1e-5) self._result = fit_parameter2 else: fit_parameter = \ scipy.optimize.minimize( self._objective_function, self._initvec_tmp, args=self._args2pass, method=self.fit_opt.algorithm, tol=1e-5, options={'maxiter': self.fit_opt.maxiter, 'disp': True}) self._result = fit_parameter.x for t in range(len(self._result)): if self._result[t] > self._upper_bound[t]: self._result[t] = self._upper_bound[t] if self._result[t] < self._lower_bound[t]: self._result[t] = self._lower_bound[t]
[docs] @staticmethod def _objective_function(result, dim, xdata, spc, exp_points, exp_range, exp_mwfreq, sys_tmp, opt_tmp, vary, lower_bound, upper_bound, fit_opt, fitting_routine_object): args2pass = \ collections.namedtuple('args2pass', ['n_datasets', 'xdata', 'spc', 'exp_points', 'exp_range', 'exp_mwfreq', 'sys_tmp', 'opt_tmp', 'vary', 'lower_bound', 'upper_bound', 'fit_opt', 'fitting_routine_object']) argspassed = args2pass(dim, xdata, spc, exp_points, exp_range, exp_mwfreq, sys_tmp, opt_tmp, vary, lower_bound, upper_bound, fit_opt, fitting_routine_object) least_square_sum = sum(np.power(( FitPy._kernel_function(argspassed, *result)[-1] - spc), 2)) print("Total least-squares: " + str(least_square_sum)) return least_square_sum
[docs] def _set_parameters(self): self._best_least_square_sum = self._least_square_sum self._best_result = self._result self.result = \ self._kernel_function(self._args2pass, *self._best_result) self._best_fit = self.result[-1]
[docs] @staticmethod def _kernel_function_interface(args, *res): args2pass = collections.namedtuple('args', ['n_datasets', 'xdata', 'spc', 'exp_points', 'exp_range', 'exp_mwfreq', 'sys_tmp', 'opt_tmp', 'vary', 'lower_bound', 'upper_bound', 'fit_opt', 'fitting_routine_object']) argspassed = args2pass(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], ) result = FitPy._kernel_function(argspassed, *res) spc = result[-1] return spc
[docs] @staticmethod def _kernel_function(args, *res): result_as_array = np.asarray(res) opt = copy(args.opt_tmp) sys_tmp, opt = \ FitPy._vector_to_class(args.sys_tmp, opt, args.vary, result_as_array, args.lower_bound, args.upper_bound) try: number_of_datasets = len(args.exp_points) except: number_of_datasets = 1 exp = prmt.ExperimentalParameters() orig_opt = deepcopy(opt) for i in range(number_of_datasets): if args.fit_opt.thetaoff: opt.oritheta[0] = \ orig_opt.oritheta[0] + args.fit_opt.thetaoff[i] exp.field_range = args.exp_range[i] opt.points = args.exp_points[i] exp.microwave_frequency = args.exp_mwfreq[i] args.fitting_routine_object.spin_system = sys_tmp args.fitting_routine_object.experimental_parameters = exp args.fitting_routine_object.optional_parameters = opt args.fitting_routine_object.simulate() result = args.fitting_routine_object.result # intensity data is always the last part of the result spectrum = result[-1] if i == 0: spectrum = FitPy._normalise(spectrum, args.fit_opt.normalise) result_tmp = result spectrum = args.fit_opt.weight[i] * spectrum result_tmp[-1][:] = spectrum else: spectrum = FitPy._normalise(spectrum, args.fit_opt.normalise) spectrum = args.fit_opt.weight[i] * spectrum result_tmp = list(result_tmp) result_tmp[-1] = np.append(result_tmp[-1], spectrum) return result_tmp
[docs] def _get_fitting_routine(self): class_name_parts = self.fitting_routine.split(".") class_name = class_name_parts[-1] module_name = '.'.join(class_name_parts[0:-1]) module = importlib.import_module(module_name) self._fitting_routine_object = getattr(module, class_name)()
[docs] @staticmethod def _vector_to_class(sys, opt, vary, result_as_array, lower_bound, upper_bound): vec_idx = 0 for i in range(len(vary)): for key in vary[i].keys(): if hasattr(sys[i], key): tmp = getattr(sys[i], key) vec_idx_start = vec_idx for k in range(0, len(tmp)): if result_as_array[vec_idx] < lower_bound[vec_idx]: result_as_array[vec_idx] = lower_bound[vec_idx] if result_as_array[vec_idx] > upper_bound[vec_idx]: result_as_array[vec_idx] = upper_bound[vec_idx] vec_idx += 1 setattr(sys[i], key, result_as_array[vec_idx_start:vec_idx]) if hasattr(opt, key): vec_idx_start = vec_idx tmp2 = getattr(opt, key) for k in range(0, len(tmp2)): if result_as_array[vec_idx] < lower_bound[vec_idx]: result_as_array[vec_idx] = lower_bound[vec_idx] if result_as_array[vec_idx] > upper_bound[vec_idx]: result_as_array[vec_idx] = upper_bound[vec_idx] vec_idx += 1 setattr(opt, key, result_as_array[vec_idx_start:vec_idx]) return sys, opt
[docs] def _normalise_populations(self): for sys in self.sys: sys.populations = \ np.asarray(sys.populations) / sum(sys.populations)
[docs] @staticmethod def _create_result_dict(sys, opt, vary): result_list = list() for i in range(len(vary)): output_dict = collections.OrderedDict() for key in vary[i].keys(): if hasattr(sys[i], key): output_dict[key] = getattr(sys[i], key) if hasattr(opt, key): output_dict[key] = getattr(opt, key) result_list.append(output_dict) return result_list
[docs] def _make_output_file(self): result_dict = self._create_result_dict(self.sys, self.opt, self.vary) output_dict = deepcopy(self._input_dict) for i in range(len(result_dict)): for key in result_dict[i].keys(): if key in output_dict['fitting']['Sys'][i]: output_dict['fitting']['Sys'][i][key] = \ result_dict[i][key].tolist() if key in output_dict['fitting']['Opt']: output_dict['fitting']['Opt'][key] = \ result_dict[i][key].tolist() if 'input parameter' in self._input_dict: del self._input_dict['input parameter'] output_dict['input parameter'] = self._input_dict if hasattr(self.fit_opt, 'outputfilename'): try: name = str(self.fit_opt.outputfilename + self._now + ".yaml") except: name = 'fit_results' + self._now + '.yaml' else: name = 'fit_results' + self._now + '.yaml' with open(name, 'w') as file: yaml.dump(output_dict, file, default_flow_style=False, explicit_start=True)
[docs] def _get_current_date(self): now = self._now = now.strftime('_%Y_%m_%d_%H_%M_%S')
if __name__ == '__main__': obj = FitPy(y_data=np.zeros(100))