Source code for easistrain.EDD.utils

import argparse
from typing import Callable, List, Sequence, Tuple, Union
from pathlib import Path
import yaml
import numpy
import scipy.optimize
import silx.math.fit


[docs] def read_config_file(path: Union[str, Path]) -> dict: with open(path, "r") as config_file: return yaml.load(config_file, Loader=yaml.SafeLoader)
[docs] def run_from_cli(f: Callable): parser = argparse.ArgumentParser() parser.add_argument("config_file", type=str, help="Path to the config file") args = parser.parse_args() config = read_config_file(args.config_file) f(**config) print(f"Data processing step '{f.__name__}' finished !")
[docs] def linefunc(a, xData): return a * xData
[docs] def splitPseudoVoigt(xData, *params): return silx.math.fit.sum_splitpvoigt(xData, *params) # type: ignore
[docs] def gaussEstimation(xData, *params): return silx.math.fit.sum_gauss(xData, *params) # type: ignore
[docs] def calcBackground( xData: numpy.ndarray, yData: numpy.ndarray, fwhmRight: float, fwhmLeft: float, guessedPeaksIndex: Sequence[float], ) -> numpy.ndarray: """Extracts the data outside of the peak bounds to fit the background""" try: peaks_left_bound = int(guessedPeaksIndex[0] - 3 * fwhmLeft) peaks_right_bound = int(guessedPeaksIndex[-1] + 3 * fwhmRight) # print(peaks_left_bound, peaks_right_bound) if peaks_left_bound <= 5 and peaks_right_bound < len(xData) - 5: # case of not enough of points at left xBackground = numpy.append(xData[0], xData[peaks_right_bound:]) yBackground = numpy.append(yData[0], yData[peaks_right_bound:]) if peaks_right_bound >= len(xData) - 5 and peaks_left_bound > 5: # case of not enough of points at right xBackground = numpy.append(xData[:peaks_left_bound], xData[-1]) yBackground = numpy.append(yData[:peaks_left_bound], yData[-1]) if peaks_left_bound <= 5 and peaks_right_bound >= len(xData) - 5: # case of not enough of points at left and right xBackground = numpy.append(xData[:5], xData[-5:]) yBackground = numpy.append(yData[:5], yData[-5:]) if peaks_left_bound > 5 and peaks_right_bound < len(xData) - 5: # case of enough of points at left and right xBackground = numpy.append( xData[:peaks_left_bound], xData[peaks_right_bound:] ) yBackground = numpy.append( yData[:peaks_left_bound], yData[peaks_right_bound:] ) backgroundCoefficient = numpy.polyfit( x=xBackground, y=yBackground, deg=1 ) ## fit of background with 1d polynom function except ValueError: backgroundCoefficient = numpy.empty(2) backgroundCoefficient.fill(numpy.NaN) return numpy.poly1d(backgroundCoefficient)( xData ) ## yBackground calculated with the 1d polynom fitted coefficient
[docs] def guessParameters( xData: numpy.ndarray, yData: numpy.ndarray, nb_peaks: int, withBounds: bool, ) -> Tuple[numpy.ndarray, List[float]]: fwhmGuess = silx.math.fit.peaks.guess_fwhm(yData) first_peaks_guess: List[float] = silx.math.fit.peaks.peak_search( yData, fwhmGuess, sensitivity=2.5, relevance_info=False, ) if ( len(first_peaks_guess) != nb_peaks ): ## case if more or less peaks than expected are detected raw_peak_guess: List[Tuple[float, float]] = silx.math.fit.peaks.peak_search( yData, fwhmGuess, sensitivity=1, relevance_info=True ) ## index of the peak with peak relevance peaksGuessArray = numpy.asarray(raw_peak_guess) # Take nb_peaks that are the most relevant orderedIndex = numpy.argsort(peaksGuessArray[:, 1])[-nb_peaks:] unsorted_peaks_guess: List[float] = peaksGuessArray[ orderedIndex[:], 0 ].tolist() ## peaks indices else: unsorted_peaks_guess = first_peaks_guess peaksGuess = sorted(unsorted_peaks_guess) p0Guess = numpy.zeros(3 * nb_peaks, float) minBounds = numpy.empty((nb_peaks, 3)) maxBounds = numpy.empty((nb_peaks, 3)) try: for ipar in range(nb_peaks): p0Guess[3 * ipar] = yData[int(peaksGuess[ipar])] p0Guess[3 * ipar + 1] = xData[int(peaksGuess[ipar])] p0Guess[3 * ipar + 2] = fwhmGuess minBounds[ipar, 0] = numpy.amin(yData) # min H minBounds[ipar, 1] = ( p0Guess[3 * ipar + 1] - 3 * p0Guess[3 * ipar + 2] ) # min C minBounds[ipar, 2] = 0 # min FWHM maxBounds[ipar, 0] = numpy.amax(yData) # max H maxBounds[ipar, 1] = ( p0Guess[3 * ipar + 1] + 3 * p0Guess[3 * ipar + 2] ) # max C maxBounds[ipar, 2] = 2 * p0Guess[3 * ipar + 2] # max FWHM firstGuess, covGuess = scipy.optimize.curve_fit( gaussEstimation, xData, yData, p0Guess, **( {"bounds": (minBounds.flatten(), maxBounds.flatten()), "maxfev": 10000} if withBounds else {} ), ) except (IndexError, RuntimeError, ValueError): peaksGuess = numpy.empty(nb_peaks) peaksGuess.fill(numpy.NaN) firstGuess = numpy.empty_like(p0Guess) firstGuess.fill(numpy.NaN) print("!! guessing fit parameters failed !!") return firstGuess, peaksGuess
[docs] def uChEConversion(a, b, c, ch, ua, ub, uc, uch): """ Calculates the uncertainty on the energy coming from the conversion from channel to energy. It includes the uncertainty on the calibration of the coefficients and the peak position """ return numpy.sqrt( ((ua**2) * (ch**4)) + ((uch**2)) * (((2 * a * ch) + b) ** 2) + ((ub**2) * ch**2) + (uc**2) )
[docs] def fit_detector_data( channels: numpy.ndarray, raw_data: numpy.ndarray, nb_peaks: int, boxCounter: int, scanNumber: int, detectorName: str, ): """ Process detector data: - Find the background - Make a first guess of peak parameters - Calculate a background (?) - Fit the data without background starting from the first guess """ # print(raw_data, channels) first_guess_background: numpy.ndarray = silx.math.fit.strip( # type: ignore data=raw_data, w=5, niterations=4000, factor=1, anchors=None, ) peak_guesses, peak_indices = guessParameters( xData=channels, yData=raw_data - first_guess_background, nb_peaks=nb_peaks, withBounds=True, ) ## guess fit parameters calculated_background = calcBackground( xData=channels, yData=raw_data, fwhmRight=peak_guesses[-1], fwhmLeft=peak_guesses[2], guessedPeaksIndex=peak_indices, ) initial_fit_guess = numpy.zeros(5 * nb_peaks) fit_min_bounds = numpy.zeros(5 * nb_peaks) fit_max_bounds = numpy.zeros(5 * nb_peaks) for n in range(nb_peaks): initial_fit_guess[5 * n] = peak_guesses[3 * n] initial_fit_guess[5 * n + 1] = peak_guesses[3 * n + 1] initial_fit_guess[5 * n + 2] = peak_guesses[3 * n + 2] initial_fit_guess[5 * n + 3] = peak_guesses[3 * n + 2] initial_fit_guess[5 * n + 4] = 0.5 fit_min_bounds[5 * n : 5 * n + 5] = [ numpy.amin(raw_data), initial_fit_guess[5 * n + 1] - 3 * initial_fit_guess[5 * n + 2], 0, 0, 0, ] fit_max_bounds[5 * n : 5 * n + 5] = [ numpy.amax(raw_data), initial_fit_guess[5 * n + 1] + 3 * initial_fit_guess[5 * n + 2], len(channels) / 2, len(channels) / 2, 1, ] try: optimal_parameters, covariance = scipy.optimize.curve_fit( f=splitPseudoVoigt, xdata=channels, ydata=raw_data - calculated_background, p0=initial_fit_guess, bounds=(fit_min_bounds, fit_max_bounds), maxfev=10000, sigma=numpy.sqrt(0.5 + raw_data), ) except (RuntimeError, ValueError): print( f"!!Fitting of Peaks in box {boxCounter} in scan {scanNumber} failed for the {detectorName} detector !!" ) print("!! Filling fit parameters with NaN values") optimal_parameters = numpy.empty_like(initial_fit_guess) optimal_parameters.fill(numpy.NaN) covariance = numpy.empty((5 * nb_peaks, 5 * nb_peaks)) covariance.fill(numpy.NaN) fitted_data = ( splitPseudoVoigt(channels, *list(optimal_parameters)) + calculated_background ) goodness_factor = ( 100 * numpy.sum(numpy.absolute(fitted_data - raw_data)) / numpy.sum(raw_data) ) fit_params = numpy.empty((nb_peaks, 6), dtype=optimal_parameters.dtype) for n in range(nb_peaks): fit_params[n, :5] = optimal_parameters[5 * n : 5 * n + 5] fit_params[n, 5] = goodness_factor uncertainty_fit_params = numpy.sqrt(numpy.diag(covariance)) return ( calculated_background, fitted_data, fit_params.flatten(), uncertainty_fit_params, )