Source code for pyicat_plus.client.archive

import json
from enum import Enum
from typing import Optional, List

from .messaging import IcatMessagingClient
from . import defaults


[docs] class StatusType(Enum): ARCHIVING = "archiving" RESTORATION = "restoration"
[docs] class StatusLevel(Enum): INFO = "info" WARNING = "warning" ERROR = "error"
[docs] class IcatArchiveStatusClient: """Client for storing archive and restoration status in ICAT.""" def __init__( self, queue_urls: Optional[List[str]] = None, queue_name: Optional[str] = None, monitor_port: Optional[int] = None, timeout: Optional[float] = None, ): if queue_name is None: queue_name = defaults.ARCHIVE_QUEUE if queue_urls is None: queue_urls = defaults.ARCHIVE_BROKERS self._client = IcatMessagingClient( queue_urls, queue_name, monitor_port=monitor_port, timeout=timeout )
[docs] def disconnect(self): self._client.disconnect()
[docs] def send_archive_status( self, dataset_id: int, type: StatusType, level: StatusLevel, message: str ): assert dataset_id, "ICAT requires the datasetId" assert type, "ICAT requires the type" assert level, "ICAT requires the level" root = { "datasetId": dataset_id, "type": type.value, "level": level.value, "message": message, } data = json.dumps(root).encode("utf-8") self._client.send(data)
[docs] def check_health(self): """Raises an exception when not healthy""" self._client.check_health()