Source code for pyspex.hkt_io

#
# This file is part of pyspex
#
# https://github.com/rmvanhees/pyspex.git
#
# Copyright (c) 2019-2023 SRON - Netherlands Institute for Space Research
#    All Rights Reserved
#
# License:  BSD-3-Clause
"""Contains the class `HKTio` to read PACE HKT products."""

from __future__ import annotations

__all__ = ['HKTio', 'check_coverage_nav', 'read_hkt_nav']

import logging
from datetime import datetime, time, timedelta, timezone
from enum import IntFlag, auto
from typing import TYPE_CHECKING

import h5py
import numpy as np
import xarray as xr
from moniplot.image_to_xarray import h5_to_xr

# pylint: disable=no-name-in-module
from netCDF4 import Dataset

from .lib.ccsds_hdr import CCSDShdr
from .lib.leap_sec import get_leap_seconds

if TYPE_CHECKING:
    from pathlib import Path

# - global parameters -----------------------
module_logger = logging.getLogger('pyspex.hkt_io')

EPOCH = datetime(1958, 1, 1, tzinfo=timezone.utc)

# valid data coverage range
VALID_COVERAGE_MIN = datetime(2021, 1, 1, tzinfo=timezone.utc)
VALID_COVERAGE_MAX = datetime(2035, 1, 1, tzinfo=timezone.utc)

# expect the navigation data to extend at least 10 seconds
# w.r.t. time_coverage_start and time_coverage_end.
TIMEDELTA_MIN = timedelta(seconds=10)


class CoverageFlag(IntFlag):
    """Define flags for coverage_quality (navigation_data)."""

    GOOD = 0
    MISSING_SAMPLES = auto()
    TOO_SHORT_EXTENDS = auto()
    NO_EXTEND_AT_START = auto()
    NO_EXTEND_AT_END = auto()


