Source code for pyicat_plus.client.add_files

import json
from typing import Optional, List
from .messaging import IcatMessagingClient
from . import defaults


class IcatAddFilesClient:
    """Client for adding missing files to an existing dataset.

    Requests are JSON-encoded and handed to an ``IcatMessagingClient``
    that is created at construction time.
    """

    def __init__(
        self,
        queue_urls: Optional[List[str]] = None,
        queue_name: Optional[str] = None,
        monitor_port: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> None:
        """Create the underlying messaging client.

        :param queue_urls: broker URLs; falls back to
            ``defaults.ADD_FILES_BROKERS`` when ``None``.
        :param queue_name: queue name; falls back to
            ``defaults.ADD_FILES_QUEUE`` when ``None``.
        :param monitor_port: forwarded unchanged to ``IcatMessagingClient``.
        :param timeout: forwarded unchanged to ``IcatMessagingClient``.
        """
        if queue_name is None:
            queue_name = defaults.ADD_FILES_QUEUE
        if queue_urls is None:
            queue_urls = defaults.ADD_FILES_BROKERS
        self._client = IcatMessagingClient(
            queue_urls, queue_name, monitor_port=monitor_port, timeout=timeout
        )

    def disconnect(self) -> None:
        """Disconnect the underlying messaging client."""
        self._client.disconnect()

    def add_files(
        self,
        dataset_id: int,
    ) -> None:
        """Send a request to add the missing files of a dataset.

        The message is the UTF-8 encoded JSON object
        ``{"datasetId": dataset_id}``.

        :param dataset_id: identifier of the dataset; must be truthy.
        :raises AssertionError: when ``dataset_id`` is falsy (``None``, ``0``).
        """
        # Raise explicitly rather than with an ``assert`` statement: asserts
        # are removed under ``python -O``, which would let an empty id be
        # sent. The exception type is kept for backward compatibility.
        if not dataset_id:
            raise AssertionError("ICAT requires the datasetId")
        root = {"datasetId": dataset_id}
        data = json.dumps(root).encode("utf-8")
        self._client.send(data)

    def check_health(self) -> None:
        """Raises an exception when not healthy"""
        self._client.check_health()