import os
import logging
import datetime
import dataclasses
from typing import Optional, Dict, Any, List, Set, Generator
from . import path_utils
from ..client.interface import Dataset as IcatDataset
# Module logger used by the pprint/summary helpers of this module (they all log at DEBUG level).
logger = logging.getLogger("ICAT SYNC")
@dataclasses.dataclass(eq=True)
class Dataset:
    """A dataset directory found on disk, optionally matched with an ICAT dataset.

    The dataset status (see :meth:`get_status`) is derived from whether it is
    registered in ICAT, whether Bliss left an unconfirmed metadata file for it,
    and whether its directory and metadata are usable.
    """

    path: str  # Path of the directory
    proposal: str
    beamline: str
    name: Optional[str]
    raw_root_dir: str
    status_reason: List[str]
    startdate: Optional[datetime.datetime] = None  # From HDF5
    enddate: Optional[datetime.datetime] = None  # From HDF5
    metadata: Optional[dict] = (
        None  # From HDF5 (times are clipped to the session time slot)
    )
    icat_dataset: Optional[IcatDataset] = None  # From ICAT

    def __post_init__(self) -> None:
        # Normalize the directory path with path_utils.markdir.
        self.path = path_utils.markdir(self.path)

    def as_dict(self) -> Dict[str, Any]:
        """Return a (recursive) dictionary representation of this dataset."""
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Dataset":
        """Factory method to create a Dataset instance from a dictionary.

        The input dictionary is not modified. A nested ``icat_dataset``
        dictionary is converted back with ``IcatDataset.from_dict``.
        """
        data = data.copy()
        icat_dataset = data.get("icat_dataset")
        if icat_dataset is not None:
            data["icat_dataset"] = IcatDataset.from_dict(icat_dataset)
        return cls(**data)

    def is_invalid(self) -> bool:
        """True when the dataset has no name or its path is not a directory."""
        return self.name is None or not os.path.isdir(self.path)

    def to_be_uploaded(self) -> bool:
        """True when not registered but the Bliss metadata file exists."""
        return not self.is_registered() and os.path.exists(self.metadata_file)

    def is_registered(self) -> bool:
        """True when a matching ICAT dataset is attached."""
        return self.icat_dataset is not None

    def has_metadata(self) -> bool:
        """True when metadata was collected for this dataset.

        ``None`` and an empty dictionary both count as "no metadata".
        NOTE(review): semantics inferred from the ``metadata`` field — confirm.
        """
        return bool(self.metadata)

    def is_non_empty_dir(self) -> bool:
        """True when ``path`` is a directory containing at least one entry."""
        if not os.path.isdir(self.path):
            return False
        return bool(os.listdir(self.path))

    def is_registered_without_files(self) -> bool:
        """True when ICAT reports zero files although the directory is not empty."""
        return (
            self.is_registered()
            and self.icat_dataset.dataset_metadata.file_count == 0
            and self.is_non_empty_dir()
        )

    def get_status(self) -> str:
        """Return one of the dataset status strings.

        Possible values: "registered_without_files", "registered",
        "not_uploaded", "invalid" or "unregistered".
        """
        # Decreasing order of registration:
        if self.is_registered_without_files():
            return "registered_without_files"
        if self.is_registered():
            return "registered"
        if self.to_be_uploaded():
            return "not_uploaded"
        # Not registered or invalid
        if self.is_invalid() or not self.has_metadata():
            return "invalid"
        return "unregistered"

    @property
    def metadata_file(self) -> str:
        """File stored by Bliss when ending the proposal without confirmation of registration"""
        return os.path.join(
            self.raw_root_dir, "__icat__", f"{path_utils.basename(self.path)}.xml"
        )
