For easier one to one mapping of Tango Devices to Karabo Devices, I would like to propose to add a TangeDeviceBase class that would allow - similar to the TangoMotor class - for an easier development of (lab) specific device classes for a one to one mapping. Based heavily on the TangoMotor implementation, such device class could look like the following:
from asyncio import Event, Lock, TimeoutError, wait_for
from enum import IntEnum
from tango import (
ConnectionFailed as TangoConnectionFailed, DevState as TangoState)
from tango.asyncio import DeviceProxy as TangoDevProxy
from karabo.middlelayer import (
AccessLevel, AccessMode, Assignment, Bool, Configurable, Double, Hash,
KaraboError, MetricPrefix, Slot, State, String, Unit, VectorHash,
VectorString, background, has_changes, isSet, sleep, slot, unit)
from tangoMirror.TangoMirrorBase import SyncMode, TangoMirrorBase, parse_exception
from ._version import version as deviceVersion
from .state import TangoStateParser
FAST_POLLING_S = 0.1
SLOW_POLLING_S = 1.
TANGO_UNITS = (
"None",
"meter", "millimeter", "micrometer", "nanometer", "picometer",
"degree", "meter_per_second", "millimeter_per_second",
"micrometer_per_second", "nanometer_per_second", "picometer",
"")
DEFAULT_PROPERTIES = [
Hash({"karaboKey": "tangoState", "propertyName": "State",
"syncMode": SyncMode.POLLING.value,
"tangoMeasurementUnit": "None",
"pollingTime": FAST_POLLING_S, "isEnabled": True}),
Hash({"karaboKey": "tangoStatus", "propertyName": "Status",
"syncMode": SyncMode.POLLING.value,
"tangoMeasurementUnit": "None",
"pollingTime": FAST_POLLING_S, "isEnabled": True}),
]
DEFAULT_CMDS = [
# Hash({"karaboKey": "stop", "commandName": "Stop", "isEnabled": True}),
]
class PropertyRow(Configurable):
class ColIdx(IntEnum):
KARABO_KEY = 0
PROPERTY_NAME = 1
TANGO_UNIT = 2
SYNC_MODE = 3
POLLING_TIME = 4
IS_ENABLED = 5
karaboKey = String(
displayedName="Karabo Key", accessMode=AccessMode.READONLY,
defaultValue="")
propertyName = String(displayedName="Tango Property Name",
defaultValue="")
tangoMeasurementUnit = String(
displayedName="Tango Measurement Unit",
options=TANGO_UNITS,
defaultValue="None")
syncMode = String(
displayedName="Sync Mode",
description="Select the Tango event subscription mechanism to use. "
f"{SyncMode.ON_CHANGE.value} = update when Tango "
"change event received. "
f"{SyncMode.ON_PERIODIC.value} = update when Tango "
f"periodic event received. "
f"{SyncMode.POLLING.value} = have Karabo poll the values.",
defaultValue=SyncMode.POLLING.value,
options=[mode.value for mode in SyncMode],
accessMode=AccessMode.RECONFIGURABLE)
pollingTime = Double(
displayedName="Polling Time",
description="Update interval duration for 'On periodic' and "
"'Polling' sync. modes. Unused in 'On change' mode.",
unitSymbol=Unit.SECOND,
defaultValue=0.1,
minInc=0.01,
maxInc=2.)
isEnabled = Bool(displayedName="Is Enabled", defaultValue=True)
class CommandRow(Configurable):
class ColIdx(IntEnum):
KARABO_KEY = 0
CMD_NAME = 1
IS_ENABLED = 2
karaboKey = String(
displayedName="Karabo Key", accessMode=AccessMode.READONLY,
defaultValue="")
commandName = String(displayedName="Tango Command Name",
defaultValue="")
isEnabled = Bool(displayedName="Is Enabled", defaultValue=True)
class TangoDeviceBase(TangoMirrorBase):
__version__ = deviceVersion
def __init__(self, configuration):
self.tango_proxy = None
self.karabo_to_tango_attr_map = {}
self.tango_to_karabo_attr_map = {}
self.karabo_to_tango_cmd_map = {}
self.tango_unit_map = {}
self.polling_time_map = {}
self.state_parser = TangoStateParser()
self.pending_init_values = {}
self.subscribed_events = set()
super().__init__(configuration)
self.on_target_event = Event()
self.hold_state_update = Lock()
self.move_task = None
self.is_reset = False
self.current_target = None
self.sync_mode = {}
self.ignore_limit_switch_errors = False
__version__ = deviceVersion
tangoDeviceId = String(
displayedName="Tango Device Id",
description="Tango id of the device. Must match the entry in the "
"Tango mirror device table",
accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY)
propertiesTable = VectorHash(
rows=PropertyRow,
displayedName="Device Properties",
description="Mapping tango attributes to Karabo properties",
defaultValue=DEFAULT_PROPERTIES,
accessMode=AccessMode.INITONLY)
commandsTable = VectorHash(
rows=CommandRow,
displayedName="Device Commands",
description="Mapping tango commands to Karabo slots",
defaultValue=DEFAULT_CMDS,
accessMode=AccessMode.INITONLY)
tangoState = String(
displayedName="Tango State",
displayType='State',
tags={"watch"},
accessMode=AccessMode.READONLY)
def on_tango_state_update(self, value):
if self.tangoState.value != value:
self.tangoState = value
if self.state != State.ERROR:
new_state = self.state_parser(value)
background(self.update_state(new_state))
tangoState.on_update = on_tango_state_update
tangoStatus = String(
displayedName="Tango Status",
tags={"watch"},
accessMode=AccessMode.READONLY)
async def execute_cmd(self, karabo_slot_key, param=None):
tango_cmd_name = self.karabo_to_tango_cmd_map.get(
karabo_slot_key, None)
if tango_cmd_name is None:
raise KaraboError(f"Slot {karabo_slot_key} is disabled.")
if param is not None:
await self.tango_proxy.command_inout(tango_cmd_name, param=param)
else:
await self.tango_proxy.command_inout(tango_cmd_name)
async def preInitialization(self):
for row in self.propertiesTable.value:
if not row[PropertyRow.ColIdx.IS_ENABLED]:
continue
karabo_key = row[PropertyRow.ColIdx.KARABO_KEY]
tango_name = row[PropertyRow.ColIdx.PROPERTY_NAME]
tango_unit_name = row[PropertyRow.ColIdx.TANGO_UNIT]
self.tango_unit_map[tango_name] = getattr(
unit, tango_unit_name, None)
self.karabo_to_tango_attr_map[karabo_key] = tango_name
self.sync_mode[tango_name] = row[PropertyRow.ColIdx.SYNC_MODE]
self.polling_time_map[tango_name] = row[
PropertyRow.ColIdx.POLLING_TIME]
self.tango_to_karabo_attr_map = \
{v: k for k, v in self.karabo_to_tango_attr_map.items()}
for row in self.commandsTable.value:
if not row[CommandRow.ColIdx.IS_ENABLED]:
continue
karabo_key = row[CommandRow.ColIdx.KARABO_KEY]
tango_name = row[CommandRow.ColIdx.CMD_NAME]
self.karabo_to_tango_cmd_map[karabo_key] = tango_name
async def onInitialization(self):
url = self.get_tango_url(self.tangoDeviceId)
self.tango_proxy = await wait_for(
TangoDevProxy(url), timeout=self.connectionTimeout.value)
try:
dev_attr_list = self.tango_proxy.get_attribute_list()
except TangoConnectionFailed:
msg = "Connection failed."
self.logger.error(msg)
raise KaraboError(msg) # TODO do something clever here
tango_polling = set()
for attr in self.karabo_to_tango_attr_map.values():
if attr not in dev_attr_list:
self.logger.error(f"Tango attribute {attr} not found.")
continue # TODO handle this otherwise ?
sync_mode = self.sync_mode[attr]
if sync_mode == SyncMode.POLLING.value:
tango_polling.add(attr)
else:
polling_time = self.polling_time_map.get(attr)
if polling_time is not None:
polling_time = int(polling_time * 1000) # convert s to ms
evt_id = self.subscribe_event(
attr, sync_mode, self.tango_proxy,
polling_time=polling_time)
self.subscribed_events.add(evt_id)
for key, value in self.pending_init_values.items():
tango_attr_name = self.karabo_to_tango_attr_map[key]
tango_value = self.conversion_karabo_to_tango(
tango_attr_name, value)
try:
await self.tango_proxy.write_attribute(
tango_attr_name, tango_value)
except Exception as e:
msg = parse_exception(e)
self.logger.error(msg)
self.status = msg
for k in self.getDeviceSchema().filterByTags('watch'):
tango_name = self.karabo_to_tango_attr_map[k]
if tango_name in tango_polling:
background(self.watch_tango_device(tango_name))
await self.update_state(State.ON)
def onDestruction(self):
for evt_id in self.subscribed_events:
if evt_id.done():
self.tango_proxy.unsubscribe_event(evt_id.result())
async def update_state(self, new_state):
updated = False
if not self.hold_state_update.locked():
if self.state != new_state:
self.state = new_state
updated = True
return updated
def conversion_tango_to_karabo(self, tango_attr, tango_value,
karabo_key=None):
tango_unit = self.tango_unit_map[tango_attr]
if tango_unit is None:
return tango_value
if karabo_key is None:
karabo_key = self.tango_to_karabo_attr_map[tango_attr]
karabo_unit = getattr(self, karabo_key).descriptor.units
tango_value = tango_value * tango_unit
karabo_value = tango_value.to(karabo_unit).value
return karabo_value
def conversion_karabo_to_tango(self, tango_attr, karabo_value):
tango_unit = self.tango_unit_map[tango_attr]
if tango_unit is None:
return karabo_value
return karabo_value.to(tango_unit).value
async def refresh_value(self, tango_name):
tango_attr = await self.tango_proxy.read_attribute(tango_name)
tango_value = tango_attr.value
self.update_karabo_parameter(tango_name, tango_value)
async def watch_tango_device(self, tango_name):
polling_time = self.polling_time_map[tango_name]
already_logged = False
while True:
try:
await self.refresh_value(tango_name)
already_logged = False
except Exception as e:
if self.state != State.ERROR:
self.state = State.ERROR
if not already_logged:
already_logged = True
karabo_name = self.tango_to_karabo_attr_map[tango_name]
msg = (f"Could not read {tango_name} ({karabo_name}): "
f"{parse_exception(e)}")
if self.status != msg:
self.status = msg
self.logger.error(msg)
await sleep(polling_time)
async def write_value(self, key, value):
if not isSet(value):
return
if not self.tango_proxy or not self.karabo_to_tango_attr_map:
self.pending_init_values[key] = value
return
tango_attr_name = self.karabo_to_tango_attr_map[key]
tango_value = self.conversion_karabo_to_tango(tango_attr_name, value)
try:
await self.tango_proxy.write_attribute(
tango_attr_name, tango_value)
except Exception as e:
msg = parse_exception(e)
if await self.update_state(State.ERROR):
self.logger.error(msg)
self.status = msg
def handle_event(self, args):
self.logger.debug(f"Received {args.event} EventData "
f"from {args.attr_name}")
if args.err:
err_desc = [e.desc for e in args.errors]
self.logger.error("EventData contains error message: "
f"{'. '.join(err_desc)}")
return
# extract the device name from the full attribute address
dev_name = args.attr_name.rsplit('/', 1)[0]
# extract the attribute name (discard '#dbase=___', cut dev_name)
attr_name = args.attr_name.split('#', 1)[0]
attr_name = attr_name.replace(dev_name + '/', "")
# attribute names in events are lowercased, even if tango
# attribute is defined with uppercase letters, match it
# here with the actual name
for att in self.tango_to_karabo_attr_map:
if att.lower() == attr_name:
attr_name = att
# attr_value is a tango.DeviceAttribute object
tango_value = args.attr_value.value
self.update_karabo_parameter(attr_name, tango_value)
def update_karabo_parameter(self, tango_name, tango_value):
try:
karabo_key = self.tango_to_karabo_attr_map[tango_name]
karabo_value = self.conversion_tango_to_karabo(
tango_name, tango_value, karabo_key)
karabo_par = getattr(self, karabo_key)
if hasattr(karabo_par.descriptor, "on_update"):
karabo_par.descriptor.on_update(self, karabo_value)
elif has_changes(karabo_par.value, karabo_value):
setattr(self, karabo_key, karabo_value)
except Exception as e:
msg = f"Could not set {karabo_key} to value {karabo_value}: {e}"
self.logger.error(msg)
Then the implementation of lab specific devices - mapping their cammands and state - would e.g. look like the following:
from tangoMirror.TangoMirrorBase import SyncMode
from karabo.middlelayer import (
AccessLevel, AccessMode, Bool, Float, Hash, Slot, State, VectorHash)
from ._version import version as deviceVersion
from .TangoDeviceBase import CommandRow, PropertyRow, TangoDeviceBase
FAST_POLLING_S = 0.1
SLOW_POLLING_S = 1.0
DEFAULT_CMDS = [
Hash({"karaboKey": "init", "commandName": "Init", "isEnabled": True}),
Hash({"karaboKey": "power", "commandName": "Power", "isEnabled": True}),
]
DEFAULT_PROPERTIES = [
Hash(
{
"karaboKey": "actualFlow",
"propertyName": "FlowCurrent",
"syncMode": SyncMode.POLLING.value,
"tangoMeasurementUnit": "None",
"pollingTime": FAST_POLLING_S,
"isEnabled": True,
}
),
Hash(
{
"karaboKey": "targetFlow",
"propertyName": "FlowSetPoint",
"syncMode": SyncMode.POLLING.value,
"tangoMeasurementUnit": "None",
"pollingTime": FAST_POLLING_S,
"isEnabled": True,
}
),
Hash(
{
"karaboKey": "controllerState",
"propertyName": "ControllerState",
"syncMode": SyncMode.POLLING.value,
"tangoMeasurementUnit": "None",
"pollingTime": SLOW_POLLING_S,
"isEnabled": True,
}
),
Hash(
{
"karaboKey": "tangoState",
"propertyName": "State",
"syncMode": SyncMode.POLLING.value,
"tangoMeasurementUnit": "None",
"pollingTime": FAST_POLLING_S,
"isEnabled": True,
}
),
Hash(
{
"karaboKey": "tangoStatus",
"propertyName": "Status",
"syncMode": SyncMode.POLLING.value,
"tangoMeasurementUnit": "None",
"pollingTime": FAST_POLLING_S,
"isEnabled": True,
}
),
]
class NuxTangoMassFlowCtrl(TangoDeviceBase):
__version__ = deviceVersion
propertiesTable = VectorHash(
rows=PropertyRow,
displayedName="Device Properties",
description="Mapping tango attributes to Karabo properties",
defaultValue=DEFAULT_PROPERTIES,
accessMode=AccessMode.INITONLY,
)
commandsTable = VectorHash(
rows=CommandRow,
displayedName="Device Commands",
description="Mapping tango commands to Karabo slots",
defaultValue=DEFAULT_CMDS,
accessMode=AccessMode.INITONLY,
)
actualFlow = Float(
displayedName="Actual Flow (Measurement)",
tags={"watch"},
accessMode=AccessMode.READONLY,
)
@Float(
displayedName="Target Flow (Set Point) ",
description="Specify the target flow",
tags={"watch"},
)
async def targetFlow(self, value):
await self.write_value("targetFlow", value)
controllerState = Bool(
displayedName="Controller State", tags={"watch"}, accessMode=AccessMode.READONLY
)
@Slot(displayedName="Init", requiredAccessLevel=AccessLevel.EXPERT)
async def init(self):
await self.execute_cmd("init")
@Slot(displayedName="Power", allowedStates=[State.ON, State.OFF])
async def power(self):
await self.execute_cmd("power")
def __init__(self, configuration):
super().__init__(configuration)
For easier one to one mapping of Tango Devices to Karabo Devices, I would like to propose to add a
TangeDeviceBaseclass that would allow - similar to theTangoMotorclass - for an easier development of (lab) specific device classes for a one to one mapping. Based heavily on theTangoMotorimplementation, such device class could look like the following:Then the implementation of lab specific devices - mapping their cammands and state - would e.g. look like the following: