Source code for pyspex.ccsds_io

#
# This file is part of pyspex
#
# https://github.com/rmvanhees/pyspex.git
#
# Copyright (c) 2019-2023 SRON - Netherlands Institute for Space Research
#    All Rights Reserved
#
# License:  BSD-3-Clause
"""Contains the class `CCSDSio` to read SPEXone telemetry packets."""

from __future__ import annotations

__all__ = ['CCSDSio', 'img_sec_of_day', 'hk_sec_of_day']

from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import numpy as np

from .lib.tmtc_def import tmtc_dtype

# - global parameters ------------------------------
FULLFRAME_BYTES = 2 * 2048 * 2048

# Define parameters of Primary header
#  - Packet type     (3 bits): Version No.
#                              Indicates this is a CCSDS version 1 packet
#                     (1 bit): Type indicator
#                              Indicates this is a telemetry packet
#                     (1 bit): Secondary flag
#                              Indicate presence of Secondary header
#                   (11 bits): ApID
#                              SPEXone ApID [0x320 - 0x351] or 2047
#
#  - Packet Sequence (2 bits): Grouping flag
#                              00 continuation packet-data segment
#                              01 first packet-data segment
#                              10 last packet-data segment
#                              11 packet-data unsegmented
#                   (14 bits): Counter per ApID, rollover to 0 at 0x3FFF
#  - Packet length  (16 bits): size of packet data in bytes (always odd)
#                              (secondary header + User data) - 1
#  - Packet timestamp:         Secondary header
#                   (32 bits): seconds (1-1-1970 UTC)
#                   (16 bits): sub-seconds (1/2 ** 16)

HDR_DTYPE = np.dtype([
    ('type', '>u2'),
    ('sequence', '>u2'),
    ('length', '>u2'),
    ('tai_sec', '>u4'),
    ('sub_sec', '>u2')
])

TIME_DTYPE = np.dtype([
    ('tai_sec', '>u4'),
    ('sub_sec', '>u2')
])

SCIHK_DTYPE = tmtc_dtype(0x350)

# - Error messages ------------------------
MSG_SKIP_FRAME = "[WARNING]: rejected a frame because it's incomplete"
MSG_INVALID_APID = \
    '[WARNING]: found one or more telemetry packages with an invalid APID'
MSG_CORRUPT_APID = 'corrupted segments - detected APID 1 after <> 2'
MSG_CORRUPT_FRAME = 'corrupted segments - previous frame not closed'


