#
# This file is part of pyspex
#
# https://github.com/rmvanhees/pyspex.git
#
# Copyright (c) 2019-2023 SRON - Netherlands Institute for Space Research
# All Rights Reserved
#
# License: BSD-3-Clause
"""Collect OGSE data in a database.
This module contain routines to read reference diode measurements and
wavelength monitor data. These data are supposed to be written to a HDF5
database. From these database selected data can be added to a SPEXone Level-1a
product.
"""
from __future__ import annotations
__all__ = ['read_ref_diode', 'read_wav_mon',
'add_ogse_ref_diode', 'add_ogse_wav_mon']
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import Any
import h5py
import numpy as np
from xarray import DataArray, Dataset, open_dataset
# - global parameters ------------------------------
# ---------- CREATE OGSE DATABASES ----------
[docs]
def read_ref_diode(ogse_dir: Path, file_list: list,
verbose: bool = False) -> Dataset:
"""Read reference diode data into a xarray.Dataset.
(input: comma separated values).
"""
data = None
unit_dict = {'Unix Time (s)': ('seconds', 'f8'),
'Excel Time (d)': ('days', 'f4'),
'Normalized Time (s)': ('seconds', 'f8'),
'amps': ('A', 'f4'), 'scaled': ('1', 'f4'),
'last_zero': ('seconds', 'f8'), 'lamp_on': ('flag', 'b'),
'voltage': ('V', 'f4'), 'current': ('A', 'f4')}
fields_to_skip = ('', 'averaging', 'lamp_on_counter', 'record_timestamp',
'scale', 'tempC')
names = []
units = []
for flname in file_list:
formats = []
usecols = ()
all_valid_lines = ''
with open(ogse_dir / flname, encoding='ascii') as fid:
while True:
line = fid.readline().strip()
if not line.startswith('Unix'):
continue
fields = line.split(',')
for ii, field in enumerate(fields):
if field in fields_to_skip:
continue
res = field.strip().split(' (')
res[0] = res[0].replace(' ', '_')
if res[0] in names:
continue
names.append(res[0])
units.append(unit_dict[field][0])
formats.append(unit_dict[field][1])
usecols += (ii,)
break
if verbose:
print(len(names), names)
print(len(units), units)
print(len(formats), formats)
print(len(usecols), usecols)
# return (data, units)
while True:
line = fid.readline()
if not line:
break
if ',,,,' not in line:
all_valid_lines += line.replace(',,', ',0,')
# read data (skip first line)
res = np.loadtxt(StringIO(all_valid_lines), delimiter=',',
skiprows=1, usecols=usecols,
converters={0: lambda s: float(s) - 3600},
dtype={'names': names, 'formats': formats})
data = res if data is None else np.concatenate((data, res))
if verbose:
print('data :', len(data))
time_key = 'Unix_Time'
res = {'time': DataArray(
data[time_key],
coords={'time': data[time_key]},
attrs={'longname': 'time',
'units': 'seconds since 1970-01-01 00:00:00'})}
for ii, key in enumerate(names):
if key == time_key:
continue
res[key] = DataArray(data[key], coords={'time': data[time_key]},
attrs={'longname': key, 'units': units[ii]})
xds = Dataset(res).sortby('time')
xds.attrs = {'source': 'Calibrated diode inside integrated sphere',
'comment': 'Generated on SRON clean-room tablet'}
return xds
# ---------------
[docs]
def read_wav_mon(ogse_dir: Path, file_list: list,
verbose: bool = False) -> Dataset:
"""
Read wavelength monitor data into a xarray.Dataset.
(input comma separated values).
"""
def byte_to_timestamp(str_date: bytes) -> float:
"""Return a timestamp (UTC) from a byte-string.
Parameters
----------
str_date : byte
date-time string in ISO format with timezone CET
Returns
-------
timestamp : float
Unix timestamp in UTC
"""
buff = str_date.strip().decode('ascii') + '+01:00'
return datetime.strptime(buff, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
names = ('timestamp', 'spectrum')
formats = None
data = None
wavelength = None
t_intg = None
n_avg = None
for flname in file_list:
with open(ogse_dir / flname, encoding='ascii') as fid:
while True:
line = fid.readline().strip()
if line.startswith('Integration'):
fields = line.split(':')
scalar_t_intg = float(fields[1])
continue
if line.startswith('Averaging'):
fields = line.split(':')
scalar_n_avg = int(fields[1])
continue
if line.startswith('Timestamp'):
fields = line.split(',')
wavelength = np.array(fields[1:], dtype=float)
# define dtype of the data
formats = ('f8', f'{len(wavelength)}f8')
break
if verbose:
print(f'Integration time: {t_intg}ms')
print(f'Averaging Nr.: {n_avg}')
print(len(names), names)
print(len(formats), formats)
# return (None, None)
res = np.loadtxt(fid, delimiter=',',
converters={0: byte_to_timestamp},
dtype={'names': names, 'formats': formats})
if verbose:
print('data :', len(res))
if data is None:
data = res
t_intg = np.full(res.size, scalar_t_intg)
n_avg = np.full(res.size, scalar_n_avg)
else:
data = np.concatenate((data, res))
t_intg = np.concatenate((t_intg, np.full(res.size, scalar_t_intg)))
n_avg = np.concatenate((n_avg, np.full(res.size, scalar_n_avg)))
time_key = 'timestamp'
res = {'time': DataArray(
data[time_key],
coords={'time': data[time_key]},
attrs={'longname': 'time',
'units': 'seconds since 1970-01-01 00:00:00'}),
'wavelength': DataArray(wavelength,
coords={'wavelength': wavelength},
attrs={'longname': 'wavelength grid',
'units': 'nm'}),
't_intg': DataArray(t_intg.astype('i2'),
dims=['time'],
attrs={'longname': 'Integration time',
'units': 'nm'}),
'n_avg': DataArray(n_avg.astype('i2'),
dims=['time'],
attrs={'longname': 'Averaging number',
'units': '1'}),
'spectrum': DataArray(data['spectrum'],
dims=['time', 'wavelength'],
attrs={'longname': 'radiance spectrum',
'units': 'W/(m^2.sr.nm)'})}
xds = Dataset(res).sortby('time')
xds.attrs = {'source': 'Avantes fibre spectrometer',
'comment': 'Generated on SRON clean-room tablet'}
return xds
# ----- SELECT OGSE DATA FROM DATABASE AND ADD TO L1A PRODUCT -----
def read_date_stats() -> tuple:
"""Read output of program 'date' executed at freckle (ITOS) and shogun (SRON)."""
if Path('/array/slot1F/spex_one/OCAL/date_stats').is_dir():
data_dir = Path('/array/slot2B/spex_ocal/ambient/date_stats')
else:
data_dir = Path('/nfs/SPEXone/ocal/ambient/date_stats')
flname = data_dir / 'cmp_date_egse_itos2.txt'
all_valid_lines = ''
with open(flname, encoding='ascii') as fid:
names = fid.readline().strip().lstrip(' #').split('\t\t\t')
formats = len(names) * ('f8',)
while True:
line = fid.readline()
if not line:
break
fields = line.split('\t')
if '' not in fields:
all_valid_lines += line
res = np.loadtxt(StringIO(all_valid_lines),
dtype={'names': names, 'formats': formats})
# a constant difference between the SRON/ITOS timestamps is introduced
# by the calls of 'date' via 'ssh'. This difference is estimated at 350ms.
t_itos = (1000 * res['ITOS'] - 350).astype(int)
t_sron = (1000 * res['SRON(1)']).astype(int)
return t_itos.astype('datetime64[ms]'), t_sron.astype('datetime64[ms]')
def clock_offset(l1a_file: Path) -> tuple[Any, int | Any, Any]:
"""Derive offset between msmt_start/msmt_stop and the SRON clock."""
# determine duration of the measurement (ITOS clock)
with h5py.File(l1a_file, 'r') as fid:
# pylint: disable=unsubscriptable-object
res = fid.attrs['input_files']
if isinstance(res, bytes):
input_file = Path(res.decode('ascii')).stem.rstrip('_hk')
else:
input_file = Path(res[0]).stem.rstrip('_hk')
# pylint: disable=no-member
msmt_start = np.datetime64(
fid.attrs['time_coverage_start'].decode('ascii').split('+')[0])
msmt_stop = np.datetime64(
fid.attrs['time_coverage_end'].decode('ascii').split('+')[0])
duration = (msmt_stop - msmt_start).astype('timedelta64[s]') + 1
# print('duration: ', duration)
# use the timestamp in the filename to correct ICU time
date_start = datetime.strptime(input_file.split('_')[-1],
'%Y%m%dT%H%M%S.%f')
msmt_start = np.datetime64(date_start.isoformat()).astype('datetime64[s]')
msmt_stop = msmt_start + duration
# print('msmt: ', msmt_start, msmt_stop)
# correct msmt_start and msmt_stop to SRON clock
t_itos, t_sron = read_date_stats()
indx = np.argsort(np.abs(t_itos - t_sron))[0]
t_diff = t_sron[indx] - t_itos[indx]
# print('T_sron - T_itos:', t_diff)
return msmt_start, msmt_stop, t_diff
[docs]
def add_ogse_ref_diode(ref_db: Path, l1a_file: Path) -> None:
"""Select reference data taken during a measurement and add to a L1A product."""
# msmt_start and msmt_stop are generated with the ITOS clock
msmt_start, msmt_stop, t_diff = clock_offset(l1a_file)
# ref_time is generated with the SRON clock
xds = open_dataset(ref_db, group='/gse_data/ReferenceDiode')
ref_time = xds['time'].values - t_diff
indx = np.where((ref_time >= msmt_start) & (ref_time <= msmt_stop))[0]
if indx.size == 0:
print('ReferenceDiode',
ref_time.min(), msmt_start, msmt_stop, ref_time.max())
print('[WARNING]: no reference-diode data found')
return
# update Level-1A product with OGSE/EGSE information
xds = xds.isel(time=indx)
xds.to_netcdf(l1a_file, mode='a', group='/gse_data/ReferenceDiode')
[docs]
def add_ogse_wav_mon(ref_db: Path, l1a_file: Path) -> None:
"""Select reference data taken during a measurement and add to a L1A product."""
# msmt_start and msmt_stop are generated with the ITOS clock
msmt_start, msmt_stop, t_diff = clock_offset(l1a_file)
# ref_time is generated with the SRON clock
xds = open_dataset(ref_db, group='/gse_data/WaveMonitor')
ref_time = xds['time'].values - t_diff
mask = (ref_time >= msmt_start) & (ref_time <= msmt_stop)
if mask.sum() == 0:
print('WaveMonitor',
ref_time.min(), msmt_start, msmt_stop, ref_time.max())
print('[WARNING]: no wavelength monitoring data found')
return
# update Level-1A product with OGSE/EGSE information
xds = xds.isel(time=mask.nonzero()[0])
xds.to_netcdf(l1a_file, mode='a', group='/gse_data/WaveMonitor')
def __test() -> None:
"""Small function to test this module."""
ogse_dir = Path('/data/richardh/SPEXone/ambient/polarimetric/calibration/'
'light_level/Logs/')
file_list = ['POLARIMETRIC_LIGHT_LEVELS_ref_det_session174.csv']
print('---------- SHOW DATASET ----------')
print(read_ref_diode(ogse_dir, file_list, verbose=True))
# print('---------- WRITE DATASET ----------')
# create_ref_diode_db(ogse_dir, file_list)
file_list = ['POLARIMETRIC_LIGHTLEVELS_3_Avantes_20210211T165045.csv']
print('---------- SHOW DATASET ----------')
print(read_wav_mon(ogse_dir, file_list, verbose=True))
# print('---------- WRITE DATASET ----------')
# create_wav_mon_db(ogse_dir, file_list)
# --------------------------------------------------
if __name__ == '__main__':
__test()