Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 find candidate site to distribute input datasets
0003 
0004 """
0005 
0006 import fnmatch
0007 import re
0008 import sys
0009 import time
0010 import uuid
0011 from typing import Dict, List, Tuple
0012 
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 
0017 from pandaserver.dataservice.DataServiceUtils import select_scope
0018 from pandaserver.dataservice.ddm import rucioAPI
0019 from pandaserver.taskbuffer import JobUtils
0020 
0021 _logger = PandaLogger().getLogger("dyn_data_distributer")
0022 
0023 # files in datasets
0024 g_files_in_ds_map = {}
0025 
0026 
0027 class DynDataDistributer:
0028     """
0029     Find candidate site to distribute input datasets.
0030     """
0031 
0032     def __init__(self, jobs, siteMapper, simul=False, token=None, dataset_lifetime=14):
0033         self.jobs = jobs
0034         self.site_mapper = siteMapper
0035         if token is None:
0036             self.token = naive_utcnow().isoformat(" ")
0037         else:
0038             self.token = token
0039         # use a fixed list since some clouds don't have active T2s
0040         self.simul = simul
0041         self.last_message = ""
0042         # lifetime for temporary datasets
0043         self.dataset_lifetime = dataset_lifetime
0044 
0045     def get_replica_locations(self, input_ds: str, check_used_file: bool) -> Tuple[bool, Dict]:
0046         """
0047         Get replica locations for a given dataset.
0048 
0049         Args:
0050             input_ds (str): The name of the input dataset.
0051             check_used_file (bool): Flag to check used file.
0052 
0053         Returns:
0054             tuple: A tuple containing the status (bool) and the result (dict or str).
0055         """
0056         tmp_log = LogWrapper(_logger, f"get_replica_locations-{naive_utcnow().isoformat('/')}")
0057         tmp_log.debug(f"get_replica_locations {input_ds}")
0058 
0059         # return for failure
0060         res_for_failure = False, {"": {"": ([], [], [], 0, False, False, 0, 0, [])}}
0061 
0062         # get replica locations
0063         if input_ds.endswith("/"):
0064             # container
0065             status, tmp_rep_maps = self.get_list_dataset_replicas_in_container(input_ds)
0066             # get used datasets
0067             if status and check_used_file:
0068                 status, tmp_used_dataset_list = self.get_used_datasets(tmp_rep_maps)
0069                 # remove unused datasets
0070                 new_rep_maps = {}
0071                 for tmp_key, tmp_val in tmp_rep_maps.items():
0072                     if tmp_key in tmp_used_dataset_list:
0073                         new_rep_maps[tmp_key] = tmp_val
0074                 tmp_rep_maps = new_rep_maps
0075         else:
0076             # normal dataset
0077             status, tmp_rep_map = self.get_list_dataset_replicas(input_ds)
0078             tmp_rep_maps = {input_ds: tmp_rep_map}
0079 
0080         if not status:
0081             # failed
0082             tmp_log.error("failed to get replica locations for {input_ds}")
0083             tmp_log.debug("end")
0084             return res_for_failure
0085 
0086         tmp_log.debug("end")
0087         return True, tmp_rep_maps
0088 
0089     def get_all_sites(self) -> List:
0090         """
0091         Retrieves all sites that meet certain conditions.
0092 
0093         This method filters out sites based on the following conditions:
0094         - The site should be capable of running analysis.
0095         - The site should not be a GPU site.
0096         - The site should not use VP.
0097         - The status of the site should be "online".
0098 
0099         Returns:
0100             list: A list of SiteSpec objects that meet the above conditions.
0101         """
0102         all_sites = []
0103         for site_name in self.site_mapper.siteSpecList:
0104             site_spec = self.site_mapper.siteSpecList[site_name]
0105             # analysis only
0106             if not site_spec.runs_analysis():
0107                 continue
0108             # skip GPU
0109             if site_spec.isGPU():
0110                 continue
0111             # skip VP
0112             if site_spec.use_vp(JobUtils.ANALY_PS):
0113                 continue
0114             # online
0115             if site_spec.status not in ["online"]:
0116                 continue
0117             all_sites.append(site_spec)
0118         return all_sites
0119 
0120     def get_candidate_sites(self, tmp_rep_maps: Dict, prod_source_label: str, job_label: str) -> Tuple[bool, Dict]:
0121         """
0122         Retrieves candidate sites for data distribution based on certain conditions.
0123 
0124         This method filters out candidate sites based on the following condition:
0125         - The site should have a replica of the dataset.
0126 
0127         Args:
0128             tmp_rep_maps (dict): A dictionary containing dataset names as keys and their replica maps as values.
0129             prod_source_label (str): The label of the production source.
0130             job_label (str): The label of the job.
0131 
0132         Returns:
0133             tuple: A tuple containing a boolean status and a dictionary. The dictionary has dataset names as keys and
0134                    another dictionary as values. The inner dictionary has cloud names as keys and a tuple of various
0135                    site-related lists and values as values.
0136         """
0137         all_site_map = self.get_all_sites()
0138         return_map = {}
0139         cloud = "WORLD"
0140         for tmp_ds, tmp_rep_map in tmp_rep_maps.items():
0141             cand_sites = []
0142             sites_com_ds = []
0143             sites_comp_pd2p = []
0144             t1_has_replica = False
0145             t1_has_primary = False
0146             n_sec_replicas = 0
0147             cand_for_mou = []
0148             n_user_sub = 0
0149             for tmp_site_spec in all_site_map:
0150                 tmp_scope_input, tmp_scope_output = select_scope(tmp_site_spec, prod_source_label, job_label)
0151                 if tmp_scope_input not in tmp_site_spec.ddm_endpoints_input:
0152                     continue
0153                 rses = tmp_site_spec.ddm_endpoints_input[tmp_scope_input].getLocalEndPoints()
0154                 for ddm_endpoint in tmp_rep_map:
0155                     tmp_stat_map = tmp_rep_map[ddm_endpoint]
0156                     if ddm_endpoint in rses and tmp_stat_map[0]["total"] == tmp_stat_map[0]["found"] and ddm_endpoint.endswith("DATADISK"):
0157                         sites_com_ds.append(tmp_site_spec.sitename)
0158                         break
0159                 cand_sites.append(tmp_site_spec.sitename)
0160             return_map.setdefault(tmp_ds, {})
0161             if sites_com_ds:
0162                 cand_sites = sites_com_ds
0163             return_map[tmp_ds][cloud] = (
0164                 cand_sites,
0165                 sites_com_ds,
0166                 sites_comp_pd2p,
0167                 n_user_sub,
0168                 t1_has_replica,
0169                 t1_has_primary,
0170                 n_sec_replicas,
0171                 0,
0172                 cand_for_mou,
0173             )
0174         return True, return_map
0175 
0176     def get_candidates(self, input_ds: str, prod_source_label: str, job_label: str, check_used_file: bool = True) -> Tuple[bool, Dict]:
0177         """
0178         Get candidate sites for subscription.
0179 
0180         Args:
0181             input_ds (str): The name of the input dataset.
0182             prod_source_label (str): The label of the production source.
0183             job_label (str): The label of the job.
0184             check_used_file (bool, optional): Flag to check used file. Defaults to True.
0185 
0186         Returns:
0187             tuple: A tuple containing the status (bool) and the result (dict or str).
0188         """
0189         # Get replica locations
0190         status, tmp_rep_maps = self.get_replica_locations(input_ds, check_used_file)
0191         if not status:
0192             return status, tmp_rep_maps
0193 
0194         # Get candidate sites
0195         return self.get_candidate_sites(tmp_rep_maps, prod_source_label, job_label)
0196 
0197     def get_list_dataset_replicas(self, dataset: str, max_attempts: int = 3) -> Tuple[bool, Dict]:
0198         """
0199         Get the list of replicas for a given dataset.
0200 
0201         Args:
0202             dataset (str): The name of the dataset.
0203             max_attempts (int, optional): The maximum number of attempts to get the replicas. Defaults to 3.
0204 
0205         Returns:
0206             tuple: A tuple containing the status (bool) and the result (dict or str).
0207         """
0208         tmp_log = LogWrapper(_logger, f"get_list_dataset_replicas-{naive_utcnow().isoformat('/')}")
0209         tmp_log.debug(f"get_list_dataset_replicas {dataset}")
0210 
0211         for attempt in range(max_attempts):
0212             tmp_log.debug(f"{attempt}/{max_attempts} listDatasetReplicas {dataset}")
0213             status, replicas = rucioAPI.list_dataset_replicas(dataset)
0214             if status == 0:
0215                 tmp_log.debug(f"get_list_dataset_replicas->{str(replicas)}")
0216                 tmp_log.debug("end")
0217                 return True, replicas
0218             time.sleep(10)
0219 
0220         tmp_log.error(f"bad response for {dataset}")
0221         tmp_log.debug("end")
0222         return False, {}
0223 
0224     def get_list_dataset_replicas_in_container(self, container: str, max_attempts: int = 3) -> Tuple[bool, Dict]:
0225         """
0226         Get the list of replicas for a given container.
0227 
0228         Args:
0229             container (str): The name of the container.
0230             max_attempts (int, optional): The maximum number of attempts to get the replicas. Defaults to 3.
0231 
0232         Returns:
0233             tuple: A tuple containing the status (bool) and the result (dict or str).
0234         """
0235         tmp_log = LogWrapper(_logger, f"get_list_dataset_replicas_in_container-{naive_utcnow().isoformat('/')}")
0236 
0237         tmp_log.debug(f"get_list_dataset_replicas_in_container {container}")
0238 
0239         # response for failure
0240         res_for_failure = False, {}
0241 
0242         # get datasets in container
0243         for attempt in range(max_attempts):
0244             tmp_log.debug(f"{attempt}/{max_attempts} listDatasetsInContainer {container}")
0245             datasets, out = rucioAPI.list_datasets_in_container(container)
0246             if datasets is not None:
0247                 break
0248             time.sleep(60)
0249 
0250         if datasets is None:
0251             tmp_log.error(out)
0252             tmp_log.error(f"bad DDM response for {container}")
0253             tmp_log.debug("end")
0254             return res_for_failure
0255 
0256         # loop over all datasets
0257         all_rep_map = {}
0258         for dataset in datasets:
0259             # get replicas
0260             status, tmp_rep_sites = self.get_list_dataset_replicas(dataset)
0261             if not status:
0262                 tmp_log.debug("end")
0263                 return res_for_failure
0264             # append
0265             all_rep_map[dataset] = tmp_rep_sites
0266 
0267         # return
0268         tmp_log.debug("end")
0269         return True, all_rep_map
0270 
0271     def get_used_datasets(self, dataset_map: Dict, max_attempts: int = 3) -> Tuple[bool, Dict]:
0272         """
0273         Get the datasets that are used by jobs.
0274 
0275         Args:
0276             dataset_map (dict): The map of datasets.
0277             max_attempts (int, optional): The maximum number of attempts to get the file list. Defaults to 3.
0278 
0279         Returns:
0280             tuple: A tuple containing the status (bool) and the used datasets list.
0281         """
0282         tmp_log = LogWrapper(_logger, f"get_used_datasets-{naive_utcnow().isoformat('/')}")
0283         tmp_log.debug(f"get_used_datasets {str(dataset_map)}")
0284 
0285         res_for_failure = (False, [])
0286         used_ds_list = []
0287 
0288         # loop over all datasets
0289         for dataset_name in dataset_map:
0290             # get file list
0291             for attempt in range(max_attempts):
0292                 try:
0293                     tmp_log.debug(f"{attempt}/{max_attempts} listFilesInDataset {dataset_name}")
0294                     file_items, out = rucioAPI.list_files_in_dataset(dataset_name)
0295                     status = True
0296                     break
0297                 except Exception:
0298                     status = False
0299                     err_type, err_value = sys.exc_info()[:2]
0300                     out = f"{err_type} {err_value}"
0301                     time.sleep(60)
0302 
0303             if not status:
0304                 tmp_log.error(out)
0305                 tmp_log.error(f"bad DDM response to get size of {dataset_name}")
0306                 tmp_log.debug("end")
0307                 return res_for_failure
0308 
0309             # check if jobs use the dataset
0310             used_flag = False
0311             for tmp_job in self.jobs:
0312                 for tmp_file in tmp_job.Files:
0313                     if tmp_file.type == "input" and tmp_file.lfn in file_items:
0314                         used_flag = True
0315                         break
0316                 # escape
0317                 if used_flag:
0318                     break
0319 
0320             # used
0321             if used_flag:
0322                 used_ds_list.append(dataset_name)
0323 
0324         # return
0325         tmp_log.debug(f"used datasets = {str(used_ds_list)}")
0326         tmp_log.debug("end")
0327         return True, used_ds_list
0328 
0329     def get_file_from_dataset(self, dataset_name: str, guid: str, max_attempts: int = 3) -> Tuple[bool, Dict]:
0330         """
0331         Get file information from a dataset.
0332 
0333         Args:
0334             dataset_name (str): The name of the dataset.
0335             guid (str): The GUID of the file.
0336             max_attempts (int, optional): The maximum number of attempts to get the file list. Defaults to 3.
0337 
0338         Returns:
0339             tuple: A tuple containing the status (bool) and the file information (dict or None).
0340         """
0341         tmp_log = LogWrapper(_logger, f"get_file_from_dataset-{naive_utcnow().isoformat('/')}")
0342         tmp_log.debug(f"get_file_from_dataset {dataset_name} {guid}")
0343 
0344         res_for_failure = (False, None)
0345 
0346         # get files in datasets
0347         global g_files_in_ds_map
0348         if dataset_name not in g_files_in_ds_map:
0349             # get file list
0350             for attempt in range(max_attempts):
0351                 try:
0352                     tmp_log.debug(f"{attempt}/{max_attempts} listFilesInDataset {dataset_name}")
0353                     file_items, out = rucioAPI.list_files_in_dataset(dataset_name)
0354                     status = True
0355                     break
0356                 except Exception:
0357                     status = False
0358                     err_type, err_value = sys.exc_info()[:2]
0359                     out = f"{err_type} {err_value}"
0360                     time.sleep(60)
0361 
0362             if not status:
0363                 tmp_log.error(out)
0364                 tmp_log.error(f"bad DDM response to get size of {dataset_name}")
0365                 tmp_log.debug("end")
0366                 return res_for_failure
0367                 # append
0368             g_files_in_ds_map[dataset_name] = file_items
0369 
0370         # check if file is in the dataset
0371         for tmp_lfn, tmp_val in g_files_in_ds_map[dataset_name].items():
0372             if uuid.UUID(tmp_val["guid"]) == uuid.UUID(guid):
0373                 ret_map = tmp_val
0374                 ret_map["lfn"] = tmp_lfn
0375                 ret_map["dataset"] = dataset_name
0376                 tmp_log.debug("end")
0377                 return True, ret_map
0378 
0379         tmp_log.debug("end")
0380         return res_for_failure
0381 
0382     def register_dataset_container_with_datasets(
0383         self, container_name: str, files: List, replica_map: Dict, n_sites: int = 1, owner: str = None, max_attempts: int = 3
0384     ) -> Tuple[bool, Dict]:
0385         """
0386         Register a new dataset container with datasets.
0387 
0388         Args:
0389             container_name (str): The name of the container.
0390             files (list): The list of files to be included in the dataset.
0391             replica_map (dict): The map of replicas.
0392             n_sites (int, optional): The number of sites. Defaults to 1.
0393             owner (str, optional): The owner of the dataset. Defaults to None.
0394             max_attempts (int, optional): The maximum number of attempts to register the container. Defaults to 3.
0395 
0396         Returns:
0397             bool: The status of the registration process.
0398         """
0399         tmp_logger = LogWrapper(_logger, f"register_dataset_container_with_datasets-{naive_utcnow().isoformat('/')}")
0400         tmp_logger.debug(f"register_dataset_container_with_datasets {container_name}")
0401 
0402         # parse DN
0403         if owner is not None:
0404             status, user_info = rucioAPI.finger(owner)
0405             if not status:
0406                 tmp_logger.debug(f"failed to finger: {user_info}")
0407             else:
0408                 owner = user_info["nickname"]
0409             tmp_logger.debug(f"parsed DN={owner}")
0410 
0411         # sort by locations
0412         files_map = {}
0413         for tmp_file in files:
0414             tmp_locations = sorted(replica_map[tmp_file["dataset"]])
0415             new_locations = []
0416             # skip STAGING
0417             for tmp_location in tmp_locations:
0418                 if not tmp_location.endswith("STAGING"):
0419                     new_locations.append(tmp_location)
0420             if not new_locations:
0421                 continue
0422             tmp_locations = new_locations
0423             tmp_key = tuple(tmp_locations)
0424             files_map.setdefault(tmp_key, [])
0425             # append file
0426             files_map[tmp_key].append(tmp_file)
0427 
0428         # get nfiles per dataset
0429         n_files_per_dataset, _ = divmod(len(files), n_sites)
0430         if n_files_per_dataset == 0:
0431             n_files_per_dataset = 1
0432         max_files_per_dataset = 1000
0433         if n_files_per_dataset >= max_files_per_dataset:
0434             n_files_per_dataset = max_files_per_dataset
0435 
0436         # register new datasets
0437         dataset_names = []
0438         tmp_index = 1
0439         for tmp_locations, tmp_files in files_map.items():
0440             tmp_sub_index = 0
0441             while tmp_sub_index < len(tmp_files):
0442                 tmp_dataset_name = container_name[:-1] + "_%04d" % tmp_index
0443                 tmp_ret = self.register_dataset_with_location(
0444                     tmp_dataset_name,
0445                     tmp_files[tmp_sub_index : tmp_sub_index + n_files_per_dataset],
0446                     tmp_locations,
0447                     owner=None,
0448                 )
0449                 # failed
0450                 if not tmp_ret:
0451                     tmp_logger.error(f"failed to register {tmp_dataset_name}")
0452                     tmp_logger.debug("end")
0453                     return False
0454                 # append dataset
0455                 dataset_names.append(tmp_dataset_name)
0456                 tmp_index += 1
0457                 tmp_sub_index += n_files_per_dataset
0458 
0459         # register container
0460         for attempt in range(max_attempts):
0461             try:
0462                 tmp_logger.debug(f"{attempt}/{max_attempts} registerContainer {container_name}")
0463                 status = rucioAPI.register_container(container_name, dataset_names)
0464                 out = "OK"
0465                 break
0466             except Exception:
0467                 status = False
0468                 err_type, err_value = sys.exc_info()[:2]
0469                 out = f"{err_type} {err_value}"
0470                 time.sleep(10)
0471 
0472         if not status:
0473             tmp_logger.error(out)
0474             tmp_logger.error(f"bad DDM response to register {container_name}")
0475             tmp_logger.debug("end")
0476             return False
0477 
0478         # return
0479         tmp_logger.debug(out)
0480         tmp_logger.debug("end")
0481         return True
0482 
0483     def register_dataset_with_location(self, dataset_name: str, files: List, locations: List, owner: str = None, max_attempts: int = 3) -> bool:
0484         """
0485         Register a new dataset with locations.
0486 
0487         Args:
0488             dataset_name (str): The name of the dataset.
0489             files (list): The list of files to be included in the dataset.
0490             locations (list): The list of locations where the dataset will be registered.
0491             owner (str, optional): The owner of the dataset. Defaults to None.
0492             max_attempts (int, optional): The maximum number of attempts to register the dataset. Defaults to 3.
0493 
0494         Returns:
0495             bool: The status of the registration process.
0496         """
0497         tmp_logger = LogWrapper(_logger, f"register_dataset_with_location-{naive_utcnow().isoformat('/')}")
0498         tmp_logger.debug(f"register_dataset_with_location {dataset_name}")
0499 
0500         res_for_failure = False
0501 
0502         # get file info
0503         guids = []
0504         lfns = []
0505         file_sizes = []
0506         checksums = []
0507         for tmp_file in files:
0508             guids.append(tmp_file["guid"])
0509             lfns.append(tmp_file["scope"] + ":" + tmp_file["lfn"])
0510             file_sizes.append(int(tmp_file["filesize"]))
0511             checksums.append(tmp_file["checksum"])
0512 
0513         # register new dataset
0514         for attempt in range(max_attempts):
0515             try:
0516                 tmp_logger.debug(f"{attempt}/{max_attempts} registerNewDataset {dataset_name} len={len(files)}")
0517                 out = rucioAPI.register_dataset(dataset_name, lfns, guids, file_sizes, checksums, lifetime=self.dataset_lifetime)
0518                 tmp_logger.debug(out)
0519                 break
0520             except Exception:
0521                 err_type, err_value = sys.exc_info()[:2]
0522                 tmp_logger.error(f"{err_type} {err_value}")
0523                 if attempt + 1 == max_attempts:
0524                     tmp_logger.error(f"failed to register {dataset_name} in rucio")
0525                     tmp_logger.debug("end")
0526                     return res_for_failure
0527                 time.sleep(10)
0528 
0529         # freeze dataset
0530         for attempt in range(max_attempts):
0531             tmp_logger.debug(f"{attempt}/{max_attempts} freezeDataset {dataset_name}")
0532             try:
0533                 rucioAPI.close_dataset(dataset_name)
0534                 status = True
0535             except Exception:
0536                 err_type, err_value = sys.exc_info()[:2]
0537                 out = f"failed to freeze : {err_type} {err_value}"
0538                 status = False
0539             if not status:
0540                 time.sleep(10)
0541             else:
0542                 break
0543         if not status:
0544             tmp_logger.error(out)
0545             tmp_logger.error(f"bad DDM response to freeze {dataset_name}")
0546             tmp_logger.debug("end")
0547             return res_for_failure
0548 
0549         # register locations
0550         for tmp_location in locations:
0551             for attempt in range(max_attempts):
0552                 try:
0553                     tmp_logger.debug(f"{attempt}/{max_attempts} registerDatasetLocation {dataset_name} {tmp_location}")
0554                     out = rucioAPI.register_dataset_location(dataset_name, [tmp_location], self.dataset_lifetime, owner)
0555                     tmp_logger.debug(out)
0556                     status = True
0557                     break
0558 
0559                 except Exception:
0560                     status = False
0561                     err_type, err_value = sys.exc_info()[:2]
0562                     tmp_logger.error(f"{err_type} {err_value}")
0563                     if attempt + 1 == max_attempts:
0564                         tmp_logger.error(f"failed to register {dataset_name} in rucio")
0565                         tmp_logger.debug("end")
0566                         return res_for_failure
0567                     time.sleep(10)
0568 
0569             if not status:
0570                 tmp_logger.error(out)
0571                 tmp_logger.error(f"bad DDM response to register location {dataset_name}")
0572                 tmp_logger.debug("end")
0573                 return res_for_failure
0574         return True
0575 
0576     def get_datasets_by_guids(self, out_map: Dict, guids: List[str], dataset_filters: List[str]) -> Tuple[bool, Dict]:
0577         """
0578         Get datasets by GUIDs.
0579 
0580         Args:
0581             out_map (dict): The output map.
0582             guids (list): The list of GUIDs.
0583             dataset_filters (list): The list of dataset filters.
0584 
0585         Returns:
0586             tuple: A tuple containing a boolean status and a dictionary.
0587         """
0588         tmp_logger = LogWrapper(_logger, f"get_datasets_by_guids-{naive_utcnow().isoformat('/')}")
0589         tmp_logger.debug(f"get_datasets_by_guids {str(guids)}")
0590 
0591         ret_map = {}
0592         try:
0593             for guid in guids:
0594                 tmp_dataset_names = []
0595                 if guid not in out_map:
0596                     tmp_logger.error(f"GUID={guid} not found")
0597                     tmp_logger.debug("end")
0598                     return False, {}
0599 
0600                 for tmp_dataset_name in out_map[guid]:
0601                     if (
0602                         tmp_dataset_name.startswith("panda")
0603                         or tmp_dataset_name.startswith("user")
0604                         or tmp_dataset_name.startswith("group")
0605                         or tmp_dataset_name.startswith("archive")
0606                         or re.search("_sub\d+$", tmp_dataset_name) is not None
0607                         or re.search("_dis\d+$", tmp_dataset_name) is not None
0608                         or re.search("_shadow$", tmp_dataset_name) is not None
0609                     ):
0610                         continue
0611 
0612                     if dataset_filters:
0613                         flag_match = False
0614                         for tmp_filter in dataset_filters:
0615                             if fnmatch.fnmatchcase(tmp_dataset_name, tmp_filter):
0616                                 flag_match = True
0617                                 break
0618                         if not flag_match:
0619                             continue
0620 
0621                     tmp_dataset_names.append(tmp_dataset_name)
0622 
0623                 if not tmp_dataset_names:
0624                     tmp_logger.debug(f"no datasets found for GUID={guid}")
0625                     continue
0626 
0627                 if len(tmp_dataset_names) != 1:
0628                     tmp_logger.debug(f"use the first dataset in {str(tmp_dataset_names)} for GUID:{guid}")
0629 
0630                 ret_map[guid] = tmp_dataset_names[0]
0631         except Exception as e:
0632             tmp_logger.error(f"failed to parse get_datasets_by_guids: {e}")
0633             tmp_logger.debug("end")
0634             return False, {}
0635 
0636         return True, ret_map
0637 
0638     def list_datasets_by_guids(self, guids: List[str], dataset_filters: List[str], max_attempts: int = 3) -> Tuple[bool, Dict]:
0639         """
0640         List datasets by GUIDs.
0641 
0642         Args:
0643             guids (list): The list of GUIDs.
0644             dataset_filters (list): The list of dataset filters.
0645             max_attempts (int, optional): The maximum number of attempts to list the datasets. Defaults to 3.
0646 
0647         Returns:
0648             tuple: A tuple containing the status (bool) and the result (dict or str).
0649         """
0650         tmp_logger = LogWrapper(_logger, f"list_datasets_by_guids-{naive_utcnow().isoformat('/')}")
0651         tmp_logger.debug(f"list_datasets_by_guids {str(guids)}")
0652 
0653         res_for_failure = (False, {})
0654         res_for_fatal = (False, {"isFatal": True})
0655 
0656         # get size of datasets
0657         for attempt in range(max_attempts):
0658             tmp_logger.debug(f"{attempt}/{max_attempts} listDatasetsByGUIDs GUIDs={str(guids)}")
0659             try:
0660                 out = rucioAPI.list_datasets_by_guids(guids)
0661                 status = True
0662                 break
0663             except Exception:
0664                 err_type, err_value = sys.exc_info()[:2]
0665                 out = f"failed to get datasets with GUIDs : {err_type} {err_value}"
0666                 status = False
0667                 time.sleep(10)
0668 
0669         if not status:
0670             tmp_logger.error(out)
0671             tmp_logger.error(f"bad DDM response to get size of {str(guids)}")
0672             if "DataIdentifierNotFound" in out:
0673                 tmp_logger.error("DataIdentifierNotFound in listDatasetsByGUIDs")
0674                 tmp_logger.debug("end")
0675                 return res_for_fatal
0676             tmp_logger.debug("end")
0677             return res_for_failure
0678 
0679         tmp_logger.debug(out)
0680 
0681         # get map
0682         status, ret_map = self.get_datasets_by_guids(out, guids, dataset_filters)
0683 
0684         if not status:
0685             return res_for_failure
0686 
0687         tmp_logger.debug("end")
0688         return True, ret_map
0689 
0690     def convert_evt_run_to_datasets(
0691         self, event_run_list: List, dataset_type: str, stream_name: str, dataset_filters: List, ami_tag: str, run_evt_guid_map: Dict
0692     ) -> Tuple[bool, Dict, List]:
0693         """
0694         Convert event/run list to datasets.
0695 
0696         Args:
0697             event_run_list (list): The list of run events.
0698             dataset_type (str): The type of the dataset.
0699             stream_name (str): The name of the stream.
0700             dataset_filters (list): The list of dataset filters.
0701             ami_tag (str): The AMI tag.
0702             run_evt_guid_map (dict): The map of run events to GUIDs.
0703 
0704         Returns:
0705             tuple: A tuple containing the status (bool), the result (dict or str), and the list of all files.
0706         """
0707         tmp_logger = LogWrapper(_logger, f"convert_evt_run_to_datasets-{naive_utcnow().isoformat('/')}")
0708         tmp_logger.debug(f"convert_evt_run_to_datasets type={dataset_type} stream={stream_name} dsPatt={str(dataset_filters)} amitag={ami_tag}")
0709 
0710         # check data type
0711         failed_ret = False, {}, []
0712         fatal_ret = False, {"isFatal": True}, []
0713         stream_ref = "Stream" + dataset_type
0714         # import event lookup client
0715         if run_evt_guid_map == {}:
0716             if len(event_run_list) == 0:
0717                 tmp_logger.error("Empty list for run and events was provided")
0718                 tmp_logger.debug("end")
0719                 return failed_ret
0720             # Hadoop EI
0721             from .event_lookup_client_ei import EventLookupClientEI
0722 
0723             event_lookup_if = EventLookupClientEI()
0724             # loop over all events
0725             n_events_per_loop = 500
0726             i_events_total = 0
0727             while i_events_total < len(event_run_list):
0728                 tmp_event_run_list = event_run_list[i_events_total : i_events_total + n_events_per_loop]
0729                 tmp_logger.debug(f"EI lookup for {i_events_total}/{len(event_run_list)}")
0730                 i_events_total += n_events_per_loop
0731                 reg_start = naive_utcnow()
0732                 guid_list_elssi, tmp_com, tmp_out, tmp_err = event_lookup_if.do_lookup(
0733                     tmp_event_run_list,
0734                     stream=stream_name,
0735                     tokens=stream_ref,
0736                     ami_tag=ami_tag,
0737                 )
0738                 reg_time = naive_utcnow() - reg_start
0739                 tmp_logger.debug(f"EI command: {tmp_com}")
0740                 tmp_logger.debug(f"took {reg_time.seconds}.{reg_time.microseconds / 1000:03f} sec for {len(tmp_event_run_list)} events")
0741                 # failed
0742                 if tmp_err not in [None, ""] or len(guid_list_elssi) == 0:
0743                     tmp_logger.debug(tmp_com)
0744                     tmp_logger.debug(tmp_out)
0745                     tmp_logger.debug(tmp_err)
0746                     tmp_logger.error("invalid return from EventIndex")
0747                     tmp_logger.debug("end")
0748                     return failed_ret
0749                 # check events
0750                 for run_nr, evt_nr in tmp_event_run_list:
0751                     param_str = f"Run:{run_nr} Evt:{evt_nr} Stream:{stream_name}"
0752                     tmp_logger.debug(param_str)
0753                     tmp_run_evt_key = (int(run_nr), int(evt_nr))
0754                     # not found
0755                     if tmp_run_evt_key not in guid_list_elssi or len(guid_list_elssi[tmp_run_evt_key]) == 0:
0756                         tmp_logger.debug(tmp_com)
0757                         tmp_logger.debug(tmp_out)
0758                         tmp_logger.debug(tmp_err)
0759                         tmp_logger.error(f"no GUIDs were found in EventIndex for {param_str}")
0760                         tmp_logger.debug("end")
0761                         return fatal_ret
0762                     # append
0763                     run_evt_guid_map[tmp_run_evt_key] = guid_list_elssi[tmp_run_evt_key]
0764         # convert to datasets
0765         all_datasets = []
0766         all_files = []
0767         all_locations = {}
0768         for tmp_idx in run_evt_guid_map:
0769             tmp_guids = run_evt_guid_map[tmp_idx]
0770             run_nr, evt_nr = tmp_idx
0771             tmp_ds_ret, tmp_dataset_map = self.list_datasets_by_guids(tmp_guids, dataset_filters)
0772             # failed
0773             if not tmp_ds_ret:
0774                 tmp_logger.error(f"failed to convert GUIDs to datasets")
0775                 if "isFatal" in tmp_dataset_map and tmp_dataset_map["isFatal"] is True:
0776                     tmp_logger.debug("end")
0777                     return fatal_ret
0778                 tmp_logger.debug("end")
0779                 return failed_ret
0780             # empty
0781             if not tmp_dataset_map:
0782                 tmp_logger.error(f"there is no dataset for Run:{run_nr} Evt:{evt_nr} GUIDs:{str(tmp_guids)}")
0783                 tmp_logger.debug("end")
0784                 return fatal_ret
0785             if len(tmp_dataset_map) != 1:
0786                 tmp_logger.error(f"there are multiple datasets {str(tmp_dataset_map)} for Run:{run_nr} Evt:{evt_nr} GUIDs:{str(tmp_guids)}")
0787                 tmp_logger.debug("end")
0788                 return fatal_ret
0789 
0790             # append
0791             for tmp_guid, tmp_dataset_name in tmp_dataset_map.items():
0792                 # collect dataset names
0793                 if tmp_dataset_name not in all_datasets:
0794                     all_datasets.append(tmp_dataset_name)
0795                     # get location
0796                     stat_rep, replica_map = self.get_list_dataset_replicas(tmp_dataset_name)
0797                     # failed
0798                     if not stat_rep:
0799                         tmp_logger.error(f"failed to get locations for {tmp_dataset_name}")
0800                         tmp_logger.debug("end")
0801                         return failed_ret
0802                     # collect locations
0803                     tmp_location_list = []
0804                     for tmp_location in replica_map:
0805                         # use only complete replicas
0806                         ds_stat_dict = replica_map[tmp_location][0]
0807                         if ds_stat_dict["total"] is not None and ds_stat_dict["total"] == ds_stat_dict["found"]:
0808                             if tmp_location not in tmp_location_list:
0809                                 tmp_location_list.append(tmp_location)
0810                     all_locations[tmp_dataset_name] = tmp_location_list
0811 
0812                 # get file info
0813                 tmp_file_ret, tmp_file_info = self.get_file_from_dataset(tmp_dataset_name, tmp_guid)
0814                 # failed
0815                 if not tmp_file_ret:
0816                     tmp_logger.error(f"failed to get fileinfo for GUID:{tmp_guid} DS:{tmp_dataset_name}")
0817                     tmp_logger.debug("end")
0818                     return failed_ret
0819                 # collect files
0820                 all_files.append(tmp_file_info)
0821         # return
0822         tmp_logger.debug(f"converted to {str(all_datasets)}, {str(all_locations)}, {str(all_files)}")
0823         tmp_logger.debug("end")
0824         return True, all_locations, all_files