# Copyright (c) 2021-2025 Cubillos & Blecic
# Pyrat Bay is open-source software under the GPL-2.0 license (see LICENSE)
__all__ = [
'make_tli',
]
import os
import time
import struct
import sys
import numpy as np
import mc3.utils as mu
from . import linelist
from .. import constants as pc
from .. import tools as pt
from .. import version as ver
def pack_str(file, string):
"""Convenience function to pack length of and then a string"""
size = len(string)
file.write(struct.pack(f'h{size}s', size, string.encode('utf-8')))
def pack_array(file, array, format, size=None):
"""Convenience function to pack an array of given data type"""
if size is None:
size = len(array)
file.write(struct.pack(f'{size}{format}', *list(array)))
[docs]
def make_tli(
dblist, pflist, dbtype, tlifile, wl_low, wl_high, wl_units,
log=None,
):
"""
Create a transition-line-information (TLI) file.
Parameters
----------
dblist: List of strings
Opacity databases to read.
pflist: List of strings
Partition function for each of the databases.
dbtype: List of strings
Database type of each database.
tlifile: String
Output TLI file name.
wl_low: Float
Lower wavelength boundary to consider, units given by wl_units.
wl_high: Float
High wavelength boundary to consider, units given by wl_units.
wl_units: String
Wavelength units (when not specified in wl_low nor wl_high).
log: Log object
An mc3.utils.Log instance to log screen outputs to file.
"""
if log is None:
log = mu.Log(verb=2)
# Input-not-found error messages:
if tlifile is None:
log.error('Undefined TLI file (tlifile).')
if wl_low is None:
log.error('Undefined low wavelength boundary (wl_low)')
if wl_high is None:
log.error('Undefined high wavelength boundary (wl_high)')
if dblist is None:
log.error('There are no input database files (dblist)')
if dbtype is None:
log.error('There are no input database types (dbtype)')
if pflist is None:
log.error('There are no partition-function inputs (pflist)')
# Check number of files match:
if isinstance(dblist, str):
dblist = [dblist]
nfiles = len(dblist)
if isinstance(pflist, str):
pflist = [pflist]
if len(pflist) == 1:
pflist = [pflist[0] for _ in range(nfiles)]
if isinstance(dbtype, str):
dbtype = [dbtype]
if len(dbtype) == 1:
dbtype = [dbtype[0] for _ in range(nfiles)]
if nfiles != len(pflist) or nfiles != len(dbtype):
log.error(
f'The number of Line-transition files ({nfiles}) does not match '
f'the number of partition-function files ({len(pflist)}) or '
f'database-type files ({len(dbtype)})'
)
# Driver routine to read the databases:
db_readers = {
dbname.lower(): getattr(linelist, dbname)
for dbname in pc.dbases
}
dblist = [
os.path.realpath(dbase.replace('{ROOT}', pc.ROOT))
for dbase in dblist
]
databases = []
db_names = []
unique_dbs = []
log.head('\nReading input database files:')
for (dbase, pf, dtype) in zip(dblist, pflist, dbtype):
if dtype not in db_readers:
log.error(
f"Unknown type '{dtype}' for database '{dbase}'. "
f"Select from: {str(pc.dbases)}"
)
log.head(dbase, indent=2)
db = db_readers[dtype](dbase, pf, log)
databases.append(db)
db_names.append(db.name)
if db.name not in unique_dbs:
unique_dbs.append(db.name)
log.msg(f'There are {nfiles} input database file(s).\n\n')
# Boundaries in wavenumber space (in cm-1):
wn_low = 1.0 / wl_high / pt.u(wl_units)
wn_high = 1.0 / wl_low / pt.u(wl_units)
# Output file:
tli = {}
tli['version'] = f'{ver.LR_VER}.{ver.LR_MIN}.{ver.LR_REV}'
tli['wn_units'] = 'cm-1'
tli['wn_min'] = wn_low
tli['wn_max'] = wn_high
n_databases = len(unique_dbs)
tli['n_databases'] = n_databases
log.msg(
f'Initial wavelength: {wl_low:7.3f} {wl_units} ({wn_high:9.3f} cm-1)\n'
f'Final wavelength: {wl_high:7.3f} {wl_units} ({wn_low:9.3f} cm-1)\n'
f'There are {n_databases} different database(s).'
)
log.head('\nExtracting line transition info.')
tli['databases'] = []
for i,db_name in enumerate(unique_dbs):
dbase = {}
tli['databases'].append(dbase)
ti = time.time()
wn = []
gf = []
elow = []
iso_id = []
for db in databases:
if db.name != db_name:
continue
this_db = db
transitions = db.dbread(wn_low, wn_high, log.verb)
if transitions is None:
continue
wn.append(transitions[0])
gf.append(transitions[1])
elow.append(transitions[2])
iso_id.append(transitions[3])
db = this_db
wn = np.concatenate(wn)
gf = np.concatenate(gf)
elow = np.concatenate(elow)
iso_id = np.concatenate(iso_id)
tf = time.time()
log.debug(f'Reading time: {tf-ti:8.3f} seconds', indent=2)
ntransitions = np.size(wn)
# iso_id are indices as in db.isotopes
# iso_idx are indices 0-N, after filtering out isotopes with no lines
unique_iso, iso_idx, ntrans_iso = np.unique(
iso_id, return_inverse=True, return_counts=True,
)
# Sort by isotope ID, then each isotope by wavenumber:
ti = time.time()
isort = np.argsort(iso_id)
ihi = 0
for ntrans in ntrans_iso:
ilo = ihi
ihi += ntrans
wn_sort = np.argsort(wn[isort][ilo:ihi])
isort[ilo:ihi] = isort[ilo:ihi][wn_sort]
tf = time.time()
wn = wn[isort]
gf = gf[isort]
elow = elow[isort]
iso_id = iso_id[isort]
iso_idx = iso_idx[isort]
log.debug(f'Sort time: {tf-ti:8.3f} seconds', indent=2)
dbase['name'] = db.name
dbase['molecule'] = db.molecule
dbase['n_lines'] = ntransitions
dbase['n_lines_iso'] = ntrans_iso
dbase['iso_id'] = iso_idx
dbase['wn'] = wn
dbase['elow'] = elow
dbase['gf'] = gf
# Filter out isotopes with no line transitions:
iso_names = np.array(db.isotopes)[unique_iso]
iso_mass = np.array(db.mass)[unique_iso]
iso_ratio = np.array(db.isoratio)[unique_iso]
temp, partition, pf_iso = db.getpf(log.verb)
iso_match = np.isin(iso_names, pf_iso)
if np.any(~iso_match):
log.error(
'No partition functions found for these isotopes of the '
f'{db.molecule} line list: {iso_names[~iso_match]}'
)
# Filter and sort PF by isotopes in iso_names:
pf_idx = [pf_iso.index(iso) for iso in iso_names]
pf = partition[pf_idx]
# Store the number of temperature samples and isotopes:
dbase['temperatures'] = temp
dbase['isotopes'] = iso_names
dbase['iso_mass'] = iso_mass
dbase['iso_ratio'] = iso_ratio
dbase['partition'] = pf
# Report info for each isotope
wl_min = 1.0 / np.amax(wn) / pc.um
wl_max = 1.0 / np.amin(wn) / pc.um
n_iso = len(iso_names)
log.msg(
f"Database ({i+1}/{n_databases}): {repr(db.name)} "
f"({db.molecule} molecule)\n"
f'Number of isotopes with line transitions: {n_iso}',
indent=2,
)
log.msg("idx isotope mass (u) fraction n_lines", indent=2)
for j in range(n_iso):
name = f'{repr(str(iso_names[j])):10s}'
mass = iso_mass[j]
ratio = iso_ratio[j]
ratio = f'{ratio:9.7f}' if ratio >= 1e-5 else f'{ratio:.3e}'
ntrans = ntrans_iso[j]
log.msg(
f"{j+1:3d} {name} {mass:7.3f} {ratio} {ntrans:11,d}",
indent=2,
)
log.msg(
f'Total: {ntransitions:,d} line transitions '
f'between {wl_min:.3f} -- {wl_max:.3f} um\n\n'
f'Number of temperatures: {len(temp)}\n'
' Temperatures (K): '
f'[{temp[0]:6.1f}, {temp[1]:6.1f}, ..., {temp[-1]:6.1f}]',
indent=2,
)
for j in range(n_iso):
log.msg(
f'Partition Function ({str(iso_names[j])}): '
f'[{pf[j,0]:.2e}, {pf[j,1]:.2e}, ..., {pf[j,-1]:.2e}]',
indent=4,
)
# Store to file
ti = time.time()
tli_file = open(tlifile, 'wb')
endian = sys.byteorder[0]
tli_file.write(struct.pack('s', endian.encode('utf-8')))
tli_file.write(struct.pack('3h', ver.LR_VER, ver.LR_MIN, ver.LR_REV))
tli_file.write(struct.pack('2d', tli['wn_min'], tli['wn_max']))
tli_file.write(struct.pack('h', tli['n_databases']))
dbases = tli['databases']
for dbase in dbases:
pack_str(tli_file, dbase['name'])
pack_str(tli_file, dbase['molecule'])
ntemp = len(dbase['temperatures'])
niso = len(dbase['isotopes'])
tli_file.write(struct.pack('hh', ntemp, niso))
pack_array(tli_file, dbase['temperatures'], 'd')
for j,iso in enumerate(dbase['isotopes']):
pack_str(tli_file, str(iso))
tli_file.write(struct.pack('d', dbase['iso_mass'][j]))
tli_file.write(struct.pack('d', dbase['iso_ratio'][j]))
pack_array(tli_file, dbase['partition'][j], 'd')
n_lines = np.sum([dbase['n_lines'] for dbase in dbases])
tli_file.write(struct.pack('i', n_lines))
n_lines_iso = np.concatenate([dbase['n_lines_iso'] for dbase in dbases])
tli_file.write(struct.pack('i', len(n_lines_iso)))
for dbase in dbases:
pack_array(tli_file, dbase['n_lines_iso'], 'i')
for dbase in dbases:
pack_array(tli_file, dbase['wn'], 'd')
for dbase in dbases:
pack_array(tli_file, dbase['iso_id'], 'h')
for dbase in dbases:
pack_array(tli_file, dbase['elow'], 'd')
for dbase in dbases:
pack_array(tli_file, dbase['gf'], 'd')
tli_file.close()
tf = time.time()
log.debug(f'Writing time: {tf-ti:8.3f} seconds')
log.head(f"Generated TLI file: '{tlifile}'.")
log.close()