Source code for pyicat_plus.client.investigation

import os
import logging
import datetime
from urllib.parse import urljoin
from typing import Optional, List, Union, Sequence

import requests
import numpy

from ..concurrency import QueryPool
from .interface import DatasetId, Dataset, DatasetMetadata
from ..utils.maxsizedict import MaxSizeDict
from ..utils.url import normalize_url
from . import defaults

logger = logging.getLogger(__name__)

_DEFAULT_START_TIME = datetime.time(hour=8)


class IcatInvestigationClient:
    """Client for the investigation part of the ICAT+ REST API.

    An "investigation" is a time slot assigned to a particular proposal
    at a particular beamline.

    REST API docs:
    https://icatplus.esrf.fr/api-docs/

    The ICAT+ server project:
    https://gitlab.esrf.fr/icat/icat-plus/-/blob/master/README.md
    """

    DEFAULT_SCHEME = "https"

    def __init__(
        self,
        url: str,
        api_key: Optional[str] = None,
        timeout: Optional[float] = None,
    ):
        if api_key is None:
            api_key = defaults.ELOGBOOK_TOKEN
        url = normalize_url(url, default_scheme=self.DEFAULT_SCHEME)

        path = f"dataacquisition/{api_key}/investigation"
        query = "?instrumentName={beamline}&investigationName={proposal}"
        self._investigation_url = urljoin(url, path + query)

        path = f"dataacquisition/{api_key}/dataset"
        query = "?investigationId={investigation_id}"
        self._dataset_url = urljoin(url, path + query)

        self.raise_error = False
        self.__query_pool = QueryPool(timeout=timeout, maxqueries=20)
        self.__investigation_info = MaxSizeDict(maxsize=20)

    @property
    def timeout(self):
        return self.__query_pool.timeout

    @timeout.setter
    def timeout(self, value: Optional[float] = None):
        self.__query_pool.timeout = value

    def _get_with_response_parsing(
        self, url: str, timeout: Optional[float] = None
    ) -> Optional[list]:
        """Returning `None` means the information is not available at this moment.
        An empty list means that an error has occurred on the server side or that
        an actual empty list was returned.
        """
        try:
            response = self.__query_pool.execute(
                requests.get, args=(url,), timeout=timeout, default=None
            )
        except requests.exceptions.ReadTimeout:
            return None
        except Exception as e:
            if self.raise_error:
                raise
            logger.exception(e)
            return None
        if response is None:
            return None
        if self.raise_error:
            response.raise_for_status()
        elif not response.ok:
            logger.error("%s: %s", response, response.text)
        if response.ok:
            return response.json()
        else:
            return list()

    def investigation_info(
        self,
        beamline: str,
        proposal: str,
        date: Optional[Union[datetime.datetime, datetime.date]] = None,
        allow_open_ended: bool = True,
        timeout: Optional[float] = None,
    ) -> Optional[dict]:
        """An investigation is defined by a time slot. Find an investigation
        (if any) for a beamline, proposal and date ("now" when not provided).
        When there is more than one investigation, select the closest one that
        started before or at the date. If there is no such investigation, take
        the closest investigation which starts after the date.
        """
        investigation_key = beamline, proposal, date
        ninfo = self.__investigation_info.get(investigation_key)
        if ninfo is not None:
            return ninfo

        # Get all investigations for this proposal and beamline
        url = self._investigation_url.format(beamline=beamline, proposal=proposal)
        investigations = self._get_with_response_parsing(url, timeout=timeout)
        if investigations is None:
            return None  # not available at the moment

        # Select investigation
        investigation = _select_investigation(
            investigations, date=date, allow_open_ended=allow_open_ended
        )
        if investigation is None:
            return dict()  # no valid investigation found

        # Normalize information
        for key in ["parameters", "visitId"]:
            investigation.pop(key, None)
        ninfo = dict()
        ninfo["proposal"] = investigation.pop("name", None)
        ninfo["beamline"] = investigation.pop("instrument", dict()).get("name", None)
        ninfo.update(investigation)
        ninfo["e-logbook"] = (
            f"https://data.esrf.fr/investigation/{investigation['id']}/events"
        )
        ninfo["data portal"] = (
            f"https://data.esrf.fr/investigation/{investigation['id']}/datasets"
        )

        self.__investigation_info[investigation_key] = ninfo
        return ninfo

    def _investigation_id(
        self,
        beamline: str,
        proposal: str,
        date: Optional[Union[datetime.datetime, datetime.date]] = None,
        allow_open_ended: bool = True,
        timeout: Optional[float] = None,
    ) -> Optional[int]:
        info = self.investigation_info(
            beamline=beamline,
            proposal=proposal,
            date=date,
            allow_open_ended=allow_open_ended,
            timeout=timeout,
        )
        if info is None:
            return None
        return info.get("id", None)

    def registered_dataset_ids(
        self,
        beamline: str,
        proposal: str,
        date: Optional[Union[datetime.datetime, datetime.date]] = None,
        allow_open_ended: bool = True,
        timeout: Optional[float] = None,
    ) -> Optional[List[DatasetId]]:
        investigation_id = self._investigation_id(
            beamline=beamline,
            proposal=proposal,
            date=date,
            allow_open_ended=allow_open_ended,
            timeout=timeout,
        )
        if investigation_id is None:
            return None
        url = self._dataset_url.format(investigation_id=investigation_id)
        datasets = self._get_with_response_parsing(url, timeout=timeout)
        if datasets is None:
            return None  # not available at the moment
        return [self._icat_dataset_to_datasetid(dataset) for dataset in datasets]

    def registered_datasets(
        self,
        beamline: str,
        proposal: str,
        date: Optional[Union[datetime.datetime, datetime.date]] = None,
        allow_open_ended: bool = True,
        timeout: Optional[float] = None,
    ) -> Optional[List[Dataset]]:
        investigation_id = self._investigation_id(
            beamline=beamline,
            proposal=proposal,
            date=date,
            allow_open_ended=allow_open_ended,
            timeout=timeout,
        )
        if investigation_id is None:
            return None
        url = self._dataset_url.format(investigation_id=investigation_id)
        datasets = self._get_with_response_parsing(url, timeout=timeout)
        if datasets is None:
            return None  # not available at the moment
        return [self._icat_dataset_to_dataset(dataset) for dataset in datasets]

    @staticmethod
    def _get_dataset_parameter_value_by_name(
        dataset: dict, parameter_name: str
    ) -> Optional[str]:
        if "parameters" in dataset:
            for p in dataset["parameters"]:
                if p["name"] == parameter_name:
                    return p["value"]

    @classmethod
    def _icat_dataset_to_datasetid(cls, dataset: dict) -> DatasetId:
        location = dataset["location"]
        location, name = os.path.split(location)
        while location and not name:
            location, name = os.path.split(location)
        return DatasetId(name=name, path=dataset["location"])

    @classmethod
    def _icat_dataset_to_dataset(cls, dataset: dict) -> Dataset:
        dataset_id = cls._icat_dataset_to_datasetid(dataset)
        file_count = cls._get_dataset_parameter_value_by_name(dataset, "__fileCount")
        dataset_metadata = DatasetMetadata(
            file_count=int(0 if file_count is None else file_count)
        )
        return Dataset(
            dataset_id=dataset_id,
            icat_dataset_id=dataset["id"],
            dataset_metadata=dataset_metadata,
        )

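# --- Illustrative usage sketch (not part of the original module) ---
# A minimal sketch of how the client above might be used. The server URL,
# beamline name and proposal name are hypothetical placeholders; the function
# is only defined here for illustration and is never called.
def _example_client_usage():  # pragma: no cover - illustrative sketch only
    client = IcatInvestigationClient(url="https://icatplus.example.org", timeout=10)
    info = client.investigation_info(beamline="id00", proposal="hg123")
    if info is None:
        logger.info("ICAT+ not reachable at the moment")
    elif not info:
        logger.info("no matching investigation found")
    else:
        dataset_ids = client.registered_dataset_ids(beamline="id00", proposal="hg123")
        logger.info(
            "investigation %s has %d registered datasets",
            info.get("id"),
            len(dataset_ids or []),
        )
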
def _select_investigation(
    investigations: List[dict],
    date: Optional[Union[datetime.datetime, datetime.date]] = None,
    allow_open_ended: bool = True,
) -> Optional[dict]:
    """When `date` is not provided we take it to be "now". This method returns
    the last investigation that contains the date or has a start/end closest
    to the date. The investigations are ordered from first to last created.

    Optionally all open-ended investigations can be ignored. Open-ended
    investigations have a start date but no end date. These investigations
    are created by sending dataset or investigation messages with start dates
    48h outside any official investigation.

    :param investigations: unsorted list of investigations
    :param date: possibly not fully deterministic date
    :param allow_open_ended: validation option
    :returns: matching investigation according to the selection rules
    """
    investigations = _filter_valid_investigations(
        investigations, allow_open_ended=allow_open_ended
    )
    if not investigations:
        return
    if len(investigations) == 1:
        return investigations[0]

    # Sort investigations by order of importance: database creation time
    investigations = sorted(
        investigations, key=lambda investigation: investigation["id"]
    )

    date = _deterministic_date(date, investigations)
    return _select_investigation_from_sorted_list(investigations, date)


def _filter_valid_investigations(
    investigations: List[dict], allow_open_ended: bool = True
) -> List[dict]:
    """Filter out investigations with invalid time slots.

    :param investigations: unsorted list of investigations
    :param allow_open_ended: validation option
    :returns: list of valid investigations (order preserved)
    """
    if allow_open_ended:
        return [
            investigation
            for investigation in investigations
            if investigation.get("startDate")
        ]
    return [
        investigation
        for investigation in investigations
        if investigation.get("startDate") and investigation.get("endDate")
    ]


def _deterministic_date(
    date: Optional[Union[datetime.datetime, datetime.date]], investigations: List[dict]
) -> datetime.datetime:
    """The date might

    * not exist: take "now"
    * have no time zone: add the local timezone
    * be a day without time: take the time from the investigations or the
      default of 8 a.m.

    The resulting date is fully deterministic.

    :param date: possibly not fully deterministic date
    :param investigations: sorted by order of importance
    :returns: fully deterministic date
    """
    if date is None:
        return datetime.datetime.now().astimezone()
    if isinstance(date, datetime.datetime):
        return date.astimezone()

    # Get the time from the investigations that start on the same day
    start_dates = [
        _tz_aware_fromisoformat(investigation["startDate"])
        for investigation in investigations
    ]
    is_start_date = [dt.date() == date for dt in start_dates]
    last_is_start_date = _last_where_true(is_start_date)

    if last_is_start_date is None:
        # No investigation starts on the same day
        start_time = _DEFAULT_START_TIME
    else:
        # Last investigation that starts on the same day
        start_time = start_dates[last_is_start_date].time()

    date = datetime.datetime.combine(date, start_time)
    return date.astimezone()


def _select_investigation_from_sorted_list(
    investigations: List[dict], date: datetime.datetime
) -> dict:
    """
    :param investigations: sorted by order of importance
    :param date: point in time for which we want to find the corresponding investigation
    :returns: matching investigation according to the selection rules
    """
    # Seconds between date and start/end of each investigation
    n = len(investigations)
    startdiff = numpy.zeros(n)
    enddiff = numpy.full(n, numpy.inf)
    for i, investigation in enumerate(investigations):
        startdate = _tz_aware_fromisoformat(investigation["startDate"])
        startdiff[i] = (date - startdate).total_seconds()
        enddate = investigation.get("endDate")
        if enddate is not None:
            enddate = _tz_aware_fromisoformat(enddate)
            enddiff[i] = (enddate - date).total_seconds()

    # Close open-ended investigations when the next investigation starts (if any)
    closed_ended = numpy.isfinite(enddiff)
    starttime_order = numpy.argsort(-startdiff)
    for i in range(n - 1):
        idx = starttime_order[i]
        if not closed_ended[idx]:
            idx_next = starttime_order[i + 1]
            enddiff[idx] = -startdiff[idx_next]

    # Return the last closed investigation which contains the date (if any)
    contains_date = (startdiff >= 0) & (enddiff >= 0)
    contains_date_and_closed = contains_date & closed_ended
    idx = _last_where_true(contains_date_and_closed)
    if idx is not None:
        return investigations[idx]

    # Return the last investigation which contains the date (if any)
    idx = _last_where_true(contains_date)
    if idx is not None:
        return investigations[idx]

    # Return the last investigation with the closest start or end date
    startdiff = numpy.abs(startdiff)
    enddiff = numpy.abs(enddiff)
    istart = numpy.argmin(startdiff)
    iend = numpy.argmin(enddiff)
    min_startdiff = startdiff[istart]
    min_enddiff = enddiff[iend]
    if min_startdiff < min_enddiff:
        return investigations[istart]
    if min_startdiff > min_enddiff:
        return investigations[iend]
    return investigations[max(istart, iend)]


def _tz_aware_fromisoformat(date: str) -> datetime.datetime:
    return datetime.datetime.fromisoformat(date).astimezone()


def _last_where_true(bool_arr: Sequence[bool]) -> Optional[int]:
    indices = numpy.argwhere(bool_arr)
    if indices.size:
        return indices[-1][0]
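

# --- Illustrative selection example (not part of the original module) ---
# A minimal, self-contained sketch of the selection rules implemented above,
# using two synthetic investigation records. All field values are hypothetical.
if __name__ == "__main__":
    _synthetic_investigations = [
        {
            "id": 1,
            "name": "hg123",
            "startDate": "2024-01-08T08:00:00+01:00",
            "endDate": "2024-01-10T08:00:00+01:00",
        },
        {
            "id": 2,
            "name": "hg123",
            "startDate": "2024-01-10T08:00:00+01:00",
            "endDate": "2024-01-12T08:00:00+01:00",
        },
    ]
    # A date inside the second time slot selects the investigation with id 2
    _query_date = datetime.datetime.fromisoformat("2024-01-11T12:00:00+01:00")
    _selected = _select_investigation(_synthetic_investigations, date=_query_date)
    print("selected investigation id:", _selected["id"] if _selected else None)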