Skip to content

Suggestion: Add a TangeDeviceBase class #2

@bj-s

Description

@bj-s

For easier one to one mapping of Tango Devices to Karabo Devices, I would like to propose to add a TangeDeviceBase class that would allow - similar to the TangoMotor class - for an easier development of (lab) specific device classes for a one to one mapping. Based heavily on the TangoMotor implementation, such device class could look like the following:

from asyncio import Event, Lock, TimeoutError, wait_for
from enum import IntEnum

from tango import (
    ConnectionFailed as TangoConnectionFailed, DevState as TangoState)
from tango.asyncio import DeviceProxy as TangoDevProxy

from karabo.middlelayer import (
    AccessLevel, AccessMode, Assignment, Bool, Configurable, Double, Hash,
    KaraboError, MetricPrefix, Slot, State, String, Unit, VectorHash,
    VectorString, background, has_changes, isSet, sleep, slot, unit)

from tangoMirror.TangoMirrorBase import SyncMode, TangoMirrorBase, parse_exception

from ._version import version as deviceVersion
from .state import TangoStateParser


FAST_POLLING_S = 0.1
SLOW_POLLING_S = 1.

TANGO_UNITS = (
    "None",
    "meter", "millimeter", "micrometer", "nanometer", "picometer",
    "degree", "meter_per_second", "millimeter_per_second",
    "micrometer_per_second", "nanometer_per_second", "picometer",
    "")

DEFAULT_PROPERTIES = [
    Hash({"karaboKey": "tangoState", "propertyName": "State",
          "syncMode": SyncMode.POLLING.value,
          "tangoMeasurementUnit": "None",
          "pollingTime": FAST_POLLING_S, "isEnabled": True}),
    Hash({"karaboKey": "tangoStatus", "propertyName": "Status",
          "syncMode": SyncMode.POLLING.value,
          "tangoMeasurementUnit": "None",
          "pollingTime": FAST_POLLING_S, "isEnabled": True}),
]

DEFAULT_CMDS = [
    # Hash({"karaboKey": "stop", "commandName": "Stop", "isEnabled": True}),
]


class PropertyRow(Configurable):
    class ColIdx(IntEnum):
        KARABO_KEY = 0
        PROPERTY_NAME = 1
        TANGO_UNIT = 2
        SYNC_MODE = 3
        POLLING_TIME = 4
        IS_ENABLED = 5

    karaboKey = String(
        displayedName="Karabo Key", accessMode=AccessMode.READONLY,
        defaultValue="")
    propertyName = String(displayedName="Tango Property Name",
                          defaultValue="")
    tangoMeasurementUnit = String(
        displayedName="Tango Measurement Unit",
        options=TANGO_UNITS,
        defaultValue="None")
    syncMode = String(
        displayedName="Sync Mode",
        description="Select the Tango event subscription mechanism to use. "
                    f"{SyncMode.ON_CHANGE.value} = update when Tango "
                    "change event received. "
                    f"{SyncMode.ON_PERIODIC.value} = update when Tango "
                    f"periodic event received. "
                    f"{SyncMode.POLLING.value} = have Karabo poll the values.",
        defaultValue=SyncMode.POLLING.value,
        options=[mode.value for mode in SyncMode],
        accessMode=AccessMode.RECONFIGURABLE)
    pollingTime = Double(
        displayedName="Polling Time",
        description="Update interval duration for 'On periodic' and "
                    "'Polling' sync. modes. Unused in 'On change' mode.",
        unitSymbol=Unit.SECOND,
        defaultValue=0.1,
        minInc=0.01,
        maxInc=2.)
    isEnabled = Bool(displayedName="Is Enabled", defaultValue=True)


class CommandRow(Configurable):
    class ColIdx(IntEnum):
        KARABO_KEY = 0
        CMD_NAME = 1
        IS_ENABLED = 2

    karaboKey = String(
        displayedName="Karabo Key", accessMode=AccessMode.READONLY,
        defaultValue="")
    commandName = String(displayedName="Tango Command Name",
                         defaultValue="")
    isEnabled = Bool(displayedName="Is Enabled", defaultValue=True)


