Source code for pyspex.egse_db

#
# This file is part of pyspex
#
# https://github.com/rmvanhees/pyspex.git
#
# Copyright (c) 2019-2023 SRON - Netherlands Institute for Space Research
#    All Rights Reserved
#
# License:  BSD-3-Clause
"""Collection of routines to access EGSE data generated by ITOS."""

from __future__ import annotations

__all__ = ['create_egse_db', 'add_egse_data']

from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING

import h5py
import numpy as np

# pylint: disable=no-name-in-module
from netCDF4 import Dataset

if TYPE_CHECKING:
    import argparse

# - global parameters ------------------------------
DB_EGSE = 'egse_db_itos.nc'

# enumerate source status
LDLS_DICT = {b'UNPLUGGED': 0, b'Controller Fault': 1, b'Idle': 2,
             b'Laser ON': 3, b'Lamp ON': 4, b'MISSING': 255}

# enumerate shutter positions
SHUTTER_DICT = {b'CLOSED': 0, b'OPEN': 1, b'PARTIAL': 255}


# - local functions --------------------------------
def init_gse_data(fid: h5py.File) -> h5py.Group:
    """Initialize the netCDF4 group 'gse_data' for EGSE/OGSE information."""
    # investigate filename
    parts = fid.product_name.split('_')
    if len(parts) > 2 and parts[0] == 'SPX1':
        parts = parts[2:]
    msmt_fields = parts[0].split('-')
    background = 'BKG' in msmt_fields
    act_angle = [float(x.replace('act', ''))
                 for x in parts if x.startswith('act')]
    alt_angle = [float(x.replace('alt', ''))
                 for x in parts if x.startswith('alt')]
    pol_angle = [float(x.replace('pol', ''))
                 for x in parts if x.startswith('pol') and x != 'polcal']
    gp1_angle = [float(x.replace('glass', ''))
                 for x in parts if x.startswith('glass')]
    gp1_offs = 5.634375
    # gp2_offs = 5.09625

    # determine viewport: default 0, when all viewports are illuminated
    if alt_angle:
        vp_angle = np.array([-50., -20, 0, 20, 50])
        vp_diff = np.abs(vp_angle - alt_angle[0])
        viewport = 0 if vp_diff.min() > 6 else 2 ** np.argmin(vp_diff)
    else:
        viewport = 0

    gid = fid.createGroup('/gse_data')
    dset = gid.createVariable('viewport', 'u1')
    dset.long_name = 'viewport status'
    dset.standard_name = 'status_flag'
    dset.valid_range = np.array([0, 16], dtype='u1')
    dset.flag_values = np.array([0, 1, 2, 4, 8, 16], dtype='u1')
    dset.flag_meanings = 'ALL -50deg -20deg 0deg +20deg +50deg'
    dset[:] = viewport

    # gid.FOV_begin = np.nan
    # gid.FOV_end = np.nan
    gid.ACT_rotationAngle = np.nan if not act_angle else act_angle[0]
    gid.ALT_rotationAngle = np.nan if not alt_angle else alt_angle[0]
    # gid.ACT_illumination = np.nan
    # gid.ALT_illumination = np.nan
    gid.AoLP = 0. if not pol_angle else pol_angle[0]
    if not background and msmt_fields[0] in ('POLARIZED', 'POLARIMETRIC'):
        gid.DoLP = 1.
        if gp1_angle:
            gid.GP1_angle = gp1_angle[0] + gp1_offs
    else:
        gid.DoLP = 0.

    return gid


def byte_to_timestamp(str_date: bytes) -> float:
    """Convert a byte-string to a timestamp."""
    buff = str_date.decode('ascii').strip() + '+00:00'     # date is in UTC
    return datetime.strptime(buff, '%Y%m%dT%H%M%S.%f%z').timestamp()


