import os
import sys
import logging
import argparse
import datetime
from tqdm import tqdm
from time import sleep
from glob import glob
from typing import Optional, Tuple, Generator
from ..utils import raw_data
from ..utils import path_utils
from ..utils import sync_types
from ..client.main import IcatClient
from ..client.bliss import get_icat_client
from ..utils.log_utils import basic_config
from ..utils.sync_store import ExperimentalSessionStore
logger = logging.getLogger("ICAT SYNC")
def sync_raw(
icat_client: IcatClient,
beamline: Optional[str] = None,
proposal: Optional[str] = None,
session: Optional[str] = None,
root_dir: Optional[str] = None,
dry_run: bool = True,
unsupervised: bool = False,
raw_data_format: str = "esrfv3",
save_dir: Optional[str] = None,
cache_dir: Optional[str] = None,
invalidate_cache: bool = False,
) -> None:
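    """Discover experimental session directories on disk and register their
    missing datasets with ICAT.

    ``beamline``, ``proposal`` and ``session`` are glob filters; ``None``
    matches everything. With ``dry_run=True`` (the default) nothing is
    uploaded. With ``unsupervised=True`` datasets are uploaded without asking
    for confirmation, but only when it is considered safe to do so.
    """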
with ExperimentalSessionStore(
cache_dir=cache_dir,
save_dir=save_dir,
raw_data_format=raw_data_format,
invalidate_cache=invalidate_cache,
) as exp_session_store:
if beamline is None:
beamline = "*"
if proposal is None:
proposal = "*"
if session is None:
session = "*"
session_filter = raw_data.get_session_dir(
proposal,
beamline,
session,
root_dir=root_dir,
raw_data_format=raw_data_format,
)
logger.info("Discovering sessions %s ...", session_filter)
session_dirs = glob(path_utils.markdir(session_filter))
logger.info("Discovering sessions finished.")
logger.info("Parse %d session directories ...", len(session_dirs))
        # Show a progress bar only during a dry run (no confirmation prompts)
        # and when debug output would not interleave with it
        if dry_run and logger.getEffectiveLevel() > logging.DEBUG:
            session_dirs = tqdm(session_dirs)
for session_dir in session_dirs:
proposal_from_dir, beamline_from_dir, session_from_dir = (
raw_data.parse_session_dir(session_dir, raw_data_format)
)
if not proposal_from_dir:
continue
try:
_ = sync_session(
icat_client,
proposal_from_dir,
beamline_from_dir,
session_from_dir,
dry_run=dry_run,
unsupervised=unsupervised,
raw_data_format=raw_data_format,
exp_session_store=exp_session_store,
)
except Exception as e:
raise RuntimeError(
f"Failed syncing beamline={beamline_from_dir}, proposal={proposal_from_dir}, session={session_from_dir}"
) from e
logger.info("Session directories parsed.")
def sync_session(
icat_client: IcatClient,
proposal: str,
beamline: str,
session: str,
root_dir: Optional[str] = None,
dry_run: bool = True,
unsupervised: bool = False,
allow_open_ended: bool = True,
raw_data_format: str = "esrfv3",
exp_session_store: Optional[ExperimentalSessionStore] = None,
) -> Optional[sync_types.ExperimentalSession]:
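    """Synchronize one experimental session with ICAT: load it from the cache
    when available, otherwise discover it on disk and in ICAT, then register
    the datasets that failed to upload, were never registered, or were
    registered without files.

    :returns: the experimental session, or ``None`` when the directory is not
              an experimental session.
    """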
remove_from_cache_afterwards = False
exp_session = None
try:
# Experimental session from cache
if exp_session_store is not None:
session_dir = raw_data.get_session_dir(
proposal,
beamline,
session,
root_dir=root_dir,
raw_data_format=raw_data_format,
)
exp_session = exp_session_store.get_session(session_dir)
# Experimental session from disk and ICAT
if exp_session is None:
exp_session = _discover_exp_session(
icat_client,
proposal,
beamline,
session,
root_dir=root_dir,
raw_data_format=raw_data_format,
)
if exp_session is None:
# Not an experimental session
return exp_session
# Experimental session from ICAT
if exp_session.icat_investigation is None:
_discover_icat_investigation(
icat_client,
exp_session,
allow_open_ended=allow_open_ended,
raw_data_format=raw_data_format,
)
# Print summary of the experimental session
exp_session.pprint()
# Register datasets that failed to be registered
for dataset in _iter_datasets_to_upload(
exp_session, "not_uploaded", dry_run, unsupervised
):
remove_from_cache_afterwards = True
icat_client.store_dataset_from_file(dataset.metadata_file)
sleep(0.1) # do not overload ICAT
# Register datasets that were not even attempted to be registered
for dataset in _iter_datasets_to_upload(
exp_session, "unregistered", dry_run, unsupervised
):
remove_from_cache_afterwards = True
icat_client.store_dataset(
beamline=dataset.beamline,
proposal=dataset.proposal,
dataset=dataset.name,
path=dataset.path,
metadata=dataset.metadata,
)
sleep(0.1) # do not overload ICAT
# Add missing data files for registered datasets without data
for dataset in _iter_datasets_to_upload(
exp_session, "registered_without_files", dry_run, unsupervised
):
remove_from_cache_afterwards = True
icat_client.add_files(dataset.icat_dataset.icat_dataset_id)
sleep(0.1) # do not overload ICAT
finally:
if exp_session_store and exp_session:
if remove_from_cache_afterwards:
exp_session_store.remove_session(exp_session)
else:
exp_session_store.add_session(exp_session)
return exp_session
def _iter_datasets_to_upload(
exp_session: sync_types.ExperimentalSession,
dataset_status: str,
dry_run: bool,
unsupervised: bool,
verbose: bool = False,
) -> Generator[sync_types.Dataset, None, None]:
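    """Yield the datasets with the given status that may be uploaded.

    In a dry run nothing is yielded. In unsupervised mode datasets are
    yielded only when the session allows unsupervised upload for this status.
    Otherwise the operator is asked for confirmation. Dataset details are
    logged for every yielded dataset, and with ``verbose=True`` also for
    skipped ones.
    """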
datasets = exp_session.datasets[dataset_status]
if not datasets:
return
action = f'Upload "{dataset_status}" dataset'
if dry_run:
do_yield = False
do_print = verbose
elif unsupervised:
if exp_session.allow_unsupervised_upload(dataset_status):
do_yield = True
do_print = True
else:
do_yield = False
do_print = verbose
else:
if _ask_execute_confirmation(exp_session, action + "s"):
do_yield = True
do_print = True
else:
do_yield = False
do_print = verbose
if not do_yield and not do_print:
return
for dataset in datasets:
if do_print:
_print_dataset_details(dataset, action)
if do_yield:
yield dataset
def _ask_execute_confirmation(
exp_session: sync_types.ExperimentalSession, action: str
) -> bool:
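    """Log the investigation search URL and ask for interactive confirmation."""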
logger.info("Search URL: %s", exp_session.icat_investigation.search_url)
result = input(f"{action}? (y/[n])")
return result.lower() in ("y", "yes")
def _print_dataset_details(dataset: sync_types.Dataset, title: str):
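    """Log path, name, sample and time range of a dataset at DEBUG level."""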
logger.debug("-> %s: %s", title, dataset.path)
logger.debug(" Name: %s", dataset.name)
if dataset.metadata:
logger.debug(" Sample: %s", dataset.metadata["Sample_name"])
dataset_startdate = dataset.startdate
dataset_enddate = dataset.enddate
logger.debug(" Start time: %s", dataset_startdate)
logger.debug(" End time: %s", dataset_enddate)
logger.debug(" Duration: %s", dataset_enddate - dataset_startdate)
def _discover_exp_session(
icat_client: IcatClient,
proposal: str,
beamline: str,
session: str,
root_dir: Optional[str] = None,
raw_data_format: str = "esrfv3",
) -> Optional[sync_types.ExperimentalSession]:
"""Discovery the experimental session on disk and in ICAT"""
try:
session_startdate_fromdir = datetime.datetime.strptime(session, "%Y%m%d").date()
except ValueError:
# Not a session directory
return
session_dir = raw_data.get_session_dir(
proposal, beamline, session, root_dir=root_dir, raw_data_format=raw_data_format
)
raw_root_dir = raw_data.get_raw_data_dir(
session_dir, raw_data_format=raw_data_format
)
return sync_types.ExperimentalSession(
session_dir=session_dir,
raw_root_dir=raw_root_dir,
raw_data_format=raw_data_format,
proposal=proposal,
beamline=beamline,
session=session,
startdate=session_startdate_fromdir,
search_url=_data_portal_search_url(proposal, beamline),
datasets=dict(),
)
def _discover_icat_investigation(
icat_client: IcatClient,
exp_session: sync_types.ExperimentalSession,
allow_open_ended: bool = True,
raw_data_format: str = "esrfv3",
) -> None:
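    """Match the experimental session with its ICAT investigation and
    discover the registered (ICAT) and raw (on-disk) datasets.
    """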
# Get the ICAT investigation related to the raw data directory
investigation = icat_client.investigation_info(
exp_session.beamline,
exp_session.proposal,
date=exp_session.startdate,
allow_open_ended=allow_open_ended,
)
if not investigation:
return
if not investigation.get("startDate"):
return
icat_investigation = sync_types.IcatInvestigation(
id=investigation["id"],
url=investigation["data portal"],
search_url=_data_portal_search_url(
investigation["proposal"], investigation["beamline"]
),
registered_datasets=list(),
)
icat_investigation.startdate = _as_datetime(investigation["startDate"])
if investigation.get("endDate"):
icat_investigation.enddate = _as_datetime(investigation["endDate"])
# Add dataset information
exp_session.icat_investigation = icat_investigation
_discover_icat_datasets(icat_client, exp_session, allow_open_ended=allow_open_ended)
_discover_raw_datasets(exp_session, raw_data_format=raw_data_format)
def _discover_icat_datasets(
icat_client: IcatClient,
exp_session: sync_types.ExperimentalSession,
allow_open_ended: bool = True,
) -> None:
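    """Fetch the datasets already registered in ICAT for this investigation."""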
logger.info("Extracting metadata from ICAT for %s ...", exp_session.raw_root_dir)
exp_session.icat_investigation.registered_datasets = (
icat_client.registered_datasets(
exp_session.beamline,
exp_session.proposal,
date=exp_session.startdate,
allow_open_ended=allow_open_ended,
)
)
logger.info("Extracting metadata finished.")
def _discover_raw_datasets(
exp_session: sync_types.ExperimentalSession,
raw_data_format: str = "esrfv3",
) -> None:
"""Browse all datasets from the raw data directory and compare with
the datasets registered with the ICAT investigation"""
dataset_filters = raw_data.get_dataset_filters(
exp_session.raw_root_dir, raw_data_format=raw_data_format
)
    if len(dataset_filters) > 1:
        logger.info(
            "Extracting metadata from HDF5 for %d filters ...", len(dataset_filters)
        )
    elif dataset_filters:
        logger.info("Extracting metadata from HDF5 for %s ...", dataset_filters[0])
registered_datasets = {
path_utils.markdir(dset.dataset_id.path): dset
for dset in exp_session.icat_investigation.registered_datasets
}
dataset_dirs = []
for pattern in dataset_filters:
dataset_dirs.extend(glob(path_utils.markdir(pattern)))
dataset_dirs.sort(key=_get_creation_time)
for dataset_dir in dataset_dirs:
name = raw_data.get_raw_dataset_name(
dataset_dir, raw_data_format=raw_data_format
)
if not os.path.isdir(dataset_dir):
status_reason = ["Dataset path is not a directory"]
elif not name:
status_reason = ["Dataset path has the wrong format"]
else:
status_reason = []
dataset = sync_types.Dataset(
path=dataset_dir,
proposal=exp_session.proposal,
beamline=exp_session.beamline,
name=name,
raw_root_dir=exp_session.raw_root_dir,
status_reason=status_reason,
)
if not dataset.is_invalid():
dataset.icat_dataset = registered_datasets.get(dataset.path)
if not dataset.is_registered():
_add_metadata_from_raw_data(
dataset, exp_session, raw_data_format=raw_data_format
)
exp_session.add_dataset(dataset)
logger.info("Metadata from HDF5 extracted.")
def _get_creation_time(file_path: str) -> float:
    """Return the creation time of a file or directory (on Unix this is the
    inode change time), or 0 when the path no longer exists so that missing
    paths sort first.
    """
    if os.path.exists(file_path):
        return os.path.getctime(file_path)
    return 0
def _data_portal_search_url(
proposal: str,
beamline: str,
) -> str:
return f"https://data.esrf.fr/beamline/{beamline}?search={proposal}"
def _add_metadata_from_raw_data(
dataset: sync_types.Dataset,
exp_session: sync_types.ExperimentalSession,
raw_data_format: str = "esrfv3",
) -> None:
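    """Extract ``Sample_name``, ``startDate`` and ``endDate`` from the raw
    data and clamp the dataset time range to the investigation time range,
    recording a status reason when metadata is missing or out of range.
    """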
try:
dataset_metadata = raw_data.get_raw_dataset_metadata(
dataset.path, raw_data_format=raw_data_format
)
err_msg = f"data format assumed to be '{raw_data_format}'"
except Exception as e:
dataset_metadata = dict()
err_msg = str(e)
if set(dataset_metadata) != {"Sample_name", "startDate", "endDate"}:
dataset.status_reason.append(f"Cannot extract dataset metadata: {err_msg}")
return
dataset.startdate = _as_datetime(dataset_metadata["startDate"])
dataset.enddate = _as_datetime(dataset_metadata["endDate"])
dataset_startdate, msg = _force_date_within_range(
exp_session.icat_investigation.startdate,
exp_session.icat_investigation.enddate,
dataset.startdate,
end=False,
)
if msg:
dataset.status_reason.append(msg)
dataset_enddate, msg = _force_date_within_range(
exp_session.icat_investigation.startdate,
exp_session.icat_investigation.enddate,
dataset.enddate,
end=True,
)
if msg:
dataset.status_reason.append(msg)
dataset_metadata["startDate"] = dataset_startdate
dataset_metadata["endDate"] = dataset_enddate
dataset.metadata = dataset_metadata
def _force_date_within_range(
session_startdate: datetime.datetime,
session_enddate: Optional[datetime.datetime],
dataset_date: datetime.datetime,
end: bool = False,
) -> Tuple[datetime.datetime, Optional[str]]:
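    """Clamp a dataset start/end date to the session time range.

    Out-of-range dates are moved just inside the range; the 1 s / 2 s offsets
    are asymmetric so that a clamped start date stays before a clamped end
    date. A message describing the correction is returned alongside the date,
    or ``None`` when no correction was needed.
    """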
if end:
action = "ended"
dstart_seconds = 2
dend_seconds = 1
else:
action = "started"
dstart_seconds = 1
dend_seconds = 2
msg = None
if session_enddate is not None and dataset_date >= session_enddate:
msg = f"{action} {dataset_date - session_enddate} after the end of the session"
dataset_date = session_enddate - datetime.timedelta(seconds=dend_seconds)
if dataset_date <= session_startdate:
msg = f"{action} {session_startdate - dataset_date} before the start of the session"
dataset_date = session_startdate + datetime.timedelta(seconds=dstart_seconds)
return dataset_date, msg
def _as_datetime(isoformat: str) -> datetime.datetime:
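    """Parse an ISO 8601 string and return a timezone-aware datetime in the
    local timezone."""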
return datetime.datetime.fromisoformat(isoformat).astimezone()
def main(argv=None):
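    """Command-line entry point: parse arguments and run :func:`sync_raw`."""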
if argv is None:
argv = sys.argv
    parser = argparse.ArgumentParser(
        description="Register missing raw datasets with ICAT"
    )
parser.add_argument(
"--beamline", type=str.lower, required=False, help="Beamline name (e.g. id00)"
)
parser.add_argument(
"--proposal",
type=str.lower,
required=False,
help="Proposal name (e.g. ihch123)",
)
parser.add_argument(
"--session", required=False, help="Session name (e.g. 20231028)"
)
parser.add_argument(
"--register",
action="store_true",
required=False,
help="Register datasets with ICAT when needed (prompts for confirmation)",
)
parser.add_argument(
"--auto-register",
action="store_true",
required=False,
help="Register datasets with ICAT when needed without confirmation (only when it is safe to do so)",
)
parser.add_argument(
"--format",
required=False,
choices=["esrfv1", "esrfv2", "esrfv3", "id16bspec", "mx"],
default="esrfv3",
help="Raw data structure",
)
parser.add_argument(
"--save-dir",
type=str,
default=None,
required=False,
help="Directory to save the sync statistics for humans",
)
parser.add_argument(
"--cache-dir",
type=str,
default=None,
required=False,
help="Directory to cache the sync state for machines",
)
parser.add_argument(
"--no-print", action="store_true", help="Do not print the session summaries"
)
parser.add_argument(
"--invalidate-cache",
action="store_true",
help="Invalidate session when loading the cache",
)
args = parser.parse_args(argv[1:])
basic_config(
logger=logger,
level=logging.INFO if args.no_print else logging.DEBUG,
format="%(message)s",
)
icat_client = get_icat_client(timeout=600)
dry_run = not args.register and not args.auto_register
unsupervised = args.auto_register
sync_raw(
icat_client,
beamline=args.beamline,
proposal=args.proposal,
session=args.session,
dry_run=dry_run,
unsupervised=unsupervised,
raw_data_format=args.format,
save_dir=args.save_dir,
cache_dir=args.cache_dir,
invalidate_cache=args.invalidate_cache,
)
icat_client.disconnect()
if __name__ == "__main__":
sys.exit(main())
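
# Example invocation (hypothetical module path; the actual entry point
# depends on how this package installs its console scripts):
#
#   python -m <package>.apps.sync --beamline id00 --proposal ihch123 \
#       --session 20231028
#
# Without --register or --auto-register this is a dry run: it only reports
# which datasets would be registered with ICAT.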