Source code for astro_ghost.DLR

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib import colors
from astro_ghost.sourceCleaning import clean_dict
from astro_ghost.NEDQueryFunctions import getNEDInfo
from astropy import units as u
from astropy.coordinates import SkyCoord, Angle, Distance
from astropy.wcs import WCS
from astropy.cosmology import FlatLambdaCDM, z_at_value
from astropy.utils.data import get_pkg_data_filename
from astroquery.vizier import Vizier


[docs]def choose_band_SNR(host_df): """Gets the PS1 band (of grizy) with the highest SNR in PSF mag. From https://www.eso.org/~ohainaut/ccd/sn.html, Error on Mag ~ 1/ (S/N) So calculating S/N for each band as 1/PSFMagErr Estimate the S/N of each band and choose the bands with the highest S/N for the rest of our measurements. :param host_df: The dataframe containing the candidate host galaxy (should just be one galaxy). :type host_df: Pandas DataFrame :return: The PS1 band with the highest S/N. :rtype: str """ host_df.reset_index(inplace=True, drop=True) bands = 'grizy' SNR = [] try: for band in bands: SNR.append(float(1/host_df.iloc[0, "%sKronMagErr"%band])) i = np.nanargmax(np.array(SNR)) except: #if we have issues getting the band with the highest SNR, just use r-band i = 1 return bands[i]
[docs]def calc_DLR_glade(ra_SN, dec_SN, ra_host, dec_host, r_a, a_over_b, phi): """Calculates the DLR between transients and GLADE host galaxy candidates. (very similar to calc_DLR but the parameters are calculated slightly differently) :param ra_SN: The right ascension of the SN, in degrees. :type ra_SN: float :param dec_SN: The declination of the SN, in degrees. :type dec_SN: float :param ra_host: The right ascension of the host, in degrees. :type ra_host: float :param dec_host: The declination of the host, in degrees. :type dec_host: float :param r_a: The semi-major axis of the host in arcseconds. :type r_a: float :param a_over_b: The candidate host axis ratio. :type a_over_b: float :param phi: The galaxy position angle (in radians). :type phi: float :return: The angular separation between galaxy and transient, in arcseconds. :rtype: float :return: The normalized distance (angular separation divided by the DLR). :rtype: float """ xr = (ra_SN- float(ra_host))*3600 yr = (dec_SN - float(dec_host))*3600 SNcoord = SkyCoord(ra_SN*u.deg, dec_SN*u.deg, frame='icrs') hostCoord = SkyCoord(ra_host*u.deg, dec_host*u.deg, frame='icrs') sep = SNcoord.separation(hostCoord) dist = sep.arcsecond badR = 10000000000.0 gam = np.arctan2(xr, yr) theta = phi - gam DLR = r_a/np.sqrt(((a_over_b)*np.sin(theta))**2 + (np.cos(theta))**2) R = float(dist/DLR) if (R != R): return dist, badR return dist, R
[docs]def calc_DLR(ra_SN, dec_SN, ra_host, dec_host, r_a, r_b, source, best_band): """Calculate the directional light radius for a given galaxy and transient pair. Calculation is adapted from Gupta et al., 2013 found at https://repository.upenn.edu/cgi/viewcontent.cgi?referer=https://www.google.com/&httpsredir=1&article=1916&context=edissertations. :param ra_SN: The right ascension of the SN, in degrees. :type ra_SN: float :param dec_SN: The declination of the SN, in degrees. :type dec_SN: float :param ra_host: The right ascension of the host, in degrees. :type ra_host: float :param dec_host: The declination of the host, in degrees. :type dec_host: float :param r_a: The semi-major axis of the host in arcseconds. :type r_a: float :param r_b: The semi-minor axis of the host in arcseconds. :type r_b: float :param source: The Dataframe containing the PS1 information for the candidate host galaxy. :type source: Pandas DataFrame :param best_band: The PS1 passband with the highest S/N, from which second-order moments are estimated. :type best_band: str :return: The angular separation between galaxy and transient, in arcseconds. :rtype: float :return: The normalized distance (angular separation divided by the DLR). :rtype: float """ source.reset_index(inplace=True, drop=True) xr = (ra_SN.deg - float(ra_host))*3600 yr = (dec_SN.deg - float(dec_host))*3600 SNcoord = SkyCoord(ra_SN, dec_SN, frame='icrs') hostCoord = SkyCoord(ra_host*u.deg, dec_host*u.deg, frame='icrs') sep = SNcoord.separation(hostCoord) dist = sep.arcsecond badR = 10000000000.0 XX = float(source.loc[0, best_band + 'momentXX']) YY = float(source.loc[0, best_band + 'momentYY']) XY = float(source.loc[0, best_band + 'momentXY']) # if we don't have spatial information, get rid of it # this gets rid of lots of artifacts without radius information if (XX != XX) | (XY != XY) | \ (YY != YY): return dist, badR U = XY Q = XX - YY if Q == 0: return dist, badR phi = 0.5*np.arctan(U/Q) kappa = Q**2 + U**2 a_over_b = (1 + kappa + 2*np.sqrt(kappa))/(1 - kappa) gam = np.arctan2(xr, yr) theta = phi - gam DLR = r_a/np.sqrt(((a_over_b)*np.sin(theta))**2 + (np.cos(theta))**2) R = float(dist/DLR) if (R != R): return dist, badR return dist, R
[docs]def calc_DLR_SM(ra_SN, dec_SN, ra_host, dec_host, r_a, elong, phi, source, best_band): """ Calculate the DLR method but for Skymapper (southern-hemisphere) sources, which don't have xx and yy moments reported in the catalog. :param ra_SN: The right ascension of the SN, in degrees. :type ra_SN: float :param dec_SN: The declination of the SN, in degrees. :type dec_SN: float :param ra_host: The right ascension of the host, in degrees. :type ra_host: float :param dec_host: The declination of the host, in degrees. :type dec_host: float :param r_a: The semi-major axis of the host in arcseconds. :type r_a: float :param elong: The elongation parameter of the galaxy. :type elong: float :param phi: The rotation angle of the galaxy, in radians. :type phi: float :param source: The Dataframe containing the PS1 information for the candidate host galaxy. :type source: Pandas DataFrame :param best_band: The PS1 passband with the highest S/N, from which second-order moments are estimated. :type best_band: str :return: The angular separation between galaxy and transient, in arcseconds. :rtype: float :return: The normalized distance (angular separation divided by the DLR). :rtype: float """ # EVERYTHING IS IN ARCSECONDS ## taken from "Understanding Type Ia Supernovae Through Their Host Galaxies..." by Gupta #https://repository.upenn.edu/cgi/viewcontent.cgi?referer=https://www.google.com/&httpsredir=1&article=1916&context=edissertations xr = (ra_SN.deg - float(ra_host))*3600 yr = (dec_SN.deg - float(dec_host))*3600 SNcoord = SkyCoord(ra_SN, dec_SN, frame='icrs') hostCoord = SkyCoord(ra_host*u.deg, dec_host*u.deg, frame='icrs') sep = SNcoord.separation(hostCoord) dist = sep.arcsecond badR = 10000000000.0 # if we don't have shape information for a southern-hemisphere galaxy, return if (float(r_a) != float(r_a)) | (float(elong) != float(elong)): return dist, badR gam = np.arctan2(xr, yr) theta = phi - gam #elong == a/b, which allows us to substitute here DLR = r_a/np.sqrt(((elong)*np.sin(theta))**2 + (np.cos(theta))**2) R = float(dist/DLR) if (R != R): return dist, badR return dist, R
[docs]def chooseByDLR(path, hosts, transients, fn, orig_dict, todo="s"): """The wrapper function for selecting hosts by the directional light radius method introduced in Gupta et al., 2013. :param path: Filepath where to write out the results of the DLR algorithm. :type path: str :param hosts: DataFrame containing PS1 information for all candidate hosts. :type hosts: Pandas DataFrame :param transients: DataFrame containing TNS information for all transients. :type transients: Pandas DataFrame :param fn: Filename to write the results of the associations (useful for debugging). :type fn: str :param orig_dict: Dictionary consisting of key,val pairs of transient names, and lists of their candidate host galaxy objIDs in PS1. :type orig_dict: dictionary :param todo: If todo == \\'s\\', save the dictionary and the list of remaining sources. If todo == \\'r\\', return them. :type todo: str :return: The dataframe of PS1 properties for host galaxies found by DLR. :rtype: Pandas DataFrame :return: Dictionary of matches after DLR, with transient names as keys and a list of host galaxy pan-starrs objIDs as values. :rtype: dictionary :return: List of transients for which no reliable host galaxy was found. :rtype: array-like :return: List of transients for which an issue arose in DLR (most likely, a candidate host galaxy in the field didn't have radius information). This list is used to recommend candidates to associate via the gradient ascent method. :rtype: array-like """ dict_mod = orig_dict.copy() if todo=="s": if not os.path.exists(path+'/dictionaries'): os.makedirs(path+'/dictionaries') if not os.path.exists(path+'/tables'): os.makedirs(path+'/tables') hosts['dist/DLR'] = np.nan hosts['dist'] = np.nan noHosts = [] clean_dict(orig_dict, hosts, []) f = open(path+fn, 'w') GDflag = 0 GA_SN = [] for name, host in orig_dict.items(): if (type(host) is not np.int64 and type(host) is not float): if (len(host.shape) > 0) and (host.shape[0] > 0): R_dict = {} ra_dict = {} dist_dict = {} host = np.array(host) if len(host)>0: for tempHost in host: theta = 0 host_df = hosts[hosts["objID"] == tempHost] transient_df = transients[transients["Name"] == str(name)] # If no good, we want to artificially inflate the distance to the SN so that # we don't incorrectly pick this as our host # (but the radius goes into plotting, so ra is artificially shrunk) R = dist = 1.e10 r_a = 0.05 #flag for running gradient ascent, if the DLR method fails. GDflag = 1 if ":" in str(transient_df["RA"].values): ra_SN = Angle(transient_df["RA"].values, unit=u.hourangle) else: ra_SN = Angle(transient_df["RA"].values, unit=u.deg) dec_SN = Angle(transient_df["DEC"].values, unit=u.degree) ra_host = host_df['raMean'].values[0] dec_host = host_df['decMean'].values[0] if len(np.array(ra_SN)) > 1: ra_SN = ra_SN[0] dec_SN = dec_SN[0] if (dec_SN.deg > -30): band = choose_band_SNR(host_df) r_a = r_b = float(host_df[band + 'KronRad'].values[0]) dist, R = calc_DLR(ra_SN, dec_SN, ra_host, dec_host, r_a, r_b, host_df, band) else: band = choose_band_SNR(host_df) r_a = float(host_df['%sKronRad'%band].values[0]) if r_a == r_a: elong = host_df[band + "_elong"].values[0] phi = np.radians(host_df[band + "_pa"].values[0]) r_a = host_df[band + 'KronRad'].values[0]*0.5 #plate scale # in arcsec, the radius containing 90% of the galaxy light. # This empirically has improved association performance # for southern-hemisphere sources. dist, R = calc_DLR_SM(ra_SN, dec_SN, ra_host, dec_host, r_a, elong, phi, host_df, band) R_dict[tempHost] = R ra_dict[tempHost] = r_a dist_dict[tempHost] = dist hosts.loc[hosts['objID'] == tempHost, 'dist/DLR'] = R hosts.loc[hosts['objID'] == tempHost, 'dist'] = dist print("\n transient = \\", file=f) print(name, file=f) print("offset/DLR = \\", file=f) #round for printing purposes R_dict_print = {k:round(v,2) if isinstance(v,float) else v for k,v in R_dict.items()} print(R_dict_print, file=f) ra_dict_print = {k:round(v,2) if isinstance(v,float) else v for k,v in ra_dict.items()} print("candidate semi-major axis = \\", file=f) print(ra_dict_print, file=f) #subset so that we're less than 5 in DLR units chosenHost = min(R_dict, key=R_dict.get) if R_dict[chosenHost] > 5.0: #If we can't find a host, say that this galaxy has no host dict_mod[name] = np.nan noHosts.append(name) print("No host chosen! r/DLR > 5.0.", file=f) continue else: R_dict_sub = dict((k, v) for k, v in R_dict.items() if v <= 5.0) #Sort from lowest to highest DLR value R_dict_sub = {k: v for k, v in sorted(R_dict_sub.items(), key=lambda item: item[1])} if len(R_dict_sub.keys()) > 1: gal_hosts = [] Simbad_hosts = [] for key in R_dict_sub: tempType = hosts[hosts['objID'] == key]['NED_type'].values[0] hasSimbad = hosts[hosts['objID'] == key]['hasSimbad'].values[0] if (tempType == "G"): gal_hosts.append(key) if (hasSimbad) & (tempType != '*'): Simbad_hosts.append(key) if len(gal_hosts) > 0: if gal_hosts[0] != chosenHost and R_dict[gal_hosts[0]] < 5.0: chosenHost = gal_hosts[0] #only change if we're within the light profile of the galaxy print("Choosing the galaxy with the smallest DLR - nearest source had DLR > 1!", file=f) if len(Simbad_hosts) > 0: print("Chosen SIMBAD host!", file=f) if Simbad_hosts[0] != chosenHost and R_dict[Simbad_hosts[0]] < 5.0: chosenHost = Simbad_hosts[0] #only change if we're within the light profile of the galaxy print("Choosing the simbad source with the smallest DLR!", file=f) dict_mod[name] = chosenHost hosts.loc[hosts['objID'] == chosenHost, 'dist/DLR'] = R_dict[chosenHost] hosts.loc[hosts['objID'] == chosenHost, 'dist'] = dist_dict[chosenHost] if (GDflag): GDflag = 0 print("Issue with DLR. Try Gradient Descent!", file=f) GA_SN.append(name) print(float(hosts[hosts['objID'] == chosenHost]['raMean'].values[0]), float(hosts[hosts['objID'] == chosenHost]['decMean'].values[0]), file=f) f.flush() f.close() if todo == "s": with open('../dictionaries/DLR_rankOrdered_hosts.p', 'wb') as fp: pickle.dump(dict_mod, fp, protocol=pickle.HIGHEST_PROTOCOL) hosts.to_csv("../tables/DLR_rankOrdered_hosts.csv") return elif todo =="r": return hosts, dict_mod, noHosts, GA_SN
[docs]def chooseByGladeDLR(path, fn, snDF, verbose=False, todo='r'): """The wrapper function for selecting hosts by the DLR method (Gupta et al., 2013). Here, candidate hosts are taken from the GLADE (Dalya et al., 2021; arXiv:2110.06184) catalog. :param path: Filepath where to write out the results of the DLR algorithm. :type path: str :param fn: Filename to write the results of the associations (useful for debugging). :type fn: str :param transients: DataFrame containing TNS information for all transients. :type transients: Pandas DataFrame :param todo: If todo == \\'s\\', save the dictionary and the list of remaining sources. If todo == \\'r\\', return them. :type todo: str :return: The dataframe of properties for GLADE host galaxies found by DLR. :rtype: Pandas DataFrame :return: List of transients for which no reliable GLADE host galaxy was found. :rtype: array-like """ if todo=="s": if not os.path.exists(path+'/dictionaries'): os.makedirs(path+'/dictionaries') if not os.path.exists(path+'/tables'): os.makedirs(path+'/tables') f = open(path+fn, 'w') foundHostDF = [] noGladeHosts = [] for idx, row in snDF.iterrows(): name = str(row['Name']) ra_SN = float(row['RA']) dec_SN = float(row['DEC']) class_SN = str(row['Obj. Type']) #query the glade catalog Vizier.ROW_LIMIT = -1 Vizier.TIMEOUT = 500 result = Vizier.query_region(SkyCoord(ra=ra_SN, dec=dec_SN,unit=(u.deg, u.deg),frame='icrs'),radius=Angle(0.2, "deg"), catalog=["VII/275/glade1"]) if result: hosts = result[0].to_pandas() else: hosts = pd.DataFrame({'a_b':[np.nan], 'maj':[np.nan], 'min':[np.nan]}) # query NED for GLADE sources and get their radius GLADE_rad = hosts.dropna(subset=['a_b', 'maj', 'min']) GLADE_norad = hosts[~hosts.index.isin(GLADE_rad.index)] from astroquery.ipac.ned import Ned badRadCount = 0 for idx, row in GLADE_norad.iterrows(): try: result_table = Ned.query_region(SkyCoord(ra=row.RAJ2000*u.degree, dec=row.DEJ2000*u.degree, frame='icrs'), radius=(2/3600)*u.deg, equinox='J2000.0') diameters = Ned.get_table(result_table.to_pandas()['Object Name'].values[0], table='diameters') tempDF = diameters.to_pandas() tempMaj_arcsec = np.nanmedian(tempDF.loc[(tempDF['Major Axis Unit'] == 'arcsec') & (tempDF['Major Axis Flag'] == '(a)'), 'Major Axis']) AxisRatio = np.nanmedian(tempDF.loc[(tempDF['Axis Ratio Flag'] == '(b/a)'), 'Axis Ratio']) tempMin_arcsec = tempMaj_arcsec*AxisRatio GLADE_norad.loc[GLADE_norad.index == idx, 'maj'] = tempMaj_arcsec*2./60 GLADE_norad.loc[GLADE_norad.index == idx, 'min'] = tempMin_arcsec*2./60 GLADE_norad.loc[GLADE_norad.index == idx, 'a_b'] = 1/AxisRatio except: badRadCount +=1 #recombine if verbose: print("No NED radius found for %i GLADE galaxies."%badRadCount, file=f) hosts = pd.concat([GLADE_rad, GLADE_norad], ignore_index=True) hosts.dropna(subset=['a_b', 'maj', 'min'], inplace=True) if len(hosts)<1: noGladeHosts.append(name) continue hosts['MajorRad'] = hosts['maj']*60./2 #in arcsec, to radius hosts['MinorRad'] = hosts['min']*60./2 #in arcsec, to radius #get names for the galaxies that match hosts.rename(columns={'RAJ2000':'raMean','DEJ2000':'decMean'}, inplace=True) hosts = getNEDInfo(hosts) R_dict = {} ra_dict = {} dist_dict = {} for idx, row in hosts.iterrows(): tempHost = row['NED_name'] phi = np.radians(row['PAHyp']) r_a = row['MajorRad'] #if it's a mostly round galaxy, the position angle doesn't matter! if (phi != phi) & (row['a_b'] >= 0.9): phi = 0 dist, R = calc_DLR_glade(ra_SN, dec_SN, row['raMean'], row['decMean'], r_a, row['a_b'], phi) R_dict[tempHost] = R ra_dict[tempHost] = r_a dist_dict[tempHost] = dist hosts.loc[hosts['NED_name'] == tempHost, 'dist/DLR'] = R hosts.loc[hosts['NED_name'] == tempHost, 'dist'] = dist print("\n transient = \\", file=f) print(name, file=f) print("offset/DLR = \\", file=f) #round for printing purposes R_dict_print = {k:round(v,2) if isinstance(v,float) else v for k,v in R_dict.items()} print(R_dict_print, file=f) ra_dict_print = {k:round(v,2) if isinstance(v,float) else v for k,v in ra_dict.items()} print("candidate semi-major axis = \\", file=f) print(ra_dict_print, file=f) #subset so that we're less than 5 in DLR units chosenHost = min(R_dict, key=R_dict.get) if R_dict[chosenHost] > 5.0: #If we can't find a host, say that this galaxy has no host noGladeHosts.append(name) print("No host chosen! r/DLR > 5.0.", file=f) continue else: print("Selecting GLADE host: %s"%chosenHost, file=f) foundHost = hosts.loc[hosts['NED_name'] == chosenHost] #add in the associated transient's information foundHost['TransientRA'] = ra_SN foundHost['TransientDEC'] = dec_SN foundHost['TransientName'] = name foundHost['TransientClass'] = class_SN foundHostDF.append(foundHost) f.flush() if len(foundHostDF) > 0: foundHostDF = pd.concat(foundHostDF, ignore_index=True) # adding some relevant redshift information foundHostDF['GLADE_redshift'] = np.nan foundHostDF['GLADE_redshift_flag'] = '' #assume standard cosmology cosmo = FlatLambdaCDM(H0=70, Om0=0.3, Tcmb0=2.725) for idx, row in foundHostDF.iterrows(): if row['Dist'] == row['Dist']: dist = Distance(value=row['Dist'], unit=u.Mpc) calc_z = z_at_value(cosmo.luminosity_distance, dist, zmin=1.e-5, zmax=1, method='Bounded').value foundHostDF['GLADE_redshift'] = calc_z if row['Flag'] <= 3: foundHostDF['GLADE_redshift_flag'] = 'SPEC' else: foundHostDF['GLADE_redshift_flag'] = 'PHOT' #print some relevant information to terminal print("Found %i hosts in GLADE! See %s for details."%(len(foundHostDF), fn)) if todo == "s": foundHostDF.to_csv("../tables/gladeDLR_hosts.csv") return elif todo == "r": return foundHostDF, noGladeHosts else: foundHostDF = pd.DataFrame({'TransientName':[], 'raMean':[], 'decMean':[]}) print("Found no hosts in GLADE.") if todo == 'r': return foundHostDF, noGladeHosts f.close()