Source code for pyratbay.pyrat.spectrum

# Copyright (c) 2021-2025 Cubillos & Blecic
# Pyrat Bay is open-source software under the GPL-2.0 license (see LICENSE)

__all__ = [
    'Spectrum',
    'spectrum',
    'two_stream',
]

import os
import numpy as np
import scipy.constants as sc
import scipy.special as ss

from .. import constants as pc
from .. import io
from .. import spectrum as ps
from .. import tools as pt


class GetWavelength:
    def __get__(self, obj, objtype=None):
        return 1.0 / (obj.wn * pc.um)


[docs] class Spectrum(): wl = GetWavelength() def __init__(self, inputs, log): """ Make the wavenumber sample from user inputs. """ log.head('\nGenerating wavenumber array.') self.specfile = inputs.specfile # Transmission/Emission spectrum file self.intensity = None # Intensity spectrum array self.clear = None # Clear modulation spectrum for patchy model self.cloudy = None # Cloudy modulation spectrum for patchy model self.starflux = None # Stellar flux spectrum self.wnosamp = None # Gaussian-quadrature flux integration over hemisphere (for emission) if inputs.quadrature is not None: self.quadrature = inputs.quadrature qnodes, qweights = ss.p_roots(self.quadrature) qnodes = 0.5*(qnodes + 1.0) self.raygrid = np.arccos(np.sqrt(qnodes)) self.quadrature_mu = np.sqrt(qnodes) weights = 0.5 * np.pi * qweights else: # Custom defined mu-angles: raygrid = inputs.raygrid * sc.degree if raygrid[0] != 0: log.error('First angle in raygrid must be 0.0 (normal)') if np.any(raygrid < 0) or np.any(raygrid > 0.5*np.pi): log.error('raygrid angles must lie between 0 and 90 deg') if np.any(np.ediff1d(raygrid) <= 0): log.error('raygrid angles must be monotonically increasing') self.quadrature_mu = np.cos(raygrid) # Weights are the projected area for each mu range: bounds = np.linspace(0, 0.5*np.pi, len(raygrid)+1) bounds[1:-1] = 0.5 * (raygrid[:-1] + raygrid[1:]) weights = np.pi * (np.sin(bounds[1:])**2 - np.sin(bounds[:-1])**2) self.quadrature_weights = np.expand_dims(weights, axis=1) self.nangles = len(self.quadrature_mu) self.f_dilution = inputs.f_dilution # Radiative-transfer path: self._rt_path = inputs.rt_path if inputs.wlunits is None: self.wlunits = 'um' else: self.wlunits = inputs.wlunits wl_units = pt.u(inputs.wlunits) # Low wavenumber boundary: if inputs.wnlow is None and inputs.wl_high is None: log.error( 'Undefined low wavenumber boundary. Either set wnlow or wl_high' ) if inputs.wnlow is not None and inputs.wl_high is not None: log.warning( f'Both wnlow ({self.wnlow:.2e} cm-1) and wl_high ' '({self.wl_high:.2e} cm) were defined. wl_high will be ignored' ) if inputs.wnlow is not None: self.wnlow = inputs.wnlow self.wl_high = 1.0 / self.wnlow else: self.wl_high = inputs.wl_high self.wnlow = 1.0 / self.wl_high # High wavenumber boundary: if inputs.wnhigh is None and inputs.wl_low is None: log.error( 'Undefined high wavenumber boundary. Either set wnhigh or wl_low' ) if inputs.wnhigh is not None and inputs.wl_low is not None: log.warning( f'Both wnhigh ({self.wnhigh:.2e} cm-1) and wl_low ' '({self.wl_low:.2e} cm) were defined. wl_low will be ignored' ) if inputs.wnhigh is not None: self.wnhigh = inputs.wnhigh self.wl_low = 1.0 / self.wnhigh else: self.wl_low = inputs.wl_low self.wnhigh = 1.0 / self.wl_low # Consistency check (wnlow < wnhigh): if self.wnlow > self.wnhigh: log.error( f'Wavenumber low boundary ({self.wnlow:.1f} cm-1) must be ' f'larger than the high boundary ({self.wnhigh:.1f} cm-1)' ) self.resolution = None self.wlstep = None # If there are cross-section tables, take sampling from there: if pt.isfile(inputs.sampled_cs) == 1 and inputs.runmode != 'opacity': wn = io.read_opacity(inputs.sampled_cs[0], extract='arrays')[3] # Update wavenumber sampling: wn_mask = ps.wn_mask(wn, self.wnlow, self.wnhigh) self.wn = wn[wn_mask][::inputs.wl_thinning] self.nwave = len(self.wn) self.spectrum = np.zeros(self.nwave, np.double) if self._rt_path not in pc.transmission_rt: self.fplanet = np.zeros(self.nwave, np.double) wn_min = self.wn[0] wn_max = self.wn[-1] # Guess sampling by looking at std of sampling rates: dwn = np.ediff1d(np.abs(self.wn)) dwl = np.ediff1d(np.abs(1.0/self.wn)) res = np.abs(self.wn[1:]/dwn) std_dwn = np.std(dwn/np.mean(dwn)) std_dwl = np.std(dwl/np.mean(dwl)) std_res = np.std(res/np.mean(res)) if std_dwn < std_dwl and std_dwn < std_res: self.wnstep = self.wn[1] - self.wn[0] sampling_text = f'sampling rate = {self.wnstep:.2f} cm-1' elif std_dwl < std_dwn and std_dwl < std_res: self.wlstep = np.abs(1/self.wn[0] - 1/self.wn[1]) / pc.um sampling_text = f'sampling rate = {self.wlstep:.6f} um' else: g = self.wn[-2]/self.wn[-1] # Assuming no one would care for a R with more than 5 decimals: self.resolution = np.round(0.5*(1+g)/(1-g), decimals=5) #self.wnhigh = 2 * ex.wn[-1]/(1+g) sampling_text = f'R = {self.resolution:.1f}' log.msg( "Reading spectral sampling from extinction-coefficient " f"table. Adopting array with {sampling_text}, " f"and {self.nwave} samples between " f"[{wn_min:.2f}, {wn_max:.2f}] cm-1." ) return # At least one sampling mode must be defined: undefined_sampling_rate = ( inputs.wnstep is None and inputs.wlstep is None and inputs.resolution is None ) if undefined_sampling_rate: log.error( 'Undefined spectral sampling rate, either set resolution, ' 'wnstep, or wlstep' ) # highly composite numbers hcn = np.array([ 1, 2, 4, 6, 12, 24, 36, 48, 60, 120, 180, 240, 360, 720, 840, 1260, 1680, 2160, 2520, 5040, 7560, 10080, 15120, 20160, 25200, 27720, 45360, 50400, 55440, 83160, 110880, 221760, 277200, ]) if inputs.wnstep is not None: self.wnstep = inputs.wnstep # Default to a wavenumber supersampling ~0.0004 (R ~2e7 at 1.0 um) if inputs.wnosamp is None: if inputs.wnstep is None: self.wnstep = 1.0 self.wnosamp = hcn[self.wnstep/hcn <= 0.0004][0] else: self.wnosamp = inputs.wnosamp self.resolution = inputs.resolution self.wlstep = inputs.wlstep if inputs.resolution is not None: # Constant-resolving power wavenumber sampling: self.wn = ps.constant_resolution_spectrum( self.wnlow, self.wnhigh, self.resolution, ) self.wlstep = None elif self.wlstep is not None: # Constant-sampling rate wavelength sampling: wl = np.arange(self.wl_low, self.wl_high, self.wlstep) self.wn = 1.0/np.flip(wl) self.wnlow = self.wn[0] self.resolution = None else: # Constant-sampling rate wavenumber sampling: nwave = int((self.wnhigh-self.wnlow)/self.wnstep) + 1 self.wn = self.wnlow + np.arange(nwave) * self.wnstep self.nwave = len(self.wn) # Fine-sampled wavenumber array: self.ownstep = self.wnstep / self.wnosamp self.onwave = int(np.ceil((self.wn[-1]-self.wnlow)/self.ownstep)) + 1 self.own = self.wnlow + np.arange(self.onwave) * self.ownstep self.spectrum = np.zeros(self.nwave, np.double) if self._rt_path not in pc.transmission_rt: self.fplanet = np.zeros(self.nwave, np.double) # Get list of divisors: self.odivisors = pt.divisors(self.wnosamp) # Re-set final boundary (stay inside given boundaries): if self.wn[-1] != self.wnhigh: log.warning( f'Final wavenumber modified from {self.wnhigh:.4f} cm-1 (input)' f'\n to {self.wn[-1]:.4f} cm-1' ) # Screen output: log.msg( f'Initial wavenumber boundary: {self.wnlow:.5e} cm-1 ' f'({self.wl_high/wl_units:.3e} {self.wlunits})\n' f'Final wavenumber boundary: {self.wnhigh:.5e} cm-1 ' f'({self.wl_low/wl_units:.3e} {self.wlunits})', indent=2, ) if self.resolution is not None: msg = f'Spectral resolving power: {self.resolution:.1f}' elif self.wlstep is not None: wl_step = self.wlstep / wl_units msg = f'Wavelength sampling interval: {wl_step:.2g} {self.wlunits}' else: msg = f'Wavenumber sampling interval: {self.wnstep:.2g} cm-1' log.msg( f'{msg}\n' f'Wavenumber sample size: {self.nwave:8d}\n' f'Wavenumber fine-sample size: {self.onwave:8d}\n', indent=2, ) log.head('Wavenumber sampling done.') def __str__(self): fmt = {'float': '{: .3e}'.format} fw = pt.Formatted_Write() fw.write('Spectral information:') fw.write('Wavenumber internal units: cm-1') fw.write('Wavelength internal units: cm') fw.write('Wavelength display units (wlunits): {:s}', self.wlunits) wn_min = np.amin(self.wn) wn_max = np.amax(self.wn) wl_min = 1/(wn_max*pt.u(self.wlunits)) wl_max = 1/(wn_min*pt.u(self.wlunits)) fw.write( f'Low wavenumber boundary (wnlow): {wn_min:10.3f} cm-1 ' f'(wl_high = {wl_max:6.2f} {self.wlunits})', ) fw.write( f'High wavenumber boundary (wnhigh): {wn_max:10.3f} cm-1 ' f'(wl_low = {wl_min:6.2f} {self.wlunits})', ) fw.write('Number of samples (nwave): {:d}', self.nwave) if self.resolution is None: fw.write('Sampling interval (wnstep): {:.3f} cm-1', self.wnstep) else: fw.write( 'Spectral resolving power (resolution): {:.1f}', self.resolution, ) fw.write( 'Wavenumber array (wn, cm-1):\n {}', self.wn, fmt={'float': '{: .3f}'.format}, ) fw.write('Oversampling factor (wnosamp): {:d}', self.wnosamp) fw.write( '\nGaussian quadrature cos(theta) angles (quadrature_mu):\n {}', self.quadrature_mu, prec=3, ) fw.write( 'Gaussian quadrature weights (quadrature_weights):\n {}', self.quadrature_weights.flatten(), prec=3, ) if self.intensity is not None: fw.write('Intensity spectra (intensity, erg s-1 cm-2 sr-1 cm):') for intensity in self.intensity: fw.write(' {}', intensity, fmt=fmt, edge=3) if self._rt_path in pc.emission_rt: fw.write( 'Emission spectrum (spectrum, erg s-1 cm-2 cm):\n {}', self.spectrum, fmt=fmt, edge=3, ) elif self._rt_path in pc.transmission_rt: fw.write( '\nTransmission spectrum, (Rp/Rs)**2 (spectrum):\n {}', self.spectrum, fmt=fmt, edge=3, ) return fw.text
def _get_cloud_deck(pyrat): """Wrapper to get cloud deck parameters if they exist""" for model in pyrat.opacity.models: if model.name == 'deck': return model.rsurf, model.tsurf, model.itop return None, None, None
[docs] def spectrum(pyrat): """ Spectrum calculation driver. """ pyrat.log.head('\nCalculate the planetary spectrum.') spec = pyrat.spec # Initialize the spectrum array: spec.spectrum = np.empty(spec.nwave, np.double) if pyrat.opacity.is_patchy: spec.clear = np.empty(spec.nwave, np.double) spec.cloudy = np.empty(spec.nwave, np.double) deck_rsurf, deck_tsurf, deck_itop = _get_cloud_deck(pyrat) f_patchy = pyrat.opacity.fpatchy # Transmission spectroscopy: if pyrat.od.rt_path in pc.transmission_rt: spec.spectrum = ps.transmission( pyrat.od.depth, pyrat.atm.radius, pyrat.atm.rstar, pyrat.od.ideep, pyrat.atm.rtop, deck_rsurf, deck_itop, ) if pyrat.opacity.is_patchy: spec.cloudy = spec.spectrum spec.clear = ps.transmission( pyrat.od.depth_clear, pyrat.atm.radius, pyrat.atm.rstar, pyrat.od.ideep_clear, pyrat.atm.rtop, ) spec.spectrum = f_patchy*spec.cloudy + (1.0-f_patchy)*spec.clear # Plane-parallel emission elif pyrat.od.rt_path in ['emission', 'eclipse', 'f_lambda']: pyrat.od.B = np.zeros((pyrat.atm.nlayers, spec.nwave), np.double) ps.blackbody_wn_2D(spec.wn, pyrat.atm.temp, pyrat.od.B) weights = spec.quadrature_weights spec.intensity, spec.spectrum = ps.plane_parallel_rt( pyrat.od.depth, pyrat.od.B, spec.wn, spec.quadrature_mu, spec.quadrature_weights, pyrat.od.ideep, pyrat.atm.rtop, deck_tsurf, deck_itop, ) spec.spectrum = np.sum(spec.intensity * weights, axis=0) if pyrat.opacity.is_patchy: spec.cloudy = spec.spectrum intensity_clear, spec.clear = ps.plane_parallel_rt( pyrat.od.depth_clear, pyrat.od.B, spec.wn, spec.quadrature_mu, spec.quadrature_weights, pyrat.od.ideep_clear, pyrat.atm.rtop, ) spec.spectrum = f_patchy*spec.cloudy + (1.0-f_patchy)*spec.clear spec.fplanet = spec.spectrum # Two stream emission elif pyrat.od.rt_path in ['emission_two_stream', 'eclipse_two_stream']: two_stream(pyrat) # Scaling factors is_emission = pyrat.od.rt_path in pc.eclipse_rt + pc.emission_rt if is_emission and spec.f_dilution is not None: spec.fplanet *= spec.f_dilution if pyrat.opacity.is_patchy: spec.clear *= spec.f_dilution spec.cloudy *= spec.f_dilution if pyrat.od.rt_path in pc.eclipse_rt: fstar_rprs = 1/spec.starflux * (pyrat.atm.rplanet/pyrat.atm.rstar)**2 spec.fplanet = np.copy(spec.spectrum) spec.spectrum = spec.eclipse = spec.fplanet * fstar_rprs if pyrat.opacity.is_patchy: spec.fplanet_clear = np.copy(spec.clear) spec.fplanet_cloudy = np.copy(spec.cloudy) spec.clear = spec.fplanet_clear * fstar_rprs spec.cloudy = spec.fplanet_cloudy * fstar_rprs # Print spectra to file: if pyrat.od.rt_path in pc.transmission_rt: spec_type = 'transit' elif pyrat.od.rt_path in pc.emission_rt: spec_type = 'emission' elif pyrat.od.rt_path in pc.eclipse_rt: spec_type = 'eclipse' io.write_spectrum( spec.wl, spec.spectrum, spec.specfile, spec_type, ) # Also save fstar and fplanet when possible if spec.specfile is not None: file, extension = os.path.splitext(spec.specfile) if is_emission and spec.starflux is not None: fstar_file = f'{file}_fstar{extension}' io.write_spectrum( spec.wl, spec.starflux, fstar_file, 'emission', ) if pyrat.od.rt_path in pc.eclipse_rt: fplanet_file = f'{file}_fplanet{extension}' io.write_spectrum( spec.wl, spec.fplanet, fplanet_file, 'emission', ) if spec.specfile is not None: specfile = f": '{spec.specfile}'" else: specfile = "" pyrat.log.head(f"Computed {spec_type} spectrum{specfile}.", indent=2) pyrat.log.head('Done.')
[docs] def two_stream(pyrat): """ Two-stream approximation radiative transfer following Heng et al. (2014) This function defines downward (flux_down) and uppward fluxes (flux_up) into pyrat.spec, and sets the emission spectrum as the uppward flux at the top of the atmosphere (flux_up[0]): flux_up: 2D float ndarray Upward flux spectrum through each layer under the two-stream approximation (erg s-1 cm-2 cm). flux_down: 2D float ndarray Downward flux spectrum through each layer under the two-stream approximation (erg s-1 cm-2 cm). """ pyrat.log.msg('Compute two-stream flux spectrum.', indent=2) spec = pyrat.spec nlayers = pyrat.atm.nlayers # Set internal net bolometric flux to sigma*Tint**4: spec.f_int = ps.blackbody_wn(spec.wn, pyrat.atm.tint) total_f_int = np.trapezoid(spec.f_int, spec.wn) if total_f_int > 0: spec.f_int *= pc.sigma * pyrat.atm.tint**4 / total_f_int # Diffusivity factor (Eq. B5 of Heng et al. 2014): dtau0 = np.diff(pyrat.od.depth, n=1, axis=0) trans = (1-dtau0)*np.exp(-dtau0) + dtau0**2 * ss.exp1(dtau0) B = pyrat.od.B = ps.blackbody_wn_2D(spec.wn, pyrat.atm.temp) Bp = np.diff(pyrat.od.B, n=1, axis=0) / dtau0 # Diffuse approximation to compute downward and upward fluxes: spec.flux_down = np.zeros((nlayers, spec.nwave)) spec.flux_up = np.zeros((nlayers, spec.nwave)) # TBD: Make this an error if any data is not there is_irradiation = ( spec.starflux is not None and pyrat.atm.smaxis is not None and pyrat.atm.rstar is not None ) # Top boundary condition: if is_irradiation: spec.flux_down[pyrat.atm.rtop] = pyrat.atm.beta_irr * \ (pyrat.atm.rstar/pyrat.atm.smaxis)**2 * spec.starflux # Eqs. (B6) of Heng et al. (2014): # TBD: Can I make this work if rtop is below the top layer? #for i in range(pyrat.atm.rtop, nlayers-1): for i in range(nlayers-1): spec.flux_down[i+1] = ( trans[i] * spec.flux_down[i] + np.pi * B[i] * (1-trans[i]) + np.pi * Bp[i] * ( -2/3 * (1-np.exp(-dtau0[i])) + dtau0[i]*(1-trans[i]/3)) ) spec.flux_up[nlayers-1] = spec.flux_down[nlayers-1] + spec.f_int for i in reversed(range(nlayers-1)): #for i in reversed(range(pyrat.atm.rtop, nlayers-1)): spec.flux_up[i] = ( trans[i] * spec.flux_up[i+1] + np.pi * B[i+1] * (1-trans[i]) + np.pi * Bp[i] * ( 2/3 * (1-np.exp(-dtau0[i])) - dtau0[i]*(1-trans[i]/3)) ) pyrat.spec.fplanet = pyrat.spec.spectrum = spec.flux_up[0]