Source code for pyspex.tlm

#
# This file is part of pyspex
#
# https://github.com/rmvanhees/pyspex.git
#
# Copyright (c) 2019-2023 SRON - Netherlands Institute for Space Research
#    All Rights Reserved
#
# License:  BSD-3-Clause
"""`SPXtlm` can read telemetry house-keeping data from SPEXone."""
from __future__ import annotations

__all__ = ['SPXtlm']

import datetime as dt
import logging
from pathlib import Path
from typing import TYPE_CHECKING

import h5py
import numpy as np

# pylint: disable=no-name-in-module
from netCDF4 import Dataset

from .hkt_io import HKTio, check_coverage_nav, read_hkt_nav
from .l1a_io import L1Aio
from .lib.ccsds_hdr import CCSDShdr
from .lib.leap_sec import get_leap_seconds
from .lib.tlm_utils import UNITS_DICT, convert_hk
from .lib.tmtc_def import tmtc_dtype
from .lv0_lib import dump_hkt, dump_science, read_lv0_data
from .version import pyspex_version

if TYPE_CHECKING:
    from dataclasses import dataclass

# - global parameters -----------------------
module_logger = logging.getLogger('pyspex.tlm')

TSTAMP_MIN = 1561939200           # 2019-07-01T00:00:00+00:00
TSTAMP_TYPE = np.dtype(
    [('tai_sec', int), ('sub_sec', int), ('dt', 'O')])

DET_CONSTS = {
    'dimRow': 2048,
    'dimColumn': 2048,
    'dimFullFrame': 2048 * 2048,
    'DEM_frequency': 10,            # [MHz]
    'FTI_science': 1000 / 15,       # [ms]
    'FTI_diagnostic': 240.,         # [ms]
    'FTI_margin': 212.4,            # [ms]
    'overheadTime': 0.4644,         # [ms]
    'FOT_length': 20
}
FULLFRAME_BYTES = 2 * DET_CONSTS['dimFullFrame']


# - helper functions ------------------------
def subsec2musec(sub_sec: int) -> int:
    """Return subsec as microseconds."""
    return 100 * int(sub_sec / 65536 * 10000)


def __exposure_time__(science: np.ndarray) -> np.ndarray:
    """Return exposure time [ms]."""
    return 129e-4 * (0.43 * science['DET_FOTLEN'] + science['DET_EXPTIME'])


def __frame_period__(science: np.ndarray) -> np.ndarray:
    """Return frame period of detector measurement [ms]."""
    n_science = 1 if isinstance(science, np.void) else len(science)
    n_coad = science['REG_NCOADDFRAMES']
    # binning mode
    if science['REG_FULL_FRAME'] == 2:
        return np.full(n_science, n_coad * DET_CONSTS['FTI_science'])

    # full-frame mode
    return n_coad * np.clip(DET_CONSTS['FTI_margin']
                            + DET_CONSTS['overheadTime']
                            + __exposure_time__(science),
                            a_min=DET_CONSTS['FTI_diagnostic'], a_max=None)


def __readout_offset__(science: np.ndarray) -> float:
    """Return offset wrt start-of-integration [ms]."""
    n_coad = science['REG_NCOADDFRAMES']
    n_frm = n_coad + 3 if science['IMRLEN'] == FULLFRAME_BYTES \
        else 2 * n_coad + 2

    return n_frm * __frame_period__(science)[0]


def __binning_table__(science: np.ndarray) -> np.ndarray:
    """Return binning table identifier (zero for full-frame images)."""
    bin_tbl = np.zeros(len(science), dtype='i1')
    _mm = science['IMRLEN'] == FULLFRAME_BYTES
    if np.sum(_mm) == len(science):
        return bin_tbl

    bin_tbl_start = science['REG_BINNING_TABLE_START']
    bin_tbl[~_mm] = 1 + (bin_tbl_start[~_mm] - 0x80000000) // 0x400000
    return bin_tbl


