Source code for ska_tmc_cdm.schemas.central_node.sdp

"""
The schemas.central_node module defines Marshmallow schemas that map TMC
Central Node message classes to/from a JSON representation.
"""
from marshmallow import Schema, fields, post_dump, post_load

from ska_tmc_cdm.messages.central_node.sdp import (
    Channel,
    PbDependency,
    ProcessingBlockConfiguration,
    ScanType,
    SDPConfiguration,
    SDPWorkflow,
)
from ska_tmc_cdm.schemas import CODEC

__all__ = [
    "ScanTypeSchema",
    "SDPWorkflowSchema",
    "PbDependencySchema",
    "ChannelSchema",
    "ProcessingBlockSchema",
    "SDPConfigurationSchema",
]


[docs]class ChannelSchema(Schema): """ Marshmallow schema for the SubBand class. """ count = fields.Integer(data_key="count", required=True) start = fields.Integer(data_key="start", required=True) stride = fields.Integer(data_key="stride", required=True) freq_min = fields.Float(data_key="freq_min", required=True) freq_max = fields.Float(data_key="freq_max", required=True) link_map = fields.List(fields.List(fields.Int), data_key="link_map", required=True)
[docs] @post_load def create_channel(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a Channel object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SubBand object populated from data """ count = data["count"] start = data["start"] stride = data["stride"] freq_min = data["freq_min"] freq_max = data["freq_max"] link_map = data["link_map"] return Channel(count, start, stride, freq_min, freq_max, link_map)
[docs]class ScanTypeSchema(Schema): """ Marshmallow schema for the ScanType class. """ scan_type_id = fields.String(data_key="scan_type_id", required=True) reference_frame = fields.String(data_key="reference_frame", required=True) ra = fields.String(data_key="ra", required=True) dec = fields.String(data_key="dec", required=True) channels = fields.Nested(ChannelSchema, data_key="channels", many=True)
[docs] @post_load def create_scan_type(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a ScanType object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: ScanType object populated from data """ scan_type_id = data["scan_type_id"] reference_frame = data["reference_frame"] ra = data["ra"] # pylint: disable=invalid-name dec = data["dec"] channels = data["channels"] return ScanType(scan_type_id, reference_frame, ra, dec, channels)
[docs]class SDPWorkflowSchema(Schema): # pylint: disable=too-few-public-methods """ Represents the type of workflow being configured on the SDP """ name = fields.String(data_key="name", required=True) kind = fields.String(data_key="kind", required=True) version = fields.String(data_key="version", required=True)
[docs] @post_load def create_sdp_wf(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a SDP Workflow object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDP Workflow object populated from data """ name = data["name"] kind = data["kind"] version = data["version"] return SDPWorkflow(name, kind, version)
[docs]class PbDependencySchema(Schema): # pylint: disable=too-few-public-methods """ Marshmallow schema for the PbDepedency class. """ pb_id = fields.String(data_key="pb_id") pb_type = fields.List(fields.String, data_key="kind")
[docs] @post_load def create_pb_dependency(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a PbDependency object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: PbDependency object populated from data """ pb_id = data["pb_id"] pb_type = data["pb_type"] return PbDependency(pb_id, pb_type)
[docs]class ProcessingBlockSchema(Schema): """ Marshmallow schema for the ProcessingBlock class. """ pb_id = fields.String(data_key="pb_id", required=True) workflow = fields.Nested(SDPWorkflowSchema) parameters = fields.Dict() dependencies = fields.Nested(PbDependencySchema, many=True, missing=None)
[docs] @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None}
[docs] @post_load def create_processing_block_config(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a PB object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: PB object populated from data """ return ProcessingBlockConfiguration(**data)
[docs]@CODEC.register_mapping(SDPConfiguration) class SDPConfigurationSchema(Schema): """ Marsmallow class for the SDPConfiguration class """ interface = fields.String() eb_id = fields.String(data_key="eb_id", required=True) max_length = fields.Float(data_key="max_length", required=True) scan_types = fields.Nested(ScanTypeSchema, many=True) processing_blocks = fields.Nested(ProcessingBlockSchema, many=True)
[docs] @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None}
[docs] @post_load def create_sdp_config(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a SDPConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return SDPConfiguration(**data)