Source code for pyratbay.spectrum.spec_tools

# Copyright (c) 2021-2025 Cubillos & Blecic
# Pyrat Bay is open-source software under the GPL-2.0 license (see LICENSE)

__all__ = [
    'PassBand',
    'Tophat',
    'constant_resolution_spectrum',
    'bin_spectrum',
    'tophat',
    'resample',
    'band_integrate',
    'wn_mask',
    'inst_convolution',
    'rv_shift',
]

from collections.abc import Iterable
import operator
import os

import numpy as np
import scipy.interpolate as si
from scipy.signal import convolve
from scipy.signal.windows import gaussian

from .. import constants as pc
from .. import io as io


counting_types = ['photon', 'energy']


[docs] class PassBand(): """ A Filter passband object, typically used for photometric filters. """ def __init__(self, filter_file, wl=None, wn=None, counting_type='photon'): """ Parameters ---------- filter_file: String Path to filter file containing wavelength (um) and passband response function in two columns. wl: 1D float array Wavelength array at which evaluate the passband response's in micron units. (only one of wl or wn should be provided on call) wn: 1D float array Wavenumber (cm-1) at which the filter is intended to be evalulated. counting_type: String Detector counting type (i.e., the response function units), choose between 'photon' (default) or 'energy'. Examples -------- >>> import pyratbay.spectrum as ps >>> import pyratbay.constants as pc >>> import matplotlib.pyplot as plt >>> import numpy as np >>> >>> # Test on a blackbody spectrum >>> wl = np.linspace(1.0, 10.0, 10000) >>> hj_spectrum = ps.bbflux(1e4/wl, 2100.0) >>> # Create a Spitzer/IRAC2 band and integrate spectrum over it >>> band = ps.PassBand(f'{pc.FILTERS}spitzer_irac2.dat', wl=wl) >>> band_flux = band(hj_spectrum) >>> >>> plt.figure(0) >>> plt.clf() >>> plt.plot(wl, hj_spectrum, c='black') >>> plt.plot(band.wl0, band_flux, 'o', c='royalblue') >>> plt.plot(band.wl, band.response*5e7, c='0.7') >>> plt.ylim(bottom=0.0) >>> >>> # Now, we can re-evaluate for a bunch of spectra >>> temps = np.arange(900, 2500.0, 150.0) >>> spectra = [ps.bbflux(1e4/wl, temp) for temp in temps] >>> fluxes = [band(spectrum) for spectrum in spectra] >>> >>> plt.figure(1) >>> plt.clf() >>> for i,temp in enumerate(temps): >>> color = plt.cm.viridis(i/11) >>> plt.plot(wl, spectra[i], c=color) >>> plt.plot(band.wl0, fluxes[i], 'o', c=color) >>> plt.plot(band.wl, band.response*2e7, c='0.7', zorder=-1) >>> plt.xscale('log') >>> plt.ylim(bottom=0.0) >>> plt.xlim(1.0, 10.0) """ self.name = os.path.splitext(os.path.basename(filter_file))[0] if counting_type not in counting_types: error = f"Invalid 'counting_type', must be one of {counting_types}" raise ValueError(error) self.counting_type = counting_type # Read filter wavenumber and transmission curves: filter_file = filter_file.replace('{ROOT}', pc.ROOT) self.filter_file = os.path.realpath(filter_file) input_wl, input_response = io.read_spectrum(self.filter_file, wn=False) self.wl0 = np.sum(input_wl*input_response) / np.sum(input_response) input_wn = 1.0 / (input_wl * pc.um) self.wn0 = 1.0 / (self.wl0 * pc.um) # Sort it in increasing wavenumber order, store it: wn_sort = np.argsort(input_wn) self.input_response = input_response[wn_sort] self.input_wn = input_wn[wn_sort] self.input_wl = input_wl[wn_sort] self.response = np.copy(input_response[wn_sort]) self.wn = np.copy(input_wn[wn_sort]) self.wl = 1.0 / (self.wn * pc.um) # Resample the filters into the planet wavenumber array: if wn is not None or wl is not None: self.set_sampling(wl, wn)
[docs] def set_sampling(self, wl=None, wn=None): """ Interpolate filter response function at specified spectral array. The response funciton is normalized such that the integral over wavenumber equals one. Parameters ---------- wl: 1D float array Wavelength array at which evaluate the passband response's in micron units (only one of wl or wn should be provided on call) wn: 1D float array Wavenumber array at which evaluate the passband response's in cm-1 units (only one of wl or wn should be provided on call) Defines ------- self.response Normalized interpolated response function self.idx IndicesWavenumber indices self.wn Passband's wavenumber array self.wl Passband's wavelength array Returns ------- out_wave: 1D float array Same as self.wl or self.wn depending on the input argument. out_response: 1D float array Same as self.response Examples -------- >>> # See examples in help(ps.PassBand.__init__) """ if not operator.xor(wl is None, wn is None): error = 'Either provide wavelength or wavenumber array, not both' raise ValueError(error) input_is_wl = wn is None if input_is_wl: wn = 1.0 / (wl*pc.um) sign = np.sign(np.ediff1d(wn)) if not (np.all(sign == 1) or np.all(sign == -1)): raise ValueError( 'Input wavelength/wavenumber array must be strictly ' 'increasing or decreasing' ) response, wn_idx = resample(self.input_response, self.input_wn, wn) # Internally, wavenumber is always monotonically increasing wn_sort = np.argsort(wn[wn_idx]) response = response[wn_sort] self.wn = wn[wn_idx][wn_sort] self.wl = 1.0 / (self.wn * pc.um) self.idx = wn_idx[wn_sort] # Normalize response function: max(response) == 1.0 self.response = response / np.amax(response) # Scaling factor such that integ(response)dwn = 1.0 if self.counting_type == 'photon': self.height = 1.0 / np.trapezoid(self.response*self.wl, self.wn) elif self.counting_type == 'energy': self.height = 1.0 / np.trapezoid(self.response, self.wn) if input_is_wl: out_wave = self.wl else: out_wave = self.wn return out_wave, self.response
[docs] def integrate(self, spectrum, wl=None, wn=None): """ Integrate a spectral function over the passband. Parameters ---------- spectrum: 1D float array Spectral function to be band-integrated. The spectral sampling of this must be set at initialization or with the set_spectrum() method. Otherwise, it can be set at run time with the wn or wl arguments. wn: 1D float array (optional) wavenumber array (cm-1) over which spectrum is sampled. wl: 1D float array (optional) wavelength array (um) over which spectrum is sampled. Returns ------- bandflux: Float Band-integrated value of the spectral function. """ if not hasattr(self, 'idx'): raise ValueError( "The passband's spectral sampling has not been defined yet. " "Need to call set_sampling() method or initialize with an " "input wavelength sampling" ) if wl is not None or wn is not None: self.set_sampling(wl, wn) if self.counting_type == 'energy': band_integ = np.trapezoid( spectrum[self.idx]*self.response, self.wn, ) else: band_integ = np.trapezoid( self.wl*spectrum[self.idx]*self.response, self.wn, ) return band_integ * self.height
def __call__(self, spectrum, wl=None, wn=None): return self.integrate(spectrum, wl, wn)
[docs] def save_filter(self, save_file): """ Write filter response function data to file, into two columns the wavelength (um) and the response function. Parameters ---------- save_file: String File where to save the filter data. """ io.write_spectrum(self.wl, self.response, save_file, type='filter')
def __repr__(self): return f"pyratbay.spectrum.PassBand('{self.filter_file}')" def __str__(self): if self.__class__ is Tophat: filter_file = '' else: filter_file = f"file = {repr(self.filter_file)}\n" return ( filter_file + f"name = {repr(self.name)}\n" f"central_wl = {self.wl0}\n" f"band_wl = {self.wl}\n" f"band_wn = {self.wn}\n" f"response = {self.response}\n" )
[docs] class Tophat(PassBand): """ A Filter passband object with a tophat-shaped passband. """ def __init__( self, wl0, half_width, name='tophat', wl=None, wn=None, counting_type='photon', ignore_gaps=False, ): """ Parameters ---------- wl0: Float The passband's central wavelength (um units). half_width: Float The passband's half-width (um units). name: Str A user-defined name for the filter when calling str(self), e.g., to identify the instrument provenance of this filter. wl: 1D float array Wavelength array at which evaluate the passband response's in micron units (only one of wl or wn should be provided on call) wn: 1D float array Wavenumber (cm-1) at which the filter is intended to be evalulated. counting_type: String Detector counting type (i.e., the response function units), choose between 'photon' (default) or 'energy'. ignore_gaps: Bool If True and there are no points inside the band, set the idx, wn, wl, and response variables to None. A ps.bin_spectrum() call on such band will return np.nan. Otherwise the code will throw a ValueError. Examples -------- >>> import pyratbay.spectrum as ps >>> import matplotlib.pyplot as plt >>> import numpy as np >>> >>> # Test on a blackbody spectrum >>> wl = np.linspace(1.0, 10.0, 10000) >>> hj_spectrum = ps.bbflux(1e4/wl, 2100.0) >>> # Create a top-hat passband and integrate spectrum >>> band = ps.Tophat(wl0=4.5, half_width=0.1, wl=wl) >>> band_flux = band(hj_spectrum) >>> >>> plt.figure(0) >>> plt.clf() >>> plt.plot(wl, hj_spectrum, c='black') >>> plt.plot(band.wl0, band_flux, 'o', c='royalblue') >>> plt.plot(band.wl, band.response*5e6, c='0.7') >>> plt.ylim(bottom=0.0) >>> >>> # Now, we can re-evaluate for a bunch of spectra >>> temps = np.arange(900, 2500.0, 150.0) >>> spectra = [ps.bbflux(1e4/wl, temp) for temp in temps] >>> fluxes = [band(spectrum) for spectrum in spectra] >>> >>> plt.figure(1) >>> plt.clf() >>> for i,temp in enumerate(temps): >>> color = plt.cm.viridis(i/11) >>> plt.plot(wl, spectra[i], c=color) >>> plt.plot(band.wl0, fluxes[i], 'o', c=color) >>> plt.plot(band.wl, band.response*2e6, c='0.7', zorder=-1) >>> plt.xscale('log') >>> plt.ylim(bottom=0.0) >>> plt.xlim(1.0, 10.0) """ self.wl0 = wl0 self.half_width = half_width # Read filter wavenumber and transmission curves: self.wn0 = 1.0 / (self.wl0 * pc.um) self.ignore_gaps = ignore_gaps self.name = name self.counting_type = counting_type # Resample the filters into the planet wavenumber array: if wn is not None or wl is not None: self.set_sampling(wl, wn)
[docs] def set_sampling(self, wl=None, wn=None): """ Interpolate filter response function at specified spectral array. The response function is normalized such that the integral over wavenumber equals one. Parameters ---------- wl: 1D float array Wavelength array at which evaluate the passband response's in um units (only one of wl or wn should be provided on call) wn: 1D float array Wavenumber array at which evaluate the passband response's in cm-1 units (only one of wl or wn should be provided on call) Defines ------- self.idx Wavenumber indices self.wn Passband's wavenumber array self.wl Passband's wavelength array self.response Normalized interpolated response function Returns ------- out_wave: 1D float array Same as self.wl or self.wn depending on the input argument. out_response: 1D float array Same as self.response Examples -------- >>> # See examples in help(ps.Tophat.__init__) """ if not operator.xor(wl is None, wn is None): error = 'Either provide wavelength or wavenumber array, not both' raise ValueError(error) input_is_wl = wn is None if input_is_wl: wn = 1.0 / (wl*pc.um) sign = np.sign(np.ediff1d(wn)) if not (np.all(sign == 1) or np.all(sign == -1)): raise ValueError( 'Input wavelength/wavenumber array must be strictly ' 'increasing or decreasing' ) sign = sign[0] nwave = len(wn) wl_low = self.wl0 - self.half_width wl_high = self.wl0 + self.half_width wn_low = 1.0 / (wl_high*pc.um) wn_high = 1.0 / (wl_low*pc.um) idx = (wn >= wn_low) & (wn <= wn_high) indices = np.where(idx)[0] if len(indices) == 0: if self.ignore_gaps: self.idx = None self.response = None self.wn = None self.wl = None return None, None raise ValueError( f"Tophat() passband at wl0 = {self.wl0:.3f} um does not " "cover any spectral point" ) # One spectral point as margin: idx_first = indices[0] idx_last = indices[-1] + 1 if idx_first > 0: idx_first -= 1 if idx_last < nwave: idx_last += 1 if sign < 0.0: self.idx = np.flip(np.arange(idx_first, idx_last)) else: self.idx = np.arange(idx_first, idx_last) self.wn = wn[self.idx] self.wl = 1.0 / (self.wn * pc.um) # Normalize response function: max(response) == 1.0 self.response = np.array(idx[self.idx], np.double) # Scaling factor such that integ(response)dwn = 1.0 if self.counting_type == 'photon': self.height = 1.0 / np.trapezoid(self.response*self.wl, self.wn) elif self.counting_type == 'energy': self.height = 1.0 / np.trapezoid(self.response, self.wn) if input_is_wl: out_wave = self.wl else: out_wave = self.wn return out_wave, self.response
def __repr__(self): return f'pyratbay.spectrum.Tophat({self.wl0}, {self.half_width})' def __str__(self): return f'{self.name}_{self.wl0}um'
[docs] def constant_resolution_spectrum(wave_min, wave_max, resolution): """ Compute a constant resolving-power sampling array. Parameters ---------- wave_min: Float Lower spectral boundary. This could be either a wavelength or a wavenumber. This is agnositc of units. wave_max: Float Upper spectral boundary. This could be either a wavelength or a wavenumber. This is agnositc of units. resolution: Float The sampling resolving power: R = wave / delta_wave. Returns ------- wave: 1D float array A spectrum array with the given resolving power. Examples -------- >>> import numpy as np >>> import pyratbay.spectrum as ps >>> # A low-resolution wavelength sampling: >>> wl_min = 0.5 >>> wl_max = 4.0 >>> resolution = 5.5 >>> wl = ps.constant_resolution_spectrum(wl_min, wl_max, resolution) >>> print(wl) [0.5 0.6 0.72 0.864 1.0368 1.24416 1.492992 1.7915904 2.14990848 2.57989018 3.09586821 3.71504185] >>> # The actual resolution matches the input: >>> wl_mean = 0.5*(wl[1:]+wl[:-1]) >>> print(wl_mean/np.ediff1d(wl)) [5.5 5.5 5.5 5.5 5.5 5.5 5.5 5.5 5.5 5.5 5.5] """ f = 0.5 / resolution g = (1.0+f) / (1.0-f) nwave = int(np.ceil(-np.log(wave_min/wave_max) / np.log(g))) wave = wave_min * g**np.arange(nwave) return wave
[docs] def bin_spectrum(bin_wl, wl, spectrum, half_widths=None, gaps=None): """ Bin down a spectrum. Parameters ---------- bin_wl: 1D float array Central wavelength (um) of the desired binned down spectra. wl: 1D float array Wavelength samples of the original spectrum. spectrum: 1D float array Spectral values to be binned down. half_widths: 1D float array The bin half widths (um). If None, assume that the bin edges are at the mid-points of the bin_wl array. gaps: String If None (default) and there are bins that do not cover any value, (e.g., when the resolution of wl is similar to bin_wl's), raise error. If gaps=='ignore', patch those bins with np.nan values. If gaps=='interpolate', patch those bins linearly interpolating. Use with care. Returns ------- bin_spectrum: 1D float array The binned spectrum. Notes ----- Probably bad things will happen if bin_wl has a similar or coarser resolution than wl. Examples -------- >>> import pyratbay.spectrum as ps >>> import numpy as np >>> import matplotlib.pyplot as plt >>> # Make a noisy high-resolution signal >>> wl = ps.constant_resolution_spectrum(1.0, 3.0, resolution=5000) >>> spectrum = np.sin(3.14*wl) + np.random.normal(1.5, 0.1, len(wl)) >>> # Bin it down: >>> bin_wl = ps.constant_resolution_spectrum(1.0, 3.0, resolution=125) >>> bin_spectrum = ps.bin_spectrum(bin_wl, wl, spectrum) >>> # Compare original and binned signals >>> plt.figure(0) >>> plt.clf() >>> plt.plot(wl, spectrum, '.', ms=2, color='gray') >>> plt.plot(bin_wl, bin_spectrum, color='red') """ if gaps is not None and gaps not in ['interpolate', 'ignore']: raise ValueError("Invalid value for 'gaps' argument") if half_widths is None: half_widths = np.ediff1d(bin_wl, 0, 0) half_widths[0] = half_widths[1] half_widths[-1] = half_widths[-2] half_widths /= 2.0 ignore_gaps = gaps is not None bands = [ Tophat(wl0, half_width, wl=wl, ignore_gaps=ignore_gaps) for wl0, half_width in zip(bin_wl, half_widths) ] nbands = len(bands) band_flux = np.zeros(nbands) for i,band in enumerate(bands): if band.idx is None: band_flux[i] = np.nan else: band_flux[i] = band(spectrum) # Patch gaps if requested and needed: mask = np.isnan(band_flux) if gaps == 'interpolate': flux_interp = np.interp(bin_wl[mask], bin_wl[~mask], band_flux[~mask]) band_flux[mask] = flux_interp return band_flux
[docs] def tophat(wl0, width, margin=None, dlambda=None, resolution=None, ffile=None): r""" Generate a top-hat filter function, with transmission = 1.0 from wl0-width/2 to wl0+width/2, and an extra margin with transmission = 0.0 at each end. Parameters ---------- ffile: String Name of the output file. wl0: Float Filter central wavelength in microns. width: Float Filter width in microns. margin: Float Margin (in microns) with zero-valued transmission, to append at each end of the filter. dlambda: Float Spectral sampling rate in microns. resolution: Float Spectral sampling resolution (used if dlambda is None). ffile: String If not None, save filter to file. Examples -------- >>> import pyratbay.spectrum as ps >>> wl0 = 1.50 >>> width = 0.50 >>> margin = 0.10 >>> dlambda = 0.05 >>> wl, trans = ps.tophat(wl0, width, margin, dlambda) >>> print(wl, trans, sep='\n') [1.15 1.2 1.25 1.3 1.35 1.4 1.45 1.5 1.55 1.6 1.65 1.7 1.75 1.8 1.85] [0. 0. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 0. 0. 0.] """ if margin is None: margin = 0.1 * width if dlambda is None and resolution is None: raise ValueError('Either dlambda or resolution must be defined.') # Wavelength array: wl_low = wl0 - 0.5*width - margin wl_high = wl0 + 0.5*width + margin if dlambda is not None: wl = np.arange(wl_low, wl_high, dlambda) else: f = 0.5 / resolution g = (1.0-f) / (1.0+f) imax = int(np.ceil(np.log(wl_low/wl_high) / np.log(g))) + 1 dwl = wl_high * g**np.arange(imax) wl = 0.5 * np.flip(dwl[1:] + dwl[:-1], axis=0) transmission = np.array(np.abs(wl-wl0) < 0.5*width, np.double) if ffile is not None: io.write_spectrum(wl, transmission, ffile, type='filter') return wl, transmission
[docs] def resample(signal, wn, specwn, normalize=False): r""" Resample signal from wn to specwn wavenumber sampling using a linear interpolation. Parameters ---------- signal: 1D ndarray A spectral signal sampled at wn. wn: 1D ndarray Signal's wavenumber sampling, in cm-1 units. specwn: 1D ndarray Wavenumber sampling to resample into, in cm-1 units. normalize: Bool If True, normalized the output resampled signal to integrate to 1.0 (note that a normalized curve when specwn is a decreasing function results in negative values for resampled). Returns ------- resampled: 1D ndarray The interpolated signal. wnidx: 1D ndarray The indices of specwn covered by the input signal. Examples -------- >>> import pyratbay.spectrum as ps >>> import numpy as np >>> wn = np.linspace(1.3, 1.7, 11) >>> signal = np.array(np.abs(wn-1.5)<0.1, np.double) >>> specwn = np.linspace(1, 2, 51) >>> resampled, wnidx = ps.resample(signal, wn, specwn) >>> print(wnidx, specwn[wnidx], resampled, sep='\n') [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34] [1.32 1.34 1.36 1.38 1.4 1.42 1.44 1.46 1.48 1.5 1.52 1.54 1.56 1.58 1.6 1.62 1.64 1.66 1.68] [0. 0. 0. 0. 0.5 1. 1. 1. 1. 1. 1. 1. 1. 1. 0.5 0. 0. 0. 0. ] """ if np.amin(wn) < np.amin(specwn) or np.amax(wn) > np.amax(specwn): raise ValueError( "Resampling signal's wavenumber is not contained in specwn." ) # Indices in the spectrum wavenumber array included in the band # wavenumber range: wnidx = np.where((specwn < np.amax(wn)) & (np.amin(wn) < specwn))[0] # Spline-interpolate: resampled = si.interp1d(wn,signal)(specwn[wnidx]) if normalize: resampled /= np.trapezoid(resampled, specwn[wnidx]) # Return the normalized interpolated filter and the indices: return resampled, wnidx
[docs] def band_integrate(spectrum, specwn, bandtrans, bandwn): """ Integrate a spectrum over the band transmission. Parameters ---------- spectrum: 1D float iterable Spectral signal to be integrated. specwn: 1D float iterable Wavenumber of spectrum in cm-1. bandtrans: 1D float iterable List of normalized interpolated band transmission values in each filter. bandwn: 1D float iterable Returns ------- bflux: 1D float list Band-integrated values. Examples -------- >>> import numpy as np >>> import matplotlib.pyplot as plt >>> import pyratbay.io as io >>> import pyratbay.spectrum as ps >>> import pyratbay.constants as pc >>> # Load Spitzer IRAC filters: >>> wn1, irac1 = io.read_spectrum(pc.FILTERS+'spitzer_irac1.dat') >>> wn2, irac2 = io.read_spectrum(pc.FILTERS+'spitzer_irac2.dat') >>> # Spectrum to integrate: >>> wn = np.arange(1500, 5000.1, 1.0) >>> sflux = ps.bbflux(wn, 1800.0) >>> # Integrate over single filter: >>> bandflux = ps.band_integrate(sflux, wn, irac1, wn1) >>> # Integrate over multiple: >>> bandfluxes = ps.band_integrate(sflux, wn, [irac1,irac2], [wn1, wn2]) >>> # Plot the results: >>> meanwn = [np.mean(wn1), np.mean(wn2)] >>> width = 0.5*(np.amax(wn1)-np.amin(wn1)), 0.5*(np.amax(wn2)-np.amin(wn2)) >>> plt.figure(1) >>> plt.clf() >>> plt.semilogy(wn, sflux, 'k') >>> plt.plot(wn1, (irac1+1)*4e4, 'red') >>> plt.plot(wn2, (irac2+1)*4e4, 'blue') >>> plt.errorbar(meanwn[0], bandflux, xerr=width[0], fmt='o', color='red') >>> plt.errorbar(meanwn, bandfluxes, xerr=width, fmt='o', color='none', >>> mec='k', ecolor='k') >>> plt.xlim(np.amin(wn), np.amax(wn)) >>> plt.ylim(4e4, 1.2e5) >>> plt.xlabel('Wavenumber (cm$^{-1}$)') >>> plt.ylabel(r'Flux (erg s$^{-1}$ cm$^{-2}$ cm)') """ if not isinstance(bandtrans[0], Iterable): bandtrans = [bandtrans] bandwn = [bandwn] bflux = [] for btrans, wn in zip(bandtrans, bandwn): # Resample bandpasses into spectrum wavenumber sampling: resampled, wnidx = resample(btrans, wn, specwn, normalize=True) # Band-integrate spectrum: bflux.append(np.trapezoid(spectrum[wnidx]*resampled, specwn[wnidx])) return bflux
[docs] def wn_mask(wn, wn_min, wn_max, tol=1.0e-8): """ Get mask of wn values withing given ranges with a extra tolerance to account for floating-point (in)precision. Parameters ---------- wn: 1D float array Wavenumber array to mask. wn_min: float Minumum wavenumber in mask. wn_max: float Maximum wavenumber in mask. tol: float Tolerance factor at mask edges, calculated as delta_wn*tol, where delta_wn is the sampling stepsize at the edges. Returns ------- wn_mask: 1D bool array Mask of wavenumber values within ranges. """ # Get sampling at edges wn_mask = (wn >= wn_min) & (wn <= wn_max) if np.sum(wn_mask) < 2: min_dwn = max_dwn = 0 else: min_dwn = np.abs(np.ediff1d(wn[wn_mask][0:2])) max_dwn = np.abs(np.ediff1d(wn[wn_mask][-2:])) # Add a tolerance padding to mask: wn_mask = ( (wn >= wn_min - min_dwn*tol) & (wn <= wn_max + max_dwn*tol) ) return wn_mask
[docs] def inst_convolution(wl, spectrum, resolution, sampling_res=None): """ Convolve a spectrum according to an instrumental resolving power Parameters ---------- wl: 1D float array Spectral array (can be either wavelength or wavenumber). spectrum: 1D float array Full-resolution spectrum to be convolved. resolution: float Instrumental resolving power R = lambda/delta_lambda Where delta_lambda is the FHWM of the gaussian to be applied. sampling_red: float Sampling resolution of the input wl spectrum. Examples -------- >>> import pyratbay.spectrum as ps >>> import numpy as np >>> import matplotlib.pyplot as plt >>> wl_min = 1.499 >>> wl_max = 1.501 >>> samp_resolution = 100_000 >>> wl = ps.constant_resolution_spectrum(wl_min, wl_max, samp_resolution) >>> nwave = len(wl) >>> # A delta at wl0 ~ 1.5 >>> spectrum = np.zeros(nwave) >>> spectrum[np.where(wl>1.5)[0][0]] = 1.0 >>> wl0 = wl[np.where(wl>1.5)[0][0]] >>> resolution = 5_000 >>> conv = ps.inst_convolution(wl, spectrum, resolution) >>> # Plot convolved line and expected FWHM >>> half_max = np.amax(conv)/2 >>> hwhm = 0.5 * wl0 / resolution >>> plt.figure(0) >>> plt.clf() >>> plt.plot(wl, conv, color='salmon', lw=2) >>> plt.plot([wl0-hwhm, wl0+hwhm], [half_max,half_max], color='xkcd:blue', lw=2) """ pixel_dv = pc.c / resolution / 1e5 n_el = int(6*pixel_dv) + 1 kernel = gaussian(n_el, std=(pixel_dv / 2.355)) kernel /= np.sum(kernel) if sampling_res is None: dv = pc.c/1e5 * np.ediff1d(wl) / wl[:-1] rv_pix = np.abs(np.mean(dv)) else: rv_pix = np.abs(pc.c/1e5 / sampling_res) n_rv0 = int(((n_el - 1) / 2) / rv_pix) rv_array = np.arange(-(n_el - 1) / 2, (n_el - 1) / 2 + 1, 1) rv_array_mod = np.linspace(-n_rv0*rv_pix, n_rv0*rv_pix, int(2*n_rv0+1)) csscaled = si.splrep(rv_array, kernel) ker_conv_pix = si.splev(rv_array_mod, csscaled, der=0) ker_conv_pix /= sum(ker_conv_pix) rconv = convolve(spectrum, ker_conv_pix, mode="same") return rconv
[docs] def rv_shift(vel_km, wn=None, wl=None): """ Apply a radial velocity Doppler shift to a 1D wavelength array. Parameters ---------- vel_km: Float Radial velocity in km/s. wn: 1D float array Wavenumber array. wl: 1D float array Wavelength array. Returns ------- wave: 1D float array Doppler-shifted wavenumber of wavelebngth array """ vel = vel_km * pc.km # Convert velocity from km/s to cm/s if wn is not None: doppler_factor = np.sqrt((1 - vel / pc.c) / (1 + vel / pc.c)) return wn * doppler_factor if wl is not None: doppler_factor = np.sqrt((1 + vel / pc.c) / (1 - vel / pc.c)) return wl * doppler_factor