Source code for ska.cdm.schemas.subarray_node.configure.mccs

"""
This module defines Marshmallow schemas that map the CDM classes for
SubArrayNode MCCS configuration to/from JSON.
"""

from marshmallow import Schema, fields, post_load

from ska.cdm.messages.subarray_node.configure.mccs import MCCSConfiguration
from ska.cdm.messages.subarray_node.configure.mccs import StnConfiguration
from ska.cdm.messages.subarray_node.configure.mccs import SubarrayBeamConfiguration
from ska.cdm.messages.subarray_node.configure.mccs import SubarrayBeamTarget
from ska.cdm.schemas import CODEC

# Public API of this module: one schema per MCCS configuration message class.
__all__ = [
    "MCCSConfigurationSchema",
    "StnConfigurationSchema",
    "SubarrayBeamConfigurationSchema",
    "SubarrayBeamTargetSchema",
]


@CODEC.register_mapping(SubarrayBeamTarget)
class SubarrayBeamTargetSchema(Schema):  # pylint: disable=too-few-public-methods
    """
    Marshmallow schema that maps the SubarrayBeamTarget class to/from JSON.

    All four fields (az, el, name, system) are mandatory when loading.
    """

    az = fields.Float(data_key="az", required=True)
    el = fields.Float(data_key="el", required=True)
    name = fields.String(data_key="name", required=True)
    system = fields.String(data_key="system", required=True)

    @post_load
    def create_target(self, data, **_):  # pylint: disable=no-self-use
        """
        Build a SubarrayBeamTarget from the deserialised field values.

        :param data: dict containing parsed JSON values
        :param _: kwargs passed by Marshmallow
        :return: SubarrayBeamTarget instance populated to match JSON
        :rtype: SubarrayBeamTarget
        """
        return SubarrayBeamTarget(
            az=data["az"],
            el=data["el"],
            name=data["name"],
            system=data["system"],
        )
@CODEC.register_mapping(StnConfiguration)
class StnConfigurationSchema(Schema):
    """
    Marshmallow schema that maps the StnConfiguration class to/from JSON.
    """

    station_id = fields.Integer(data_key="station_id", required=True)

    @post_load
    def create(self, data, **_):
        """
        Build a StnConfiguration from the deserialised field values.

        :param data: dict containing parsed JSON values
        :param _: kwargs passed by Marshmallow
        :return: StnConfiguration instance populated to match JSON
        :rtype: StnConfiguration
        """
        # The station ID is handed over positionally, as the only argument.
        return StnConfiguration(data["station_id"])
@CODEC.register_mapping(SubarrayBeamConfiguration)
class SubarrayBeamConfigurationSchema(Schema):
    """
    Marshmallow schema that maps the SubarrayBeamConfiguration class to/from
    JSON.
    """

    subarray_beam_id = fields.Integer(data_key="subarray_beam_id", required=True)
    # data_key and required must be given to the List field itself. When they
    # are passed to the inner element field they have no effect, because
    # Marshmallow only checks for a missing value on the schema-level field.
    station_ids = fields.List(
        fields.Integer(), data_key="station_ids", required=True
    )
    channels = fields.List(fields.List(fields.Integer), data_key="channels")
    update_rate = fields.Float(data_key="update_rate")
    target = fields.Nested(SubarrayBeamTargetSchema, data_key="target")
    antenna_weights = fields.List(fields.Float(), data_key="antenna_weights")
    phase_centre = fields.List(fields.Float(), data_key="phase_centre")

    @post_load
    def create(self, data, **_):
        """
        Convert parsed JSON back into a SubarrayBeamConfiguration object.

        :param data: dict containing parsed JSON values
        :param _: kwargs passed by Marshmallow
        :return: SubarrayBeamConfiguration instance populated to match JSON
        :rtype: SubarrayBeamConfiguration
        """
        # NOTE(review): channels, update_rate, target, antenna_weights and
        # phase_centre are not declared as required, yet they are indexed
        # directly here, so input that omits one of them fails with a
        # KeyError rather than a ValidationError. Confirm whether they
        # should be required as well.
        return SubarrayBeamConfiguration(
            subarray_beam_id=data["subarray_beam_id"],
            station_ids=data["station_ids"],
            channels=data["channels"],
            update_rate=data["update_rate"],
            target=data["target"],
            antenna_weights=data["antenna_weights"],
            phase_centre=data["phase_centre"],
        )
@CODEC.register_mapping(MCCSConfiguration)
class MCCSConfigurationSchema(Schema):
    """
    Marshmallow schema that maps the MCCSConfiguration class to/from JSON.

    The station list is carried under the JSON key "stations" and the
    subarray beam list under "subarray_beams".
    """

    station_configs = fields.Nested(
        StnConfigurationSchema,
        many=True,
        data_key="stations",
    )
    subarray_beam_configs = fields.Nested(
        SubarrayBeamConfigurationSchema,
        many=True,
        data_key="subarray_beams",
    )

    @post_load
    def create(self, data, **_):
        """
        Build an MCCSConfiguration from the deserialised field values.

        :param data: dict containing parsed JSON values
        :param _: kwargs passed by Marshmallow
        :return: MCCSConfiguration instance populated to match JSON
        :rtype: MCCSConfiguration
        """
        # Both lists are passed positionally: stations first, then beams.
        return MCCSConfiguration(
            data["station_configs"],
            data["subarray_beam_configs"],
        )