import re
from enum import Enum
from io import StringIO
from xml.etree import ElementTree
from dataclasses import dataclass
from collections.abc import Mapping, Sequence
from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Union
import requests
import icat_esrf_definitions
from .namespace_wrapper import NamespaceWrapper
# Anything accepted as a node identifier: one name or an iterable of names.
IcatNodeIdLike = Union[str, Iterable[str]]

# An item of the metadata tree: a single field or a group of items.
IcatItemType = Union["IcatField", "IcatFieldGroup"]


class IcatCategory(Enum):
    """Category of a group in the ICAT metadata tree."""

    other = 1
    technique = 2
    instrument = 3
    sample = 4
    positioner = 5
    note = 6
    slit = 7
    attenuator = 8
    detector = 9


# NeXus class name of a group -> category of that group.
NX_CLASS_TO_CATEGORY = dict(
    NXsubentry=IcatCategory.technique,
    NXpositioner=IcatCategory.positioner,
    NXinstrument=IcatCategory.instrument,
    NXsample=IcatCategory.sample,
    NXnote=IcatCategory.note,
    NXslit=IcatCategory.slit,
    NXattenuator=IcatCategory.attenuator,
    NXdetector=IcatCategory.detector,
)
[docs]
class IcatNodeId(Sequence):
    """Identifier of an item in the ICAT metadata tree.

    The identifier is an immutable sequence of node names; ``str()`` joins the
    names with dots. Two identifiers are equal when their name sequences are
    equal. Instances are hashable, so they can be used as dictionary keys, in
    sets and as fields of hashable (frozen) dataclasses.
    """

    def __init__(self, iterable: Optional["IcatNodeIdLike"] = None) -> None:
        """
        :param iterable: ``None`` for an empty identifier, a single node name
            or an iterable of node names
        :raises TypeError: when an iterable contains an item which is not a string
        """
        self._sequence = tuple(self._parse_init(iterable))

    def _parse_init(
        self, iterable: Optional["IcatNodeIdLike"] = None
    ) -> Iterable[str]:
        """Yield the node names described by `iterable`."""
        if iterable is None:
            return
        if isinstance(iterable, str):
            # A string is one single name, it is never iterated or split.
            yield iterable
            return
        for s in iterable:
            if not isinstance(s, str):
                raise TypeError(s)
            # Empty names are dropped, e.g. the name of the root group.
            if s:
                yield s

    def __getitem__(self, idx) -> Union[str, "IcatNodeId"]:
        """Return one name for an integer index or a new identifier for a slice."""
        value = self._sequence[idx]
        if isinstance(value, tuple):
            return type(self)(value)
        return value

    def __len__(self) -> int:
        return len(self._sequence)

    def __str__(self) -> str:
        return ".".join(self)

    def __repr__(self) -> str:
        return repr(str(self))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return False
        return self._sequence == other._sequence

    def __hash__(self) -> int:
        # Defining `__eq__` alone sets `__hash__` to None. Hash the same data
        # `__eq__` compares so that equal identifiers have equal hashes.
        return hash(self._sequence)

    def __add__(self, other: "IcatNodeIdLike") -> "IcatNodeId":
        """Return a new identifier with the name(s) of `other` appended."""
        if isinstance(other, str):
            other = (other,)
        elif not isinstance(other, tuple):
            other = tuple(other)
        return type(self)(self._sequence + other)

    def endswith(self, suffix: "IcatNodeIdLike") -> bool:
        """Check whether the last names of this identifier are equal to `suffix`.

        :param suffix: a single node name or an iterable of node names
        :returns: ``True`` when the identifier ends with `suffix`. An empty
            suffix only matches an empty identifier.
        """
        if isinstance(suffix, str):
            suffix = (suffix,)
        elif not isinstance(suffix, tuple):
            suffix = tuple(suffix)
        return self._sequence[-len(suffix) :] == suffix
[docs]
def as_icatnodeid(node_id: IcatNodeIdLike) -> IcatNodeId:
    """Return `node_id` as an `IcatNodeId`.

    An `IcatNodeId` instance is returned as-is (no copy). Any other value is
    passed to the `IcatNodeId` constructor.
    """
    return node_id if isinstance(node_id, IcatNodeId) else IcatNodeId(node_id)