def egse_dtype() -> np.dtype:
    """Define numpy structured array to hold EGSE data."""
    return np.dtype([
        ('ITOS_time', 'f8'),
        ('NOMHK_packets_time', 'f8'),
        ('LDLS_STATUS', 'u1'),
        ('POLARIZER_MOVING', 'u1'),
        ('SHUTTER_STAGE_MOVING', 'u1'),
        ('STST_POLARIZER_MOVING', 'u1'),
        ('ALT_STAGE_MOVING', 'u1'),
        ('ACT_STAGE_MOVING', 'u1'),
        ('GP_0_MOVING', 'u1'),
        ('GP_1_MOVING', 'u1'),
        ('C_FLEX-405nm', 'u1'),
        ('C_FLEX-457nm', 'u1'),
        ('C_FLEX-515nm', 'u1'),
        ('C_FLEX-561nm', 'u1'),
        ('C_FLEX-660nm', 'u1'),
        ('COBOLT-785nm', 'u1'),
        ('CRYSTA_STATUS', 'u1'),   # should be C_FLEX-732nm!
        # 15 bytes, will be aligned at 16 or 18 bytes
        ('HK_TS1', 'u4'),
        ('HK_TS2', 'u4'),
        ('SURV_TST1', 'f4'),
        ('SURV_TST2', 'f4'),
        ('AI_02', 'f4'),
        ('AI_03', 'f4'),
        ('AI_04', 'f4'),
        ('AI_05', 'f4'),
        ('AI_06', 'f4'),
        ('AI_07', 'f4'),
        ('AI_08', 'f4'),
        ('AI_09', 'f4'),
        ('V_OUT_ICU', 'f4'),
        ('I_OUT_ICU', 'f4'),
        ('V_OUT_HTR', 'f4'),
        ('I_OUT_HTR', 'f4'),
        ('POLARIZER', 'f4'),
        ('SHUTTER_STAGE', 'f4'),
        ('STST_POLARIZER', 'f4'),
        ('ALT_ANGLE', 'f4'),
        ('ACT_ANGLE', 'f4'),
        ('GP_0_ANGLE', 'f4'),
        ('GP_1_ANGLE', 'f4')
    ])


def egse_units() -> tuple[str, ...]:
    """Define numpy structured array to hold EGSE data."""
    return ('s', 's', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1',
            '1', '1', '1', '1', 'Ohm', 'Ohm', 'V', 'V', 'V', 'V', 'V', 'V',
            'V', 'V', 'V', 'V', 'V', 'A', 'V', 'A', 'deg', 'deg', 'deg',
            'deg', 'deg', 'deg', 'deg')


def read_egse(egse_file: str,
              verbose: bool = False) -> tuple[np.ndarray, list[str, ...]]:
    """Read EGSE data (tab separated values) to numpy compound array."""
    with open(egse_file, encoding='ascii') as fid:
        line = None
        names = []
        units = []
        while not line:
            line = fid.readline().strip()
            fields = line.split('\t')
            for field in fields:
                if field == '':
                    continue
                res = field.strip().split(' [')
                names.append(res[0].replace(' nm', 'nm').replace(' ', '_'))
                if len(res) == 2:
                    units.append(res[1].replace('[', '').replace(']', ''))
                else:
                    units.append('1')

        if len(names) in (35, 36):
            # define dtype of the data
            formats = ('f8',) + 14 * ('f4',) + ('u1',) + 2 * ('i4',)\
                + ('f4', 'u1',) + 2 * ('u1',) + 3 * ('f4', 'u1',) + 7 * ('u1',)
        else:
            # define dtype of the data
            formats = ('f8',) + 14 * ('f4',) + ('u1',) + 2 * ('i4',)\
                + ('f4', 'u1',) + 2 * ('u1',) + 5 * ('f4', 'u1',) + 7 * ('u1',)
        usecols = list(range(len(names)))

        if 'NOMHK_packets_time' in names:
            formats = ('f8',) + formats
            convertors = {0: byte_to_timestamp,
                          1: byte_to_timestamp,
                          16: lambda s: LDLS_DICT.get(s.strip(), 255),
                          21: lambda s: SHUTTER_DICT.get(s.strip(), 255)}
        else:
            convertors = {0: byte_to_timestamp,
                          15: lambda s: LDLS_DICT.get(s.strip(), 255),
                          20: lambda s: SHUTTER_DICT.get(s.strip(), 255)}
        if verbose:
            print(len(names), names)
            print(len(units), units)
            print(len(formats), formats)

        if not len(names) == len(units) == len(formats):
            raise RuntimeError('Size of names, units or formats are not equal')

        data = np.loadtxt(fid, delimiter='\t',
                          converters=convertors, usecols=usecols,
                          dtype={'names': names, 'formats': formats})

    egse = np.empty(data.size, dtype=egse_dtype())
    egse['NOMHK_packets_time'][:] = np.nan
    egse['GP_0_ANGLE'][:] = np.nan
    egse['GP_1_ANGLE'][:] = np.nan
    egse['GP_0_MOVING'][:] = 255
    egse['GP_1_MOVING'][:] = 255
    for name in data.dtype.names:
        egse[name][:] = data[name][:]

    return egse, units


