Source code for pyicat_plus.tests.servers.activemq_rest_server

import time
import datetime
import socketserver
import logging

from .utils import ReuseAddrTCPServer
from ...utils.log_utils import basic_config

logger = logging.getLogger("ACTIVEMQ REST SERVER")

# Queue names for which the request handler accepts a Jolokia
# ``ConsumerCount`` read request; a request naming any other destination is
# rejected as unknown.
ICAT_QUEUES = [
    "icatIngest",
    "icatArchiveRestoreStatus",
    "icatUpdateDatasetMetadata",
    "icatDataFiles",
]


class MyTCPRequestHandler(socketserver.StreamRequestHandler):
    """Mock of the ActiveMQ Jolokia REST endpoint used in tests.

    A request is accepted when it contains a ``ConsumerCount`` read for one of
    the queues in ``ICAT_QUEUES``. The reply is a canned HTTP 200 response
    whose JSON body always reports a consumer count (``value``) of 6.
    """

    # Request line (method + path) of a ConsumerCount read for ``{queue}``.
    _REQUEST_TEMPLATE = (
        "GET /api/jolokia/read/org.apache.activemq:type=Broker,"
        "brokerName=metadata,destinationType=Queue,"
        "destinationName={queue}/ConsumerCount"
    )
    # HTTP dates are expressed in UTC and labelled "GMT".
    _HTTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"

    def handle(self):
        """Read one request and answer it if it targets a known queue.

        An empty read (nothing received) is ignored silently.

        :raises RuntimeError: when the request could not be read, or when a
            non-empty request does not match any known queue.
        """
        request = self._read_request()
        if not request:
            return
        for queue in ICAT_QUEUES:
            expected = self._REQUEST_TEMPLATE.format(queue=queue).encode("utf-8")
            if expected in request:
                logger.info(f"Send response to {self.client_address[0]}")
                self._send_response(queue)
                return
        logger.info(f"Unknown request\n {request}")
        raise RuntimeError("Unknown request")

    def _read_request(self):
        """Return the bytes obtained from a single read of the connection.

        At most 16384 bytes are read; an empty ``bytes`` object is returned
        when nothing was received.

        :raises RuntimeError: when reading from the connection fails.
        """
        buff = bytearray(16384)
        try:
            n = self.rfile.readinto1(buff)
        except Exception as e:
            raise RuntimeError("Error reading request") from e
        return bytes(buff[:n])

    def _send_response(self, queue="icatIngest"):
        """Write the canned ``ConsumerCount`` reply to the connection.

        :param queue: queue name echoed in the ``mbean`` field of the JSON
            body. Defaults to ``"icatIngest"``.
        """
        # Use UTC so that the "GMT" suffix of the date headers is truthful.
        now = datetime.datetime.now(datetime.timezone.utc)
        expires = now + datetime.timedelta(hours=5)
        headers = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/plain;charset=UTF-8\r\n"
            "Cache-Control: no-cache\r\n"
            "Pragma: no-cache\r\n"
            f"Date: {now.strftime(self._HTTP_DATE_FORMAT)}\r\n"
            f"Expires: {expires.strftime(self._HTTP_DATE_FORMAT)}\r\n"
            "Connection: close\r\n"
            "Server: Jetty(7.6.9.v20130131)\r\n"
            "\r\n"
        )
        body = (
            f'{{"timestamp":{int(time.time())},"status":200,'
            '"request":{"mbean":"org.apache.activemq:brokerName=metadata,'
            f'destinationName={queue},destinationType=Queue,type=Broker",'
            '"attribute":"ConsumerCount","type":"read"},"value":6}'
        )
        self.wfile.write((headers + body).encode("utf-8"))
def main(port=8778):
    """Run the mock ActiveMQ REST server on localhost until interrupted.

    :param port: TCP port to listen on.
    """
    address = ("localhost", port)
    server = ReuseAddrTCPServer(address, MyTCPRequestHandler)

    logger.info("CTRL-C to stop")
    try:
        # Blocks until the serving loop is interrupted.
        server.serve_forever()
    finally:
        logger.info("Exit.")
if __name__ == "__main__":
    import argparse

    # Script entry point: set up logging, then read the port from the
    # command line and start the server.
    basic_config(
        logger=logger,
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    cli = argparse.ArgumentParser(description="ActiveMQ REST server")
    cli.add_argument("--port", default=8778, type=int, help="server port")
    options = cli.parse_args()

    main(port=options.port)