[docs]
@dataclass(frozen=True, order=True)
class IcatField:
    """Immutable description of one ICAT database field.

    Instances are compared and ordered attribute by attribute, in the order
    in which the attributes are declared.
    """

    # Name of the node in the metadata tree (last part of `node_id`)
    name: str
    # ICAT field name (the text inside "${...}" when loaded with `load_group`)
    field_name: str
    # Node identifier of the group this field belongs to
    parent: IcatNodeId
    # Value of the "NAPItype" XML attribute when loaded with `load_group`
    nxtype: str
    # Value of the "ESRF_description" XML attribute when loaded with `load_group`
    description: Optional[str]
    # Value of the "units" XML attribute when loaded with `load_group`
    units: Optional[str]

    @property
    def node_id(self) -> IcatNodeId:
        """Identifier of this field in the metadata tree: `parent` followed by `name`"""
        return self.parent + self.name

    @property
    def info(self) -> "IcatField":
        """The field itself, so that fields and groups both have an `info` attribute"""
        return self
[docs]
@dataclass(frozen=True, order=True)
class IcatGroup:
    """Immutable description of a group of ICAT database items.

    Instances are compared and ordered attribute by attribute, in the order
    in which the attributes are declared.
    """

    # Name of the node in the metadata tree (last part of `node_id`)
    name: str
    # Node identifier of the group this group belongs to
    parent: IcatNodeId
    # NeXus class name of the group, e.g. "NXsample"
    NX_class: str

    @property
    def node_id(self) -> IcatNodeId:
        """Identifier of this group in the metadata tree: `parent` followed by `name`"""
        return self.parent + self.name

    @property
    def category(self) -> IcatCategory:
        """Category which belongs to `NX_class` (`IcatCategory.other` when the
        class is not in `NX_CLASS_TO_CATEGORY`)"""
        try:
            return NX_CLASS_TO_CATEGORY[self.NX_class]
        except KeyError:
            return IcatCategory.other

    @property
    def info(self) -> "IcatGroup":
        """The group description itself"""
        return self
