Source code for pyicat_plus.tests.servers.icat_session

import os
import json
import logging
from uuid import uuid4
from typing import Optional


logger = logging.getLogger("ICATPLUS SERVER")


[docs] class IcatSession: def __init__(self, root_dir: Optional[str] = None): if root_dir is None: root_dir = "." if root_dir: os.makedirs(root_dir, exist_ok=True) self._root_dir = root_dir self._session_file = os.path.join(root_dir, "session.json")
[docs] def login(self, credentials: dict) -> dict: if credentials["password"] != "correct": raise PermissionError("Password incorrect") session_id = str(uuid4()) session_data = {"sessionId": session_id} with open(self._session_file, "w") as f: json.dump(session_data, f) logger.info("Login session id = %s", session_id) return session_data
@property def session_data(self) -> Optional[dict]: if os.path.exists(self._session_file): with open(self._session_file, "r") as f: return json.load(f) @property def session_id(self) -> Optional[str]: data = self.session_data if data: session_id = data["sessionId"] else: session_id = None logger.info("Current session id = %s", session_id) return session_id
[docs] def is_allowed(self, session_id: str) -> bool: logger.info("Validate session id = %s", session_id) return session_id == self.session_id