import numpy as np from importlib import import_module, reload from zfit_constants import * from lmfit import minimize, Parameters from scipy.stats import gmean # geometric mean import csv class CliInterface(): def __init__(self): self.checkBoxLogMag class DoModel: """ Encapsulate all operations for modeling. This class is instantiated only once, and the instance is used to interface to the remaining program and to perform the actual modeling. """ def __init__(self, cli=False): if cli is False: # Interface to the main program which instantiates this class self.amw = None self.ya = None self.ya_list = None self.range = None self.range_list = None self.print_results = None self.draw_formatted = None self.exc_handler = None else: # Interface to the main program which instantiates this class self.amw = CliInterface() self.ya = None self.ya_list = None self.range = None self.range_list = None self.print_results = None self.draw_formatted = None self.exc_handler = None # Local fitting functions ============================ def _fcn2min(self, params, w, Z, weight, **kwargs): """ This is the function to minimize. It is the difference between model and target (aka residuals) with modeling and penalty weights applied. :param params: :param w: radian frequency array :param Z: complex impedances corresponding to frequencies w :param weight: array of weights corresponding to frequencies w :param **kwargs: keyword arguments :return: must return array for leastsq method, optional for others. """ if self.amw.checkBoxLogMag.isChecked(): diff = np.log10( Z) - np.log10(self.model.model(w, params, **kwargs)) else: diff = Z - self.model.model(w, params, **kwargs) diff *= weight # Flatten complex impedance into re/im adjacent floats residuals = diff.view('double') if LIMITS == "zfit": # Get mean-square float of entire difference array for weight scaling mean_sq = (residuals**2).mean() # Append penalties to residuals bounded = np.hstack( (residuals, self._bound_penalties(params, mean_sq))) return bounded else: return residuals def _bound_penalties(self, params, weight): """ This function is only used when zfit_constants.LIMITS == 'zfit'. Return a list of numbers each of which increases rapidly when the min or max for a param is approached. This represents boundary penalties as a parameter goes out of bounds. :param params: :param weight: :return: a list of N elements where N is the total number of min or max bounds in the set of params. Each element is a number which increases rapidly when the min or max bound is approached. Append penalty elements if params are out of bounds. """ penalties = [] # Custom limiting is done here: # This is an exponent which controls the abruptness of penalty increase as # a min or max limit is approached: PENALTY_WALL = 6 full_penalty = 1e4 * weight for p in self.model.PARAMS: name = p['name'] val = params[name].value # max and min must be >0 or None. # Use min/max limits from model.PARAMS here and set them to None # for lmfit (except for diff evo method). if p['max'] == None: max_pen = 0 else: max_pen = full_penalty if val >= p['max'] else \ full_penalty * np.power(val/p['max'], PENALTY_WALL) if p['min'] == None: min_pen = 0 else: min_pen = full_penalty if val <= p['min'] else \ full_penalty * np.power(p['min']/val, PENALTY_WALL) penalty = np.maximum(np.abs(max_pen), np.abs(min_pen)) penalties.append(penalty) return penalties def _min_max_set(self, min_max, method, scaled_val): """ Set min or max for the minimization function being used, and whether limits are handled by lmfit or zfit. :param min_max: :param method: :param scaled_val: :return: """ if method == "differential_evolution": # Diff Evo requires finite min & max values return scaled_val if min_max == None else min_max elif LIMITS == "zfit": # lmfit doesn't do the limiting return None elif LIMITS == "lmfit": # lmfit gets spec'd limit return min_max def _prog_bar_update(self, x1, x2, x3, *x4, **x5): """ Update the progress bar in the main app. (Could look for abort here too.) :param xn: Dummy args to match actual sent by minimize() callback :return: nothing """ self.amw.prog_bar_tick += 1 if self.amw.prog_bar_tick >= 100: self.amw.prog_bar_tick = 0 next = (self.amw.progressBar.value() + 1) % 100 self.amw.progressBar.setValue(next) def _find_sf(self, model_list): """ Find scale factors for frequency (FSF) and impedance (ZSF). Ignore component if it doesn't begin with R, G, C, L, or M :param model_list: PARAM list from model script :return: FSF and ZSF """ R, G, C, L = [], [], [], [] for comp in model_list: type = comp['name'][0].upper() if type == 'R': R.append(comp['init']) elif type == 'G': G.append(comp['init']) elif type == 'C': C.append(comp['init']) elif type == 'L' or type == 'M': L.append(comp['init']) # Get zsf depending on whether Rs and/or Gs are present if R == []: if G == []: # No Rs or Gs zsf = 1.0 else: # Gs but no Rs zsf = 1.0 / gmean(G) else: if G == []: # Rs but no Gs zsf = gmean(R) else: # Both Rs and Gs. Use the geometric mean of the # preferred zsf for each Rzsf = gmean(R) Gzsf = 1.0 / gmean(G) zsf = gmean([Rzsf, Gzsf]) # Get fsf depending on whether Ls and/or Cs are present if L == []: if C == []: # No Ls or Cs fsf = 1.0 else: # No Ls, but Cs fsf = 1.0 / (gmean(C) * zsf) else: if C == []: # No Cs, but Ls fsf = zsf / gmean(L) else: # Both Ls and Cs. Use the geometric mean of the # preferred fsf for each Lfsf = zsf / gmean(L) Cfsf = 1.0 / (gmean(C) * zsf) fsf = gmean([Lfsf, Cfsf]) return fsf, zsf def _normalize(self, val, min, max, comp_type, fsf, zsf): # Scale val, min, and max by the appropriate factor depending # on its type and return the scaled values. Default scale is 1.0. scale = { 'R': 1.0/zsf, 'G': zsf, 'C': fsf*zsf, 'L': fsf/zsf, 'M': fsf/zsf } s = scale.get(comp_type, 1.0) min_ret = None if min is None else min * s max_ret = None if max is None else max * s return {'init': val*s, 'min': min_ret, 'max': max_ret} def _denormalize(self, val, comp_type, fsf, zsf): # Scale val by the appropriate factor depending on its type # and return the scaled value. Default scale is 1.0. scale = { 'R': zsf, 'G': 1.0/zsf, 'C': 1.0/(fsf*zsf), 'L': zsf/fsf, 'M': zsf/fsf } s = scale.get(comp_type, 1.0) return val * s def do_model(self): # Clear status and params label boxes self.amw.labelParams.setText("") self.amw.labelParams.repaint() self.amw.labelStatus.setText("Modeling...") self.amw.labelStatus.repaint() # Import the model script, or reload it if already imported self.model = import_module("Models." + self.amw.lineEditModel.text()) self.model = reload(self.model) # Clear any previous modeling, M and P axes for line in self.ya.ax[M].get_lines() + self.ya.ax[P].get_lines(): if line.get_label() == "modeledZPlot": line.remove() # Create local concatenated arrays for freq, mag, phase, and load. # Overwrites data in ya class m = np.array([]) p = np.array([]) f = np.array([]) l = np.array([]) for y, r in zip(self.ya_list, self.range_list): # Copy data from lists to working objects self.ya.data_unbundle(y) self.range.data_unbundle(r) # Append data to form full set for modeling m = np.append(m, self.ya.inputData[M]) p = np.append(p, np.radians(self.ya.inputData[P])) f = np.append(f, self.range.xa["Hz"]) l = np.append(l, self.range.load_array) # Radian frequency w = 2*np.pi*f # Use drawn curves if they exist if self.ya.drawnData[M] is not None: # Drawn data exists for magnitude, use it instead m = self.ya.drawnData[M] if self.ya.drawnData[P] is not None: # Drawn data exists for phase, use it instead p = np.radians(self.ya.drawnData[P]) if len(self.ya_list) > 1: # Multiple data segments. # Create null weighting array, same size as m but full of 1's weight = m.copy() weight.fill(1.0) else: weight = self.ya.drawnData[W] # Complex impedance target z = m*np.exp(1j*p) # Instantiate clean class for lmfit fitter params = Parameters() params.clear() # Init list of name/value tuples values = [] # Get selected fitting method method = METHODS[self.amw.comboBoxMethod.currentIndex()][1] if self.amw.checkBoxLocked.isChecked(): # Read last saved or edited params data (denormalized) with open(PARAM_FILE, mode='r', encoding='utf-8', newline='') as f: reader = csv.reader(f) next(f) # skip header line for line in reader: v = (line[0], float(line[1])) # Build a list of name/value tuples values.append(v) else: # Do actual modeling. # Make working copy of PARAMS list from model param_list = list(self.model.PARAMS) # Adjust min and max if necessary for p in param_list: p["min"] = self._min_max_set(p["min"], method, p["init"] / 1e2) p["max"] = self._min_max_set(p["max"], method, p["init"] * 1e2) if self.amw.do_norm_denorm: # Normalize component, frequency, and impedance values # Determine frequency and Z scaling factors from initial values fsf, zsf = self._find_sf(param_list) # Normalize each component value, min, and max for p in param_list: type = p["name"][0].upper() norm = self._normalize( p["init"], p["min"], p["max"], type, fsf, zsf) p["init"] = norm["init"] p["min"] = norm["min"] p["max"] = norm["max"] # Normalize frequency, target Z, and load w = w / fsf z = z / zsf l = l / zsf else: fsf, zsf = 1.0, 1.0 # Add modified params to lmfit Parameter class # .add converts min/max of None to -/+inf for p in param_list: params.add(p["name"], value=p["init"], vary=p["vary"], min=p["min"], max=p["max"]) # Perform weighted model optimization. # Errors will be caught and displayed by zfit_excepthook() in main window. kw_args = {"load": l, "fsf": fsf, "zsf": zsf} result = minimize(self._fcn2min, params, args=(w, z, weight), kws=kw_args, method=method, iter_cb=self._prog_bar_update) # Don't use params class after minimize -- some values are scrambled or changed. # Populate values[] with modeling results, denormalized if necessary for p in param_list: name = p["name"] val = result.params[name].value if self.amw.do_norm_denorm: comp_type = name[0] val = self._denormalize(val, comp_type, fsf, zsf) v = (name, val) values.append(v) if self.amw.do_norm_denorm: # Denormalize frequency, target Z, and load w = w * fsf z = z * zsf l = l * zsf # Write denormalized modeling results to file with open(PARAM_FILE, mode='w', encoding='utf-8') as f: print('name, value', file=f) for p in values: print('{}, {}'.format(p[0], p[1]), file=f) self.amw.progressBar.setValue(0) # Convert list of tuples to a single dict for the model, to be compatible # with the way minimize() uses the model values_d = {p[0]: p[1] for p in values} # Get complex impedance of model using modeled or locked parameters # Use denormalized values kw_args = {"load": l, "fsf": 1.0, "zsf": 1.0} zfit = self.model.model(w, values_d, **kw_args) # Break into magnitude and degree phase magfit = np.abs(zfit) phasefit = np.angle(zfit, deg=True) # Split into segments as required, add to data list a, b, i = 0, 0, 0 for y in self.ya_list: self.ya.data_unbundle(y) seg_length = len(self.ya.inputData[M]) b += seg_length self.ya.modeledData[M] = magfit[a:b] self.ya.modeledData[P] = phasefit[a:b] a += seg_length self.ya_list[i] = self.ya.data_bundle() i += 1 # Refresh working classes with currently indexed segment data self.ya.data_unbundle(self.ya_list[self.range.segment_index]) self.range.data_unbundle(self.range_list[self.range.segment_index]) # Add to plot self.ya.ax[M].plot(self.range.xa["Hz"], self.ya.modeledData[M], self.ya.modeledLinePlot[M], ls=self.ya.modeledLineStyle[M], lw=1, label="modeledZPlot") self.ya.ax[P].plot(self.range.xa["Hz"], self.ya.modeledData[P], self.ya.modeledLinePlot[P], ls=self.ya.modeledLineStyle[P], lw=1, label="modeledZPlot") # Update results text box self.print_results(values) if self.amw.checkBoxLocked.isChecked(): self.amw.labelStatus.setText("") else: # Append "written to" text to results box outstr = self.amw.labelParams.text() outstr += "
Written to
" + PARAM_FILE self.amw.labelParams.setText(outstr) # Print optimization info status = "Number of function calls: " + str(result.nfev) + "
" if result.aborted: status = RICH_TEXT_RED + "Process aborted:
" #status += result.lmdif_message self.amw.labelStatus.setText(status) self.draw_formatted()