Source code for pyicat_plus.client.elogbook

import base64
import socket
import logging
import datetime
import mimetypes
import warnings
from enum import Enum
from urllib.parse import urljoin
from typing import Optional, Iterable, List, Tuple

import requests

from ..utils.url import normalize_url
from .. import __version__
from . import defaults

logger = logging.getLogger(__name__)


[docs] class IcatElogbookClient: """Client for the e-logbook part of the ICAT+ REST API. REST API docs: https://icatplus.esrf.fr/api-docs/ The ICAT+ server project: https://gitlab.esrf.fr/icat/icat-plus/-/blob/master/README.md """ DEFAULT_SCHEME = "https" def __init__( self, url: str, api_key: Optional[str] = None, timeout: Optional[float] = None, **payload, ): if api_key is None: api_key = defaults.ELOGBOOK_TOKEN url = normalize_url(url, default_scheme=self.DEFAULT_SCHEME) path = f"dataacquisition/{api_key}/notification" self._message_url = urljoin(url, path) path = f"dataacquisition/{api_key}/base64" self._data_url = urljoin(url, path) self._init_payload = payload self._init_payload.setdefault("machine", socket.getfqdn()) self._init_payload.setdefault("software", "pyicat-plus_v" + __version__) self.raise_error = True if timeout is None: timeout = 0.1 self.timeout = timeout def _merge_payloads(self, message_payload: dict, call_payload: dict) -> dict: payloads = self._sorted_payloads(message_payload, call_payload) result = {k: v for payload in payloads for k, v in payload.items()} tags = self._merge_payload_tags(*payloads) if tags: result.pop("tags", None) result["tag"] = tags return result def _sorted_payloads(self, message_payload: dict, call_payload: dict) -> List[dict]: """Sorted by increasing priority""" return [message_payload, self._init_payload, call_payload] def _merge_payload_tags(self, *payloads: Iterable[dict]) -> List[dict]: """The payload tags can be eithers a list of strings or a list of dictionaries. The return value are the merged tags as a list of dictionaries. """ names = set() tags = list() for payload in payloads: ptags = payload.get("tag", list()) + payload.get("tags", list()) for tag in ptags: if isinstance(tag, str): if tag in names: continue names.add(tag) tags.append({"name": tag}) else: if tag["name"] in names: continue names.add(tag["name"]) tags.append(tag) return tags def _post_with_payload( self, url: str, message_payload: dict, call_payload: dict ) -> None: payload = self._merge_payloads(message_payload, call_payload) payload.setdefault( "creationDate", datetime.datetime.now().astimezone().isoformat() ) try: response = requests.post(url, json=payload, timeout=self.timeout) except requests.exceptions.ReadTimeout: return # we have no confirmation that the call succeeded except Exception as e: if self.raise_error: raise logger.exception(e) return if self.raise_error: response.raise_for_status() elif not response.ok: logger.error("%s: %s", response, response.text)
[docs] def send_message( self, message: str, message_type: Optional[str] = None, editable: Optional[bool] = None, formatted: Optional[bool] = None, mimetype: Optional[str] = None, beamline: Optional[str] = None, proposal: Optional[str] = None, dataset: Optional[str] = None, **call_payload, ): url = self._compose_url( url=self._message_url, beamline=beamline, proposal=proposal ) message_payload = self._encode_message( message, message_type=message_type, editable=editable, formatted=formatted, mimetype=mimetype, dataset=dataset, ) self._post_with_payload(url, message_payload, call_payload)
[docs] def send_binary_data( self, data: bytes, mimetype: str, beamline: Optional[str] = None, proposal: Optional[str] = None, **call_payload, ): url = self._compose_url( url=self._data_url, beamline=beamline, proposal=proposal ) message_payload = self._encode_binary_data(data, mimetype=mimetype) self._post_with_payload(url, message_payload, call_payload)
@staticmethod def _compose_url( url: str, beamline: Optional[str] = None, proposal: Optional[str] = None ): query = {} if beamline: query["instrumentName"] = beamline if proposal: query["investigationName"] = proposal query = "&".join([f"{k}={v}" for k, v in query.items()]) return f"{url}?{query}"
[docs] def send_text_file( self, filename: str, beamline: Optional[str] = None, proposal: Optional[str] = None, dataset: Optional[str] = None, message_type: Optional[str] = None, editable: Optional[bool] = None, formatted: Optional[bool] = None, mimetype: Optional[str] = None, **payload, ): with open(filename, "r") as f: message = f.read() self.send_message( message, message_type=message_type, proposal=proposal, beamline=beamline, dataset=dataset, editable=editable, formatted=formatted, mimetype=mimetype, **payload, )
[docs] def send_binary_file( self, filename: str, beamline: Optional[str] = None, proposal: Optional[str] = None, **payload, ): with open(filename, "rb") as f: data = f.read() mimetype, _ = mimetypes.guess_type(filename, strict=True) self.send_binary_data( data, mimetype=mimetype, beamline=beamline, proposal=proposal, **payload )
def _encode_message( self, message: str, message_type: Optional[str] = None, editable: Optional[bool] = None, formatted: Optional[bool] = None, mimetype: Optional[str] = None, dataset: Optional[str] = None, ) -> dict: message_category, message_type = _message_category_and_type( message_type=message_type, editable=editable, formatted=formatted ) if mimetype is None: mimetype = "text/plain" try: format = _MessageFormatMapping[mimetype] except KeyError: raise ValueError( f"mime type '{mimetype}' is not supported ({list(_MessageFormatMapping)})" ) from None message = { "type": message_type.name, "category": message_category.name, "content": [{"format": format.name, "text": message}], } if dataset: message["datasetName"] = dataset return message def _encode_binary_data( self, data: bytes, mimetype: Optional[str] = None, ) -> dict: if not mimetype: # arbitrary binary data mimetype = "application/octet-stream" data_header = f"data:{mimetype};base64," data_blob = base64.b64encode(data).decode("latin-1") return {"base64": data_header + data_blob}
_MessageCategory = Enum("MessageCategory", "debug info error commandLine comment") _MessageType = Enum("MessageType", "annotation notification") _MessageFormat = Enum("MessageType", "plainText html") _MessageCategoryMapping = { "debug": _MessageCategory.debug, "info": _MessageCategory.info, "warning": _MessageCategory.error, "warn": _MessageCategory.error, "error": _MessageCategory.error, "critical": _MessageCategory.error, "fatal": _MessageCategory.error, "command": _MessageCategory.commandLine, "comment": _MessageCategory.comment, } _MessageFormatMapping = { "text/plain": _MessageFormat.plainText, "text/html": _MessageFormat.html, } def _message_category_and_type( message_type: Optional[str] = None, editable: Optional[bool] = None, formatted: Optional[bool] = None, ) -> Tuple[_MessageCategory, _MessageType]: """Derive the ICAT message category from the API message type. The ICAT message types are: - "annotation" (default for comments): editable and unformatted message - "notification" (default for non-comments): formatted and not editable Only comments can be editable and a message cannot be editable and formatted at the same time. """ if message_type is None: message_type = "comment" try: category = _MessageCategoryMapping[message_type.lower()] except KeyError: raise ValueError( f"'{message_type}' is not a valid e-logbook message type" ) from None if category != _MessageCategory.comment: # Non-comments cannot be editable if editable: warnings.warn( f"message type '{message_type}' cannot be editable", UserWarning ) editable = None # Non-comments cannot be unformatted if formatted is not None and not formatted: warnings.warn( f"message type '{message_type}' cannot be unformatted", UserWarning ) formatted = None # A message cannot be editable and formatted at the same time or uneditable and unformatted if formatted == editable and formatted is not None: if formatted: warnings.warn( f"message type '{message_type}' cannot be editable and formatted at the same time", UserWarning, ) else: warnings.warn( f"message type '{message_type}' cannot be uneditable and unformatted at the same time", UserWarning, ) formatted = None editable = None # Editability is specified if editable is not None: if editable: return category, _MessageType.annotation return category, _MessageType.notification # Formatability is specified if formatted is not None: if formatted: return category, _MessageType.notification return category, _MessageType.annotation # By default comments are annotations and the rest are notifications if category == _MessageCategory.comment: return category, _MessageType.annotation return category, _MessageType.notification