Source code for fpfs.catalog

# FPFS shear estimator
# Copyright 20210805 Xiangchong Li.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# python lib
import numpy as np


# functions used for selection
[docs] def tsfunc1(x, deriv=0, mu=0.0, sigma=1.5): """Returns the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1]. This is for C1 function Args: deriv (int): whether do derivative [deriv=1] or not [deriv=0] x (ndarray): input data vector mu (float): center of the cut sigma (float): width of the selection function Returns: out (ndarray): the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1] """ t = (x - mu) / sigma if deriv == 0: return np.piecewise( t, [t < -1, (t >= -1) & (t <= 1), t > 1], [0.0, lambda t: 1.0 / 2.0 + np.sin(t * np.pi / 2.0) / 2.0, 1.0], ) elif deriv == 1: # multiplicative factor (to weight) for derivative return np.piecewise( t, [t < -1 + 0.01, (t >= -1 + 0.01) & (t <= 1 - 0.01), t > 1 - 0.01], [ 0.0, lambda t: np.pi / 2.0 / sigma * np.cos(t * np.pi / 2.0) / (1.0 + np.sin(t * np.pi / 2.0)), 0.0, ], ) else: raise ValueError("deriv should be 0 or 1")
[docs] def tsfunc2(x, mu=0.0, sigma=1.5, deriv=0): """Returns the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1]. This is for C2 funciton Args: deriv (int): whether do derivative [deriv=1] or not [deriv=0] x (ndarray): input data vector mu (float): center of the cut sigma (float): width of the selection function Returns: out (ndarray): the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1] """ t = (x - mu) / sigma def func(t): return 1.0 / 2.0 + t / 2.0 + 1.0 / 2.0 / np.pi * np.sin(t * np.pi) def func2(t): # /(1./2.+t/2.+1./2./np.pi*np.sin(t*np.pi)) return 1.0 / 2.0 / sigma + 1.0 / 2.0 / sigma * np.cos(np.pi * t) def func3(t): return -np.pi / 2.0 / sigma**2.0 * np.sin(np.pi * t) def func4(t): return -((np.pi) ** 2.0) / 2.0 / sigma**3.0 * np.cos(np.pi * t) if deriv == 0: return np.piecewise(t, [t < -1, (t >= -1) & (t <= 1), t > 1], [0.0, func, 1.0]) elif deriv == 1: return np.piecewise( t, [t < -1 + 0.01, (t >= -1 + 0.01) & (t <= 1 - 0.01), t > 1 - 0.01], [0.0, lambda t: func2(t) / func(t), 0.0], ) elif deriv == 2: return np.piecewise( t, [t < -1 + 0.01, (t >= -1 + 0.01) & (t <= 1 - 0.01), t > 1 - 0.01], [0.0, lambda t: func3(t) / func(t), 0.0], ) elif deriv == 3: return np.piecewise( t, [t < -1 + 0.01, (t >= -1 + 0.01) & (t <= 1 - 0.01), t > 1 - 0.01], [0.0, lambda t: func4(t) / func(t), 0.0], ) else: raise ValueError("deriv can only be 0,1,2,3")
[docs] def sigfunc(x, deriv=0, mu=0.0, sigma=1.5): """Returns the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1] Args: deriv (int): whether do derivative [deriv=1] or not [deriv=0] x (ndarray): input data vector mu (float): center of the cut sigma (float): width of the selection function Returns: out (ndarray): the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1] """ expx = np.exp(-(x - mu) / sigma) if deriv == 0: # sigmoid function return 1.0 / (1.0 + expx) elif deriv == 1: # multiplicative factor (to weight) for derivative return 1.0 / sigma * expx / (1.0 + expx) else: raise ValueError("deriv should be 0 or 1")
[docs] def get_wsel_eff(x, cut, sigma, use_sig, deriv=0): """Returns the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1] Args: x (ndarray): input selection observable cut (float): the cut on selection observable sigma (float): width of the selection function use_sig (bool): whether use sigmoid [True] of truncated sine [False] deriv (int): whether do derivative (1) or not (0) Returns: out (ndarray): the weight funciton [deriv=0], or the *multiplicative factor* to the weight function for first order derivative [deriv=1] """ if use_sig: out = sigfunc(x, deriv=deriv, mu=cut, sigma=sigma) else: out = tsfunc2(x, deriv=deriv, mu=cut, sigma=sigma) return out
[docs] def get_wbias(x, cut, sigma, use_sig, w_sel, rev=None): """Returns the weight bias due to shear dependence and noise bias [first order in w] Args: x (ndarray): selection observable cut (float): the cut on selection observable sigma (float): width of the selection function use_sig (bool): whether use sigmoid [True] of truncated sine [False] w_sel (ndarray): selection weights as function of selection observable rev (ndarray): selection response array Returns: cor (float): correction for shear response """ if rev is None: cor = 0.0 else: cor = np.sum(rev * w_sel * get_wsel_eff(x, cut, sigma, use_sig, deriv=1)) return cor
# functions to get derived observables from fpfs modes
[docs] def fpfs_m2e(mm, const=1.0, nn=None): """Estimates FPFS ellipticities from fpfs moments Args: mm (ndarray): FPFS moments const (float): the weight constant [default:1] nn (ndarray): noise covaraince elements [default: None] Returns: out (ndarray): an array of [FPFS ellipticities, FPFS ellipticity response, FPFS flux, size and FPFS selection response] """ # ellipticity, q-ellipticity, sizes, e^2, eq types = [ ("fpfs_e1", "<f8"), ("fpfs_e2", "<f8"), ("fpfs_ee", "<f8"), ("fpfs_s0", "<f8"), ("fpfs_s2", "<f8"), ("fpfs_s4", "<f8"), ("fpfs_R1E", "<f8"), ("fpfs_R2E", "<f8"), ("fpfs_RS0", "<f8"), ("fpfs_RS2", "<f8"), ] for i in range(8): types.append(("fpfs_R1Sv%d" % i, "<f8")) types.append(("fpfs_R2Sv%d" % i, "<f8")) # noirev if nn is not None: types = types + [ ("fpfs_HE100", "<f8"), ("fpfs_HE200", "<f8"), ("fpfs_HR00", "<f8"), ("fpfs_HE120", "<f8"), ("fpfs_HE220", "<f8"), ("fpfs_HR20", "<f8"), ] for i in range(8): types.append(("fpfs_HRv%d" % i, "<f8")) types.append(("fpfs_HE1v%d" % i, "<f8")) types.append(("fpfs_HE2v%d" % i, "<f8")) # make the output ndarray out = np.array(np.zeros(mm.size), dtype=types) # FPFS shape weight's inverse _w = mm["fpfs_M00"] + const # FPFS ellipticity e1 = mm["fpfs_M22c"] / _w e2 = mm["fpfs_M22s"] / _w q1 = mm["fpfs_M42c"] / _w q2 = mm["fpfs_M42s"] / _w # FPFS spin-0 observables s0 = mm["fpfs_M00"] / _w s2 = mm["fpfs_M20"] / _w s4 = mm["fpfs_M40"] / _w # intrinsic ellipticity e1e1 = e1 * e1 e2e2 = e2 * e2 e_m22 = e1 * mm["fpfs_M22c"] + e2 * mm["fpfs_M22s"] e_m42 = e1 * mm["fpfs_M42c"] + e2 * mm["fpfs_M42s"] # shear response for detection process (not for deatection function) for i in range(8): out["fpfs_R1Sv%d" % (i)] = e1 * mm["fpfs_v%dr1" % (i)] out["fpfs_R2Sv%d" % (i)] = e2 * mm["fpfs_v%dr2" % (i)] # NOTE: START NOIST BIAS REVISION # noirev if nn is not None: # Selection out["fpfs_HR00"] = ( -( nn["fpfs_N00N00"] * (const / _w + s4 - 4.0 * e1**2.0) - nn["fpfs_N00N40"] ) / _w / np.sqrt(2.0) ) out["fpfs_HR20"] = ( -( nn["fpfs_N00N20"] * (const / _w + s4 - 4.0 * e2**2.0) - nn["fpfs_N20N40"] ) / _w / np.sqrt(2.0) ) out["fpfs_HE100"] = -(nn["fpfs_N00N22c"] - e1 * nn["fpfs_N00N00"]) / _w out["fpfs_HE200"] = -(nn["fpfs_N00N22s"] - e2 * nn["fpfs_N00N00"]) / _w out["fpfs_HE120"] = -(nn["fpfs_N20N22c"] - e1 * nn["fpfs_N00N20"]) / _w out["fpfs_HE220"] = -(nn["fpfs_N20N22s"] - e2 * nn["fpfs_N00N20"]) / _w ratio = nn["fpfs_N00N00"] / _w**2.0 # Detection process and Shear Response for i in range(8): corr1 = ( -1.0 * nn["fpfs_N22cV%dr1" % i] / _w + 1.0 * e1 * nn["fpfs_N00V%dr1" % i] / _w + 1.0 * nn["fpfs_N00N22c"] / _w**2.0 * mm["fpfs_v%dr1" % i] ) corr2 = ( -1.0 * nn["fpfs_N22sV%dr2" % i] / _w + 1.0 * e2 * nn["fpfs_N00V%dr2" % i] / _w + 1.0 * nn["fpfs_N00N22s"] / _w**2.0 * mm["fpfs_v%dr2" % i] ) out["fpfs_R1Sv%d" % i] = (out["fpfs_R1Sv%d" % i] + corr1) / (1 + ratio) out["fpfs_R2Sv%d" % i] = (out["fpfs_R2Sv%d" % i] + corr2) / (1 + ratio) # Heissen out["fpfs_HRv%d" % i] = ( -( nn["fpfs_N00V%d" % i] * (const / _w + s4 - 2.0 * e1**2.0 - 2.0 * e2**2.0) - nn["fpfs_N40V%d" % i] ) / _w / np.sqrt(2.0) ) out["fpfs_HE1v%d" % i] = ( -(nn["fpfs_N22cV%d" % i] - e1 * nn["fpfs_N00V%d" % i]) / _w ) out["fpfs_HE2v%d" % i] = ( -(nn["fpfs_N22sV%d" % i] - e2 * nn["fpfs_N00V%d" % i]) / _w ) # intrinsic shape dispersion (not per component) e1e1 = ( e1e1 - (nn["fpfs_N22cN22c"]) / _w**2.0 + 4.0 * (e1 * nn["fpfs_N00N22c"]) / _w**2.0 ) - 3 * ratio * e1e1 e2e2 = ( e2e2 - (nn["fpfs_N22sN22s"]) / _w**2.0 + 4.0 * (e2 * nn["fpfs_N00N22s"]) / _w**2.0 ) - 3 * ratio * e2e2 e_m22 = ( e_m22 - (nn["fpfs_N22cN22c"] + nn["fpfs_N22sN22s"]) / _w + 2.0 * (nn["fpfs_N00N22c"] * e1 + nn["fpfs_N00N22s"] * e2) / _w ) / (1 + ratio) e_m42 = ( e_m42 - (nn["fpfs_N22cN42c"] + nn["fpfs_N22sN42s"]) / _w + 1.0 * (e1 * nn["fpfs_N00N42c"] + e2 * nn["fpfs_N00N42s"]) / _w + 1.0 * (q1 * nn["fpfs_N00N22c"] + q2 * nn["fpfs_N00N22s"]) / _w ) / (1 + ratio) # noise bias correction for ellipticity # (the following two expressions are the same to the second order of # noise) # e1 = (e1 + nn["fpfs_N00N22c"] / _w**2.0) / (1 + ratio) # e2 = (e2 + nn["fpfs_N00N22s"] / _w**2.0) / (1 + ratio) e1 = (e1 + nn["fpfs_N00N22c"] / _w**2.0) - ratio * e1 e2 = (e2 + nn["fpfs_N00N22s"] / _w**2.0) - ratio * e2 # noise bias correction for flux, size # (the following two expressions are the same to the second order of # noise) # s0 = (s0 + nn["fpfs_N00N00"] / _w**2.0) / (1 + ratio) # s2 = (s2 + nn["fpfs_N00N20"] / _w**2.0) / (1 + ratio) # s4 = (s4 + nn["fpfs_N00N40"] / _w**2.0) / (1 + ratio) s0 = (s0 + nn["fpfs_N00N00"] / _w**2.0) - ratio * s0 s2 = (s2 + nn["fpfs_N00N20"] / _w**2.0) - ratio * s2 s4 = (s4 + nn["fpfs_N00N40"] / _w**2.0) - ratio * s4 # NOTE: END NOIST BIAS REVISION # spin-2 properties out["fpfs_e1"] = e1 # ellipticity out["fpfs_e2"] = e2 del e1, e2 # spin-0 properties out["fpfs_s0"] = s0 # flux out["fpfs_s2"] = s2 # size2 out["fpfs_s4"] = s4 # size4 out["fpfs_ee"] = e1e1 + e2e2 # shape noise # response for ellipticity out["fpfs_R1E"] = (s0 - s4 + 2.0 * e1e1) / np.sqrt(2.0) out["fpfs_R2E"] = (s0 - s4 + 2.0 * e2e2) / np.sqrt(2.0) del s0, s2, s4, e1e1, e2e2 # response for selection process (not response for selection function) out["fpfs_RS0"] = -1.0 * e_m22 / np.sqrt(2.0) # this has spin-4 leakage out["fpfs_RS2"] = -1.0 * e_m42 * np.sqrt(6.0) / 2.0 # this has spin-4 leakage del e_m22, e_m42 return out
[docs] class summary_stats: def __init__(self, mm, ell, use_sig=False): """A class to get the summary statistics [e.g., mean shear] of from the moments and ellipticity. Args: mm (ndarray): FPFS moments ell (ndarray): FPFS ellipticity use_sig (bool): whether use sigmoid [True] of truncated sine [False] """ self.use_sig = use_sig self.mm = mm self.ell = ell self.clear_outcomes() if "fpfs_HR00" in self.ell.dtype.names: self.noirev = True else: self.noirev = False return
[docs] def clear_outcomes(self): """clears the outcome of the class""" self.nsel = 0 self.ws = np.ones(self.ell.shape) # bias self.ncor = 0 self.corE1 = 0.0 # selection bias in e1 self.corE2 = 0.0 # selection bias in e2 self.corR1 = 0.0 # selection bias in R1E (response) self.corR2 = 0.0 # selection bias in R2E (response) # signal self.sumE1 = 0.0 # sum of e1 self.sumE2 = 0.0 # sum of e2 self.sumR1 = 0.0 # sum of R1E (response) self.sumR2 = 0.0 # sum of R2E (response) return
[docs] def update_selection_weight(self, snms, cuts, cutsigs): """Updates the selection weight term with the current selection weight""" if not isinstance(snms, np.ndarray): if ( isinstance(snms, str) and isinstance(cuts, float) and isinstance(cutsigs, float) ): snms = np.array([snms]) cuts = np.array([cuts]) cutsigs = np.array([cutsigs]) else: raise TypeError("snms, cuts and cutsigs should be str, float, float") for selnm, cut, cutsig in zip(snms, cuts, cutsigs): # print(selnm) if selnm == "detect": for iid in range(8): self._update_selection_weight("det_v%d" % iid, cut, cutsig) elif selnm == "detect2": for iid in range(8): self._update_selection_weight("det2_v%d" % iid, cut, cutsig) else: self._update_selection_weight(selnm, cut, cutsig) return
def _update_selection_weight(self, selnm, cut, cutsig): """Updates the selection weight term with the current selection weight""" if not isinstance(selnm, str): raise TypeError("selnm should be str") if not isinstance(cut, float): raise TypeError("cut should be float") if not isinstance(cutsig, float): raise TypeError("cutsig should be float") cut_final = cut if selnm == "M00": scol = self.mm["fpfs_M00"] elif selnm == "M20": scol = -self.mm["fpfs_M20"] elif selnm == "R2": # M00+M20>cut*M00 (M00>0., we have mag cut to ensure it) scol = self.mm["fpfs_M00"] * (1.0 - cut) + self.mm["fpfs_M20"] cut_final = cutsig elif selnm == "R2_upp": # M00+M20<cut*M00 (M00>0.) scol = self.mm["fpfs_M00"] * (cut - 1.0) - self.mm["fpfs_M20"] cut_final = 0.0 elif "det_" in selnm: vn = selnm.split("_")[-1] scol = self.mm["fpfs_%s" % vn] elif "det2_" in selnm: vn = selnm.split("_")[-1] scol = self.mm["fpfs_%s" % vn] - self.mm["fpfs_M00"] * cut cut_final = cutsig else: raise ValueError("Do not support selection vector name: %s" % selnm) # update weight ws = get_wsel_eff(scol, cut_final, cutsig, self.use_sig) self.ws = self.ws * ws # count the total number of selection cuts self.nsel = self.nsel + 1 return
[docs] def update_selection_bias(self, snms, cuts, cutsigs): """Updates the selection bias correction term with the current selection weight """ if not isinstance(snms, np.ndarray): if ( isinstance(snms, str) and isinstance(cuts, float) and isinstance(cutsigs, float) ): snms = [snms] cuts = [cuts] cutsigs = [cutsigs] else: raise TypeError( "snms, cuts and cutsigs should be (lists of) str, float, float" ) for selnm, cut, cutsig in zip(snms, cuts, cutsigs): # print(selnm) if selnm == "detect": for iid in range(8): self._update_selection_bias("det_v%d" % iid, cut, cutsig) elif selnm == "detect2": for iid in range(8): self._update_selection_bias("det2_v%d" % iid, cut, cutsig) else: self._update_selection_bias(selnm, cut, cutsig) assert self.nsel == self.ncor return
def _update_selection_bias(self, selnm, cut, cutsig): """Updates the selection bias correction term with the current selection weight Args: selnm (str): name of the selection variable ['M00', 'M20', 'R2'] cut (float): selection cut cutsig (float): width of the selection weight function. Note, it is closer to heavy step when cutsig is smaller """ if not isinstance(selnm, str): raise TypeError("selnm should be str") if not isinstance(cut, float): raise TypeError("cut should be float") if not isinstance(cutsig, float): raise TypeError("cutsig should be float") cut_final = cut if selnm == "M00": scol = self.mm["fpfs_M00"] # shear response ccol1 = self.ell["fpfs_RS0"] ccol2 = self.ell["fpfs_RS0"] if self.noirev: dcol = self.ell["fpfs_HR00"] ncol1 = self.ell["fpfs_HE100"] ncol2 = self.ell["fpfs_HE200"] else: dcol = None ncol1 = None ncol2 = None elif selnm == "M20": scol = -self.mm["fpfs_M20"] ccol1 = -self.ell["fpfs_RS2"] ccol2 = -self.ell["fpfs_RS2"] if self.noirev: dcol = -self.ell["fpfs_HR20"] ncol1 = -self.ell["fpfs_HE120"] ncol2 = -self.ell["fpfs_HE220"] else: dcol = None ncol1 = None ncol2 = None elif selnm == "R2" or selnm == "R2_upp": if "_upp" in selnm: fp = -1.0 else: fp = 1.0 # cut_final = 0.0 cut_final = cutsig scol = (self.mm["fpfs_M00"] * (1.0 - cut) + self.mm["fpfs_M20"]) * fp ccol1 = (self.ell["fpfs_RS0"] * (1.0 - cut) + self.ell["fpfs_RS2"]) * fp ccol2 = (self.ell["fpfs_RS0"] * (1.0 - cut) + self.ell["fpfs_RS2"]) * fp if self.noirev: dcol = ( self.ell["fpfs_HR00"] * (1.0 - cut) + self.ell["fpfs_HR20"] ) * fp ncol1 = ( self.ell["fpfs_HE100"] * (1.0 - cut) + self.ell["fpfs_HE120"] ) * fp ncol2 = ( self.ell["fpfs_HE200"] * (1.0 - cut) + self.ell["fpfs_HE220"] ) * fp else: dcol = None ncol1 = None ncol2 = None elif "det_" in selnm: vn = selnm.split("_")[-1] scol = self.mm["fpfs_%s" % vn] ccol1 = self.ell["fpfs_R1S%s" % vn] ccol2 = self.ell["fpfs_R2S%s" % vn] if self.noirev: dcol = self.ell["fpfs_HR%s" % vn] ncol1 = self.ell["fpfs_HE1%s" % vn] ncol2 = self.ell["fpfs_HE2%s" % vn] else: dcol = None ncol1 = None ncol2 = None elif "det2_" in selnm: cut_final = cutsig vn = selnm.split("_")[-1] scol = self.mm["fpfs_%s" % vn] - self.mm["fpfs_M00"] * cut ccol1 = self.ell["fpfs_R1S%s" % vn] - self.ell["fpfs_RS0"] * cut ccol2 = self.ell["fpfs_R2S%s" % vn] - self.ell["fpfs_RS0"] * cut if self.noirev: dcol = self.ell["fpfs_HR%s" % vn] - self.ell["fpfs_HR00"] * cut ncol1 = self.ell["fpfs_HE1%s" % vn] - self.ell["fpfs_HE100"] * cut ncol2 = self.ell["fpfs_HE2%s" % vn] - self.ell["fpfs_HE200"] * cut else: dcol = None ncol1 = None ncol2 = None else: raise ValueError("Do not support selection vector name: %s" % selnm) cor_sel_r1 = get_wbias(scol, cut_final, cutsig, self.use_sig, self.ws, ccol1) cor_sel_r2 = get_wbias(scol, cut_final, cutsig, self.use_sig, self.ws, ccol2) cor_noise_r = get_wbias(scol, cut_final, cutsig, self.use_sig, self.ws, dcol) cor_noise_e1 = get_wbias(scol, cut_final, cutsig, self.use_sig, self.ws, ncol1) cor_noise_e2 = get_wbias(scol, cut_final, cutsig, self.use_sig, self.ws, ncol2) self.corR1 = self.corR1 + cor_sel_r1 + cor_noise_r self.corR2 = self.corR2 + cor_sel_r2 + cor_noise_r self.corE1 = self.corE1 + cor_noise_e1 self.corE2 = self.corE2 + cor_noise_e2 self.ncor = self.ncor + 1 return
[docs] def update_ellsum(self): """Updates the weighted sum of ellipticity and response with the currenct selection weight """ self.sumE1 = np.sum(self.ell["fpfs_e1"] * self.ws) self.sumE2 = np.sum(self.ell["fpfs_e2"] * self.ws) self.sumR1 = np.sum(self.ell["fpfs_R1E"] * self.ws) self.sumR2 = np.sum(self.ell["fpfs_R2E"] * self.ws) return
# This file tells the default structure of the data indexes = { "m00": 0, "m20": 1, "m22c": 2, "m22s": 3, "m40": 4, "m42c": 5, "m42s": 6, "v0": 7, "v1": 8, "v2": 9, "v3": 10, "v4": 11, "v5": 12, "v6": 13, "v7": 14, "v0_g1": 15, "v1_g1": 16, "v2_g1": 17, "v3_g1": 18, "v4_g1": 19, "v5_g1": 20, "v6_g1": 21, "v7_g1": 22, "v0_g2": 23, "v1_g2": 24, "v2_g2": 25, "v3_g2": 26, "v4_g2": 27, "v5_g2": 28, "v6_g2": 29, "v7_g2": 30, } col_names = [ "fpfs_M00", "fpfs_M20", "fpfs_M22c", "fpfs_M22s", "fpfs_M40", "fpfs_M42c", "fpfs_M42s", "fpfs_v0", "fpfs_v1", "fpfs_v2", "fpfs_v3", "fpfs_v4", "fpfs_v5", "fpfs_v6", "fpfs_v7", "fpfs_v0r1", "fpfs_v1r1", "fpfs_v2r1", "fpfs_v3r1", "fpfs_v4r1", "fpfs_v5r1", "fpfs_v6r1", "fpfs_v7r1", "fpfs_v0r2", "fpfs_v1r2", "fpfs_v2r2", "fpfs_v3r2", "fpfs_v4r2", "fpfs_v5r2", "fpfs_v6r2", "fpfs_v7r2", ] cov_names = [ "fpfs_N00N00", "fpfs_N20N20", "fpfs_N22cN22c", "fpfs_N22sN22s", "fpfs_N40N40", "fpfs_N00N20", "fpfs_N00N22c", "fpfs_N00N22s", "fpfs_N00N40", "fpfs_N00N42c", "fpfs_N00N42s", "fpfs_N20N22c", "fpfs_N20N22s", "fpfs_N20N40", "fpfs_N22cN42c", "fpfs_N22sN42s", "fpfs_N00V0", "fpfs_N00V0r1", "fpfs_N00V0r2", "fpfs_N22cV0", "fpfs_N22sV0", "fpfs_N22cV0r1", "fpfs_N22sV0r2", "fpfs_N40V0", "fpfs_N00V1", "fpfs_N00V1r1", "fpfs_N00V1r2", "fpfs_N22cV1", "fpfs_N22sV1", "fpfs_N22cV1r1", "fpfs_N22sV1r2", "fpfs_N40V1", "fpfs_N00V2", "fpfs_N00V2r1", "fpfs_N00V2r2", "fpfs_N22cV2", "fpfs_N22sV2", "fpfs_N22cV2r1", "fpfs_N22sV2r2", "fpfs_N40V2", "fpfs_N00V3", "fpfs_N00V3r1", "fpfs_N00V3r2", "fpfs_N22cV3", "fpfs_N22sV3", "fpfs_N22cV3r1", "fpfs_N22sV3r2", "fpfs_N40V3", "fpfs_N00V4", "fpfs_N00V4r1", "fpfs_N00V4r2", "fpfs_N22cV4", "fpfs_N22sV4", "fpfs_N22cV4r1", "fpfs_N22sV4r2", "fpfs_N40V4", "fpfs_N00V5", "fpfs_N00V5r1", "fpfs_N00V5r2", "fpfs_N22cV5", "fpfs_N22sV5", "fpfs_N22cV5r1", "fpfs_N22sV5r2", "fpfs_N40V5", "fpfs_N00V6", "fpfs_N00V6r1", "fpfs_N00V6r2", "fpfs_N22cV6", "fpfs_N22sV6", "fpfs_N22cV6r1", "fpfs_N22sV6r2", "fpfs_N40V6", "fpfs_N00V7", "fpfs_N00V7r1", "fpfs_N00V7r2", "fpfs_N22cV7", "fpfs_N22sV7", "fpfs_N22cV7r1", "fpfs_N22sV7r2", "fpfs_N40V7", ] ncol = 31
[docs] def fpfscov_to_imptcov(data): """Converts FPFS noise Covariance elements into a covariance matrix of lensPT. Args: data (ndarray): FPFS shapelet mode catalog Returns: out (ndarray): Covariance matrix """ # the colum names # M00 -> N00; v1 -> V1 ll = [cn[5:].replace("M", "N").replace("v", "V") for cn in col_names] out = np.zeros((ncol, ncol)) for i in range(ncol): for j in range(ncol): try: try: cname = "fpfs_%s%s" % (ll[i], ll[j]) out[i, j] = data[cname][0] except (ValueError, KeyError): cname = "fpfs_%s%s" % (ll[j], ll[i]) out[i, j] = data[cname][0] except (ValueError, KeyError): out[i, j] = 0.0 return out
[docs] def imptcov_to_fpfscov(data): """Converts FPFS noise Covariance elements into a covariance matrix of lensPT. Args: data (ndarray): impt covariance matrix Returns: out (ndarray): FPFS covariance elements """ # the colum names # M00 -> N00; v1 -> V1 ll = [cn[5:].replace("M", "N").replace("v", "V") for cn in col_names] types = [(cn, "<f8") for cn in cov_names] out = np.zeros(1, dtype=types) for i in range(ncol): for j in range(i, ncol): cname = "fpfs_%s%s" % (ll[i], ll[j]) if cname in cov_names: out[cname][0] = data[i, j] return out