Source code for pyicat_plus.tests.servers.stomp_subscriber

import os
import json
import socket
import logging
import threading
from enum import Enum
from typing import Optional
from json.decoder import JSONDecodeError

import stomp
import xmltodict
from xml.parsers.expat import ExpatError

from .icat_db import IcatDb
from ...utils.log_utils import basic_config
from .activemq_rest_server import ICAT_QUEUES

logger = logging.getLogger("STOMP SUBSCRIBER")

MessageType = Enum("MessageType", "investigation dataset archiving addfiles unknown")


[docs] class MyListener(stomp.ConnectionListener): def __init__(self, conn, icat_data_dir: Optional[str] = None): self.conn = conn self.s_out = None self.icatdb = IcatDb(icat_data_dir) super().__init__()
[docs] def redirect_messages(self, port): if self.s_out is not None: self.s_out.close() self.s_out = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s_out.connect(("localhost", port)) logger.info(f"Redirect received messages to port {port}")
[docs] def on_message(self, frame): message = frame.body logger.info("received message:\n %s", message) message_type = None try: message = xmltodict.parse( message, process_namespaces=True, namespaces={"http://www.esrf.fr/icat": None}, ) except ExpatError: try: message = json.loads(message) except JSONDecodeError: message_type = MessageType.unknown else: keys = set(message) if keys == {"datasetId", "type", "level", "message"}: message_type = MessageType.archiving elif keys == {"datasetId"}: message_type = MessageType.addfiles else: message_type = MessageType.unknown else: if "investigation" in message: message_type = MessageType.investigation elif "dataset" in message: message_type = MessageType.dataset logger.info("parsed message (%s):\n %s", message_type, message) # Only access specific destinations header = frame.headers icat_queues = ["/queue/" + q for q in ICAT_QUEUES] if header.get("destination") not in icat_queues: return # Only accept valid proposals if message_type in (message_type.investigation, message_type.dataset): if message_type is message_type.investigation: data = message["investigation"] proposal = data["experiment"] else: data = message["dataset"] proposal = data["investigation"] # Convert to backend format file_count = 0 if os.path.exists(data["location"]): file_count = len(os.listdir(data["location"])) data["parameter"].append({"name": "__fileCount", "value": file_count}) data["parameters"] = data.pop("parameter") if proposal and "666" in proposal: logger.info( "Do not register %s for invalid proposal '%s'", message_type, proposal, ) return # Store data if message_type in (message_type.investigation, message_type.dataset): if message_type is message_type.investigation: self.icatdb.start_investigation(data) else: self.icatdb.store_dataset(data) if message_type == message_type.addfiles: dataset_id = message["datasetId"] with self.icatdb.update_dataset(dataset_id) as data: if data is None: logger.error("datasetId %s does not exist", dataset_id) else: file_count = 0 dirname = data["location"] if os.path.exists(dirname): file_count = len(os.listdir(dirname)) for parameter in data["parameters"]: if parameter["name"] == "__fileCount": logger.info( "Update file count for %s to %d", dirname, file_count ) parameter["value"] = file_count break else: logger.info("Add file count for %s to %d", dirname, file_count) data["parameters"].append( {"name": "__fileCount", "value": file_count} ) # Notify that data is valid if self.s_out is not None: self.s_out.sendall(frame.body.encode() + b"\n")
[docs] def main( host=None, port=60001, queue=None, port_out=0, icat_data_dir: Optional[str] = None ): if not host: host = "localhost" if not queue: queue = "/queue/icatIngest" conn = stomp.Connection([(host, port)]) # Listener will run in a different thread listener = MyListener(conn, icat_data_dir) conn.set_listener("", listener) conn.connect("guest", "guest", wait=True) conn.subscribe(destination=queue, id=1, ack="auto") logger.info(f"subscribed to {queue} on STOMP {host}:{port}") if port_out: listener.redirect_messages(port_out) listener.s_out.sendall(b"LISTENING\n") logger.info("CTRL-C to stop") try: threading.Event().wait() finally: logger.info("Exit.")
if __name__ == "__main__": import argparse basic_config( logger=logger, level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) parser = argparse.ArgumentParser( description="STOMP client which subscribes to a STOMP queue and redirect its output to a socket" ) parser.add_argument( "--host", default="localhost", type=str, help="STOMP server host" ) parser.add_argument("--port", default=60001, type=int, help="STOMP server port") parser.add_argument( "--queue", default="/queue/icatIngest", type=str, help="STOMP queue" ) parser.add_argument("--port_out", default=0, type=int, help="output socket") parser.add_argument( "--icat_data_dir", default=None, type=str, help="Dataset directory" ) args = parser.parse_args() main( host=args.host, port=args.port, port_out=args.port_out, queue=args.queue, icat_data_dir=args.icat_data_dir, )