import os
import sys
import json
import Tigger
import random
import string
import logging
import aimfast
import argparse
import tempfile
import numpy as np
from regions import Regions
from functools import partial
from collections import OrderedDict
import scipy
from scipy import stats
from scipy.stats import linregress
from scipy.interpolate import interp1d
from scipy.ndimage import measurements as measure
from bokeh.io import export_svgs
from bokeh.transform import transform
from bokeh.models.widgets import Div, PreText
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.models import Circle
from bokeh.models import CheckboxGroup, CustomJS
from bokeh.models import HoverTool, LinearAxis, Range1d
from bokeh.models import ColorBar, ColumnDataSource, ColorBar
from bokeh.models import LogColorMapper, LogTicker, LinearColorMapper
from bokeh.layouts import row, column, gridplot, grid
from bokeh.plotting import figure, output_file, show, save, ColumnDataSource
from astropy.wcs import WCS
from astropy import units as u
from astropy.table import Table
from astropy.io import fits as fitsio
from astropy.coordinates import Angle, SkyCoord
from Tigger.Models import SkyModel, ModelClasses
from Tigger.Coordinates import angular_dist_pos_angle
from sklearn.metrics import mean_squared_error, r2_score
from aimfast.auxiliary import deg2arcsec, deg2arcsec, rad2arcsec
from aimfast.auxiliary import dec2deg, ra2deg, rad2deg, deg2rad, unwrap
from aimfast.auxiliary import aegean, bdsf, get_subimage, get_online_catalog
from aimfast.auxiliary import deg2dec, deg2ra
# Get version
from pkg_resources import get_distribution
_version = get_distribution('aimfast').version
# Unit multipleirs for plotting
FLUX_UNIT_SCALER = {
'jansky': [1e0, 'Jy'],
'milli': [1e3, 'mJy'],
'micro': [1e6, u'\u03bcJy'],
'nano': [1e9, 'nJy'],
}
POSITION_UNIT_SCALER = {
'deg': [1e0, 'deg'],
'arcmin': [60.0, u'`'],
'arcsec': [3600.0, u'``'],
}
# Backgound color for plots
BG_COLOR = 'rgb(229,229,229)'
# Highlighters
R = '\033[31m' # red
W = '\033[0m' # white (normal)
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# Decimal places
DECIMALS = 2
[docs]def create_logger():
"""Create a console logger"""
log = logging.getLogger(__name__)
cfmt = logging.Formatter(('%(name)s - %(asctime)s %(levelname)s - %(message)s'))
log.setLevel(logging.DEBUG)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(cfmt)
log.addHandler(console)
return log
LOGGER = create_logger()
[docs]def generate_default_config(configfile):
"Generate default config file for running source finders"
from shutil import copyfile
LOGGER.info(f"Getting parameter file: {configfile}")
aim_path = os.path.dirname(os.path.dirname(os.path.abspath(aimfast.__file__)))
copyfile(f"{aim_path}/aimfast/source_finder.yml", configfile)
[docs]def get_aimfast_data(filename='fidelity_results.json', dir='.'):
"Extracts data from the json data file"
filepath = f"{dir}/{filename}"
LOGGER.info('Extracting data from the json data file')
with open(filepath) as f:
data = json.load(f)
return data
[docs]def json_dump(data_dict, filename='fidelity_results.json'):
"""Dumps the computed dictionary results into a json file.
Parameters
----------
data_dict : dict
Dictionary with output results to save.
filename : str
Name of file json file where fidelity results will be dumped.
Default is 'fidelity_results.json' in the current directory.
Note1
----
If the fidelity_results.json file exists, it will be append, and only
repeated image assessments will be replaced.
"""
LOGGER.info(f"Dumping results into the '{filename}' file")
try:
# Extract data from the json data file
with open(filename) as data_file:
data_existing = json.load(data_file)
data_existing.update(data_dict)
data = data_existing
except IOError:
data = data_dict
if data:
with open(filename, 'w') as f:
json.dump(data, f)
[docs]def fitsInfo(fitsname=None):
"""Get fits header info.
Parameters
----------
fitsname : fits file
Restored image (cube)
Returns
-------
fitsinfo : dict
Dictionary of fits information
e.g. {'wcs': wcs, 'ra': ra, 'dec': dec,
'dra': dra, 'ddec': ddec, 'raPix': raPix,
'decPix': decPix, 'b_size': beam_size,
'numPix': numPix, 'centre': centre,
'skyArea': skyArea}
"""
hdu = fitsio.open(fitsname)
hdr = hdu[0].header
ra = hdr['CRVAL1']
dra = abs(hdr['CDELT1'])
raPix = hdr['CRPIX1']
dec = hdr['CRVAL2']
ddec = abs(hdr['CDELT2'])
decPix = hdr['CRPIX2']
wcs = WCS(hdr)
numPix = hdr['NAXIS1']
try:
beam_size = (hdr['BMAJ'], hdr['BMIN'], hdr['BPA'])
except:
beam_size = None
try:
centre = (hdr['CRVAL1'], hdr['CRVAL2'])
except:
centre = None
try:
freq0=None
for i in range(1, hdr['NAXIS']+1):
if hdr['CTYPE{0:d}'.format(i)].startswith('FREQ'):
freq0 = hdr['CRVAL{0:d}'.format(i)]
except:
freq0=None
skyArea = (numPix * ddec) ** 2
fitsinfo = {'wcs': wcs, 'ra': ra, 'dec': dec,
'dra': dra, 'ddec': ddec, 'raPix': raPix,
'decPix': decPix, 'b_size': beam_size,
'numPix': numPix, 'centre': centre,
'skyArea': skyArea, 'freq0': freq0}
return fitsinfo
[docs]def measure_psf(psffile, arcsec_size=20):
"""Measure point spread function after deconvolution.
Parameters
----------
psfile : fits file
Point spread function file.
arcsec_size : float
Cross section size
Returns
-------
r0 : float
Average psf size.
"""
with fitsio.open(psffile) as hdu:
pp = hdu[0].data.T[:, :, 0, 0]
secpix = abs(hdu[0].header['CDELT1'] * 3600)
# Get midpoint and size of cross-sections
xmid, ymid = measure.maximum_position(pp)
sz = int(arcsec_size / secpix)
xsec = pp[xmid - sz: xmid + sz, ymid]
ysec = pp[xmid, ymid - sz: ymid + sz]
def fwhm(tsec):
"""Determine the full width half maximum"""
tmid = len(tsec) / 2.0
# First minima off the peak, and flatten cross-section outside them
xmin = measure.minimum_position(tsec[:tmid])[0]
tsec[:xmin] = tsec[xmin]
xmin = measure.minimum_position(tsec[tmid:])[0]
tsec[tmid + xmin:] = tsec[tmid + xmin]
if tsec[0] > 0.5 or tsec[-1] > 0.5:
LOGGER.info(f"PSF FWHM over {arcsec_size * 2:.2f} arcsec")
return arcsec_size, arcsec_size
x1 = interp1d(tsec[:tmid], range(tmid))(0.5)
x2 = interp1d(1 - tsec[tmid:], range(tmid, len(tsec)))(0.5)
return x1, x2
ix0, ix1 = fwhm(xsec)
iy0, iy1 = fwhm(ysec)
rx, ry = (ix1 - ix0) * secpix, (iy1 - iy0) * secpix
r0 = (rx + ry) / 2.0
return r0
[docs]def get_box(wcs, radec, w):
"""Get box of width w around source coordinates radec.
Parameters
----------
radec : tuple
RA and DEC in degrees.
w : int
Width of box.
wcs : atropy.wcs instance
World Coordinate System.
Returns
-------
box : tuple
A box centred at radec.
"""
radec_pix = SkyCoord(*radec,unit='deg').to_pixel(wcs)
raPix, decPix = radec_pix[0] , radec_pix[1]
raPix = int(raPix)
decPix = int(decPix)
box = (slice(decPix - int(w / 2), decPix + int(w / 2)),
slice(raPix - int(w / 2), raPix + int(w / 2)))
return box
[docs]def noise_sigma(noise_image):
"""Determines the noise sigma level in a dirty image with no source
Parameters
----------
noise_image : file
Noise image (cube).
Returns
-------
noise_std : float
Noise image standard deviation
"""
# Read the simulated noise image
dirty_noise_hdu = fitsio.open(noise_image)
# Get the header data unit for the simulated noise
dirty_noise_data = dirty_noise_hdu[0].data
# Get the noise sigma
noise_std = dirty_noise_data.std()
return noise_std
def _get_ra_dec_range(area, phase_centre):
"""Get RA and DEC range from area of observations and phase centre"""
ra = phase_centre[0]
dec = phase_centre[1]
d_ra = np.sqrt(area) / 2.0
d_dec = np.sqrt(area) / 2.0
ra_range = [ra - d_ra, ra + d_ra]
dec_range = [dec - d_dec, dec + d_dec]
return ra_range, dec_range
def _source_angular_dist_pos_angle(src1, src2):
"""Computes the angular distance between the two points on a sphere, and
the position angle (North through East) of the direction from 1 to 2.""";
ra1, dec1 = src1.pos.ra, src1.pos.dec
ra2, dec2 = src2.pos.ra, src2.pos.dec
return angular_dist_pos_angle(ra1, dec1, ra2, dec2)
def _get_phase_centre(model):
"""Compute the phase centre of observation"""
# Get all sources in the model
model_sources = model.sources
# Get source Ra and Dec coordinates
RA = [rad2deg(src.pos.ra) for src in model_sources]
DEC = [rad2deg(src.pos.dec) for src in model_sources]
xc = np.sum(RA)/len(RA)
yc = np.sum(DEC)/len(DEC)
return (xc ,yc)
def _get_random_pixel_coord(num, sky_area, phase_centre=[0.0, -30.0]):
"""Provides random pixel coordinates
Parameters
----------
num: int
Number of data points
sky: float
Sky area to extract random points
Phase tracking centre of the telescope during observation [ra0,dec0]
phase_centre: list
Phase centre in degrees
Returns
-------
COORDs: list
List of coordinates
"""
ra_range, dec_range = _get_ra_dec_range(sky_area, phase_centre)
COORDs = []
for i in range(num):
current = []
# add another number to the current list
current.append(random.uniform(ra_range[0], ra_range[1]))
current.append(random.uniform(dec_range[0], dec_range[1]))
# convert current list into a tuple and add to resulting list
COORDs.append(tuple(current))
random.shuffle(COORDs)
return COORDs
[docs]def get_image_products(images, mask):
"""Get a product of images with a mask
Parameters
----------
images: list
List of fits images to get product
mask: str
Mask to multiply the images
Returns
-------
prod_images: list
List of resulting fits images
"""
LOGGER.info("Computing product...")
prod_images = []
msk = fitsio.open(mask)
msk_data = msk[0].data
for img in images:
outname = img.replace('.fits', '.prod.fits')
im = fitsio.open(img)
img_data = im[0].data
p_img_data = img_data * msk_data
LOGGER.info("Writing output images")
if os.path.exists(outname):
LOGGER.warning("Output image exists")
sys.exit(1)
else:
im[0].data = p_img_data
im.writeto(outname)
LOGGER.info(f"New image: {outname}")
prod_images.append(outname)
return prod_images
[docs]def residual_image_stats(fitsname, test_normality=None, data_range=None,
threshold=None, chans=None, mask=None):
"""Gets statistcal properties of a residual image.
Parameters
----------
fitsname : file
Residual image (cube).
test_normality : str
Perform normality testing using either `shapiro` or `normaltest`.
data_range : int, optional
Range of data to perform normality testing.
threshold : float, optional
Cut-off threshold to select channels in a cube
chans : str, optional
Channels to compute stats (e.g. 1;0~50;100~200)
mask : file
Fits mask to get stats in image
Returns
-------
props : dict
Dictionary of stats properties.
e.g. {'MEAN': 0.0, 'STDDev': 0.1, 'RMS': 0.1,
'SKEW': 0.2, 'KURT': 0.3, 'MAD': 0.4,
'MAX': 0.7, 'SUM_NEG': -0.1}
Notes
-----
If normality_test=True, dictionary of stats props becomes \
e.g. {'MEAN': 0.0, 'STDDev': 0.1, 'SKEW': 0.2, 'KURT': 0.3, \
'MAD': 0.4, 'RMS': 0.5, 'SUM_NEG': -0.1, 'MAX': 0.7, \
'NORM': (123.3,0.012)} \
whereby the first element is the statistics (or average if data_range specified) \
of the datasets and second element is the p-value.
"""
# Open the residual image
residual_hdu = fitsio.open(fitsname)
# Get the header data unit for the residual rms
residual_data = residual_hdu[0].data
# Get residual data
# In case the first two axes are swapped
data = (residual_data[0]
if residual_data.shape[0] == 1
else residual_data[1])
if chans:
nchans = []
chan_ranges = chans.split(';')
for cr in chan_ranges:
if '~' in cr:
c = cr.split('~')
nchans.extend(range(int(c[0]), int(c[1]) + 1))
else:
nchans.append(int(cr))
residual_data = data[nchans]
data = residual_data
if threshold:
nchans = []
for i in range(data.shape[0]):
d = data[i][data[i] > float(threshold)]
if d.shape[0] > 0:
nchans.append(i)
residual_data = data[nchans]
data = residual_data
if mask:
import numpy.ma as ma
mask_hdu = fitsio.open(mask)
mask_data = mask_hdu[0].data
residual_data = ma.masked_array(data, mask=mask_data)
data = residual_data
residual_data = data
props = image_stats(residual_data, test_normality=test_normality)
return props
[docs]def image_stats(image_data, test_normality=None, data_range=None):
img_stats = dict()
# Get the min value
LOGGER.info("Computing min ...")
img_stats['MIN'] = float("{0:.6}".format(image_data.min()))
LOGGER.info("MIN = {}".format(img_stats['MIN']))
# Get the max value
LOGGER.info("Computing max ...")
img_stats['MAX'] = float("{0:.6}".format(image_data.max()))
LOGGER.info("MAX = {}".format(img_stats['MAX']))
# Get the mean value
LOGGER.info("Computing mean ...")
img_stats['MEAN'] = float("{0:.6}".format(image_data.mean()))
LOGGER.info("MEAN = {}".format(img_stats['MEAN']))
# Get the rms value
LOGGER.info("Computing root mean square ...")
img_stats['RMS'] = float("{0:.6f}".format(np.sqrt(np.mean(np.square(image_data)))))
LOGGER.info("RMS = {}".format(img_stats['RMS']))
# Get the sigma value
LOGGER.info("Computing standard deviation ...")
img_stats['STDDev'] = float("{0:.6f}".format(image_data.std()))
LOGGER.info("STDDev = {}".format(img_stats['STDDev']))
# Flatten image
img_data = image_data.flatten()
# Get the maximum absolute deviation
LOGGER.info("Computing median absolute deviation ...")
img_stats['MAD'] = float("{0:.6f}".format(stats.median_abs_deviation(img_data)))
LOGGER.info("MAD = {}".format(img_stats['MAD']))
# Compute the skewness of the residual
LOGGER.info("Computing skewness ...")
img_stats['SKEW'] = float("{0:.6f}".format(stats.skew(img_data)))
LOGGER.info("SKEW = {}".format(img_stats['SKEW']))
# Compute the kurtosis of the residual
LOGGER.info("Computing kurtosis ...")
img_stats['KURT'] = float("{0:.6f}".format(stats.kurtosis(img_data, fisher=False)))
LOGGER.info("KURT = {}".format(img_stats['KURT']))
# Compute the sum of Negative pixels
LOGGER.info("Computing sum of negative pixels ...")
img_stats['SUM_NEG'] = float("{0:.6f}".format(np.sum(img_data[np.where(img_data<0.0)])))
LOGGER.info("SUM_NEG = {}".format(img_stats['SUM_NEG']))
# Perform normality testing
if test_normality:
LOGGER.info("Performing normality test ...")
norm_props = normality_testing(img_data, test_normality, data_range)
img_stats.update(norm_props)
LOGGER.info("NORM = {}".format(img_stats['NORM']))
# Return dictionary of results
return img_stats
[docs]def fix_wcs_fits(wcs, dropaxis=2):
"""This removes the degenerated dimensions in APLpy 2.X...
The input must be the object returned by aplpy.FITSFigure().
`dropaxis` is the index where to start dropping the axis (by default it assumes the 3rd,4th place).
"""
temp_wcs = wcs.dropaxis(dropaxis)
temp_wcs = temp_wcs.dropaxis(dropaxis)
return temp_wcs
[docs]def get_region_stats(fitsname, regions_file):
"""Extract flux densities measurements within the provided region"""
regions_stats = dict()
LOGGER.info(f'Reading region file: {regions_file}')
regions_list = Regions.read(regions_file, format='ds9')
LOGGER.info(f'Number of regions: {len(regions_list)}')
image = fitsio.open(fitsname)
image_data = image[0].data
fitsinfo = fitsInfo(fitsname)
wcs = fitsinfo['wcs']
beam = fitsinfo['b_size']
dra = fitsinfo['dra']
beam_area = (beam[0]*beam[1])/(dra*dra)
for i, input_region in enumerate(regions_list):
if hasattr(input_region, 'to_pixel'):
input_region = input_region.to_pixel(fix_wcs_fits(wcs))
mask = input_region.to_mask().to_image(image_data.shape[-2:])
data = mask * image_data[0][0]
#nndata=nndata[~np.isnan(data)]
nndata = np.flip(data, axis=0)
nndata = nndata[~np.isnan(nndata)]
nndata = nndata[nndata != -0.0]
stats = image_stats(nndata)
regions_stats[f'region-{i}'] = stats
return regions_stats
[docs]def normality_testing(data, test_normality='normaltest', data_range=None):
"""Performs a normality test on the image data.
Parameters
----------
data : numpy.array
Residual residual array. i.e. fitsio.open(fitsname)[0].data
test_normality : str
Perform normality testing using either `shapiro` or `normaltest`.
data_range : int
Range of data to perform normality testing.
Returns
-------
normality : dict
dictionary of stats props.
e.g. {'NORM': (123.3, 0.012)}
whereby the first element is the statistics
(or average if data_range specified) of the
datasets and second element is the p-value.
"""
norm_res = []
normality = dict()
# Get residual image data
res_data = data
# Shuffle the data
random.shuffle(res_data)
# Normality test
counter = 0
# Check size of image data
if len(res_data) == 0:
raise ValueError(f"{R}No data to compute stats."
"\nEither threshold too high "
"or all data is masked.{W}")
if data_range:
for dataset in range(len(res_data) / int(data_range)):
i = counter
counter += data_range
norm_res.append(getattr(stats, test_normality)(res_data[i: counter]))
# Compute sum of pvalue
if test_normality == 'normaltest':
sum_statistics = sum([norm.statistic for norm in norm_res])
sum_pvalues = sum([norm.pvalue for norm in norm_res])
elif test_normality == 'shapiro':
sum_statistics = sum([norm[0] for norm in norm_res])
sum_pvalues = sum([norm[1] for norm in norm_res])
normality['NORM'] = (sum_statistics / dataset, sum_pvalues / dataset)
else:
norm_res = getattr(stats, test_normality)(res_data)
if test_normality == 'normaltest':
statistic = float(norm_res.statistic)
pvalue = float(norm_res.pvalue)
normality['NORM'] = (statistic, pvalue)
elif test_normality == 'shapiro':
normality['NORM'] = norm_res
return normality
[docs]def model_dynamic_range(lsmname, fitsname, beam_size=5, area_factor=2):
"""Gets the dynamic range using model lsm and residual fits.
Parameters
----------
fitsname : fits file
Residual image (cube).
lsmname : lsm.html or .txt file
Model .lsm.html from pybdsm (or .txt converted tigger file).
beam_size : float
Average beam size in arcsec.
area_factor : float
Factor to multiply the beam area.
Returns
-------
DR : dict
DRs - dynamic range values.
"""
# Open the residual image
residual_hdu = fitsio.open(fitsname)
residual_data = residual_hdu[0].data
# Load model file
model_lsm = Tigger.load(lsmname)
# Get detected sources
model_sources = model_lsm.sources
# Obtain peak flux source
peak_flux = None
try:
sources_flux = dict([(model_source, model_source.getTag('I_peak'))
for model_source in model_sources])
peak_source_flux = [(_model_source, flux)
for _model_source, flux in sources_flux.items()
if flux == max(list(sources_flux.values()))][0][0]
peak_flux = peak_source_flux.getTag('I_peak')
except TypeError:
pass
if not peak_flux:
# In case no I_peak is not found use the integrated flux
sources_flux = dict([(model_source, model_source.flux.I)
for model_source in model_sources])
peak_source_flux = [(_model_source, flux)
for _model_source, flux in sources_flux.items()
if flux == max(list(sources_flux.values()))][0][0]
peak_flux = peak_source_flux.flux.I
# Get astrometry of the source in degrees
RA = rad2deg(peak_source_flux.pos.ra)
DEC = rad2deg(peak_source_flux.pos.dec)
# Get source region and slice
width = int(beam_size * area_factor)
imslice = get_box(fitsInfo(fitsname)['wcs'], (RA, DEC), width)
source_res_area = np.array(residual_data[0, 0, :, :][imslice])
min_flux = source_res_area.min()
local_std = source_res_area.std()
global_std = residual_data[0, 0, ...].std()
# Compute dynamic range
DR = {
"deepest_negative" : peak_flux/abs(min_flux)*1e0,
"local_rms" : peak_flux/local_std*1e0,
"global_rms" : peak_flux/global_std*1e0,
}
return DR
[docs]def image_dynamic_range(fitsname, residual, area_factor=6):
"""Gets the dynamic range in a restored image.
Parameters
----------
fitsname : fits file
Restored image (cube).
residual : fits file
Residual image (cube).
area_factor: int
Factor to multiply the beam area.
Returns
-------
DR : dict
DRs - dynamic range values.
"""
fits_info = fitsInfo(fitsname)
# Get beam size otherwise use default (~6``).
beam_default = (0.00151582804885738, 0.00128031965017612, 20.0197348935424)
beam_deg = fits_info['b_size'] if fits_info['b_size'] else beam_default
# Open the restored and residual images
restored_hdu = fitsio.open(fitsname)
residual_hdu = fitsio.open(residual)
# Get the header data unit for the peak and residual rms
restored_data = restored_hdu[0].data
residual_data = residual_hdu[0].data
# Get the max value
peak_flux = abs(restored_data.max())
# Get pixel coordinates of the peak flux
pix_coord = np.argwhere(restored_data == peak_flux)[0]
nchan = (restored_data.shape[1] if restored_data.shape[0] == 1
else restored_data.shape[0])
# Compute number of pixel in beam and extend by factor area_factor
ra_num_pix = round((beam_deg[0] * area_factor) / fits_info['dra'])
dec_num_pix = round((beam_deg[1] * area_factor) / fits_info['ddec'])
# Create target image slice
imslice = np.array([pix_coord[2]-ra_num_pix/2, pix_coord[2]+ra_num_pix/2,
pix_coord[3]-dec_num_pix/2, pix_coord[3]+dec_num_pix/2])
imslice = np.array(list(map(int, imslice)))
# If image is cube then average along freq axis
min_flux = 0.0
for frq_ax in range(nchan):
# In the case where the 0th and 1st axis of the image are not in order
# i.e. (0, nchan, x_pix, y_pix)
if residual_data.shape[0] == 1:
target_area = residual_data[0, frq_ax, :, :][imslice]
else:
target_area = residual_data[frq_ax, 0, :, :][imslice]
min_flux += target_area.min()
if frq_ax == nchan - 1:
min_flux = min_flux/float(nchan)
# Compute dynamic range
local_std = target_area.std()
global_std = residual_data[0, 0, ...].std()
# Compute dynamic range
DR = {
"deepest_negative" : peak_flux / abs(min_flux) * 1e0,
"local_rms" : peak_flux / local_std * 1e0,
"global_rms" : peak_flux / global_std * 1e0,
}
return DR
[docs]def get_src_scale(source_shape):
"""Get scale measure of the source in arcsec.
Parameters
----------
source_shape : lsm object
Source shape object from model
Returns
-------
(scale_out_arc_sec, scale_out_err_arc_sec) : tuple
Output source scale with error value
"""
if source_shape:
shape_out = source_shape.getShape()
shape_out_err = source_shape.getShapeErr()
minx = shape_out[0]
majx = shape_out[1]
minx_err = shape_out_err[0]
majx_err = shape_out_err[1]
if minx > 0 and majx > 0:
scale_out = np.sqrt(minx*majx)
scale_out_err = np.sqrt(minx_err*minx_err + majx_err*majx_err)
elif minx > 0:
scale_out = minx
scale_out_err = minx_err
elif majx > 0:
scale_out = majx
scale_out_err = majx_err
else:
scale_out = 0
scale_out_err = 0
else:
scale_out = 0
scale_out_err = 0
scale_out_arc_sec = rad2arcsec(scale_out)
scale_out_err_arc_sec = rad2arcsec(scale_out_err)
return scale_out_arc_sec, scale_out_err_arc_sec
[docs]def get_model(catalog):
"""Get model model object from file catalog"""
def tigger_src_ascii(src, idx):
"""Get ascii catalog source as a tigger source """
name = "SRC%d" % idx
flux = ModelClasses.Polarization(float(src["int_flux"]), 0, 0, 0,
I_err=float(src["err_int_flux"]))
ra, ra_err = map(np.deg2rad, (float(src["ra"]), float(src["err_ra"])))
dec, dec_err = map(np.deg2rad, (float(src["dec"]),
float(src["err_dec"])))
pos = ModelClasses.Position(ra, dec, ra_err=ra_err, dec_err=dec_err)
ex, ex_err = map(np.deg2rad, (float(src["a"]), float(src["err_a"])))
ey, ey_err = map(np.deg2rad, (float(src["b"]), float(src["err_b"])))
pa, pa_err = map(np.deg2rad, (float(src["pa"]), float(src["err_pa"])))
if ex and ey:
shape = ModelClasses.Gaussian(ex, ey, pa, ex_err=ex_err,
ey_err=ey_err, pa_err=pa_err)
else:
shape = None
source = SkyModel.Source(name, pos, flux, shape=shape)
# Adding source peak flux (error) as extra flux attributes for sources,
# and to avoid null values for point sources I_peak = src["Total_flux"]
if shape:
source.setAttribute("I_peak", float(src["peak_flux"]))
source.setAttribute("I_peak_err", float(src["err_peak_flux"]))
else:
source.setAttribute("I_peak", float(src["int_flux"]))
source.setAttribute("I_peak_err", float(src["err_int_flux"]))
return source
def tigger_src_nvss(src, idx):
"""Get ascii catalog source as a tigger source """
name = "SRC%d" % idx
flux = ModelClasses.Polarization(float(src["S1.4"]/1000.), 0, 0, 0,
I_err=float(src["e_S1.4"]/1000.))
ra, ra_err = map(np.deg2rad, (float(ra2deg(src["RAJ2000"])),
float(src["e_RAJ2000"]/3600.)))
dec, dec_err = map(np.deg2rad, (float(dec2deg(src["DEJ2000"])),
float(src["e_DEJ2000"]/3600.)))
pos = ModelClasses.Position(ra, dec, ra_err=ra_err, dec_err=dec_err)
ex, ex_err = map(np.deg2rad, (float(src['MajAxis']), float(0.00)))
ey, ey_err = map(np.deg2rad, (float(src['MinAxis']), float(0.00)))
pa, pa_err = map(np.deg2rad, (float(0.00), float(0.00)))
if ex and ey:
shape = ModelClasses.Gaussian(ex, ey, pa, ex_err=ex_err,
ey_err=ey_err, pa_err=pa_err)
else:
shape = None
source = SkyModel.Source(name, pos, flux, shape=shape)
# Adding source peak flux (error) as extra flux attributes for sources,
# and to avoid null values for point sources I_peak = src["Total_flux"]
source.setAttribute("I_peak", float(src["S1.4"]/1000.))
source.setAttribute("I_peak_err", float(src["e_S1.4"]/1000.))
return source
def tigger_src_sumss(src, idx):
"""Get ascii catalog source as a tigger source """
name = "SRC%d" % idx
flux = ModelClasses.Polarization(float(src["St"]/1000.), 0, 0, 0,
I_err=float(src["e_St1.4"]/1000.))
ra, ra_err = map(np.deg2rad, (float(ra2deg(src["RAJ2000"])),
float(src["e_RAJ2000"]/3600.)))
dec, dec_err = map(np.deg2rad, (float(dec2deg(src["DEJ2000"])),
float(src["e_DEJ2000"]/3600.)))
pos = ModelClasses.Position(ra, dec, ra_err=ra_err, dec_err=dec_err)
ex, ex_err = map(np.deg2rad, (float(src['MajAxis']), float(0.00)))
ey, ey_err = map(np.deg2rad, (float(src['MinAxis']), float(0.00)))
pa, pa_err = map(np.deg2rad, (float(src['PA']), float(0.00)))
if ex and ey:
shape = ModelClasses.Gaussian(ex, ey, pa, ex_err=ex_err,
ey_err=ey_err, pa_err=pa_err)
else:
shape = None
source = SkyModel.Source(name, pos, flux, shape=shape)
# Adding source peak flux (error) as extra flux attributes for sources,
# and to avoid null values for point sources I_peak = src["Total_flux"]
source.setAttribute("I_peak", float(src["Sp"]/1000.))
source.setAttribute("I_peak_err", float(src["e_Sp"]/1000.))
return source
def tigger_src_fits(src, idx, freq0=None):
"""Get fits catalog source as a tigger source """
name = "SRC%d" % idx
flux = ModelClasses.Polarization(float(src["Total_flux"]), 0, 0, 0,
I_err=float(src["E_Total_flux"]))
ra, ra_err = map(np.deg2rad, (float(src["RA"]), float(src["E_RA"])))
dec, dec_err = map(np.deg2rad, (float(src["DEC"]), float(src["E_DEC"])))
pos = ModelClasses.Position(ra, dec, ra_err=ra_err, dec_err=dec_err)
ex, ex_err = map(np.deg2rad, (float(src["DC_Maj"]), float(src["E_DC_Maj"])))
ey, ey_err = map(np.deg2rad, (float(src["DC_Min"]), float(src["E_DC_Min"])))
pa, pa_err = map(np.deg2rad, (float(src["PA"]), float(src["E_PA"])))
# Try to get spectral index
if ex and ey:
shape = ModelClasses.Gaussian(ex, ey, pa, ex_err=ex_err,
ey_err=ey_err, pa_err=pa_err)
else:
shape = None
source = SkyModel.Source(name, pos, flux, shape=shape)
# Adding source peak flux (error) as extra flux attributes for sources,
# and to avoid null values for point sources I_peak = src["Total_flux"]
if shape:
pass # TODO: Check for other models what peak is
#source.setAttribute("I_peak", src["Peak_flux"])
#source.setAttribute("I_peak_err", src["E_peak_flux"])
else:
source.setAttribute("I_peak", src["Total_flux"])
source.setAttribute("I_peak_err", src["E_Total_flux"])
if freq0:
try:
spi, spi_err = (src['Spec_Indx'], src['E_Spec_Indx'])
source.spectrum = ModelClasses.SpectralIndex(spi, freq0)
source.setAttribute('spi_error', spi_err)
except:
pass
return source
def tigger_src_wsclean(src, idx):
"""Get ascii catalog source as a tigger source """
name = src['col1']
flux = ModelClasses.Polarization(float(src["col5"]), 0, 0, 0,
I_err=float(0.00))
ra, ra_err = map(np.deg2rad, (float(ra2deg(src["col3"])),
float(0.00)))
dec, dec_err = map(np.deg2rad, (float(dec2deg(src["col4"])),
float(0.00)))
pos = ModelClasses.Position(ra, dec, ra_err=ra_err, dec_err=dec_err)
ex, ex_err = map(np.deg2rad, (float(src["col9"])
if type(src["col9"]) is not
np.ma.core.MaskedConstant else
0.00, float(0.00)))
ey, ey_err = map(np.deg2rad, (float(src["col10"])
if type(src["col10"]) is not
np.ma.core.MaskedConstant else
0.00, float(0.00)))
pa, pa_err = map(np.deg2rad, (float(0.00), float(0.00)))
if ex and ey:
shape = ModelClasses.Gaussian(ex, ey, pa, ex_err=ex_err,
ey_err=ey_err, pa_err=pa_err)
else:
shape = None
source = SkyModel.Source(name, pos, flux, shape=shape)
# Adding source peak flux (error) as extra flux attributes for sources,
# and to avoid null values for point sources I_peak = src["Total_flux"]
source.setAttribute("I_peak", float(src['col5']))
source.setAttribute("I_peak_err", float(0.00))
return source
tfile = tempfile.NamedTemporaryFile(suffix='.txt')
tfile.flush()
with open(tfile.name, "w") as stdw:
stdw.write("#format:name ra_d dec_d i emaj_s emin_s pa_d\n")
model = Tigger.load(tfile.name)
tfile.close()
ext = os.path.splitext(catalog)[-1]
if ext in ['.html', '.txt']:
if 'catalog_table' in catalog and not catalog.endswith('.html'):
data = Table.read(catalog, format='ascii')
for i, src in enumerate(data):
# Check which online catalog the source belongs to
# Prefix is in the name by default when created
if 'nvss' in catalog and not catalog.endswith('.html'):
model.sources.append(tigger_src_nvss(src, i))
if 'sumss' in catalog and not catalog.endswith('.html'):
model.sources.append(tigger_src_sumss(src, i))
centre = _get_phase_centre(model)
model.ra0, model.dec0 = map(np.deg2rad, centre)
model.save(catalog[:-4]+".lsm.html")
elif 'sources.txt' in catalog:
data = Table.read(catalog, format='ascii')
for i, src in enumerate(data):
if i:
model.sources.append(tigger_src_wsclean(src, i))
centre = _get_phase_centre(model)
model.ra0, model.dec0 = map(np.deg2rad, centre)
model.save(catalog[:-4]+".lsm.html")
else:
model = Tigger.load(catalog)
if ext in ['.tab', '.csv']:
data = Table.read(catalog, format='ascii')
if ext == '.tab':
fits_file = catalog.replace('_comp.tab', '.fits')
else:
fits_file = catalog.replace('_comp.csv', '.fits')
fitsinfo = fitsInfo(fits_file)
for i, src in enumerate(data):
model.sources.append(tigger_src_ascii(src, i))
centre = fitsinfo['centre'] or _get_phase_centre(model)
model.ra0, model.dec0 = map(np.deg2rad, centre)
model.save(catalog[:-4]+".lsm.html")
if ext in ['.fits']:
data = Table.read(catalog, format='fits')
fits_file = catalog.replace('-pybdsf', '')
fitsinfo = fitsInfo(fits_file)
freq0 = fitsinfo['freq0']
for i, src in enumerate(data):
model.sources.append(tigger_src_fits(src, i, freq0))
centre = fitsinfo['centre'] or _get_phase_centre(model)
model.ra0, model.dec0 = map(np.deg2rad, centre)
model.save(catalog[:-5]+".lsm.html")
return model
[docs]def get_detected_sources_properties(model_1, model_2, tolerance, shape_limit=6.0,
all_sources=False, closest_only=False, off_axis=None):
"""Extracts the output simulation sources properties.
Parameters
----------
models_1 : file
Tigger formatted or txt model 1 file
models_2 : file
Compare all sources in the catalog (else only sources with maj<shape_limit)
tolerance : float
Tolerace to cross-match sources
shape_limit: float
Cross match only sources with maj-axis less than this value
closest_only: bool
Returns the closest source only as the matching source
off_axis: float
Cross-match only sources within this distance from the centre
Returns
-------
(targets_flux, targets_scale, targets_position) : tuple
Tuple of target flux, morphology and astrometry information
"""
model_lsm1 = get_model(model_1)
model_lsm2 = get_model(model_2)
# Sources from the input model
model1_sources = model_lsm1.sources
# {"source_name": [I_out, I_out_err, I_in, source_name]}
targets_flux = dict() # recovered sources flux
# {"source_name": [delta_pos_angle_arc_sec, ra_offset, dec_offset,
# delta_phase_centre_arc_sec, I_in, source_name]
targets_position = dict() # recovered sources position
# {"source_name: [shape_out=(maj, min, angle), shape_out_err=, shape_in=,
# scale_out, scale_out_err, I_in, source_name]
targets_scale = dict() # recovered sources scale
deci = DECIMALS # round off to this decimal places
tolerance *= (np.pi / (3600.0 * 180)) # Convert to radians
names = dict()
closest_only = True
for model1_source in model1_sources:
I_out = 0.0
I_out_err = 0.0
source1_name = model1_source.name
ra1 = model1_source.pos.ra
dec1 = model1_source.pos.dec
ra_err1 = model1_source.pos.ra_err
dec_err1 = model1_source.pos.dec_err
I_in = model1_source.flux.I
I_in_err = model1_source.flux.I_err if model1_source.flux.I_err else 0.0
model2_sources = model_lsm2.getSourcesNear(ra1, dec1, tolerance)
if not model2_sources:
continue
# More than one source detected, thus we sum up all the detected sources with
# a radius equal to the beam size in radians around the true target coordinate
# Or use the closest source only
if closest_only:
if len(model2_sources) > 1:
rdist = np.array([_source_angular_dist_pos_angle(model1_source,
model2_source)[0]
for model2_source in model2_sources])
model2_sources = [model2_sources[np.argmin(rdist)]]
I_out_err_list = []
I_out_list = []
for target in model2_sources:
I_out_list.append(target.flux.I)
I_out_err_list.append(target.flux.I_err * target.flux.I_err)
if I_out_list[0] > 0.0:
model2_source = model2_sources[0]
try:
shape_in = tuple(map(rad2arcsec, model1_source.shape.getShape()))
shape_in_err = tuple(map(rad2arcsec, model1_source.shape.getShapeErr()))
except AttributeError:
shape_in = (0, 0, 0)
shape_in_err = (0, 0, 0)
if model2_source.shape:
shape_out = tuple(map(rad2arcsec, model2_source.shape.getShape()))
shape_out_err = tuple(map(rad2arcsec, model2_source.shape.getShapeErr()))
else:
shape_out = (0, 0, 0)
shape_out_err = (0, 0, 0)
if not all_sources:
if shape_out[0] > shape_limit:
continue
if closest_only:
I_out = model2_source.flux.I
I_out_err = model2_source.flux.I_err
ra2 = model2_source.pos.ra
dec2 = model2_source.pos.dec
ra_err2 = model2_source.pos.ra_err
dec_err2 = model2_source.pos.dec_err
else:
# weighting with the flux error appears to be dangerous thing as
# these values are very small taking their reciprocal
# leads to very high weights
# Also if the model has no errors this will raise
# a div by zero exception (ZeroDivisionError)
try:
I_out = sum([val / err for val, err in zip(I_out_list, I_out_err_list)])
I_out_err = sum([1.0 / I_out_error for I_out_error
in I_out_err_list])
I_out_var_err = np.sqrt(1.0 / I_out_err)
I_out /= I_out_err
I_out_err = I_out_var_err
ra2 = (np.sum([src.pos.ra * src.flux.I for src in model2_sources]) /
np.sum([src.flux.I for src in model2_sources]))
dec2 = (np.sum([src.pos.dec * src.flux.I for src in model2_sources]) /
np.sum([src.flux.I for src in model2_sources]))
# Get position weighted error
# _err_a = np.sqrt(np.sum([np.sqrt((src.flux.I_err/src.flux.I)**2 +
# (src.pos.ra_err/src.pos.ra)**2)*np.abs(src.flux.I*src.pos.ra)
# for src in model2_sources]))
# _a = np.sum([src.flux.I*src.pos.ra for src in model2_sources])
# _err_b = np.sqrt(np.sum([src.flux.I_err**2 for src in model2_sources]))
# _b = np.sum([src.flux.I for src in model2_sources])
# ra_err = np.abs(ra) * (np.sqrt((_err_a / _a)**2 + (_err_b / _b)**2))
# _err_a = np.sqrt(np.sum([np.sqrt((src.flux.I_err/src.flux.I)**2 +
# (src.pos.ra_err/src.pos.dec)**2)*abs(src.flux.I*src.pos.dec)
# for src in model2_sources]))
# _a = np.sum([src.flux.I*src.pos.dec for src in model2_sources])
# _err_b = np.sqrt(np.sum([src.flux.I_err**2 for src in model2_sources]))
# _b = np.sum([src.flux.I for src in model2_sources])
# dec_err = np.abs(dec) * (np.sqrt((_err_a / _a)**2 + (_err_b / _b)**2))
ra_err2 = sorted(model2_sources, key=lambda x: x.flux.I, reverse=True)[0].pos.ra_err
dec_err2 = sorted(model2_sources, key=lambda x: x.flux.I, reverse=True)[0].pos.dec_err
except ZeroDivisionError:
if len(model2_sources) > 1:
LOGGER.warn('Position ({}, {}): Since more than one source is detected'
' at the matched position,'
'only the closest to the matched position will be considered.'
'NB: This is because model2 does not have photometric errors.'
'otherwise a weighted average source would be returned'.format(
rad2deg(ra1), rad2deg(dec1)))
rdist = np.array([_source_angular_dist_pos_angle(model2_source, model1_source)[0]
for model2_source in model2_sources])
model2_sources = [model2_sources[np.argmin(rdist)]]
model2_source = model2_sources[0]
I_out = model2_source.flux.I
I_out_err = model2_source.flux.I_err
ra2 = model2_source.pos.ra
dec2 = model2_source.pos.dec
ra_err2 = model2_source.pos.ra_err
dec_err2 = model2_source.pos.dec_err
RA0, DEC0 = model_lsm1.ra0, model_lsm1.dec0
source2_name = model2_source.name
if ra2 > np.pi:
ra2 -= 2.0*np.pi
if ra1 > np.pi:
ra1 -= 2.0*np.pi
delta_pos_angle_arc_sec = angular_dist_pos_angle(
rad2arcsec(ra1), rad2arcsec(dec1),
rad2arcsec(ra2), rad2arcsec(dec2))[0]
delta_pos_angle_arc_sec = float('{0:.7f}'.format(delta_pos_angle_arc_sec))
if RA0 or DEC0:
delta_phase_centre = angular_dist_pos_angle(RA0, DEC0, ra2, dec2)
delta_phase_centre_arc_sec = rad2arcsec(delta_phase_centre[0])
else:
delta_phase_centre_arc_sec = None
src_scale = get_src_scale(model2_source.shape)
if not off_axis:
off_axis = 360.0
if delta_phase_centre_arc_sec <= deg2arcsec(off_axis):
targets_flux[source2_name] = [I_out, I_out_err,
I_in, I_in_err,
(source1_name, source2_name)]
targets_position[source1_name] = [delta_pos_angle_arc_sec,
rad2arcsec(ra2 - ra1),
rad2arcsec(dec2 - dec1),
delta_phase_centre_arc_sec, I_in,
rad2arcsec(ra_err2),
rad2arcsec(dec_err2),
(round(rad2deg(ra1), deci),
round(rad2deg(dec1), deci)),
(source1_name, source2_name)]
targets_scale[source1_name] = [shape_out, shape_out_err, shape_in,
src_scale[0], src_scale[1], I_in,
source2_name]
names[source1_name] = source2_name
else:
LOGGER.info(f"Source {source1_name} will be unmatched "
"due to greater distance from phase centre")
sources1 = model_lsm1.sources
sources2 = model_lsm2.sources
targets_not_matching_a, targets_not_matching_b = targets_not_matching(sources1,
sources2,
names)
sources_overlay = get_source_overlay(sources1, sources2)
num_of_sources = len(targets_flux)
LOGGER.info(f"Number of sources matched: {num_of_sources}")
return (targets_flux, targets_scale, targets_position,
targets_not_matching_a, targets_not_matching_b,
sources_overlay)
[docs]def compare_models(models, tolerance=0.2, plot=True, all_sources=False, shape_limit=6.0,
off_axis=None, closest_only=False, prefix=None, flux_plot='log',
fxlabels=None, fylabels=None, ftitles=None, svg=False,
title_size='16pt', x_label_size='12pt', y_label_size='12pt',
legend_size='10pt', xmajor_size='8pt', ymajor_size='8pt',
bar_size='12pt', bar_major_size='8pt'):
"""Plot model1 source properties against that of model2
Parameters
----------
models : dict
Tigger formatted model files e.g {model1: model2}.
tolerance : float
Tolerace in detecting source from model 2 (in arcsec).
plot : bool
Output html plot from which a png can be obtained.
all_source: bool
Compare all sources in the catalog (else only point-like source)
shape_limit: float
Cross match only sources with maj-axis less than this value
closest_only: bool
Returns the closest source only as the matching source
flux_plot: str
The type of output flux comparison plot (options:log,snr,inout)
prefix : str
Prefix for output htmls
fxlabels : str[]
X-axis labels for the flux comparison plots
fylabels : str[]
Y-axis labels for the flux comparison plots
fylabels : str[]
Title labels for the flux comparison plots
Returns
-------
results : dict
Dictionary of source properties from each model.
"""
results = dict()
for _models in models:
input_model = _models[0]
output_model = _models[1]
heading = input_model["label"]
results[heading] = {'models': [input_model["path"], output_model["path"]]}
results[heading]['flux'] = []
results[heading]['shape'] = []
results[heading]['position'] = []
# No matching source
results[heading]['no_match1'] = []
results[heading]['no_match2'] = []
results[heading]['overlay'] = []
props = get_detected_sources_properties('{}'.format(input_model["path"]),
'{}'.format(output_model["path"]),
all_sources=all_sources,
tolerance=tolerance,
closest_only=closest_only,
off_axis=off_axis)
for i in range(len(props[0])):
flux_prop = list(props[0].items())
results[heading]['flux'].append(flux_prop[i][-1])
for i in range(len(props[1])):
shape_prop = list(props[1].items())
results[heading]['shape'].append(shape_prop[i][-1])
for i in range(len(props[2])):
pos_prop = list(props[2].items())
results[heading]['position'].append(pos_prop[i][-1])
for i in range(len(props[3])):
no_match_prop1 = list(props[3].items())
results[heading]['no_match1'].append(no_match_prop1[i][-1])
for i in range(len(props[4])):
no_match_prop2 = list(props[4].items())
results[heading]['no_match2'].append(no_match_prop2[i][-1])
for i in range(len(props[5])):
no_match_prop2 = list(props[5].items())
results[heading]['overlay'].append(no_match_prop2[i][-1])
results[heading]['tolerance'] = tolerance
if plot:
_source_flux_plotter(results, models, prefix=prefix, plot_type=flux_plot,
titles=ftitles, xlabels=fxlabels, ylabels=fylabels,
svg=svg, title_size=title_size, x_label_size=x_label_size,
y_label_size=y_label_size, legend_size=legend_size,
xmajor_size=xmajor_size, ymajor_size=ymajor_size,
bar_size=bar_size, bar_major_size=bar_major_size)
_source_astrometry_plotter(results, models, prefix=prefix, svg=svg,
title_size=title_size, x_label_size=x_label_size,
y_label_size=y_label_size, legend_size=legend_size,
xmajor_size=xmajor_size, ymajor_size=ymajor_size,
bar_size=bar_size, bar_major_size=bar_major_size)
return results
[docs]def compare_residuals(residuals, skymodel=None, points=None,
inline=False, area_factor=None,
prefix=None, fov_factor=None,
units='micro',
title_size='14pt',
xmajor_size='6pt',
ymajor_size='6pt',
legend_size='10pt',
x_label_size='12pt',
y_label_size='12pt'):
if skymodel:
res = _source_residual_results(residuals, skymodel, area_factor)
else:
res = _random_residual_results(residuals, points,
fov_factor, area_factor)
_residual_plotter(residuals, results=res, points=points,
inline=inline, prefix=prefix, units=units,
title_size=title_size,
legend_size=legend_size,
xmajor_size=xmajor_size,
ymajor_size=ymajor_size,
x_label_size=x_label_size,
y_label_size=y_label_size)
return res
[docs]def targets_not_matching(sources1, sources2, matched_names, flux_units='milli'):
"""Plot model-model fluxes from lsm.html/txt models
Parameters
----------
sources1: list
List of sources from model 1
sources2: list
List of sources Sources from model 2
matched_names: dict
Dict of names from model 2 that matched that of model 1
flux_units: str
Units of flux density for tabulated values
Returns
-------
target_no_match1: dict
Sources from model 1 that have no match in model 2
target_no_match2: dict
Sources from model 2 that have no match in model 1
"""
deci = DECIMALS # round off to this decimal places
units = flux_units
targets_not_matching_a = dict()
targets_not_matching_b = dict()
for s1 in sources1:
if s1.name not in matched_names.keys():
props1 = [s1.name,
round(s1.flux.I*FLUX_UNIT_SCALER[units][0], deci),
round(s1.flux.I_err*FLUX_UNIT_SCALER[units][0], deci)
if s1.flux.I_err else None,
unwrap(round(rad2deg(s1.pos.ra), deci)),
f'{rad2deg(s1.pos.ra_err):.{deci}e}'
if s1.pos.ra_err else None,
round(rad2deg(s1.pos.dec), deci),
f'{rad2deg(s1.pos.dec_err):.{deci}e}'
if s1.pos.dec_err else None]
targets_not_matching_a[s1.name] = props1
for s2 in sources2:
if s2.name not in matched_names.values():
props2 = [s2.name,
round(s2.flux.I*FLUX_UNIT_SCALER[units][0], deci),
round(s2.flux.I_err*FLUX_UNIT_SCALER[units][0], deci)
if s2.flux.I_err else None,
unwrap(round(rad2deg(s2.pos.ra), deci)),
f'{rad2deg(s2.pos.ra_err):.{deci}e}'
if s2.pos.ra_err else None,
round(rad2deg(s2.pos.dec), deci),
f'{rad2deg(s2.pos.dec_err):.{deci}e}'
if s2.pos.dec_err else None]
targets_not_matching_b[s2.name] = props2
return targets_not_matching_a, targets_not_matching_b
[docs]def get_source_overlay(sources1, sources2):
"""Get source from models compare for overlay"""
sources = dict()
for s1 in sources1:
props1 = [s1.name,
s1.flux.I, s1.flux.I_err,
s1.pos.ra, s1.pos.ra_err,
s1.pos.dec, s1.pos.dec_err,
1]
sources[s1.name+'-1'] = props1
LOGGER.info("Model 1 source: {}".format(len(sources1)))
for s2 in sources2:
props2 = [s2.name,
s2.flux.I, s2.flux.I_err,
s2.pos.ra, s2.pos.ra_err,
s2.pos.dec, s2.pos.dec_err,
2]
sources[s2.name+'-2'] = props2
LOGGER.info("Model 2 source: {}".format(len(sources2)))
return sources
[docs]def plot_photometry(models, label=None, tolerance=0.2, phase_centre=None,
all_sources=False, flux_plot='log', off_axis=None,
shape_limit=6.0):
"""Plot model-model fluxes from lsm.html/txt models
Parameters
----------
models : dict
Tigger/text formatted model files e.g {model1: model2}.
label : str
Use this label instead of the FITS image path when saving data.
tolerance: float
Radius around the source to be cross matched (in arcsec).
phase_centre : str
Phase centre of catalog (if not already embeded)
all_source: bool
Compare all sources in the catalog (else only point-like source)
"""
_models = []
i = 0
for model1, model2 in models.items():
_models.append([dict(label="{}-model_a_{}".format(label, i), path=model1),
dict(label="{}-model_b_{}".format(label, i), path=model2)])
i += 1
results = compare_models(_models, tolerance, False, phase_centre, all_sources, off_axis)
_source_flux_plotter(results, _models, inline=True, plot_type=flux_plot)
[docs]def plot_astrometry(models, label=None, tolerance=0.2, phase_centre=None,
all_sources=False, off_axis=None):
"""Plot model-model positions from lsm.html/txt models
Parameters
----------
models : dict
Tigger/text formatted model files e.g {model1: model2}.
label : str
Use this label instead of the FITS image path when saving data.
tolerance: float
Radius around the source to be cross matched.
phase_centre : str
Phase centre of catalog (if not already embeded)
all_source: bool
Compare all sources in the catalog (else only point-like source)
"""
_models = []
i = 0
for model1, model2 in models.items():
_models.append([dict(label="{}-model_a_{}".format(label, i), path=model1),
dict(label="{}-model_b_{}".format(label, i), path=model2)])
i += 1
results = compare_models(_models, tolerance, False, phase_centre, all_sources, off_axis)
_source_astrometry_plotter(results, _models, inline=True)
[docs]def plot_residuals_noise(res_noise_images, skymodel=None, label=None,
area_factor=2.0, points=100):
"""Plot residual-residual or noise data
Parameters
----------
res_noise_images: dict
Dictionary of residual images to plot {res1.fits: res2.fits}.
skymodel: file
Skymodel file to locate on source residuals (lsm.html/txt)
label : str
Use this label instead of the FITS image path when saving data.
area_factor : float
Factor to multiply the beam area.
points: int
Number of data point to generate in case of random residuals.
"""
_residual_images = []
i = 0
for res1, res2 in res_noise_images.items():
_residual_images.append([dict(label="{}-res_a_{}".format(label, i), path=res1),
dict(label="{}-res_b_{}".format(label, i), path=res2)])
i += 1
compare_residuals(_residual_images, skymodel, points, True, area_factor)
def _source_flux_plotter(results, all_models, inline=False, units='milli',
prefix=None, plot_type='log', titles=None, svg=False,
xlabels=None, ylabels=None, title_size='16pt',
x_label_size='12pt', y_label_size='12pt',
legend_size='10pt', xmajor_size='8pt',
ymajor_size='8pt', bar_size='12pt',
bar_major_size='8pt'):
"""Plot flux results and save output as html file.
Parameters
----------
results : dict
Structured output results.
models : list
Tigger/text formatted model files.
e.g. [[{'label': 'model_a_1', 'path': 'point_skymodel1.txt'},
{'label': 'model_b_1', 'path': 'point_skymodel1.lsm.html'}]]
inline : bool
Allow inline plotting inside a notebook.
units : str
Data points and axis label units
plot_type: str
The type of output flux comparison plot (options:log,snr,inout)
prefix : str
Prefix for output htmls
fxlabels : str[]
X-axis labels for the flux comparison plots
fylabels : str[]
Y-axis labels for the flux comparison plots
fylabels : str[]
Title labels for the flux comparison plots
title_size : str
Title label size for the flux comparison plots
x_label_size : str
X-axis label size for the flux comparison plots
y_label_size : str
Y-axis label size for the flux comparison plots
legend_size : str
Legend label size for the flux comparison plots
xmajor_size : str
X-axis major label size for the flux comparison plots
ymajor_size : str
Y-axis major label size for the flux comparison plots
bar_size : str
Colorbar text font size
bar_major_size : str
Colorbar major axis text font size
svg : bool
Whether to save svg plots in addition to the standard html
"""
if prefix:
outfile = f'{prefix}-FluxOffset.html'
else:
outfile = 'FluxOffset.html'
output_file(outfile)
flux_plot_list = []
for pair, model_pair in enumerate(all_models):
heading = model_pair[0]['label']
name_labels = []
flux_in_data = []
flux_out_data = []
source_scale = []
positions_in_out = []
flux_in_err_data = []
flux_out_err_data = []
phase_centre_dist = []
no_match1 = results[heading]['no_match1']
no_match2 = results[heading]['no_match2']
for n in range(len(results[heading]['flux'])):
flux_out_data.append(results[heading]['flux'][n][0])
flux_out_err_data.append(results[heading]['flux'][n][1])
flux_in_data.append(results[heading]['flux'][n][2])
flux_in_err_data.append(results[heading]['flux'][n][3])
name_labels.append(results[heading]['flux'][n][4])
phase_centre_dist.append(results[heading]['position'][n][3])
positions_in_out.append(results[heading]['position'][n][7])
source_scale.append(results[heading]['shape'][n][3])
if len(flux_in_data) > 1:
# Error lists
err_xs1 = []
err_ys1 = []
err_xs2 = []
err_ys2 = []
model_1_name = model_pair[0]['path'].split('/')[-1].split('.')[0]
model_2_name = model_pair[1]['path'].split('/')[-1].split('.')[0]
# Format data points value to a readable units
# and select type of comparison plot
x = np.array(flux_in_data) * FLUX_UNIT_SCALER[units][0]
y = np.array(flux_out_data) * FLUX_UNIT_SCALER[units][0]
xerr = np.array(flux_in_err_data) * FLUX_UNIT_SCALER[units][0]
yerr = np.array(flux_out_err_data) * FLUX_UNIT_SCALER[units][0]
if plot_type == 'inout':
x1 = x
y1 = y
xerr1 = xerr
yerr1 = yerr
axis_labels = [f"{model_1_name} S1 ({FLUX_UNIT_SCALER[units][1]})"
if not xlabels else xlabels[pair],
f"{model_2_name} S2 ({FLUX_UNIT_SCALER[units][1]})"
if not ylabels else ylabels[pair]]
elif plot_type == 'log':
x1 = np.log(x)
y1 = np.log(y)
xerr1 = np.log(xerr)
yerr1 = np.log(yerr)
axis_labels = [f"log S1: {model_1_name}" if not xlabels else xlabels[pair],
f"log S2: {model_2_name}" if not ylabels else ylabels[pair]]
elif plot_type == 'snr':
x1 = np.log(x)
y1 = (x/y)
xerr1 = xerr
yerr1 = yerr
axis_labels = ['log S1' if not xlabels else xlabels[pair],
'S1/S2' if not ylabels else ylabels[pair]]
# RA and Dec with a cross-match in deg:arcmin:arcsec
position_ra_dec = [(deg2ra(ra), deg2dec(dec)) for (ra, dec) in positions_in_out]
# Phase centre distance in degree
z = np.array(phase_centre_dist)/3600.
# Compute some fit stats of the two models being compared
if plot_type in ['log', 'inout']:
flux_MSE = mean_squared_error(x1, y1)
reg1 = linregress(x1, y1)
flux_R_score = reg1.rvalue
elif plot_type in ['snr']:
reg1 = linregress(x1, y1)
mean_val = np.mean(y1)
median = np.median(y1)
std_val = np.std(y1)
mad_val = scipy.stats.median_abs_deviation(y1)
max_val = y1.max()
min_val = y1.min()
# Table with stats data
deci = DECIMALS # Round off to this decimal places
cols = ["Stats", "Value"]
if plot_type in ['log', 'inout']:
stats = {"Stats": ["Slope",
f"Intercept ({FLUX_UNIT_SCALER[units][1]})",
f"RMS_Error ({FLUX_UNIT_SCALER[units][1]})",
"R2"],
"Value": [f"{reg1.slope:.{deci}f}",
f"{reg1.intercept:.{deci}f}",
f"{np.sqrt(flux_MSE):.{deci}e}",
f"{flux_R_score:.{deci}f}"]}
elif plot_type in ['snr']:
stats = {"Stats": ["MAX", "MIN", "MEAN", "MAD", "MEDIAN", "STD"],
"Value": [f"{max_val:.{deci}f}",
f"{min_val:.{deci}f}",
f"{mean_val:.{deci}f}",
f"{mad_val:.{deci}f}",
f"{median:.{deci}f}",
f"{std_val:.{deci}f}"]}
# Create additional feature on the plot such as hover, display text
TOOLS = "crosshair,pan,wheel_zoom,box_zoom,reset,hover,save"
source = ColumnDataSource(
data=dict(flux_1=x, flux_2=y,
plot_flux_1=x1, plot_flux_2=y1,
flux_1_err=xerr1, flux_2_err=yerr1,
phase_centre_dist=z,
ra_dec=position_ra_dec,
label=name_labels))
text = "Flux Offset" if not titles else titles[pair]
# Create a plot object
plot_flux = figure(title=text,
x_axis_label=axis_labels[0],
y_axis_label=axis_labels[1],
tools=TOOLS)
# Plot title font sizes
plot_flux.title.text_font_size = title_size
plot_flux.xaxis.axis_label_text_font_size = x_label_size
plot_flux.yaxis.axis_label_text_font_size = y_label_size
plot_flux.xaxis.major_label_text_font_size = xmajor_size
plot_flux.yaxis.major_label_text_font_size = ymajor_size
# Create a color bar and size objects
color_bar_height = 100
mapper_opts = dict(palette="Plasma11",
low=min(z),
high=max(z))
flux_mapper = LinearColorMapper(**mapper_opts)
color_bar = ColorBar(color_mapper=flux_mapper,
ticker=plot_flux.xaxis.ticker,
formatter=plot_flux.xaxis.formatter,
title='Distance off-axis (deg)',
title_text_font_size=bar_size,
title_text_align='center',
major_label_text_font_size=bar_major_size,
orientation='horizontal')
#color_bar_plot = figure(title="Distance off-axis (deg)",
#title_location="below",
#height=color_bar_height,
#toolbar_location=None,
#outline_line_color='red',
#min_border=0)
#color_bar_plot.title.text_font_size = '20pt'
# Get errors from the input/output fluxes
for xval, yval, xerr, yerr in zip(x1, y1,
np.array(flux_in_err_data) * FLUX_UNIT_SCALER[units][0],
np.array(flux_out_err_data) * FLUX_UNIT_SCALER[units][0]):
err_xs1.append((xval - xerr, xval + xerr))
err_ys2.append((yval - yerr, yval + yerr))
err_ys1.append((yval, yval))
err_xs2.append((xval, xval))
# Create S2plot object for errors
error1_plot = plot_flux.multi_line(err_xs1, err_ys1,
legend_label="Errors",
color="red")
error2_plot = plot_flux.multi_line(err_xs2, err_ys2,
legend_label="Errors",
color="red")
# Create a plot object for a Fit
if plot_type == 'inout':
fit_points = 100
slope = reg1.slope
intercept = reg1.intercept
fit_xs = np.linspace(0 if 0 < min(x1) else min(x1), max(x1), fit_points)
fit_ys = slope * fit_xs + intercept
# Regression fit plot
fit = plot_flux.line(fit_xs, fit_ys,
legend_label="Fit",
color="blue")
# Create a plot object for I_out = I_in line .i.e. Perfect match
min_val = min(x1) if min(x1) < min(y1) else min(y1)
max_val = max(y1) if max(y1) > max(x1) else max(x1)
equal = plot_flux.line(np.array([0 if 0 < min_val else min_val, max_val]),
np.array([0 if 0 < min_val else min_val, max_val]),
legend_label=u"S1=S2",
line_dash="dashed",
color="gray")
elif plot_type == 'snr':
fit_points = 100
# Regression fit plot
min_val = min(x1) if max(x1) < 0 else 0
max_val = max(x1)
# Create a plot object for I_out = I_in line .i.e. Perfect match
equal = plot_flux.line(np.array([min_val, max_val]),
np.array([1, 1]),
legend_label=u"S1/S2=1",
line_dash="dashed",
color="gray")
elif plot_type == 'log':
fit_points = 100
slope = reg1.slope
intercept = reg1.intercept
min_val = min(x1) if min(x1) < min(y1) else min(y1)
max_val = max(y1) if max(y1) > max(x1) else max(x1)
# Regression fit plot
fit_xs = np.linspace(min_val, max_val, fit_points)
fit_ys = slope * fit_xs + intercept
fit = plot_flux.line(fit_xs, fit_ys,
legend_label="Fit",
color="blue")
# Create a plot object for I_out = I_in line .i.e. Perfect match
equal = plot_flux.line(np.array([0 if 0 < min_val else min_val, max_val]),
np.array([0 if 0 < min_val else min_val, max_val]),
legend_label=u"log(S1)=log(S2)",
line_dash="dashed",
color="gray")
# Create a plot object for the data points
data = plot_flux.circle('plot_flux_1', 'plot_flux_2',
name='data',
legend_label="Data",
source=source,
line_color=None,
fill_color={"field": "phase_centre_dist",
"transform": flux_mapper})
source = ColumnDataSource(data=stats)
columns = [TableColumn(field=x, title=x.capitalize()) for x in cols]
dtab = DataTable(source=source, columns=columns,
width=500, max_width=550,
height=100, max_height=150)
table_title = Div(text="Cross Matching Statistics")
table_title.align = "center"
stats_table = column([table_title, dtab])
# Table with no match data1
_fu = FLUX_UNIT_SCALER[units][1]
cols1 = ["Source", "Flux [%s]"%_fu, "Flux_err [%s]"%_fu, "RA",
"RA_err ['']", "DEC", "DEC_err ['']"]
stats1 = {"Source": [s[0] for s in no_match1],
"Flux [%s]"%_fu: [s[1] for s in no_match1],
"Flux_err [%s]"%_fu: [s[2] for s in no_match1],
"RA": [deg2ra(s[3], deci) for s in no_match1],
"RA_err ['']": [round(deg2arcsec(s[4] if s[4] else 0),
deci) for s in no_match1],
"DEC": [deg2dec(s[5], deci) for s in no_match1],
"DEC_err ['']": [round(deg2arcsec(s[6] if s[6] else 0),
deci) for s in no_match1]}
source1 = ColumnDataSource(data=stats1)
columns1 = [TableColumn(field=x, title=x.capitalize()) for x in cols1]
dtab1 = DataTable(source=source1, columns=columns1,
width=500, max_width=550,
height=150, max_height=200)
table_title1 = Div(text=f"Non-matching sources from {model_1_name}")
table_title1.align = "center"
stats_table1 = column([table_title1, dtab1])
# Table with no match data1
cols2 = ["Source", "Flux [%s]"%_fu, "Flux_err [%s]"%_fu, "RA",
"RA_err ['']", "DEC", "DEC_err ['']"]
stats2 = {"Source": [s[0] for s in no_match2],
"Flux [%s]"%_fu: [s[1] for s in no_match2],
"Flux_err [%s]"%_fu: [s[2] for s in no_match2],
"RA": [deg2ra(s[3], deci) for s in no_match2],
"RA_err ['']": [round(deg2arcsec(s[4] if s[4] else 0),
deci) for s in no_match2],
"DEC": [deg2dec(s[5], deci) for s in no_match2],
"DEC_err ['']": [round(deg2arcsec(s[6]),
deci) for s in no_match2]}
source2 = ColumnDataSource(data=stats2)
columns2 = [TableColumn(field=x, title=x.capitalize()) for x in cols2]
dtab2 = DataTable(source=source2, columns=columns2,
width=500, max_width=550,
height=150, max_height=200)
table_title2 = Div(text=f"Non-matching sources from {model_2_name}")
table_title2.align = "center"
stats_table2 = column([table_title2, dtab2])
# Attaching the hover object with labels
hover = plot_flux.select(dict(type=HoverTool))
hover.tooltips = OrderedDict([
("source", "(@label)"),
("(S1,S2)", "(@flux_1, @flux_2)"),
("(S_err1, S_err2)"," (@flux_1_err, @flux_2_err)"),
("(RA,DEC)", "@ra_dec"),
("Distance off-axis", "@phase_centre_dist")])
# Legend position, size and title align
plot_flux.legend.location = "top_left"
plot_flux.legend.label_text_font_size = legend_size
plot_flux.title.align = "center"
plot_flux.legend.click_policy = "hide"
# Colorbar position
plot_flux.add_layout(color_bar, "below")
#color_bar_plot.add_layout(color_bar, "below")
#color_bar_plot.title.align = "center"
# Append all plots
flux_plot_list.append(column(row(plot_flux,
column(stats_table,
stats_table1,
stats_table2))))
else:
LOGGER.warn('No photometric plot created for {}'.format(model_pair[1]["path"]))
if flux_plot_list:
# Make the plots in a column layout
flux_plots = column(flux_plot_list)
if svg:
plot_flux.output_backend='svg'
prefix = '.'.join(outfile.split('.')[:-1])
export_svgs(flux_plots, filename=f"{prefix}.svg")
# Save the plot (html)
save(flux_plots, title=outfile)
LOGGER.info('Saving photometry comparisons in {}'.format(outfile))
def _source_astrometry_plotter(results, all_models, inline=False, units='',
prefix=None, svg=False, title_size='16pt',
x_label_size='12pt', y_label_size='12pt',
legend_size='10pt', xmajor_size='6pt',
ymajor_size='6pt', bar_size='8pt',
bar_major_size='8pt'):
"""Plot astrometry results and save output as html file.
Parameters
----------
results: dict
Structured output results.
models : list
Tigger/text formatted model files.
e.g. [[{'label': 'model_a_1', 'path': 'point_skymodel1.txt'},
{'label': 'model_b_1', 'path': 'point_skymodel1.lsm.html'}]]
inline : bool
Allow inline plotting inside a notebook.
units : str
Data points and axis label units
prefix : str
Prefix for output htmls
svg : bool
Whether to save svg plots in addition to the standard html
title_size : str
Title label size for the flux comparison plots
x_label_size : str
X-axis label size for the flux comparison plots
y_label_size : str
Y-axis label size for the flux comparison plots
legend_size : str
Legend label size for the flux comparison plots
xmajor_size : str
X-axis major label size for the flux comparison plots
ymajor_size : str
Y-axis major label size for the flux comparison plots
bar_size : str
Colorbar text font size
bar_major_size : str
Colorbar major axis text font size
"""
if prefix:
outfile = f'{prefix}-PositionOffset.html'
else:
outfile = 'PositionOffset.html'
output_file(outfile)
position_plot_list = []
for model_pair in all_models:
RA_offset = []
RA_err = []
DEC_offset = []
DEC_err = []
source_labels = []
flux_in_data = []
flux_out_data = []
delta_pos_data = []
positions_in_out = []
phase_centre_dist = []
heading = model_pair[0]['label']
overlays = results[heading]['overlay']
tolerance = results[heading]['tolerance']
for n in range(len(results[heading]['flux'])):
flux_out_data.append(results[heading]['flux'][n][0])
delta_pos_data.append(results[heading]['position'][n][0])
RA_offset.append(results[heading]['position'][n][1])
DEC_offset.append(results[heading]['position'][n][2])
phase_centre_dist.append(results[heading]['position'][n][3])
flux_in_data.append(results[heading]['position'][n][4])
RA_err.append(results[heading]['position'][n][5])
DEC_err.append(results[heading]['position'][n][6])
positions_in_out.append(results[heading]['position'][n][7])
source_labels.append(results[heading]['position'][n][8])
# Compute some stats of the two models being compared
if len(flux_in_data) > 1:
model_1_name = model_pair[0]['path'].split('/')[-1].split('.')[0]
model_2_name = model_pair[1]['path'].split('/')[-1].split('.')[0]
RA_mean = np.mean(RA_offset)
DEC_mean = np.mean(DEC_offset)
r1, r2 = np.array(RA_offset).std(), np.array(DEC_offset).std()
# Generate data for a sigma circle around data points
fit_points = 100
pi, cos, sin = np.pi, np.cos, np.sin
theta = np.linspace(0, 2.0 * pi, fit_points)
x1 = RA_mean+(r1 * cos(theta))
y1 = DEC_mean+(r2 * sin(theta))
# Get the number of sources recovered and within 1 sigma
recovered_sources = len(DEC_offset)
one_sigma_sources = len([
(ra_off, dec_off) for ra_off, dec_off in zip(RA_offset, DEC_offset)
if abs(ra_off) <= max(abs(x1)) and abs(dec_off) <= max(abs(y1))])
# Format data list into numpy arrays
x_ra = np.array(RA_offset)
y_dec = np.array(DEC_offset)
x_ra_err = np.array(RA_err)
y_dec_err = np.array(DEC_err)
# TODO: Use flux as a radius dimension
flux_in_mjy = np.array(flux_in_data) * FLUX_UNIT_SCALER['milli'][0]
flux_out_mjy = np.array(flux_out_data) * FLUX_UNIT_SCALER['milli'][0]
z = np.array(phase_centre_dist)/3600. # For color
# RA and Dec with a cross-match in deg:arcmin:arcsec
position_ra_dec = [(deg2ra(ra), deg2dec(dec)) for (ra, dec) in positions_in_out]
# Create additional feature on the plot such as hover, display text
TOOLS = "crosshair,pan,wheel_zoom,box_zoom,reset,hover,save"
source = ColumnDataSource(
data=dict(ra_offset=x_ra,
ra_err=x_ra_err,
dec_offset=y_dec,
dec_err=y_dec_err,
ra_dec=position_ra_dec,
phase_centre_dist=z,
flux_s1=flux_in_mjy,
flux_s2=flux_out_mjy,
label=source_labels))
# Create a plot object
plot_position = figure(title="Position Offset",
x_axis_label='RA offset ({:s})'.format(
POSITION_UNIT_SCALER['arcsec'][1]),
y_axis_label='DEC offset ({:s})'.format(
POSITION_UNIT_SCALER['arcsec'][1]),
tools=TOOLS)
plot_position.title.text_font_size = title_size
plot_position.xaxis.axis_label_text_font_size = x_label_size
plot_position.yaxis.axis_label_text_font_size = y_label_size
# Create an image overlay
s1_ra_rad = [src[3] for src in overlays if src[-1] == 1]
s1_ra_deg = [unwrap(rad2deg(s_ra)) for s_ra in s1_ra_rad]
s1_dec_rad = [src[5] for src in overlays if src[-1] == 1]
s1_dec_deg = [rad2deg(s_dec) for s_dec in s1_dec_rad]
s1_ra_err = [rad2deg(src[4]*3600.) for src in overlays if src[-1] == 1]
s1_dec_err = [rad2deg(src[6]*3600.) for src in overlays if src[-1] == 1]
s1_labels = [src[0] for src in overlays if src[-1] == 1]
s1_flux = [src[1] for src in overlays if src[-1] == 1]
s2_ra_rad = [src[3] for src in overlays if src[-1] == 2]
s2_ra_deg = [unwrap(rad2deg(s_ra)) for s_ra in s2_ra_rad]
s2_dec_rad = [src[5] for src in overlays if src[-1] == 2]
s2_dec_deg = [rad2deg(s_dec) for s_dec in s2_dec_rad]
s2_ra_err = [rad2deg(src[4]*3600.) for src in overlays if src[-1] == 2]
s2_dec_err = [rad2deg(src[6]*3600.) for src in overlays if src[-1] == 2]
s2_labels = [src[0] for src in overlays if src[-1] == 2]
s2_flux = [src[1] for src in overlays if src[-1] == 2]
overlay_source1 = ColumnDataSource(
data=dict(ra1=s1_ra_deg, dec1=s1_dec_deg,
str_ra1=[deg2ra(_s1_radeg) for _s1_radeg in s1_ra_deg],
str_dec1=[deg2dec(_s1_decdeg) for _s1_decdeg in s1_dec_deg],
ra_err1=s1_ra_err, dec_err1=s1_dec_err,
label1=s1_labels, flux1=s1_flux))
overlay_source2 = ColumnDataSource(
data=dict(ra2=s2_ra_deg, dec2=s2_dec_deg,
str_ra2=[deg2ra(_s2_radeg) for _s2_radeg in s2_ra_deg],
str_dec2=[deg2dec(_s2_decdeg) for _s2_decdeg in s2_dec_deg],
ra_err2=s2_ra_err, dec_err2=s2_dec_err,
label2=s2_labels, flux2=s2_flux))
plot_overlay = figure(title="Catalogs Overlay",
x_axis_label='RA ({:s})'.format(
POSITION_UNIT_SCALER['deg'][1]),
y_axis_label='DEC ({:s})'.format(
POSITION_UNIT_SCALER['deg'][1]),
match_aspect=True,
tools=("crosshair,pan,wheel_zoom,"
"box_zoom,reset,save"))
plot_overlay.ellipse('ra1', 'dec1',
source=overlay_source1,
width=tolerance/3600.,
height=tolerance/3600.,
line_color=None,
color='#CAB2D6')
plot_overlay_1 = plot_overlay.circle('ra1', 'dec1',
name='model1',
legend_label=model_1_name,
source=overlay_source1,
#line_color=None,
color='blue')
plot_overlay_2 = plot_overlay.circle('ra2', 'dec2',
name='model2',
legend_label=model_2_name,
source=overlay_source2,
#line_color=None,
color='red')
plot_position.title.text_font_size = title_size
plot_position.xaxis.axis_label_text_font_size = x_label_size
plot_position.yaxis.axis_label_text_font_size = y_label_size
plot_position.xaxis.major_label_text_font_size = xmajor_size
plot_position.yaxis.major_label_text_font_size = ymajor_size
plot_position.axis.axis_label_text_font_style = 'normal'
plot_overlay.title.text_font_size = title_size
plot_overlay.xaxis.axis_label_text_font_size = x_label_size
plot_overlay.yaxis.axis_label_text_font_size = y_label_size
plot_overlay.legend.label_text_font_size = legend_size
plot_overlay.xaxis.major_label_text_font_size = xmajor_size
plot_overlay.yaxis.major_label_text_font_size = ymajor_size
plot_overlay.axis.axis_label_text_font_style = 'normal'
plot_overlay.title.align = "center"
plot_overlay.legend.location = "top_left"
plot_overlay.legend.click_policy = "hide"
color_bar_height = 100
plot_overlay.x_range.flipped = True
# Colorbar Mapper
mapper_opts = dict(palette="Plasma11",
low=min(z),
high=max(z))
position_mapper = LinearColorMapper(**mapper_opts)
color_bar = ColorBar(color_mapper=position_mapper,
ticker=plot_position.xaxis.ticker,
formatter=plot_position.xaxis.formatter,
location=(0,0),
title='Distance off-axis (deg)',
title_text_font_size=bar_size,
title_text_align='center',
major_label_text_font_size=bar_major_size,
orientation='horizontal')
# color_bar_plot = figure(title="Distance off-axis (deg)",
# title_location="below",
# height=color_bar_height,
# toolbar_location=None,
# outline_line_color=None,
# min_border=0)
# color_bar_plot.title.text_font_size = '10pt'
# Get errors from the output positions
err_xs1 = []
err_ys1 = []
err_xs2 = []
err_ys2 = []
for x, y, xerr, yerr in zip(x_ra, y_dec, np.array(RA_err), np.array(DEC_err)):
err_xs1.append((x - xerr, x + xerr))
err_ys1.append((y, y))
err_xs2.append((x, x))
err_ys2.append((y - yerr, y + yerr))
# Create a plot object for errors
error1_plot = plot_position.multi_line(err_xs1, err_ys1,
legend_label="Errors",
color="red")
error2_plot = plot_position.multi_line(err_xs2, err_ys2,
legend_label="Errors",
color="red")
# Creat an sigma circle plot object
sigma_plot = plot_position.line(np.array(x1), np.array(y1),
legend_label='Sigma')
# Create position data points plot object
plot_position.circle('ra_offset', 'dec_offset',
name='data',
source=source,
line_color=None,
legend_label='Data',
fill_color={"field": "phase_centre_dist",
"transform": position_mapper})
# Table with stats data
deci = DECIMALS # round off to this decimal places
cols = ["Stats", "Value"]
stats = {"Stats": ["Total sources",
"(RA, DEC) mean offset ['']",
"Sigma sources",
"(RA, DEC) sigma offset ['']"],
"Value": [recovered_sources,
f"({round(deg2arcsec(RA_mean), deci)},"
f"{round(deg2arcsec(DEC_mean), deci)})",
one_sigma_sources,
f"({round(deg2arcsec(r1), deci)},"
f"{round(deg2arcsec(r2), deci)})"]}
source = ColumnDataSource(data=stats)
columns = [TableColumn(field=x, title=x.capitalize()) for x in cols]
dtab = DataTable(source=source, columns=columns,
width=450, max_width=500,
height=100, max_height=150)
table_title = Div(text="Cross Matching Statistics")
table_title.align = "center"
stats_table = column([table_title, dtab])
# Attaching the hover object with labels
hover = plot_position.select(dict(type=HoverTool))
hover.tooltips = OrderedDict([
("source", "(@label)"),
("(S1,S2) [mJy]",
"(@flux_s1, @flux_s2)"),
("(RA,DEC)", "(@ra_dec)"),
("(RA_err,DEC_err)",
"(@ra_err, @dec_err)"),
("(RA_offset,DEC_offset)",
"(@ra_offset, @dec_offset)"),
("Distance off-axis",
"@phase_centre_dist")])
plot_overlay.add_tools(
HoverTool(renderers=[plot_overlay_1],
tooltips=OrderedDict([
("source1", "@label1"),
("Flux (mJy)", "@flux1"),
("(RA,DEC)", "(@str_ra1, @str_dec1)"),
("(RA_err,DEC_err)",
"(@ra_err1, @dec_err1)")])))
plot_overlay.add_tools(
HoverTool(renderers=[plot_overlay_2],
tooltips=OrderedDict([
("source2", "@label2"),
("Flux (mJy)", "@flux2"),
("(RA,DEC)", "(@str_ra2, @str_dec2)"),
("(RA_err,DEC_err)",
"(@ra_err2, @dec_err2)")])))
# Legend position and title align
plot_position.legend.location = "top_left"
plot_position.legend.click_policy = "hide"
plot_position.title.align = "center"
# Colorbar position
plot_position.add_layout(color_bar, "below")
plot_position.legend.label_text_font_size = legend_size
# color_bar_plot.add_layout(color_bar, "below")
# color_bar_plot.title.align = "center"
if svg:
plot_overlay.output_backend = "svg"
plot_position.output_backend = "svg"
prefix = '.'.join(outfile.split('.')[:-1])
export_svgs(column(plot_overlay), filename=f"{prefix}_1.svg")
export_svgs(column(plot_position), filename=f"{prefix}_2.svg")
# Append object to plot list
position_plot_list.append(column(row(plot_position, plot_overlay,
column(stats_table))))
else:
LOGGER.warn('No plot astrometric created for {}'.format(model_pair[1]["path"]))
if position_plot_list:
# Make the plots in a column layout
position_plots = column(position_plot_list)
# Save the plot (html)
save(position_plots, title=outfile)
LOGGER.info('Saving astrometry comparisons in {}'.format(outfile))
def _residual_plotter(res_noise_images, points=None, results=None,
inline=False, prefix=None, title_size='16pt',
x_label_size='12pt', y_label_size='12pt',
legend_size='10pt', xmajor_size='6pt',
ymajor_size='6pt', units='micro', svg=False):
"""Plot ratios of random residuals and noise
Parameters
----------
res_noise_images: dict
Structured input images with labels.
points: int
Number of data point to generate in case of random residuals
results: dict
Structured output results.
inline : bool
Allow inline plotting inside a notebook.
prefix : str
Prefix for output htmls
"""
if points:
title = "Random Residual Noise"
if prefix:
outfile = f'{prefix}-RandomResidualNoiseRatio.html'
else:
outfile = 'RandomResidualNoiseRatio.html'
else:
title = "Source Residual Noise"
if prefix:
outfile = f'{prefix}-SourceResidualNoiseRatio.html'
else:
outfile = 'SourceResidualNoiseRatio.html'
output_file(outfile)
residual_plot_list = []
for residual_pair in res_noise_images:
residuals1 = []
residuals2 = []
name_labels = []
phase_centre_dist = []
res_noise_ratio = []
res_image = residual_pair[0]['label']
for res_src in results[res_image]:
residuals1.append(res_src[0])
residuals2.append(res_src[1])
res_noise_ratio.append(res_src[2])
phase_centre_dist.append(res_src[3])
name_labels.append(res_src[4])
if len(name_labels) > 1:
# Get sigma value of residuals
res1 = np.array(residuals1) * FLUX_UNIT_SCALER[units][0]
res2 = np.array(residuals2) * FLUX_UNIT_SCALER[units][0]
# Get ratio data
y1 = np.array(res_noise_ratio)
x1 = np.array(range(len(res_noise_ratio)))
# Create additional feature on the plot such as hover, display text
TOOLS = "crosshair,pan,wheel_zoom,box_zoom,reset,hover,save"
source = ColumnDataSource(
data=dict(x=x1, y=y1, res1=res1, res2=res2, label=name_labels))
text1 = residual_pair[0]["path"].split("/")[-1].split('.fits')[0]
text2 = residual_pair[1]["path"].split("/")[-1].split('.fits')[0]
# Get y2 label and range
y2_label = "Flux density ({})".format(FLUX_UNIT_SCALER[units][1])
y_max = max(res1) if max(res1) > max(res2) else max(res2)
y_min = min(res1) if min(res1) < min(res2) else min(res2)
# Create a plot objects and set axis limits
plot_residual = figure(title=title,
x_axis_label='Sources',
y_axis_label='Res1-to-Res2',
plot_width=1200, plot_height=800,
tools=TOOLS)
plot_residual.y_range = Range1d(start=min(y1) - .01, end=max(y1) + .01)
plot_residual.extra_y_ranges = {y2_label: Range1d(start=y_min - .01 * abs(y_min),
end=y_max + .01 * abs(y_max))}
plot_residual.add_layout(LinearAxis(y_range_name=y2_label,
axis_label=y2_label),
'right')
plot_residual.axis.axis_label_text_font_style = 'bold'
res1_object = plot_residual.line(x1, res1,
color='red',
legend_label=f'res1: {text1}',
y_range_name=y2_label)
res2_object = plot_residual.line(x1, res2,
color='blue',
legend_label=f'res2: {text2}',
y_range_name=y2_label)
res_ratio_object = plot_residual.line('x', 'y',
name='ratios',
source=source,
color='green',
legend_label='res1-to-res2')
plot_residual.title.text_font_size = title_size
plot_residual.xaxis.axis_label_text_font_size = x_label_size
plot_residual.yaxis.axis_label_text_font_size = y_label_size
plot_residual.legend.label_text_font_size = legend_size
plot_residual.xaxis.major_label_text_font_size = xmajor_size
plot_residual.yaxis.major_label_text_font_size = ymajor_size
# Table with stats data
cols = ["Stats", "Value"]
stats = {"Stats": [f"{text1} ({FLUX_UNIT_SCALER[units][1]})",
f"{text2} ({FLUX_UNIT_SCALER[units][1]})",
"Res1-to-Res2"],
"Value": [np.mean(residuals1) * FLUX_UNIT_SCALER[units][0],
np.mean(residuals2) * FLUX_UNIT_SCALER[units][0],
np.mean(residuals2) / np.mean(residuals1)]}
source = ColumnDataSource(data=stats)
columns = [TableColumn(field=x, title=x.capitalize()) for x in cols]
dtab = DataTable(source=source, columns=columns,
width=550, max_width=800,
height=100, max_height=150)
table_title = Div(text="Cross Match Stats")
table_title.align = "center"
stats_table = column([table_title, dtab])
# Attaching the hover object with labels
hover = plot_residual.select(dict(type=HoverTool))
hover.tooltips = OrderedDict([
("ratio", "@y"),
("(Res1,Res2)", "(@res1,@res2)"),
("source", "@label")])
# Position of legend and title align
plot_residual.legend.location = "top_left"
plot_residual.title.align = "center"
# Add object to plot list
residual_plot_list.append(row(plot_residual, column(stats_table)))
if svg:
plot_residual.output_backend = "svg"
prefix = '.'.join(outfile.split('.')[:-1])
export_svgs(plot_residual, filename=f"{prefix}.svg")
else:
LOGGER.warn('No plot created. Found 0 or 1 data point in {}'.format(res_image))
if residual_plot_list:
# Make the plots in a column layout
residual_plots = column(residual_plot_list)
# Save the plot (html)
save(residual_plots, title=outfile)
LOGGER.info('Saving residual comparision plots {}'.format(outfile))
def _random_residual_results(res_noise_images, data_points=None,
fov_factor=None, area_factor=None):
"""Plot ratios of random residuals and noise
Parameters
----------
res_noise_images: list
List of dictionaries with residual images
data_points: int
Number of data points to extract
area_factor : float
Factor to multiply the beam area
fov_factor : float
Factor to multiply the field of view for random points
Returns
-------
results : dict
Dictionary of source residual properties from each residual image.
"""
LOGGER.info("Plotting ratios of random residuals and noise")
# dictinary to store results
results = dict()
# Get beam size otherwise use default (~6``).
beam_default = (0.00151582804885738, 0.00128031965017612, 20.0197348935424)
for images in res_noise_images:
# Source counter
i = 0
# Get label
res_label1 = images[0]['label']
# Get residual image names
res_image1 = images[0]['path']
res_image2 = images[1]['path']
# Data structure for residuals compared
results[res_label1] = []
# Get fits info
fits_info = fitsInfo(res_image1)
# Get beam size otherwise use default (~6``).
beam_deg = fits_info['b_size'] if fits_info['b_size'] else beam_default
# In case the images was not deconvloved aslo use default beam
if beam_deg == (0.0, 0.0, 0.0):
beam_deg = beam_default
# Open residual images header
res_hdu1 = fitsio.open(res_image1)
res_hdu2 = fitsio.open(res_image2)
# Get data from residual images
res_data1 = res_hdu1[0].data
res_data2 = res_hdu2[0].data
# Get random pixel coordinates
pix_coord_deg = _get_random_pixel_coord(
data_points,
phase_centre=fits_info['centre'],
sky_area=fits_info['skyArea'] * fov_factor)
# Get the number of frequency channels
nchan1 = (res_data1.shape[1]
if res_data1.shape[0] == 1
else res_data1.shape[0])
nchan2 = (res_data2.shape[1]
if res_data2.shape[0] == 1
else res_data2.shape[0])
for RA, DEC in pix_coord_deg:
i += 1
# Get width of box around source
width = int(deg2arcsec(beam_deg[0]) * area_factor)
# Get a image slice around source
imslice = get_box(fits_info["wcs"], (RA, DEC), width)
# Get noise rms in the box around the point coordinate
res1_area = res_data1[0, 0, :, :][imslice]
res2_area = res_data2[0, 0, :, :][imslice]
# Ignore empty arrays due to points at the edge
if not res1_area.size or not res2_area.size:
continue
res1_rms = res1_area.std()
res2_rms = res2_area.std()
# if image is cube then average along freq axis
if nchan1 > 1:
flux_rms1 = 0.0
for frq_ax in range(nchan1):
# In case the first two axes are swapped
if res_data1.shape[0] == 1:
target_area1 = res_data1[0, frq_ax, :, :][imslice]
else:
target_area1 = res_data1[frq_ax, 0, :, :][imslice]
# Sum of all the fluxes
flux_rms1 += target_area1.std()
# Get the average std and mean along all frequency channels
res1_rms = flux_rms1/float(nchan1)
if nchan2 > 1:
flux_rms2 = 0.0
for frq_ax in range(nchan2):
# In case the first two axes are swapped
if res_data2.shape[0] == 1:
target_area2 = res_data2[0, frq_ax, :, :][imslice]
else:
target_area2 = res_data2[frq_ax, 0, :, :][imslice]
# Sum of all the fluxes
flux_rms2 += target_area2.std()
# Get the average std and mean along all frequency channels
res2_rms = flux_rms2/float(nchan2)
# Get phase centre and determine phase centre distance
RA0 = fits_info['centre'][0]
DEC0 = fits_info['centre'][1]
phase_centre_dist= (np.sqrt((RA-RA0)**2 + (DEC-DEC0)**2))
# Store all outputs in the results data structure
results[res_label1].append([res1_rms*1e0,
res2_rms*1e0,
res1_rms/res2_rms*1e0,
phase_centre_dist,
'source{0}'.format(i)])
return results
def _source_residual_results(res_noise_images, skymodel, area_factor=None):
"""Plot ratios of source residuals and noise
Parameters
----------
res_noise: list
List of dictionaries with residual images
skymodel: file
Tigger skymodel file to locate on source residuals
area_factor : float
Factor to multiply the beam area.
Returns
-------
results : dict
Dictionary of source residual properties from each residual image.
"""
LOGGER.info("Plotting ratios of source residuals and noise")
# Dictinary to store results
results = dict()
# Get beam size otherwise use default (6``).
beam_default = (0.00151582804885738, 0.00128031965017612, 20.0197348935424)
for images in res_noise_images:
# Get label
res_label1 = images[0]['label']
# Get residual image names
res_image1 = images[0]['path']
res_image2 = images[1]['path']
# Data structure for residuals compared
results[res_label1] = []
# Get fits info
fits_info = fitsInfo(res_image1)
# Get beam size otherwise use default (~6``).
beam_deg = fits_info['b_size'] if fits_info['b_size'] else beam_default
# In case the images was not deconvloved also use default beam
if beam_deg == (0.0, 0.0, 0.0):
beam_deg = beam_default
# Open residual images header
res_hdu1 = fitsio.open(res_image1)
res_hdu2 = fitsio.open(res_image2)
# Get data from residual images
res_data1 = res_hdu1[0].data
res_data2 = res_hdu2[0].data
# Load skymodel to get source positions
model_lsm = Tigger.load(skymodel)
# Get all sources in the model
model_sources = model_lsm.sources
# Data structure for each residuals to compare
results[res_label1] = []
# Get the number of frequency channels
nchan1 = (res_data1.shape[1]
if res_data1.shape[0] == 1
else res_data1.shape[0])
nchan2 = (res_data2.shape[1]
if res_data2.shape[0] == 1
else res_data2.shape[0])
for model_source in model_sources:
# Get phase centre Ra and Dec coordinates
RA0 = model_lsm.ra0
DEC0 = model_lsm.dec0
# Get source Ra and Dec coordinates
ra = model_source.pos.ra
dec = model_source.pos.dec
# Convert to degrees
RA = rad2deg(ra)
DEC = rad2deg(dec)
# Remove any wraps
if ra > np.pi:
ra -= 2.0*np.pi
# Get distance from phase centre
delta_phase_centre = angular_dist_pos_angle(RA0, DEC0, ra, dec)
phase_dist_arcsec = rad2arcsec(delta_phase_centre[0])
phase_centre_dist = phase_dist_arcsec/3600.
# Get width of box around source
width = int(deg2arcsec(beam_deg[0]) * area_factor)
# Get a image slice around source
imslice = get_box(fits_info["wcs"], (RA, DEC), width)
# Get noise rms in the box around the point coordinate
res1_area = res_data1[0, 0, :, :][imslice]
res2_area = res_data2[0, 0, :, :][imslice]
# Ignore empty arrays due to sources at the edge
if not res1_area.size or not res2_area.size:
continue
res1_rms = res1_area.std()
res2_rms = res2_area.std()
# if image is cube then average along freq axis
if nchan1 > 1:
flux_rms1 = 0.0
for frq_ax in range(nchan1):
# In case the first two axes are swapped
if res_data1.shape[0] == 1:
target_area1 = res_data1[0, frq_ax, :, :][imslice]
else:
target_area1 = res_data1[frq_ax, 0, :, :][imslice]
# Sum of all the fluxes
flux_rms1 += target_area1.std()
# Get the average std and mean along all frequency channels
res1_rms = flux_rms1/float(nchan1)
if nchan2 > 1:
flux_rms2 = 0.0
for frq_ax in range(nchan2):
# In case the first two axes are swapped
if res_data2.shape[0] == 1:
target_area2 = res_data2[0, frq_ax, :, :][imslice]
else:
target_area2 = res_data2[frq_ax, 0, :, :][imslice]
# Sum of all the fluxes
flux_rms2 += target_area2.std()
# Get the average std and mean along all frequency channels
res2_rms = flux_rms2/float(nchan2)
# Store all outputs in the results data structure
results[res_label1].append([res1_rms * 1e0,
res2_rms * 1e0,
res1_rms / res2_rms * 1e0,
phase_centre_dist,
model_source.name,
model_source.flux.I])
return results
[docs]def plot_aimfast_stats(fidelity_results_file, units='micro', prefix=''):
"""Plot stats results if more that one residual images where assessed"""
with open(fidelity_results_file) as f:
data = json.load(f)
res_stats = dict()
dr_stats = dict()
for par, val in data.items():
val_copy = val.copy()
if '.fits' not in par and 'models' not in val and type(val) is not list:
for p, v in val.items():
if type(v) is dict:
dr_stats[p] = v
val_copy.pop(p)
res_stats[par] = val_copy
res_stats[par]['NORM'] = res_stats[par]['NORM'][0]
res_stats = dict(sorted(res_stats.items()))
dr_stats = dict(sorted(dr_stats.items()))
im_keys = []
rms_values = []
stddev_values = []
mad_values = []
max_values = []
skew_values = []
kurt_values = []
norm_values = []
for res_stat in res_stats:
im_keys.append(res_stat.replace('-residual', ''))
rms_values.append(res_stats[res_stat]['RMS'])
stddev_values.append(res_stats[res_stat]['STDDev'])
mad_values.append(res_stats[res_stat]['MAD'])
max_values.append(res_stats[res_stat]['MAX'])
skew_values.append(res_stats[res_stat]['SKEW'])
kurt_values.append(res_stats[res_stat]['KURT'])
norm_values.append(res_stats[res_stat]['NORM'])
width = 400
height = 300
multiplier = FLUX_UNIT_SCALER[units][0]
# Vriance plots
variance_plotter = figure(x_range=im_keys, x_axis_label="Image", y_axis_label="Flux density (µJy)",
plot_width=width, plot_height=height, title='Residual Variance')
variance_plotter.line(im_keys, np.array(stddev_values)*multiplier, legend_label='std', color='blue')
variance_plotter.line(im_keys, np.array(mad_values)*multiplier, legend_label='mad', color='red')
variance_plotter.line(im_keys, np.array(max_values)*multiplier, legend_label='max', color='green')
variance_plotter.title.align = 'center'
# Moment 3 & 4 plots
mom34_plotter = figure(x_range=im_keys, x_axis_label="Image", y_axis_label="Value",
plot_width=width, plot_height=height, title='Skewness & Kurtosis')
mom34_plotter.line(im_keys, skew_values, legend_label='Skewness', color='blue')
mom34_plotter.line(im_keys, kurt_values, legend_label='kurtosis', color='red')
mom34_plotter.title.align = 'center'
# Normality test plot
normalised = np.array(norm_values)/norm_values[0]
norm_plotter = figure(x_range=im_keys, x_axis_label="Image", y_axis_label="Value",
plot_width=width, plot_height=height, title='Normality Tests')
norm_plotter.vbar(x=im_keys, top=normalised, width=0.9)
#norm_plotter.y_range.start = 0
norm_plotter.title.align = 'center'
# Dynamic Range plot
dr_keys = []
dr_values = []
for dr_stat in dr_stats:
dr_keys.append(dr_stat.replace('-model', ''))
dr_values.append(dr_stats[dr_stat]['DR'])
dr_plotter = figure(x_range=dr_keys, x_axis_label="Image", y_axis_label="Value",
plot_width=width, plot_height=height, title='Dynamic Range')
dr_plotter.vbar(x=dr_keys, top=dr_values, width=0.9)
#dr_plotter.y_range.start = 0
dr_plotter.title.align = 'center'
outfile = '{}-stats-plot.html'.format(prefix or 'aimfast')
output_file(outfile)
save(column(row(variance_plotter, mom34_plotter),
row(norm_plotter, dr_plotter)), title=outfile)
[docs]def plot_subimage_stats(fitsnames, centre_coords, sizes, htmlprefix='default',
title_size='12pt', x_label_size='10pt', y_label_size='10pt',
bar_label_size='15pt', units='micro', svg=False):
"""Plot subimages and stats"""
output_dict = {}
subplot_list = []
plot_height = 300
plot_width = 400
for im in range(len(centre_coords)):
im_subplot_list = []
LOGGER.info(f"Making Subimage with centre pixels ({centre_coords[im]})")
size = sizes[im]
centre_coord = centre_coords[im]
rx, ry = centre_coord[0], centre_coord[1]
rx_0, ry_0 = int(rx-size/2), int(ry-size/2)
for n, fitsname in enumerate(fitsnames):
fitsinfo = fitsInfo(fitsname)
subimage_data = get_subimage(fitsname, centre_coord, size)
subimg_stats = image_stats(subimage_data, test_normality='normaltest')
centre_str = ','.join([str(cc) for cc in centre_coord])
sub_stats ={"RMS": round(subimg_stats['RMS'] * FLUX_UNIT_SCALER[units][0],
DECIMALS),
"STDDev": round(subimg_stats['STDDev'] * FLUX_UNIT_SCALER[units][0],
DECIMALS),
"MAD": round(subimg_stats['MAD'] * FLUX_UNIT_SCALER[units][0],
DECIMALS),
"MIN": round(subimg_stats['MIN'] * FLUX_UNIT_SCALER[units][0],
DECIMALS),
"MAX": round(subimg_stats['MAX'] * FLUX_UNIT_SCALER[units][0],
DECIMALS),
"SUM_NEG": round(subimg_stats['SUM_NEG'] * FLUX_UNIT_SCALER[units][0],
DECIMALS),
"SKEW": round(subimg_stats['SKEW'], DECIMALS),
"KURT": round(subimg_stats['KURT'], DECIMALS),
"NORM": round(subimg_stats['NORM'][0], DECIMALS),
"UNITS": units}
output_dict[f"centre-{centre_str}-{n}"] = {fitsname: sub_stats}
cols = ["Stats", f"Value ({FLUX_UNIT_SCALER[units][1]})"]
stats = {"Stats": ["RMS", "STDDev", "MAD", "MIN", "SUM_NEG",
"MAX", "*SKEW", "*KURT", "*NORM"],
f"Value ({FLUX_UNIT_SCALER[units][1]})":
[sub_stats['RMS'], sub_stats['STDDev'],
sub_stats['MAD'], sub_stats['MIN'],
sub_stats['SUM_NEG'], sub_stats['MAX'],
sub_stats['SKEW'],
sub_stats['KURT'],
sub_stats['NORM']]}
source = ColumnDataSource(data=stats)
columns = [TableColumn(field=x, title=x.capitalize()) for x in cols]
dtab = DataTable(source=source, columns=columns,
width=250, max_width=350,
height=200, max_height=250)
table_title = Div(text="Sub-image Statistics")
table_title.align = "center"
stats_table = column([table_title, dtab])
plot_title = f"{fitsname.split('/')[-1].split('.')[0]} sub-image: {im+1}"
if len(im_subplot_list) > 0:
s1 = im_subplot_list[0]
subplot = figure(title=plot_title,
x_axis_label='Right Ascension (deg)',
y_axis_label='Declination (deg)',
width=plot_width, height=plot_height,
x_range=s1.x_range, y_range=s1.y_range,
tooltips=[("(x, y)", "($x, $y)"),
(f"value ({FLUX_UNIT_SCALER[units][1]})",
"@image")])
else:
# Initial column 1 plot
subplot = figure(title=plot_title,
x_axis_label='Right Ascension (deg)',
y_axis_label='Declination (deg)',
width=plot_width, height=plot_height,
tooltips=[("(x, y)", "($x, $y)"),
(f"value ({FLUX_UNIT_SCALER[units][1]})",
"@image")])
# must give a vector of images
subimage = subimage_data[0,0,:,:]
if svg:
# Save subimages as svg
try:
import matplotlib.pyplot as plt
wcs = fitsinfo['wcs']
ax = plt.subplot(111, projection=wcs, slices=('x','y',0,0))
shw = plt.imshow(subimage * FLUX_UNIT_SCALER[units][0],
extent=[rx_0, rx_0+size, ry_0, ry_0+size],
vmin=-0.1, vmax=1)
outname = fitsname.split('.fits')[0]
bar = plt.colorbar(shw)
plt.xlabel('Right Ascension (hours)',
fontsize=float(x_label_size.split('pt')[0]))
plt.ylabel('Declination (deg)',
fontsize=float(y_label_size.split('pt')[0]))
bar.set_label(f"Flux density ({FLUX_UNIT_SCALER[units][1]})",
fontsize=float(bar_label_size.split('pt')[0]))
plt.savefig(f"{outname}.svg")
print(f"{outname}.svg")
except ImportError:
LOGGER.warn("SVGs are requested but matplotlib is not installed")
LOGGER.warn("RUN: pip install aimfast[svg_images]")
subplot.image(image=[subimage * FLUX_UNIT_SCALER[units][0]],
x=rx_0, y=ry_0, dw=size, dh=size,
palette="Plasma11", level="image")
color_mapper = LinearColorMapper(palette="Plasma11",
low=subimage.min() * FLUX_UNIT_SCALER[units][0],
high=subimage.max() * FLUX_UNIT_SCALER[units][0])
color_bar = ColorBar(color_mapper=color_mapper, width=80, label_standoff=4,
location=(0, 0), orientation='vertical')
color_bar_plot = figure(title=f"Flux Density ({FLUX_UNIT_SCALER[units][1]})",
title_location="right",
height=plot_height, width=8,
toolbar_location=None, min_border=0,
outline_line_color=None)
color_bar_plot.add_layout(color_bar, 'right')
color_bar_plot.title.align="center"
color_bar_plot.title.text_font_size = '10pt'
im_subplot_list.append(subplot)
im_subplot_list.append(stats_table)
subplot_list.append(column(row(im_subplot_list)))
output_file(f"{htmlprefix}_subimage_stats.html", title="subimage plots and stats")
save(column(subplot_list))
return output_dict
[docs]def get_source_properties_from_catalog(catalog_file):
model = get_model(catalog_file)
sources = model.sources
source_properties = {}
for source in sources:
if 'name' not in source_properties.keys():
source_properties['name'] = [source.name]
else:
source_properties['name'].append(source.name)
fluxes = source.get_attr('flux').strAttributes().split(',')
for flux in fluxes:
prop = flux.split('=')[0]
val = float(flux.split('=')[1])
if prop not in source_properties.keys():
source_properties[prop] = [val]
else:
source_properties[prop].append(val)
positions = source.get_attr('pos').strAttributes().split(',')
for pos in positions:
prop = pos.split('=')[0]
if prop not in source_properties.keys():
source_properties[prop] = [rad2deg(getattr(source.pos, prop))]
else:
source_properties[prop].append(rad2deg(getattr(source.pos, prop)))
try:
shapes = source.get_attr('shape').strAttributes().split(',')
except AttributeError:
shapes = ['ex=0', 'ey=0', 'pa=0',
'ex_err=0', 'ey_err=0', 'pa_err=0']
for shape in shapes:
prop = shape.split('=')[0]
val = float(shape.split('=')[1])
if prop not in source_properties.keys():
source_properties[prop] = [val]
else:
source_properties[prop].append(val)
try:
spectrums = source.get_attr('spectrum').strAttributes().split(',')
spectrums.append(f"spi_err={source.spi_error}")
except AttributeError:
spectrums = ['spi=999', 'spi_err=999', 'freq0=999']
for spectrum in spectrums:
prop = spectrum.split('=')[0]
val = float(spectrum.split('=')[1])
if prop not in source_properties.keys():
source_properties[prop] = [val]
else:
source_properties[prop].append(val)
return source_properties
[docs]def plot_model_columns(catalog_file, x, y, x_err=None, y_err=None, svg=False,
x_label=None, y_label=None, title=None, html_prefix=None,
title_size='16pt', x_label_size='12pt', y_label_size='12pt',
legend_size='10pt', xmajor_size='6pt', ymajor_size='6pt',
units='micro'):
"""Plot catalog columns including their uncertainties"""
width, height = 800, 800
if 'lsm.html' in catalog_file:
source_properties = get_source_properties_from_catalog(catalog_file)
else:
data = Table.read(catalog_file)
print('Model not yet supported')
bokeh_source = ColumnDataSource(data=source_properties)
x_y_plotter = figure(x_axis_label=x if not x_label else x_label,
y_axis_label=y if not y_label else y_label,
plot_width=width, plot_height=height,
#tools=TOOLS,
title=f"{catalog_file.split('.')[0]} {x.upper()} vs {y.upper()}"
if not title else title)
x_y_plotter.scatter(x, y, source=bokeh_source,
name='x_y_data')
x_y_plotter.title.align = 'center'
x_y_plotter.title.text_font_size = title_size
x_y_plotter.xaxis.axis_label_text_font_size = x_label_size
x_y_plotter.yaxis.axis_label_text_font_size = y_label_size
#x_y_plotter.legend.label_text_font_size = legend_size
x_y_plotter.xaxis.major_label_text_font_size = xmajor_size
x_y_plotter.yaxis.major_label_text_font_size = ymajor_size
x_y_plotter.axis.axis_label_text_font_style = 'normal'
if x in ['RA', 'ra']:
x_y_plotter.x_range.flipped = True
elif y in ['RA', 'ra']:
x_y_plotter.y_range.flipped = True
# Attaching the hover object with labels
tool_values = []
for p in source_properties.keys():
tool_values.append((p, f'@{p}'))
x_y_plotter.add_tools(HoverTool(tooltips=tool_values))
# create the coordinates for the errorbars
err_xs = []
err_ys = []
xs = source_properties[x]
ys = source_properties[y]
if x_err:
xerrs = source_properties[x_err]
for x, y, xerr in zip(xs, ys, xerrs):
err_xs.append((x - xerr, x + xerr))
err_ys.append((y, y))
x_y_plotter.multi_line(err_xs, err_ys, color='red')
if y_err:
yerrs = source_properties[y_err]
for x, y, yerr in zip(xs, ys, yerrs):
err_xs.append((x, x))
err_ys.append((y - yerr, y + yerr))
x_y_plotter.multi_line(err_xs, err_ys, color='red')
column_list = source_properties.keys()
bokeh_source_table = ColumnDataSource(data=source_properties)
columns = [TableColumn(field=col, title=col) for col in column_list]
dtab = DataTable(source=bokeh_source_table, columns=columns,
width=width, max_width=width + 50,
height=height, max_height=width + 50)
table_title = Div(text="Source Table")
table_title.align = "center"
source_table = column([table_title, dtab])
LOGGER.info(f"Total number of sources: {len(source_properties['name'])}")
if not html_prefix:
output_file_name = f"{catalog_file.split('.')[0]}_column_properties.html"
else:
output_file_name = f"{html_prefix}.html"
LOGGER.info(f"Saving results in {output_file_name}")
output_file(output_file_name)
save(row(source_table, x_y_plotter))
svg = True
if svg:
x_y_plotter.output_backend = "svg"
prefix = '.'.join(output_file_name.split('.')[:-1])
export_svgs(x_y_plotter, filename=f"{prefix}.svg")
[docs]def plot_model_data(catalog_file, html_prefix=''):
"""Plotting catalog table"""
width, height = 1000, 2000
if 'lsm.html' in catalog_file:
source_properties = get_source_properties_from_catalog(catalog_file)
else:
data = Table.read(catalog_file)
print('Model not yet supported')
column_list = source_properties.keys()
bokeh_source_table = ColumnDataSource(data=source_properties)
columns = [TableColumn(field=col, title=col) for col in column_list]
dtab = DataTable(source=bokeh_source_table, columns=columns,
width=width, max_width=width + 50,
height=height, max_height=width + 50)
table_title = Div(text="Source Table")
table_title.align = "center"
source_table = column([table_title, dtab])
LOGGER.info(f"Total number of sources: {len(source_properties['name'])}")
print(html_prefix)
if not html_prefix:
output_file_name = f"{catalog_file.split('.')[0]}_column_properties.html"
else:
output_file_name = f"{html_prefix}.html"
LOGGER.info(f"Saving results in {output_file_name}")
output_file(output_file_name)
save(source_table)
[docs]def get_sf_params(configfile):
import yaml
with open(r'{}'.format(configfile)) as file:
sf_parameters = yaml.load(file, Loader=yaml.FullLoader)
return sf_parameters
[docs]def source_finding(sf_params, sf=None):
outfile = None
aegean_sf = sf_params.pop('aegean')
pybd_sf = sf_params.pop('pybdsf')
enable_aegean = aegean_sf.pop('enable')
enable_pybdsf = pybd_sf.pop('enable')
if enable_pybdsf or sf in ['pybdsf']:
filename = pybd_sf['filename']
LOGGER.info(f"Running pybdsf source finder on image: {filename}")
outfile = bdsf(filename, pybd_sf, LOGGER)
elif enable_aegean or sf in ['aegean']:
filename = aegean_sf['filename']
LOGGER.info(f"Running aegean source finder on image: {filename}")
outfile = aegean(filename, aegean_sf, LOGGER)
else:
LOGGER.warn(f"{WARNING}No source finder selected.{ENDC}")
return outfile
[docs]def get_argparser():
"""Get argument parser."""
parser = argparse.ArgumentParser(
description=("Examine radio image fidelity and source recovery by obtaining: \n"
"- The four (4) moments of a residual image \n"
"- The Dynamic range in restored image \n"
"- Comparing the fits images by running source finder \n"
"- Comparing the tigger models and online catalogs (NVSS, SUMSS) \n"
"- Comparing the on source/random residuals to noise \n"
"- Comparing residual stats from sub-images"))
subparser = parser.add_subparsers(dest='subcommand')
sf = subparser.add_parser('source-finder')
sf.add_argument('-c', '--config', dest='config',
help='Config file to run source finder of choice (YAML format)')
sf.add_argument('-gc', '--generate-config', dest='generate',
help='Genrate config file to run source finder of choice')
argument = partial(parser.add_argument)
argument('-v', "--version", action='version',
version='{0:s} version {1:s}'.format(parser.prog, _version))
# Inputs to analyse
argument('--compare-models', dest='models', nargs=2, action='append',
help='List of tigger model (text/lsm.html) files to compare \n'
'e.g. --compare-models model1.lsm.html model2.lsm.html')
argument('--compare-images', dest='images', nargs=2, action='append',
help='List of restored image (fits) files to compare. \n'
'Note that this will initially run a source finder. \n'
'e.g. --compare-images image1.fits image2.fits')
argument('--compare-online', dest='online', nargs=1, action='append',
help='List of catalog models (html/ascii, fits) restored image (fits)'
' files to compare with online catalog. \n'
'e.g. --compare-online image1.fit')
argument('--compare-residuals', dest='noise', nargs=2, action='append',
help='List of noise-like (fits) files to compare \n'
'e.g. --compare-residuals residual1.fits residual2.fits')
argument('--compare-residual-subimages', dest='subimage_noise',
nargs='+', action='append',
help='List of noise-like (fits) files to compare \n'
'e.g. --compare-residuals residual1.fits residual2.fits')
argument('-catalog', '--tigger-model', dest='model',
help='Name of the tigger model lsm.html file or any supported catalog')
argument('--restored-image', dest='restored',
help='Name of the restored image fits file')
argument('-psf', '--psf-image', dest='psf',
help='Name of the point spread function file or psf size in arcsec')
argument('--residual-image', dest='residual',
help='Name of the residual image fits file')
argument('--mask-image', dest='mask',
help='Name of the mask image fits file')
argument('-fdr', '--fidelity-results', dest='json',
help='aimfast fidelity results file (JSON format)')
argument('-reg', '--input-regions', dest='reg',
help='Region file with regions to generate stats)')
# Source finding
argument('-c', '--config', dest='config',
help='Config file to run source finder of choice (YAML format)')
argument('-sf', '--source-finder', dest='sourcery',
choices=('aegean', 'pybdsf'), default='pybdsf',
help='Source finder to run if comparing restored images')
# Online catalog query
argument("--online-catalog-name", dest='catalog_name',
help='Prefix of output catalog file name')
argument('-oc', '--online-catalog', dest='online_catalog',
choices=('sumss', 'nvss'), default='nvss',
help='Online catalog to compare local image/model.')
argument('-ptc', '--centre_coord', dest='centre_coord',
default="0:0:0, -30:0:0",
help='Centre of online catalog to compare local image/model \n'
'in "RA hh:mm:ss, Dec deg:min:sec".')
argument('-w', '--width', dest='width',
help='Field of view width to querry online catalog in degrees.'
'e.g. -w 3.0d')
# Image stats parameters
argument('--normality-test', dest='test_normality',
choices=('shapiro', 'normaltest'),
help='Name of model to use for normality testing. \n'
'options: [shapiro, normaltest] \n'
'NB: normaltest is the D`Agostino')
argument('-dr', '--data-range', dest='data_range',
help='Data range to perform normality testing')
argument('-thresh', '--threshold', dest='thresh',
help='Get stats of channels with pixel flux above thresh in Jy/Beam. \n'
'Also this can be used to filter out sources from online catalog')
argument('-chans', '--channels', dest='channels',
help='Get stats of specified channels e.g. "10~20;100~1000"')
argument('-cps', '--centre-pixels-size', dest='centre_pix_size',
nargs='+', action='append',
help='List of subimage centre pixels and their sizes to compute stats. \n'
'e.g. 500,500,20 200,10,5')
# Formatting
argument('-dp', '--data-points', dest='points',
help='Data points to sample the residual/noise image')
argument('-fp', '--flux-plot', dest='fluxplot', default='log',
choices=('log', 'snr', 'inout'),
help='Type of plot for flux comparison of the two catalogs')
argument('-units', '--units', dest='units', default="jansky",
choices=('jansky', 'milli', 'micro', 'nano'),
help='Units to represent the results')
argument('-deci', '--decimals', dest='deci', default=2,
help='Number of decimal places to round off results')
argument('-oa', '--only-off-axis', dest='off_axis', default=None,
help='Plot only cross-matched sources with distance from the phase centre'
' less than this value')
argument('-af', '--area-factor', dest='factor', type=float, default=2,
help='Factor to multiply the beam area to get target peak area')
argument('-fov', '--fov-factor', dest='fov_factor', type=float, default=0.9,
help='Factor to multiply the field of view for random points. i.e. 0.0-1.0')
argument('-tol', '--tolerance', dest='tolerance', type=float, default=0.2,
help='Tolerance to cross-match sources in arcsec')
argument('-as', '--all-source', dest='all', default=False, action='store_true',
help='Compare all sources irrespective of shape, otherwise only '
'point-like sources are compared')
argument('-closest', '--closest', dest='closest_only', default=False, action='store_true',
help='Use the closest source only when cross matching sources')
argument('-sl', '--shape-limit', dest='shape_limit', default=6.0,
help='Cross-match only sources with a maj-axis equal or less than this value')
argument("--label",
help='Use this label instead of the FITS image path when saving '
'data as JSON file')
# Plot labelling for basic catalog plotting
argument('-x', '--x-col-data', dest='x_col',
help='Catalog column name to plot on the x-axis')
argument('-y', '--y-col-data', dest='y_col',
help='Catalog column name to plot on the y-axis')
argument('-x-err', '--x-col-err-data', dest='x_col_err',
help='Catalog column name to plot error data on the x-axis')
argument('-y-err', '--y-col-err-data', dest='y_col_err',
help='Catalog column name to plot error data on the y-axis')
argument('-x-label', '--x-label', dest='x_label',
help='x-axis labels for the plot')
argument('-y-label', '--y-label', dest='y_label',
help='y-axis labels for the plots')
argument('-title', '--plot-title', dest='title',
help="Title label for the basic catalog plot")
# Plot labelling for the flux comparison plotting
argument('-fx', '--flux-xlabels', dest='fxlabels', nargs='+',
help="x-axis labels for the Flux plots")
argument('-fy', '--flux-ylabels', dest='fylabels', nargs='+',
help="y-axis labels for the Flux plots")
argument('-ftitle', '--flux-plot-title', dest='ftitles', nargs='+',
help="Title labels for the Flux plots")
# Plot labelling for the position (comparison & overlay) plotting
argument('-px1', '--position-xlabels1', dest='pxlabels1', nargs='+',
help="x-axis labels for the position plots")
argument('-py1', '--position-ylabels1', dest='pylabels1', nargs='+',
help="y-axis labels for the comparison position plots")
argument('-ptitle1', '--position-plot-title1', dest='ptitles1', nargs='+',
help="Title labels for the comparison position plots")
argument('-px2', '--position-xlabels2', dest='pxlabels2', nargs='+',
help="x-axis labels for the overlay position plots")
argument('-py2', '--position-ylabels2', dest='pylabels2', nargs='+',
help="y-axis labels for the overlay position plots")
argument('-ptitle2', '--position-plot-title2', dest='ptitles2', nargs='+',
help="Title labels for the overlay position plots")
# Plot labelling sizes for all plots
argument('-bar-major-size', '--colorbar-major-labels-size', dest='bar_major_size', default='6pt',
help="x-axis label size for plots")
argument('-bar-size', '--colorbar-labels-size', dest='barsize', default='14pt',
help="x-axis label size for plots")
argument('-x-size', '--xlabels-size', dest='xsize', default='14pt',
help="x-axis label size for plots")
argument('-y-size', '--ylabels-size', dest='ysize', default='14pt',
help="y-axis label size for plots")
argument('-x-maj-size', '--x-major-labels-size', dest='xmaj_size', default='6pt',
help="x-axis major label size for plots")
argument('-y-maj-size', '--y-mojar-labels-size', dest='ymaj_size', default='6pt',
help="y-axis major label size for plots")
argument('-legend-size', '--legend-font-size', dest='legsize', default='14pt',
help="Label size for legends on the plots")
argument('-title-size', '--plot-title-size', dest='tsize', default='18pt',
help="Title label size for plots")
# Outputs
argument("--html-prefix", dest='htmlprefix',
help='Prefix of output html files. Default: None.')
argument("--outfile",
help='Name of output file name. Default: fidelity_results.json')
argument('-svg', '--save-svg', dest='svg', default=False, action='store_true',
help='Save plots in SVG format.')
return parser
[docs]def main():
"""Main function."""
LOGGER.info("Welcome to AIMfast")
LOGGER.info(f"Version: {_version}")
_command = ' '.join(sys.argv)
LOGGER.info(f"Command: {_command}")
output_dict = dict()
parser = get_argparser()
args = parser.parse_args()
# Print default args
LOGGER.info(' '.join(f'{k}={v}' for k, v in vars(args).items()))
DECIMALS = args.deci
svg = args.svg
if args.subcommand:
if args.config:
source_finding(get_sf_params(args.config))
if args.generate:
generate_default_config(args.generate)
elif args.json:
plot_aimfast_stats(args.json, prefix=args.htmlprefix)
elif not args.residual and not args.restored and not args.model \
and not args.models and not args.noise and not args.images \
and not args.subimage_noise and not args.online and not args.json:
LOGGER.warn(f"{R}No arguments file(s) provided.{W}")
LOGGER.warn(f"{R}Or 'aimfast -h' for arguments.{W}")
if args.label:
residual_label = "{0:s}-residual".format(args.label)
restored_label = "{0:s}-restored".format(args.label)
model_label = "{0:s}-model".format(args.label)
else:
residual_label = args.residual
restored_label = args.restored
model_label = args.model
if args.model and args.x_col and args.y_col:
plot_model_columns(args.model, args.x_col, args.y_col,
args.x_col_err, args.y_col_err,
x_label=args.x_label,
y_label=args.y_label,
title=args.title,
title_size=args.tsize,
x_label_size=args.xsize,
y_label_size=args.ysize,
legend_size=args.legsize,
xmajor_size=args.xmaj_size,
ymajor_size=args.ymaj_size,
units=args.units,
html_prefix=args.htmlprefix)
if args.model and not args.noise and args.residual:
if not args.residual:
raise RuntimeError(f"{R}Please provide residual fits file{W}")
if args.psf:
psf_val = args.psf.replace(".", "", 1)
if psf_val.isdigit():
psf_size = float(args.psf)
else:
psf_size = measure_psf(args.psf)
else:
psf_size = 6
LOGGER.warning(f"{R}Please provide psf fits file or psf size.\n"
"Otherwise a default beam size of six (~6``) asec "
f"is used{W}")
if args.factor:
DR = model_dynamic_range(args.model, args.residual, psf_size,
area_factor=args.factor)
else:
DR = model_dynamic_range(args.model, args.residual, psf_size)
if args.test_normality in ['shapiro', 'normaltest']:
stats = residual_image_stats(args.residual,
args.test_normality,
args.data_range,
args.thresh,
args.channels,
args.mask)
else:
if not args.test_normality:
stats = residual_image_stats(args.residual,
args.test_normality,
args.data_range,
args.thresh,
args.channels,
args.mask)
else:
LOGGER.error(f"{R}Please provide correct normality model{W}")
stats.update({model_label: {
'DR' : DR["global_rms"],
'DR_deepest_negative' : DR["deepest_negative"],
'DR_global_rms' : DR['global_rms'],
'DR_local_rms' : DR['local_rms']}})
output_dict[residual_label] = stats
elif args.residual and not args.reg:
if args.residual not in output_dict.keys():
if args.test_normality in ['shapiro', 'normaltest']:
stats = residual_image_stats(args.residual,
args.test_normality,
args.data_range,
args.thresh,
args.channels,
args.mask)
else:
if not args.test_normality:
stats = residual_image_stats(args.residual,
args.test_normality,
args.data_range,
args.thresh,
args.channels,
args.mask)
else:
LOGGER.error(f"{R}Please provide correct normality model{W}")
output_dict[residual_label] = stats
if args.restored and args.residual:
if args.factor:
DR = image_dynamic_range(args.restored, args.residual,
area_factor=args.factor)
else:
DR = image_dynamic_range(args.restored, args.residual)
output_dict[restored_label] = {
'DR' : DR["global_rms"],
'DR_deepest_negative' : DR["deepest_negative"],
'DR_global_rms' : DR['global_rms'],
'DR_local_rms' : DR['local_rms']}
if args.models:
models = args.models
LOGGER.info(f"Number of model pair(s) to compare: {len(models)}")
if len(models) < 1:
LOGGER.warn(f"{R}Can only compare two models at a time.{W}")
else:
models_list = []
for i, comp_mod in enumerate(models):
model1, model2 = comp_mod[0], comp_mod[1]
models_list.append(
[dict(label="{}-model_a_{}".format(args.label, i),
path=model1),
dict(label="{}-model_b_{}".format(args.label, i),
path=model2)],
)
output_dict = compare_models(models_list,
tolerance=args.tolerance,
off_axis=args.off_axis,
all_sources=args.all,
shape_limit=args.shape_limit,
closest_only=args.closest_only,
prefix=args.htmlprefix,
flux_plot=args.fluxplot,
ftitles=args.ftitles,
fxlabels=args.fxlabels,
fylabels=args.fylabels,
title_size=args.tsize,
x_label_size=args.xsize,
y_label_size=args.ysize,
legend_size=args.legsize,
xmajor_size=args.xmaj_size,
ymajor_size=args.ymaj_size,
bar_size=args.barsize,
bar_major_size=args.bar_major_size,
svg=svg)
if args.noise:
residuals = args.noise
LOGGER.info(f"Number of residual pairs to compare: {len(residuals)}")
if len(residuals) < 1:
LOGGER.error(f"{R}Can only compare atleast one residual pair.{W}")
else:
residuals_list = []
for i, comp_res in enumerate(residuals):
res1, res2 = comp_res[0], comp_res[1]
residuals_list.append(
[dict(label="{}-res_a_{}".format(args.label, i),
path=res1),
dict(label="{}-res_b_{}".format(args.label, i),
path=res2)],
)
if args.model:
output_dict = compare_residuals(residuals_list,
args.model,
units=args.units,
title_size=args.tsize,
legend_size=args.legsize,
xmajor_size=args.xmaj_size,
ymajor_size=args.ymaj_size,
x_label_size=args.xsize,
y_label_size=args.ysize,
area_factor=args.factor,
prefix=args.htmlprefix)
else:
output_dict = compare_residuals(
residuals_list,
area_factor=args.factor,
fov_factor=args.fov_factor,
units=args.units,
title_size=args.tsize,
xmajor_size=args.xmaj_size,
ymajor_size=args.ymaj_size,
legend_size=args.legsize,
x_label_size=args.xsize,
y_label_size=args.ysize,
prefix=args.htmlprefix,
points=int(args.points) if args.points else 100)
if args.images:
configfile = args.config
if not configfile:
configfile = 'default_sf_config.yml'
generate_default_config(configfile)
images = args.images
sourcery = args.sourcery
images_list = []
for i, comp_ims in enumerate(images):
if args.mask:
image1, image2 = get_image_products(comp_ims, args.mask)
else:
image1, image2 = comp_ims[0], comp_ims[1]
sf_params1 = get_sf_params(configfile)
sf_params1[sourcery]['filename'] = image1
out1 = source_finding(sf_params1, sourcery)
sf_params2 = get_sf_params(configfile)
sf_params2[sourcery]['filename'] = image2
out2 = source_finding(sf_params2, sourcery)
images_list.append(
[dict(label="{}-model_a_{}".format(args.label, i),
path=out1),
dict(label="{}-model_b_{}".format(args.label, i),
path=out2)])
output_dict = compare_models(images_list,
tolerance=args.tolerance,
off_axis=args.off_axis,
shape_limit=args.shape_limit,
all_sources=args.all,
closest_only=args.closest_only,
prefix=args.htmlprefix,
flux_plot=args.fluxplot,
ftitles=args.ftitles,
fxlabels=args.fxlabels,
fylabels=args.fylabels,
title_size=args.tsize,
x_label_size=args.xsize,
y_label_size=args.ysize,
legend_size=args.legsize,
xmajor_size=args.xmaj_size,
ymajor_size=args.ymaj_size,
svg=svg)
if args.online:
models = args.online
sourcery = args.sourcery
threshold = args.thresh
width = args.width or '5.0d'
LOGGER.info(f'Using sky width of {width}')
catalog_prefix = args.catalog_name or 'default'
online_catalog = args.online_catalog
catalog_name = f"{catalog_prefix}_{online_catalog}_catalog_table.txt"
images_list = []
LOGGER.info(f'Extracting phase centre coordinates form {models[0][0]}')
if models[0][0].endswith('.html'):
Tigger_model = Tigger.load(models[0][0])
centre_ra_deg, centre_dec_deg = _get_phase_centre(Tigger_model)
centre_coord = deg2ra(centre_ra_deg) + ',' + deg2dec(centre_dec_deg)
centre_coord = centre_coord.split(',')
elif models[0][0].endswith('.fits'):
centre_ra_deg, centre_dec_deg = fitsInfo(models[0][0])['centre']
centre_coord = deg2ra(centre_ra_deg) + ',' + deg2dec(centre_dec_deg)
centre_coord = centre_coord.split(',')
else:
if args.centre_coord:
centre_coord = args.centre_coord.split(',')
else:
LOGGER.error('Please supply central coordinates using -ptc. See --help')
LOGGER.info(f'Quering the {online_catalog} catalog with width of {width} at {centre_coord}')
table = get_online_catalog(catalog=online_catalog.upper(), centre_coord=centre_coord,
width='5.0d', thresh=threshold, catalog_table=catalog_name)
if table:
for i, ims in enumerate(models):
image1 = ims[0]
if image1.endswith('.fits'):
configfile = 'default_sf_config.yml'
generate_default_config(configfile)
sf_params1 = get_sf_params(configfile)
sf_params1[sourcery]['filename'] = image1
out1 = source_finding(sf_params1, sourcery)
image1 = out1
images_list.append(
[dict(label="{}-model_a_{}".format(args.label, i),
path=image1),
dict(label="{}-model_b_{}".format(args.label, i),
path=catalog_name)])
output_dict = compare_models(images_list,
tolerance=args.tolerance,
shape_limit=args.shape_limit,
off_axis=args.off_axis,
all_sources=args.all,
closest_only=args.closest_only,
prefix=args.htmlprefix,
flux_plot=args.fluxplot,
ftitles=args.ftitles,
fxlabels=args.fxlabels,
fylabels=args.fylabels,
title_size=args.tsize,
x_label_size=args.xsize,
y_label_size=args.ysize,
legend_size=args.legsize,
xmajor_size=args.xmaj_size,
ymajor_size=args.ymaj_size,
svg=svg)
else:
LOGGER.warn(f'No object found around (ICRS) position {centre_coord}')
if args.subimage_noise:
centre_coords = []
output_dict = {}
sizes = []
if args.centre_pix_size:
for cps in args.centre_pix_size[0]:
centre_pix = (int(cps.split(',')[0]), int(cps.split(',')[1]))
centre_coords.append(centre_pix)
sizes.append(int(cps.split(',')[-1]))
print(args.svg)
output_dict = plot_subimage_stats(args.subimage_noise[0],
centre_coords, sizes,
units=args.units,
svg=args.svg,
title_size=args.tsize,
x_label_size=args.xsize,
y_label_size=args.ysize,
bar_label_size=args.barsize,
htmlprefix=(args.htmlprefix
if args.htmlprefix else 'default'))
else:
LOGGER.error(f"{R}Provide Centre coordinates in pixels "
f"and size of subimage(s).{W}")
if args.reg:
centre_coords = []
stats = get_region_stats(args.residual, args.reg)
output_dict[residual_label] = stats
if output_dict:
if args.outfile:
json_dump(output_dict, filename=args.outfile)
else:
json_dump(output_dict)