class TangoDeviceBase(TangoMirrorBase):
    __version__ = deviceVersion

    def __init__(self, configuration):
        self.tango_proxy = None
        self.karabo_to_tango_attr_map = {}
        self.tango_to_karabo_attr_map = {}
        self.karabo_to_tango_cmd_map = {}
        self.tango_unit_map = {}
        self.polling_time_map = {}
        self.state_parser = TangoStateParser()
        self.pending_init_values = {}
        self.subscribed_events = set()
        super().__init__(configuration)
        self.on_target_event = Event()
        self.hold_state_update = Lock()
        self.move_task = None
        self.is_reset = False
        self.current_target = None
        self.sync_mode = {}
        self.ignore_limit_switch_errors = False

    __version__ = deviceVersion

    tangoDeviceId = String(
        displayedName="Tango Device Id",
        description="Tango id of the device. Must match the entry in the "
                    "Tango mirror device table",
        accessMode=AccessMode.INITONLY,
        assignment=Assignment.MANDATORY)

    propertiesTable = VectorHash(
        rows=PropertyRow,
        displayedName="Device Properties",
        description="Mapping tango attributes to Karabo properties",
        defaultValue=DEFAULT_PROPERTIES,
        accessMode=AccessMode.INITONLY)

    commandsTable = VectorHash(
        rows=CommandRow,
        displayedName="Device Commands",
        description="Mapping tango commands to Karabo slots",
        defaultValue=DEFAULT_CMDS,
        accessMode=AccessMode.INITONLY)

    tangoState = String(
        displayedName="Tango State",
        displayType='State',
        tags={"watch"},
        accessMode=AccessMode.READONLY)

    def on_tango_state_update(self, value):
        if self.tangoState.value != value:
            self.tangoState = value
        if self.state != State.ERROR:
            new_state = self.state_parser(value)
            background(self.update_state(new_state))

    tangoState.on_update = on_tango_state_update

    tangoStatus = String(
        displayedName="Tango Status",
        tags={"watch"},
        accessMode=AccessMode.READONLY)

    async def execute_cmd(self, karabo_slot_key, param=None):
        tango_cmd_name = self.karabo_to_tango_cmd_map.get(
            karabo_slot_key, None)
        if tango_cmd_name is None:
            raise KaraboError(f"Slot {karabo_slot_key} is disabled.")
        if param is not None:
            await self.tango_proxy.command_inout(tango_cmd_name, param=param)
        else:
            await self.tango_proxy.command_inout(tango_cmd_name)

    async def preInitialization(self):
        for row in self.propertiesTable.value:
            if not row[PropertyRow.ColIdx.IS_ENABLED]:
                continue
            karabo_key = row[PropertyRow.ColIdx.KARABO_KEY]
            tango_name = row[PropertyRow.ColIdx.PROPERTY_NAME]
            tango_unit_name = row[PropertyRow.ColIdx.TANGO_UNIT]
            self.tango_unit_map[tango_name] = getattr(
                unit, tango_unit_name, None)
            self.karabo_to_tango_attr_map[karabo_key] = tango_name
            self.sync_mode[tango_name] = row[PropertyRow.ColIdx.SYNC_MODE]
            self.polling_time_map[tango_name] = row[
                PropertyRow.ColIdx.POLLING_TIME]

        self.tango_to_karabo_attr_map = \
            {v: k for k, v in self.karabo_to_tango_attr_map.items()}

        for row in self.commandsTable.value:
            if not row[CommandRow.ColIdx.IS_ENABLED]:
                continue
            karabo_key = row[CommandRow.ColIdx.KARABO_KEY]
            tango_name = row[CommandRow.ColIdx.CMD_NAME]
            self.karabo_to_tango_cmd_map[karabo_key] = tango_name

    async def onInitialization(self):
        url = self.get_tango_url(self.tangoDeviceId)
        self.tango_proxy = await wait_for(
            TangoDevProxy(url), timeout=self.connectionTimeout.value)

        try:
            dev_attr_list = self.tango_proxy.get_attribute_list()
        except TangoConnectionFailed:
            msg = "Connection failed."
            self.logger.error(msg)
            raise KaraboError(msg)  # TODO do something clever here

        tango_polling = set()
        for attr in self.karabo_to_tango_attr_map.values():
            if attr not in dev_attr_list:
                self.logger.error(f"Tango attribute {attr} not found.")
                continue  # TODO handle this otherwise ?

            sync_mode = self.sync_mode[attr]
            if sync_mode == SyncMode.POLLING.value:
                tango_polling.add(attr)
            else:
                polling_time = self.polling_time_map.get(attr)
                if polling_time is not None:
                    polling_time = int(polling_time * 1000)  # convert s to ms
                evt_id = self.subscribe_event(
                    attr, sync_mode, self.tango_proxy,
                    polling_time=polling_time)
                self.subscribed_events.add(evt_id)

        for key, value in self.pending_init_values.items():
            tango_attr_name = self.karabo_to_tango_attr_map[key]
            tango_value = self.conversion_karabo_to_tango(
                tango_attr_name, value)
            try:
                await self.tango_proxy.write_attribute(
                    tango_attr_name, tango_value)
            except Exception as e:
                msg = parse_exception(e)
                self.logger.error(msg)
                self.status = msg

        for k in self.getDeviceSchema().filterByTags('watch'):
            tango_name = self.karabo_to_tango_attr_map[k]
            if tango_name in tango_polling:
                background(self.watch_tango_device(tango_name))

        await self.update_state(State.ON)

    def onDestruction(self):
        for evt_id in self.subscribed_events:
            if evt_id.done():
                self.tango_proxy.unsubscribe_event(evt_id.result())

    async def update_state(self, new_state):
        updated = False
        if not self.hold_state_update.locked():
            if self.state != new_state:
                self.state = new_state
                updated = True
        return updated

    def conversion_tango_to_karabo(self, tango_attr, tango_value,
                                   karabo_key=None):
        tango_unit = self.tango_unit_map[tango_attr]
        if tango_unit is None:
            return tango_value
        if karabo_key is None:
            karabo_key = self.tango_to_karabo_attr_map[tango_attr]
        karabo_unit = getattr(self, karabo_key).descriptor.units

        tango_value = tango_value * tango_unit

        karabo_value = tango_value.to(karabo_unit).value
        return karabo_value

    def conversion_karabo_to_tango(self, tango_attr, karabo_value):
        tango_unit = self.tango_unit_map[tango_attr]
        if tango_unit is None:
            return karabo_value
        return karabo_value.to(tango_unit).value

    async def refresh_value(self, tango_name):
        tango_attr = await self.tango_proxy.read_attribute(tango_name)
        tango_value = tango_attr.value
        self.update_karabo_parameter(tango_name, tango_value)

    async def watch_tango_device(self, tango_name):
        polling_time = self.polling_time_map[tango_name]
        already_logged = False
        while True:
            try:
                await self.refresh_value(tango_name)
                already_logged = False
            except Exception as e:
                if self.state != State.ERROR:
                    self.state = State.ERROR
                if not already_logged:
                    already_logged = True
                    karabo_name = self.tango_to_karabo_attr_map[tango_name]
                    msg = (f"Could not read {tango_name} ({karabo_name}): "
                           f"{parse_exception(e)}")
                    if self.status != msg:
                        self.status = msg
                        self.logger.error(msg)
            await sleep(polling_time)

    async def write_value(self, key, value):
        if not isSet(value):
            return

        if not self.tango_proxy or not self.karabo_to_tango_attr_map:
            self.pending_init_values[key] = value
            return

        tango_attr_name = self.karabo_to_tango_attr_map[key]
        tango_value = self.conversion_karabo_to_tango(tango_attr_name, value)
        try:
            await self.tango_proxy.write_attribute(
                tango_attr_name, tango_value)
        except Exception as e:
            msg = parse_exception(e)
            if await self.update_state(State.ERROR):
                self.logger.error(msg)
                self.status = msg

    def handle_event(self, args):
        self.logger.debug(f"Received {args.event} EventData "
                          f"from {args.attr_name}")
        if args.err:
            err_desc = [e.desc for e in args.errors]
            self.logger.error("EventData contains error message: "
                              f"{'. '.join(err_desc)}")
            return

        # extract the device name from the full attribute address
        dev_name = args.attr_name.rsplit('/', 1)[0]
        # extract the attribute name (discard '#dbase=___', cut dev_name)
        attr_name = args.attr_name.split('#', 1)[0]
        attr_name = attr_name.replace(dev_name + '/', "")

        # attribute names in events are lowercased, even if tango
        # attribute is defined with uppercase letters, match it
        # here with the actual name
        for att in self.tango_to_karabo_attr_map:
            if att.lower() == attr_name:
                attr_name = att

        # attr_value is a tango.DeviceAttribute object
        tango_value = args.attr_value.value
        self.update_karabo_parameter(attr_name, tango_value)

    def update_karabo_parameter(self, tango_name, tango_value):
        try:
            karabo_key = self.tango_to_karabo_attr_map[tango_name]
            karabo_value = self.conversion_tango_to_karabo(
                tango_name, tango_value, karabo_key)
            karabo_par = getattr(self, karabo_key)

            if hasattr(karabo_par.descriptor, "on_update"):
                karabo_par.descriptor.on_update(self, karabo_value)
            elif has_changes(karabo_par.value, karabo_value):
                setattr(self, karabo_key, karabo_value)

        except Exception as e:
            msg = f"Could not set {karabo_key} to value {karabo_value}: {e}"
            self.logger.error(msg)

