#
# This file is part of pyspex
#
# https://github.com/rmvanhees/pyspex.git
#
# Copyright (c) 2019-2023 SRON - Netherlands Institute for Space Research
# All Rights Reserved
#
# License: BSD-3-Clause
"""Contains the class `DEMio` to read SPEXone CMV4000 detector data."""
from __future__ import annotations
__all__ = ['DEMio', 'img_sec_of_day']
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import numpy as np
from .lib.tmtc_def import tmtc_dtype
# - global parameters ------------------------------
FULLFRAME_BYTES = 2 * 2048 * 2048
# - local functions --------------------------------
def det_dtype() -> np.dtype:
"""Return the registers of the SPEXone CMV4000 detector as a numpy dtype."""
return np.dtype([
('UNUSED_000', 'u1'),
('NUMBER_LINES', 'u1', (2,)),
('START1', 'u1', (2,)),
('START2', 'u1', (2,)),
('START3', 'u1', (2,)),
('START4', 'u1', (2,)),
('START5', 'u1', (2,)),
('START6', 'u1', (2,)),
('START7', 'u1', (2,)),
('START8', 'u1', (2,)),
('NUMBER_LINES1', 'u1', (2,)),
('NUMBER_LINES2', 'u1', (2,)),
('NUMBER_LINES3', 'u1', (2,)),
('NUMBER_LINES4', 'u1', (2,)),
('NUMBER_LINES5', 'u1', (2,)),
('NUMBER_LINES6', 'u1', (2,)),
('NUMBER_LINES7', 'u1', (2,)),
('NUMBER_LINES8', 'u1', (2,)),
('SUB_S', 'u1', (2,)),
('SUB_A', 'u1', (2,)),
('MONO', 'u1'), # 1 bits
('IMAGE_FLIPPING', 'u1'), # 2 bits
('INTE_SYNC', 'u1'), # 3 bits: Int_sync, Exp_dual, Exp_ext
('EXP_TIME', 'u1', (3,)),
('EXP_STEP', 'u1', (3,)),
('EXP_KP1', 'u1', (3,)),
('EXP_KP2', 'u1', (3,)),
('NR_SLOPES', 'u1'), # 2 bits
('EXP_SEQ', 'u1'),
('EXP_TIME2', 'u1', (3,)),
('EXP_STEP2', 'u1', (3,)),
('UNUSED_062', 'u1'),
('UNUSED_063', 'u1'),
('UNUSED_064', 'u1'),
('UNUSED_065', 'u1'),
('UNUSED_066', 'u1'),
('UNUSED_067', 'u1'),
('UNUSED_068', 'u1'),
('EXP2_SEQ', 'u1'),
('NUMBER_FRAMES', 'u1', (2,)),
('OUTPUT_MODE', 'u1'), # 2 bits
('FOT_LENGTH', 'u1'),
('I_LVDS_REC', 'u1'), # 4 bits
('UNUSED_075', 'u1'),
('UNUSED_076', 'u1'),
('COL_CALIB', 'u1'), # 2 bits: Col_calib, ADC_calib
('TRAINING_PATTERN', 'u1', (2,)), # 12 bits
('CHANNEL_EN', 'u1', (3,)), # 19 bits
('I_LVDS', 'u1'), # 4 bits
('I_COL', 'u1'), # 4 bits
('I_COL_PRECH', 'u1'), # 4 bits
('I_ADC', 'u1'), # 4 bits
('I_AMP', 'u1'), # 4 bits
('VTF_L1', 'u1'), # 7 bits
('VLOW2', 'u1'), # 7 bits
('VLOW3', 'u1'), # 7 bits
('VRES_LOW', 'u1'), # 7 bits
('UNUSED_092', 'u1'),
('UNUSED_093', 'u1'),
('V_PRECH', 'u1'), # 7 bits
('V_REF', 'u1'), # 7 bits
('UNUSED_096', 'u1'),
('UNUSED_097', 'u1'),
('VRAMP1', 'u1'), # 7 bits
('VRAMP2', 'u1'), # 7 bits
('OFFSET', 'u1', (2,)), # 14 bits
('PGA_GAIN', 'u1'), # 2 bits
('ADC_GAIN', 'u1'),
('UNUSED_104', 'u1'),
('UNUSED_105', 'u1'),
('UNUSED_106', 'u1'),
('UNUSED_107', 'u1'),
('T_DIG1', 'u1'), # 4 bits
('T_DIG2', 'u1'), # 4 bits
('UNUSED_110', 'u1'),
('BIT_MODE', 'u1'), # 1 bits
('ADC_RESOLUTION', 'u1'), # 2 bits
('PLL_ENABLE', 'u1'), # 1 bits
('PLL_IN_FRE', 'u1'), # 2 bits
('PLL_BYPASS', 'u1'), # 1 bits
('PLL_RANGE', 'u1'), # 8 bits: PLL range(1), out_fre(3), div(4)
('PLL_LOAD', 'u1'),
('DUMMY', 'u1'),
('UNUSED_119', 'u1'),
('UNUSED_120', 'u1'),
('BLACK_COL_EN', 'u1'), # 2 bits: Black_col_en, PGA_gain
('UNUSED_122', 'u1'),
('V_BLACKSUN', 'u1'), # 6 bits
('UNUSED_124', 'u1'),
('UNUSED_125', 'u1'),
('TEMP', 'u1', (2,))
])
[docs]
def img_sec_of_day(img_sec: np.ndarray, img_subsec: np.ndarray,
img_hk: np.ndarray) -> tuple[datetime, float | Any]:
"""Convert Image CCSDS timestamp to seconds after midnight.
Parameters
----------
img_sec : numpy array (dtype='u4')
Seconds since 1970-01-01 (or 1958-01-01)
img_subsec : numpy array (dtype='u2')
Sub-seconds as (1 / 2**16) seconds
img_hk : numpy array
DemHK telemetry packages
Returns
-------
tuple
reference day: datetime, sec_of_day: numpy.ndarray
"""
# determine for the first timestamp the offset with last midnight [seconds]
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
tstamp0 = epoch + timedelta(seconds=int(img_sec[0]))
ref_day = datetime(year=tstamp0.year,
month=tstamp0.month,
day=tstamp0.day, tzinfo=timezone.utc)
# seconds since midnight
offs_sec = (ref_day - epoch).total_seconds()
# Determine offset wrt start-of-integration (IMRO + 1)
# Where the default is defined as IMRO:
# [full-frame] COADDD + 2 (no typo, this is valid for the later MPS's)
# [binned] 2 * COADD + 1 (always valid)
offs_msec = 0
if img_hk['ICUSWVER'][0] > 0x123:
imro = np.empty(img_hk.size, dtype=float)
_mm = img_hk['IMRLEN'] == FULLFRAME_BYTES
imro[_mm] = img_hk['REG_NCOADDFRAMES'][_mm] + 2
imro[~_mm] = 2 * img_hk['REG_NCOADDFRAMES'][~_mm] + 1
offs_msec = img_hk['FTI'] * (imro + 1) / 10
# return seconds since midnight
return ref_day, img_sec - offs_sec + img_subsec / 65536 - offs_msec / 1000
# - class DEMio -------------------------
[docs]
class DEMio:
"""Read SPEXone DEM output.
Parameters
----------
flname : str
filename with header or binary data of DEM measurement
Examples
--------
Read data from a SPEXone DEM binary file::
> dem = DEMio(dem_file)
> img_hk = dem.get_sci_hk()
> img_data = dem.get_data()
"""
def __init__(self: DEMio, flname: str) -> None:
"""Initialize DEMio object."""
self.__hdr = None
if flname.endswith('a.txt'):
self.bin_file = flname.replace('a.txt', 'b.bin')
self.hdr_file = flname
elif flname.endswith('b.bin'):
self.bin_file = flname
self.hdr_file = flname.replace('b.bin', 'a.txt')
else:
raise RuntimeError(f'invalid filename: {flname}')
if Path(self.hdr_file).is_file():
self.__get_hdr()
def __get_hdr(self: DEMio) -> None:
"""Read DEM header data."""
self.__hdr = np.zeros((1,), dtype=det_dtype())
with open(self.hdr_file, encoding='ascii', errors='ignore') as fp:
for line in fp:
columns = line[:-1].split(',')
if columns[0] == 'Reg':
continue
# Fix possible errors in Name
name = columns[2].replace(' [', '[')
name = name.replace('_ ', '_').replace(' ', '_')
value = int(columns[-1])
indx = -1
if columns[2].endswith(':0]') \
or columns[2].endswith('[0]') \
or columns[2].endswith('[2]'):
name = name.split('[')[0]
indx = 0
elif columns[2].endswith(':8]'):
name = name.split('[')[0]
indx = 1
elif columns[2].endswith(':16]'):
name = name.split('[')[0]
indx = 2
elif name == 'Unused':
if columns[0] == '86':
continue
name = f'Unused_{int(columns[0]):03d}'
key = name.upper()
if isinstance(self.__hdr[0][key], np.ndarray):
self.__hdr[0][key][indx] = value
else:
self.__hdr[0][key] = value
@property
def hdr(self: DEMio) -> np.ndarray | None:
"""Return DEM header as numpy compound array."""
if self.__hdr is None:
return None
return self.__hdr[0]
@property
def number_lines(self: DEMio) -> int:
"""Return number of lines (rows).
Register address: [1, 2]
"""
return (self.hdr['NUMBER_LINES'][0]
+ (self.hdr['NUMBER_LINES'][1] << 8))
@property
def number_channels(self: DEMio) -> int:
"""Return number of LVDS channels used."""
return 2 ** (4 - (self.hdr['OUTPUT_MODE'] & 0x3))
@property
def lvds_clock(self: DEMio) -> bool:
"""Return flag for LVDS clock (0: disable, 1: enable).
Register address: 82
"""
return ((self.hdr['PLL_ENABLE'] & 0x3) == 0
and (self.hdr['PLL_BYPASS'] & 0x3) != 0
and (self.hdr['CHANNEL_EN'][2] & 0x4) != 0)
[docs]
def pll_control(self: DEMio) -> tuple:
"""Return PLL control parameters: pll_range, pll_out_fre, pll_div.
PLL_range: range (0 or 1)
PLL_out_fre: output frequency (0, 1, 2 or 5)
PLL_div: 9 (10 bit) or 11 (12 bit)
Register address: 116
"""
pll_div = self.hdr['PLL_RANGE'] & 0xF # bit [0:4]
pll_out_fre = (self.hdr['PLL_RANGE'] >> 4) & 0x7 # bit [4:7]
pll_range = self.hdr['PLL_RANGE'] >> 7 # bit [7]
return pll_range, pll_out_fre, pll_div
@property
def exp_control(self: DEMio) -> tuple:
"""Exposure time control parameters: (inte_sync, exp_dual, exp_ext).
Register address: 41
"""
inte_sync = (self.hdr['INTE_SYNC'] >> 2) & 0x1
exp_dual = (self.hdr['INTE_SYNC'] >> 1) & 0x1
exp_ext = self.hdr['INTE_SYNC'] & 0x1
return inte_sync, exp_dual, exp_ext
@property
def offset(self: DEMio) -> int:
"""Return digital offset including ADC offset.
Register address: [100, 101]
"""
val = ((self.hdr['OFFSET'][1] << 8)
+ self.hdr['OFFSET'][0])
return 70 + (val if val < 8192 else val - 16384)
@property
def pga_gain(self: DEMio) -> float:
"""Return PGA gain (Volt).
Register address: 102
"""
reg_pgagain = self.hdr['PGA_GAIN']
# need first bit of address 121
reg_pgagainfactor = self.hdr['BLACK_COL_EN'] & 0x1
return (1 + 0.2 * reg_pgagain) * 2 ** reg_pgagainfactor
@property
def temp_detector(self: DEMio) -> int:
"""Return detector temperature as raw counts.
Notes
-----
Uncalibrated conversion: ((1184 - 1066) * 0.3 * 40 / 40Mhz) + offs [K]
"""
return (self.hdr['TEMP'][1] << 8) + self.hdr['TEMP'][0]
[docs]
def exp_time(self: DEMio, t_mcp: float = 1e-7) -> float:
"""Return pixel exposure time [s]."""
# Nominal fot_length = 20, except for very short exposure_time
reg_fot = self.hdr['FOT_LENGTH']
reg_exptime = ((self.hdr['EXP_TIME'][2] << 16)
+ (self.hdr['EXP_TIME'][1] << 8)
+ self.hdr['EXP_TIME'][0])
return 129 * t_mcp * (0.43 * reg_fot + reg_exptime)
[docs]
def fot_time(self: DEMio, t_mcp: float = 1e-7) -> float:
"""Return frame overhead time [s]."""
# Nominal fot_length = 20, except for very short exposure_time
reg_fot = self.hdr['FOT_LENGTH']
return 129 * t_mcp * (reg_fot + 2 * (16 // self.number_channels))
[docs]
def rot_time(self: DEMio, t_mcp: float = 1e-7) -> float:
"""Return image read-out time [s]."""
return 129 * t_mcp * (16 // self.number_channels) * self.number_lines
[docs]
def frame_period(self: DEMio, n_coad: int = 1) -> float:
"""Return frame period [s]."""
return 2.38 + (n_coad
* (self.exp_time() + self.fot_time() + self.rot_time()))
[docs]
def get_sci_hk(self: DEMio) -> np.ndarray:
"""Return Science telemetry.
A subset of MPS and housekeeping parameters.
Returns
-------
numpy.ndarray
"""
def convert_val(kk: str) -> int:
"""Convert byte array to integer."""
val = 0
for ii, bval in enumerate(self.__hdr[0][kk]):
val += bval << (ii * 8)
return val
# convert original detector parameter values to telemetry parameters
convert_det_params = {
'DET_NUMLINES': convert_val('NUMBER_LINES'),
'DET_START1': convert_val('START1'),
'DET_START2': convert_val('START2'),
'DET_START3': convert_val('START3'),
'DET_START4': convert_val('START4'),
'DET_START5': convert_val('START5'),
'DET_START6': convert_val('START6'),
'DET_START7': convert_val('START7'),
'DET_START8': convert_val('START8'),
'DET_NUMLINES1': convert_val('NUMBER_LINES1'),
'DET_NUMLINES2': convert_val('NUMBER_LINES2'),
'DET_NUMLINES3': convert_val('NUMBER_LINES3'),
'DET_NUMLINES4': convert_val('NUMBER_LINES4'),
'DET_NUMLINES5': convert_val('NUMBER_LINES5'),
'DET_NUMLINES6': convert_val('NUMBER_LINES6'),
'DET_NUMLINES7': convert_val('NUMBER_LINES7'),
'DET_NUMLINES8': convert_val('NUMBER_LINES8'),
'DET_SUBS': convert_val('SUB_S'),
'DET_SUBA': convert_val('SUB_A'),
'DET_MONO': self.__hdr[0]['MONO'],
'DET_IMFLIP': self.__hdr[0]['IMAGE_FLIPPING'],
'DET_EXPCNTR': self.__hdr[0]['INTE_SYNC'],
'DET_EXPTIME': convert_val('EXP_TIME'),
'DET_EXPSTEP': convert_val('EXP_STEP'),
'DET_KP1': convert_val('EXP_KP1'),
'DET_KP2': convert_val('EXP_KP2'),
'DET_NOFSLOPES': self.__hdr[0]['NR_SLOPES'],
'DET_EXPSEQ': self.__hdr[0]['EXP_SEQ'],
'DET_EXPTIME2': convert_val('EXP_TIME2'),
'DET_EXPSTEP2': convert_val('EXP_STEP2'),
'DET_EXP2_SEQ': self.__hdr[0]['EXP2_SEQ'],
'DET_NOFFRAMES': convert_val('NUMBER_FRAMES'),
'DET_OUTMODE': self.__hdr[0]['OUTPUT_MODE'],
'DET_FOTLEN': self.__hdr[0]['FOT_LENGTH'],
'DET_ILVDSRCVR': self.__hdr[0]['I_LVDS_REC'],
'DET_CALIB': self.__hdr[0]['COL_CALIB'],
'DET_TRAINPTRN': convert_val('TRAINING_PATTERN'),
'DET_CHENA': convert_val('CHANNEL_EN'),
'DET_ILVDS': self.__hdr[0]['I_LVDS'],
'DET_ICOL': self.__hdr[0]['I_COL'],
'DET_ICOLPR': self.__hdr[0]['I_COL_PRECH'],
'DET_IADC': self.__hdr[0]['I_ADC'],
'DET_IAMP': self.__hdr[0]['I_AMP'],
'DET_VTFL1': self.__hdr[0]['VTF_L1'],
'DET_VTFL2': self.__hdr[0]['VLOW2'],
'DET_VTFL3': self.__hdr[0]['VLOW3'],
'DET_VRSTL': self.__hdr[0]['VRES_LOW'],
'DET_VPRECH': self.__hdr[0]['V_PRECH'],
'DET_VREF': self.__hdr[0]['V_REF'],
'DET_VRAMP1': self.__hdr[0]['VRAMP1'],
'DET_VRAMP2': self.__hdr[0]['VRAMP2'],
'DET_OFFSET': convert_val('OFFSET'),
'DET_PGAGAIN': self.__hdr[0]['PGA_GAIN'],
'DET_ADCGAIN': self.__hdr[0]['ADC_GAIN'],
'DET_TDIG1': self.__hdr[0]['T_DIG1'],
'DET_TDIG2': self.__hdr[0]['T_DIG2'],
'DET_BITMODE': self.__hdr[0]['BIT_MODE'],
'DET_ADCRES': self.__hdr[0]['ADC_RESOLUTION'],
'DET_PLLENA': self.__hdr[0]['PLL_ENABLE'],
'DET_PLLINFRE': self.__hdr[0]['PLL_IN_FRE'],
'DET_PLLBYP': self.__hdr[0]['PLL_BYPASS'],
'DET_PLLRATE': self.__hdr[0]['PLL_RANGE'],
'DET_PLLLOAD': self.__hdr[0]['PLL_LOAD'],
'DET_DETDUM': self.__hdr[0]['DUMMY'],
'DET_BLACKCOL': self.__hdr[0]['BLACK_COL_EN'],
'DET_VBLACKSUN': self.__hdr[0]['V_BLACKSUN'],
'DET_T': convert_val('TEMP')
}
sci_hk = np.zeros((1,), dtype=tmtc_dtype(0x350))
sci_hk[0]['REG_FULL_FRAME'] = 1
sci_hk[0]['REG_CMV_OUTPUTMODE'] = 3
for key, value in convert_det_params.items():
sci_hk[0][key] = value
return sci_hk
[docs]
def get_data(self: DEMio, numlines: int = None) -> np.ndarray:
"""Return data of a detector frame (numpy uint16 array).
Parameters
----------
numlines : int, optional
Provide number of detector rows when no headerfile is present
Returns
-------
numpy.ndarray:
data of a detector frame, dtype np.uint16
"""
if numlines is None:
# obtain number of rows
numlines = self.number_lines
# Read binary big-endian data
return np.fromfile(self.bin_file, dtype='>u2').reshape(numlines, -1)