# - functions -----------------------------
[docs] def img_sec_of_day(img_sec: np.ndarray, img_subsec: np.ndarray, img_hk: np.ndarray) -> tuple[datetime, float | Any]: """ Convert Image CCSDS timestamp to seconds after midnight. Parameters ---------- img_sec : numpy array (dtype='u4') Seconds since 1970-01-01 (or 1958-01-01) img_subsec : numpy array (dtype='u2') Sub-seconds as (1 / 2**16) seconds img_hk : numpy array DemHK telemetry packages Returns ------- tuple reference day: datetime, sec_of_day: numpy.ndarray """ # determine for the first timestamp the offset with last midnight [seconds] epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) tstamp0 = epoch + timedelta(seconds=int(img_sec[0])) ref_day = datetime(year=tstamp0.year, month=tstamp0.month, day=tstamp0.day, tzinfo=timezone.utc) # seconds since midnight offs_sec = (ref_day - epoch).total_seconds() # Determine offset wrt start-of-integration (IMRO + 1) # Where the default is defined as IMRO: # [full-frame] COADDD + 2 (no typo, this is valid for the later MPS's) # [binned] 2 * COADD + 1 (always valid) offs_msec = 0 if img_hk['ICUSWVER'][0] > 0x123: imro = np.empty(img_hk.size, dtype=float) _mm = img_hk['IMRLEN'] == FULLFRAME_BYTES imro[_mm] = img_hk['REG_NCOADDFRAMES'][_mm] + 2 imro[~_mm] = 2 * img_hk['REG_NCOADDFRAMES'][~_mm] + 1 offs_msec = img_hk['FTI'] * (imro + 1) / 10 # return seconds since midnight return ref_day, img_sec - offs_sec + img_subsec / 65536 - offs_msec / 1000
[docs] def hk_sec_of_day(ccsds_sec: np.ndarray, ccsds_subsec: np.ndarray, ref_day: datetime | None = None) -> np.ndarray: """ Convert CCSDS timestamp to seconds after midnight. Parameters ---------- ccsds_sec : numpy array (dtype='u4') Seconds since 1970-01-01 (or 1958-01-01) ccsds_subsec : numpy array (dtype='u2') Sub-seconds as (1 / 2**16) seconds ref_day : datetime.datetime, optional Returns ------- numpy.ndarray with sec_of_day """ # determine for the first timestamp the offset with last midnight [seconds] epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) if ref_day is None: tstamp0 = epoch + timedelta(seconds=int(ccsds_sec[0])) ref_day = datetime(year=tstamp0.year, month=tstamp0.month, day=tstamp0.day, tzinfo=timezone.utc) offs_sec = (ref_day - epoch).total_seconds() # return seconds since midnight return ccsds_sec - offs_sec + ccsds_subsec / 65536
# - class CCSDSio -------------------------
[docs] class CCSDSio: """Read SPEXone telemetry packets. Parameters ---------- file_list: iterator to strings list of file-names, where each file contains parts of a measurement Notes ----- The formats of the PACE telemetry packets are following the standards: CCSDS-131.0-B-3, CCSDS-132.0-B-2 and CCSDS-133.0-B-1. This module is currenty restriced to telementry packets with APID: 0x350 (Science), 0x320 (NomHK) and 0x322 (DemHK). A telemtry packet consist of a PRIMARY HEADER, SECONDARY HEADER (consist of a timestamp) and USER DATA with the actual telemetry packet data. Doc: TMTC handbook (SPX1-TN-005), issue 12, 15-May-2020 The files with science and telemetry data needs to be in chronological order. However, you may mix science and housekeeping data as long as science data are chronological and housekeeping data are chronological. Examples -------- >>> packets = () >>> with CCSDSio(['file1', 'file2', 'file3']) as ccsds: >>> while True: >>> # read one telemetry packet at a time >>> packet = ccsds.read_packet() >>> if packet is None: >>> # now we have read all files >>> break >>> >>> packets += (packet[0],) >>> >>> # combine segmented Science packages >>> science_tm = ccsds.science_tm(packets) >>> # now you may want to collect the engineering packages """ def __init__(self: CCSDSio, file_list: list) -> None: """Initialize access to a SPEXone Level-0 product (CCSDS format).""" # initialize class attributes self.file_list = iter(file_list) self.found_invalid_apid = False self.__hdr = None self.fp = None if file_list: self.open_next_file() def __iter__(self: CCSDSio) -> None: """Allow iteration.""" for attr in sorted(self.__dict__): if not attr.startswith('__'): yield attr def __enter__(self: CCSDSio) -> CCSDSio: """Initiate the context manager.""" return self def __exit__(self: CCSDSio, *args: str) -> bool: """Exit the context manager.""" self.close() return False # any exception is raised by the with statement.
[docs] def close(self: CCSDSio) -> None: """Close resources.""" if self.fp is not None: if self.found_invalid_apid: print(MSG_INVALID_APID) self.found_invalid_apid = False self.fp.close()
# ---------- define some class properties ---------- @property def version_no(self: CCSDSio) -> int | None: """Returns CCSDS version number.""" if self.__hdr is None: return None return (self.__hdr['type'] >> 13) & 0x7 @property def type_indicator(self: CCSDSio) -> int | None: """Returns type of telemetry packet.""" if self.__hdr is None: return None return (self.__hdr['type'] >> 12) & 0x1 @property def secnd_hdr_flag(self: CCSDSio) -> bool | None: """Returns flag indicating presence of a secondary header.""" if self.__hdr is None: return None return (self.__hdr['type'] >> 11) & 0x1 @property def ap_id(self: CCSDSio) -> int | None: """Returns SPEXone ApID.""" if self.__hdr is None: return None return self.__hdr['type'] & 0x7FF @property def grouping_flag(self: CCSDSio) -> int | None: """Returns grouping flag. The meaning of the grouping flag values are:: 00 continuation packet-data segment 01 first packet-data segment 10 last packet-data segment 11 packet-data unsegmented """ if self.__hdr is None: return None return (self.__hdr['sequence'] >> 14) & 0x3 @property def sequence_count(self: CCSDSio) -> int | None: """Returns sequence counter, rollover to zero at 0x3FFF.""" if self.__hdr is None: return None return self.__hdr['sequence'] & 0x3FFF @property def packet_length(self: CCSDSio) -> int | None: """Returns size of packet data in bytes. Value equals secondary header + user data (always odd) """ if self.__hdr is None: return None return self.__hdr['length'] # ---------- define empty telemetry packet ----------
[docs] def open_next_file(self: CCSDSio) -> None: """Open next file from file_list.""" flname = next(self.file_list) if not Path(flname).is_file(): raise FileNotFoundError(f'{flname} does not exist') self.close() # pylint: disable=consider-using-with self.fp = open(flname, 'rb') # noqa: SIM115
[docs] @staticmethod def fix_dem_hk24(dem_hk: np.ndarray) -> np.ndarray: """Correct 32-bit values in the DemHk. Which originate from 24-bit values of the detector register parameters. Parameters ---------- dem_hk : numpy.ndarray SPEXone DEM housekeeping packages Returns ------- numpy.ndarray SPEXone DEM housekeeping packages """ for key in ['DET_EXPTIME', 'DET_EXPSTEP', 'DET_KP1', 'DET_KP2', 'DET_EXPTIME2', 'DET_EXPSTEP2']: dem_hk[key] = dem_hk[key] >> 8 return dem_hk
[docs] @staticmethod def fix_sci_hk24(sci_hk: np.ndarray) -> np.ndarray: """Correct 32-bit values in the Science HK. Which originate from 24-bit values in the detector register parameters. In addition:: - copy the first 4 bytes of DET_CHENA to DET_ILVDS - parameter 'REG_BINNING_TABLE_START' was writen in little-endian Parameters ---------- sci_hk : numpy.ndarray SPEXone Science telemetry packages Returns ------- numpy.ndarray SPEXone Science telemetry packages """ if np.all(sci_hk['ICUSWVER'] < 0x129): key = 'REG_BINNING_TABLE_START' sci_hk[key] = np.ndarray(shape=sci_hk.shape, dtype='<u4', buffer=sci_hk[key]) sci_hk['DET_ILVDS'] = sci_hk['DET_CHENA'] & 0xf for key in ['TS1_DEM_N_T', 'TS2_HOUSING_N_T', 'TS3_RADIATOR_N_T', 'TS4_DEM_R_T', 'TS5_HOUSING_R_T', 'TS6_RADIATOR_R_T', 'LED1_ANODE_V', 'LED1_CATH_V', 'LED1_I', 'LED2_ANODE_V', 'LED2_CATH_V', 'LED2_I', 'ADC1_VCC', 'ADC1_REF', 'ADC1_T', 'ADC2_VCC', 'ADC2_REF', 'ADC2_T', 'DET_EXPTIME', 'DET_EXPSTEP', 'DET_KP1', 'DET_KP2', 'DET_EXPTIME2', 'DET_EXPSTEP2', 'DET_CHENA']: sci_hk[key] = sci_hk[key] >> 8 return sci_hk
def __rd_science(self: CCSDSio, hdr: np.ndarray) -> np.ndarray: """Read Science telemetry packet. Parameters ---------- hdr : numpy.ndarray CCSDS header information Returns ------- numpy.ndarray SPEXone Science telemetry packages """ num_bytes = self.packet_length - TIME_DTYPE.itemsize + 1 packet = np.empty(1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('science_hk', SCIHK_DTYPE), ('icu_time', TIME_DTYPE), ('image_data', 'O')])) packet['packet_header'] = hdr # first segment or unsegmented data packet provides Science_HK if self.grouping_flag in (1, 3): packet['science_hk'] = self.fix_sci_hk24( np.fromfile(self.fp, count=1, dtype=SCIHK_DTYPE)) num_bytes -= SCIHK_DTYPE.itemsize packet['icu_time'] = np.fromfile(self.fp, count=1, dtype=TIME_DTYPE) num_bytes -= TIME_DTYPE.itemsize # read detector image data packet['image_data'][0] = np.fromfile(self.fp, dtype='>u2', count=num_bytes // 2) return packet def __rd_nomhk(self: CCSDSio, hdr: np.ndarray) -> np.ndarray: """Read NomHK telemetry packet. Parameters ---------- hdr : numpy.ndarray CCSDS header information Returns ------- numpy.ndarray SPEXone Nominal housekeeping packages """ packet = np.empty(1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('nominal_hk', tmtc_dtype(0x320))])) packet['packet_header'] = hdr packet['nominal_hk'] = np.fromfile(self.fp, count=1, dtype=tmtc_dtype(0x320)) return packet def __rd_demhk(self: CCSDSio, hdr: np.ndarray) -> np.ndarray: """Read DemHK telemetry packet. Parameters ---------- hdr : numpy.ndarray CCSDS header information Returns ------- numpy.ndarray SPEXone detector housekeeping packages """ packet = np.empty(1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('detector_hk', tmtc_dtype(0x322))])) packet['packet_header'] = hdr packet['detector_hk'] = self.fix_dem_hk24( np.fromfile(self.fp, count=1, dtype=tmtc_dtype(0x322))) return packet def __rd_tc_accept(self: CCSDSio, _: np.ndarray) -> np.ndarray: """Read/dump TcAccept packet.""" self.fp.seek(-1 * HDR_DTYPE.itemsize, 1) packet = np.fromfile(self.fp, count=1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('TcPacketId', '>u2'), ('TcSeqControl', '>u2')])) print('[TcAccept]: ', self, packet['TcPacketId'][0], packet['TcSeqControl'][0]) return packet def __rd_tc_execute(self: CCSDSio, _: np.ndarray) -> np.ndarray: """Read/dump TcExecute packet.""" self.fp.seek(-1 * HDR_DTYPE.itemsize, 1) packet = np.fromfile(self.fp, count=1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('TcPacketId', '>u2'), ('TcSeqControl', '>u2')])) print('[TcExecute]:', self, packet['TcPacketId'][0], packet['TcSeqControl'][0]) return packet def __rd_tc_fail(self: CCSDSio, _: np.ndarray) -> np.ndarray: """Read/dump TcFail packet.""" self.fp.seek(-1 * HDR_DTYPE.itemsize, 1) packet = np.fromfile(self.fp, count=1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('TcPacketId', '>u2'), ('TcSeqControl', '>u2'), ('TcFailCode', '>u2'), ('FailParameter1', '>u2'), ('FailParameter2', '>u2')])) print('[TcFail]: ', self, packet['TcPacketId'][0], packet['TcSeqControl'][0], bin(packet['TcFailCode'][0]), packet['FailParameter1'][0], packet['FailParameter2'][0] ) return packet def __rd_tc_reject(self: CCSDSio, _: np.ndarray) -> np.ndarray: """Read/dump TcReject packet.""" self.fp.seek(-1 * HDR_DTYPE.itemsize, 1) packet = np.fromfile(self.fp, count=1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('TcPacketId', '>u2'), ('TcSeqControl', '>u2'), ('TcRejectCode', '>u2'), ('RejectParameter1', '>u2'), ('RejectParameter2', '>u2')])) print('[TcReject]: ', self, packet['TcPacketId'][0], packet['TcSeqControl'][0], bin(packet['TcRejectCode'][0]), packet['RejectParameter1'][0], packet['RejectParameter2'][0]) return packet def __rd_other(self: CCSDSio, hdr: np.ndarray) -> np.ndarray | None: """Read other telemetry packet.""" num_bytes = self.packet_length - TIME_DTYPE.itemsize + 1 if not 0x320 <= self.ap_id <= 0x350: self.found_invalid_apid = True self.fp.seek(num_bytes, 1) return None packet = np.empty(1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('raw_data', 'u1', num_bytes)])) packet['packet_header'] = hdr packet['raw_data'] = np.fromfile(self.fp, count=num_bytes, dtype='u1') return packet
[docs] def read_packet(self: CCSDSio) -> np.ndarray | None: """Read next telemetry packet. Returns ------- numpy.ndarray CCSDS packet data """ if self.fp is None: return None # read primary/secondary header hdr = np.fromfile(self.fp, count=1, dtype=HDR_DTYPE) if hdr.size == 0: try: self.open_next_file() except StopIteration: return None except FileNotFoundError: return None hdr = np.fromfile(self.fp, count=1, dtype=HDR_DTYPE) if hdr.size == 0: return None # save packet header as class attribute self.__hdr = hdr[0] # read telemetry packet rd_func = {0x350: self.__rd_science, 0x320: self.__rd_nomhk, 0x322: self.__rd_demhk, 0x331: self.__rd_tc_accept, 0x332: self.__rd_tc_reject, 0x333: self.__rd_tc_execute, 0x334: self.__rd_tc_fail}.get(self.ap_id, self.__rd_other) return rd_func(hdr)
[docs] @staticmethod def select_tm(packets_in: tuple, ap_id: int) -> tuple: """Select telemetry packages on SPEXone ApID. Parameters ---------- packets_in: tuple SPEXone telemetry packages ap_id: int SPEXone ApID Returns ------- tuple selected telemetry packages """ packets = () for packet in packets_in: if 'packet_header' not in packet.dtype.names: continue if (packet['packet_header']['type'] & 0x7FF) == ap_id: packets += (packet,) return packets
[docs] def science_tm(self: CCSDSio, packets_in: tuple) -> tuple: """Combine segmented Science telemetry packages. Parameters ---------- packets_in: tuple Science or house-keeping telemetry packages Returns ------- tuple unsegmented Science telemetry packages """ # reject non-Science telemetry packages packets = self.select_tm(packets_in, 0x350) if not packets: return () # check if grouping_flag of first segment equals 1 # else reject all segments with grouping_flag != 1 self.__hdr = packets[0]['packet_header'] if self.grouping_flag != 1: ii = 0 for packet in packets: self.__hdr = packet['packet_header'] if self.grouping_flag == 1: break ii += 1 print(f'[WARNING]: first frame incomplete - skipped {ii} segments') packets = packets[ii:] if not packets: return () # check if grouping_flag of last segment equals 2 # else reject all segments after the last segment # with grouping_flag == 2 self.__hdr = packets[-1]['packet_header'] if self.grouping_flag != 2: ii = 0 for packet in packets: self.__hdr = packet['packet_header'] if self.grouping_flag == 2: break ii += 1 print(f'[WARNING]: last frame incomplete - rejected {ii} segments') packets = packets[:-ii] if not packets: return () res = () offs = 0 prev_grp_flag = 2 for packet in packets: grouping_flag = (packet['packet_header']['sequence'] >> 14) & 0x3 # print(prev_grp_flag, grouping_flag, len(res), offs, # packet['image_data'].size) # handle segmented data if grouping_flag == 1: # first segment # group_flag of previous package should be 2 if prev_grp_flag != 2: if packet['image_data'].size in (3853, 7853): print(MSG_SKIP_FRAME) prev_grp_flag = 2 offs = 0 else: raise RuntimeError(MSG_CORRUPT_APID) img_size = packet['science_hk']['IMRLEN'] // 2 rec_buff = np.empty(1, dtype=np.dtype([ ('packet_header', HDR_DTYPE), ('science_hk', SCIHK_DTYPE), ('icu_time', TIME_DTYPE), ('image_data', 'O')]))[0] img_buff = np.empty(img_size, dtype='u2') rec_buff['packet_header'] = packet['packet_header'] rec_buff['science_hk'] = packet['science_hk'] rec_buff['icu_time'] = packet['icu_time'] img_buff[offs:offs + packet['image_data'].size] = \ packet['image_data'] offs += packet['image_data'].size else: # continuation or last segment # group_flag of previous package should be 0 or 1 if prev_grp_flag == 2: raise RuntimeError(MSG_CORRUPT_FRAME) img_buff[offs:offs + packet['image_data'].size] = \ packet['image_data'] offs += packet['image_data'].size if grouping_flag == 2: if offs == img_size: rec_buff['image_data'] = img_buff res += (rec_buff,) else: print(MSG_SKIP_FRAME) offs = 0 # keep current group flag for next read prev_grp_flag = grouping_flag return res