def __digital_offset__(science: np.ndarray) -> np.ndarray:
    """Return digital offset including ADC offset [count]."""
    buff = science['DET_OFFSET'].astype('i4')
    buff[buff >= 8192] -= 16384

    return buff + 70


def extract_l0_hk(ccsds_hk: tuple, epoch: dt.datetime) -> dict | None:
    """Return dictionary with NomHk telemetry data."""
    if not ccsds_hk:
        return None

    hdr = np.empty(len(ccsds_hk),
                   dtype=ccsds_hk[0]['hdr'].dtype)
    tlm = np.empty(len(ccsds_hk), dtype=tmtc_dtype(0x320))
    tstamp = []
    ii = 0
    for buf in ccsds_hk:
        hdr[ii] = buf['hdr']
        ccsds_hdr = CCSDShdr(buf['hdr'])
        if ccsds_hdr.apid != 0x320 or buf['hdr']['tai_sec'] < len(ccsds_hk):
            continue

        tlm[ii] = buf['hk']
        tstamp.append(epoch + dt.timedelta(
            seconds=int(buf['hdr']['tai_sec']),
            microseconds=subsec2musec(buf['hdr']['sub_sec'])))
        ii += 1

    # These values are originally stored in little-endian, but
    # Numpy does not accepts a mix of little & big-endian values
    # in a structured array.
    tlm['HTR1_CALCPVAL'][:] = tlm['HTR1_CALCPVAL'].byteswap()
    tlm['HTR2_CALCPVAL'][:] = tlm['HTR2_CALCPVAL'].byteswap()
    tlm['HTR3_CALCPVAL'][:] = tlm['HTR3_CALCPVAL'].byteswap()
    tlm['HTR4_CALCPVAL'][:] = tlm['HTR4_CALCPVAL'].byteswap()
    tlm['HTR1_CALCIVAL'][:] = tlm['HTR1_CALCIVAL'].byteswap()
    tlm['HTR2_CALCIVAL'][:] = tlm['HTR2_CALCIVAL'].byteswap()
    tlm['HTR3_CALCIVAL'][:] = tlm['HTR3_CALCIVAL'].byteswap()
    tlm['HTR4_CALCIVAL'][:] = tlm['HTR4_CALCIVAL'].byteswap()

    return {'hdr': hdr[:ii],
            'tlm': tlm[:ii],
            'tstamp': np.array(tstamp)}


def extract_l0_sci(ccsds_sci: tuple, epoch: dt.datetime) -> dict | None:
    """Return dictionary with Science telemetry data."""
    if not ccsds_sci:
        return None

    n_frames = 0
    hdr_dtype = None
    hk_dtype = None
    found_start_first = False
    for buf in ccsds_sci:
        ccsds_hdr = CCSDShdr(buf['hdr'][0])
        if ccsds_hdr.grouping_flag == 1:
            found_start_first = True
            if n_frames == 0:
                hdr_dtype = buf['hdr'].dtype
                hk_dtype = buf['hk'].dtype
                continue

        if not found_start_first:
            continue

        if ccsds_hdr.grouping_flag == 2:
            found_start_first = False
            n_frames += 1

    # do we have any complete detector images (Note ccsds_sci not empty!)?
    if n_frames == 0:
        module_logger.warning('no valid Science package found')
        return None

    # allocate memory
    hdr_arr = np.empty(n_frames, dtype=hdr_dtype)
    tlm_arr = np.empty(n_frames, dtype=hk_dtype)
    tstamp = np.empty(n_frames, dtype=TSTAMP_TYPE)
    images = ()

    # extract data from ccsds_sci
    ii = 0
    img = None
    found_start_first = False
    for buf in ccsds_sci:
        ccsds_hdr = CCSDShdr(buf['hdr'][0])
        if ccsds_hdr.grouping_flag == 1:
            found_start_first = True
            hdr_arr[ii] = buf['hdr']
            tlm_arr[ii] = buf['hk']
            tstamp[ii] = (buf['icu_tm']['tai_sec'][0],
                          buf['icu_tm']['sub_sec'][0],
                          epoch + dt.timedelta(
                              seconds=int(buf['icu_tm']['tai_sec'][0]),
                              microseconds=subsec2musec(
                                  buf['icu_tm']['sub_sec'][0])))
            img = (buf['frame'][0],)
            continue

        if not found_start_first:
            continue

        if ccsds_hdr.grouping_flag == 0:
            img += (buf['frame'][0],)
        elif ccsds_hdr.grouping_flag == 2:
            found_start_first = False
            img += (buf['frame'][0],)
            images += (np.concatenate(img),)
            ii += 1
            if ii == n_frames:
                break

    return {'hdr': hdr_arr,
            'tlm': tlm_arr,
            'tstamp': tstamp,
            'images': images}


