Source code for pyspex.binning_tables

#
# This file is part of pyspex
#
# https://github.com/rmvanhees/pyspex.git
#
# Copyright (c) 2019-2023 SRON - Netherlands Institute for Space Research
#    All Rights Reserved
#
# License:  BSD-3-Clause
"""Tools to read or write definitions of SPEXone binning-tables."""

from __future__ import annotations

__all__ = ['BinningTables']

from datetime import datetime, timezone
from os import environ
from pathlib import Path

import numpy as np

# pylint: disable=no-name-in-module
from netCDF4 import Dataset

from .version import pyspex_version

# - global parameters ------------------------------
# Fill value used for the unsigned 32-bit 'binning_table' variable
# (0xFFFFFFFF is the largest value an unsigned 32-bit integer can hold).
FILL_VALUE = 0xFFFFFFFF  # 0X7FFFFFFF


# - local functions --------------------------------
class BinningTables:
    """Class to handle SPEXone binning-table definitions.

    Parameters
    ----------
    ckd_dir : str or Path, optional
        Name of the directory with SPEXone binning-table files. When omitted,
        '/nfs/SPEXone/share/ckd' is used if it exists, otherwise the directory
        given by the environment variable ``CKD_DIR`` (default: the current
        working directory).

    Raises
    ------
    FileNotFoundError
        Directory with SPEXone binning-table files does not exist.

    Notes
    -----
    Syntax of the file name with SPEXone binning-tables:

        SPX1_CKD_BIN_TBL_<yyyymmddTHHMMSS>_<NNN>.nc

    where yyyymmddTHHMMSS defines the validity start (UTC) and NNN the
    release number of the file format.

    The binning tables as defined on-ground are supposed to be available
    during the whole mission at the same on-board memory location, because
    these original binning tables are necessary for re-processing and may
    facilitate instrument performance monitoring. Therefore, it is preferred
    that a new binning table is added to the current set, without changing
    the validity start string. However, a new binning-table file should be
    released in case any of the binning tables are overwritten.

    Examples
    --------
    # create new file with binning-table definitions::

    > bin_tbl = BinningTables()
    > bin_tbl.create_if_needed(validity_start)
    > bin_tbl.add_table(0, lineskip_arr, binning_table)
    > bin_tbl.add_table(1, lineskip_arr, binning_table)

    # add a new binning-table to an existing file::

    > bin_tbl = BinningTables()
    > bin_tbl.create_if_needed(validity_start)
    > bin_tbl.add_table(2, lineskip_arr, binning_table)

    # use binning-table '130' to unbin SPEXone detector data::

    > bin_tbl = BinningTables()
    > bin_tbl.search(coverage_start)
    > img = bin_tbl.unbin(130, img_binned)

    """

    def __init__(self: BinningTables,
                 ckd_dir: str | Path | None = None) -> None:
        """Initialize class attributes."""
        if ckd_dir is None:
            # prefer the shared CKD directory, fall back on the environment
            self.ckd_dir = Path('/nfs/SPEXone/share/ckd')
            if not self.ckd_dir.is_dir():
                self.ckd_dir = Path(environ.get('CKD_DIR', '.'))
        else:
            self.ckd_dir = Path(ckd_dir)
        if not self.ckd_dir.is_dir():
            raise FileNotFoundError('directory with SPEXone CKD does not exist')

        # name of the selected CKD file (without directory); it is set by
        # create_if_needed() or search()
        self.ckd_file: str | None = None

    def create_if_needed(self: BinningTables, validity_start: str,
                         release: int = 1) -> None:
        """Initialize CKD file for binning tables if it does not exist.

        An existing file with the same name is left untouched.

        Parameters
        ----------
        validity_start : str
            Validity start of the CKD data, as ``yyyymmddTHHMMSS``
        release : int, default=1
            Release number, start at 1

        """
        self.ckd_file = f'SPX1_CKD_BIN_TBL_{validity_start}_{release:03d}.nc'
        if (self.ckd_dir / self.ckd_file).is_file():
            return

        # initialize netCDF file with binning tables
        with Dataset(self.ckd_dir / self.ckd_file, 'w') as fid:
            fid.title = 'SPEXone Level-1 binning-tables'
            fid.Conventions = 'CF-1.6'
            fid.project = 'PACE Project'
            fid.instrument = 'SPEXone'
            fid.institution = 'SRON Netherlands Institute for Space Research'
            fid.processing_version = pyspex_version()
            fid.validity_start = validity_start + '+00:00'
            fid.release_number = np.uint16(release)
            fid.date_created = datetime.now(timezone.utc).isoformat(
                timespec='seconds')
            fid.createDimension('row', 1024)
            fid.createDimension('column', 1024)

    def search(self: BinningTables, coverage_start: str | None = None) -> None:
        """Search CKD file with binning tables.

        The name of the selected file is stored in attribute ``ckd_file``.

        Parameters
        ----------
        coverage_start : str, default=None
            time_coverage_start or start of the measurement (UTC) in ISO 8601
            format. A value without UTC offset is interpreted as UTC.
            When omitted, the last file in sorted order is selected.

        Raises
        ------
        FileNotFoundError
            No CKD with binning tables found

        """
        ckd_files = sorted(
            x.name for x in self.ckd_dir.glob('SPX1_CKD_BIN_TBL_*.nc'))
        if not ckd_files:
            raise FileNotFoundError('No CKD with binning tables found')

        # use the latest version of the binning-table CKD
        if coverage_start is None:
            self.ckd_file = ckd_files[-1]
            return

        # use binning-table CKD based on coverage_start
        coverage_date = datetime.fromisoformat(coverage_start)
        if coverage_date.tzinfo is None:
            # naive and aware datetime objects cannot be compared
            coverage_date = coverage_date.replace(tzinfo=timezone.utc)

        # walk from the newest to the oldest file and select the first one
        # which is already valid at the start of the measurement
        for ckd_fl in reversed(ckd_files):
            validity_date = datetime.strptime(
                ckd_fl.split('_')[4] + '+00:00', '%Y%m%dT%H%M%S%z')
            # the validity start itself is part of the validity period
            if validity_date <= coverage_date:
                self.ckd_file = ckd_fl
                break
        else:
            raise FileNotFoundError('No valid CKD with binning tables found')

    def add_table(self: BinningTables, table_id: int,
                  lineskip_arr: np.ndarray,
                  binning_table: np.ndarray) -> None:
        """Add a binning table definition to existing file.

        A CKD file must have been selected with `create_if_needed` or
        `search` before this method is called.

        Parameters
        ----------
        table_id : int
            Table identifier (integer between 1 and 255)
        lineskip_arr : ndarray
            Lineskip array definition
        binning_table : ndarray
            Binning table definition

        """
        # bin indices (and number of pixels per bin) of the enabled rows only
        index, count = np.unique(binning_table[lineskip_arr == 1, :],
                                 return_counts=True)

        with Dataset(self.ckd_dir / self.ckd_file, 'r+') as fid:
            gid = fid.createGroup(f'/Table_{table_id:03d}')
            # NOTE(review): the attribute name 'tabel_id' is misspelled, but
            # it is part of the file format and therefore left unchanged
            gid.tabel_id = table_id
            gid.REG_BINNING_TABLE_START = hex(
                0x80000000 + 0x400000 * (table_id - 1))
            gid.enabled_lines = np.uint16(lineskip_arr.sum())
            gid.flex_binned_pixels = np.uint32(index.max()+1)
            gid.date_created = datetime.now(timezone.utc).isoformat(
                timespec='seconds')

            dset = gid.createVariable('binning_table', 'u4',
                                      ('row', 'column'),
                                      fill_value=FILL_VALUE,
                                      chunksizes=(128, 128),
                                      zlib=True, complevel=1, shuffle=True)
            dset.long_name = 'binning table'
            dset.valid_min = np.uint32(0)
            dset.valid_max = np.uint32(index.max())
            dset[:] = binning_table

            dset = gid.createVariable('lineskip_arr', 'u1', ('row',),
                                      zlib=True, complevel=1, shuffle=True)
            dset.long_name = 'lineskip array'
            dset.valid_min = np.uint8(0)
            dset.valid_max = np.uint8(1)
            dset[:] = lineskip_arr

            # one entry per bin: number of detector pixels added to this bin
            gid.createDimension('bins', count.size)
            dset = gid.createVariable('count_table', 'u2', ('bins',),
                                      zlib=True, complevel=1, shuffle=True)
            dset.long_name = 'number of aggregated pixel readings'
            dset.valid_min = np.uint16(0)
            dset.valid_max = np.uint16(count.max())
            dset[:] = count.astype('u2')

    def unbin(self: BinningTables, table_id: int,
              img_binned: np.ndarray) -> np.ndarray:
        """Return unbinned detector data.

        A CKD file must have been selected with `create_if_needed` or
        `search` before this method is called.

        Parameters
        ----------
        table_id : int
            Table identifier (integer between 1 and 255)
        img_binned : np.ndarray
            Binned image data (1D array)

        Returns
        -------
        np.ndarray
            unbinned image data (no interpolation). Rows which are not
            enabled in the lineskip array are filled with NaN.

        Raises
        ------
        KeyError
            The CKD file does not contain a table with this identifier.

        """
        with Dataset(self.ckd_dir / self.ckd_file, 'r') as fid:
            if f'Table_{table_id:03d}' not in fid.groups:
                raise KeyError(f'Table_{table_id:03d} not defined')

            gid = fid[f'Table_{table_id:03d}']
            binning_table = gid.variables['binning_table'][:]
            lineskip_arr = gid.variables['lineskip_arr'][:]
            count_table = gid.variables['count_table'][:]

        revert = np.full(binning_table.shape, np.nan)
        # bin index of every pixel in the enabled rows
        table = binning_table[lineskip_arr == 1, :].reshape(-1)
        # every pixel obtains the average signal of the bin it belongs to
        revert[lineskip_arr == 1, :] = \
            (img_binned / count_table)[table].reshape(
                -1, binning_table.shape[1])

        return revert