import warnings
warnings.filterwarnings('ignore')
# import sys
# import os
# import numpy as np
import ccdproc
from astropy.time import Time
from astropy.table import QTable, Table
import astroalign as aa
from astropy.wcs import WCS
from astropy.utils.console import ProgressBar, ProgressBarOrSpinner
# from astropy.coordinates import SkyCoord as acoord
# import astropy.units as un
from astropy.io import fits
from astropy.nddata.ccddata import CCDData
# photometry imports
from photutils.psf import IntegratedGaussianPRF, DAOGroup
from photutils.background import MMMBackground, MADStdBackgroundRMS
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.stats import gaussian_sigma_to_fwhm
from photutils.psf import IterativelySubtractedPSFPhotometry
from photutils import DAOStarFinder
from astropy.stats import mad_std
# from astroquery.simbad import Simbad
from ..timeseries import timeSeries
'''
Ceres is the handler of image series in Dorado,
'''
__all__ = ['Ceres']
[docs]class Ceres:
'''
The Ceres class encapsulates a set of astronomical data from a single night of observation.
Ceres can handle multiple stacks of data in different filters and perform a variety of
actions on them in an orgainized fashion.
Attributes
----------
filters: dictionary
data: Stack array
bias: CCDdata
time: 'astropy.Time'
datestr: str
'''
def __init__(self, filters = {}, data = [], bias = None, time = None, datestr = None):
# metadata
self.filters = filters
self.data = data
self.bias = bias
self.time = time
self.datestr = datestr
# location, weather, timezone, camera, observer array
# refering instance of clippy to call and save later?
# or call clippy directly and feed it a ceres object
try:
self.date = Time(int(self.time.mjd), format = 'mjd')
except:
self.date = None
[docs] def add_stack(self, stack):
# eventually stacks themelves should have some metadata
# to denote stuff like calibration status
self.filters[stack.filter] = len(self.data)
self.data.append(stack)
[docs] def rem_stack(self, filter):
del self.data[self.filters[filter]]
# delete time strings
[docs] def calibrate(self, filter):
# for bla in series: add bias corrected = True to header
stack = self.data[self.filters[filter]]
flat = stack.flat
bias = self.bias
c_series = []
with ProgressBar(len(stack.data)) as bar:
for im in stack.data:
bar.update()
im.data = im.data.astype('uint16')
flat.data = flat.data.astype('uint16')
bias.data = bias.data.astype('uint16')
im = ccdproc.ccd_process(im, master_bias = bias, master_flat = flat)
im.data = im.data.astype('uint16')
c_series.append(im)
self.data[self.filters[filter]].data = c_series
self.data[self.filters[filter]].calibrated = True
[docs] def imarith(self, filter, operator, operand):
# mod to check datatype using type()
# mod to remove im_count and make possible to use single image
# mod to accomodate CCDdata object
series = self.data[self.filters[filter]]
for i in range(len(series)):
if (operator == '+'):
series[i].data = series[i].data + operand
elif (operator == '-'):
series[i].data = series[i].data - operand
elif (operator == '/'):
series[i].data = series[i].data / operand
elif (operator == '*'):
series[i].data = series[i].data * operand
self.data[self.filters[filter]] = series
[docs] def getWCS(self, filter, clippy, alignto = None, cache = True):
series = self.data[self.filters[filter]]
if alignto == None:
alignto = series.alignTo
if cache:
hdulist = fits.open(clippy.dordir / 'cache' / 'astrometryNet' / 'solved.fits')
self.data[self.filters[filter]].wcs = WCS(hdulist[0].header, hdulist)
self.data[self.filters[filter]].solved = CCDData.read(clippy.dordir / 'cache' / 'astrometryNet' / 'solved.fits')
hdulist.close()
else:
toalign = series.data[alignto]
fname, cachedir = clippy.mkcacheObj(toalign, 'astrometryNet')
path = [cachedir, fname]
writearray = [cachedir, 'solved.fits']
solved, wcs_header = clippy.plate_solve(path, writearray = writearray)
clippy.delcacheObj( fname, 'astrometryNet')
self.data[self.filters[filter]].wcs = WCS(wcs_header)
self.data[self.filters[filter]].solved = solved
[docs] def align(self, filter, clippy, alignto = None, getWCS = True, cache = False):
series = self.data[self.filters[filter]]
if alignto == None:
alignto = series.alignTo
toalign = series.data[alignto]
if getWCS:
if cache:
toalign = CCDData.read(clippy.dordir / 'cache' / 'astrometryNet' / 'solved.fits', unit = clippy.unit)
hdulist = fits.open(clippy.dordir / 'cache' / 'astrometryNet' / 'solved.fits')
self.data[self.filters[filter]].wcs = WCS(hdulist[0].header, hdulist)
hdulist.close()
self.data[self.filters[filter]].solved = toalign
else:
fname, cachedir = clippy.mkcacheObj(toalign, 'astrometryNet')
path = [cachedir, fname]
writearray = [cachedir, 'solved.fits']
solved, wcs_header = clippy.plate_solve(path, writearray = writearray)
toalign = solved
clippy.delcacheObj( fname, 'astrometryNet')
self.data[self.filters[filter]].wcs = WCS(wcs_header)
self.data[self.filters[filter]].solved = solved
# delete cache object
# save solved to target
aa_series = []
with ProgressBar(len(series.data)) as bar:
for image in series.data:
bar.update()
try:
img, _ = aa.register(image.data, toalign.data)
image.data = img
aa_series.append(image)
except:
print('Image skipped')
self.data[self.filters[filter]].data = aa_series
self.data[self.filters[filter]].aligned = True
[docs] def dorphot(self, filter, zellars, zcontrol = None, shape = 21):
# get seeing from PSF
stack = self.data[self.filters[filter]]
# if no wcs, complain alot
w = stack.wcs
xy = w.wcs_world2pix(zellars.coords.ra.deg, zellars.coords.dec.deg, 1)
if zcontrol != None:
xyc = w.wcs_world2pix(zcontrol.coords.ra.deg, zcontrol.coords.dec.deg, 1)
pos = Table(names=['x_0', 'y_0'], data = ([float(xy[0]), float(xyc[0])], [float(xy[1]), float(xyc[1])]))
else:
pos = Table(names=['x_0', 'y_0'], data = ([float(xy[0])], [float(xy[1])]))
sigma_psf = 2.0
bkg_sigma = mad_std(stack.data[0])
daofind = DAOStarFinder(fwhm = 4., threshold = 3. * bkg_sigma)
daogroup = DAOGroup(2.0 * sigma_psf * gaussian_sigma_to_fwhm)
mmm_bkg = MMMBackground()
psf_model = IntegratedGaussianPRF(sigma = sigma_psf)
psf_model.x_0.fixed = True
psf_model.y_0.fixed = True
fitter = LevMarLSQFitter()
bkgrms = MADStdBackgroundRMS()
times = []
exptimes = []
ray = []
decx = []
x = []
y = []
flux = []
fluxunc = []
apsum = []
apsum_unc = []
# if radec get xy
itera = 0
with ProgressBarOrSpinner(len(stack.data), msg = 'Performing PSF photometry') as bar:
for image in stack.data:
bar.update(itera)
itera = itera + 1
photometry = IterativelySubtractedPSFPhotometry(finder = daofind, group_maker = daogroup, bkg_estimator = mmm_bkg,
psf_model = psf_model, fitter = LevMarLSQFitter(), niters = 1, fitshape = (shape, shape))
results = photometry(image = image, init_guesses= pos)
[ra, dec] = w.wcs_pix2world(results['x_fit'], results['y_fit'], 1)
times.append(Time(image.header['DATE-OBS']))
exptimes.append(image.header['EXPTIME'])
ray.append(ra)
decx.append(dec)
x.append(results['x_fit'][0])
y.append(results['y_fit'][0])
if zcontrol != None:
apsum.append(results['flux_fit'][0] - results['flux_fit'][1])
flux.append((results['flux_fit'][0] - results['flux_fit'][1])/image.header['EXPTIME'])
else:
apsum.append(results['flux_fit'][0])
flux.append(results['flux_fit'][0]/image.header['EXPTIME'])
fluxunc.append(results['flux_unc'][0]) ## TODO:: modify this to account for exposure time and control
apsum_unc.append(results['flux_unc'][0])
# ts = QTable([times, exptimes, x, y, ray, decx, flux, fluxunc], names=('time', 'exptime', 'x', 'y', 'ra', 'dec', 'flux', 'flux_unc'), meta={'name': filter})
ts = timeSeries(times = times, flux = flux, exptimes = exptimes, x = x, y = y, ra = ray, dec = decx, flux_unc = fluxunc, apsum = apsum, apsum_unc = apsum_unc)
zellars.filters[filter] = len(zellars.ts)
zellars.ts.append(ts)