import warnings
import astropy.constants as const
import astropy.units as un
import numpy as np
from astropy import wcs
from astropy.io import fits
from astropy.time import Time
from astropy.utils import iers
import pyvisgen.layouts.layouts as layouts
[docs]
def create_vis_hdu(data, obs, source_name="sim-source-0") -> fits.GroupsHDU:
"""Creates the visibility HDU for the given visibility
data and observation.
Parameters
----------
data : :class:`~pyvisgen.simulation.Visibilities`
:class:`~pyvisgen.simulation.Visibilities` object
containing visibility data.
obs : :class:`~pyvisgen.simulation.Observation`
:class:`~pyvisgen.simulation.Observation` class object
containing information on the observation, such as
baselines.
source_name : str, optional
Source name saved to the ``OBJECT`` key inside the
FITS file. Default: ``'sim-source-0'``
Returns
-------
hdu_vis : :class:`~astropy.io.fits.GroupsHDU`
:class:`~astropy.io.fits.GroupsHDU` containing visibility
data for the FITS file.
"""
u = data.u
v = data.v
w = data.w
DATE = np.trunc(data.date)
_DATE = data.date % 1
BASELINE = data.base_num
FREQSEL = np.repeat(np.array([1.0], dtype=">f4"), len(u))
# visibility data
values = data.get_values()
vis = np.stack(
[
values.real,
values.imag,
data.weights.unsqueeze(2).expand(values.real.shape),
], # real, imag, weights [n, freqs, pol]
axis=3,
)[:, None, None, None, ...]
DATA = vis
# in dim 4 = IFs , dim = 1, dim 4 = number of jones, 3 = real, imag, weight
# wcs
ra = obs.ra.cpu().numpy().item()
dec = obs.dec.cpu().numpy().item()
freq = obs.ref_frequency.cpu().numpy().item()
freq_d = obs.bandwidths[0].cpu().numpy().item()
ws = wcs.WCS(naxis=7)
crval_stokes = -1
stokes_comment = "-1=RR, -2=LL, -3=RL, -4=LR"
if obs.polarization == "linear":
crval_stokes = -5
stokes_comment = "-5=XX, -6=YY, -7=XY, -8=YX"
stokes_comment += " or -5=VV, -6=HH, -7=VH, -8=HV"
ws.wcs.crpix = [0, 1, 1, 1, 1, 1, 1]
ws.wcs.crota = [0, 0, 0, 0, 0, 0, 0]
ws.wcs.cdelt = [1, 1, -1, freq_d, 1, 1, 1]
ws.wcs.crval = [0, 1, crval_stokes, freq, 1, ra, dec]
ws.wcs.ctype = ["", "COMPLEX", "STOKES", "FREQ", "IF", "RA", "DEC"]
ws.wcs.specsys = "TOPOCENTER"
ws.wcs.radesys = "FK5"
ws.wcs.equinox = 2000.0
ws.wcs.dateobs = obs.start.isot
ws.wcs.mjdobs = obs.start.mjd
h = ws.to_header()
u_scale = u / const.c
v_scale = v / const.c
w_scale = w / const.c
groupdata_vis = fits.GroupData(
DATA,
bitpix=-32,
parnames=[
"UU",
"VV",
"WW",
"DATE",
"DATE",
"BASELINE",
"FREQSEL",
],
pardata=[u_scale, v_scale, w_scale, DATE, _DATE, BASELINE, FREQSEL],
)
hdu_vis = fits.GroupsHDU(groupdata_vis, header=h)
# add scales and zeors
scale = 1.0
zero = 0.0
parbscales = [scale, scale, scale, scale, scale, scale, scale]
parbzeros = [zero, zero, zero, zero, zero, zero, zero]
for i in range(len(parbscales)):
hdu_vis.header["PSCAL" + str(i + 1)] = parbscales[i]
hdu_vis.header["PZERO" + str(i + 1)] = parbzeros[i]
# add comments
hdu_vis.header.comments["CTYPE2"] = "1=real, 2=imag, 3=weight"
hdu_vis.header.comments["CTYPE3"] = stokes_comment
hdu_vis.header.comments["PTYPE1"] = "u baseline coordinate in light seconds"
hdu_vis.header.comments["PTYPE2"] = "v baseline coordinate in light seconds"
hdu_vis.header.comments["PTYPE3"] = "w baseline coordinate in light seconds"
hdu_vis.header.comments["PTYPE4"] = "Baseline number"
hdu_vis.header.comments["PTYPE5"] = "Julian date"
hdu_vis.header.comments["PTYPE6"] = "Relative Julian date ?"
hdu_vis.header.comments["PTYPE7"] = "Integration time"
date_obs = obs.start.strftime("%Y-%m-%d")
date_map = Time.now().to_value(format="iso", subfmt="date")
# add additional keys
hdu_vis.header["EXTNAME"] = ("AIPS UV", "AIPS UV")
hdu_vis.header["EXTVER"] = (1, "Version number of table")
hdu_vis.header["OBJECT"] = (source_name, "Source name")
hdu_vis.header["TELESCOP"] = (obs.layout, "Telescope name")
hdu_vis.header["INSTRUME"] = (obs.layout, "Instrument name (receiver or ?)")
hdu_vis.header["DATE-OBS"] = (date_obs, "Observation date")
hdu_vis.header["DATE-MAP"] = (date_map, "File processing date")
hdu_vis.header["EPOCH"] = (2000.0, "")
hdu_vis.header["BSCALE"] = (1.0, "Always 1")
hdu_vis.header["BZERO"] = (0.0, "Always 0")
hdu_vis.header["BUNIT"] = ("UNCALIB", "Units of flux")
hdu_vis.header["ALTRPIX"] = (1.0, "Reference pixel for velocity")
hdu_vis.header["OBSRA"] = (ra, "Antenna pointing Ra")
hdu_vis.header["OBSDEC"] = (dec, "Antenna pointing Dec")
return hdu_vis
[docs]
def create_time_hdu(data) -> fits.BinTableHDU:
"""Creates the time HDU for the FITS file.
Parameters
----------
data : :class:`~pyvisgen.simulation.Visibilities`
:class:`~pyvisgen.simulation.Visibilities` object
containing visibility and time data.
Returns
-------
hdu_vis : :class:`~astropy.io.fits.BinTableHDU`
:class:`~astropy.io.fits.BinTableHDU` containing time
data for the FITS file.
"""
TIME = np.array(
[data.date.mean() - int(data.date.min())],
dtype=">f4",
)
col1 = fits.Column(name="TIME", format="1E", unit="days", array=TIME)
TIME_INTERVAL = np.array(
[data.date.max() - data.date.min()],
dtype=">f4",
)
col2 = fits.Column(
name="TIME INTERVAL", format="1E", unit="days", array=TIME_INTERVAL
)
SOURCE_ID = np.ones((1), dtype=">i4") # always the same source
col3 = fits.Column(name="SOURCE ID", format="1J", unit=" ", array=SOURCE_ID)
SUBARRAY = np.ones((1), dtype=">i4") # always same array
col4 = fits.Column(name="SUBARRAY", format="1J", unit=" ", array=SUBARRAY)
FREQ_ID = np.ones((1), dtype=">i4") # always same frequencies
col5 = fits.Column(name="FREQ ID", format="1J", unit=" ", array=FREQ_ID)
START_VIS = np.array(
[data.num.min()],
dtype=">i4",
)
col6 = fits.Column(name="START VIS", format="1J", unit=" ", array=START_VIS)
END_VIS = np.array(
[data.num.max()],
dtype=">i4",
)
col7 = fits.Column(name="END VIS", format="1J", unit=" ", array=END_VIS)
coldefs_time = fits.ColDefs([col1, col2, col3, col4, col5, col6, col7])
hdu_time = fits.BinTableHDU.from_columns(coldefs_time)
# add additional keywords
hdu_time.header["EXTNAME"] = ("AIPS NX", "AIPS NX")
hdu_time.header["EXTVER"] = (1, "Version number of table")
# add comments
hdu_time.header.comments["TTYPE1"] = "Center time of interval"
hdu_time.header.comments["TTYPE2"] = "Duration of interval"
hdu_time.header.comments["TTYPE3"] = "Source number"
hdu_time.header.comments["TTYPE4"] = "Subarray"
hdu_time.header.comments["TTYPE5"] = "Frequency setup ID number"
hdu_time.header.comments["TTYPE6"] = "First visibility number"
hdu_time.header.comments["TTYPE7"] = "End visibility number"
return hdu_time
[docs]
def create_frequency_hdu(obs) -> fits.BinTableHDU:
"""Creates the frequency HDU of the FITS file.
Parameters
----------
obs : :class:`~pyvisgen.simulation.Observation`
:class:`~pyvisgen.simulation.Observation` class object
containing information on the observation, including
frequency data.
Returns
-------
hdu_freq : :class:`~astropy.io.fits.BinTableHDU`
:class:`~astropy.io.fits.BinTableHDU` containing
frequency data for the FITS file.
"""
FRQSEL = np.array([1], dtype=">i4")
col1 = fits.Column(name="FRQSEL", format="1J", array=FRQSEL)
IF_FREQ = np.array(
[0.0],
dtype=">f8",
) # start with 0, add ch_with per IF
col2 = fits.Column(name="IF FREQ", format="1D", unit="Hz", array=IF_FREQ)
CH_WIDTH = np.array([obs.bandwidths[0].cpu().numpy()], dtype=">f4")
col3 = fits.Column(name="CH WIDTH", format="1E", unit="Hz", array=CH_WIDTH)
TOTAL_BANDWIDTH = np.array(
[(obs.bandwidths[0] * len(obs.bandwidths)).cpu().numpy()], dtype=">f4"
)
col4 = fits.Column(
name="TOTAL BANDWIDTH",
format=str(TOTAL_BANDWIDTH.shape[-1]) + "E",
unit="Hz",
array=TOTAL_BANDWIDTH,
)
SIDEBAND = np.array([1])
col5 = fits.Column(name="SIDEBAND", format="1J", unit=" ", array=SIDEBAND)
coldefs_freq = fits.ColDefs([col1, col2, col3, col4, col5])
hdu_freq = fits.BinTableHDU.from_columns(coldefs_freq)
# add additional keywords
hdu_freq.header["EXTNAME"] = ("AIPS FQ", "AIPS FQ")
hdu_freq.header["EXTVER"] = (1, "Version number of table")
# add comments
hdu_freq.header.comments["TTYPE1"] = "Frequency setup ID number"
hdu_freq.header.comments["TTYPE2"] = "Frequency offset"
hdu_freq.header.comments["TTYPE3"] = "Spectral channel separation"
hdu_freq.header.comments["TTYPE4"] = "Total width of spectral window"
hdu_freq.header.comments["TTYPE5"] = "Sideband"
return hdu_freq
[docs]
def create_antenna_hdu(obs) -> fits.BinTableHDU:
"""Creates the antenna HDU for the FITS file.
Parameters
----------
obs : :class:`~pyvisgen.simulation.Observation`
:class:`~pyvisgen.simulation.Observation` class object
containing information on the observation, including
antenna data.
Returns
-------
hdu_ant : :class:`~astropy.io.fits.BinTableHDU`
:class:`~astropy.io.fits.BinTableHDU` containing
antenna data for the FITS file.
"""
array = layouts.get_array_layout(obs.layout, writer=True)
ANNAME = np.array(array["station_name"].values, dtype="U8")
col1 = fits.Column(name="ANNAME", format="8A", array=ANNAME)
STABXYZ = np.array([array["X"], array["Y"], array["Z"]], dtype=">f8").T
col2 = fits.Column(name="STABXYZ", format="3D", unit="METERS", array=STABXYZ)
ORBPARM = np.array([None], dtype=">f8")
col3 = fits.Column(name="ORBPARM", format="0D", unit=" ", array=ORBPARM)
NOSTA = np.arange(len(array), dtype=">i4") + 1
col4 = fits.Column(name="NOSTA", format="1J", unit=" ", array=NOSTA)
MNTSTA = np.zeros(len(array), dtype=">i4")
col5 = fits.Column(name="MNTSTA", format="1J", unit=" ", array=MNTSTA)
STAXOF = np.zeros(len(array), dtype=">f4")
col6 = fits.Column(name="STAXOF", format="1E", unit="METERS", array=STAXOF)
POLTYA = np.full(len(array), "X", dtype="U")
col7 = fits.Column(name="POLTYA", format="1A", unit=" ", array=POLTYA)
POLAA = np.ones(len(array), dtype=">f4") * -90
col8 = fits.Column(name="POLAA", format="1E", unit="DEGREES", array=POLAA)
POLCALA = np.zeros((len(array)), dtype=">f4")
col9 = fits.Column(name="POLCALA", format="1E", unit=" ", array=POLCALA)
POLTYB = np.full(len(array), "Y", dtype="U")
col10 = fits.Column(name="POLTYB", format="1A", unit=" ", array=POLTYB)
POLAB = np.ones(len(array), dtype=">f4") * -90
col11 = fits.Column(name="POLAB", format="1E", unit="DEGREES", array=POLAB)
POLCALB = np.zeros((len(array)), dtype=">f4")
col12 = fits.Column(name="POLCALB", format="1E", unit=" ", array=POLCALB)
DIAMETER = np.array(array["dish_dia"].values, dtype=">f4")
col13 = fits.Column(name="DIAMETER", format="1E", unit="METERS", array=DIAMETER)
coldefs_ant = fits.ColDefs(
[
col1,
col2,
col3,
col4,
col5,
col6,
col7,
col8,
col9,
col10,
col11,
col12,
col13,
]
)
hdu_ant = fits.BinTableHDU.from_columns(coldefs_ant)
freq = (obs.ref_frequency.cpu().numpy() * un.Hz).value
ref_date = Time(
obs.start.isot.split("T")[0] + "T0:00:00.000", format="isot", scale="utc"
)
iers_b = iers.IERS_B.open()
# add additional keywords
hdu_ant.header["EXTNAME"] = ("AIPS AN", "AIPS table file")
hdu_ant.header["EXTVER"] = (1, "Version number of table")
hdu_ant.header["ARRAYX"] = (0, "x coordinate of array center (meters)")
hdu_ant.header["ARRAYY"] = (0, "y coordinate of array center (meters)")
hdu_ant.header["ARRAYZ"] = (0, "z coordinate of array center (meters)")
hdu_ant.header["GSTIA0"] = (
ref_date.sidereal_time("apparent", "greenwich").deg,
"GST at 0h on reference date (degrees)",
)
hdu_ant.header["DEGPDY"] = (
360.98564497329994,
"Earth's rotation rate (degrees/day)",
)
hdu_ant.header["FREQ"] = (freq, "Reference frequency (Hz)")
hdu_ant.header["RDATE"] = (
ref_date.to_value(format="iso", subfmt="date_hms"),
"Reference date",
)
hdu_ant.header["POLARX"] = (
iers_b.pm_xy(ref_date)[0].value,
"x coordinate of North Pole (arc seconds)",
)
hdu_ant.header["POLARY"] = (
iers_b.pm_xy(ref_date)[1].value,
"y coordinate of North Pole (arc seconds)",
)
hdu_ant.header["UT1UTC"] = (iers_b.ut1_utc(ref_date).value, "UT1 - UTC (sec)")
hdu_ant.header["DATUTC"] = (0, "time system - UTC (sec)") # missing
hdu_ant.header["TIMSYS"] = ("UTC", "Time system")
hdu_ant.header["ARRNAM"] = (obs.layout, "Array name")
hdu_ant.header["XYZHAND"] = ("RIGHT", "Handedness of station coordinates")
hdu_ant.header["FRAME"] = ("ITRF", "Coordinate frame")
hdu_ant.header["NUMORB"] = (0, "Number orbital parameters in table (n orb)")
hdu_ant.header["NOPCAL"] = (
2,
"Number of polarization calibration values / IF(n pcal)",
)
hdu_ant.header["NO_IF"] = (1, "Number IFs (n IF)")
hdu_ant.header["FREQID"] = (1, "Frequency setup number")
hdu_ant.header["IATUTC"] = (
ref_date.tai.ymdhms[-1],
"Difference between TAI and UTC",
)
hdu_ant.header["POLTYPE"] = (" ", "Type of polarization calibration")
# add comments
hdu_ant.header.comments["TTYPE1"] = "Antenna name"
hdu_ant.header.comments["TTYPE2"] = "Antenna station coordinates (x, y, z)"
hdu_ant.header.comments["TTYPE3"] = "Orbital parameters"
hdu_ant.header.comments["TTYPE4"] = "Antenna number"
hdu_ant.header.comments["TTYPE5"] = "Mount type"
hdu_ant.header.comments["TTYPE6"] = "Axis offset"
hdu_ant.header.comments["TTYPE7"] = "R, L, feed A"
hdu_ant.header.comments["TTYPE8"] = "Position angle feed A"
hdu_ant.header.comments["TTYPE9"] = "Calibration parameters feed A"
hdu_ant.header.comments["TTYPE10"] = "R, L, polarization 2"
hdu_ant.header.comments["TTYPE11"] = "Position angle feed B"
hdu_ant.header.comments["TTYPE12"] = "Calibration parameters feed B"
hdu_ant.header.comments["TTYPE13"] = "Antenna diameter"
return hdu_ant
[docs]
def create_hdu_list(data, obs) -> fits.HDUList:
"""Creates a :class:`~astropy.io.fits.HDUList` as the
top-level object for the FITS file.
Parameters
----------
data : :class:`~pyvisgen.simulation.Visibilities`
:class:`~pyvisgen.simulation.Visibilities` object containing
data on visibilities and observation time.
obs : :class:`~pyvisgen.simulation.Observation`
:class:`~pyvisgen.simulation.Observation` object containing
data on the source position, baselines, antenna configuration,
and frequencies.
Returns
-------
hdu_list : :class:`~astropy.io.fits.HDUList`
:class:`~astropy.io.fits.HDUList` that comprises of
HDU objects.
"""
warnings.filterwarnings("ignore", module="astropy.io.fits")
vis_hdu = create_vis_hdu(data, obs)
time_hdu = create_time_hdu(data)
freq_hdu = create_frequency_hdu(obs)
ant_hdu = create_antenna_hdu(obs)
hdu_list = fits.HDUList([vis_hdu, freq_hdu, ant_hdu, time_hdu])
return hdu_list