[docs]
class IcatFieldGroup(Mapping):
    """A group of ICAT database items.

    Read-only mapping from node name to the `IcatField` or nested
    `IcatFieldGroup` with that name. Items deeper in the tree can be retrieved
    with a node identifier that has more than one name.
    """

    def __init__(self, info: IcatGroup, nodes: Dict[str, IcatItemType]) -> None:
        """
        :param info: description of the group itself
        :param nodes: direct children of the group by name
        """
        self.__info = info
        self.__nodes = nodes

    def __getitem__(self, node_id: IcatNodeIdLike) -> IcatItemType:
        """Return the item with identifier `node_id` relative to this group.

        An empty identifier refers to the group itself.

        :raises KeyError: when there is no such item. This includes
            identifiers that descend into a field, so that ``in`` and ``get``
            of the `Mapping` interface work for any identifier.
        """
        node_id = as_icatnodeid(node_id)
        item: Any = self
        for name in node_id:
            if not isinstance(item, IcatFieldGroup):
                # Only groups have children: the path goes through a field.
                raise KeyError(str(node_id))
            item = item.__nodes[name]
        return item

    def __iter__(self) -> Iterator[str]:
        return iter(self.__nodes)

    def __len__(self) -> int:
        return len(self.__nodes)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.info!r})"

    @property
    def info(self) -> IcatGroup:
        """Description of the group itself"""
        return self.__info

    def iter_fields(self) -> Iterable[IcatField]:
        """Yield all fields of this group and of its nested groups (depth-first)."""
        for icat_item in self.values():
            if isinstance(icat_item, IcatField):
                yield icat_item
            else:
                yield from icat_item.iter_fields()

    def iter_groups(self) -> Iterable["IcatFieldGroup"]:
        """Yield all nested groups (depth-first, a group before its own sub-groups)."""
        cls = type(self)
        for icat_item in self.values():
            if isinstance(icat_item, cls):
                yield icat_item
                yield from icat_item.iter_groups()

    def iter_group_names(self) -> Iterable[str]:
        """Yield the node identifiers of all nested groups as strings."""
        for group in self.iter_groups():
            yield str(group.info.node_id)

    def iter_field_names(self) -> Iterable[str]:
        """Yield the node identifiers of all fields as strings."""
        for field in self.iter_fields():
            yield str(field.node_id)

    def get_field_with_field_name(self, field_name: str) -> Optional[IcatField]:
        """Return the first field with ICAT field name `field_name`, or ``None``
        when there is no such field."""
        for field in self.iter_fields():
            if field.field_name == field_name:
                return field
        return None

    def iter_groups_with_type(
        self, categories: Union[IcatCategory, str, Iterable[Union[IcatCategory, str]]]
    ) -> Iterable["IcatFieldGroup"]:
        """Yield all nested groups whose category is one of `categories`.

        :param categories: one category or an iterable of categories, each
            given as an `IcatCategory` member or as the name of a member
        :raises KeyError: when a name is not an `IcatCategory` member name
            (raised when the iteration starts)
        """
        if isinstance(categories, (IcatCategory, str)):
            # A single category. A string must not be iterated by character.
            categories = {categories}
        wanted = {
            (
                category
                if isinstance(category, IcatCategory)
                else IcatCategory.__members__[category]
            )
            for category in categories
        }
        for group in self.iter_groups():
            if group.info.category in wanted:
                yield group

    def iter_items_with_node_id_suffix(
        self, node_id_suffix: IcatNodeIdLike
    ) -> Iterable[IcatItemType]:
        """Yield the fields and groups whose node identifier ends with
        `node_id_suffix`.

        The children of a matching group are not searched.
        """
        node_id_suffix = as_icatnodeid(node_id_suffix)
        cls = type(self)
        for icat_item in self.values():
            if icat_item.info.node_id.endswith(node_id_suffix):
                yield icat_item
            elif isinstance(icat_item, cls):
                yield from icat_item.iter_items_with_node_id_suffix(node_id_suffix)

    def namespace(
        self,
        getter: Optional[Callable[[str], Any]] = None,
        setter: Optional[Callable[[str, Any], None]] = None,
        property_decorator: Optional[Callable] = None,
    ) -> NamespaceWrapper:
        """Wrap the group in a `NamespaceWrapper` with one property per item.

        Nested groups are wrapped recursively with the same arguments.

        :param getter: called with the ICAT field name when a field is
            retrieved. Without getter the `IcatField` itself is returned.
        :param setter: called with the ICAT field name and the new value when
            a field is set. Without setter no setter is given to the wrapper.
        :param property_decorator: passed on to `NamespaceWrapper`
        """
        adict = dict()

        if setter is None:
            wrap_setter = None
        else:

            def wrap_setter(key, value):
                field = adict.get(key, None)
                # Only fields can be set: not groups and not unknown names.
                if field is None or isinstance(field, NamespaceWrapper):
                    raise AttributeError(f"'{key}' is not an ICAT field")
                setter(field.field_name, value)

        def wrap_getter(key):
            value = adict[key]
            if getter is None:
                return value
            elif isinstance(value, NamespaceWrapper):
                return value
            else:
                return getter(value.info.field_name)

        cls = type(self)
        for k, v in self.items():
            if isinstance(v, cls):
                adict[k] = v.namespace(
                    getter=getter, setter=setter, property_decorator=property_decorator
                )
            else:
                adict[k] = v

        return NamespaceWrapper(
            property_names=list(adict),
            property_decorator=property_decorator,
            getter=wrap_getter,
            setter=wrap_setter,
        )

    def valid_field_name(self, field_name: str) -> bool:
        """Check whether there is a field with ICAT field name `field_name`."""
        return self.get_field_with_field_name(field_name) is not None