@dataclasses.dataclass(eq=True)
class IcatInvestigation:
    """An ICAT investigation with its time slot and registered datasets.

    ``startdate``/``enddate`` are compared with the current local time
    (``datetime.datetime.now().astimezone()``), so they are expected to be
    timezone-aware. NOTE(review): confirm this holds for all producers.
    """

    id: str
    url: str
    search_url: str
    registered_datasets: List[IcatDataset]
    startdate: Optional[datetime.datetime] = None
    enddate: Optional[datetime.datetime] = None

    def as_dict(self) -> Dict[str, Any]:
        """Return a (recursive) dictionary representation of the investigation."""
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "IcatInvestigation":
        """Factory method to create an IcatInvestigation instance from a dictionary."""
        data = data.copy()
        data["registered_datasets"] = [
            IcatDataset.from_dict(dataset_data)
            for dataset_data in data["registered_datasets"]
        ]
        return cls(**data)

    @property
    def has_timeslot(self) -> bool:
        """True when the investigation has a start date."""
        return self.startdate is not None

    @property
    def open_ended(self) -> bool:
        """True when the investigation has no end date."""
        return self.enddate is None

    @property
    def started(self) -> bool:
        """True from two days before the start date onwards (False without start date)."""
        if not self.has_timeslot:
            return False
        now = datetime.datetime.now().astimezone()
        # Two-day margin before the official start.
        startdate = self.startdate - datetime.timedelta(days=2)
        return now >= startdate

    def _start_from_now(self) -> str:
        """Human-readable distance between now and the start date ("" without start)."""
        if not self.has_timeslot:
            return ""
        now = datetime.datetime.now().astimezone()
        if now >= self.startdate:
            return f"(Started {now - self.startdate} ago)"
        else:
            return f"(Will start in {self.startdate - now})"

    @property
    def ended(self) -> bool:
        """True from two days after the end date onwards (never when open-ended)."""
        if self.open_ended:
            return False
        now = datetime.datetime.now().astimezone()
        # Two-day margin after the official end.
        enddate = self.enddate + datetime.timedelta(days=2)
        return now >= enddate

    def _end_from_now(self) -> str:
        """Human-readable distance between now and the end date ("" when open-ended)."""
        if self.open_ended:
            return ""
        now = datetime.datetime.now().astimezone()
        if now >= self.enddate:
            return f"(Ended {now - self.enddate} ago)"
        else:
            return f"(Will end in {self.enddate - now})"

    @property
    def ongoing(self) -> bool:
        """True when started and not yet ended (both including the 2-day margins)."""
        return self.started and not self.ended

    @property
    def duration(self) -> Optional[datetime.timedelta]:
        """Length of the time slot.

        Returns None when the investigation is open-ended or has no start date
        (subtracting from a missing start date would raise TypeError).
        """
        if self.open_ended or not self.has_timeslot:
            return None
        return self.enddate - self.startdate

    @property
    def timeslot(self) -> str:
        """Time slot as "start - end (duration)", or "start - ???" when open-ended."""
        if self.open_ended:
            return f"{self.startdate} - ???"
        return f"{self.startdate} - {self.enddate} ({self.duration})"

    def pprint(self) -> None:
        """Log a description of the investigation at DEBUG level."""
        logger.debug("Investigation ID: %s", self.id)
        logger.debug(" URL: %s", self.url)
        logger.debug(" Search URL: %s", self.search_url)
        if not self.has_timeslot:
            logger.debug("Invalid investigation (no start date)")
            return
        logger.debug(" Start time: %s %s", self.startdate, self._start_from_now())
        if self.open_ended:
            # The message needs a placeholder: logging formats it as ``msg % args``.
            logger.debug(" Duration: %s", "open-ended (not official)")
            return
        logger.debug(" End time: %s %s", self.enddate, self._end_from_now())
        if self.ongoing:
            logger.debug(" Duration: %s (WARNING: ONGOING)", self.duration)
        else:
            logger.debug(" Duration: %s", self.duration)
