Source code for specutils.manipulation.resample

import logging
from abc import ABC, abstractmethod

from warnings import warn

import numpy as np
from astropy.nddata import StdDevUncertainty, VarianceUncertainty, \
    InverseVariance
from astropy.units import Quantity
from scipy.interpolate import CubicSpline

from ..spectra import Spectrum1D, SpectralAxis

__all__ = ['ResamplerBase', 'FluxConservingResampler',
           'LinearInterpolatedResampler', 'SplineInterpolatedResampler']


[docs]class ResamplerBase(ABC): """ Base class for resample classes. The algorithms and needs for difference resamples will vary quite a bit, so this class is relatively sparse. Parameters ---------- extrapolation_treatment : str What to do when resampling off the edge of the spectrum. Can be ``'nan_fill'`` to have points beyond the edges by set to NaN, or ``'zero_fill'`` to be set to zero. """ def __init__(self, extrapolation_treatment='nan_fill'): if extrapolation_treatment not in ('nan_fill', 'zero_fill'): raise ValueError('invalid extrapolation_treatment value: ' + str(extrapolation_treatment)) self.extrapolation_treatment = extrapolation_treatment
[docs] def __call__(self, orig_spectrum, fin_spec_axis): """ Return the resulting `~specutils.Spectrum1D` of the resampling. """ return self.resample1d(orig_spectrum, fin_spec_axis)
[docs] @abstractmethod def resample1d(self, orig_spectrum, fin_spec_axis): """ Workhorse method that will return the resampled Spectrum1D object. """ return NotImplemented
[docs]class FluxConservingResampler(ResamplerBase): """ This resampling algorithm conserves overall integrated flux (as opposed to flux density). Algorithm based on the equations documented in the following paper: https://ui.adsabs.harvard.edu/abs/2017arXiv170505165C/abstract Parameters ---------- extrapolation_treatment : str What to do when resampling off the edge of the spectrum. Can be ``'nan_fill'`` to have points beyond the edges by set to NaN, or ``'zero_fill'`` to be set to zero. Examples -------- To resample an input spectrum to a user specified spectral grid using a flux conserving algorithm: >>> import numpy as np >>> import astropy.units as u >>> from specutils import Spectrum1D >>> from specutils.manipulation import FluxConservingResampler >>> input_spectra = Spectrum1D( ... flux=np.array([1, 3, 7, 6, 20]) * u.mJy, ... spectral_axis=np.array([2, 4, 12, 16, 20]) * u.nm) >>> resample_grid = [1, 5, 9, 13, 14, 17, 21, 22, 23] *u.nm >>> fluxc_resample = FluxConservingResampler() >>> output_spectrum1D = fluxc_resample(input_spectra, resample_grid) # doctest: +IGNORE_OUTPUT """ def _resample_matrix(self, orig_spec_axis, fin_spec_axis): """ Create a re-sampling matrix to be used in re-sampling spectra in a way that conserves flux. This code was heavily influenced by Nick Earl's resample rough draft: nmearl@0ff6ef1. Parameters ---------- orig_spec_axis : SpectralAxis The original spectral axis array. fin_spec_axis : SpectralAxis The desired spectral axis array. Returns ------- resample_mat : ndarray An [[N_{fin_spec_axis}, M_{orig_spec_axis}]] matrix. """ # Lower bin and upper bin edges orig_edges = orig_spec_axis.bin_edges fin_edges = fin_spec_axis.bin_edges # I could get rid of these alias variables, # but it does add readability orig_low = orig_edges[:-1] fin_low = fin_edges[:-1] orig_upp = orig_edges[1:] fin_upp = fin_edges[1:] # Here's the real work in figuring out the bin overlaps # i.e., contribution of each original bin to the resampled bin l_inf = np.where(orig_low > fin_low[:, np.newaxis], orig_low, fin_low[:, np.newaxis]) l_sup = np.where(orig_upp < fin_upp[:, np.newaxis], orig_upp, fin_upp[:, np.newaxis]) resamp_mat = (l_sup - l_inf).clip(0) resamp_mat *= (orig_upp - orig_low) # set bins that don't overlap 100% with original bins # to zero by checking edges, and applying generated mask left_clip = np.where(fin_edges[:-1] - orig_edges[0] < 0, 0, 1) right_clip = np.where(orig_edges[-1] - fin_edges[1:] < 0, 0, 1) keep_overlapping_matrix = left_clip * right_clip resamp_mat *= keep_overlapping_matrix[:, np.newaxis] return resamp_mat.value
[docs] def resample1d(self, orig_spectrum, fin_spec_axis): """ Create a re-sampling matrix to be used in re-sampling spectra in a way that conserves flux. If an uncertainty is present in the input spectra it will be propagated through to the final resampled output spectra as an InverseVariance uncertainty. Parameters ---------- orig_spectrum : `~specutils.Spectrum1D` The original 1D spectrum. fin_spec_axis : Quantity The desired spectral axis array. Returns ------- resample_spectrum : `~specutils.Spectrum1D` An output spectrum containing the resampled `~specutils.Spectrum1D` """ # Check if units on original spectrum and new wavelength (if defined) # match if isinstance(fin_spec_axis, Quantity): if orig_spectrum.spectral_axis.unit != fin_spec_axis.unit: raise ValueError("Original spectrum spectral axis grid and new" "spectral axis grid must have the same units.") if not isinstance(fin_spec_axis, SpectralAxis): fin_spec_axis = SpectralAxis(fin_spec_axis) # todo: Would be good to return uncertainty in type it was provided? # todo: add in weighting options # Get provided uncertainty into variance if orig_spectrum.uncertainty is not None: if isinstance(orig_spectrum.uncertainty, StdDevUncertainty): pixel_uncer = np.square(orig_spectrum.uncertainty.array) elif isinstance(orig_spectrum.uncertainty, VarianceUncertainty): pixel_uncer = orig_spectrum.uncertainty.array elif isinstance(orig_spectrum.uncertainty, InverseVariance): pixel_uncer = np.reciprocal(orig_spectrum.uncertainty.array) else: pixel_uncer = None orig_axis_in_fin = orig_spectrum.spectral_axis.to(fin_spec_axis.unit) resample_grid = self._resample_matrix(orig_axis_in_fin, fin_spec_axis) # Now for some broadcasting magic to handle multi dimensional flux inputs # Essentially this part is inserting length one dimensions as fillers # For example, if we have a (5,6,10) input flux, and an output grid # of 3, flux will be broadcast to (5,6,1,10) and resample_grid will # Be broadcast to (1,1,3,10). The sum then reduces down the 10, the # original dispersion grid, leaving 3, the new dispersion grid, as # the last index. new_flux_shape = list(orig_spectrum.flux.shape) new_flux_shape.insert(-1, 1) in_flux = orig_spectrum.flux.reshape(new_flux_shape) ones = [1] * len(orig_spectrum.flux.shape[:-1]) new_shape_resample_grid = ones + list(resample_grid.shape) resample_grid = resample_grid.reshape(new_shape_resample_grid) # Calculate final flux out_flux = np.sum(in_flux * resample_grid, axis=-1) / np.sum( resample_grid, axis=-1) # Calculate output uncertainty if pixel_uncer is not None: pixel_uncer = pixel_uncer.reshape(new_flux_shape) out_variance = np.sum(pixel_uncer * resample_grid**2, axis=-1) / np.sum( resample_grid**2, axis=-1) out_uncertainty = InverseVariance(np.reciprocal(out_variance)) else: out_uncertainty = None # nan-filling happens by default - replace with zeros if requested: if self.extrapolation_treatment == 'zero_fill': origedges = orig_spectrum.spectral_axis.bin_edges off_edges = (fin_spec_axis < origedges[0]) | (origedges[-1] < fin_spec_axis) out_flux[off_edges] = 0 if out_uncertainty is not None: out_uncertainty.array[off_edges] = 0 # todo: for now, use the units from the pre-resampled # spectra, although if a unit is defined for fin_spec_axis and it doesn't # match the input spectrum it won't work right, will have to think # more about how to handle that... could convert before and after # calculation, which is probably easiest. Matrix math algorithm is # geometry based, so won't work to just let quantity math handle it. resampled_spectrum = Spectrum1D(flux=out_flux, spectral_axis=np.array(fin_spec_axis) * orig_spectrum.spectral_axis.unit, uncertainty=out_uncertainty) return resampled_spectrum
[docs]class LinearInterpolatedResampler(ResamplerBase): """ Resample a spectrum onto a new ``spectral_axis`` using linear interpolation. Parameters ---------- extrapolation_treatment : str What to do when resampling off the edge of the spectrum. Can be ``'nan_fill'`` to have points beyond the edges by set to NaN, or ``'zero_fill'`` to be set to zero. Examples -------- To resample an input spectrum to a user specified dispersion grid using linear interpolation: >>> import numpy as np >>> import astropy.units as u >>> from specutils import Spectrum1D >>> from specutils.manipulation import LinearInterpolatedResampler >>> input_spectra = Spectrum1D( ... flux=np.array([1, 3, 7, 6, 20]) * u.mJy, ... spectral_axis=np.array([2, 4, 12, 16, 20]) * u.nm) >>> resample_grid = [1, 5, 9, 13, 14, 17, 21, 22, 23] * u.nm >>> fluxc_resample = LinearInterpolatedResampler() >>> output_spectrum1D = fluxc_resample(input_spectra, resample_grid) # doctest: +IGNORE_OUTPUT """ def __init__(self, extrapolation_treatment='nan_fill'): super().__init__(extrapolation_treatment)
[docs] def resample1d(self, orig_spectrum, fin_spec_axis): """ Call interpolation, repackage new spectra Parameters ---------- orig_spectrum : `~specutils.Spectrum1D` The original 1D spectrum. fin_spec_axis : ndarray The desired spectral axis array. Returns ------- resample_spectrum : `~specutils.Spectrum1D` An output spectrum containing the resampled `~specutils.Spectrum1D` """ fill_val = np.nan # bin_edges=nan_fill case if self.extrapolation_treatment == 'zero_fill': fill_val = 0 orig_axis_in_fin = orig_spectrum.spectral_axis.to(fin_spec_axis.unit) out_flux_arr = np.interp(fin_spec_axis.value, orig_axis_in_fin.value, orig_spectrum.flux.value, left=fill_val, right=fill_val) out_flux = Quantity(out_flux_arr, unit=orig_spectrum.flux.unit) new_unc = None if orig_spectrum.uncertainty is not None: out_unc_arr = np.interp(fin_spec_axis.value, orig_axis_in_fin.value, orig_spectrum.uncertainty.array, left=fill_val, right=fill_val) new_unc = orig_spectrum.uncertainty.__class__(array=out_unc_arr, unit=orig_spectrum.unit) return Spectrum1D(spectral_axis=fin_spec_axis, flux=out_flux, uncertainty=new_unc)
[docs]class SplineInterpolatedResampler(ResamplerBase): """ This resample algorithim uses a cubic spline interpolator. Any uncertainty is also interpolated using an identical spline. Parameters ---------- extrapolation_treatment : str What to do when resampling off the edge of the spectrum. Can be ``'nan_fill'`` to have points beyond the edges by set to NaN, or ``'zero_fill'`` to be set to zero. Examples -------- To resample an input spectrum to a user specified spectral axis grid using a cubic spline interpolator: >>> import numpy as np >>> import astropy.units as u >>> from specutils import Spectrum1D >>> from specutils.manipulation import SplineInterpolatedResampler >>> input_spectra = Spectrum1D( ... flux=np.array([1, 3, 7, 6, 20]) * u.mJy, ... spectral_axis=np.array([2, 4, 12, 16, 20]) * u.nm) >>> resample_grid = [1, 5, 9, 13, 14, 17, 21, 22, 23] * u.nm >>> fluxc_resample = SplineInterpolatedResampler() >>> output_spectrum1D = fluxc_resample(input_spectra, resample_grid) # doctest: +IGNORE_OUTPUT """ def __init__(self, bin_edges='nan_fill'): super().__init__(bin_edges)
[docs] def resample1d(self, orig_spectrum, fin_spec_axis): """ Call interpolation, repackage new spectra Parameters ---------- orig_spectrum : `~specutils.Spectrum1D` The original 1D spectrum. fin_spec_axis : Quantity The desired spectral axis array. Returns ------- resample_spectrum : `~specutils.Spectrum1D` An output spectrum containing the resampled `~specutils.Spectrum1D` """ orig_axis_in_new = orig_spectrum.spectral_axis.to(fin_spec_axis.unit) flux_spline = CubicSpline(orig_axis_in_new.value, orig_spectrum.flux.value, extrapolate=self.extrapolation_treatment != 'nan_fill') out_flux_val = flux_spline(fin_spec_axis.value) new_unc = None if orig_spectrum.uncertainty is not None: unc_spline = CubicSpline(orig_axis_in_new.value, orig_spectrum.uncertainty.array, extrapolate=self.extrapolation_treatment != 'nan_fill') out_unc_val = unc_spline(fin_spec_axis.value) new_unc = orig_spectrum.uncertainty.__class__(array=out_unc_val, unit=orig_spectrum.unit) if self.extrapolation_treatment == 'zero_fill': origedges = orig_spectrum.spectral_axis.bin_edges off_edges = (fin_spec_axis < origedges[0]) | (origedges[-1] < fin_spec_axis) out_flux_val[off_edges] = 0 if new_unc is not None: new_unc.array[off_edges] = 0 return Spectrum1D(spectral_axis=fin_spec_axis, flux=out_flux_val*orig_spectrum.flux.unit, uncertainty=new_unc)