Source code for pyicat_plus.metadata.nexus

"""
Nexus structure from ICAT metadata
"""

from typing import Optional
from collections.abc import Iterable
import logging

from .definitions import load_icat_fields
from .definitions import IcatFieldGroup


logger = logging.getLogger(__name__)


def as_nxtype(value, nxtype):
    """Convert a value to the Nexus type given by the ICAT definitions.

    :param value: the raw metadata value
    :param nxtype: Nexus type name (``"NX_CHAR"``, ``"NX_DATE_TIME"``,
        ``"NX_INT"``, ``"NX_FLOAT"``) or ``None`` when no type is defined
    :returns: ``value`` unchanged when ``nxtype`` is ``None``, ``"NX_CHAR"``
        or not one of the names above; a string for ``"NX_DATE_TIME"``;
        an ``int`` for ``"NX_INT"``; a ``float`` for ``"NX_FLOAT"``
    :raises ValueError: when ``int()`` or ``float()`` cannot parse ``value``
    :raises TypeError: when ``int()`` or ``float()`` does not support the
        type of ``value``
    :raises AttributeError: when ``nxtype`` is ``"NX_DATE_TIME"`` and
        ``value`` is neither a string nor an object with ``isoformat()``
    """
    if nxtype is None:
        return value
    if nxtype == "NX_CHAR":
        # TODO: the ICAT definitions abuse NX_CHAR, so the value is passed
        # through as-is instead of being converted to a string.
        return value
    if nxtype == "NX_DATE_TIME":
        if not isinstance(value, str):
            # Non-string values are expected to be date/time objects
            value = value.isoformat()
        return str(value)
    if nxtype == "NX_INT":
        return int(value)
    if nxtype == "NX_FLOAT":
        return float(value)
    # Types without a dedicated conversion are returned unchanged
    return value
def create_nxtreedict(
    metadata: dict,
    icat_fields: Optional[IcatFieldGroup] = None,
    add_icat_attrs: bool = False,
):
    """Build a nested dictionary from flat ICAT metadata.

    Each key of ``metadata`` is looked up as an ICAT field name. Unknown
    names are skipped with a warning. For known fields, nested dictionaries
    are created along ``field.parent`` and the converted value is stored
    under ``field.name`` in the innermost one.

    :param metadata: maps ICAT field names to values
    :param icat_fields: ICAT field definitions; loaded with
        ``load_icat_fields()`` when ``None``
    :param add_icat_attrs: also store the ICAT field name under the key
        ``"<name>@icat_field"``
    :returns: nested dictionary; groups carry an ``"@NX_class"`` key when
        their definition has a non-empty ``NX_class`` and values carry a
        ``"<name>@units"`` key when the field defines units
    """
    if icat_fields is None:
        icat_fields = load_icat_fields()

    tree = {}
    for icat_name, raw_value in metadata.items():
        field = icat_fields.get_field_with_field_name(icat_name)
        if field is None:
            logger.warning(f"{icat_name} not a valid ICAT field")
            continue

        # Start at the root, which only gets annotated once a valid field
        # has been found.
        current_group = icat_fields
        node = tree
        nx_class = current_group.info.NX_class
        if nx_class:
            node["@NX_class"] = nx_class

        # Descend through the parent groups, creating missing levels
        for group_name in field.parent:
            current_group = current_group[group_name]
            node = node.setdefault(group_name, {})
            nx_class = current_group.info.NX_class
            if nx_class:
                node["@NX_class"] = nx_class

        # Store the value and its optional attributes
        key = field.name
        node[key] = as_nxtype(raw_value, field.nxtype)
        if add_icat_attrs:
            node[f"{key}@icat_field"] = field.field_name
        if field.units:
            node[f"{key}@units"] = field.units

    return tree