def add_hkt_navigation(l1a_file: Path, hkt_list: list[Path]) -> int:
    """Add PACE navigation information from PACE_HKT products.

    Parameters
    ----------
    l1a_file :  Path
       name of an existing L1A product.
    hkt_list :  list[Path]
       listing of files from which the navigation data has to be read
    """
    # read PACE navigation data from HKT files.
    xds_nav = read_hkt_nav(hkt_list)
    # add PACE navigation data to existing Level-1A product.
    xds_nav.to_netcdf(l1a_file, group='navigation_data', mode='a')
    # check time coverage of navigation data.
    return check_coverage_nav(l1a_file, xds_nav)


def add_proc_conf(l1a_file: Path, yaml_conf: Path) -> None:
    """Add dataset 'processor_configuration' to an existing L1A product.

    Parameters
    ----------
    l1a_file :  Path
       name of an existing L1A product.
    yaml_conf :  Path
       name of the YAML file with the processor settings
    """
    with Dataset(l1a_file, 'r+') as fid:
        dset = fid.createVariable('processor_configuration', str)
        dset.comment = ('Configuration parameters used during'
                        ' the processor run that produced this file.')
        dset.markup_language = 'YAML'
        dset[0] = ''.join(
            [s for s in yaml_conf.open(encoding='ascii').readlines()
             if not (s == '\n' or s.startswith('#'))])


