Source code for specutils.spectra.spectrum_mixin

import logging
from copy import deepcopy

import numpy as np
from astropy.wcs import WCSSUB_SPECTRAL
from astropy.units import Unit
from astropy import units as u
import astropy.units.equivalencies as eq
from astropy.utils.decorators import lazyproperty

DOPPLER_CONVENTIONS = {}
DOPPLER_CONVENTIONS['radio'] = u.doppler_radio
DOPPLER_CONVENTIONS['optical'] = u.doppler_optical
DOPPLER_CONVENTIONS['relativistic'] = u.doppler_relativistic

__all__ = ['OneDSpectrumMixin']


[docs]class OneDSpectrumMixin: @property def _spectral_axis_numpy_index(self): return self.data.ndim - 1 - self.wcs.wcs.spec @property def _spectral_axis_len(self): """ How many elements are in the spectral dimension? """ return self.data.shape[self._spectral_axis_numpy_index] @property def _data_with_spectral_axis_last(self): """ Returns a view of the data with the spectral axis last """ if self._spectral_axis_numpy_index == self.data.ndim - 1: return self.data else: return self.data.swapaxes(self._spectral_axis_numpy_index, self.data.ndim - 1) @property def _data_with_spectral_axis_first(self): """ Returns a view of the data with the spectral axis first """ if self._spectral_axis_numpy_index == 0: return self.data else: return self.data.swapaxes(self._spectral_axis_numpy_index, 0) @property def spectral_wcs(self): return self.wcs.axes.spectral @lazyproperty def spectral_axis(self): """ Returns a Quantity array with the values of the spectral axis. """ if len(self.flux) > 0: spectral_axis = self.wcs.pixel_to_world(np.arange(self.flux.shape[-1])) else: # After some discussion it was suggested to create the empty spectral # axis this way to better use the WCS infrastructure. This is to prepare # for a future where pixel_to_world might yield something more than # just a raw Quantity, which is planned for the mid-term in astropy and # possible gwcs. Such changes might necessitate a revisit of this code. dummy_spectrum = self.__class__(spectral_axis=[1,2]*self.wcs.spectral_axis_unit, flux=[1,2]*self.flux.unit) spectral_axis = dummy_spectrum.wcs.pixel_to_world([0])[1:] return spectral_axis @property def spectral_axis_unit(self): """ Returns the units of the spectral axis. """ return self.wcs.spectral_axis_unit @property def flux(self): """ Converts the stored data and unit information into a quantity. Returns ------- `~astropy.units.Quantity` Spectral data as a quantity. """ return u.Quantity(self.data, unit=self.unit, copy=False)
[docs] def new_flux_unit(self, unit, equivalencies=None, suppress_conversion=False): """ Converts the flux data to the specified unit. This is an in-place change to the object. Parameters ---------- unit : str or `~astropy.units.Unit` The unit to conver the flux array to. equivalencies : list of equivalencies Custom equivalencies to apply to conversions. Set to spectral_density by default. suppress_conversion : bool Set to true if updating the unit without converting data values. Returns ------- `~specutils.Spectrum1D` A new spectrum with the converted flux array """ new_spec = deepcopy(self) if not suppress_conversion: if equivalencies is None: equivalencies = eq.spectral_density(self.spectral_axis) new_data = self.flux.to( unit, equivalencies=equivalencies) new_spec._data = new_data.value new_spec._unit = new_data.unit else: new_spec._unit = u.Unit(unit) return new_spec
@property def velocity_convention(self): return self._velocity_convention
[docs] def with_velocity_convention(self, velocity_convention): return self.__class__(flux=self.flux, wcs=self.wcs, meta=self.meta, velocity_convention=velocity_convention)
@property def rest_value(self): return self._rest_value @rest_value.setter def rest_value(self, value): if not hasattr(value, 'unit') or not value.unit.is_equivalent(u.Hz, u.spectral()): raise u.UnitsError( "Rest value must be energy/wavelength/frequency equivalent.") self._rest_value = value @property def velocity(self): """ Converts the spectral axis array to the given velocity space unit given the rest value. These aren't input parameters but required Spectrum attributes Parameters ---------- unit : str or ~`astropy.units.Unit` The unit to convert the dispersion array to. rest : ~`astropy.units.Quantity` Any quantity supported by the standard spectral equivalencies (wavelength, energy, frequency, wave number). type : {"doppler_relativistic", "doppler_optical", "doppler_radio"} The type of doppler spectral equivalency. Returns ------- ~`astropy.units.Quantity` The converted dispersion array in the new dispersion space. """ if not hasattr(self, '_rest_value') or self._rest_value is None: raise ValueError("Cannot get velocity representation of spectral " "axis without specifying a reference value.") if not hasattr(self, '_velocity_convention') or self._velocity_convention is None: raise ValueError("Cannot get velocity representation of spectral " "axis without specifying a velocity convention.") equiv = getattr(u.equivalencies, 'doppler_{0}'.format( self.velocity_convention))(self.rest_value) new_data = self.spectral_axis.to(u.km/u.s, equivalencies=equiv) return new_data
[docs] def with_spectral_unit(self, unit, velocity_convention=None, rest_value=None): """ Returns a new spectrum with a different spectral axis unit. Parameters ---------- unit : :class:`~astropy.units.Unit` Any valid spectral unit: velocity, (wave)length, or frequency. Only vacuum units are supported. velocity_convention : 'relativistic', 'radio', or 'optical' The velocity convention to use for the output velocity axis. Required if the output type is velocity. This can be either one of the above strings, or an `astropy.units` equivalency. rest_value : :class:`~astropy.units.Quantity` A rest wavelength or frequency with appropriate units. Required if output type is velocity. The spectrum's WCS should include this already if the *input* type is velocity, but the WCS's rest wavelength/frequency can be overridden with this parameter. .. note: This must be the rest frequency/wavelength *in vacuum*, even if your spectrum has air wavelength units """ new_wcs, new_meta = self._new_spectral_wcs( unit=unit, velocity_convention=velocity_convention or self._velocity_convention, rest_value=rest_value or self.rest_value) spectrum = self.__class__(flux=self.flux, wcs=new_wcs, meta=new_meta) return spectrum
def _new_wcs_argument_validation(self, unit, velocity_convention, rest_value): # Allow string specification of units, for example if not isinstance(unit, u.UnitBase): unit = u.Unit(unit) # Velocity conventions: required for frq <-> velo # convert_spectral_axis will handle the case of no velocity # convention specified & one is required if velocity_convention in DOPPLER_CONVENTIONS: velocity_convention = DOPPLER_CONVENTIONS[velocity_convention] elif (velocity_convention is not None and velocity_convention not in DOPPLER_CONVENTIONS.values()): raise ValueError("Velocity convention must be radio, optical, " "or relativistic.") # If rest value is specified, it must be a quantity if (rest_value is not None and (not hasattr(rest_value, 'unit') or not rest_value.unit.is_equivalent(u.m, u.spectral()))): raise ValueError("Rest value must be specified as an astropy " "quantity with spectral equivalence.") return unit def _new_spectral_wcs(self, unit, velocity_convention=None, rest_value=None): """ Returns a new WCS with a different Spectral Axis unit. Parameters ---------- unit : :class:`~astropy.units.Unit` Any valid spectral unit: velocity, (wave)length, or frequency. Only vacuum units are supported. velocity_convention : 'relativistic', 'radio', or 'optical' The velocity convention to use for the output velocity axis. Required if the output type is velocity. This can be either one of the above strings, or an `astropy.units` equivalency. rest_value : :class:`~astropy.units.Quantity` A rest wavelength or frequency with appropriate units. Required if output type is velocity. The cube's WCS should include this already if the *input* type is velocity, but the WCS's rest wavelength/frequency can be overridden with this parameter. .. note: This must be the rest frequency/wavelength *in vacuum*, even if your cube has air wavelength units """ unit = self._new_wcs_argument_validation(unit, velocity_convention, rest_value) if velocity_convention is not None: equiv = getattr(u, 'doppler_{0}'.format(velocity_convention)) rest_value.to(unit, equivalencies=equiv) # Store the original unit information for posterity meta = self._meta.copy() if 'original_unit' not in self._meta: meta['original_unit'] = self.wcs.spectral_axis_unit # Create the new wcs object new_wcs = self.wcs.with_spectral_unit(unit=unit, rest_value=rest_value, velocity_convention=velocity_convention) return new_wcs, meta
class InplaceModificationMixin: # Example methods follow to demonstrate how methods can be written to be # agnostic of the non-spectral dimensions. def substract_background(self, background): """ Proof of concept, this subtracts a background spectrum-wise """ data = self._data_with_spectral_axis_last if callable(background): # create substractable array pass elif (isinstance(background, np.ndarray) and background.shape == data[-1].shape): substractable_continuum = background else: raise ValueError( "background needs to be callable or have the same shape as the spectum") data[-1] -= substractable_continuum def normalize(self): """ Proof of concept, this normalizes each spectral dimension based on a trapezoidal integration. """ # this gets a view - if we want normalize to return a new NDData object # then we should make _data_with_spectral_axis_first return a copy. data = self._data_with_spectral_axis_first dx = np.diff(self.spectral_axis) dy = 0.5 * (data[:-1] + data[1:]) norm = np.sum(dx * dy.transpose(), axis=-1).transpose() data /= norm def spectral_interpolation(self, spectral_value, flux_unit=None): """ Proof of concept, this interpolates along the spectral dimension """ data = self._data_with_spectral_axis_last from scipy.interpolate import interp1d interp = interp1d(self.spectral_axis.value, data) x = spectral_value.to(self.spectral_axis.unit, equivalencies=u.spectral()) y = interp(x) if self.unit is not None: y *= self.unit if flux_unit is None: # Lim: Is this acceptable? return y else: return y.to(flux_unit, equivalencies=u.spectral_density(x))