Source code for pyratbay.opacity.line_sampling

# Copyright (c) 2021-2026 Cubillos & Blecic
# Pyrat Bay is open-source software under the GPL-2.0 license (see LICENSE)

__all__ = [
    'Line_Sample',
]

import os
import numpy as np
import mc3

from .. import io as io
from .. import tools as pt
from .. import constants as pc
from .. import spectrum as ps
from ..lib import _extcoeff as ec


[docs] class Line_Sample(): """Line-by-line sampled opacities""" def __init__( self, cs_files, *, pressure=None, temperature=None, min_wl=None, max_wl=None, min_wn=None, max_wn=None, isotope_ratios=None, wl_thinning=1, log=None, ): """ Read line-sampled cross-section table(s), with units of cm2 molec-1. Parameters ---------- cs_files: String or iterable of strings Line-sampled cross section file(s) to read. pressure: 1D floar array Pressure profile where to resample the opacities (bar). If None, use the tabulated pressure array from the opacities. If not None, it is allowed to extrapolate to lower pressures but not to higher pressures. min_wl: 1D float ndarray Minimum wavelength value to extract from line-sample files (um) (only one of min_wl or max_wn should be provided). max_wl: 1D float ndarray Maximum wavelength value to extract from line-sample files (um) (only one of min_wn or max_wl should be provided). min_wn: 1D float ndarray Minimum wavenumber value to extract from line-sample files (cm-1) (only one of min_wn or max_wl should be provided). max_wn: 1D float ndarray Maximum wavenumber value to extract from line-sample files (cm-1) (only one of min_wl or max_wn should be provided). wl_thinning: Integer Thinning factor to take every n-th value of the wavenumber array isotope_ratios: String Examples -------- >>> import pyratbay.opacity as op >>> import numpy as np >>> import matplotlib.pyplot as plt >>> # Generate these files from >>> # pyratbay.rtfd.io/en/latest/cookbooks/line_list_hitran.html >>> cs_files = [ >>> 'cross_section_R020K_0150-3000K_0.5-5.0um_hitran_H2O.npz', >>> 'cross_section_R020K_0150-3000K_0.5-5.0um_hitemp_CO.npz', >>> ] >>> # Line-sample opacities at wavenumbers > 1.0 cm-1: >>> ls = op.Line_Sample(cs_files, min_wl=1.0) >>> # Total or per-mol cross sections (cm2 molec-1): >>> temp = np.tile(1400.0, ls.nlayers) >>> cs = ls.calc_cross_section(temp) >>> cs_per_mol = ls.calc_cross_section(temp, per_mol=True) >>> # Cross section per species (cm2 molec-1): >>> ec = ls.calc_cross_section(temp, per_mol=True) >>> # Take a look >>> wl = 1e4/ls.wn >>> plt.figure(1) >>> plt.clf() >>> plt.plot(wl, ec[0,35], color='blue', lw=1.0) >>> plt.plot(wl, ec[1,35], alpha=0.6, color='orange', lw=1.0) >>> plt.yscale('log') """ self.name = 'line sampling' self.nspec = 0 if log is None: log = mc3.utils.Log(width=80) if isinstance(cs_files, str): cs_files = [cs_files] self.cs_files = cs_files missing_files = [ cs_file for cs_file in self.cs_files if pt.isfile(cs_file) == 0 ] if len(missing_files) > 0: log.error(f'Missing opacity files: {missing_files}') # Load opacities into shared memory if and only if possible and needed: use_shared_memory = pt.get_mpi_size() > 1 if use_shared_memory: from mpi4py import MPI rank = pt.get_mpi_rank() # Get dimensions first: # Species, temperature (K), pressure (bar), and wavenumber (cm-1) species, temp, press, wn = io.read_opacity( self.cs_files[0], extract='arrays', ) if temperature is None: self.temp = temp else: self.temp = temperature self.ntemp = len(self.temp) if pressure is None: self.press = press else: self.press = pressure self.nlayers = len(self.press) if min_wn is not None and max_wl is not None: log.error('Either define min_wn or max_wl, not both') if max_wn is not None and min_wl is not None: log.error('Either define min_wl or max_wn, not both') if min_wn is None: min_wn = 0.0 if max_wl is None else 1.0/(max_wl*pc.um) if max_wn is None: max_wn = np.inf if min_wl is None else 1.0/(min_wl*pc.um) wn_mask = ps.wn_mask(wn, min_wn, max_wn) self.wn = wn[wn_mask][::wl_thinning] self.nwave = len(self.wn) # Unpack isotopic parameters # TBD: move parsing logic to pyrat/opacity.py? if isotope_ratios is None: iso_keys = [] else: ratios = isotope_ratios.strip().split('\n') iso_keys = [] iso_labels = [] iso_ratios = [] for iso_data in ratios: ext_label, label, ratio = iso_data.split() iso_keys.append(ext_label) iso_labels.append('iso_' + label) iso_ratios.append(ratio) self.species = [] self.isotopes = [] iso_species = [] species_per_file = [] wn_masks = [] for cs_file in self.cs_files: #cs_file = os.path.basename(cs_file) species, temp, press, wn = io.read_opacity(cs_file,extract='arrays') wn_mask = ps.wn_mask(wn, min_wn, max_wn) wn = wn[wn_mask][::wl_thinning] wn_masks.append(wn_mask) nwave = len(wn) wave_mismatch = ( nwave != self.nwave or np.any(np.abs(1.0-wn/self.wn) > 0.01) ) if wave_mismatch: log.error( f"Wavenumber array of cross-section file '{cs_file}' " "does not match with previous arrays" ) check_pressure_boundaries(self.press, press) # TBD: Implement or raise warning #check_temperature_boundaries(self.temp, temp) # Add new species: iso = '' for i,key in enumerate(iso_keys): if key in cs_file and iso != '': err = f'Multiple isotope labels match {repr(cs_file)}' raise ValueError(err) elif key in cs_file: iso = iso_labels[i] species_per_file.append(species + iso) if species+iso not in iso_species: iso_species.append(species + iso) self.species.append(species) self.isotopes.append(iso) spec_indices = [ iso_species.index(species) for species in species_per_file ] self.species = np.array(self.species) self.nspec = len(self.species) # Set isotopic ratios (ensure setup is valid) self.iso_ratios = np.ones(self.nspec, float) self.iso_fill = [None] * self.nspec self._iso_free = [] self.pnames = [] pars = [] for i,iso in enumerate(self.isotopes): if iso == '': continue idx = iso_labels.index(iso) ratio = iso_ratios[idx] is_fill = ratio.startswith('fill_') if not is_fill: self.iso_ratios[i] = 10.0**float(ratio) # Only non-fillers are free parameters self.pnames.append(iso) self._iso_free.append(i) pars.append(ratio) continue fillers = [f'iso_{filler}' for filler in ratio[5:].split('_')] for filler in fillers: if filler not in self.isotopes: raise ValueError('Invalid filler') self.iso_fill[i] = [ self.isotopes.index(filler) for filler in fillers ] # Now set the filler isotopic ratios self._update_iso_ratios() self.pars = np.array(pars, float) self.npars = len(self.pars) self.texnames = [pname for pname in self.pnames] # Cross-sections table (cm2 molecule-1): cs_shape = (self.nspec, self.ntemp, self.nlayers, self.nwave) if not use_shared_memory: self.cs_table = np.zeros(cs_shape) for i,cs_file in enumerate(self.cs_files): base_name, file_name = os.path.split(cs_file) log.msg(file_name, indent=4) idx = spec_indices[i] mask = wn_masks[i] self.cs_table[idx] += pt.interpolate_opacity( cs_file, self.temp, self.press, mask, wl_thinning, ) else: itemsize = MPI.DOUBLE.Get_size() if rank == 0: nbytes = np.prod(cs_shape) * itemsize else: nbytes = 0 # on rank 0, create the shared block # else get a handle to it win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=MPI.COMM_WORLD) buf, itemsize = win.Shared_query(0) assert itemsize == MPI.DOUBLE.Get_size() self.cs_table = np.ndarray(buffer=buf, dtype='d', shape=cs_shape) if rank == 0: for i,cs_file in enumerate(self.cs_files): base_name, file_name = os.path.split(cs_file) log.msg(file_name, indent=4) idx = spec_indices[i] mask = wn_masks[i] self.cs_table[idx] += pt.interpolate_opacity( cs_file, self.temp, self.press, mask, wl_thinning, ) pt.mpi_barrier() # Set tabulated temperature extrema: self.tmin = np.amin(self.temp) self.tmax = np.amax(self.temp) def _update_iso_ratios(self, pars=None): """ Update isotopic ratios ensuring that filler isotopic ratios are up to date Parameters ---------- pars: 1D float iterable iso_ratio parameters for free isotope parameters. That is, a parameter of -2.0 corresponds to a ratio of 10**-2.0 """ if pars is not None: self.iso_ratios[self._iso_free] = 10.0**np.array(pars) for i,iso in enumerate(self.isotopes): if self.iso_fill[i] is not None: fillers = self.iso_fill[i] self.iso_ratios[i] = 1.0 - np.sum(self.iso_ratios[fillers])
[docs] def get_wl(self, units='um'): """ Get the wavelength array in the desired units. Parameters ---------- units: String Select one from: ['A', 'nm', 'um', 'mm', 'cm', 'm', 'km'] Returns ------- wl: 1D float ndarray The wavelength array from the line-sample grid. """ valid_units = 'A nm um mm cm m km'.split() if units not in valid_units: raise ValueError( f"Invalid wavelength units '{units}', " f"select one from {valid_units}" ) return 1.0/(self.wn * pt.u(units))
[docs] def calc_cross_section( self, temperature, layer=None, per_mol=False, pars=None, ): """ Calculate cross-section spectra (cm2 molec-1) over temperature profiles by interpolating from tabulated values. Parameters ---------- temperature: 1D float array Temperature array (Kelvin) at which to interpolate the cross section (must match nlayers size). layer: Integer If not None, compute cross-sections only at selected layer. In this case, the output array dimension is reduced by 1. per_mol: bool If True, compute cross sections individually per species. If False, co-add cross section contributions from all species. pars: 1D iterable If not None, update the iso_ratio parameters with given values. Returns ------- cross_section: 1D, 2D, or 3D float array Cross section spectra (cm2 molec-1) Output array has shape [nspec, nlayers, nwave]. If per_mol is False, nspec dimension is removed. If layer is not None, nlayers dimension is removed. """ if np.amax(temperature) > self.tmax or np.amin(temperature) < self.tmin: raise ValueError('Temperatures are out of line-sample bounds') if layer is None: layer1 = 0 layer2 = self.nlayers elif np.isscalar(layer): layer1 = layer layer2 = layer+1 elif len(layer) == 2: layer1 = layer[0] layer2 = layer[1] else: raise ValueError('Invalid layer input') if per_mol: cross_section = np.zeros((self.nspec, self.nlayers, self.nwave)) interp_ec = ec.interp_ec_per_mol else: cross_section = np.zeros((self.nlayers, self.nwave)) interp_ec = ec.interp_ec # Update isotopic ratios if pars is not None: self._update_iso_ratios(pars) density = np.ones((self.nlayers, self.nspec)) * self.iso_ratios interp_ec( cross_section, self.cs_table, self.temp, temperature, density, layer1, layer2, ) if np.isscalar(layer): if per_mol: cross_section = cross_section[:,layer] else: cross_section = cross_section[layer] return cross_section
[docs] def calc_extinction_coefficient( self, temperature, density, layer=None, per_mol=False, pars=None, ): """ Calculate extinction-coefficient spectra (cm-1) for temperature and number-density profiles. Parameters ---------- temperature: 1D float array Temperature array (Kelvin) at which to interpolate the cross section (must match nlayers size). density: 2D float array Number-density profiles (molec cm-3) at each layer. Array has shape [nspec,nlayers]. layer: Integer If not None, compute extinction coefficient only at selected layer. In this case, the output array dimension is reduced by 1. per_mol: bool If True, compute extinction coefficients individually per species. If False, co-add extinction contributions from all species. pars: 1D iterable If not None, update the iso_ratio parameters with given values. Returns ------- extinction: 1D, 2D, or 3D float array Extinction-coefficient spectra (cm-1) Output array has shape [nspec, nlayers, nwave]. If per_mol is False, nspec dimension is removed. If layer is not None, nlayers dimension is removed. """ if np.amax(temperature) > self.tmax or np.amin(temperature) < self.tmin: raise ValueError('Temperatures are out of line-sample bounds') if layer is None: layer1 = 0 layer2 = self.nlayers elif np.isscalar(layer): layer1 = layer layer2 = layer+1 elif len(layer) == 2: layer1 = layer[0] layer2 = layer[1] else: raise ValueError('Invalid layer input') if per_mol: extinction = np.zeros((self.nspec, self.nlayers, self.nwave)) interp_ec = ec.interp_ec_per_mol else: extinction = np.zeros((self.nlayers, self.nwave)) interp_ec = ec.interp_ec # Update isotopic ratios if pars is not None: self._update_iso_ratios(pars) interp_ec( extinction, self.cs_table, self.temp, temperature, density*self.iso_ratios, layer1, layer2, ) if np.isscalar(layer): if per_mol: extinction = extinction[:,layer] else: extinction = extinction[layer] return extinction
def __str__(self): fw = pt.Formatted_Write() fw.write( f"Line-sampling cross-section files (cs_files):\n{self.cs_files}" ) fw.write(f'Number of species (nspec): {self.nspec}') fw.write(f'Number of temperature samples (ntemp): {self.ntemp}') fw.write(f'Number of pressure layers (nlayers): {self.nlayers}') fw.write(f'Number of wavenumber samples (nwave): {self.nwave}') fw.write( '\nMinimum and maximum temperatures (tmin, tmax) in K: ' f'[{self.tmin:.1f}, {self.tmax:.1f}]' ) fw.write( 'Minimum and maximum pressures in bar: ' f'[{np.amin(self.press):.3e}, {np.amax(self.press):.3e}]' ) fw.write( 'Minimum and maximum wavelengths in um: ' f'[{np.amin(self.get_wl()):.3f}, {np.amax(self.get_wl()):.3f}]' ) fw.write(f'\nLine-sample species (species): {self.species}') fw.write(f'Temperature array (temps, K):\n{self.temp}') with np.printoptions(precision=3): fw.write(f'Pressure layers (pressure, bar):\n{self.press}') fw.write(f'Wavenumber array (wn, cm-1):\n {self.wn}') fw.write( 'The tabulated cross sections (cs_table, cm2 molecule-1) ' 'are an array\nof dimensions [nspec,ntemp,nlayers,nwave] and ' f'shape {self.cs_table.shape}' ) return fw.text
def check_pressure_boundaries(press, tabulated_press): """ Check that values in pressure profile are no larger than the maximum tabulated pressure (to relative precission of 1e-3). """ pmax = np.amax(press) pmax_table = np.amax(tabulated_press) if pmax/pmax_table - 1 > 1e-3: raise ValueError( 'Pressure profile extends beyond the maximum tabulated pressure' )