@dataclasses.dataclass(eq=True)
class ExperimentalSession:
    """A proposal session directory on disk.

    Holds the session's datasets grouped by status and the matching ICAT
    investigation, if any.
    """

    session_dir: str
    raw_root_dir: str  # RAW_DATA directory
    raw_data_format: str  # e.g. "esrfv3"
    proposal: str
    beamline: str
    session: str
    startdate: datetime.date  # from the directory name
    search_url: str
    datasets: Dict[str, List[Dataset]]  # from browsing the root directory
    icat_investigation: Optional[IcatInvestigation] = None

    # Keys of ``datasets``; ``add_dataset`` files datasets under ``Dataset.get_status()``.
    DATASET_STATUSES = (
        "unregistered",
        "not_uploaded",
        "registered",
        "invalid",
        "registered_without_files",
    )

    def __post_init__(self) -> None:
        # Make sure every known status has a list; unknown status keys are dropped.
        self.datasets = {
            status: self.datasets.get(status, list())
            for status in self.DATASET_STATUSES
        }
        self.session_dir = path_utils.markdir(self.session_dir)
        self.raw_root_dir = path_utils.markdir(self.raw_root_dir)

    def as_dict(self) -> Dict[str, Any]:
        """Return a (recursive) dictionary representation of the session."""
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ExperimentalSession":
        """Factory method to create an ExperimentalSession from a dictionary.

        Nested dataset and investigation dictionaries are converted back with
        their respective ``from_dict`` factories. The input is not modified.
        """
        data = data.copy()
        data["datasets"] = {
            key: [Dataset.from_dict(dataset_data) for dataset_data in dataset_list]
            for key, dataset_list in data["datasets"].items()
        }
        icat_investigation = data.get("icat_investigation")
        if icat_investigation is not None:
            data["icat_investigation"] = IcatInvestigation.from_dict(icat_investigation)
        return cls(**data)

    @classmethod
    def allow_unsupervised_upload(cls, dataset_status: str) -> bool:
        """Return whether datasets with this status may be uploaded without supervision.

        :raises ValueError: when ``dataset_status`` is not in ``DATASET_STATUSES``.
        """
        if dataset_status not in cls.DATASET_STATUSES:
            raise ValueError(
                f"'{dataset_status}' is not a valid dataset status. Valid statuses are {list(cls.DATASET_STATUSES)}"
            )
        return dataset_status == "registered_without_files"

    @property
    def in_icat_investigation(self) -> Optional[bool]:
        """Whether the session start date lies within the investigation time slot.

        None when there is no investigation, False when the investigation has
        no start date. The end date bounds the slot only when it is set.
        """
        if self.icat_investigation is None:
            return None
        if not self.icat_investigation.has_timeslot:
            return False
        inside_timeslot = self.startdate >= self.icat_investigation.startdate.date()
        if not self.icat_investigation.open_ended:
            inside_timeslot &= self.startdate <= self.icat_investigation.enddate.date()
        return inside_timeslot

    def add_dataset(self, dataset: Dataset) -> None:
        """File ``dataset`` under its current status."""
        self.datasets[dataset.get_status()].append(dataset)

    def iter_datasets(self) -> Generator[Dataset, None, None]:
        """Yield all datasets, grouped by status."""
        for lst in self.datasets.values():
            yield from lst

    def dataset_statuses(self) -> Set[str]:
        """Statuses that have at least one dataset."""
        return {status for status, lst in self.datasets.items() if lst}

    def dataset_status_counts(self) -> Dict[str, int]:
        """Number of datasets per status (including zero counts)."""
        return {status: len(lst) for status, lst in self.datasets.items()}

    def tabular_data(self) -> Dict[str, str]:
        """One table row (column name -> text) describing this session."""
        data = self._init_tabular_data()
        self._add_icat_tabular_data(data)
        return data

    def _init_tabular_data(self) -> Dict[str, str]:
        """Row built from the on-disk information only (ICAT columns left empty)."""
        status_counts = {
            status: str(counts)
            for status, counts in self.dataset_status_counts().items()
        }
        return {
            "beamline": self.beamline,
            "proposal": self.proposal,
            "session": str(self.startdate),
            "url": "",
            "search_url": self.search_url,
            "path": self.raw_root_dir,
            **status_counts,
            "timeslot": "",
        }

    def _add_icat_tabular_data(self, data: Dict[str, str]) -> None:
        """Fill the ICAT columns of ``data`` in place when an investigation is known.

        :raises RuntimeError: when the investigation time state cannot be classified.
        """
        investigation = self.icat_investigation
        if investigation is None:
            return
        data["url"] = investigation.url
        data["search_url"] = investigation.search_url
        # Flag sessions whose start date falls OUTSIDE the investigation time slot.
        if not self.in_icat_investigation:
            data["session"] += " (NOT INSIDE TIMESLOT)"
        if not investigation.started:
            data["timeslot"] = f"FUTURE: {investigation.timeslot}"
        elif investigation.ended:
            data["timeslot"] = f"FINISHED: {investigation.timeslot}"
        elif investigation.open_ended:
            data["timeslot"] = f"INDEFINITE: {investigation.timeslot}"
        elif investigation.ongoing:
            data["timeslot"] = f"ONGOING: {investigation.timeslot}"
        else:
            raise RuntimeError("Unknown session time state")

    def pprint(self) -> None:
        """Log datasets, raw session and investigation details at DEBUG level."""
        separator = "--------------------------------"
        logger.debug("")
        logger.debug(separator)
        self._pprint_datasets()
        logger.debug("")
        self._pprint_raw_session()
        logger.debug("")
        self._pprint_icat_investigation()
        logger.debug("")
        logger.debug(separator)
        logger.debug("")

    def _pprint_raw_session(self) -> None:
        """Log the raw data directory and start date, warning when outside the time slot."""
        logger.debug("Directory: %s", self.raw_root_dir)
        if self.in_icat_investigation:
            logger.debug(" Start time: %s", self.startdate)
        else:
            logger.debug(
                " Start time: %s (WARNING: not inside time slot!)", self.startdate
            )

    def _pprint_icat_investigation(self) -> None:
        """Log the ICAT investigation, or the search URL when none was found."""
        if self.icat_investigation is None:
            logger.debug("No corresponding investigation")
            logger.debug(" Search URL: %s", self.search_url)
        else:
            self.icat_investigation.pprint()

    def _pprint_datasets(self) -> None:
        """Log the dataset paths per status followed by an overall summary."""
        _print_dataset_summary("Registered datasets", self.datasets["registered"])
        _print_dataset_summary("Invalid datasets", self.datasets["invalid"])
        _print_dataset_summary("Unregistered datasets", self.datasets["unregistered"])
        _print_dataset_summary("Datasets not uploaded", self.datasets["not_uploaded"])
        _print_dataset_summary(
            "Registered datasets without files",
            self.datasets["registered_without_files"],
        )
        logger.debug("")
        if (
            self.datasets["invalid"]
            or self.datasets["not_uploaded"]
            or self.datasets["unregistered"]
            or self.datasets["registered_without_files"]
        ):
            logger.debug("Datasets (TODO):")
        elif not self.datasets["registered"]:
            logger.debug("Datasets (EMPTY):")
        else:
            logger.debug("Datasets (OK):")
        logger.debug(" %s registered", len(self.datasets["registered"]))
        logger.debug(" %s invalid", len(self.datasets["invalid"]))
        logger.debug(" %s not uploaded", len(self.datasets["not_uploaded"]))
        logger.debug(" %s unregistered", len(self.datasets["unregistered"]))
        logger.debug(
            " %s registered without files",
            len(self.datasets["registered_without_files"]),
        )
def _print_dataset_summary(title: str, datasets: List[Dataset]):
    """Log ``title`` and, for each dataset, its path followed by its status reasons.

    Nothing is logged when ``datasets`` is empty.
    """
    if datasets:
        logger.debug("")
        logger.debug("%s:", title)
        for entry in datasets:
            logger.debug(" %s", entry.path)
            for reason in entry.status_reason:
                logger.debug(" %s", reason)