Source code for pyicat_plus.client.metadata

import os
from typing import Optional, List
import xml.etree.ElementTree as etree

from .xmlns import dataset_as_xml
from .xmlns import investigation_as_xml
from .messaging import IcatMessagingClient
from . import defaults


[docs] class IcatMetadataClient: """Client for storing dataset metadata in ICAT.""" def __init__( self, queue_urls: Optional[List[str]] = None, queue_name: Optional[str] = None, monitor_port: Optional[int] = None, timeout: Optional[float] = None, ): if queue_urls is None: defaults.METADATA_BROKERS if queue_name is None: queue_name = defaults.METADATA_QUEUE self._client = IcatMessagingClient( queue_urls, queue_name, monitor_port=monitor_port, timeout=timeout )
[docs] def disconnect(self): self._client.disconnect()
[docs] def send_metadata( self, beamline: str, proposal: str, dataset: str, path: str, metadata: dict, ): root = dataset_as_xml( beamline=beamline, proposal=proposal, dataset=dataset, path=path, metadata=metadata, ) self._client.send(etree.tostring(root))
[docs] def store_metadata( self, filename: str, beamline: str, proposal: str, dataset: str, path: str, metadata: dict, ): root = dataset_as_xml( beamline=beamline, proposal=proposal, dataset=dataset, path=path, metadata=metadata, ) filename, ext = os.path.splitext(filename) if not ext: ext = ".xml" filename += ext dirname = os.path.dirname(filename) if dirname: os.makedirs(dirname, exist_ok=True) with open(filename, "wb") as f: f.write(etree.tostring(root))
[docs] def send_metadata_from_file(self, filename: str): filename, ext = os.path.splitext(filename) if not ext: ext = ".xml" filename += ext with open(filename, "rb") as f: payload = f.read() self._client.send(payload)
[docs] def start_investigation( self, beamline: str, proposal: str, start_datetime=None, end_datetime=None ): root = investigation_as_xml( beamline=beamline, proposal=proposal, start_datetime=start_datetime, end_datetime=end_datetime, ) self._client.reconnect() self._client.send(etree.tostring(root))
[docs] def reschedule_investigation(self, investigation_id: str): root = investigation_as_xml( investigation_id=investigation_id, beamline="", proposal="", ) self._client.reconnect() self._client.send(etree.tostring(root))
[docs] def check_health(self): """Raises an exception when not healthy""" self._client.check_health()