Source code for pyicat_plus.client.messaging

import base64
from typing import Optional, List

import requests
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.protocol import StompSession
from stompest.sync import Stomp
from stompest.error import StompConnectionError

from ..utils.url import normalize_url


[docs] class IcatMessagingClient: """Client for the ICAT message broker. The message broker is currently ActiveMQ, a message broker using the STOMP protocol. It also has a REST server for monitoring the broker status. """ DEFAULT_SCHEME = "tcp" DEFAULT_PORT = 61613 MONITOR_SCHEME = "http" DEFAULT_MONITOR_PORT = 8778 MONITOR_USER = "user" MONITOR_PWD = "user" def __init__( self, queue_urls: List[str], queue_name: str, monitor_port: Optional[int] = None, timeout: Optional[float] = None, ): urls = [ normalize_url( url, default_scheme=self.DEFAULT_SCHEME, default_port=self.DEFAULT_PORT ) for url in queue_urls ] failover = ",".join(urls) url = f"failover:({failover})?maxReconnectAttempts=3,initialReconnectDelay=250,maxReconnectDelay=1000" self.__max_version = StompSpec.VERSION_1_1 self.__client = Stomp(StompConfig(url, version=self.__max_version)) self._socket_timeout = None if timeout is None: timeout = 1 self._connect_timeout = timeout self.__send_destination = "/queue/" + queue_name self.__send_headers = { "persistent": "true", StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL, } if not monitor_port: monitor_port = self.DEFAULT_MONITOR_PORT self.__consumer_count_url = f"{self.MONITOR_SCHEME}://{{host}}:{monitor_port}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=metadata,destinationType=Queue,destinationName={queue_name}/ConsumerCount" self.__jolokia_headers = { "Authorization": b"Basic " + base64.b64encode(f"{self.MONITOR_USER}:{self.MONITOR_PWD}".encode()) }
[docs] def reconnect(self): self.disconnect() self._connect()
[docs] def disconnect(self): try: self.__client.disconnect() except StompConnectionError: self.__client.close(flush=True)
def _connect(self): self.__client.connect( versions=[self.__max_version], connectTimeout=self._socket_timeout, connectedTimeout=self._connect_timeout, ) @property def _connected_client(self): if self.__client.session.state != StompSession.CONNECTED: self._connect() return self.__client @property def _host(self): return self._connected_client._transport.host
[docs] def send(self, data: bytes): try: self._send(data) except StompConnectionError: self.reconnect() self._send(data)
def _send(self, data: bytes): self._connected_client.send( self.__send_destination, body=data, headers=self.__send_headers ) @property def _consumer_count(self): url = self.__consumer_count_url.format(host=self._host) response = requests.get(url, headers=self.__jolokia_headers) if not response.ok: raise RuntimeError( response, "Failed to retrieve the ActiveMQ consumer count" ) response = response.json() return response["value"]
[docs] def check_health(self): """Raises an exception when: - not connected - no message consumers """ state = self._connected_client.session.state if state != StompSession.CONNECTED: raise RuntimeError( "The connection with the message broker is " + str(state).upper() ) if self._consumer_count < 1: raise RuntimeError("The message broker has no consumers")