Source code for dorado.core.readerClass

import warnings
warnings.filterwarnings('ignore')

from ..ceres import Ceres
from ..stack import Stack
from ..core.coreClass import *
from ..core.utils import *

# import utils
import ccdprocx

from astropy.nddata.ccddata import CCDData
CCDData._config_ccd_requires_unit = False
from astropy.time import Time
from astropy.io import fits
from astropy.utils.misc import isiterable

import lightkurve as lk
from lightkurve.lightcurve import TessLightCurve as tlc

import os
from tqdm import tqdm
import numpy as np

__all__ = ['reader', 'aico_reader'] #, 'tess_reader'

[docs]class reader: ''' readers need to have a dirscan method and a savewrok method. Readers should also probably have a mkceres method. ''' def __init__(self): # should report to logger and read from config here # will this carry over to inherited classes? self.desired_datatype = 'uint16' # datatype to force onto images
[docs] def diread(self, dirarray): """ diread intakes a filepath array and catalogues the contents by either 'is file' or 'is directory' and returns the resulting lists. Parameters ---------- dirarray: str array array containing the path to the desired directory. Returns ------- """ if isiterable(dirarray): path = Dorado.dordir for dir in dirarray: path = path / dir else: path = dirarray # path = self.dordir / 'data' / 'raw' / date contents = os.scandir(path = path) files = [] directories = [] for entry in contents: if not entry.name.startswith('.'): if entry.is_file(): files.append(entry) if entry.is_dir(): directories.append(entry) return files, directories
[docs] def newdat(self): """ newdat is an nfinished function that will scan the '$user/.dorado/data/raw' directory for new data to be processed. Parameters ---------- Returns ------- """ # find data that hasn't been processed yet print('searching for unprocessed data...')
[docs] def force16(self, hdu): """ An unfinished convinience function for forcing the datatype of CCDData to be 16-bit instead of 32-bit for consistency and filesize optimization. Parameters ---------- hdu: CCDData hdu to force 16-bit datatype. Returns ------- NOTE:: in the future this will handle arrays of data instead of single HDU's and will allow for specifying the bit datatype. """ print('This function is not implemented yet. See mkBias() for example functionality.')
# TODO Make it so that each class isnt loaded unless chosen (core class takes string and then imports correct class)
[docs]class aico_reader(reader): ''' ''' def __init__(self): self.biasstr = ['Bias', 'Bias', 'bias', 'BIAS'] self.flatsstr = ['FLAT', 'FlatField', 'flat', 'Flat', 'Flats', 'flats', 'FLATS', 'FlatFields'] self.lightsstr = ['lights', 'Lights', 'LIGHTS'] self.stack_types = {'bias':self.biasstr, 'flats':self.flatsstr, 'lights':self.lightsstr} self.unique_lights = True # whether to find lights by non calibration files (True) or via self.lightstr (False) self.require_cal = False # whether calibration frames are essential, this should be dynamically set by the read calibration level self.calibration_window = 7 def _single_level(self, files): bias = self._read_stack(files, 'bias') flats = self._read_stack(files, 'flats') lights = self._read_stack(files, 'lights') return bias, flats, lights def _multi_level_top(self, files, directories): biasdir = [s for s in directories if s.name in self.biasstr] # There should never be multiple bias directories; unless bias were taken with multiple binning settings or taken multiple times. # Consult chief data officer about this. TODO bias_files, _ = self.diread(biasdir[0]) bias = self._read_stack(bias_files, 'bias') # It is possible to have multiple flats directories for different filters, they should all be contained in a single base flats directory tho flatsdir = [s for s in directories if s.name in self.flatsstr] flats = self._multi_level_sub(flatsdir[0], 'flats') if self.unique_lights: lightsdir = [s for s in directories if (s.name not in self.flatsstr) and (s.name not in self.biasstr)] # may hand back more than one directory else: lightsdir = [s for s in directories if s.name in self.lightsstr] # may hand back more than one directory lights = self.multi_level_sub(lightsdir[0], 'lights') return bias, flats, lights def _multi_level_sub(self, subdir, stack_type): files, directories = self.diread(subdir) sub_files = [] if len(directories) == 0: print('Single directory ', stack_type, ' organization format detected.') sub_files = self._read_stack(files, stack_type) elif len(files) == 0: print('Multi directory ', stack_type, ' organization format detected.') for d in directories: # should not be any further sub folders f, _ = self.diread(d) sub_files += self._read_stack(f, stack_type) # should we continue to hand stack_type if only the desired frame type should be present this deep in the mix? return sub_files def _read_stack(self, files, stack_type): if (stack_type == 'lights') & (self.unique_lights): stackstr = self.stack_types['bias'] + self.stack_types['flats'] # NOTE:: in the future this should be modded to include all non light stack types. else: try: stackstr = self.stack_types[stack_type] except: stackstr = stack_type stack_list = [] for st in stackstr: for s in files: if (stack_type == 'lights') & (self.unique_lights): if (str(st)) not in str(s.name): stack_list.append(s) else: if (str(st)) in str(s.name): stack_list.append(s) stack_files = [] if len(stack_list) > 50: n = 0 # this is a very janky way of writing this, enjoy for f in (pbar := tqdm(stack_list, colour = 'green')): n += 1 pbar.set_description('Reading ' + stack_type + ' : ' + str(n)) pbar.refresh() hdu = CCDData.read(f.path) stack_files.append(hdu) else: for f in stack_list: hdu = CCDData.read(f.path) stack_files.append(hdu) return stack_files
[docs] def dirscan(self, dirarray): ''' Dirscan takes a filepath array and scans the resulting directory for Bias, Flats, and Lights based on a simple filename pattern. Data can be either all in this single directory or in appropriately named sub directories(i.e. '/bias', '/flats', and '/lights'). **This currently only supports single filter data** Parameters ---------- dirarray: str array array containing the path to the desired directory. ''' # TODO handle multi filter searching path = Dorado.dordir for dir in dirarray: path = path / dir files, directories = Dorado.diread(path) # TODO check if files are actually fits files before we break the fits reader if len(directories) == 0: if len(files) == 0: raise Exception('No viable data found') else: print('Single directory level organization format detected.') return self._single_level(files) elif len(files) == 0: print('Multi directory level organization format detected.') # People need to adopt a standard way of saving stuff or I will go nuts trying to come up with all the different ways they can organize(or disorganize) their data. return self._multi_level_top(files, directories) # what if there is usable data in single level and we only search multi-level
def _chkPhi(self, stack_files): # sequence through headers saving every filter keyword phi_list = [] phi_stacks = {} for im in stack_files: phi = im.header['FILTER'] # hardcoded filter keyword for now if phi in phi_list: # check if filter is already accounted for phi_stacks[phi].append(im) else: phi_stacks[phi] = [im] for p in phi_list: print('Found ', len(phi_stacks[p]), ' ', p, ' frames.') # Should we go ahead and construct stacks first before returning? return phi_list, phi_stacks def _chkCal(self): pass
[docs] def mkceres(self, date, sub = 'raw', target = None, calibrated = False, aligned = False): ''' mkceres creates a ceres object from a given datestring for an observation and an optional dorado.target instance. mkceres can be pointed to calibrated or aligned data via the calibrated and aligned boolean flags (Note, in this case 'sub' should be set to 'wrk'). Parameters ---------- date: date string Instance of dorado.stack class to add to self. sub: string pointer string denoting subfolder to '$user/.dorado/data' (dorado directory/data/). Default is 'raw' target: dorado.target dorado.target instance to use as the target for the created dorado.ceres object. Default is None calibrated: boolean sets whether the imported images are calibrated and within an 'calibrated' folder. Default is False. aligned: boolean sets whether the imported images are aligned and within an 'aligned' folder. Default is False. ''' # TODO needs ability to handle multifilter directories # TODO auto handle setting 'wrk' as subfolder if aligned: dirarray = ['data', sub, date, 'aligned'] elif calibrated: dirarray = ['data', sub, date, 'calibrated'] else: dirarray = ['data', sub, date] biases, flats, lights = self.dirscan(dirarray) print(len(flats), ' flats found.') print(len(biases), ' bias frames found.') print(len(lights), ' lights found') if len(lights) > 0: epoch = lights[0].header['DATE-OBS'] datestr = getDateString(epoch) # TODO import this mjdstr = str(np.round(Time(epoch, format='fits').mjd)) else: raise Exception("No usable lights found") # NOTE user may want to run calibration files without lights, or have lights without calibration files # Get Base Bias if len(biases) > 0: bias = self.mkBias(biases) else: bias = self.getBias(mjdstr) # find bias that are near in time # lets split the lights into filters phi_list, phi_stacks = self._chkPhi(lights) # lets split the flats into filters flat = {} if len(flats) > 0: phi_flat_list, phi_flat_stacks = self._chkPhi(flats) for f in phi_flat_list: flat[f] = self.mkFlat(phi_flat_stacks[f]) for phi in phi_list: # This is not a very stable section, needs some love later to handle niche cases # find missing flats near in time to data if phi not in phi_flat_list: phi_flat = self.getFlat(mjdstr, phi) flat[phi] = phi_flat # Construct cere class cere = Ceres(time = Time(epoch, format='fits'), datestr=datestr) cere.bias = bias # Construct stacks and hand them to Ceres for phi in phi_list: cere.add_stack(Stack(phi_stacks[phi], flat = flat[phi], calibrated = calibrated, aligned = aligned, target = target)) return cere
[docs] def mkFlat(self, flats): """ mkFlat takes a list of flats to construct a calibrated flatfield image. Parameters ---------- flats: array[CCDdata] array of raw flatfields. ------------> this needs to be corrected for the new image storage format Returns ------- flat: CCDdata The combined calibrated flatfield image. """ date = Time(flats[0].header['DATE-OBS'], format='fits').mjd filt = flats[0].header['filter'] ## TODO :: standardize filter names and include telescope profiles fname = str(int(date)) + '_' + str(filt) + '_flat.fits' flatdir = Dorado.dordir / 'data' / 'flats' contents = os.scandir(path = flatdir) save = True for entry in contents: if fname in entry.name: save = False flat = CCDData.read(flatdir / fname) #, unit = Dorado.unit) ## NOTE edited # TODO Allow RGB data if save: c = ccdprocx.Combiner(flats) c.sigma_clipping() flat = c.median_combine() # , method = 'average', # sigma_clip = True, sigma_clip_low_thresh = 5, sigma_clip_high_thresh = 5, # sigma_clip_func = np.ma.median, sigma_clip_dev_func = mad_std, unit = self.unit) flat.header['stacked'] = True flat.header['numsubs'] = len(flats) flat.header['DATE-OBS'] = flats[0].header['DATE-OBS'] flat.header['filter'] = flats[0].header['filter'] ## TODO :: There is probably more missing keywords in the combined header, where's Waldo... flat.data = flat.data.astype('uint16') print('Saving Flat for later use') flat.write(flatdir / fname) else: print('Flat for filter and date already saved.') return flat
[docs] def mkBias(self, biasIFC): """ mkBias takes a list of bias images to construct a combined bias image. ------------> this needs to be corrected for the new image storage format Parameters ---------- biasIFC: array[CCDdata] array of raw bias images. Returns ------- bias: CCDdata The combined bias image. """ # Allow specification of median or mean # Allow RGB data date = Time(biasIFC[0].header['DATE-OBS'], format='fits').mjd fname = str(int(date)) + '_Bias.fits' biasdir = Dorado.dordir / 'data' / 'bias' contents = os.scandir(path = biasdir) save = True for entry in contents: if fname in entry.name: save = False bias = CCDData.read(biasdir / fname) #, unit = Dorado.unit) ## NOTE edited if save: bias = ccdprocx.combine(biasIFC, method = 'average') #, unit = Dorado.unit) ## NOTE edited bias.meta['stacked'] = True bias.header['numsubs'] = len(biasIFC) # date = Time(bias.header['DATE-OBS'], format='fits').mjd bias.data = bias.data.astype('uint16') print('Saving Bias for later use') bias.write(biasdir / fname) else: print('Bias for date already saved.') return bias
[docs] def savewrk(self, cr, filters = None): """ savewrk takes a ceres key and a list of filters to save and saves the desired data to the dorado working directory '$user/.dorado/data/wrk/'. Parameters ---------- cr: string The relevent name string of Ceres instance to save. filter: str String representation of the relevent filter. Default is all filters in ceres instance. Returns ------- """ # TODO mod fplate to accept cr name if Dorado.ceres[Dorado.ceres_keys[cr]].datestr == None: Dorado.getDateString(cr) wrkdir = Dorado.dordir / 'data' / 'wrk' datestr = Dorado.ceres[Dorado.ceres_keys[cr]].datestr print('Saved to data/wrk/', datestr) Dorado.mkwrk(cr) if filters == None: filters = Dorado.ceres[Dorado.ceres_keys[cr]].filters.keys() for filter in filters: fildat = Dorado.ceres[Dorado.ceres_keys[cr]].data[Dorado.ceres[Dorado.ceres_keys[cr]].filters[filter]] if (fildat.target == None): fplate = str(int(Dorado.ceres[Dorado.ceres_keys[cr]].date.mjd)) + '-' + filter + '_' else: fplate = str(fildat.target.name) + '-' + filter + '_' if (fildat.calibrated == True) and (fildat.aligned == True): wrdir = wrkdir / datestr / 'aligned' if len(filters) != 1: os.makedirs(wrkdir / datestr / 'aligned' / filter, exist_ok = True) wrdir = wrdir / filter fsub = '_ca' elif (fildat.calibrated == True): wrdir = wrkdir / datestr / 'calibrated' if len(filters) != 1: os.makedirs(wrkdir / datestr / 'calibrated' / filter, exist_ok = True) wrdir = wrdir / filter fsub = '_c' else: wrdir = wrkdir / datestr / 'uncalibrated' if len(filters) != 1: os.makedirs(wrkdir / datestr / 'uncalibrated' / filter, exist_ok = True) wrdir = wrdir / filter fsub = '' if fildat.base != None: fname = fplate + '_base.fits' fildat.base.write(wrkdir / datestr / fname, overwrite = True) if fildat.solved != None: fname = fplate + '_solved.fits' fildat.solved.write(wrkdir / datestr / fname, overwrite = True) for p in range(len(fildat.data)): image = fildat.data[p] tstr = str(Time(image.header['DATE-OBS'], format='fits', out_subfmt='date_hms').value) image.data = image.data.astype('uint16') fname = fplate + str(p) + '_' + tstr + fsub + '.fits' image.write(wrdir / fname, overwrite = True)
[docs] def getBias(self, mjdstr): # NOTE This function does not account for binning/image size target_mjd = int(mjdstr) biasdir = Dorado.dordir / 'data' / 'bias' contents = os.scandir(path = biasdir) candidates = [] for entry in contents: entry_mjd = int(entry.name[0:4]) # grab first five characters of filename. Should be the mjd number if np.abs(target_mjd - entry_mjd) <= self.calibration_window: candidates.append(entry_mjd) if len(candidates) == 0: print('No viable bias frames found. Try increasing your calibration window.') else: mjd_to_use = candidates[np.argmin(candidates- entry_mjd)] #locate the mjd of the closest file in time fname = str(mjd_to_use) + '_Bias.fits' bias = CCDData.read(biasdir / fname) #, unit = Dorado.unit) ## NOTE edited return bias
[docs] def getFlat(self, mjdstr, phi): target_mjd = int(mjdstr) flatdir = Dorado.dordir / 'data' / 'flats' contents = os.scandir(path = flatdir) candidates = [] for entry in contents: if phi in entry.name: entry_mjd = int(entry.name[0:4]) # grab first five characters of filename. Should be the mjd number if np.abs(target_mjd - entry_mjd) <= self.calibration_window: candidates.append(entry_mjd) if len(candidates) == 0: print('No viable flat frames found. Try increasing your calibration window or using a different filter.') else: mjd_to_use = candidates[np.argmin(candidates- entry_mjd)] fname = str(mjd_to_use) + '_' + str(phi) + '_flat.fits' flat = CCDData.read(flatdir / fname) #, unit = Dorado.unit) ## NOTE edited return flat
# class tess_reader: # def __init__(self): # Dorado.init_dir(tess = True) # # check if a name query is needed to retrieve data # # initialize # def mkceres(self, tid, sub = 'raw', target = None): # fits_file = self.dirscan() # tpf = lk.open(fits_file) # with fits.open(fits_file, mode="readonly") as hdulist: # tess_bjds = hdulist[1].data['TIME'] # Should I use 'TIMECORR'? # tess_bjds_corr = hdulist[1].data['TIMECORR'] # raw_counts = hdulist[1].data['RAW_CNTS'] # calibrated_fluxes = hdulist[1].data['FLUX'] # Should I use raw or calibrated? # flux_err = hdulist[1].data['FLUX_ERR'] # tid = hdulist[0].header['TICID'] # TSTART = hdulist[0].header['TSTART'] # TSTOP = hdulist[0].header['TSTOP'] # tidstr = 'TIC ' + str(tid) # cere = Ceres() # # need to make a stack, and reconfigure the ceres class to be more generic in data storage. maybe make the dorphot function # # a wrapper, or make the ability to produce a target pixel file of aico data. # return cere # def dirscan(self, Dorado, dirarray): # path = Dorado.dordir # for dir in dirarray: # path = path / dir # files, directories = Dorado.diread(path) # if len(files) == 0: # print('No files found...') # elif len(files) > 1: # for file in files: # if '_tp.fits' in file: # tp_file = file # elif len(files) == 1: # tp_file = files[0] # return tp_file