# Matches a text of the form "${...}" and captures what is between the braces
# (at least one character). `load_group` uses the captured text as field name.
ICAT_FIELD_NAME = re.compile(r"^\$\{(.+)\}$")
[docs]
def load_group(in_node: ElementTree.Element, **node_attributes) -> IcatFieldGroup:
    """Convert an XML node to an `IcatFieldGroup` instance.

    The children of `in_node` are handled as follows:

    * a ``group`` element becomes a nested group named after its
      ``groupName`` attribute;
    * any other element with ``${field_name}`` as text becomes an `IcatField`
      named after the element tag;
    * a ``link`` element without text is skipped.

    In the resulting group the fields come before the nested groups.

    :param in_node: XML element which describes the group
    :param node_attributes: keyword arguments of `IcatGroup`
        (``name``, ``parent`` and ``NX_class``)
    :returns: the group with all its fields and nested groups
    :raises ValueError: when a child element is none of the above
    :raises KeyError: when an element lacks a required XML attribute
        (``groupName`` and ``NX_class`` for groups, ``NAPItype`` for fields)
    """
    fields = list()
    groups = list()
    for element in in_node:
        text = element.text
        if element.tag == "group":
            # Do not look at the text: it is whitespace in an indented XML file
            # but None when the group is empty or the XML is not indented.
            groups.append((element.attrib["groupName"], element))
            continue
        match = None if text is None else ICAT_FIELD_NAME.match(text)
        if match:
            fields.append((element.tag, match.group(1), element.attrib))
        elif element.tag == "link" and text is None:
            continue
        else:
            # Explicit error instead of `assert`, which is skipped under `-O`.
            raise ValueError(
                f"Unexpected element <{element.tag}> with text {text!r}"
                " in the ICAT definitions"
            )

    info = IcatGroup(**node_attributes)
    parent = info.node_id
    nodes = dict()

    for name, field_name, attrs in fields:
        icat_item = IcatField(
            name=name,
            field_name=field_name,
            parent=parent,
            nxtype=attrs["NAPItype"],
            description=attrs.get("ESRF_description"),
            units=attrs.get("units"),
        )
        nodes[icat_item.name] = icat_item

    for group_name, element in groups:
        icat_item = load_group(
            element, name=group_name, parent=parent, NX_class=element.attrib["NX_class"]
        )
        nodes[icat_item.info.name] = icat_item

    return IcatFieldGroup(info, nodes)
[docs]
def icat_fields_source(
    url: Optional[str] = None, timeout: Optional[float] = 30.0
) -> Union[str, StringIO]:
    """Get XML description of all ICAT fields from a URL.

    :param url: supports filenames or strings with the scheme prefix "file://",
                "xml://" or anything the `requests` library can handle. Without
                URL the definitions file of the 'icat-esrf-definitions' package
                is used.
    :param timeout: number of seconds to wait for the server when the URL is
                    downloaded with `requests` (``None`` waits forever)
    :returns: filename or a file object
    :raises RuntimeError: when the URL cannot be downloaded. The original
                          error is attached as the cause.
    """
    if not url:
        return icat_esrf_definitions.DEFINITIONS_FILE

    scheme = re.match(r"[a-z]+://", url)
    if scheme is None:
        # No scheme prefix: a plain filename
        return url
    if url.startswith(("file://", "xml://")):
        # Strip the scheme prefix
        return url[scheme.end() :]

    try:
        # Without timeout an unresponsive server blocks the caller forever.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except Exception as exc:
        working_url = "https://gitlab.esrf.fr/icat/hdf5-master-config/-/raw/master/src/icat_esrf_definitions/hdf5_cfg.xml"
        raise RuntimeError(
            "The ICAT definitions URL is wrong. Do not specify a URL to fall back to the"
            f" definitions from the 'icat-esrf-definitions' package or use '{working_url}'"
        ) from exc
    return StringIO(response.text)
[docs]
def load_icat_fields(url: Optional[str] = None) -> IcatFieldGroup:
    """Load the ICAT metadata definitions as a browsable tree.

    :param url: supports filenames or strings with the scheme prefix
                "file://", "xml://" or anything the `requests` library can handle.
    :returns: a mapping object representing the tree relationship of ICAT fields
    """
    source = icat_fields_source(url=url)
    root = ElementTree.parse(source).getroot()
    # The XML root is loaded as an unnamed "NXentry" group without parent.
    return load_group(root, name="", parent=IcatNodeId(), NX_class="NXentry")