Source code for dorado.dorphot.dorphotClass

from tqdm import tqdm
import numpy as np
import os

from ..core.coreClass import *
from ..timeseries.timeseriesClass import timeSeries
from ..graf.grafClass import star_chart
import ccdprocx

from astropy.time import Time
from astropy.nddata.ccddata import CCDData
CCDData._config_ccd_requires_unit = False
import astroalign as aa
from astropy.io import fits
from astropy.wcs import WCS
from astropy.wcs.utils import proj_plane_pixel_scales as ppps


from astropy.visualization import MinMaxInterval, ZScaleInterval, SquaredStretch, SqrtStretch, AsinhStretch, LinearStretch, LogStretch, HistEqStretch, PowerStretch, SinhStretch

from astropy.visualization.mpl_normalize import ImageNormalize
from astropy.stats import SigmaClip, gaussian_sigma_to_fwhm
from photutils import Background2D, MedianBackground

from photutils.aperture import CircularAperture, aperture_photometry, CircularAnnulus

from astroquery.gaia import Gaia
import astropy.units as u
from astropy.table import Table
import time
from astropy.coordinates import SkyCoord
from scipy.ndimage import median_filter

import skimage.exposure as skie
from skimage.feature import blob_dog, blob_log, blob_doh
# https://scikit-image.org/docs/stable/api/skimage.feature.html#skimage.feature.blob_log
# https://scikit-image.org/docs/stable/auto_examples/features_detection/plot_blob.html
# StarSeeker is a method inspired by the above links and developed
# For the precursor to DORADO, DRACO circa 2019.

__all__ = ['aicoPhot', 'dracoPhot']

