Source code for specutils.spectra.spectrum1d

import logging
from copy import deepcopy

import numpy as np
from astropy import units as u
from astropy.nddata import NDDataRef
from astropy.utils.decorators import lazyproperty
from .spectrum_mixin import OneDSpectrumMixin
from .spectral_axis import SpectralAxis
from ..utils.wcs_utils import gwcs_from_array

__all__ = ['Spectrum1D']

__doctest_skip__ = ['Spectrum1D.spectral_resolution']


[docs]class Spectrum1D(OneDSpectrumMixin, NDDataRef): """ Spectrum container for 1D spectral data. Parameters ---------- flux : `astropy.units.Quantity` or astropy.nddata.NDData`-like The flux data for this spectrum. spectral_axis : `astropy.units.Quantity` or `specutils.SpectralAxis` Dispersion information with the same shape as the last (or only) dimension of flux, or one greater than the last dimension of flux if specifying bin edges. wcs : `astropy.wcs.WCS` or `gwcs.wcs.WCS` WCS information object. velocity_convention : {"doppler_relativistic", "doppler_optical", "doppler_radio"} Convention used for velocity conversions. rest_value : `~astropy.units.Quantity` Any quantity supported by the standard spectral equivalencies (wavelength, energy, frequency, wave number). Describes the rest value of the spectral axis for use with velocity conversions. redshift See `redshift` for more information. radial_velocity See `radial_velocity` for more information. uncertainty : `~astropy.nddata.NDUncertainty` Contains uncertainty information along with propagation rules for spectrum arithmetic. Can take a unit, but if none is given, will use the unit defined in the flux. meta : dict Arbitrary container for any user-specific information to be carried around with the spectrum container object. """ def __init__(self, flux=None, spectral_axis=None, wcs=None, velocity_convention=None, rest_value=None, redshift=None, radial_velocity=None, bin_specification=None, **kwargs): # Check for pre-defined entries in the kwargs dictionary. unknown_kwargs = set(kwargs).difference( {'data', 'unit', 'uncertainty', 'meta', 'mask', 'copy'}) if len(unknown_kwargs) > 0: raise ValueError("Initializer contains unknown arguments(s): {}." "".format(', '.join(map(str, unknown_kwargs)))) # If the flux (data) argument is a subclass of nddataref (as it would # be for internal arithmetic operations), avoid setup entirely. if isinstance(flux, NDDataRef): super(Spectrum1D, self).__init__(flux) return # Ensure that the flux argument is an astropy quantity if flux is not None: if not isinstance(flux, u.Quantity): raise ValueError("Flux must be a `Quantity` object.") elif flux.isscalar: flux = u.Quantity([flux]) # Ensure that only one or neither of these parameters is set if redshift is not None and radial_velocity is not None: raise ValueError("Cannot set both radial_velocity and redshift at " "the same time.") # In cases of slicing, new objects will be initialized with `data` # instead of ``flux``. Ensure we grab the `data` argument. if flux is None and 'data' in kwargs: flux = kwargs.pop('data') # Ensure that the unit information codified in the quantity object is # the One True Unit. kwargs.setdefault('unit', flux.unit if isinstance(flux, u.Quantity) else kwargs.get('unit')) # In the case where the arithmetic operation is being performed with # a single float, int, or array object, just go ahead and ignore wcs # requirements if (not isinstance(flux, u.Quantity) or isinstance(flux, float) or isinstance(flux, int)) and np.ndim(flux) == 0: super(Spectrum1D, self).__init__(data=flux, wcs=wcs, **kwargs) return if rest_value is None: if hasattr(wcs, 'rest_frequency') and wcs.rest_frequency != 0: rest_value = wcs.rest_frequency * u.Hz elif hasattr(wcs, 'rest_wavelength') and wcs.rest_wavelength != 0: rest_value = wcs.rest_wavelength * u.AA elif hasattr(wcs, 'wcs') and hasattr(wcs.wcs, 'restfrq') and wcs.wcs.restfrq > 0: rest_value = wcs.wcs.restfrq * u.Hz elif hasattr(wcs, 'wcs') and hasattr(wcs.wcs, 'restwav') and wcs.wcs.restwav > 0: rest_value = wcs.wcs.restwav * u.m else: rest_value = None else: if not isinstance(rest_value, u.Quantity): logging.info("No unit information provided with rest value. " "Assuming units of spectral axis ('%s').", spectral_axis.unit) rest_value = u.Quantity(rest_value, spectral_axis.unit) elif not rest_value.unit.is_equivalent(u.AA, equivalencies=u.spectral()): raise u.UnitsError("Rest value must be " "energy/wavelength/frequency equivalent.") # If flux and spectral axis are both specified, check that their lengths # match or are off by one (implying the spectral axis stores bin edges) if flux is not None and spectral_axis is not None: if spectral_axis.shape[0] == flux.shape[-1]: if bin_specification == "edges": raise ValueError("A spectral axis input as bin edges" "must have length one greater than the flux axis") bin_specification = "centers" elif spectral_axis.shape[0] == flux.shape[-1]+1: if bin_specification == "centers": raise ValueError("A spectral axis input as bin centers" "must be the same length as the flux axis") bin_specification = "edges" else: raise ValueError( "Spectral axis length ({}) must be the same size or one " "greater (if specifying bin edges) than that of the last " "flux axis ({})".format(spectral_axis.shape[0], \ flux.shape[-1])) # Attempt to parse the spectral axis. If none is given, try instead to # parse a given wcs. This is put into a GWCS object to # then be used behind-the-scenes for all specutils operations. if spectral_axis is not None: # Ensure that the spectral axis is an astropy Quantity if not isinstance(spectral_axis, u.Quantity): raise ValueError("Spectral axis must be a `Quantity` or " "`SpectralAxis` object.") # If spectral axis is provided as an astropy Quantity, convert it # to a specutils SpectralAxis object. if not isinstance(spectral_axis, SpectralAxis): if spectral_axis.shape[0] == flux.shape[-1] + 1: bin_specification = "edges" else: bin_specification = "centers" self._spectral_axis = SpectralAxis( spectral_axis, redshift=redshift, radial_velocity=radial_velocity, doppler_rest=rest_value, doppler_convention=velocity_convention, bin_specification=bin_specification) # If a SpectralAxis object is provided, we assume it doesn't need # information from other keywords added else: for a in [radial_velocity, redshift]: if a is not None: raise ValueError("Cannot separately set redshift or " "radial_velocity if a SpectralAxis " "object is input to spectral_axis") self._spectral_axis = spectral_axis wcs = gwcs_from_array(self._spectral_axis) elif wcs is None: # If no spectral axis or wcs information is provided, initialize # with an empty gwcs based on the flux. size = len(flux) if not flux.isscalar else 1 wcs = gwcs_from_array(np.arange(size) * u.Unit("")) super(Spectrum1D, self).__init__( data=flux.value if isinstance(flux, u.Quantity) else flux, wcs=wcs, **kwargs) # If no spectral_axis was provided, create a SpectralCoord based on # the WCS if spectral_axis is None: # If spectral_axis wasn't provided, set _spectral_axis based on # the WCS spec_axis = self.wcs.pixel_to_world(np.arange(self.flux.shape[-1])) if spec_axis.unit.is_equivalent(u.one): spec_axis = spec_axis * u.pixel self._spectral_axis = SpectralAxis( spec_axis, redshift=redshift, radial_velocity=radial_velocity, doppler_rest=rest_value, doppler_convention=velocity_convention) if hasattr(self, 'uncertainty') and self.uncertainty is not None: if not flux.shape == self.uncertainty.array.shape: raise ValueError( "Flux axis ({}) and uncertainty ({}) shapes must be the " "same.".format(flux.shape, self.uncertainty.array.shape)) def __getitem__(self, item): """ Override the class indexer. We do this here because there are two cases for slicing on a ``Spectrum1D``: 1.) When the flux is one dimensional, indexing represents a single flux value at a particular spectral axis bin, and returns a new ``Spectrum1D`` where all attributes are sliced. 2.) When flux is multi-dimensional (i.e. several fluxes over the same spectral axis), indexing returns a new ``Spectrum1D`` with the sliced flux range and a deep copy of all other attributes. The first case is handled by the parent class, while the second is handled here. """ if len(self.flux.shape) > 1: return self._copy( flux=self.flux[item], uncertainty=self.uncertainty[item] if self.uncertainty is not None else None) if not isinstance(item, slice): item = slice(item, item+1, None) tmp_spec = super().__getitem__(item) # TODO: this is a workaround until we figure out how to deal with non- # strictly ascending spectral axes. Currently, the wcs is created from # a spectral axis array by converting to a length physical type. On # a regular slicing operation, the wcs is handed back to the # initializer and a new spectral axis is created. This would then also # be in length units, which may not be the units used initially. So, # we create a new ``Spectrum1D`` that includes the sliced spectral # axis. This means that a new wcs object will be created with the # appropriate unit translation handling. return tmp_spec._copy( spectral_axis=self.spectral_axis[item]) def _copy(self, **kwargs): """ Peform deep copy operations on each attribute of the ``Spectrum1D`` object. """ alt_kwargs = dict( flux=deepcopy(self.flux), spectral_axis=deepcopy(self.spectral_axis), uncertainty=deepcopy(self.uncertainty), wcs=deepcopy(self.wcs), mask=deepcopy(self.mask), meta=deepcopy(self.meta), unit=deepcopy(self.unit), velocity_convention=deepcopy(self.velocity_convention), rest_value=deepcopy(self.rest_value)) alt_kwargs.update(kwargs) return self.__class__(**alt_kwargs) @property def frequency(self): """ The frequency as a `~astropy.units.Quantity` in units of GHz """ return self.spectral_axis.to(u.GHz, u.spectral()) @property def wavelength(self): """ The wavelength as a `~astropy.units.Quantity` in units of Angstroms """ return self.spectral_axis.to(u.AA, u.spectral()) @property def energy(self): """ The energy of the spectral axis as a `~astropy.units.Quantity` in units of eV. """ return self.spectral_axis.to(u.eV, u.spectral()) @property def photon_flux(self): """ The flux density of photons as a `~astropy.units.Quantity`, in units of photons per cm^2 per second per spectral_axis unit """ flux_in_spectral_axis_units = self.flux.to( u.W * u.cm**-2 * self.spectral_axis.unit**-1, u.spectral_density(self.spectral_axis)) photon_flux_density = flux_in_spectral_axis_units / (self.energy / u.photon) return photon_flux_density.to(u.photon * u.cm**-2 * u.s**-1 * self.spectral_axis.unit**-1) @lazyproperty def bin_edges(self): return self.spectral_axis.bin_edges @property def shape(self): return self.flux.shape @property def redshift(self): """ The redshift(s) of the objects represented by this spectrum. May be scalar (if this spectrum's ``flux`` is 1D) or vector. Note that the concept of "redshift of a spectrum" can be ambiguous, so the interpretation is set to some extent by either the user, or operations (like template fitting) that set this attribute when they are run on a spectrum. """ return self.spectral_axis.redshift @redshift.setter def redshift(self, val): new_spec_coord = self.spectral_axis.with_redshift(val) self._spectral_axis = new_spec_coord @property def radial_velocity(self): """ The radial velocity(s) of the objects represented by this spectrum. May be scalar (if this spectrum's ``flux`` is 1D) or vector. Note that the concept of "RV of a spectrum" can be ambiguous, so the interpretation is set to some extent by either the user, or operations (like template fitting) that set this attribute when they are run on a spectrum. """ return self.spectral_axis.radial_velocity @radial_velocity.setter def radial_velocity(self, val): if val is not None: if not val.unit.is_equivalent(u.km/u.s): raise u.UnitsError("Radial velocity must be a velocity.") new_spectral_axis = self.spectral_axis.with_radial_velocity(val) self._spectral_axis = new_spectral_axis def __add__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other, unit=self.unit) return self.add(other) def __sub__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other, unit=self.unit) return self.subtract(other) def __mul__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other) return self.multiply(other) def __div__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other) return self.divide(other) def __truediv__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other) return self.divide(other) def _format_array_summary(self, label, array): if len(array) == 1: mean = np.mean(array) s = "{:17} [ {:.5} ], mean={:.5}" return s.format(label+':', array[0], array[-1], mean) elif len(array) > 1: mean = np.mean(array) s = "{:17} [ {:.5}, ..., {:.5} ], mean={:.5}" return s.format(label+':', array[0], array[-1], mean) else: return "{:17} [ ], mean= n/a".format(label+':') def __str__(self): result = "Spectrum1D " # Handle case of single value flux if self.flux.ndim == 0: result += "(length=1)\n" return result + "flux: {}".format(self.flux) # Handle case of multiple flux arrays result += "(length={})\n".format(len(self.spectral_axis)) if self.flux.ndim > 1: for i, flux in enumerate(self.flux): label = 'flux{:2}'.format(i) result += self._format_array_summary(label, flux) + '\n' else: result += self._format_array_summary('flux', self.flux) + '\n' # Add information about spectral axis result += self._format_array_summary('spectral axis', self.spectral_axis) # Add information about uncertainties if available if self.uncertainty: result += "\nuncertainty: [ {}, ..., {} ]".format( self.uncertainty[0], self.uncertainty[-1]) return result def __repr__(self): inner_str = "flux={}, spectral_axis={}".format(repr(self.flux), repr(self.spectral_axis)) if self.uncertainty is not None: inner_str += ", uncertainty={}".format(repr(self.uncertainty)) result = "<Spectrum1D({})>".format(inner_str) return result