Then the implementation of lab specific devices - mapping their cammands and state - would e.g. look like the following:

from tangoMirror.TangoMirrorBase import SyncMode

from karabo.middlelayer import (
    AccessLevel, AccessMode, Bool, Float, Hash, Slot, State, VectorHash)

from ._version import version as deviceVersion
from .TangoDeviceBase import CommandRow, PropertyRow, TangoDeviceBase

FAST_POLLING_S = 0.1
SLOW_POLLING_S = 1.0


DEFAULT_CMDS = [
    Hash({"karaboKey": "init", "commandName": "Init", "isEnabled": True}),
    Hash({"karaboKey": "power", "commandName": "Power", "isEnabled": True}),
]

DEFAULT_PROPERTIES = [
    Hash(
        {
            "karaboKey": "actualFlow",
            "propertyName": "FlowCurrent",
            "syncMode": SyncMode.POLLING.value,
            "tangoMeasurementUnit": "None",
            "pollingTime": FAST_POLLING_S,
            "isEnabled": True,
        }
    ),
    Hash(
        {
            "karaboKey": "targetFlow",
            "propertyName": "FlowSetPoint",
            "syncMode": SyncMode.POLLING.value,
            "tangoMeasurementUnit": "None",
            "pollingTime": FAST_POLLING_S,
            "isEnabled": True,
        }
    ),
    Hash(
        {
            "karaboKey": "controllerState",
            "propertyName": "ControllerState",
            "syncMode": SyncMode.POLLING.value,
            "tangoMeasurementUnit": "None",
            "pollingTime": SLOW_POLLING_S,
            "isEnabled": True,
        }
    ),
    Hash(
        {
            "karaboKey": "tangoState",
            "propertyName": "State",
            "syncMode": SyncMode.POLLING.value,
            "tangoMeasurementUnit": "None",
            "pollingTime": FAST_POLLING_S,
            "isEnabled": True,
        }
    ),
    Hash(
        {
            "karaboKey": "tangoStatus",
            "propertyName": "Status",
            "syncMode": SyncMode.POLLING.value,
            "tangoMeasurementUnit": "None",
            "pollingTime": FAST_POLLING_S,
            "isEnabled": True,
        }
    ),
]


