Source code for pyke.prf

from abc import abstractmethod
import math
import scipy
import numpy as np
import tqdm
import sys
from astropy.io import fits as pyfits
from oktopus.posterior import PoissonPosterior
from .utils import channel_to_module_output, plot_image

# This is a workaround to get the number of arguments of
# a given function.
# In Python 2, this works by using getargspec.
# Note that `self` is accounted as an argument,
# which is unwanted, hence the subtraction by 1.
# On the other hand, Python 3 handles that trivially with the
# signature function.
if sys.version_info[0] == 2:
    from inspect import getargspec
    def _get_number_of_arguments(func):
        list_of_args = getargspec(func).args
        if 'self' in list_of_args:
            return len(list_of_args) - 1
        else:
            return len(list_of_args)
else:
    from inspect import signature
    def _get_number_of_arguments(func):
        return len(signature(func).parameters)


__all__ = ['PRFPhotometry', 'SceneModel', 'KeplerPRF', 'SimpleKeplerPRF', 'get_initial_guesses']


[docs]class PRFPhotometry(object): """ This class performs PRF Photometry on TPF-like files. Attributes ---------- scene_model : instance of SceneModel Model which will be fit to the data priors : instance of oktopus.JointPrior Priors on the parameters that will be estimated loss_function : subclass of oktopus.LossFunction Noise distribution associated with each random measurement Examples -------- >>> from pyke import KeplerTargetPixelFile, SimpleKeplerPRF, SceneModel, PRFPhotometry >>> from oktopus import UniformPrior >>> tpf = KeplerTargetPixelFile("https://archive.stsci.edu/missions/kepler/" ... "target_pixel_files/0084/008462852/" ... "kplr008462852-2013098041711_lpd-targ.fits.gz") # doctest: +SKIP Downloading https://archive.stsci.edu/missions/kepler/target_pixel_files/0084/008462852/kplr008462852-2013098041711_lpd-targ.fits.gz [Done] >>> prf = SimpleKeplerPRF(tpf.channel, tpf.shape[1:], tpf.column, tpf.row) # doctest: +SKIP Downloading http://archive.stsci.edu/missions/kepler/fpc/prf/extracted/kplr16.4_2011265_prf.fits [Done] >>> scene = SceneModel(prfs=prf) # doctest: +SKIP >>> prior = UniformPrior(lb=[1.2e5, 230., 128.,1e2], ub=[3.4e5, 235., 133., 1e3]) # doctest: +SKIP >>> phot = PRFPhotometry(scene, prior) # doctest: +SKIP >>> results = phot.fit(tpf.flux) # doctest: +SKIP >>> flux_fit = results[:, 0] # doctest: +SKIP >>> x_fit = results[:, 1] # doctest: +SKIP >>> y_fit = results[:, 2] # doctest: +SKIP >>> bkg_fit = results[:, 3] # doctest: +SKIP """ def __init__(self, scene_model, prior, loss_function=PoissonPosterior, **kwargs): self.scene_model = scene_model self.prior = prior self.loss_function = loss_function self.loss_kwargs = kwargs self.opt_params = np.array([]) self.residuals = np.array([]) self.loss_value = np.array([]) self.uncertainties = np.array([])
[docs] def fit(self, tpf_flux, x0=None, cadences='all', method='powell', **kwargs): """ Fits the scene model to the given data in ``tpf_flux``. Parameters ---------- tpf_flux : array-like A pixel flux time-series, i.e., the pixel data, e.g, KeplerTargetPixelFile.flux, such that (time, row, column) represents the shape of ``tpf_flux``. x0 : array-like or None Initial guesses on the parameters. The default is to use the mean of the prior distribution. cadences : array-like of ints or str A list or array that contains the cadences which will be fitted. Default is to fit all cadences. kwargs : dict Dictionary of additional parameters to be passed to `scipy.optimize.minimize`. Returns ------- opt_params : array-like Matrix with the optimized parameter values. The i-th line contain the best parameter values at the i-th cadence. The order of the parameters in every line follows the order of the ``scene_model``. """ self.opt_params = np.array([]) self.residuals = np.array([]) self.loss_value = np.array([]) self.uncertainties = np.array([]) if x0 is None: x0 = self.prior.mean if cadences == 'all': cadences = range(tpf_flux.shape[0]) for t in tqdm.tqdm(cadences): loss = self.loss_function(tpf_flux[t], self.scene_model, prior=self.prior, **self.loss_kwargs) result = loss.fit(x0=x0, method='powell', **kwargs) opt_params = result.x residuals = tpf_flux[t] - self.scene_model(*opt_params) self.loss_value = np.append(self.loss_value, result.fun) self.opt_params = np.append(self.opt_params, opt_params) self.residuals = np.append(self.residuals, residuals) self.opt_params = self.opt_params.reshape((tpf_flux.shape[0], len(x0))) self.residuals = self.residuals.reshape(tpf_flux.shape) return self.opt_params
[docs] def get_residuals(self): return self.residuals
[docs]class SceneModel(object): """ This class builds a generic model for a scene. Attributes ---------- prfs : list of callables A list of prfs bkg_model : callable A function that models the background variation. Default is a constant background """ def __init__(self, prfs, bkg_model=lambda bkg: np.array([bkg])): self.prfs = np.asarray([prfs]).reshape(-1) self.bkg_model = bkg_model self._prepare_scene_model() def __call__(self, *params): return self.evaluate(*params) def _prepare_scene_model(self): self.n_models = len(self.prfs) self.bkg_order = _get_number_of_arguments(self.bkg_model) model_orders = [0] for i in range(self.n_models): model_orders.append(_get_number_of_arguments(self.prfs[i].evaluate)) self.n_params = np.cumsum(model_orders)
[docs] def evaluate(self, *params): """ Parameters ---------- flux : scalar or array-like Total integrated flux of the PRF model center_col, center_row : scalar or array-like Column and row coordinates of the center scale_col, scale_row : scalar or array-like Pixel scale in the column and row directions rotation_angle : float Rotation angle in radians bkg_params : scalar or array-like Parameters for the background model """ self.mm = [] for i in range(self.n_models): self.mm.append(self.prfs[i](*params[self.n_params[i]:self.n_params[i+1]])) self.scene_model = np.sum(self.mm, axis=0) + self.bkg_model(*params[-self.bkg_order:]) return self.scene_model
[docs] def gradient(self, *params): grad = [] for i in range(self.n_models): grad.append(self.prfs[i].gradient(*params[self.n_params[i]:self.n_params[i+1]])) grad.append(self.bkg_model.gradient(*params[-self.bkg_order:])) grad = sum(grad, []) return grad
[docs] def plot(self, *params, **kwargs): pflux = self.evaluate(*params) plot_image(pflux, title='Scene Model, Channel: {}'.format(self.prfs[0].channel), extent=(self.prfs[0].column, self.prfs[0].column + self.prfs[0].shape[1], self.prfs[0].row, self.prfs[0].row + self.prfs[0].shape[0]), **kwargs)
[docs]class KeplerPRF(object): """ Kepler's Pixel Response Function as designed by [1]_. This class provides the necessary interface to load Kepler PRF calibration files and to create a model that can be fit as a function of flux, center positions, width, and rotation angle. Attributes ---------- channel : int KeplerTargetPixelFile.channel shape : (int, int) KeplerTargetPixelFile.shape[1:] column : int KeplerTargetPixelFile.column row : int KeplerTargetPixelFile.row Examples -------- Objects from the KeplerPRF class are defined by a channel number, a pair of dimensions (the size of the image), and a reference coordinate (bottom left corner). In this example, we create a KeplerPRF object located at channel #44 with dimension equals 10 x 10, reference row and column coordinate equals (5, 5). After the object has been created, we may translate it to a given center coordinate. Additionally, we can specify total flux, pixel scales, and rotation around the object's center. >>> import math >>> import matplotlib.pyplot as plt >>> from pyke import KeplerPRF >>> kepprf = KeplerPRF(channel=44, shape=(10, 10), column=5, row=5) # doctest: +SKIP Downloading http://archive.stsci.edu/missions/kepler/fpc/prf/extracted/kplr13.4_2011265_prf.fits [Done] >>> prf = kepprf(flux=1000, center_col=10, center_row=10, ... scale_row=0.7, scale_col=0.7, rotation_angle=math.pi/2) # doctest: +SKIP >>> plt.imshow(prf, origin='lower') # doctest: +SKIP References ---------- .. [1] S. T. Bryson. The Kepler Pixel Response Function, 2010. <https://arxiv.org/abs/1001.0331>. """ def __init__(self, channel, shape, column, row): self.channel = channel self.shape = shape self.column = column self.row = row self.col_coord, self.row_coord, self.interpolate = self._prepare_prf() def __call__(self, flux, center_col, center_row, scale_col, scale_row, rotation_angle): return self.evaluate(flux, center_col, center_row, scale_col, scale_row, rotation_angle)
[docs] def evaluate(self, flux, center_col, center_row, scale_col, scale_row, rotation_angle): """ Interpolates the PRF model onto detector coordinates. Parameters ---------- flux : float Total integrated flux of the PRF center_col, center_row : float Column and row coordinates of the center scale_col, scale_row : float Pixel scale in the column and row directions rotation_angle : float Rotation angle in radians Returns ------- prf_model : 2D array Two dimensional array representing the PRF values parametrized by flux, centroids, widths, and rotation. """ cosa = math.cos(rotation_angle) sina = math.sin(rotation_angle) delta_col = self.col_coord - center_col delta_row = self.row_coord - center_row delta_col, delta_row = np.meshgrid(delta_col, delta_row) rot_row = delta_row * cosa - delta_col * sina rot_col = delta_row * sina + delta_col * cosa self.prf_model = flux * self.interpolate(rot_row.flatten() * scale_row, rot_col.flatten() * scale_col, grid=False).reshape(self.shape) return self.prf_model
def _read_prf_calibration_file(self, path, ext): prf_cal_file = pyfits.open(path) data = prf_cal_file[ext].data # looks like these data below are the same for all prf calibration files crval1p = prf_cal_file[ext].header['CRVAL1P'] crval2p = prf_cal_file[ext].header['CRVAL2P'] cdelt1p = prf_cal_file[ext].header['CDELT1P'] cdelt2p = prf_cal_file[ext].header['CDELT2P'] prf_cal_file.close() return data, crval1p, crval2p, cdelt1p, cdelt2p def _prepare_prf(self): n_hdu = 5 min_prf_weight = 1e-6 module, output = channel_to_module_output(self.channel) # determine suitable PRF calibration file if module < 10: prefix = 'kplr0' else: prefix = 'kplr' prfs_url_path = "http://archive.stsci.edu/missions/kepler/fpc/prf/extracted/" prffile = prfs_url_path + prefix + str(module) + '.' + str(output) + '_2011265_prf.fits' # read PRF images prfn = [0] * n_hdu crval1p = np.zeros(n_hdu, dtype='float32') crval2p = np.zeros(n_hdu, dtype='float32') cdelt1p = np.zeros(n_hdu, dtype='float32') cdelt2p = np.zeros(n_hdu, dtype='float32') for i in range(n_hdu): prfn[i], crval1p[i], crval2p[i], cdelt1p[i], cdelt2p[i] = self._read_prf_calibration_file(prffile, i+1) prfn = np.array(prfn) PRFcol = np.arange(0.5, np.shape(prfn[0])[1] + 0.5) PRFrow = np.arange(0.5, np.shape(prfn[0])[0] + 0.5) PRFcol = (PRFcol - np.size(PRFcol) / 2) * cdelt1p[0] PRFrow = (PRFrow - np.size(PRFrow) / 2) * cdelt2p[0] # interpolate the calibrated PRF shape to the target position rowdim, coldim = self.shape[0], self.shape[1] prf = np.zeros(np.shape(prfn[0]), dtype='float32') prf_weight = np.zeros(n_hdu, dtype='float32') ref_column = self.column + .5 * coldim ref_row = self.row + .5 * rowdim for i in range(n_hdu): prf_weight[i] = math.sqrt((ref_column - crval1p[i]) ** 2 + (ref_row - crval2p[i]) ** 2) if prf_weight[i] < min_prf_weight: prf_weight[i] = min_prf_weight prf += prfn[i] / prf_weight[i] prf /= (np.nansum(prf) * cdelt1p[0] * cdelt2p[0]) # location of the data image centered on the PRF image (in PRF pixel units) col_coord = np.arange(self.column + .5, self.column + coldim + .5) row_coord = np.arange(self.row + .5, self.row + rowdim + .5) # x-axis correspond to row-axis in scipy.RectBivariate # not to be confused with our convention, in which the # x-axis correspond to the column-axis interpolate = scipy.interpolate.RectBivariateSpline(PRFrow, PRFcol, prf) return col_coord, row_coord, interpolate
[docs] def plot(self, *params, **kwargs): pflux = self.evaluate(*params) plot_image(pflux, title='Kepler PRF Model, Channel: {}'.format(self.channel), extent=(self.column, self.column + self.shape[1], self.row, self.row + self.shape[0]), **kwargs)
[docs]class SimpleKeplerPRF(KeplerPRF): """ Simple model of KeplerPRF. This class provides identical functionality as in KeplerPRF, except that it is parametrized only by flux and center positions. The width scales and angle are fixed to 1.0 and 0, respectivelly. """ def __call__(self, flux, center_col, center_row): return self.evaluate(flux, center_col, center_row)
[docs] def evaluate(self, flux, center_col, center_row): """ Interpolates the PRF model onto detector coordinates. Parameters ---------- flux : float Total integrated flux of the PRF center_col, center_row : float Column and row coordinates of the center Returns ------- prf_model : 2D array Two dimensional array representing the PRF values parametrized by flux and centroids. """ delta_col = self.col_coord - center_col delta_row = self.row_coord - center_row self.prf_model = flux * self.interpolate(delta_row, delta_col) return self.prf_model
[docs] def gradient(self, flux, center_col, center_row): """ This function returns the gradient of the SimpleKeplerPRF model with respect to flux, center_col, and center_row. Parameters ---------- flux : float Total integrated flux of the PRF center_col, center_row : float Column and row coordinates of the center Returns ------- grad_prf : list Returns a list of arrays where the elements are the derivative of the KeplerPRF model with respect to flux, center_col, and center_row, respectively. """ delta_col = self.col_coord - center_col delta_row = self.row_coord - center_row deriv_flux = self.interpolate(delta_row, delta_col) deriv_center_col = - flux * self.interpolate(delta_row, delta_col, dy=1) deriv_center_row = - flux * self.interpolate(delta_row, delta_col, dx=1) return [deriv_flux, deriv_center_col, deriv_center_row]
[docs]def get_initial_guesses(data, ref_col, ref_row): """ Compute the initial guesses for total flux, centers position, and PSF width using the sample moments of the data. Parameters ---------- data : 2D array-like Image data ref_col, ref_row : scalars Reference column and row (coordinates of the bottom left corner) Return ------ flux0, col0, row0, sigma0: floats Inital guesses for flux, center position, and width """ flux0 = np.nansum(data) yy, xx = np.indices(data.shape) + 0.5 yy = ref_row + yy xx = ref_col + xx col0 = np.nansum(xx * data) / flux0 row0 = np.nansum(yy * data) / flux0 marg_col = data[:, int(np.round(col0 - ref_col))] marg_row = data[int(np.round(row0 - ref_row)), :] sigma_y = math.sqrt(np.abs((np.arange(marg_row.size) - row0) ** 2 * marg_row).sum() / marg_row.sum()) sigma_x = math.sqrt(np.abs((np.arange(marg_col.size) - col0) ** 2 * marg_col).sum() / marg_col.sum()) sigma0 = math.sqrt((sigma_x**2 + sigma_y**2)/2.0) return flux0, col0, row0, sigma0