# Copyright (c) 2021-2022 Patricio Cubillos
# Pyrat Bay is open-source software under the GPL-2.0 license (see LICENSE)
__all__ = [
'PassBand',
'Tophat',
'tophat',
'resample',
'band_integrate',
]
from collections.abc import Iterable
import operator
import os
import numpy as np
import scipy.interpolate as si
from .. import constants as pc
from .. import io as io
[docs]class PassBand(object):
"""
A Filter passband object.
"""
def __init__(self, filter_file, wn=None):
"""
Parameters
----------
filter_file: String
Path to filter file containing wavelength (um) and passband
response in two columns.
Comment and blank lines are ignored.
Examples
--------
>>> import pyratbay.spectrum as ps
>>> import pyratbay.constants as pc
>>> import matplotlib.pyplot as plt
>>> import numpy as np
>>> filter_file = f'{pc.ROOT}pyratbay/data/filters/spitzer_irac2_sa.dat'
>>> band = ps.PassBand(filter_file)
>>> # Evaluate over a wavelength array (um):
>>> wl = np.arange(3.5, 5.5, 0.001)
>>> out_wl, out_response = band(wl)
>>> plt.figure(1)
>>> plt.clf()
>>> plt.plot(out_wl, out_response)
>>> plt.plot(band.wl, band.response) # Same variables
>>> # Note wl differs from band.wl, but original array can be used as:
>>> plt.plot(wl[band.idx], band.response)
>>> # Evaluate over a wavenumber array:
>>> wn = 1e4 / wl
>>> band(wn=wn)
>>> plt.figure(1)
>>> plt.clf()
>>> plt.plot(band.wn, band.response, dashes=(5,3))
>>> plt.plot(wn[band.idx], out_response)
"""
# Read filter wavenumber and transmission curves:
filter_file = filter_file.replace('{ROOT}', pc.ROOT)
self.filter_file = os.path.realpath(filter_file)
input_wl, input_response = io.read_spectrum(self.filter_file, wn=False)
self.wl0 = np.sum(input_wl*input_response) / np.sum(input_response)
input_wn = 1.0 / (input_wl * pc.um)
self.wn0 = 1.0 / (self.wl0 * pc.um)
# Sort it in increasing wavenumber order, store it:
wn_sort = np.argsort(input_wn)
self.input_response = input_response[wn_sort]
self.input_wn = input_wn[wn_sort]
self.input_wl = input_wl[wn_sort]
self.response = np.copy(input_response[wn_sort])
self.wn = np.copy(input_wn[wn_sort])
self.wl = 1.0 / (self.wn * pc.um)
self.name = os.path.splitext(os.path.basename(filter_file))[0]
# Resample the filters into the planet wavenumber array:
if wn is not None:
self.__eval__(wn=wn)
def __repr__(self):
return f"pyratbay.spectrum.PassBand('{self.filter_file}')"
def __str__(self):
return f'{self.name}'
def __call__(self, wl=None, wn=None):
"""
Interpolate filter response function at specified spectral array.
The response funciton is normalized such that the integral over
wavenumber equals one.
Parameters
----------
wl: 1D float array
Wavelength array at which evaluate the passband response's
in micron units.
(only one of wl or wn should be provided on call)
wn: 1D float array
Wavenumber array at which evaluate the passband response's
in cm-1 units.
(only one of wl or wn should be provided on call)
Defines
-------
self.response Normalized interpolated response function
self.idx IndicesWavenumber indices
self.wn Passband's wavenumber array
self.wl Passband's wavelength array
Returns
-------
out_wave: 1D float array
Same as self.wl or self.wn depending on the input argument.
out_response: 1D float array
Same as self.response
Examples
--------
>>> # See examples in help(ps.PassBand.__init__)
"""
if not operator.xor(wl is None, wn is None):
raise ValueError(
'Either provide wavelength or wavenumber array, not both'
)
input_is_wl = wn is None
if input_is_wl:
wn = 1.0 / (wl*pc.um)
sign = np.sign(np.ediff1d(wn))
if not (np.all(sign == 1) or np.all(sign == -1)):
raise ValueError(
'Input wavelength/wavenumber array must be strictly '
'increasing or decreasing'
)
sign = sign[0]
response, wn_idx = resample(
self.input_response, self.input_wn, wn, normalize=True,
)
# Internally, wavenumber is always monotonically increasing:
wn_sort = np.argsort(wn[wn_idx])
self.response = response[wn_sort] * sign
self.wn = wn[wn_idx][wn_sort]
self.idx = wn_idx[wn_sort]
self.wl = 1.0 / (self.wn * pc.um)
if input_is_wl:
out_wave = self.wl
else:
out_wave = self.wn
return out_wave, self.response
[docs] def save_filter(self, save_file):
"""
Write filter response function data to file, into two columns
the wavelength (um) and the response function.
Parameters
----------
save_file: String
File where to save the filter data.
"""
io.write_spectrum(
self.wl*pc.um, self.response, save_file, type='filter',
)
[docs]class Tophat(PassBand):
"""
A Filter passband object with a tophat-shaped passband.
"""
def __init__(
self, wl0, half_width,
name='tophat',
):
"""
Parameters
----------
wl0: Float
The passband's central wavelength (um units).
half_width: Float
The passband's half-width (um units).
name: Str
A user-defined name for the filter when calling str(self),
e.g., to identify the instrument provenance of this filter.
Examples
--------
>>> import pyratbay.spectrum as ps
>>> import matplotlib.pyplot as plt
>>> import numpy as np
>>> hat = ps.Tophat(4.5, 0.5)
>>> # Evaluate over a wavelength array (um units):
>>> wl = np.arange(3.5, 5.5, 0.001)
>>> out_wl, out_response = hat(wl)
>>> plt.figure(1)
>>> plt.clf()
>>> plt.plot(out_wl, out_response)
>>> plt.plot(hat.wl, hat.response) # Same variables
>>> # Note wl differs from hat.wl, but original array can be used as:
>>> plt.plot(wl[hat.idx], hat.response)
>>> # Evaluate over a wavenumber array:
>>> wn = 1e4 / wl
>>> hat(wn=wn)
>>> plt.figure(1)
>>> plt.clf()
>>> plt.plot(hat.wn, hat.response, dashes=(5,3))
>>> plt.plot(wn[hat.idx], out_response)
"""
self.wl0 = wl0
self.half_width = half_width
# Read filter wavenumber and transmission curves:
self.wn0 = 1.0 / (self.wl0 * pc.um)
self.name = name
def __repr__(self):
return f'pyratbay.spectrum.Tophat({self.wl0}, {self.half_width})'
def __str__(self):
return f'{self.name}_{self.wl0}um'
def __call__(self, wl=None, wn=None):
"""
Interpolate filter response function at specified spectral array.
The response funciton is normalized such that the integral over
wavenumber equals one.
Parameters
----------
wl: 1D float array
Wavelength array at which evaluate the passband response's
in micron units.
(only one of wl or wn should be provided on call)
wn: 1D float array
Wavenumber array at which evaluate the passband response's
in cm-1 units.
(only one of wl or wn should be provided on call)
Defines
-------
self.response Normalized interpolated response function
self.idx IndicesWavenumber indices
self.wn Passband's wavenumber array
self.wl Passband's wavelength array
Returns
-------
out_wave: 1D float array
Same as self.wl or self.wn depending on the input argument.
out_response: 1D float array
Same as self.response
Examples
--------
>>> # See examples in help(ps.Tophat.__init__)
"""
if not operator.xor(wl is None, wn is None):
raise ValueError(
'Either provide wavelength or wavenumber array, not both'
)
input_is_wl = wn is None
if input_is_wl:
wn = 1.0 / (wl*pc.um)
sign = np.sign(np.ediff1d(wn))
if not (np.all(sign == 1) or np.all(sign == -1)):
raise ValueError(
'Input wavelength/wavenumber array must be strictly '
'increasing or decreasing'
)
sign = sign[0]
nwave = len(wn)
wl_low = self.wl0 - self.half_width
wl_high = self.wl0 + self.half_width
wn_low = 1.0 / (wl_high*pc.um)
wn_high = 1.0 / (wl_low*pc.um)
idx = (wn >= wn_low) & (wn <= wn_high)
indices = np.where(idx)[0]
# One spectral point as margin:
idx_first = indices[0]
idx_last = indices[-1] + 1
if idx_first > 0:
idx_first -= 1
if idx_last < nwave:
idx_last += 1
if sign < 0.0:
self.idx = np.flip(np.arange(idx_first, idx_last))
else:
self.idx = np.arange(idx_first, idx_last)
self.wn = wn[self.idx]
self.response = np.array(idx[self.idx], np.double)
self.response /= np.trapz(self.response, self.wn)
self.wl = 1.0 / (self.wn * pc.um)
if input_is_wl:
out_wave = self.wl
else:
out_wave = self.wn
return out_wave, self.response
[docs]def tophat(wl0, width, margin=None, dlambda=None, resolution=None, ffile=None):
r"""
Generate a top-hat filter function, with transmission = 1.0 from
wl0-width/2 to wl0+width/2, and an extra margin with transmission
= 0.0 at each end.
Parameters
----------
ffile: String
Name of the output file.
wl0: Float
Filter central wavelength in microns.
width: Float
Filter width in microns.
margin: Float
Margin (in microns) with zero-valued transmission, to append
at each end of the filter.
dlambda: Float
Spectral sampling rate in microns.
resolution: Float
Spectral sampling resolution (used if dlambda is None).
ffile: String
If not None, save filter to file.
Examples
--------
>>> import pyratbay.spectrum as ps
>>> wl0 = 1.50
>>> width = 0.50
>>> margin = 0.10
>>> dlambda = 0.05
>>> wl, trans = ps.tophat(wl0, width, margin, dlambda)
>>> print(wl, trans, sep='\n')
[1.15 1.2 1.25 1.3 1.35 1.4 1.45 1.5 1.55 1.6 1.65 1.7 1.75 1.8
1.85]
[0. 0. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 0. 0. 0.]
"""
if margin is None:
margin = 0.1 * width
if dlambda is None and resolution is None:
raise ValueError('Either dlambda or resolution must be defined.')
# Wavelength array:
wllow = wl0 - 0.5*width - margin
wlhigh = wl0 + 0.5*width + margin
if dlambda is not None:
wl = np.arange(wllow, wlhigh, dlambda)
else:
f = 0.5 / resolution
g = (1.0-f) / (1.0+f)
imax = int(np.ceil(np.log(wllow/wlhigh) / np.log(g))) + 1
dwl = wlhigh * g**np.arange(imax)
wl = 0.5 * np.flip(dwl[1:] + dwl[:-1], axis=0)
transmission = np.array(np.abs(wl-wl0) < 0.5*width, np.double)
if ffile is not None:
io.write_spectrum(wl*pc.um, transmission, ffile, type='filter')
return wl, transmission
[docs]def resample(signal, wn, specwn, normalize=False):
r"""
Resample signal from wn to specwn wavenumber sampling using a linear
interpolation.
Parameters
----------
signal: 1D ndarray
A spectral signal sampled at wn.
wn: 1D ndarray
Signal's wavenumber sampling, in cm-1 units.
specwn: 1D ndarray
Wavenumber sampling to resample into, in cm-1 units.
normalize: Bool
If True, normalized the output resampled signal to integrate to
1.0 (note that a normalized curve when specwn is a decreasing
function results in negative values for resampled).
Returns
-------
resampled: 1D ndarray
The interpolated signal.
wnidx: 1D ndarray
The indices of specwn covered by the input signal.
Examples
--------
>>> import pyratbay.spectrum as ps
>>> import numpy as np
>>> wn = np.linspace(1.3, 1.7, 11)
>>> signal = np.array(np.abs(wn-1.5)<0.1, np.double)
>>> specwn = np.linspace(1, 2, 51)
>>> resampled, wnidx = ps.resample(signal, wn, specwn)
>>> print(wnidx, specwn[wnidx], resampled, sep='\n')
[16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34]
[1.32 1.34 1.36 1.38 1.4 1.42 1.44 1.46 1.48 1.5 1.52 1.54 1.56 1.58
1.6 1.62 1.64 1.66 1.68]
[0. 0. 0. 0. 0.5 1. 1. 1. 1. 1. 1. 1. 1. 1. 0.5 0. 0. 0.
0. ]
"""
if np.amin(wn) < np.amin(specwn) or np.amax(wn) > np.amax(specwn):
raise ValueError(
"Resampling signal's wavenumber is not contained in specwn."
)
# Indices in the spectrum wavenumber array included in the band
# wavenumber range:
wnidx = np.where((specwn < np.amax(wn)) & (np.amin(wn) < specwn))[0]
# Spline-interpolate:
resampled = si.interp1d(wn,signal)(specwn[wnidx])
if normalize:
resampled /= np.trapz(resampled, specwn[wnidx])
# Return the normalized interpolated filter and the indices:
return resampled, wnidx
[docs]def band_integrate(spectrum, specwn, bandtrans, bandwn):
"""
Integrate a spectrum over the band transmission.
Parameters
----------
spectrum: 1D float iterable
Spectral signal to be integrated.
specwn: 1D float iterable
Wavenumber of spectrum in cm-1.
bandtrans: 1D float iterable
List of normalized interpolated band transmission values in each filter.
bandwn: 1D float iterable
Returns
-------
bflux: 1D float list
Band-integrated values.
Examples
--------
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> import pyratbay.io as io
>>> import pyratbay.spectrum as ps
>>> import pyratbay.constants as pc
>>> # Load Spitzer IRAC filters:
>>> wn1, irac1 = io.read_spectrum(
>>> pc.ROOT+'pyratbay/data/filters/spitzer_irac1_sa.dat')
>>> wn2, irac2 = io.read_spectrum(
>>> pc.ROOT+'pyratbay/data/filters/spitzer_irac2_sa.dat')
>>> # Spectrum to integrate:
>>> wn = np.arange(1500, 5000.1, 1.0)
>>> sflux = ps.bbflux(wn, 1800.0)
>>> # Integrate over single filter:
>>> bandflux = ps.band_integrate(sflux, wn, irac1, wn1)
>>> # Integrate over multiple:
>>> bandfluxes = ps.band_integrate(sflux, wn, [irac1,irac2], [wn1, wn2])
>>> # Plot the results:
>>> meanwn = [np.mean(wn1), np.mean(wn2)]
>>> width = 0.5*(np.amax(wn1)-np.amin(wn1)), 0.5*(np.amax(wn2)-np.amin(wn2))
>>> plt.figure(1)
>>> plt.clf()
>>> plt.semilogy(wn, sflux, 'k')
>>> plt.plot(wn1, (irac1+1)*4e4, 'red')
>>> plt.plot(wn2, (irac2+1)*4e4, 'blue')
>>> plt.errorbar(meanwn[0], bandflux, xerr=width[0], fmt='o', color='red')
>>> plt.errorbar(meanwn, bandfluxes, xerr=width, fmt='o', color='none',
>>> mec='k', ecolor='k')
>>> plt.xlim(np.amin(wn), np.amax(wn))
>>> plt.ylim(4e4, 1.2e5)
>>> plt.xlabel('Wavenumber (cm$^{-1}$)')
>>> plt.ylabel(r'Flux (erg s$^{-1}$ cm$^{-2}$ cm)')
"""
if not isinstance(bandtrans[0], Iterable):
bandtrans = [bandtrans]
bandwn = [bandwn]
bflux = []
for btrans, wn in zip(bandtrans, bandwn):
# Resample bandpasses into spectrum wavenumber sampling:
resampled, wnidx = resample(btrans, wn, specwn, normalize=True)
# Band-integrate spectrum:
bflux.append(np.trapz(spectrum[wnidx]*resampled, specwn[wnidx]))
return bflux