# ---------- CREATE EGSE DATABASE ----------
[docs] def create_egse_db(args: argparse.Namespace) -> None: """Write EGSE data to HDF5 database.""" egse = None for egse_file in args.file_list: try: res = read_egse(egse_file, verbose=args.verbose) except RuntimeError: return egse = res[0] if egse is None else np.concatenate((egse, res[0])) with Dataset(args.egse_dir / DB_EGSE, 'w', format='NETCDF4') as fid: fid.input_files = [Path(x).name for x in args.file_list] fid.creation_date = \ datetime.now(timezone.utc).isoformat(timespec='seconds') _ = fid.createEnumType('u1', 'ldls_t', {k.replace(b' ', b'_').upper(): v for k, v in LDLS_DICT.items()}) _ = fid.createEnumType('u1', 'shutter_t', {k.upper(): v for k, v in SHUTTER_DICT.items()}) _ = fid.createDimension('time', egse.size) dset = fid.createVariable('time', 'f8', ('time',), chunksizes=(256,)) time_key = 'ITOS_time' if 'ITOS_time' in egse.dtype.names else 'time' indx = np.argsort(egse[time_key]) dset[:] = egse[time_key][indx] egse_t = fid.createCompoundType(egse.dtype, 'egse_dtype') dset = fid.createVariable('egse', egse_t, ('time',), chunksizes=(64,)) dset.long_name = 'EGSE settings' dset.fields = np.array([np.string_(n) for n in egse.dtype.names]) dset.units = np.array([np.string_(n) for n in egse_units()]) dset.comment = ('DIG_IN_00 is of enumType ldls_t;' ' SHUTTER_STATUS is of enumType shutter_t') dset[:] = egse[indx]
# ----- SELECT OGSE DATA FROM DATABASE AND ADD TO L1A PRODUCT -----
[docs] def add_egse_data(args: argparse.Namespace) -> None: """Write EGSE records of a measurement to a level-1A product.""" # determine duration of the measurement (ITOS clock) with h5py.File(args.l1a_file, 'r') as fid: # pylint: disable=unsubscriptable-object res = fid.attrs['input_files'] if isinstance(res, bytes): input_file = Path(res.decode('ascii')).stem.rstrip('_hk') else: input_file = Path(res[0]).stem.rstrip('_hk') # pylint: disable=no-member msmt_start = datetime.fromisoformat( fid.attrs['time_coverage_start'].decode('ascii')) msmt_stop = datetime.fromisoformat( fid.attrs['time_coverage_end'].decode('ascii')) if args.verbose: print('L1A time-coverage:', fid.attrs['time_coverage_start'].decode('ascii'), fid.attrs['time_coverage_end'].decode('ascii')) duration = np.ceil((msmt_stop - msmt_start).total_seconds()) # use the timestamp in the filename to correct ICU time date_str = input_file.split('_')[-1] + '+00:00' msmt_start = datetime.strptime(date_str, '%Y%m%dT%H%M%S.%f%z') msmt_start = msmt_start.replace(microsecond=0) msmt_stop = msmt_start + timedelta(seconds=int(duration)) if args.verbose: print('Corrected time-coverage:', msmt_start.timestamp(), msmt_stop.timestamp()) # open EGSE database with Dataset(args.egse_dir / DB_EGSE, 'r') as fid: egse_time = fid['time'][:].data if args.verbose: print('EGSE time-coverage:', egse_time.min(), egse_time.max()) mask = ((egse_time >= msmt_start.timestamp()) & (egse_time <= msmt_stop.timestamp())) if mask.sum() == 0: raise RuntimeError('no EGSE data found') egse_time = egse_time[mask] egse_data = fid['egse'][mask] # update Level-1A product with EGSE information with Dataset(args.l1a_file, 'r+') as fid: gid = fid['/gse_data'] if fid.groups.get('/gse_data') else init_gse_data(fid) _ = gid.createEnumType('u1', 'ldls_t', {k.replace(b' ', b'_').upper(): v for k, v in LDLS_DICT.items()}) _ = gid.createEnumType('u1', 'shutter_t', {k.upper(): v for k, v in SHUTTER_DICT.items()}) _ = gid.createDimension('time', egse_data.size) dset = gid.createVariable('time', 'f8', ('time',)) dset[:] = egse_time egse_t = gid.createCompoundType(egse_data.dtype, 'egse_dtype') dset = gid.createVariable('egse', egse_t, ('time',)) dset.long_name = 'EGSE settings' dset.fields = np.array([np.string_(n) for n in egse_data.dtype.names]) dset.units = np.array([np.string_(n) for n in egse_units()]) dset.comment = ('DIG_IN_00 is of enumType ldls_t;' ' SHUTTER_STATUS is of enumType shutter_t') dset[:] = egse_data def smart_average(data: np.ndarray, thres_range: float = 0.1) -> float: val_range = np.abs(data.max() - data.min()) if val_range == 0: return data[0] if val_range < thres_range: return np.mean(data) return np.median(data) fid['/gse_data'].ACT_rotationAngle = \ smart_average(egse_data['ACT_ANGLE']) fid['/gse_data'].ALT_rotationAngle = \ smart_average(egse_data['ALT_ANGLE']) fid['/gse_data'].AoLP = smart_average(egse_data['POLARIZER'])