[docs]class aicoPhot: def __init__(self): # TODO make observatory class self.temp = None # list of calibration frames from disk
[docs] def calibrate(self, cr, filter, use_med_cr_removal = False, rb = 0, use_lac_cr_removal = False, scln = False, scln_xy = [1,-1, 1,-1]): ''' calibrate performs 'pre-processing' CCDData calibration to a data stack within a ceres object. This involves Bias and Flatfield correction and optional median value based cosmic ray removal based on the ccdprocx.cosmicray_median() method. Parameters ---------- cr: string The relevent name string of Ceres instance to calibrate. filter: str String representation of the relevent filter. use_med_cr_removal: boolean controls whether to use cosmic ray removal in calibration, may affect runtime. Default is False. rb: int the rbox value for ccdprocx.cosmicray_median(). Default is 0. use_lac_cr_removal : boolean controls whether to use lacosmic ray removal in calibration, may affect runtime. Default is False ::NOTE:: the package ccdproc is required. scln: boolean controls whether to crop out scanlines. Default is False scln_xy: array array of position values for line cropping. [xmin, xmax, ymin, ymax]. Default is [1, -1, 1, -1] ''' # for bla in series: add bias corrected = True to header stack = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] flat = stack.flat bias = Dorado.ceres[Dorado.ceres_keys[cr]].bias c_series = [] print('Calibrating') if use_lac_cr_removal: try: import ccdproc except Exception: print('Failed to open ccdproc. Is it installed?') for im in tqdm(stack.data, colour = 'green'): # bar.update() im.data = im.data.astype('uint16') flat.data = flat.data.astype('uint16') bias.data = bias.data.astype('uint16') im = ccdprocx.ccd_process(im, master_bias = bias, master_flat = flat) if scln: im.data = im.data[scln_xy[0]:scln_xy[1], scln_xy[2]:scln_xy[3]] im.mask = im.mask[scln_xy[0]:scln_xy[1], scln_xy[2]:scln_xy[3]] im.uncertainty = im.uncertainty[scln_xy[0]:scln_xy[1], scln_xy[2]:scln_xy[3]] if use_med_cr_removal: im = ccdprocx.cosmicray_median(im, rbox = rb) if use_lac_cr_removal: im == ccdproc.cosmicray_lacosmic(im) im.data = im.data.astype('uint16') c_series.append(im) # add more flags for header Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].data = c_series Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].calibrated = True
[docs] def imarith(self, cr, filter, operator, operand): ''' imarith is a basic replication of the IRAF image arithmatic tool imarith. Parameters ---------- cr: string The relevent name string of Ceres instance to perform image arithmatic on. filter: str String representation of the relevent filter. operator: string String of operator to be used for arithmatic. Supported operations are '+', '-', '/', and '*' ''' # TODO should this be in stack? like have a wrapper here? # mod to check datatype using type() # mod to remove im_count and make possible to use single image NOTE might already be done # mod to accomodate CCDdata object NOTE might already be done series = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] for i in range(len(series)): if (operator == '+'): series[i].data = series[i].data + operand elif (operator == '-'): series[i].data = series[i].data - operand elif (operator == '/'): series[i].data = series[i].data / operand elif (operator == '*'): series[i].data = series[i].data * operand Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] = series
[docs] def getWCS(self, cr, filter, alignto = None, cache = True): ''' getWCS obtains WCS information for an image either via previously solved data in the cache or by passing the image to astrometryNet via dorado.plate_solve(). Parameters ---------- cr: string The relevent name string of Ceres instance to get WCS information for. filter: str String representation of the relevent filter. alignto: int Index of image to use as reference. Default is stack.alignto cache: boolean Controls whether to call astrometryNet for solve or use solved result stored in cache from previous run. Default is True. ''' # TODO mod so cache results are target specific series = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] if alignto == None: alignto = series.alignTo if cache: hdulist = fits.open(Dorado.dordir / 'cache' / 'astrometryNet' / 'solved.fits') Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].wcs = WCS(hdulist[0].header, hdulist) Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].solved = CCDData.read(Dorado.dordir / 'cache' / 'astrometryNet' / 'solved.fits') hdulist.close() else: toalign = series.data[alignto] fname, cachedir = Dorado.mkcacheObj(toalign, 'astrometryNet') path = [cachedir, fname] writearray = [cachedir, 'solved.fits'] solved, wcs_header = Dorado.plate_solve(path, writearray = writearray) Dorado.delcacheObj( fname, 'astrometryNet') Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].wcs = WCS(wcs_header) Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].solved = solved
[docs] def align(self, cr, filter, alignto = None, getWCS = True, cache = False, ds = 2, ma = 5): ''' align aligns a filter stack within a specified ceres object to a specified frame (default is first frame). align can optionally retrieve the corresponding WCS data for the aligned stack. Parameters ---------- cr: string The relevent name string of Ceres instance to align. filter: str String representation of the relevent filter. alignto: int Index of image to use as reference. Default is stack.alignto getWCS: boolean Controls whether to obtain WCS information for stack. cache: boolean Controls whether to call astrometryNet for solve or use solved result stored in cache from previous run. Default is True. ds: int or float Sets the detection sigma value for astroalign.register. Default is 2. ma: int or float Sets the minimum area value for astroalign.register. Default is 5. ''' series = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] if alignto == None: alignto = series.alignTo else: series.alignTo = alignto toalign = series.data[alignto] ## TODO :: make this use ceres.getWCS() if getWCS: if cache: toalign = CCDData.read(Dorado.dordir / 'cache' / 'astrometryNet' / 'solved.fits') #, unit = Dorado.unit) hdulist = fits.open(Dorado.dordir / 'cache' / 'astrometryNet' / 'solved.fits') Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].wcs = WCS(hdulist[0].header, hdulist) hdulist.close() Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].solved = toalign else: fname, cachedir = Dorado.mkcacheObj(toalign, 'astrometryNet') path = [cachedir, fname] writearray = [cachedir, 'solved.fits'] solved, wcs_header = Dorado.plate_solve(path, writearray = writearray) toalign = solved Dorado.delcacheObj( fname, 'astrometryNet') Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].wcs = WCS(wcs_header) Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].solved = solved # delete cache object # save solved to target aa_series = [] skipped = [] print('Aligning') for image in tqdm(series.data, colour = 'green'): # bar.update() try: img, _ = aa.register(image.data, toalign.data, detection_sigma = ds, min_area = ma) aaim = image aaim.data = img aa_series.append(aaim) except: skipped.append(image) # print('Image skipped') if len(skipped) != 0: print(len(skipped), ' images skipped.') ## TODO :: need to redo times and such for less ims Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].data = aa_series Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].aligned = True
[docs] def differential_magnitude(self, flux1, flux2, mag2): ''' Probably could use target class inputs and uncertainties ''' mag1 = -2.5 * np.log10(flux1/flux2) + mag2 return mag1
[docs] def apPhot(self, cr, filter, toid, control_toid = None, shape = 21, unc = 0.1): ''' apPhot performs basic aperture photometry based on photutils.aperture_photometry on a target within stack. Target photometry can optionally be compared to a control target within the stack via the differential photometry method. Parameters ---------- cr: string The relevent name string of Ceres instance to perform aperture photometry on. filter: str String representation of the relevent filter. toid: string control_toid: string shape: int unc: float ''' # TODO this needs a better name # TODO get seeing from PSF stack = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] # TODO if no wcs, complain alot w = stack.wcs toi = Dorado.targets[Dorado.target_keys[toid]] xy = w.wcs_world2pix(toi.coords.ra.deg, toi.coords.dec.deg, 1) ra = toi.coords.ra.deg dec = toi.coords.dec.deg pos = [(float(xy[0]), float(xy[1]))] aperture = CircularAperture(pos, r = shape) annulus_aperture = CircularAnnulus(pos, r_in = shape + 2, r_out = shape + 5) apers = [aperture, annulus_aperture] # TODO can a control be chosen? can more than one be chosen? if control_toid != None: control_toi = Dorado.targets[Dorado.target_keys[control_toid]] xyc = w.wcs_world2pix(control_toi.coords.ra.deg, control_toi.coords.dec.deg, 1) posc = [(float(xyc[0]), float(xyc[1]))] aperturec = CircularAperture(posc, r = shape) annulus_aperturec = CircularAnnulus(posc, r_in = shape + 2, r_out = shape + 5) apersc = [aperturec, annulus_aperturec] timestr = [] exptimes = [] ray = [] decx = [] x = [] y = [] flux = [] # NOTE this is exptime corrected fluxunc = [] apsum = [] # NOTE this is raw aperture sum apsum_unc = [] print('Performing photometry') for image in tqdm(stack.data, colour = 'green'): error = unc * image.data results = aperture_photometry(image, apers) #, error = error) bkg_mean = results['aperture_sum_1'] / annulus_aperture.area bkg_sum = bkg_mean * aperture.area results['flux_fit'] = results['aperture_sum_0'] - bkg_sum timestr.append(image.header['DATE-OBS']) # TODO This is most likely needed, but verify exptimes.append(image.header['EXPTIME']) # TODO is this also needed ray.append(ra) # TODO is this really needed? decx.append(dec) x.append(results['xcenter'][0]) # TODO is this needed either? y.append(results['ycenter'][0]) if control_toid != None: resultsc = aperture_photometry(image, apersc) # , error = error) bkg_meanc = resultsc['aperture_sum_1'] / annulus_aperturec.area bkg_sumc = bkg_meanc * aperturec.area resultsc['flux_fit'] = resultsc['aperture_sum_0'] - bkg_sumc apsum.append(results['flux_fit'][0] - resultsc['flux_fit'][0]) flux.append((results['flux_fit'][0] - resultsc['flux_fit'][0])/image.header['EXPTIME']) else: apsum.append(results['flux_fit'][0]) flux.append(results['flux_fit'][0]/image.header['EXPTIME']) fluxunc.append(1) ## TODO:: modify this to account for exposure time and control apsum_unc.append(1) times = Time(timestr) ts = timeSeries(times = times, flux = flux, exptimes = exptimes, x = x, y = y, ra = ray, dec = decx, flux_unc = fluxunc, apsum = apsum, apsum_unc = apsum_unc) Dorado.targets[Dorado.target_keys[toid]].filters[filter] = len(toi.ts) Dorado.targets[Dorado.target_keys[toid]].ts.append(ts)
# TODO accomodate targets embedded in core (list of targets) # TODO the name for this function needs updating
[docs] def mkBase(self, cr, filter, sigClip = False, minmax = False): ''' mkBase stacks filter data within a ceres object to produce a base or 'average' stacked image. Parameters ---------- cr: string The relevent name string of Ceres instance to create a base stacked image for. filter: str String representation of the relevent filter. sigClip: boolean minmax: boolean ''' ## TODO :: add the option to change the combination method. Right now default is # sigma clipped median combination. series = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] # toalign = series.data[series.alignTo] c = ccdprocx.Combiner(series.data) if minmax: c.minmax_clipping(min_clip = 0.1) if sigClip: c.sigma_clipping() base = c.median_combine() base.header['stacked'] = True base.header['numsubs'] = len(series.data) base.header['DATE-OBS'] = series.data[0].header['DATE-OBS'] base.header['filter'] = series.filter Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].base = base
## TODO :: sort out what is in the header of this base file.
[docs] def calBase(self, cr, filter): ''' calBase calibrates a base image for a stack by computing a 2D background for the image and removing it from the base image. Parameters ---------- cr: string The relevent name string of Ceres instance to perform base image calibration on. filter: str String representation of the relevent filter. ''' # TODO this needs hella optimization and direction # I am 99% sure this is to remove the background gradient from the stacked base image img = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].base norm = ImageNormalize(stretch=SqrtStretch()) sigma_clip = SigmaClip(sigma=3.) bkg_estimator = MedianBackground() bkg = Background2D(img, (50, 50), filter_size=(3, 3), sigma_clip=sigma_clip, bkg_estimator=bkg_estimator) base = img base.data = img.data / bkg.background.value base.data[np.isnan(base.data)] = 0 Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].base = base
[docs]class dracoPhot: def __init__(self, limit_Mag = 16, search_bounds = [30, 30]): #TODO make observatory class self.limit_Mag = limit_Mag self.search_bounds = search_bounds self.scale = 'mm' self.transform = 'sqrt'
[docs] def getWCS(self, cr, filter, alignto = None, cache = True): ''' getWCS obtains WCS information for an image either via previously solved data in the cache or by passing the image to astrometryNet via dorado.plate_solve(). Parameters ---------- cr: string The relevent name string of Ceres instance to get WCS information for. filter: str String representation of the relevent filter. alignto: int Index of image to use as reference. Default is stack.alignto cache: boolean Controls whether to call astrometryNet for solve or use solved result stored in cache from previous run. Default is True. ''' # TODO mod so cache results are target specific series = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] if alignto == None: alignto = series.alignTo if cache: hdulist = fits.open(Dorado.dordir / 'cache' / 'astrometryNet' / 'solved.fits') Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].wcs = WCS(hdulist[0].header, hdulist) Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].solved = CCDData.read(Dorado.dordir / 'cache' / 'astrometryNet' / 'solved.fits') hdulist.close() else: if alignto == 'base': toalign = series.base else: toalign = series.data[alignto] fname, cachedir = Dorado.mkcacheObj(toalign, 'astrometryNet') path = [cachedir, fname] writearray = [cachedir, 'solved.fits'] solved, wcs_header = Dorado.plate_solve(path, writearray = writearray) Dorado.delcacheObj( fname, 'astrometryNet') Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].wcs = WCS(wcs_header) Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]].solved = solved
[docs] def imarith(self, cr, filter, operator, operand): ''' imarith is a basic replication of the IRAF image arithmatic tool imarith. Parameters ---------- cr: string The relevent name string of Ceres instance to perform image arithmatic on. filter: str String representation of the relevent filter. operator: string String of operator to be used for arithmatic. Supported operations are '+', '-', '/', and '*' ''' # TODO should this be in stack? like have a wrapper here? # mod to check datatype using type() # mod to remove im_count and make possible to use single image NOTE might already be done # mod to accomodate CCDdata object NOTE might already be done series = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] for i in range(len(series)): if (operator == '+'): series[i].data = series[i].data + operand elif (operator == '-'): series[i].data = series[i].data - operand elif (operator == '/'): series[i].data = series[i].data / operand elif (operator == '*'): series[i].data = series[i].data * operand Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] = series
[docs] def differential_magnitude(self, flux1, flux2, mag2): ''' Probably could use target class inputs and uncertainties ''' mag1 = -2.5 * np.log10(flux1/flux2) + mag2 return mag1
[docs] def apPhot(self, cr, filter, toid, search = True, read_date = None) : ''' apPhot performs basic aperture photometry based on photutils.aperture_photometry on a target within stack. Target photometry can optionally be compared to a control target within the stack via the differential photometry method. Parameters ---------- cr: string The relevent name string of Ceres instance to perform aperture photometry on. filter: str String representation of the relevent filter. toid: string ''' # TODO this needs a better name # TODO get seeing from PSF stack = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] # TODO if no wcs, complain alot w = stack.wcs self.projectdir = Dorado.dordir / 'data' / 'projects' / toid self.projectdatedir = Dorado.dordir / 'data' / 'projects' / toid / Dorado.ceres[Dorado.ceres_keys[cr]].datestr os.makedirs(self.projectdatedir, exist_ok = True) out_filename_prefix = toid + '_' self.get_stars(cr, filter, toid, search, read_date) print('Initial FWHM: ', np.mean(self.stars['FWHM']), '+/-', np.std(self.stars['FWHM']), 'px') sxy = w.proj_plane_pixel_scales() #ppps(w) print('Pixel scales: ', sxy) print('Initial FWHM: ', (np.mean(self.stars['FWHM']) * sxy[0]).to(u.arcsec), '+/-', (np.std(self.stars['FWHM']) * sxy[0]).to(u.arcsec), 'px') print('Performing Photometry...') # The run table is most likely superseeded by the log table # side note, the log table is less of a log and more of results # since it doesnt log the procedure, and instead contains results run = Table(names = ('time', '', 'sky',)) # TODO:: Maybe add instrument temp, stuff like airmass, alt/az, airtemp, other params self.log = Table(names = ('time', 'image', 'exptime', 'zp_m', 'zp_b', 'sky', 'FWHM', 'FWHM_std', 'seeing')) for i in tqdm(range(len(stack.data)), colour = 'green'): im = stack.data[i] tstr = str(Time(im.header['DATE-OBS'], format='fits').mjd) imname = tstr + '_' + str(i) imPhot = photo(im, self.stars, w, im_index = i) imPhot.apPhot_step() # Should the PSF photometry step go here? zpv = imPhot.get_zero_point() imPhot.mag_calibrate() imPhot.set_time() outstr = out_filename_prefix + imname + '.fits' self.log.add_row(imPhot.write(self.projectdatedir / outstr)) self.log.write(self.projectdatedir / (out_filename_prefix + 'log.fits'), overwrite = True) print('Photometry completed.')
# TODO:: write out a summary table to terminal??
[docs] def inIm(self, tab, cr, filter, border = 0): stack = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] height, width = stack.data[stack.alignTo].data.shape num = len(tab) tab = tab[tab['y'] >= 0 + border] tab = tab[tab['y'] <= height - border] tab = tab[tab['x'] >= 0 + border] tab = tab[tab['x'] <= width - border] print('Went from ', num , ' stars in input, to ', len(tab), ' stars in field area.') return tab
[docs] def get_field(self, cr, filter, toid): startTime = time.time() stack = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] w = stack.wcs im = stack.data[stack.alignTo] width = u.Quantity(self.search_bounds[0], u.arcmin) height = u.Quantity(self.search_bounds[1], u.arcmin) coords = SkyCoord.from_name(toid) # Make the Query # Columns we want to keep columnse = [ 'phot_g_mean_mag', 'phot_bp_mean_mag', 'phot_rp_mean_mag', 'teff_val', 'dist'] # and what we want to name those columns columns = ['g_mag', 'bp_mag', 'rp_mag', 'teff', 'dist'] # Result return limit, default was 50 Gaia.ROW_LIMIT = 10000 re = Gaia.query_object(coordinate=coords, width=width, height=height) # Parse the results de = [str(d) for d in re['DESIGNATION']] # This fixes the bad return from Gaia # Construct the star object table r = Table((de, re['ra'], re['dec']), names = ('DESIGNATION', 'ra', 'dec')) # Fill in rest of columns for (s, se )in zip(columns, columnse): r[s] = re[se] # Limit the results to a set magnitude r = r[r['rp_mag'] <= self.limit_Mag] # Get stellar object pixel position r['x'], r['y'] = w.world_to_pixel(SkyCoord(r['ra'], r['dec'])) # This is a hardcoded scale size for plotting r['r'] = 28 - r['rp_mag'].value # Do we keep it? # Figure out which results are in the image r = self.inIm(r, cr, filter, 100) # Round of the sig figs r.round(4) # output runtime executionTime = np.round((time.time() - startTime), 3) print('Execution time in seconds for Gaia lookup: ' + str(executionTime)) self.star_chart(r, im, 'Gaia', w) return r
[docs] def starSeeker(self, cr, filter): if self.scale == 'mm': interval = MinMaxInterval() elif self.scale == 'z': interval = ZScaleInterval() if self.transform == 'sqrt': stretch = SqrtStretch() elif self.transform == 'squared': stretch = SquaredStretch() elif self.transform == 'asinh': stretch = AsinhStretch() elif self.transform == 'sinh': stretch = SinhStretch() elif self.transform == 'linear': stretch = LinearStretch() elif self.transform == 'log': stretch = LogStretch() elif self.transform == 'power': stretch = PowerStretch() elif self.transform == 'hist': stretch = HistEqStretch() startTime = time.time() stack = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] w = stack.wcs im = stack.data[stack.alignTo] data = im.data # mf = median_filter(data, size= 5) # datamf = data - mf # limg = np.arcsinh(datamf) norm = ImageNormalize(data, interval=interval, stretch=stretch) limg = norm(data) limg = limg / limg.max() low = np.percentile(limg, 1) high = np.percentile(limg, 99.5) opt_img = skie.exposure.rescale_intensity(limg, in_range=(low,high)) # Laplacian blob detection stars = blob_log(opt_img, max_sigma=25, min_sigma = 5, num_sigma=10, threshold=.2) # full width at half max should be calculated from original unscaled image # how can we do this per image? fwhm = gaussian_sigma_to_fwhm * stars[:,2] # Convert from sigma to radii in the 3rd column. stars[:, 2] = stars[:, 2] * np.sqrt(2) y, x, r= stars[:, 0], stars[:, 1], stars[:, 2] results = Table((x, y, r, fwhm), names = ('x', 'y', 'r', 'FWHM')) co = w.pixel_to_world(stars[:,1], stars[:,0]) results['ra'] = co.ra results['dec'] = co.dec results = self.inIm(results, cr, filter, 100) print('Found ', len(results), ' stars via starseeker.') executionTime = np.round((time.time() - startTime), 3) print('Execution time in seconds for starSeeker: ' + str(executionTime)) img = im.copy() img.data = opt_img self.star_chart(results, img, 'Starseeker', w) return results, img
[docs] def get_stars(self, cr, filter, toid, search, read_date): # add these back if needed :: , limit_Mag = 16, search_bounds = [30, 20] # ask if stars should be saved with flag if search: gaia_stars = self.get_field(cr, filter, toid) s3, opt_img = self.starSeeker(cr, filter) gaia_stars['detection_separation'] = np.zeros(len(gaia_stars)) gaia_stars['detection_x'] = np.zeros(len(gaia_stars)) gaia_stars['detection_y'] = np.zeros(len(gaia_stars)) gaia_stars['detection_r'] = np.zeros(len(gaia_stars)) gaia_stars['FWHM'] = np.zeros(len(gaia_stars)) matched = Table(names = gaia_stars.colnames, dtype = gaia_stars.dtype) print('Matching stars..') self.unmatched = 0 for s in (pbar := tqdm(gaia_stars, colour = 'green')): pbar.set_description('Matching star : ' + str(s['DESIGNATION'])) pbar.refresh() sm = self.match_star(s, s3) if sm != False: matched.add_row(sm) self.stars = matched # should this really be an internal list # Or should it belong to anoter class # projectdir = Dorado.dordir / 'data' / 'projects' / toid os.makedirs(self.projectdir / 'stars', exist_ok = True) self.stars.write(self.projectdir / 'stars' / 'stars.fits', overwrite = True) print(self.unmatched, ' stars were not matched.') # crop out unmatched stars else: if read_date != None: fname = 'stars_' + str(read_date) + '.fits' self.stars = Table.read(self.projectdir / 'stars' / fname) else: print('No read date given for saved star catalogue.')
[docs] def match_star(self, star, s3): sx, sy = star['x'], star['y'] sr = star['r'] separation = np.sqrt((sx - s3['x'])**2 + (sy - s3['y'])**2) candidate = s3[separation <= sr] sep = separation[separation <= sr] if len(sep) > 1: print(len(sep)) print(candidate) candidate = candidate[sep == np.min(sep)][0] sep = sep[sep == np.min(sep)][0] star['detection_separation'] = sep star['detection_x'] = candidate['x'] star['detection_y'] = candidate['y'] star['detection_r'] = candidate['r'] star['FWHM'] = candidate['FWHM'] return star elif len(sep) == 0: self.unmatched += 1 return False else: star['detection_separation'] = sep star['detection_x'] = candidate['x'] star['detection_y'] = candidate['y'] star['detection_r'] = candidate['r'] star['FWHM'] = candidate['FWHM'] return star
[docs] def star_chart(self, stars, im, toid, w): sc = star_chart(im, title = toid, wcs = w) sc.plt_stars(stars, label = 'Stars') sc.add_compass() sc.add_scale() sc.plot()
## TODO :: figure out how to handle aligning and processing planetary images and image with low star counts def mag(flux, exp = 1, unc = None): mag_inst = -2.5 * np.log10(flux / exp) if unc: mag_inst_unc = unc / (flux * 2.30258509) return mag_inst, mag_inst_unc else: return mag_inst class photo: ''' Modify this to add PSF stuff ''' def __init__(self, image, stars, wcs, time = None, im_index = 0): self.image = image self.im_index = im_index self.stars = stars self.wcs = wcs # currently not in use, grabbing from im header becaaus Im a degenerate self.time = time self.set_time() try: self.exp = image.header['EXPTIME'] except: try: self.exp = image.header['EXPOSURE'] except: print('ERROR: No image exposure length info found in fits header using default keywords.') # this needs a more rigorous treatment, but for now its a good aproximation. self.sky = np.mean((np.mean(self.image.data), np.median(self.image.data))) self.fwhm = 0 self.seeing = 0 def apPhot_step(self): self.stars['aperture_sum'] = np.zeros(len(self.stars)) self.stars['inst_mag'] = np.zeros(len(self.stars)) self.stars['aperture_sum_raw'] = np.zeros(len(self.stars)) self.stars['inst_mag_raw'] = np.zeros(len(self.stars)) for i in range(len(self.stars)): pos = (self.stars[i]['x'], self.stars[i]['y']) # TODO :: why is there no annulus? seriously, this is basic photometry # and I haven't even made an annulus aperture. pathetic shape = float(1.2 * self.stars[i]['detection_r']) # its possible this was handing back a row instead of a float if shape <= 0: print('negative or zero shape encountered', shape) aperture = CircularAperture(pos, r=shape) annulus_aperture = CircularAnnulus(positions = pos, r_in = (shape + 2), r_out = (shape + 5)) apers = [aperture, annulus_aperture] phot_table = aperture_photometry(self.image.data, apers) bkg_mean = phot_table['aperture_sum_1'] / annulus_aperture.area bkg_sum = bkg_mean * aperture.area self.stars[i]['aperture_sum_raw'] = phot_table['aperture_sum_0'] self.stars[i]['aperture_sum'] = phot_table['aperture_sum_0'] - bkg_sum self.stars[i]['inst_mag_raw'] = mag(phot_table['aperture_sum_0'], self.exp) self.stars[i]['inst_mag'] = mag(phot_table['aperture_sum_0'] - bkg_sum, self.exp) def get_zero_point(self): stars = self.stars[self.stars['inst_mag'] <= -2.3] # Heres a place to mod, this cutoff value self.zero_point_vals = np.polyfit(stars['inst_mag'], stars['rp_mag'], 1) self.zero_point = np.poly1d(self.zero_point_vals) return self.zero_point_vals def mag_calibrate(self): # Well this is elegent, its a single line of code, well, it was, then I ruined it with this comment self.stars['fit_mag'] = self.zero_point(self.stars['inst_mag']) def set_time(self): self.stars['time'] = [float(Time(self.image.header['DATE-OBS'], format='fits').mjd) for i in range(len(self.stars))] def write(self, filename): self.stars.write(filename, overwrite = True) # why am I doing this in the same action as writing the table? who knows really # I could pretend that it was to add the filename to the log table # TODO:: add filename to log table :) mean_fwhm, std_fwhm = np.mean(self.stars['FWHM']), np.std(self.stars['FWHM']) self.fwhm = mean_fwhm return [float(self.stars['time'][0]), self.im_index, self.exp, self.zero_point_vals[0], self.zero_point_vals[1], self.sky, self.fwhm, std_fwhm, self.seeing] # class tessPhot: # def __init__(self): # self.temp = None # def dorphot(self, cere, filter, aperture_shape, annulus_shape): # stack = cere.data[cere.filters[filter]] # if stack.ts == []: # #tess_bjds, make sure to read them in as time objects for cere # stack.ts = timeSeries() # ts = stack.ts # perture = RectangularAperture((5,4), 3,3) # annulus_aperture = RectangularAnnulus((5,4), 4, 5, 5) # # Combine the aperture # aps = [aperture , annulus_aperture] # for s in tqdm(range(stack.length), colour = 'green'): # # Place the aperature on the current stamp and perform aperture photometry # phot_table = aperture_photometry(calibrated_fluxes[s,:,:], aps) # # Compute the background values from the annulus photometry # bkg_sum = (phot_table['aperture_sum_1'] / annulus_aperture.area) * aperture.area # # Subtract the background from main aperture # phot_table['residual_aperture_sum'] = phot_table['aperture_sum_0'] - bkg_sum # # Store the aperture results # aperture_sums.append(float(phot_table['aperture_sum_0']) - float(bkg_sum)) # # Store the flux error # err.append(np.mean(flux_err[s,:,:]))