# - high-level r/w functions ------------
[docs] def read_hkt_nav(hkt_list: list[Path, ...]) -> xr.Dataset: """Read navigation data from one or more HKT products. Parameters ---------- hkt_list : list[Path, ...] list of PACE-HKT products collocated with SPEXone measurements Returns ------- xr.Dataset xarray dataset with PACE navigation data """ dim_dict = {'att_': 'att_time', 'orb_': 'orb_time', 'tilt': 'tilt_time'} # concatenate DataArrays with navigation data res = {} rdate = None for name in sorted(hkt_list): hkt = HKTio(name) nav = hkt.navigation() if not res: rdate = hkt.reference_date res = nav.copy() continue dtime = None if rdate != hkt.reference_date: dtime = hkt.reference_date - rdate for key1, value in nav.items(): if not value: continue hdim = dim_dict.get(key1, None) if dtime is None: res[key1] = xr.concat((res[key1], value), dim=hdim) else: parm = key1 + '_time' if key1[-1] != '_' else key1 + 'time' val_new = value.assign_coords( {parm: value[parm] + dtime.total_seconds()}) res[key1] = xr.concat((res[key1], val_new), dim=hdim) # make sure that the data is sorted and unique dsets = () for key, coord in dim_dict.items(): if not res[key]: continue res[key] = res[key].sortby(coord) res[key] = res[key].drop_duplicates(dim=coord) dsets += (res[key],) # create Dataset from DataArrays xds_nav = xr.merge(dsets, combine_attrs='drop_conflicts') # remove confusing attributes from Dataset key_list = list(xds_nav.attrs) for key in key_list: del xds_nav.attrs[key] # add attribute 'reference_date' return xds_nav.assign_attrs({'reference_date': rdate.isoformat()})
[docs] def check_coverage_nav(l1a_file: Path, xds_nav: xr.Dataset) -> None: """Check time coverage of navigation data. Parameters ---------- l1a_file : Path name of the SPEXone level-1A product xds_nav : xr.Dataset xarray dataset with PACE navigation data """ coverage_quality = CoverageFlag.GOOD # obtain the reference date of the navigation data ref_date = datetime.fromisoformat(xds_nav.attrs['reference_date']) # obtain time_coverage_range from the Level-1A product with h5py.File(l1a_file) as fid: # pylint: disable=no-member val = fid.attrs['time_coverage_start'].decode() coverage_start = datetime.fromisoformat(val) val = fid.attrs['time_coverage_end'].decode() coverage_end = datetime.fromisoformat(val) module_logger.debug('SPEXone time-coverage: %s - %s', coverage_start, coverage_end) # check at the start of the data sec_of_day = xds_nav['att_time'].values[0] att_coverage_start = ref_date + timedelta(seconds=sec_of_day) module_logger.debug('PACE-HKT time-coverage-start: %s', att_coverage_start) if coverage_start - att_coverage_start < timedelta(0): coverage_quality |= CoverageFlag.NO_EXTEND_AT_START module_logger.error('time coverage of navigation data starts' ' after "time_coverage_start"') if coverage_start - att_coverage_start < TIMEDELTA_MIN: coverage_quality |= CoverageFlag.TOO_SHORT_EXTENDS module_logger.warning('time coverage of navigation data starts after' ' "time_coverage_start - %s"', TIMEDELTA_MIN) # check at the end of the data sec_of_day = xds_nav['att_time'].values[-1] att_coverage_end = ref_date + timedelta(seconds=sec_of_day) module_logger.debug('PACE-HKT time-coverage-end: %s', att_coverage_end) if att_coverage_end - coverage_end < timedelta(0): coverage_quality |= CoverageFlag.NO_EXTEND_AT_END module_logger.error('time coverage of navigation data ends' ' before "time_coverage_end"') if att_coverage_end - coverage_end < TIMEDELTA_MIN: coverage_quality |= CoverageFlag.TOO_SHORT_EXTENDS module_logger.warning('time coverage of navigation data ends before' ' "time_coverage_end + %s"', TIMEDELTA_MIN) # check for completeness dtime = (att_coverage_end - att_coverage_start).total_seconds() dim_expected = round(dtime / np.median(np.diff(xds_nav['att_time']))) module_logger.debug('expected navigation samples %d found %d', len(xds_nav['att_time']), dim_expected) if len(xds_nav['att_time']) / dim_expected < 0.95: coverage_quality |= CoverageFlag.MISSING_SAMPLES module_logger.warning('navigation data poorly sampled') # add coverage flag and attributes to Level-1A product with Dataset(l1a_file, 'a') as fid: gid = fid['/navigation_data'] gid.time_coverage_start = att_coverage_start.isoformat( timespec='milliseconds') gid.time_coverage_end = att_coverage_end.isoformat( timespec='milliseconds') dset = gid.createVariable('coverage_quality', 'u1', fill_value=255) dset[:] = coverage_quality dset.long_name = 'coverage quality of navigation data' dset.standard_name = 'status_flag' dset.valid_range = np.array([0, 15], dtype='u2') dset.flag_values = np.array([0, 1, 2, 4, 8], dtype='u2') dset.flag_meanings = ('good missing-samples too_short_extends' ' no_extend_at_start no_extend_at_end') # generate warning if time-coverage of navigation data is too short if coverage_quality & CoverageFlag.TOO_SHORT_EXTENDS: return False return True
# - class HKTio -------------------------
[docs] class HKTio: """Class to read housekeeping and navigation data from PACE-HKT products. Parameters ---------- filename : Path name of the PACE HKT product Notes ----- This class has the following methods:: - reference_date -> datetime - set_reference_date() - coverage() -> tuple[datetime, datetime] - housekeeping(instrument: str) -> tuple[np.ndarray, ...] - navigation() -> dict """ def __init__(self: HKTio, filename: Path) -> None: """Initialize access to a PACE HKT product.""" self._coverage = None self._reference_date = None self.filename = filename if not self.filename.is_file(): raise FileNotFoundError(f'file {filename} not found') self.set_reference_date() # ---------- PUBLIC FUNCTIONS ---------- @property def reference_date(self: HKTio) -> datetime: """Return reference date of all time_of_day variables.""" return self._reference_date
[docs] def set_reference_date(self: HKTio) -> None: """Set reference date of current PACE HKT product.""" ref_date = None with h5py.File(self.filename) as fid: grp = fid['navigation_data'] if 'att_time' in grp and 'units' in grp['att_time'].attrs: # pylint: disable=no-member words = grp['att_time'].attrs['units'].decode().split(' ') if len(words) > 2: # Note timezone 'Z' is only accepted by Python 3.11+ ref_date = datetime.fromisoformat(words[2] + 'T00:00:00+00:00') if ref_date is None: coverage = self.coverage() ref_date = datetime.combine(coverage[0].date(), time(0), tzinfo=timezone.utc) self._reference_date = ref_date
[docs] def coverage(self: HKTio) -> tuple[datetime, datetime]: """Return data coverage.""" one_day = timedelta(days=1) with h5py.File(self.filename) as fid: # pylint: disable=no-member # Note timezone 'Z' is only accepted by Python 3.11+ val = fid.attrs['time_coverage_start'].decode() coverage_start = datetime.fromisoformat(val.replace('Z', '+00:00')) val = fid.attrs['time_coverage_end'].decode() coverage_end = datetime.fromisoformat(val.replace('Z', '+00:00')) if abs(coverage_end - coverage_start) < one_day: return coverage_start, coverage_end # Oeps, now we have to check the timestamps of the measurement data hk_dset_names = ('HARP2_HKT_packets', 'OCI_HKT_packets', 'SPEXone_HKT_packets', 'SC_HKT_packets') tstamp_mn_list = [] tstamp_mx_list = [] with h5py.File(self.filename) as fid: for ds_set in hk_dset_names: dt_list = () if ds_set not in fid['housekeeping_data']: continue res = fid['housekeeping_data'][ds_set][:] for packet in res: try: ccsds_hdr = CCSDShdr() ccsds_hdr.read('raw', packet) except ValueError as exc: module_logger.warning( 'CCSDS header read error with "%s"', exc) break val = ccsds_hdr.tstamp(EPOCH) if (val > VALID_COVERAGE_MIN) & (val < VALID_COVERAGE_MAX): dt_list += (val,) if not dt_list: continue dt_arr = np.array(dt_list) ii = dt_arr.size // 2 leap_sec = get_leap_seconds(dt_arr[ii].timestamp(), epochyear=1970) dt_arr -= timedelta(seconds=leap_sec) mn_val = min(dt_arr) mx_val = max(dt_arr) if mx_val - mn_val > one_day: indx_close_to_mn = (dt_arr - mn_val) <= one_day indx_close_to_mx = (mx_val - dt_arr) <= one_day module_logger.warning('coverage_range: %s[%d] - %s[%d]', mn_val, np.sum(indx_close_to_mn), mx_val, np.sum(indx_close_to_mx)) if np.sum(indx_close_to_mn) > np.sum(indx_close_to_mx): mx_val = max(dt_arr[indx_close_to_mn]) else: mn_val = min(dt_arr[indx_close_to_mx]) tstamp_mn_list.append(mn_val) tstamp_mx_list.append(mx_val) if len(tstamp_mn_list) == 1: return tstamp_mn_list[0], tstamp_mx_list[0] return min(*tstamp_mn_list), max(*tstamp_mx_list)
[docs] def housekeeping(self: HKTio, instrument: str = 'spx') -> tuple[np.ndarray, ...]: """Get housekeeping telemetry data. Parameters ---------- instrument : {'spx', 'oci', 'harp', 'sc'}, default='spx' name of PACE instrument: 'harp': HARP2, 'oci': OCI, 'sc': spacecraft, 'spx': SPEXone. Notes ----- Current implementation only works for SPEXone. """ ds_set = {'spx': 'SPEXone_HKT_packets', 'sc': 'SC_HKT_packets', 'oci': 'OCI_HKT_packets', 'harp': 'HARP2_HKT_packets'}.get(instrument) with h5py.File(self.filename) as fid: if ds_set not in fid['housekeeping_data']: return () res = fid['housekeeping_data'][ds_set][:] ccsds_hk = () for packet in res: try: ccsds_hdr = CCSDShdr() ccsds_hdr.read('raw', packet) except ValueError as exc: module_logger.warning( 'CCSDS header read error with "%s"', exc) break try: dtype_apid = ccsds_hdr.data_dtype except ValueError: print(f'APID: 0x{ccsds_hdr.apid:x};' f' Packet Length: {ccsds_hdr.packet_size:d}') dtype_apid = None if dtype_apid is not None: # all valid APIDs buff = np.frombuffer(packet, count=1, offset=0, dtype=dtype_apid) ccsds_hk += (buff,) else: module_logger.warning( 'package with APID 0x%x and length %d is not implemented', ccsds_hdr.apid, ccsds_hdr.packet_size) return ccsds_hk
[docs] def navigation(self: HKTio) -> dict: """Get navigation data.""" res = {'att_': (), 'orb_': (), 'tilt': ()} with h5py.File(self.filename) as fid: gid = fid['navigation_data'] for key in gid: if key.startswith('att_'): res['att_'] += (h5_to_xr(gid[key]),) elif key.startswith('orb_'): res['orb_'] += (h5_to_xr(gid[key]),) elif key.startswith('tilt'): res['tilt'] += (h5_to_xr(gid[key]),) else: module_logger.warning('fail to find dataset %s', key) # repair the dimensions xds1 = xr.merge(res['att_'], combine_attrs='drop_conflicts') xds1 = xds1.set_coords(['att_time']) xds1 = xds1.swap_dims({'att_records': 'att_time'}) xds2 = xr.merge(res['orb_'], combine_attrs='drop_conflicts') xds2 = xds2.set_coords(['orb_time']) xds2 = xds2.swap_dims({'orb_records': 'orb_time'}) xds3 = () if res['tilt']: xds3 = xr.merge(res['tilt'], combine_attrs='drop_conflicts') xds3 = xds3.set_coords(['tilt_time']) xds3 = xds3.swap_dims({'tilt_records': 'tilt_time'}) return {'att_': xds1, 'orb_': xds2, 'tilt': xds3}