import operator
import itertools
import numpy as np
from scipy.signal import convolve
import astropy.units as u
from astropy.stats import sigma_clipped_stats
from ..manipulation.utils import excise_regions
from ..analysis import fwhm, centroid
from ..utils import QuantityModel
from ..manipulation import extract_region, noise_region_uncertainty
from ..spectra.spectral_region import SpectralRegion
from ..spectra.spectrum1d import Spectrum1D
from astropy.modeling import fitting, Model, models
from astropy.table import QTable
__all__ = ['find_lines_threshold', 'find_lines_derivative', 'fit_lines',
'estimate_line_parameters']
# Define the initial estimators
# This is the default methods to use to estimate astropy model
# parameters. This is based on only a small subset of the astropy
# models but it was determined that this is a decent start as most
# fitting will probably use one of these.
#
# Each method list must take a Spectrum1D object and should return
# a Quantity.
_parameter_estimators = {
'Gaussian1D': {
'amplitude': lambda s: max(s.flux),
'mean': lambda s: centroid(s, region=None),
'stddev': lambda s: fwhm(s)
},
'Lorentz1D': {
'amplitude': lambda s: max(s.flux),
'x_0': lambda s: centroid(s, region=None),
'fwhm': lambda s: fwhm(s)
},
'Voigt1D': {
'x_0': lambda s: centroid(s, region=None),
'amplitude_L': lambda s: max(s.flux),
'fwhm_L': lambda s: fwhm(s) / np.sqrt(2),
'fwhm_G': lambda s: fwhm(s) / np.sqrt(2)
}
}
def _set_parameter_estimators(model):
"""
Helper method used in method below.
"""
if model.__class__.__name__ in _parameter_estimators:
model._constraints['parameter_estimator'] = _parameter_estimators[model.__class__.__name__]
return model
[docs]def estimate_line_parameters(spectrum, model):
"""
The input ``model`` parameters will be estimated from the input ``spectrum``. The
``model`` can be specified with default parameters, for example ``Gaussian1D()``.
Parameters
----------
spectrum : `~specutils.Spectrum1D`
The spectrum object from which we will estimate the model parameters.
model : `~astropy.modeling.Model`
Model for which we want to estimate parameters from the spectrum.
Returns
-------
model : `~astropy.modeling.Model`
Model with parameters estimated.
"""
if not 'parameter_estimator' in model._constraints:
model = _set_parameter_estimators(model)
# Estimate the parameters based on the estimators already
# attached to the model
if 'parameter_estimator' in model._constraints:
for param, estimator in model._constraints['parameter_estimator'].items():
setattr(model, param, estimator(spectrum))
# No estimators.
else:
raise Exception('No method to estimate parameters')
return model
def _consecutive(data, stepsize=1):
return np.split(data, np.where(np.diff(data) != stepsize)[0]+1)
[docs]def find_lines_threshold(spectrum, noise_factor=1):
"""
Find the emission and absorption lines in a spectrum. The method
here is based on deviations larger than the spectrum's uncertainty by the
``noise_factor``.
This method only works with continuum-subtracted spectra and the uncertainty
must be defined on the spectrum. To add the uncertainty, one could use
`~specutils.manipulation.noise_region_uncertainty` to add the uncertainty.
Parameters
----------
spectrum : `~specutils.Spectrum1D`
The spectrum object in which the lines will be found.
noise_factor : float
``noise_factor`` multiplied by the spectrum's``uncertainty``, used for
thresholding.
Returns
-------
qtable: `~astropy.table.QTable`
Table of emission and absorption lines. Line center (``line_center``),
line type (``line_type``) and index of line center (``line_center_index``)
are stored for each line.
"""
# Threshold based on noise estimate and factor.
uncertainty = spectrum.uncertainty
inds = np.where(np.abs(spectrum.flux) > (noise_factor*uncertainty.array)*spectrum.flux.unit)[0]
pos_inds = inds[spectrum.flux.value[inds] > 0]
line_inds_grouped = _consecutive(pos_inds, stepsize=1)
if len(line_inds_grouped[0]) > 0:
emission_inds = [inds[np.argmax(spectrum.flux.value[inds])] for inds in line_inds_grouped]
else:
emission_inds = []
#
# Find the absorption lines
#
neg_inds = inds[spectrum.flux.value[inds] < 0]
line_inds_grouped = _consecutive(neg_inds, stepsize=1)
if len(line_inds_grouped[0]) > 0:
absorption_inds = [inds[np.argmin(spectrum.flux.value[inds])] for inds in line_inds_grouped]
else:
absorption_inds = []
#
# Create the QTable to return the lines
#
qtable = QTable()
qtable['line_center'] = list(itertools.chain(*[spectrum.spectral_axis.value[emission_inds],
spectrum.spectral_axis.value[absorption_inds]]))*spectrum.spectral_axis.unit
qtable['line_type'] = ['emission']*len(emission_inds) + ['absorption']*len(absorption_inds)
qtable['line_center_index'] = list(itertools.chain(*[emission_inds, absorption_inds]))
return qtable
[docs]def find_lines_derivative(spectrum, flux_threshold=None):
"""
Find the emission and absorption lines in a spectrum. The method
here is based on finding the zero crossings in the derivative
of the spectrum.
Parameters
----------
spectrum : Spectrum1D
The spectrum object over which the equivalent width will be calculated.
flux_threshold : float, `~astropy.units.Quantity` or None
The threshold a pixel must be above to be considered part of a line. If
a float, will assume the same units as ``spectrum.flux``. This threshold
is above and beyond the derivative searching step. Default is None so no
thresholding. The threshold is positive for emission lines and negative
for absorption lines.
Returns
-------
qtable: `~astropy.table.QTable`
Table of emission and absorption lines. Line center (``line_center``),
line type (``line_type``) and index of line center (``line_center_index``)
are stored for each line.
"""
# Take the derivative to find the zero crossings which correspond to
# the peaks (positive or negative)
kernel = [1, 0, -1]
dY = convolve(spectrum.flux, kernel, 'valid')
# Use sign flipping to determine direction of change
S = np.sign(dY)
ddS = convolve(S, kernel, 'valid')
# Add units if needed.
if flux_threshold is not None and isinstance(flux_threshold, (int, float)):
flux_threshold = float(flux_threshold) * spectrum.flux.unit
#
# Emmision lines
#
# Find all the indices that appear to be part of a +ve peak
candidates = np.where(dY > 0)[0] + (len(kernel) - 1)
line_inds = sorted(set(candidates).intersection(np.where(ddS == -2)[0] + 1))
if flux_threshold is not None:
line_inds = np.array(line_inds)[spectrum.flux[line_inds] > flux_threshold]
# Now group them and find the max highest point.
line_inds_grouped = _consecutive(line_inds, stepsize=1)
if len(line_inds_grouped[0]) > 0:
emission_inds = [inds[np.argmax(spectrum.flux[inds])] for inds in line_inds_grouped]
else:
emission_inds = []
#
# Absorption lines
#
# Find all the indices that appear to be part of a -ve peak
candidates = np.where(dY < 0)[0] + (len(kernel) - 1)
line_inds = sorted(set(candidates).intersection(np.where(ddS == 2)[0] + 1))
if flux_threshold is not None:
line_inds = np.array(line_inds)[spectrum.flux[line_inds] < -flux_threshold]
# Now group them and find the max highest point.
line_inds_grouped = _consecutive(line_inds, stepsize=1)
if len(line_inds_grouped[0]) > 0:
absorption_inds = [inds[np.argmin(spectrum.flux[inds])] for inds in line_inds_grouped]
else:
absorption_inds = []
#
# Create the QTable to return the lines
#
qtable = QTable()
qtable['line_center'] = list(itertools.chain(*[spectrum.spectral_axis.value[emission_inds],
spectrum.spectral_axis.value[absorption_inds]]))*spectrum.spectral_axis.unit
qtable['line_type'] = ['emission']*len(emission_inds) + ['absorption']*len(absorption_inds)
qtable['line_center_index'] = list(itertools.chain(*[emission_inds, absorption_inds]))
return qtable
[docs]def fit_lines(spectrum, model, fitter=fitting.LevMarLSQFitter(),
exclude_regions=None, weights=None, window=None,
**kwargs):
"""
Fit the input models to the spectrum. The parameter values of the
input models will be used as the initial conditions for the fit.
Parameters
----------
spectrum : Spectrum1D
The spectrum object over which the equivalent width will be calculated.
model: `~astropy.modeling.Model` or list of `~astropy.modeling.Model`
The model or list of models that contain the initial guess.
exclude_regions : list of `~specutils.SpectralRegion`
List of regions to exclude in the fitting.
weights : list (NOT IMPLEMENTED YET)
List of weights to define importance of fitting regions.
window : `~specutils.SpectralRegion` or list of `~specutils.SpectralRegion`
Regions of the spectrum to use in the fitting. If None, then the
whole spectrum will be used in the fitting.
Additional keyword arguments are passed directly into the call to the
``fitter``.
Returns
-------
models : Compound model of `~astropy.modeling.Model`
A compound model of models with fitted parameters.
Notes
-----
* Could add functionality to set the bounds in
``model`` if they are not set.
* The models in the list of ``model`` are added
together and passed as a compound model to the
`~astropy.modeling.fitting.Fitter` class instance.
"""
#
# If we are to exclude certain regions, then remove them.
#
if exclude_regions is not None:
spectrum = excise_regions(spectrum, exclude_regions)
#
# Make the model a list if not already
#
single_model_in = not isinstance(model, list)
if single_model_in:
model = [model]
#
# If a single model is passed in then just do that.
#
fitted_models = []
for modeli, model_guess in enumerate(model):
#
# Determine the window if it is not None. There
# are several options here:
# window = 4 * u.Angstrom -> Quantity
# window = (4*u.Angstrom, 6*u.Angstrom) -> tuple
# window = (4, 6)*u.Angstrom -> Quantity
#
#
# Determine the window if there is one
#
if window is not None and isinstance(window, list):
model_window = window[modeli]
elif window is not None:
model_window = window
else:
model_window = None
#
# Check to see if the model has units. If it does not
# have units then we are going to ignore them.
#
ignore_units = getattr(model_guess, model_guess.param_names[0]).unit is None
fit_model = _fit_lines(spectrum, model_guess, fitter,
exclude_regions, weights, model_window,
ignore_units, **kwargs)
fitted_models.append(fit_model)
if single_model_in:
fitted_models = fitted_models[0]
return fitted_models
def _fit_lines(spectrum, model, fitter=fitting.LevMarLSQFitter(),
exclude_regions=None, weights=None, window=None, ignore_units=False,
**kwargs):
"""
Fit the input model (initial conditions) to the spectrum. Output will be
the same model with the parameters set based on the fitting.
spectrum, model -> model
Parameters
----------
spectrum : Spectrum1D
The spectrum object over which the equivalent width will be calculated.
model: `~astropy.modeling.Model`
The model or that contain the initial guess.
exclude_regions : list of `~specutils.SpectralRegion`
List of regions to exclude in the fitting.
weights : list (NOT IMPLEMENTED YET)
List of weights to define importance of fitting regions.
window : `~specutils.SpectralRegion` or list of `~specutils.SpectralRegion`
Regions of the spectrum to use in the fitting. If None, then the
whole spectrum will be used in the fitting.
ignore_units : bool
If True, then ignore any units on the input model parameters.
(This would effectively be assuming the model and spectrum have the same units.)
Returns
-------
model : Compound model of `~astropy.modeling.Model`
A compound model of models with fitted parameters.
Notes
-----
* Could add functionality to set the bounds in ``model`` if they are not set.
* Additional keyword arguments are passed directly into
the call to the ``fitter``.
"""
if weights is not None:
raise NotImplementedError('Weights are not yet implemented.')
#
# If we are to exclude certain regions, then remove them.
#
if exclude_regions is not None:
spectrum = excise_regions(spectrum, exclude_regions)
dispersion = spectrum.spectral_axis
dispersion_unit = spectrum.spectral_axis.unit
flux = spectrum.flux
flux_unit = spectrum.flux.unit
#
# Determine the window if it is not None. There
# are several options here:
# window = 4 * u.Angstrom -> Quantity
# window = (4*u.Angstrom, 6*u.Angstrom) -> tuple
# window = (4, 6)*u.Angstrom -> Quantity
#
#
# Determine the window if there is one
#
# In this case the window defines the area around the center of each model
if window is not None and isinstance(window, (float, int)):
center = model.mean
indices = np.nonzero((spectrum.spectral_axis >= center-window) & (spectrum.spectral_axis < center+window))
dispersion = dispersion[indices]
flux = flux[indices]
# In this case the window is the start and end points of where we should fit
elif window is not None and isinstance(window, tuple):
indices = np.nonzero((dispersion >= window[0]) & (dispersion < window[1]))
dispersion = dispersion[indices]
flux = flux[indices]
elif window is not None and isinstance(window, SpectralRegion):
try:
idx1, idx2 = window.bounds
if idx1 == idx2:
raise Exception("Bad selected region.")
extracted_regions = extract_region(spectrum, window)
dispersion, flux = _combined_region_data(extracted_regions)
dispersion = dispersion * dispersion_unit
flux = flux * flux_unit
except ValueError as e:
return
if flux is None or len(flux) == 0:
raise Exception("Spectrum flux is empty or None.")
input_spectrum = spectrum
spectrum = Spectrum1D(flux=flux.value * flux_unit,
spectral_axis=dispersion.value * dispersion_unit,
wcs=input_spectrum.wcs,
velocity_convention=input_spectrum.velocity_convention,
rest_value=input_spectrum.rest_value)
#
# Compound models with units can not be fit.
#
# Convert the model initial guess to the spectral
# units and then remove the units
#
model_unitless, dispersion_unitless, flux_unitless = _strip_units_from_model(model, spectrum, convert=not ignore_units)
#
# Do the fitting of spectrum to the model.
#
fit_model_unitless = fitter(model_unitless, dispersion_unitless, flux_unitless,
**kwargs)
#
# Now add the units back onto the model....
#
if not ignore_units:
fit_model = _add_units_to_model(fit_model_unitless, model, spectrum)
else:
fit_model = QuantityModel(fit_model_unitless, spectrum.spectral_axis.unit, spectrum.flux.unit)
return fit_model
def _combined_region_data(spec):
if isinstance(spec, list):
# Merge sub-spec spectral_axis and flux values.
x = np.array([sv for subspec in spec if subspec is not None
for sv in subspec.spectral_axis.value])
y = np.array([sv for subspec in spec if subspec is not None
for sv in subspec.flux.value])
else:
if spec is None:
return
x = spec.spectral_axis.value
y = spec.flux.value
if len(x) == 0:
return
return x, y
def _convert(q, dispersion_unit, dispersion, flux_unit):
#
# Convert the quantity to the spectrum's units, and then we will use
# the *value* of it in the new unitless-model.
#
if q.unit.is_equivalent(dispersion_unit, equivalencies=u.equivalencies.spectral()):
quantity = q.to(dispersion_unit, equivalencies=u.equivalencies.spectral())
elif q.unit.is_equivalent(flux_unit, equivalencies=u.equivalencies.spectral_density(dispersion)):
quantity = q.to(flux_unit, equivalencies=u.equivalencies.spectral_density(dispersion))
return quantity
def _convert_and_dequantify(poss_quantity, dispersion_unit, dispersion, flux_unit, convert=True):
"""
This method will convert the ``poss_quantity`` value to the proper
dispersion or flux units and then strip the units.
If the ``poss_quantity`` is None, or a number, we just return that...
Note: This method can be removed along with most of the others here
when astropy.fitting will fit models that contain units.
"""
if poss_quantity is None or isinstance(poss_quantity, (float, int)):
return poss_quantity
if convert and hasattr(poss_quantity, 'quantity') and poss_quantity.quantity is not None:
q = poss_quantity.quantity
quantity = _convert(q, dispersion_unit, dispersion, flux_unit)
v = quantity.value
elif convert and isinstance(poss_quantity, u.Quantity):
quantity = _convert(poss_quantity, dispersion_unit, dispersion, flux_unit)
v = quantity.value
else:
v = poss_quantity.value
return v
def _strip_units_from_model(model_in, spectrum, convert=True):
"""
This method strips the units from the model, so the result can
be passed to the fitting routine. This is necessary as CoumpoundModel
with units does not work in the fitters.
Note: When CompoundModel with units works in the fitters this method
can be removed.
Note: This assumes there are two types of models, those that are
based on `~astropy.modeling.models.PolynomialModel` and therefore
require the ``degree`` parameter when instantiating the class, and
"everything else" that does not require an "extra" parameter for
class instantiation.
Note: If convert is False, then we will *not* do the conversion of units
to the units of the Spectrum1D object. Otherwise we will convert.
"""
#
# Get the dispersion and flux information from the spectrum
#
dispersion = spectrum.spectral_axis
dispersion_unit = spectrum.spectral_axis.unit
flux = spectrum.flux
flux_unit = spectrum.flux.unit
#
# Determine if a compound model
#
compound_model = model_in.n_submodels() > 1
if not compound_model:
# For this we are going to just make it a list so that we
# can use the looping structure below.
model_in = [model_in]
else:
# If it is a compound model then we are going to create the RPN
# representation of it which is a list that contains either astropy
# models or string representations of operators (e.g., '+' or '*').
model_in = [c.value for c in model_in._tree.traverse_postorder()]
#
# Run through each model in the list or compound model
#
model_out_stack = []
for sub_model in model_in:
#
# If it is an operator put onto the stack and move on...
#
if not isinstance(sub_model, Model):
model_out_stack.append(sub_model)
continue
#
# Make a new instance of the class.
#
if isinstance(sub_model, models.PolynomialModel):
new_sub_model = sub_model.__class__(sub_model.degree)
else:
new_sub_model = sub_model.__class__()
#
# Now for each parameter in the model determine if a dispersion or
# flux type of unit, then convert to spectrum units and then get the value.
#
for pn in new_sub_model.param_names:
# This could be a Quantity or Parameter
v = _convert_and_dequantify(getattr(sub_model, pn), dispersion_unit, dispersion, flux_unit, convert=convert)
#
# Add this information for the parameter name into the
# new sub model.
#
setattr(new_sub_model, pn, v)
#
# Copy over all the constraints (e.g., tied, fixed...)
#
for k, v in sub_model._constraints.items():
new_sub_model._constraints[k] = v
#
# Convert teh bounds parameter
#
new_bounds = []
for a in sub_model.bounds[pn]:
v = _convert_and_dequantify(a, dispersion_unit, dispersion, flux_unit, convert=convert)
new_bounds.append(v)
new_sub_model.bounds[pn] = tuple(new_bounds)
# The new model now has unitless information in it but has
# been converted to spectral unit scale.
model_out_stack.append(new_sub_model)
# If a compound model we need to re-create it, otherwise
# it is a single model and we just get the first one (as
# there is only one).
if compound_model:
model_out = _combine_postfix(model_out_stack)
else:
model_out = model_out_stack[0]
return model_out, dispersion.value, flux.value
def _add_units_to_model(model_in, model_orig, spectrum):
"""
This method adds the units to the model based on the units of the
model passed in. This is necessary as CoumpoundModel
with units does not work in the fitters.
Note: When CompoundModel with units works in the fitters this method
can be removed.
Note: This assumes there are two types of models, those that are
based on `~astropy.modeling.models.PolynomialModel` and therefore
require the ``degree`` parameter when instantiating the class, and
"everything else" that does not require an "extra" parameter for
class instantiation.
"""
dispersion = spectrum.spectral_axis
#
# If not a compound model, then make a single element
# list so we can use the for loop below.
#
compound_model = model_in.n_submodels() > 1
if not compound_model:
model_in_list = [model_in]
model_orig_list = [model_orig]
else:
compound_model_in = model_in
model_in_list = [c.value for c in model_in._tree.traverse_postorder()]
model_orig_list = [c.value for c in model_orig._tree.traverse_postorder()]
model_out_stack = []
model_index = 0
#
# For each model in the list we will convert the values back to
# the original (sub-)model units.
#
for ii, m_in in enumerate(model_in_list):
#
# If an operator (ie not Model) then we'll just add
# to the stack and evaluate at the end.
#
if not isinstance(m_in, Model):
model_out_stack.append(m_in)
continue
#
# Get the corresponding *original* sub-model that
# will match the current sub-model. From this we will
# grab the units to apply.
#
m_orig = model_orig_list[ii]
#
# Make the new sub-model.
#
if isinstance(m_in, models.PolynomialModel):
new_sub_model = m_in.__class__(m_in.degree)
else:
new_sub_model = m_in.__class__()
#
# Convert the model values from the spectrum units back to the
# original model units.
#
for pi, pn in enumerate(new_sub_model.param_names):
#
# Get the parameter from the original model and unit-less model.
#
m_orig_param = getattr(m_orig, pn)
m_in_param = getattr(m_in, pn)
if hasattr(m_orig_param, 'quantity') and m_orig_param.quantity is not None:
m_orig_param_quantity = m_orig_param.quantity
#
# If a spectral dispersion type of unit...
#
if m_orig_param_quantity.unit.is_equivalent(spectrum.spectral_axis.unit,
equivalencies=u.equivalencies.spectral()):
# If it is a compound model, then we need to get the value from the
# actual compound model as the tree is not updated in the fitting
if compound_model:
current_value = getattr(compound_model_in, '{}_{}'.format(pn, model_index)).value *\
spectrum.spectral_axis.unit
else:
current_value = m_in_param.value * spectrum.spectral_axis.unit
v = current_value.to(m_orig_param_quantity.unit, equivalencies=u.equivalencies.spectral())
#
# If a spectral density type of unit...
#
elif m_orig_param_quantity.unit.is_equivalent(spectrum.flux.unit,
equivalencies=u.equivalencies.spectral_density(dispersion)):
# If it is a compound model, then we need to get the value from the
# actual compound model as the tree is not updated in the fitting
if compound_model:
current_value = getattr(compound_model_in, '{}_{}'.format(pn, model_index)).value *\
spectrum.flux.unit
else:
current_value = m_in_param.value * spectrum.flux.unit
v = current_value.to(m_orig_param_quantity.unit,
equivalencies=u.equivalencies.spectral_density(dispersion))
else:
v = getattr(m_in, pn).value
#
# Set the parameter value into the new sub-model.
#
setattr(new_sub_model, pn, v)
#
# Copy over all the constraints (e.g., tied, fixed, bounds...)
#
for k, v in m_orig._constraints.items():
new_sub_model._constraints[k] = v
#
# Add the new unit-filled model onto the stack.
#
model_out_stack.append(new_sub_model)
model_index += 1
#
# Create the output model which is either the evaulation
# of the RPN representation of the model (if a compound model)
# or just the first element if a non-compound model.
#
if compound_model:
model_out = _combine_postfix(model_out_stack)
else:
model_out = model_out_stack[0]
# If the first parameter is not a Quantity, then at this point we will assume
# none of them are. (It would be inconsistent for fitting to have a model that
# has some parameters as Quantities and some values).
if getattr(model_orig, model_orig.param_names[0]).unit is None:
model_out = QuantityModel(model_out, spectrum.spectral_axis.unit, spectrum.flux.unit)
return model_out
def _combine_postfix(equation):
"""
Given a Python list in post order (RPN) of an equation, convert/apply the operations to evaluate.
The list order is the same as what is output from ``model._tree.traverse_postorder()``.
Structure modified from https://codereview.stackexchange.com/questions/79795/reverse-polish-notation-calculator-in-python
"""
ops = {'+': operator.add,
'-': operator.sub,
'*': operator.mul,
'/': operator.truediv,
'^': operator.pow,
'**': operator.pow}
stack = []
result = 0
for i in equation:
if isinstance(i, Model):
stack.insert(0, i)
else:
if len(stack) < 2:
print('Error: insufficient values in expression')
break
else:
n1 = stack.pop(1)
n2 = stack.pop(0)
result = ops[i](n1, n2)
stack.insert(0, result)
return result