# - class SPXtlm ----------------------------
[docs] class SPXtlm: """Access/convert parameters of SPEXone Science telemetry data. Notes ----- This class has the following methods:: - set_coverage(coverage: tuple[datetime, datetime] | None) -> None - hk_hdr() -> np.ndarray | None - hk_tlm() -> np.ndarray | None - hk_tstamp() -> np.ndarray | None - sci_hdr() -> np.ndarray | None - sci_tlm() -> np.ndarray | None - sci_tstamp() -> np.ndarray | None - images() -> tuple | None - reference_date() -> datetime - time_coverage_start() -> datetime - time_coverage_end() -> datetime - from_hkt(flnames: Path | list[Path], *, instrument: str | None = None, dump: bool = False) -> None - from_lv0(flnames: Path | list[Path], *, file_format: str, tlm_type: str | None = None, debug: bool = False, dump: bool = False) -> None - from_l1a(flname: Path, *, tlm_type: str | None = None) -> None - set_selection(mode: str) -> None - gen_l1a(config: dataclass, mode: str) -> None - convert(key: str, tm_type: str = 'both') -> np.ndarray - units(key: str) -> str """ def __init__(self: SPXtlm) -> None: """Initialize class SPXtlm.""" self.logger = logging.getLogger(__name__) self.file_list: list | None = None self._coverage: tuple[dt.datetime, dt.datetime] | None = None self._hk = None self._sci = None self._selection = None
[docs] def set_coverage(self: SPXtlm, coverage: tuple[dt.datetime, dt.datetime] | None) -> None: """Store or update the class attribute `coverage`.""" if coverage is None: self._coverage = None elif self._coverage is None: self._coverage = coverage else: self._coverage = (min(self._coverage[0], coverage[0]), max(self._coverage[1], coverage[1]))
@property def hk_hdr(self: SPXtlm) -> np.ndarray | None: """Return CCSDS header data of telemetry packages @1Hz.""" if self._hk is None: return None if self._selection is None or self._selection: return self._hk['hdr'] return self._hk['hdr'][self._selection['hk_mask']] @property def hk_tlm(self: SPXtlm) -> np.ndarray | None: """Return telemetry packages @1Hz.""" if self._hk is None: return None if self._selection is None: return self._hk['tlm'] return self._hk['tlm'][self._selection['hk_mask']] @property def hk_tstamp(self: SPXtlm) -> np.ndarray | None: """Return timestamps of telemetry packages @1Hz.""" if self._hk is None: return None if self._selection is None: return self._hk['tstamp'] return self._hk['tstamp'][self._selection['hk_mask']] @property def sci_hdr(self: SPXtlm) -> np.ndarray | None: """Return CCSDS header data of Science telemetry packages.""" if self._sci is None: return None if self._selection is None: return self._sci['hdr'] return self._sci['hdr'][self._selection['sci_mask']] @property def sci_tlm(self: SPXtlm) -> np.ndarray | None: """Return Science telemetry packages.""" if self._sci is None: return None if self._selection is None: return self._sci['tlm'] return self._sci['tlm'][self._selection['sci_mask']] @property def sci_tstamp(self: SPXtlm) -> np.ndarray | None: """Return timestamps of Science telemetry packages.""" if self._sci is None: return None if self._selection is None: return self._sci['tstamp'] return self._sci['tstamp'][self._selection['sci_mask']] @property def images(self: SPXtlm) -> tuple[np.ndarray, ...] | None: """Return image-frames of Science telemetry packages.""" if self._sci is None or 'images' not in self._sci: return None if self._selection is None: return self._sci['images'] images = () for ii, img in enumerate(self._sci['images']): if self._selection['sci_mask'][ii]: images += (img,) return images def __get_valid_tstamps(self: SPXtlm) -> np.ndarray | None: """Return valid timestamps from Science or NomHk packages.""" if self.sci_tstamp is None \ or np.all(self.sci_tstamp['tai_sec'] < TSTAMP_MIN): indx = self.hk_tstamp > dt.datetime( 2020, 1, 1, 1, tzinfo=dt.timezone.utc) return self.hk_tstamp[indx] if indx.size > 0 else None indx = np.where(self.sci_tstamp['tai_sec'] > TSTAMP_MIN)[0] return self.sci_tstamp['dt'][indx] if indx.size > 0 else None @property def reference_date(self: SPXtlm) -> dt.datetime: """Return date of reference day (tzone aware).""" tstamp = self.__get_valid_tstamps() if tstamp is None: raise ValueError('no valid timestamps found') return dt.datetime.combine( tstamp[0].date(), dt.time(0), tstamp[0].tzinfo) @property def time_coverage_start(self: SPXtlm) -> dt.datetime: """Return a string for the time_coverage_start.""" if self._coverage is not None: return self._coverage[0] tstamp = self.__get_valid_tstamps() if tstamp is None: raise ValueError('no valid timestamps found') return tstamp[0] @property def time_coverage_end(self: SPXtlm) -> dt.datetime: """Return a string for the time_coverage_end.""" if self._coverage is not None: return self._coverage[1] tstamp = self.__get_valid_tstamps() if tstamp is None: raise ValueError('no valid timestamps found') frame_period = __frame_period__(self.sci_tlm[-1:])[0] \ if self.sci_tlm is not None else 1. return tstamp[-1] + dt.timedelta(milliseconds=frame_period)
[docs] def from_hkt(self: SPXtlm, flnames: Path | list[Path], *, instrument: str | None = None, dump: bool = False) -> None: """Read telemetry data from a PACE HKT product. Parameters ---------- flnames : Path | list[Path] list of PACE_HKT filenames (netCDF4 format) instrument : {'spx', 'sc', 'oci', 'harp'}, optional dump : bool, default=False dump header information of the telemetry packages @1Hz for debugging purposes """ if isinstance(flnames, Path): flnames = [flnames] if instrument is None: instrument = 'spx' elif instrument not in ['spx', 'sc', 'oci', 'harp']: raise KeyError("instrument not in ['spx', 'sc', 'oci', 'harp']") self.file_list = flnames ccsds_hk: tuple[np.ndarray] | tuple = () for name in flnames: hkt = HKTio(name) self.set_coverage(hkt.coverage()) ccsds_hk += hkt.housekeeping(instrument) if not ccsds_hk: return if dump: dump_hkt(flnames[0].stem + '_hkt.dump', ccsds_hk) epoch = dt.datetime(1958, 1, 1, tzinfo=dt.timezone.utc) ii = len(ccsds_hk) // 2 leap_sec = get_leap_seconds(ccsds_hk[ii]['hdr']['tai_sec'][0]) epoch -= dt.timedelta(seconds=leap_sec) self._hk = extract_l0_hk(ccsds_hk, epoch)
[docs] def from_lv0(self: SPXtlm, flnames: Path | list[Path], *, file_format: str, tlm_type: str | None = None, debug: bool = False, dump: bool = False) -> None: """Read telemetry data from SPEXone Level-0 product. Parameters ---------- flnames : Path | list[Path] list of CCSDS filenames file_format : {'raw', 'st3', 'dsb'} type of CCSDS data tlm_type : {'hk', 'sci', 'all'}, optional select type of telemetry packages. Note that we allways read the complete Level-0 producs. debug : bool, default=False run in debug mode, read only packages heades dump : bool, default=False dump header information of the telemetry packages @1Hz for debugging purposes """ if isinstance(flnames, Path): flnames = [flnames] if tlm_type is None: tlm_type = 'all' elif tlm_type not in ['hk', 'sci', 'all']: raise KeyError("tlm_type not in ['hk', 'sci', 'all']") if file_format not in ['raw', 'st3', 'dsb']: raise KeyError("file_format not in ['raw', 'st3', 'dsb']") self.file_list = flnames self._hk = None self._sci = None ccsds_sci, ccsds_hk = read_lv0_data(flnames, file_format, debug=debug) if dump: dump_hkt(flnames[0].stem + '_hkt.dump', ccsds_hk) dump_science(flnames[0].stem + '_sci.dump', ccsds_sci) if debug or dump: return # set epoch if file_format == 'dsb': epoch = dt.datetime(1958, 1, 1, tzinfo=dt.timezone.utc) ii = len(ccsds_hk) // 2 leap_sec = get_leap_seconds(ccsds_hk[ii]['hdr']['tai_sec'][0]) epoch -= dt.timedelta(seconds=leap_sec) else: epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc) # collect Science telemetry data if tlm_type != 'hk': self._sci = extract_l0_sci(ccsds_sci, epoch) del ccsds_sci # collected NomHk telemetry data if tlm_type != 'sci': self._hk = extract_l0_hk(ccsds_hk, epoch)
[docs] def from_l1a(self: SPXtlm, flname: Path, *, tlm_type: str | None = None) -> None: """Read telemetry data from SPEXone Level-1A product. Parameters ---------- flname : Path name of one SPEXone Level-1A product tlm_type : {'hk', 'sci', 'all'}, optional select type of telemetry packages """ if tlm_type is None: tlm_type = 'all' elif tlm_type not in ['hk', 'sci', 'all']: raise KeyError("tlm_type not in ['hk', 'sci', 'all']") self.file_list = [flname] self._hk = None self._sci = None with h5py.File(flname) as fid: if tlm_type != 'hk': dset = fid['/image_attributes/icu_time_sec'] seconds = dset[:] try: # pylint: disable=no-member _ = dset.attrs['units'].index(b'1958') except ValueError: epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc) else: epoch = dt.datetime(1958, 1, 1, tzinfo=dt.timezone.utc) epoch -= dt.timedelta(seconds=get_leap_seconds(seconds[0])) subsec = fid['/image_attributes/icu_time_subsec'][:] self._sci = { 'tlm': fid['/science_data/detector_telemetry'][:], 'images': fid['/science_data/detector_images'][:], 'tstamp': np.empty(len(seconds), dtype=TSTAMP_TYPE) } self._sci['tstamp']['tai_sec'] = seconds self._sci['tstamp']['sub_sec'] = subsec _dt = [] for ii, sec in enumerate(seconds): _dt.append(epoch + dt.timedelta( seconds=int(sec), milliseconds=-__readout_offset__(self._sci['tlm'][0]), microseconds=subsec2musec(subsec[ii]))) self._sci['tstamp']['dt'] = _dt if tlm_type != 'sci': self._hk = { 'tlm': fid['/engineering_data/NomHK_telemetry'][:], 'tstamp': [] } dset = fid['/engineering_data/HK_tlm_time'] # pylint: disable=no-member ref_date = dset.attrs['units'].decode()[14:] + '+00:00' epoch = dt.datetime.fromisoformat(ref_date) for sec in dset[:]: self._hk['tstamp'].append(epoch + dt.timedelta(seconds=sec))
[docs] def set_selection(self: SPXtlm, mode: str) -> None: """Obtain image and housekeeping dimensions. Parameters ---------- mode : {'full', 'binned', 'all'} """ self._selection = None if mode == 'full': sci_mask = [] if self.sci_tlm is None else \ self.sci_tlm['IMRLEN'] == FULLFRAME_BYTES if np.sum(sci_mask) == 0: return mps_list = np.unique(self.sci_tlm['MPS_ID'][sci_mask]) self.logger.debug('unique Diagnostic MPS: %s', mps_list) hk_mask = np.in1d(self.hk_tlm['MPS_ID'], mps_list) self._selection = { 'sci_mask': sci_mask, 'hk_mask': hk_mask, 'dims': { 'number_of_images': np.sum(sci_mask), 'samples_per_image': DET_CONSTS['dimFullFrame'], 'hk_packets': np.sum(hk_mask)} } return if mode == 'binned': sci_mask = [] if self.sci_tlm is None else \ self.sci_tlm['IMRLEN'] < FULLFRAME_BYTES if np.sum(sci_mask) == 0: return mps_list = np.unique(self.sci_tlm['MPS_ID'][sci_mask]) self.logger.debug('unique Science MPS: %s', mps_list) hk_mask = np.in1d(self.hk_tlm['MPS_ID'], mps_list) self._selection = { 'sci_mask': sci_mask, 'hk_mask': hk_mask, 'dims': { 'number_of_images': np.sum(sci_mask), 'samples_per_image': np.max( [len(self.images[ii]) for ii in sci_mask.nonzero()[0]]), 'hk_packets': np.sum(hk_mask)} } return if mode == 'all': nr_hk = 0 if self.hk_hdr is None else len(self.hk_hdr) nr_sci = 0 if self.sci_hdr is None else len(self.sci_hdr) self._selection = { 'hk_mask': np.full(nr_hk, True), 'sci_mask': np.full(nr_sci, True), 'dims': { 'number_of_images': nr_sci, 'samples_per_image': DET_CONSTS['dimRow'] if nr_sci == 0 else np.max([len(x) for x in self.images]), 'hk_packets': nr_hk} }
[docs] def l1a_file(self: SPXtlm, config: dataclass, mode: str) -> Path: """Return filename of Level-1A product. Parameters ---------- config : dataclass Settings for the L0->l1A processing. mode : {'all', 'full', 'binned'} Select Science packages with full-frame image or binned images Returns ------- Path Filename of Level-1A product. Notes ----- === Inflight === L1A file name format, following the NASA ... naming convention: PACE_SPEXONE[_TTT].YYYYMMDDTHHMMSS.L1A[.Vnn].nc where TTT is an optional data type (e.g., for the calibration data files) YYYYMMDDTHHMMSS is time stamp of the first image in the file Vnn file-version number (omitted when nn=1) for example (file-version=1): [Science Product] PACE_SPEXONE.20230115T123456.L1A.nc [Calibration Product] PACE_SPEXONE_CAL.20230115T123456.L1A.nc [Dark science Product] PACE_SPEXONE_DARK.20230115T123456.L1A.nc === OCAL === L1A file name format: SPX1_OCAL_<msm_id>[_YYYYMMDDTHHMMSS]_L1A_vvvvvvv.nc where msm_id is the measurement identifier YYYYMMDDTHHMMSS is time stamp of the first image in the file vvvvvvv is the git-hash string of the pyspex repository """ if config.outfile: return config.outdir / config.outfile if config.l0_format != 'raw': if config.eclipse is None: subtype = '_OCAL' elif not config.eclipse: subtype = '' else: subtype = '_CAL' if mode == 'full' else '_DARK' prod_ver = '' if config.file_version == 1 \ else f'.V{config.file_version:02d}' return config.outdir / ( f'PACE_SPEXONE{subtype}' f'.{self.time_coverage_start.strftime("%Y%m%dT%H%M%S"):15s}' f'.L1A{prod_ver}.nc') # OCAL product name # determine measurement identifier msm_id = config.l0_list[0].stem try: new_date = dt.datetime.strptime( msm_id[-22:], '%y-%j-%H:%M:%S.%f').strftime('%Y%m%dT%H%M%S.%f') except ValueError: pass else: msm_id = msm_id[:-22] + new_date return (config.outdir / f'SPX1_OCAL_{msm_id}_L1A_{pyspex_version(githash=True)}.nc')
[docs] def gen_l1a(self: SPXtlm, config: dataclass, mode: str) -> None: """Generate a SPEXone Level-1A product.""" self.set_selection(mode) if self._selection is None: return l1a_file = self.l1a_file(config, mode) ref_date = self.reference_date with L1Aio(l1a_file, ref_date, self._selection['dims'], compression=config.compression) as l1a: l1a.fill_global_attrs(inflight=config.l0_format != 'raw') if self.hk_tlm is not None: l1a.set_attr('icu_sw_version', f'0x{self.hk_tlm["ICUSWVER"][0]:x}') l1a.set_attr('time_coverage_start', self.time_coverage_start.isoformat( timespec='milliseconds')) l1a.set_attr('time_coverage_end', self.time_coverage_end.isoformat( timespec='milliseconds')) l1a.set_attr('input_files', [x.name for x in config.l0_list]) self.logger.debug('(1) initialized Level-1A product') self._fill_engineering(l1a) self.logger.debug('(2) added engineering data') self._fill_science(l1a) self.logger.debug('(3) added science data') self._fill_image_attrs(l1a, config.l0_format) self.logger.debug('(4) added image attributes') # add processor_configuration if config.yaml_fl: add_proc_conf(l1a_file, config.yaml_fl) # add PACE navigation information from HKT products if config.hkt_list: status_ok = add_hkt_navigation(l1a_file, config.hkt_list) self.logger.debug('(5) added PACE navigation data') if not status_ok: raise UserWarning( 'time-coverage of navigation data is too short') self.logger.info('successfully generated: %s', l1a_file.name)
def _fill_engineering(self: SPXtlm, l1a: h5py.File) -> None: """Fill datasets in group '/engineering_data'.""" if self.hk_tlm is None: return l1a.set_dset('/engineering_data/NomHK_telemetry', self.hk_tlm) ref_date = self.reference_date l1a.set_dset('/engineering_data/HK_tlm_time', [(x - ref_date).total_seconds() for x in self.hk_tstamp]) l1a.set_dset('/engineering_data/temp_detector', self.convert('TS1_DEM_N_T', tm_type='hk')) l1a.set_dset('/engineering_data/temp_housing', self.convert('TS2_HOUSING_N_T', tm_type='hk')) l1a.set_dset('/engineering_data/temp_radiator', self.convert('TS3_RADIATOR_N_T', tm_type='hk')) def _fill_science(self: SPXtlm, l1a: h5py.File) -> None: """Fill datasets in group '/science_data'.""" if self.sci_tlm is None: return img_sz = [img.size for img in self.images] if len(np.unique(img_sz)) != 1: images = np.zeros((len(img_sz), np.max(img_sz)), dtype='u2') for ii, img in enumerate(self.images): images[ii, :len(img)] = img else: images = np.vstack(self.images) l1a.set_dset('/science_data/detector_images', images) l1a.set_dset('/science_data/detector_telemetry', self.sci_tlm) def _fill_image_attrs(self: SPXtlm, l1a: h5py.File, lv0_format: str) -> None: """Fill datasets in group '/image_attributes'.""" if self.sci_tlm is None: return l1a.set_dset('/image_attributes/icu_time_sec', self.sci_tstamp['tai_sec']) # modify attribute units for non-DSB products if lv0_format != 'dsb': # timestamp of 2020-01-01T00:00:00+00:00 l1a.set_attr('valid_min', np.uint32(1577836800), ds_name='/image_attributes/icu_time_sec') # timestamp of 2024-01-01T00:00:00+00:00 l1a.set_attr('valid_max', np.uint32(1704067200), ds_name='/image_attributes/icu_time_sec') l1a.set_attr('units', 'seconds since 1970-01-01 00:00:00', ds_name='/image_attributes/icu_time_sec') l1a.set_dset('/image_attributes/icu_time_subsec', self.sci_tstamp['sub_sec']) ref_date = self.reference_date l1a.set_dset('/image_attributes/image_time', [(x - ref_date).total_seconds() for x in self.sci_tstamp['dt']]) l1a.set_dset('/image_attributes/image_ID', np.bitwise_and(self.sci_hdr['sequence'], 0x3fff)) l1a.set_dset('/image_attributes/binning_table', __binning_table__(self.sci_tlm)) l1a.set_dset('/image_attributes/digital_offset', __digital_offset__(self.sci_tlm)) l1a.set_dset('/image_attributes/exposure_time', __exposure_time__(self.sci_tlm) / 1000) l1a.set_dset('/image_attributes/nr_coadditions', self.sci_tlm['REG_NCOADDFRAMES'])
[docs] def convert(self: SPXtlm, key: str, tm_type: str = 'both') -> np.ndarray: """Convert telemetry parameter to physical units. Parameters ---------- key : str Name of telemetry parameter tm_type : {'hk', 'sci', 'both'}, default 'both' Default is to check if key is present in sci_tlm else hk_tlm Returns ------- np.ndarray """ if tm_type == 'hk': tlm = self.hk_tlm elif tm_type == 'sci': tlm = self.sci_tlm else: tlm = self.sci_tlm if self.sci_tlm is not None else self.hk_tlm if key.upper() not in tlm.dtype.names: raise KeyError(f'Parameter: {key.upper()} not found' f' in {tlm.dtype.names}') raw_data = np.array([x[key.upper()] for x in tlm]) return convert_hk(key.upper(), raw_data)
[docs] @staticmethod def units(key: str) -> str: """Obtain units of converted telemetry parameter. Parameters ---------- key : str Name of telemetry parameter Returns ------- str """ return UNITS_DICT.get(key, '1')