Source code for jwst.guider_cds.guider_cds

import logging
import warnings

import numpy as np
from stdatamodels.jwst import datamodels

from jwst.lib import reffile_utils

log = logging.getLogger(__name__)

__all__ = ["guider_cds", "get_ref_arr"]


[docs] def guider_cds(model, gain_model, readnoise_model): """ Calculate the count rate for each pixel in all integrations. For each integration in a given FGS guider dataset whose mode is ACQ1, ACQ2, or TRACK, the count rate is the last group minus the first group, divided by the effective integration time. If the mode is ID, the last group minus the first group is calculated for both integrations; the count rate is then given by the minimum of these two values for each pixel, divided by the group time. For the FINEGUIDE mode, the count rate is the average of the last 4 groups minus the average of the first 4 groups, divided by the group time. The variances are calculated using gain and read noise values obtained from reference files if possible, otherwise, default representative constant values are used. From the variances the ERR array is calculated. Parameters ---------- model : `~stdatamodels.jwst.datamodels.GuiderRawModel` Input data model gain_model : `~stdatamodels.jwst.datamodels.GainModel` Gain for all pixels readnoise_model : `~stdatamodels.jwst.datamodels.ReadnoiseModel` Readnoise for all pixels Returns ------- new_model : `~stdatamodels.jwst.datamodels.GuiderCalModel` Output data model """ # get needed sizes and shapes grp_time = model.meta.exposure.group_time exp_type = model.meta.exposure.type n_int = model.data.shape[0] imshape = (model.data.shape[2], model.data.shape[3]) # If exp_type is 'FGS_ID', force output to have single slice, and use # default GAIN and READNOISE values by setting their ref models to None if exp_type[:6] == "FGS_ID": new_model = datamodels.GuiderCalModel((1,) + imshape) else: new_model = datamodels.GuiderCalModel() # get gain and readnoise arrays to calculate ERR array gain_arr = get_ref_arr(model, gain_model) readnoise_arr = get_ref_arr(model, readnoise_model) # set up output data arrays slope_int_cube = np.zeros((n_int,) + imshape, dtype=np.float32) if exp_type[:6] == "FGS_ID": var_rn = np.zeros((1,) + imshape, dtype=np.float32) var_pn = np.zeros((1,) + imshape, dtype=np.float32) else: var_rn = np.zeros((n_int,) + imshape, dtype=np.float32) var_pn = np.zeros((n_int,) + imshape, dtype=np.float32) new_model.dq = model.dq # loop over data integrations for num_int in range(0, n_int): data_sect = model.data[num_int, :, :, :] if exp_type == "FGS_FINEGUIDE": first_4 = data_sect[:4, :, :].mean(axis=0) last_4 = data_sect[-4:, :, :].mean(axis=0) slope_int_cube[num_int, :, :] = last_4 - first_4 var_rn[num_int, :, :] = 2 * (readnoise_arr / grp_time) ** 2 var_pn[num_int, :, :] = slope_int_cube[num_int, :, :] / (gain_arr * grp_time) elif exp_type[:6] == "FGS_ID": grp_last = data_sect[1, :, :] grp_first = data_sect[0, :, :] if num_int == 0: diff_int0 = grp_last - grp_first if num_int == 1: diff_int1 = grp_last - grp_first else: # ACQ1, ACQ2, or TRACK grp_last = data_sect[1, :, :] grp_first = data_sect[0, :, :] slope_int_cube[num_int, :, :] = grp_last - grp_first var_rn[num_int, :, :] = 2 * (readnoise_arr / grp_time) ** 2 var_pn[num_int, :, :] = slope_int_cube[num_int, :, :] / (gain_arr * grp_time) if exp_type[:6] == "FGS_ID": new_model.data[0, :, :] = np.minimum(diff_int1, diff_int0) / grp_time var_rn[0, :, :] = 2 * (readnoise_arr / grp_time) ** 2 # May be zeros in gain array - var will be NaN in these pixels. with warnings.catch_warnings(): warnings.filterwarnings("ignore", "divide by zero", RuntimeWarning) warnings.filterwarnings("ignore", "invalid value", RuntimeWarning) var_pn[0, :, :] = np.minimum(diff_int1, diff_int0) / (gain_arr * grp_time) else: # FINEGUIDE, ACQ1, ACQ2, or TRACK new_model.data = slope_int_cube / grp_time # set err to sqrt of sum of variances var_pn[var_pn < 0] = 0.0 # ensure variance is non-negative new_model.err = (var_rn + var_pn) ** 0.5 # Add all table extensions to be carried over to output if model.hasattr("planned_star_table"): new_model.planned_star_table = model.planned_star_table if model.hasattr("flight_star_table"): new_model.flight_star_table = model.flight_star_table if model.hasattr("pointing_table"): new_model.pointing_table = model.pointing_table if model.hasattr("centroid_table"): new_model.centroid_table = model.centroid_table if model.hasattr("track_sub_table"): new_model.track_sub_table = model.track_sub_table # copy all meta data from input to output model new_model.update(model) # Update BUNIT to reflect count rate new_model.meta.bunit_data = "DN/s" return new_model
[docs] def get_ref_arr(model, reference_model): """ Extract arrays in appropriate shape from the reference files. Parameters ---------- model : `~stdatamodels.jwst.datamodels.GuiderRawModel` Input data model reference_model : `~stdatamodels.jwst.datamodels.ReferenceFileModel` Reference file containing the relevant array; typically either a `~stdatamodels.jwst.datamodels.GainModel` or a `~stdatamodels.jwst.datamodels.ReadnoiseModel` Returns ------- ref_arr : ndarray Values from the reference file """ # extract subarray from reference file, if necessary if reffile_utils.ref_matches_sci(model, reference_model): ref_arr = reference_model.data else: try: ref_sub_model = reffile_utils.get_subarray_model(model, reference_model) log.info( f"Extracting subarray from reference {reference_model.meta.model_type} " f"to match science data." ) ref_arr = ref_sub_model.data ref_sub_model.close() except ValueError as err: if "STACK" in model.meta.exposure.type: if model.data.shape[-2:] != reference_model.data.shape: log.debug( f"The {reference_model.meta.model_type} reference file array does not " "match the shape of stacked FGS data;" "applying the mean value to the data." ) ref_arr = np.zeros(model.data.shape[-2:], dtype=np.float32) + np.mean( reference_model.data ) else: raise ValueError from err return ref_arr