# Copyright (c) 2021-2025 Cubillos & Blecic
# Pyrat Bay is open-source software under the GPL-2.0 license (see LICENSE)
__all__ = [
'save_pyrat',
'load_pyrat',
'write_atm',
'read_atm',
'write_spectrum',
'read_spectrum',
'write_spectra',
'read_spectra',
'write_opacity',
'read_opacity',
'write_pf',
'read_pf',
'write_cs',
'read_cs',
'write_observations',
'read_observations',
'read_molecs',
'read_isotopes',
'import_xs',
'import_tea',
]
from decimal import Decimal
import inspect
import os
import pickle
import numpy as np
import mc3
import h5py
from .. import constants as pc
from .. import tools as pt
from .. import spectrum as ps
[docs]
def save_pyrat(pyrat, pfile=None):
"""
Save a pyrat instance into a pickle file.
Parameters
----------
pyrat: A Pyrat instance
Object to save.
pfile: String
Name of output file. Default to the pyrat logname (changing
the extension to '.pickle').
"""
if pfile is None:
pfile = os.path.splitext(pyrat.log.logname)[0] + '.pickle'
print(f'Saving pyrat instance to: {pfile}')
# Reset values to reduce pickle size:
with pt.tmp_reset(
pyrat,
'voigt.profile', 'log.file',
'ex.ec', 'ex.etable', 'ret.posterior',
lt=pyrat.lt.clone_new(pyrat)):
with open(pfile, 'wb') as f:
pickle.dump(pyrat, f, pickle.HIGHEST_PROTOCOL)
[docs]
def load_pyrat(pfile):
"""
Load a pyrat instance from a pickle file.
Parameters
----------
pfile: String
Name of input pickle file.
Returns
-------
pyrat: A Pyrat instance
Loaded object.
"""
with open(pfile, 'rb') as f:
pyrat = pickle.load(f)
pyrat.log.verb = -1
pyrat.set_spectrum()
pyrat.log.verb = pyrat.verb
# Recover MCMC posterior:
if pyrat.ret.sampler == 'multinest':
retrieval_file = f'{pyrat.ret.retrieval_file}.txt'
if pt.isfile(retrieval_file) == 1:
pyrat.ret.posterior = pt.weighted_to_equal(retrieval_file)
elif pyrat.ret.sampler == 'snooker':
retrieval_file = f'{pyrat.ret.retrieval_file}.txt'
if pt.isfile(retrieval_file) == 1:
with np.load(retrieval_file) as mcmc:
pyrat.ret.posterior = mc3.utils.burn(mcmc)[0]
return pyrat
[docs]
def write_atm(
atmfile, pressure, temperature, species=None, abundances=None,
radius=None, punits='bar', runits=None, header=None,
):
r"""
Write an atmospheric file following the Pyrat format.
Parameters
----------
atmfile: String
Name of output atmospheric file.
pressure: 1D float ndarray
Monotonously decreasing pressure profile (in bar).
temperature: 1D float ndarray
Temperature profile for pressure layers (in Kelvin).
species: 1D string ndarray
List of atmospheric species.
abundances: 2D float ndarray
The species mole mixing ratio (of shape [nlayers,nspecies]).
radius: 1D float ndarray
Monotonously increasing radius profile (in cm).
punits: String
Pressure units of output.
runits: String
Radius units of output.
header: String
Header message (comment) to include at the top of the file.
Examples
--------
>>> import numpy as np
>>> import pyratbay.io as io
>>> import pyratbay.atmosphere as pa
>>> atmfile = 'WASP-00b.atm'
>>> nlayers = 5
>>> pressure = pa.pressure('1e-8 bar', '1e2 bar', nlayers)
>>> temperature = pa.tmodels.Isothermal(pressure)(1500.0)
>>> species = "H2 He H2O".split()
>>> abundances = [0.8499, 0.15, 1e-4]
>>> vmr = pa.uniform(abundances, nlayers)
>>> io.write_atm(atmfile, pressure, temperature, species, vmr,
>>> punits='bar', header='# Example atmospheric file:\n')
>>> # Print output file:
>>> with open(atmfile, 'r') as f:
>>> print(f.read())
# Example atmospheric file:
# Pressure units:
@PRESSURE
bar
# Temperatures units:
@TEMPERATURE
kelvin
# Abundance units (mixing ratio):
@ABUNDANCE
volume
# Atmospheric composition:
@SPECIES
H2 He H2O
# Pressure Temperature H2 He H2O
@DATA
1.0000e-08 1500.000 8.499000e-01 1.500000e-01 1.000000e-04
3.1623e-06 1500.000 8.499000e-01 1.500000e-01 1.000000e-04
1.0000e-03 1500.000 8.499000e-01 1.500000e-01 1.000000e-04
3.1623e-01 1500.000 8.499000e-01 1.500000e-01 1.000000e-04
1.0000e+02 1500.000 8.499000e-01 1.500000e-01 1.000000e-04
"""
if (species is None) != (abundances is None):
raise ValueError('Both species and abundances must be defined')
f = open(atmfile, "w")
if header is not None:
f.write(header)
# Set the values units:
f.write(f"# Pressure units:\n@PRESSURE\n{punits}\n")
f.write("# Temperatures units:\n@TEMPERATURE\nkelvin\n")
if radius is not None:
f.write(f"# Radius units:\n@RADIUS\n{runits}\n")
# At the moment, only take volume MR (mass if theres's popular demand)
if species is not None:
f.write("# Abundance units (mixing fraction):\n@ABUNDANCE\nvolume\n")
# Species names:
f.write("\n# Atmospheric composition:\n@SPECIES\n" +
" ".join([f"{mol:<s}" for mol in species]) + '\n')
# Write the per-layer data:
if radius is not None:
f.write("\n# Radius Pressure Temperature ")
else:
f.write("\n# Pressure Temperature ")
if species is not None:
f.write("".join([f"{mol:<14s}" for mol in species]))
f.write("\n@DATA\n")
pressure = pressure*pc.bar/pt.u(punits)
if radius is not None:
radius = radius/pt.u(runits)
# Write data for each layer:
nlayers = len(pressure)
for i in range(nlayers):
if radius is not None:
f.write(f"{radius[i]:12.6e} ")
f.write(f"{pressure[i]:12e} {temperature[i]:9.3f} ")
if species is not None:
f.write(" ".join([f"{q:12.6e}" for q in abundances[i]]))
f.write('\n')
f.close()
[docs]
def read_atm(atmfile):
r"""
Read a Pyrat atmospheric file.
Parameters
----------
atmfile: String
File path to a Pyrat Bay's atmospheric file.
Returns
-------
units: 4-element string tuple
Units for pressure, temperature, abundance, and radius as given
in the atmospheric file.
species: 1D string ndarray
The list of species names read from the atmospheric file (of
size nspec).
press: 1D float ndarray
The atmospheric pressure profile (of size nlayers). The
file's @PRESSURE keyword indicates the ouptput units.
temp: 1D float ndarray
The atmospheric temperature profile (of size nlayers). The
file's @TEMPERATURE keyword indicates the ouptput units.
vmr: 2D float ndarray
The mixing ratio profiles of the atmospheric species (of size
[nlayers,nspec]). The file's @ABUNDANCE indicates the output
units.
radius: 1D float ndarray
The atmospheric altiture profile (of size nlayers). None if the
atmospheric file does not contain a radius profile.
The file's @RADIUS keyword indicates the output units.
Examples
--------
>>> # Continuing example from io.write_atm():
>>> import pyratbay.io as io
>>> atmfile = 'WASP-00b.atm'
>>> units, specs, pressure, temp, q, rad = io.read_atm(atmfile)
>>> print(units, specs, pressure, temp, q, rad, sep='\n')
('bar', 'kelvin', 'volume', None)
['H2' 'He' 'H2O']
[1.0000e-08 3.1623e-06 1.0000e-03 3.1623e-01 1.0000e+02]
[1500. 1500. 1500. 1500. 1500.]
[[8.499e-01 1.500e-01 1.000e-04]
[8.499e-01 1.500e-01 1.000e-04]
[8.499e-01 1.500e-01 1.000e-04]
[8.499e-01 1.500e-01 1.000e-04]
[8.499e-01 1.500e-01 1.000e-04]]
None
"""
atmfile = open(atmfile, "r")
punits, runits, tunits, vmr_units, species = None, None, None, None, None
while True:
line = atmfile.readline().strip()
# Stop when the per-layer data begins:
if line == "@DATA":
break
# Skip empty and comment lines:
elif line == '' or line.startswith('#'):
continue
# Extract units, and species from header:
elif line == '@PRESSURE':
punits = atmfile.readline().strip()
elif line == '@RADIUS':
runits = atmfile.readline().strip()
elif line == '@TEMPERATURE':
tunits = atmfile.readline().strip()
elif line == '@ABUNDANCE':
vmr_units = atmfile.readline().strip()
elif line == '@SPECIES':
species = np.asarray(atmfile.readline().strip().split())
else:
raise ValueError(f"Atmosphere file has unexpected line: \n'{line}'")
if punits is None:
raise ValueError("Atmospheric file does not have '@PRESSURE' header")
if tunits is None:
raise ValueError("Atmospheric file does not have '@TEMPERATURE' header")
if vmr_units is None and species is not None:
raise ValueError("Atmospheric file does not have '@ABUNDANCE' header")
if species is None and vmr_units is not None:
raise ValueError("Atmospheric file does not have '@SPECIES' header")
has_radius = runits is not None
has_vmr = species is not None
nspecies = len(species) if has_vmr else 0
nrad = int(has_radius)
# Read first line to count number of columns:
datastart = atmfile.tell()
line = atmfile.readline()
ncolumns = len(line.split())
if ncolumns == 3 + nspecies and runits is None:
raise ValueError("Atmospheric file does not have '@RADIUS' header")
if ncolumns != 2 + nrad + nspecies:
rad_txt = ", 1 column for radius" if has_radius else ""
q_txt = f", {nspecies} columns for abundances" if has_vmr else ""
raise ValueError(
f"Inconsistent number of columns ({ncolumns}) in '@DATA', "
"expected 2 columns for temperature and pressure"
f"{rad_txt}{q_txt}"
)
# Count number of layers:
nlayers = 1
while True:
line = atmfile.readline()
if line == '' or line.startswith('#'):
break
nlayers += 1
# Initialize arrays:
radius = np.zeros(nlayers, np.double) if has_radius else None
press = np.zeros(nlayers, np.double)
temp = np.zeros(nlayers, np.double)
vmr = np.zeros((nlayers, nspecies), np.double) if has_vmr else None
# Read table:
atmfile.seek(datastart, 0)
for i in range(nlayers):
data = atmfile.readline().split()
if has_radius:
radius[i] = data[0]
press[i] = data[nrad+0]
temp [i] = data[nrad+1]
if has_vmr:
vmr[i] = data[nrad+2:]
units = (punits, tunits, vmr_units, runits)
return units, species, press, temp, vmr, radius
[docs]
def write_spectra(spectra, wl, temperatures, filename):
"""
Write flux spectra as function of wavelength and temperature to file.
Parameters
----------
spectra: 1D float iterable
Flux spectra arrays (erg s-1 cm-2 cm units).
wl: 1D float iterable
Wavelength array in microns.
temperatures: 1D float iterable
Effective temperature array in Kelvin degrees.
filename: String
Output file name.
"""
# Precision of 5 decimal places (or better if needed):
min_diff = np.amin(np.abs(np.ediff1d(wl)))
dec_place = Decimal(min_diff).adjusted()
dec = np.clip(1 - dec_place, 5, 20)
ntemps, nwave = np.shape(spectra)
with open(filename, 'w') as f:
# Write header:
f.write(
"# Temperature units: K\n"
"# Wavelength units: um\n"
"# Flux units: erg s-1 cm-2 cm\n"
)
f.write(f"\n@TEMPERATURES\n{'':{dec+4}}")
for temp in temperatures:
f.write(f"{temp:>15.1f}")
f.write('\n\n@SPECTRA')
# Write the spectrum values:
for i in range(nwave):
f.write(f"\n{wl[i]:>{dec+4}.{dec}f}")
for j in range(ntemps):
f.write(f"{spectra[j,i]:15.7e}")
[docs]
def read_spectra(filename):
"""
Write flux spectra as function of wavelength and temperature to file.
Parameters
----------
filename: String
Input file name.
Returns
-------
spectra: 1D float iterable
Flux spectra arrays (erg s-1 cm-2 cm units).
wn: 1D float iterable
Wavenumber array in cm-1.
temperatures: 1D float iterable
Effective temperature array in Kelvin degrees.
"""
with open(filename, 'r') as f:
lines = [line.strip() for line in f.readlines()]
# Old style:
if '@SPECTRA' not in lines:
wn, spectra = read_spectrum(filename)
temperatures = np.array([0])
return spectra, wn, temperatures
# Remove comments and blanks:
lines = [
line for line in lines
if not line.startswith('#') and line != ''
]
itemp = lines.index('@TEMPERATURES')
temperatures = np.array(lines[itemp+1].split(), np.double)
ntemps = len(temperatures)
iflux = lines.index('@SPECTRA') + 1
nwave = len(lines) - iflux
data = np.zeros((nwave, ntemps+1))
for i in range(nwave):
data[i] = lines[i+iflux].split()
spectra = data[:, 1:].T
wn = 1.0/data[:,0]/pc.um
return spectra, wn, temperatures
[docs]
def write_spectrum(wl, spectrum, filename, type):
"""
Write a spectrum to file.
Parameters
----------
wl: 1D float iterable
Wavelength array in micron units.
spectrum: 1D float iterable
Spectrum array. (rp/rs)**2 for transmission (unitless),
planetary flux for emission (erg s-1 cm-2 cm units).
filename: String
Output file name.
type: String
Data type (only used for header comments):
- 'transit' for transit spectra
- 'eclipse' for secondary eclipse spectra
- 'emission' for emission flux
- 'filter' for a instrumental filter transmission
Examples
--------
>>> # See read_spectrum() examples.
"""
if filename is None:
return
# Type of spectrum and units:
if type == "transit":
spectype = "(Rp/Rs)**2"
specunits = "unitless"
elif type == "eclipse":
spectype = "Fp/Fs"
specunits = "unitless"
elif type == "emission":
spectype = "Flux"
specunits = "erg s-1 cm-2 cm"
elif type == "f_lambda":
spectype = "Flux"
specunits = "W m-2 um-1"
elif type == "filter":
spectype = "transmission"
specunits = "unitless"
else:
raise ValueError(
"Input 'type' argument must be 'transit', 'eclipse', "
"'emission', 'f_lambda', or 'filter'"
)
# Precision of 5 decimal places (or better if needed):
precision = -np.floor(np.log10(np.amin(np.abs(np.ediff1d(wl)))))
precision = int(np.clip(precision+1, 5, np.inf))
buff = precision + 5
# Open-write file:
with open(filename, 'w') as f:
# Write header:
f.write(f'# {"Wavelength":>{buff:d}s} {spectype:>15s}\n')
f.write(f"# {'um':>{buff:d}s} {specunits:>15s}\n")
# Write the spectrum values:
for wave, flux in zip(wl, spectrum):
f.write(f"{wave:>{buff+2:d}.{precision:d}f} {flux:.9e}\n")
[docs]
def read_spectrum(filename, wn=True):
"""
Read a Pyrat spectrum file, a plain text file with two-columns: the
wavelength and signal. If wn is true, this function converts
wavelength to wavenumber in cm-1. The very last comment line sets
the wavelength units (the first string following a blank, e.g., the
string '# um' sets the wavelength units as microns).
If the units are not defined, assume wavelength units are microns.
Parameters
----------
filename: String
Path to output Transit spectrum file to read.
wn: Boolean
If True convert wavelength to wavenumber.
Return
------
wave: 1D float ndarray
The spectrum's wavenumber (in cm units) or wavelength array (in
the input file's units).
spectrum: 1D float ndarray
The spectrum in the input file.
Examples
--------
>>> import pyratbay.io as io
>>> # Write a spectrum to file:
>>> nwave = 7
>>> wl = np.linspace(1.1, 1.7, nwave)
>>> spectrum = np.ones(nwave)
>>> io.write_spectrum(wl, spectrum, 'sample_spectrum.dat', type='transit')
>>> # Take a look at the output file:
>>> with open('sample_spectrum.dat', 'r') as f:
>>> print("".join(f.readlines()))
# Wavelength (Rp/Rs)**2
# um unitless
1.10000 1.000000000e+00
1.20000 1.000000000e+00
1.30000 1.000000000e+00
1.40000 1.000000000e+00
1.50000 1.000000000e+00
1.60000 1.000000000e+00
1.70000 1.000000000e+00
>>> # Now, read from file (getting wavenumber array):
>>> wn, flux = io.read_spectrum('sample_spectrum.dat')
>>> print(wn)
[9090.90909091 8333.33333333 7692.30769231 7142.85714286 6666.66666667
6250. 5882.35294118]
>>> print(flux)
[1. 1. 1. 1. 1. 1. 1.]
>>> # Read from file (getting wavelength array):
>>> wl, flux = io.read_spectrum('sample_spectrum.dat', wn=False)
>>> print(wl)
[1.1 1.2 1.3 1.4 1.5 1.6 1.7]
>>> print(flux)
[1. 1. 1. 1. 1. 1. 1.]
"""
wave, spectrum = np.loadtxt(filename, unpack=True)
# Convert wavelength (um) to wavenumber (cm-1)
if wn:
wave = 1.0 / (wave*pc.um)
return wave, spectrum
[docs]
def write_opacity(ofile, species, temp, press, wn, opacity):
"""
Write an opacity table as a binary npz file.
Parameters
----------
ofile: String
Output filename where to save the opacity data.
File extension must be .npz
species: String
The species name.
temp: 1D float ndarray
Temperature array (Kelvin degree).
press: 1D float ndarray
Pressure array (bar).
wn: 1D float ndarray
Wavenumber array (cm-1).
opacity: 3D float ndarray
Tabulated opacities (cm2 molecule-1) of shape [ntemp, nlayers, nwave].
"""
if not isinstance(species, str):
raise ValueError("'species' input must be a string")
units = {
'temperature': 'K',
'pressure': 'bar',
'wavenumber': 'cm-1',
'cross section': 'cm2 molecule-1',
}
np.savez(
ofile,
species=[species],
temperature=temp,
pressure=press,
wavenumber=wn,
opacity=opacity,
units=units,
)
[docs]
def read_opacity(ofile, extract='all'):
"""
Read an opacity table from file.
Compatible with petitRADTRANS3 opacity files as well.
Parameters
----------
ofile: String
Path to a Pyrat Bay opacity file.
extract: String
Information to extract, select between:
- 'arrays' for the species, temperature, pressure, and wavenumber
- 'opacity' for the opacity grid
- 'all' to get both and the array dimensions.
Returns
-------
units: dict
The physical units for the different quantities
species: String
The species name.
temp: 1D float array
The temperature array (K)
press: 1D float array
The pressure array (bar)
wn: 1D float array
The wavenumber array (cm-1)
opacity: 3D float ndarray tuple
The tabulated opacities (cm2 molecule-1), of shape
[ntemp, nlayers, nwave].
"""
if ofile.endswith('petitRADTRANS.h5'):
with h5py.File(ofile, 'r') as f:
species = list(f['mol_name'])
species = species[0].decode('utf-8')
temp = np.array(f['t'])
press = np.array(f['p'])
wn = np.array(f['bin_edges'])
if extract in ['opacity', 'all']:
opacity = np.array(f['xsecarr'])
# Same format as pyratbay files: (npress, ntemp, nwave)
opacity = np.swapaxes(opacity, 0, 1)
units = {
'temperature': 'K',
'pressure': 'bar',
'wavenumber': 'cm-1',
'cross section': 'cm2 molecule-1',
}
else:
with np.load(ofile, allow_pickle=True) as f:
if len(f['species']) > 1:
raise ValueError(
'Opacity files must contain a single species'
)
species = str(f['species'][0])
temp = f['temperature']
press = f['pressure']
wn = f['wavenumber']
if extract in ['opacity', 'all']:
opacity = f['opacity']
# check/correction for format in pyratbay version 2.0beta
if np.ndim(opacity) == 4:
opacity = opacity[0]
units = np.ndarray.item(f['units']) if 'units' in f else None
# If it does not have units, must be pyratbay<2.0, where pressures
# were stored in barye units, thus, need to convert to bars
if units is None:
press /= pc.bar
units = {
'temperature': 'K',
'pressure': 'bar',
'wavenumber': 'cm-1',
'cross section': 'cm2 molecule-1',
}
if extract == 'opacity':
return opacity
if extract == 'arrays':
return species, temp, press, wn
if extract == 'all':
return (
units,
species, temp, press, wn,
opacity,
)
[docs]
def write_pf(pffile, pf, isotopes, temp, header=None):
"""
Write a partition-function file in Pyrat Bay format.
Parameters
----------
pffile: String
Output partition-function file.
pf: 2D float iterable
Partition-function data (of shape [niso, ntemp]).
isotopes: 1D string iterable
Isotope names.
temp: 1D float iterable
Temperature array.
header: String
A header for the partition-function file (must be as comments).
Examples
--------
>>> # See read_pf() examples.
"""
if len(isotopes) != np.shape(pf)[0]:
raise ValueError('Shape of the partition-function array does not '
'match with the number of isotopes.')
if len(temp) != np.shape(pf)[1]:
raise ValueError('Shape of the partition-function array does not '
'match with the number of temperature samples.')
# Write output file:
with open(pffile, "w") as f:
if header is not None:
f.write(header)
f.write("@ISOTOPES\n "
+ " ".join(["{:13s}".format(iso) for iso in isotopes])
+ "\n\n")
f.write("# Temperature (K), partition function for each isotope:\n")
f.write("@DATA\n")
for t, z in zip(temp, pf.T):
f.write(" {:7.1f} ".format(t)
+ " ".join("{:.7e}".format(d) for d in z) + "\n")
[docs]
def read_pf(pffile):
r"""
Read a partition-function file.
Parameters
----------
pffile: String
Partition function file to read.
Returns
-------
pf: 2D float ndarray
The partition function data (of shape [niso, ntemp]).
isotopes: List of strings
The names of the tabulated isotopes.
temp: 1D float ndarray
Array with temperature sample.
Examples
--------
>>> import pyratbay.io as io
>>> # Generate some mock PF data and write to file:
>>> pffile = 'PF_Exomol_NH3.dat'
>>> isotopes = ['4111', '5111']
>>> temp = np.linspace(10,100,4)
>>> pf = np.array([np.logspace(0,3,4),
>>> np.logspace(1,4,4)])
>>> header = '# Mock partition function for NH3.\n'
>>> io.write_pf(pffile, pf, isotopes, temp, header)
>>> # Now, read it back:
>>> pf, iso, temp = io.read_pf(pffile)
>>> for item in [iso, temp, pf]:
>>> print(item)
['4111' '5111']
[ 10. 40. 70. 100.]
[[1.e+00 1.e+01 1.e+02 1.e+03]
[1.e+01 1.e+02 1.e+03 1.e+04]]
"""
if not os.path.isfile(pffile):
raise ValueError("Partition-function file '{:s}' does not exist.".
format(pffile))
with open(pffile, "r") as f:
lines = f.readlines()
nlines = len(lines)
lines = iter(lines)
for i,line in enumerate(lines):
line = line.strip()
# Stop when the tabulated data begins:
if line == "@DATA":
break
# Read isotopes:
if line == "@ISOTOPES":
isotopes = np.asarray(next(lines).split())
# Number of samples:
niso = len(isotopes)
ntemp = nlines - i - 2
# Allocate arrays:
temp = np.zeros(ntemp, np.double)
pf = np.zeros((niso, ntemp), np.double)
# Read the data:
for i,line in enumerate(lines):
info = line.split()
temp[i] = info[0]
pf[:,i] = info[1:]
return pf, isotopes, temp
[docs]
def write_cs(csfile, cs, species, temp, wn, header=None):
"""
Write a cross-section file in Pyrat Bay format.
Parameters
----------
csfile: String
Output cross-section file.
cs: 2D float iterable
Cross-section opacity in units of cm-1 amagat^-N, with N the
number of species, of shape [ntemp, nwave].
species: 1D string iterable
Species names.
temp: 1D float iterable
Temperature array in Kelvin degree.
wn: 1D float iterable
Wavenumber array in cm-1.
header: String
A header for the cross-section file (must be as comments).
Examples
--------
>>> # See read_cs() examples.
"""
if len(temp) != np.shape(cs)[0]:
raise ValueError('Shape of the cross-section array does not '
'match the number of temperature samples.')
if len(wn) != np.shape(cs)[1]:
raise ValueError('Shape of the cross-section array does not '
'match the number of wavenumber samples.')
with open(csfile, "w") as f:
if header is not None:
f.write(header)
f.write("@SPECIES\n"
+ " ".join(["{:s}".format(spec) for spec in species])
+ "\n\n")
f.write("@TEMPERATURES\n "
+ " ".join(["{:4.0f}".format(t) for t in temp])
+ "\n\n")
f.write("# Wavenumber in cm-1, opacity in cm-1 amagat-{:d}:\n".
format(len(species)))
f.write("@DATA\n")
for wave, data in zip(wn, cs.T):
f.write(" {:7.1f} ".format(wave)
+ " ".join("{:.3e}".format(d) for d in data) + "\n")
[docs]
def read_cs(csfile):
r"""
Read a cross-section file.
Parameters
----------
csfile: String
Partition function file to read.
Returns
-------
cs: 2D float ndarray
Cross-section opacity in units of cm-1 amagat^-N, with N the
number of species, of shape [ntemp, nwave].
species: 1D string list
Species names.
temp: 1D float ndarray
Temperature array in Kelvin degree.
wn: 1D float ndarray
Wavenumber array in cm-1.
Examples
--------
>>> import pyratbay.io as io
>>> # Generate some mock PF data and write to file:
>>> csfile = 'CS_Mock-HITRAN_H2-H2.dat'
>>> species = ['H2', 'H2']
>>> temp = np.linspace(100, 1000, 3)
>>> wn = np.arange(10, 15, 1.0)
>>> cs = np.array([np.logspace( 0,-4,5),
>>> np.logspace(-1,-5,5),
>>> np.logspace(-2,-6,5)])
>>> header = '# Mock cross-section for H2-H2.\n'
>>> io.write_cs(csfile, cs, species, temp, wn, header)
>>> # Now, read it back:
>>> cs, species, temp, wn = io.read_cs(csfile)
>>> for item in [species, temp, wn, cs]:
>>> print(item)
['H2', 'H2']
[ 100. 550. 1000.]
[10. 11. 12. 13. 14.]
[[1.e+00 1.e-01 1.e-02 1.e-03 1.e-04]
[1.e-01 1.e-02 1.e-03 1.e-04 1.e-05]
[1.e-02 1.e-03 1.e-04 1.e-05 1.e-06]]
"""
if not os.path.isfile(csfile):
raise ValueError("Cross-section file '{:s}' does not exist.".
format(csfile))
with open(csfile, "r") as f:
lines = f.readlines()
# Number of header lines (to skip when reading the tabulated data):
nlines = len(lines)
lines = iter(lines)
for i,line in enumerate(lines):
line = line.strip()
# Stop when the tabulated data begins:
if line == "@DATA":
break
# Get species:
elif line == "@SPECIES":
species = next(lines).split()
# Get the sampled temperatures:
elif line == "@TEMPERATURES":
temp = np.array(next(lines).split(), np.double)
# Number of samples:
nwave = nlines - i - 3
ntemp = len(temp)
# Allocate arrays:
wn = np.zeros(nwave, np.double)
cs = np.zeros((ntemp, nwave), np.double)
# Read the data:
for i,line in enumerate(lines):
info = line.split()
wn[i] = info[0]
cs[:,i] = info[1:]
return cs, species, temp, wn
[docs]
def write_observations(
obs_file, inst_names, wl, wl_half_width,
depth=None, depth_err=None, depth_units='none',
):
r"""
Write an observation file for use in pyrat bay.
These can be a combination of tophat pass bands (non-zero wl) or
paths to files with tabulated passbands (zero wl value).
Parameters
----------
obs_file: String
Name of observation file.
inst_names: 1D string iterable
Name for the bandpasses. If this is of type str, use
the same name for all bands.
wl: 1D string ndarray
Central wavelength of the bands (in microns).
If wl is zero for a data point, assume that the inst_name
is a file path to a tabulated pass band.
wl_half_width: 2D float ndarray
Bandpass half-width (in microns).
data: 1D float ndarray
Transit of eclipse depth corresponding to each band.
If not None, include this data into the observation file.
depth_err: 1D float ndarray
Depth uncertainties.
depth_units: String
Units of input depth data.
Examples
--------
>>> import pyratbay.io as io
>>> # Observation file with only the pass bands:
>>> wl = [2.144, 2.333, 2.523]
>>> half_widths = [0.095, 0.095, 0.095]
>>> io.write_observations('obs_file.txt', 'HST', wl, half_widths)
>>> # Observation file with pass bands and data:
>>> data = np.array([329.6, 344.5, 301.4])
>>> uncert = np.array([20.4, 21.9, 23.5])
>>> io.write_observations(
>>> 'obs_file.txt', 'HST', wl, half_widths, data, uncert, 'ppm',
>>> )
"""
default_header = inspect.cleandoc(
"""
# Passband info could be (1) a path to a filter file or
# (2) a tophat filter defined by a central wavelength, half-width,
# and optionally a name
# Comment lines (like this one) and blank lines are ignored,
# central-wavelength and half-width units are always microns
# @DEPTH_UNITS sets the depth and uncert units (none, percent, ppt, ppm)
# and also indicates that there's data and uncertainties to read
# as two columns before the passband info
"""
)
# Consistency checks TBD:
ndata = len(wl)
if isinstance(inst_names, str):
inst_names = np.tile(inst_names, ndata)
has_data = depth is not None and depth_err is not None
# Position of least significant digit
wl_dec = [Decimal(width).adjusted() for width in wl_half_width]
wl_dec += [
Decimal(wl_diff).adjusted()
for wl_diff in np.ediff1d(sorted(wl))
]
wl_dec = np.clip(-np.amin(wl_dec) + 2, 1, 10)
wl_len = wl_dec + 4
depth_header1 = ''
depth_header2 = ''
if has_data:
depth_header1 = f'\n\n@DEPTH_UNITS\n{depth_units}'
depth_header2 = 'depth depth_err'
if depth_units!='f_lambda':
depth_dec = np.amin([Decimal(err).adjusted() for err in depth_err])
depth_dec = np.clip(-depth_dec + 2, 1, 10)
depth_len = depth_dec + 6
data_fmt = f'{depth_len}.{depth_dec}f'
elif has_data and depth_units=='f_lambda':
data_fmt = '15.8e'
with open(obs_file, 'w') as f:
f.write(default_header)
f.write(
f'{depth_header1}'
f'\n\n# {depth_header2} wl half_width instrument\n@DATA\n'
)
for i in range(ndata):
if has_data:
f.write(f'{depth[i]:{data_fmt}} {depth_err[i]:{data_fmt}} ')
# Passband file
if wl[i]==0.0:
f.write(f'{inst_names[i]}\n')
# Tophat
else:
f.write(
f'{wl[i]:{wl_len}.{wl_dec}f} '
f'{wl_half_width[i]:{wl_len}.{wl_dec}f} '
f'{inst_names[i]}\n'
)
[docs]
def read_observations(obs_file):
r"""
Read an observations file.
Parameters
----------
obs_file: String
Path to file containing observations info, see Notes below.
Returns
-------
filters: List
Filter passband objects.
depth: 1D string list
The transit or eclipse depths for each filter.
depth_err: 1D float ndarray
The depth uncertainties.
Notes
-----
An obs_file contains passband info (indicated by a '@DATA' flag),
one line per passband. The passband info could be:
(1) a path to a file containing the spectral response, or
(2) a tophat filter defined by a central wavelength, half-width,
and optionally a name.
A @DEPTH_UNITS flag sets the depth and uncert units (which can be
set to: none, percent, ppt, ppm).
This flag also indicates that there's data and uncerts to read
as two columns before the passband info.
Comment and blank lines are ignored,
central-wavelength and half-width units are always microns.
Examples
--------
>>> import pyratbay.io as io
>>> # File including depths and uncertainties:
>>> obs_file = 'observations.dat'
>>> bands, depth, depth_err = io.read_observations(obs_file)
>>> # File including only the passband info:
>>> obs_file = 'filters.dat'
>>> bands = io.read_observations(obs_file)
"""
if not os.path.isfile(obs_file):
raise ValueError(f"Observation file '{obs_file}' does not exist")
# Skip comment and blank lines:
lines = []
for line in open(obs_file, 'r'):
line = line.strip()
if line == '' or line.startswith('#'):
continue
lines.append(line)
if '@DATA' not in lines:
raise ValueError("Observation file does not have a '@DATA' header")
# Number of header lines (to skip when reading the tabulated data):
i = 0
depth_units = None
nlines = len(lines)
for i in range(nlines):
line = lines[i]
# Stop when the tabulated data begins:
if line == "@DATA":
i += 1
break
# Get the sampled temperatures:
elif line == "@DEPTH_UNITS":
depth_units = lines[i+1]
nobs = nlines - i
has_data = depth_units is not None
filters = []
depth = np.zeros(nobs)
depth_err = np.zeros(nobs)
# Read the data:
for j in range(nobs):
info = lines[i+j].split()
ndata = 0
if has_data:
depth[j] = info[0]
depth_err[j] = info[1]
ndata = 2
if len(info) - ndata == 1:
filter_file = info[ndata].replace('{ROOT}', pc.ROOT)
filter_file = filter_file.replace('{FILTERS}', pc.FILTERS)
filter_file = os.path.realpath(filter_file)
filters.append(ps.PassBand(filter_file))
elif len(info) - ndata == 2:
wl0, half_width = np.array(info[ndata:ndata+2], np.double)
filters.append(ps.Tophat(wl0, half_width))
elif len(info) - ndata == 3:
wl0, half_width = np.array(info[ndata:ndata+2], np.double)
name = info[ndata+2]
filters.append(ps.Tophat(wl0, half_width, name=name))
else:
error_msg = 'Invalid number of values in obs_file'
if has_data and len(info) in [1,2]:
error_msg += (
", perhaps remove the '@DEPTH_UNITS' flag if there's no "
"depth/uncert data"
)
elif not has_data and len(info) in [4,5]:
error_msg += ", perhaps the '@DEPTH_UNITS' flag is missing"
raise ValueError(error_msg)
if has_data:
# Absolute units or depths:
unit_factor = 1.0 if depth_units=='f_lambda' else pt.u(depth_units)
depth *= unit_factor
depth_err *= unit_factor
return filters, depth, depth_err
return filters
[docs]
def read_molecs(file):
r"""
Read a molecules file to extract their names, masses, and radii.
The output also includes the ions denoted by a '-' and '+'
character appended at the end of the species names.
Parameters
----------
file: String
The molecule file path.
Returns
-------
names: 1D string ndarray
The molecules' names.
masses: 1D float ndarray
The mass of the molecules (in g mol-1).
radii: 1D float ndarray
The collisional radius of the molecules (in angstrom).
Notes
-----
In all truthfulness, these are species, not only molecules, as the
file also contain elemental particles.
Examples
--------
>>> import pyratbay.io as io
>>> import pyratbay.constants as pc
>>> names, masses, radii = io.read_molecs(
>>> pc.ROOT+'pyratbay/data/molecules.dat')
>>> names = list(names)
>>> print(f"H2O: mass = {masses[names.index('H2O')]} g mol-1, "
>>> f"radius = {radii[names.index('H2O')]} angstrom.")
H2O: mass = 18.015 g mol-1, radius = 1.6 Angstrom.
"""
names = [] # Molecule names
masses = [] # Molecule masses
radii = [] # Molecule radii
for line in open(file, 'r'):
# Skip comment and blank lines:
if line.strip() == '' or line.strip().startswith('#'):
continue
info = line.split()
names.append(info[0])
masses.append(info[1])
radii.append(info[2])
electron_index = names.index('e-')
names.pop(electron_index)
e_mass = masses.pop(electron_index)
e_radius = radii.pop(electron_index)
names = np.array(
names
+ ['e-']
+ [f'{name}-' for name in names]
+ [f'{name}+' for name in names])
masses = np.array(
masses + [e_mass] + masses + masses,
np.double)
radii = np.array(
radii + [e_radius] + radii + radii,
np.double)
return names, masses, radii
[docs]
def read_isotopes(file):
r"""
Read an isotopes file to extract their molecule, hitran name,
exomol name, isotopic ratio, and mass.
Parameters
----------
file: String
The isotope file path.
Returns
-------
mol: 1D string ndarray
Molecule names.
hitran_iso: 1D string ndarray
Isotope name as in HITRAN database.
exomol_iso: 1D string ndarray
Isotope name based on exomol database.
iso_ratio: 1D float ndarray
Isotopic ratios.
iso_mass: 1D float ndarray
The mass of the molecules (in g mol-1).
Examples
--------
>>> import pyratbay.io as io
>>> import pyratbay.constants as pc
>>> mol, hit_iso, exo_iso, ratio, mass = \
>>> io.read_isotopes(pc.ROOT+'pyratbay/data/isotopes.dat')
>>> print("H2O isotopes:\n iso iso isotopic mass"
>>> "\n hitran exomol ratio g/mol")
>>> for i in range(len(mol)):
>>> if mol[i] == 'H2O':
>>> print(f" {hit_iso[i]:6} {exo_iso[i]:6} "
>>> f"{ratio[i]:.3e} {mass[i]:.4f}")
H2O isotopes:
iso iso isotopic mass
hitran exomol ratio g/mol
161 116 9.973e-01 18.0106
181 118 1.999e-03 20.0148
171 117 3.719e-04 19.0148
162 126 3.107e-04 19.0168
182 000 6.230e-07 21.0211
172 000 1.158e-07 20.0211
262 226 2.420e-08 20.0210
282 000 0.000e+00 22.0000
272 000 0.000e+00 21.0000
"""
mol, hitran_iso, exomol_iso, ratio, mass = [], [], [], [], []
for line in open(file, 'r'):
# Skip comment and blank lines:
if line.strip() == '' or line.strip().startswith('#'):
continue
info = line.split()
mol.append(info[0])
hitran_iso.append(info[1])
exomol_iso.append(info[2])
ratio.append(info[3])
mass.append(info[4])
mol = np.asarray(mol)
hitran_iso = np.asarray(hitran_iso)
exomol_iso = np.asarray(exomol_iso)
ratio = np.asarray(ratio, np.double)
mass = np.asarray(mass, np.double)
return mol, hitran_iso, exomol_iso, ratio, mass
[docs]
def import_xs(filename, source, read_all=True, ofile=None):
"""
Read a cross-section opacity file from an external source.
Parameters
----------
filename: String
The opacity pickle file to read.
source: String
The cross-section source: 'exomol' or 'taurex' (see note below).
read_all: Bool
If True, extract all contents in the file: cross-section,
pressure, temperature, and wavenumber.
If False, extract only the cross-section data.
ofile: String
If not None, store Exomol XS data into a Pyratbay opacity
format.
Returns
-------
xs: 3D float ndarray
Opacity cross-section in cm2 molecule-1.
with shape [npress, ntemp, nwave].
pressure: 1D float ndarray
Pressure sample of the opacity file (in bars)
temperature: 1D float ndarray
Temperature sample of the opacity file (in Kelvin degrees).
wavenumber: 1D float ndarray
Wavenumber sample of the opacity file (in cm-1).
species: String
The species name.
Notes
-----
- exomol cross sections (Chubb et al. 2020, AA) can be found here:
http://www.exomol.com/data/data-types/opacity/
- taurex cross sections (Al-Refaie et al. 2019) can be found here:
https://taurex3-public.readthedocs.io/en/latest/user/taurex/quickstart.html
Examples
--------
>>> # For this example, you'll need to have/download the following
>>> # file into the current folder:
>>> # http://www.exomol.com/db/H2O/1H2-16O/POKAZATEL/1H2-16O__POKAZATEL__R15000_0.3-50mu.xsec.TauREx.h5
>>> import pyratbay.io as io
>>> filename = '1H2-16O__POKAZATEL__R15000_0.3-50mu.xsec.TauREx.h5'
>>> xs, press, temp, wn, species = io.import_xs(filename, 'exomol')
"""
if source == 'exomol':
with h5py.File(filename, 'r') as xs_data:
xs = np.array(xs_data['xsecarr'])
if read_all or ofile is not None:
pressure = np.array(xs_data['p'])
temperature = np.array(xs_data['t'])
wavenumber = np.array(xs_data['bin_edges'])
species = xs_data['mol_name'][0].decode('utf-8')
elif source == 'taurex':
with open(filename, 'rb') as f:
xs_data = pickle.load(f)
xs = xs_data['xsecarr']
if read_all or ofile is not None:
pressure = xs_data['p']
temperature = xs_data['t']
wavenumber = xs_data['wno']
species = xs_data['name']
else:
raise ValueError("Invalid cross-section source type.")
if ofile is not None:
nlayers, ntemp, nwave = np.shape(xs)
xs_pb = np.swapaxes(xs, 0, 1)
write_opacity(ofile, species, temperature, pressure, wavenumber, xs_pb)
if read_all:
return xs, pressure, temperature, wavenumber, species
return xs
[docs]
def import_tea(teafile, atmfile, req_species=None):
"""
Format a TEA atmospheric file into a Pyrat atmospheric file.
Paramters
---------
teafile: String
Input TEA atmospheric file.
atmfile: String
Output Pyrat atmospheric file.
req_species: List of strings
The requested species for output. If None, request all species
in teafile.
"""
# Open read the TEA file:
with open(teafile, "r") as f:
tea = np.asarray(f.readlines())
# Line-indices where the species and data are:
ispec = np.where(tea == "#SPECIES\n")[0][0] + 1 # Species list
idata = np.where(tea == "#TEADATA\n")[0][0] + 2 # data starting line
# TEA--Pyrat molecules names dictionary:
tea_to_pyrat = {}
for line in open(pc.ROOT+"pyratbay/data/TEA_gdata_defaults.txt", "r"):
pyrat_name, tea_name = line.split()
tea_to_pyrat[tea_name] = pyrat_name
# Read and clean species names:
species = tea[ispec].split()
nspecies = len(species)
for i in range(nspecies):
species[i] = tea_to_pyrat[species[i]]
if req_species is None:
req_species = species
# Species indices corresponding to req_species:
nreqspecies = len(req_species)
sindex = np.zeros(len(req_species), int)
for i in range(len(req_species)):
sindex[i] = species.index(req_species[i])
# Extract per-layer data:
nlayers = len(tea) - idata
temperature = np.zeros(nlayers)
pressure = np.zeros(nlayers)
abundance = np.zeros((nlayers, nreqspecies))
for i in range(nlayers):
data = np.asarray(tea[idata+i].split(), np.double)
pressure[i], temperature[i] = data[0:2]
abundance[i,:] = data[2:][sindex]
# TEA pressure units are always bars:
punits = "bar"
# File header:
header = "# TEA atmospheric file formatted for Pyrat.\n\n"
write_atm(
atmfile, pressure, temperature, req_species, abundance,
punits=punits, header=header,
)