Source code for pyicat_plus.tests.servers.stomp_publisher
import socket
import logging
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.sync import Stomp
from ...utils.log_utils import basic_config
logger = logging.getLogger("STOMP PUBLISHER")
[docs]
def read_socket(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
buffer = b""
try:
while True:
buffer += sock.recv(16384)
if buffer:
out, sep, buffer = buffer.rpartition(b"\n")
if sep:
for bdata in out.split(b"\n"):
yield bdata
finally:
sock.close()
[docs]
def main(host=None, port=60001, queue=None, port_in=0):
if not host:
host = "localhost"
if not queue:
queue = "/queue/icatIngest"
queueURLs = [f"{host}:{port}"]
cfgURL = "failover:(tcp://" + ",tcp://".join(queueURLs) + ")"
cfgURL += "?maxReconnectAttempts=3,initialReconnectDelay=250,maxReconnectDelay=1000"
client = Stomp(StompConfig(cfgURL, version=StompSpec.VERSION_1_1))
client.connect(
versions=[StompSpec.VERSION_1_1], heartBeats=(0, 0), connectedTimeout=1
)
header = {
"persistent": "true",
StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
}
if port_in:
for body in read_socket("localhost", port_in):
client.send(queue, body=body, headers=header)
else:
while True:
body = input(f"Message to send to '{queue}': ")
client.send(queue, body=body.encode(), headers=header)
if __name__ == "__main__":
import argparse
basic_config(
logger=logger,
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
parser = argparse.ArgumentParser(
description="Redirect socket input to a STOMP queue"
)
parser.add_argument(
"--host", default="localhost", type=str, help="STOMP server host"
)
parser.add_argument("--port", default=60001, type=int, help="STOMP server port")
parser.add_argument(
"--queue", default="/queue/icatIngest", type=str, help="STOMP queue"
)
parser.add_argument("--port_in", default=0, type=int, help="input socket")
args = parser.parse_args()
main(host=args.host, port=args.port, port_in=args.port_in, queue=args.queue)