class NuxTangoMassFlowCtrl(TangoDeviceBase):
    __version__ = deviceVersion

    propertiesTable = VectorHash(
        rows=PropertyRow,
        displayedName="Device Properties",
        description="Mapping tango attributes to Karabo properties",
        defaultValue=DEFAULT_PROPERTIES,
        accessMode=AccessMode.INITONLY,
    )

    commandsTable = VectorHash(
        rows=CommandRow,
        displayedName="Device Commands",
        description="Mapping tango commands to Karabo slots",
        defaultValue=DEFAULT_CMDS,
        accessMode=AccessMode.INITONLY,
    )

    actualFlow = Float(
        displayedName="Actual Flow (Measurement)",
        tags={"watch"},
        accessMode=AccessMode.READONLY,
    )

    @Float(
        displayedName="Target Flow (Set Point) ",
        description="Specify the target flow",
        tags={"watch"},
    )
    async def targetFlow(self, value):
        await self.write_value("targetFlow", value)

    controllerState = Bool(
        displayedName="Controller State", tags={"watch"}, accessMode=AccessMode.READONLY
    )

    @Slot(displayedName="Init", requiredAccessLevel=AccessLevel.EXPERT)
    async def init(self):
        await self.execute_cmd("init")

    @Slot(displayedName="Power", allowedStates=[State.ON, State.OFF])
    async def power(self):
        await self.execute_cmd("power")

    def __init__(self, configuration):
        super().__init__(configuration)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions