# File indexing completed on 2026-04-10 08:39:02
0001 """
0002 find candidate site to distribute input datasets
0003
0004 """
0005
0006 import fnmatch
0007 import re
0008 import sys
0009 import time
0010 import uuid
0011 from typing import Dict, List, Tuple
0012
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016
0017 from pandaserver.dataservice.DataServiceUtils import select_scope
0018 from pandaserver.dataservice.ddm import rucioAPI
0019 from pandaserver.taskbuffer import JobUtils
0020
0021 _logger = PandaLogger().getLogger("dyn_data_distributer")
0022
0023
0024 g_files_in_ds_map = {}
0025
0026
0027 class DynDataDistributer:
0028 """
0029 Find candidate site to distribute input datasets.
0030 """
0031
0032 def __init__(self, jobs, siteMapper, simul=False, token=None, dataset_lifetime=14):
0033 self.jobs = jobs
0034 self.site_mapper = siteMapper
0035 if token is None:
0036 self.token = naive_utcnow().isoformat(" ")
0037 else:
0038 self.token = token
0039
0040 self.simul = simul
0041 self.last_message = ""
0042
0043 self.dataset_lifetime = dataset_lifetime
0044
0045 def get_replica_locations(self, input_ds: str, check_used_file: bool) -> Tuple[bool, Dict]:
0046 """
0047 Get replica locations for a given dataset.
0048
0049 Args:
0050 input_ds (str): The name of the input dataset.
0051 check_used_file (bool): Flag to check used file.
0052
0053 Returns:
0054 tuple: A tuple containing the status (bool) and the result (dict or str).
0055 """
0056 tmp_log = LogWrapper(_logger, f"get_replica_locations-{naive_utcnow().isoformat('/')}")
0057 tmp_log.debug(f"get_replica_locations {input_ds}")
0058
0059
0060 res_for_failure = False, {"": {"": ([], [], [], 0, False, False, 0, 0, [])}}
0061
0062
0063 if input_ds.endswith("/"):
0064
0065 status, tmp_rep_maps = self.get_list_dataset_replicas_in_container(input_ds)
0066
0067 if status and check_used_file:
0068 status, tmp_used_dataset_list = self.get_used_datasets(tmp_rep_maps)
0069
0070 new_rep_maps = {}
0071 for tmp_key, tmp_val in tmp_rep_maps.items():
0072 if tmp_key in tmp_used_dataset_list:
0073 new_rep_maps[tmp_key] = tmp_val
0074 tmp_rep_maps = new_rep_maps
0075 else:
0076
0077 status, tmp_rep_map = self.get_list_dataset_replicas(input_ds)
0078 tmp_rep_maps = {input_ds: tmp_rep_map}
0079
0080 if not status:
0081
0082 tmp_log.error("failed to get replica locations for {input_ds}")
0083 tmp_log.debug("end")
0084 return res_for_failure
0085
0086 tmp_log.debug("end")
0087 return True, tmp_rep_maps
0088
0089 def get_all_sites(self) -> List:
0090 """
0091 Retrieves all sites that meet certain conditions.
0092
0093 This method filters out sites based on the following conditions:
0094 - The site should be capable of running analysis.
0095 - The site should not be a GPU site.
0096 - The site should not use VP.
0097 - The status of the site should be "online".
0098
0099 Returns:
0100 list: A list of SiteSpec objects that meet the above conditions.
0101 """
0102 all_sites = []
0103 for site_name in self.site_mapper.siteSpecList:
0104 site_spec = self.site_mapper.siteSpecList[site_name]
0105
0106 if not site_spec.runs_analysis():
0107 continue
0108
0109 if site_spec.isGPU():
0110 continue
0111
0112 if site_spec.use_vp(JobUtils.ANALY_PS):
0113 continue
0114
0115 if site_spec.status not in ["online"]:
0116 continue
0117 all_sites.append(site_spec)
0118 return all_sites
0119
0120 def get_candidate_sites(self, tmp_rep_maps: Dict, prod_source_label: str, job_label: str) -> Tuple[bool, Dict]:
0121 """
0122 Retrieves candidate sites for data distribution based on certain conditions.
0123
0124 This method filters out candidate sites based on the following condition:
0125 - The site should have a replica of the dataset.
0126
0127 Args:
0128 tmp_rep_maps (dict): A dictionary containing dataset names as keys and their replica maps as values.
0129 prod_source_label (str): The label of the production source.
0130 job_label (str): The label of the job.
0131
0132 Returns:
0133 tuple: A tuple containing a boolean status and a dictionary. The dictionary has dataset names as keys and
0134 another dictionary as values. The inner dictionary has cloud names as keys and a tuple of various
0135 site-related lists and values as values.
0136 """
0137 all_site_map = self.get_all_sites()
0138 return_map = {}
0139 cloud = "WORLD"
0140 for tmp_ds, tmp_rep_map in tmp_rep_maps.items():
0141 cand_sites = []
0142 sites_com_ds = []
0143 sites_comp_pd2p = []
0144 t1_has_replica = False
0145 t1_has_primary = False
0146 n_sec_replicas = 0
0147 cand_for_mou = []
0148 n_user_sub = 0
0149 for tmp_site_spec in all_site_map:
0150 tmp_scope_input, tmp_scope_output = select_scope(tmp_site_spec, prod_source_label, job_label)
0151 if tmp_scope_input not in tmp_site_spec.ddm_endpoints_input:
0152 continue
0153 rses = tmp_site_spec.ddm_endpoints_input[tmp_scope_input].getLocalEndPoints()
0154 for ddm_endpoint in tmp_rep_map:
0155 tmp_stat_map = tmp_rep_map[ddm_endpoint]
0156 if ddm_endpoint in rses and tmp_stat_map[0]["total"] == tmp_stat_map[0]["found"] and ddm_endpoint.endswith("DATADISK"):
0157 sites_com_ds.append(tmp_site_spec.sitename)
0158 break
0159 cand_sites.append(tmp_site_spec.sitename)
0160 return_map.setdefault(tmp_ds, {})
0161 if sites_com_ds:
0162 cand_sites = sites_com_ds
0163 return_map[tmp_ds][cloud] = (
0164 cand_sites,
0165 sites_com_ds,
0166 sites_comp_pd2p,
0167 n_user_sub,
0168 t1_has_replica,
0169 t1_has_primary,
0170 n_sec_replicas,
0171 0,
0172 cand_for_mou,
0173 )
0174 return True, return_map
0175
0176 def get_candidates(self, input_ds: str, prod_source_label: str, job_label: str, check_used_file: bool = True) -> Tuple[bool, Dict]:
0177 """
0178 Get candidate sites for subscription.
0179
0180 Args:
0181 input_ds (str): The name of the input dataset.
0182 prod_source_label (str): The label of the production source.
0183 job_label (str): The label of the job.
0184 check_used_file (bool, optional): Flag to check used file. Defaults to True.
0185
0186 Returns:
0187 tuple: A tuple containing the status (bool) and the result (dict or str).
0188 """
0189
0190 status, tmp_rep_maps = self.get_replica_locations(input_ds, check_used_file)
0191 if not status:
0192 return status, tmp_rep_maps
0193
0194
0195 return self.get_candidate_sites(tmp_rep_maps, prod_source_label, job_label)
0196
0197 def get_list_dataset_replicas(self, dataset: str, max_attempts: int = 3) -> Tuple[bool, Dict]:
0198 """
0199 Get the list of replicas for a given dataset.
0200
0201 Args:
0202 dataset (str): The name of the dataset.
0203 max_attempts (int, optional): The maximum number of attempts to get the replicas. Defaults to 3.
0204
0205 Returns:
0206 tuple: A tuple containing the status (bool) and the result (dict or str).
0207 """
0208 tmp_log = LogWrapper(_logger, f"get_list_dataset_replicas-{naive_utcnow().isoformat('/')}")
0209 tmp_log.debug(f"get_list_dataset_replicas {dataset}")
0210
0211 for attempt in range(max_attempts):
0212 tmp_log.debug(f"{attempt}/{max_attempts} listDatasetReplicas {dataset}")
0213 status, replicas = rucioAPI.list_dataset_replicas(dataset)
0214 if status == 0:
0215 tmp_log.debug(f"get_list_dataset_replicas->{str(replicas)}")
0216 tmp_log.debug("end")
0217 return True, replicas
0218 time.sleep(10)
0219
0220 tmp_log.error(f"bad response for {dataset}")
0221 tmp_log.debug("end")
0222 return False, {}
0223
0224 def get_list_dataset_replicas_in_container(self, container: str, max_attempts: int = 3) -> Tuple[bool, Dict]:
0225 """
0226 Get the list of replicas for a given container.
0227
0228 Args:
0229 container (str): The name of the container.
0230 max_attempts (int, optional): The maximum number of attempts to get the replicas. Defaults to 3.
0231
0232 Returns:
0233 tuple: A tuple containing the status (bool) and the result (dict or str).
0234 """
0235 tmp_log = LogWrapper(_logger, f"get_list_dataset_replicas_in_container-{naive_utcnow().isoformat('/')}")
0236
0237 tmp_log.debug(f"get_list_dataset_replicas_in_container {container}")
0238
0239
0240 res_for_failure = False, {}
0241
0242
0243 for attempt in range(max_attempts):
0244 tmp_log.debug(f"{attempt}/{max_attempts} listDatasetsInContainer {container}")
0245 datasets, out = rucioAPI.list_datasets_in_container(container)
0246 if datasets is not None:
0247 break
0248 time.sleep(60)
0249
0250 if datasets is None:
0251 tmp_log.error(out)
0252 tmp_log.error(f"bad DDM response for {container}")
0253 tmp_log.debug("end")
0254 return res_for_failure
0255
0256
0257 all_rep_map = {}
0258 for dataset in datasets:
0259
0260 status, tmp_rep_sites = self.get_list_dataset_replicas(dataset)
0261 if not status:
0262 tmp_log.debug("end")
0263 return res_for_failure
0264
0265 all_rep_map[dataset] = tmp_rep_sites
0266
0267
0268 tmp_log.debug("end")
0269 return True, all_rep_map
0270
    def get_used_datasets(self, dataset_map: Dict, max_attempts: int = 3) -> Tuple[bool, List]:
        """
        Get the datasets that are used by jobs.

        For each dataset its file list is fetched from rucio and compared with the
        input files of self.jobs; datasets with at least one matching LFN are kept.

        Args:
            dataset_map (dict): The map of datasets (keys are dataset names).
            max_attempts (int, optional): The maximum number of attempts to get the file list. Defaults to 3.

        Returns:
            tuple: A tuple containing the status (bool) and the used datasets list.
        """
        tmp_log = LogWrapper(_logger, f"get_used_datasets-{naive_utcnow().isoformat('/')}")
        tmp_log.debug(f"get_used_datasets {str(dataset_map)}")

        res_for_failure = (False, [])
        used_ds_list = []

        # check each dataset in turn
        for dataset_name in dataset_map:

            # fetch the dataset's file list from rucio, retrying on failure
            # NOTE(review): status/out stay unbound if max_attempts <= 0 — confirm callers
            for attempt in range(max_attempts):
                try:
                    tmp_log.debug(f"{attempt}/{max_attempts} listFilesInDataset {dataset_name}")
                    file_items, out = rucioAPI.list_files_in_dataset(dataset_name)
                    status = True
                    break
                except Exception:
                    status = False
                    err_type, err_value = sys.exc_info()[:2]
                    out = f"{err_type} {err_value}"
                    time.sleep(60)

            if not status:
                tmp_log.error(out)
                tmp_log.error(f"bad DDM response to get size of {dataset_name}")
                tmp_log.debug("end")
                return res_for_failure

            # the dataset is "used" when any job has one of its files as input
            used_flag = False
            for tmp_job in self.jobs:
                for tmp_file in tmp_job.Files:
                    if tmp_file.type == "input" and tmp_file.lfn in file_items:
                        used_flag = True
                        break

                if used_flag:
                    break

            if used_flag:
                used_ds_list.append(dataset_name)

        tmp_log.debug(f"used datasets = {str(used_ds_list)}")
        tmp_log.debug("end")
        return True, used_ds_list
0328
    def get_file_from_dataset(self, dataset_name: str, guid: str, max_attempts: int = 3) -> Tuple[bool, Dict]:
        """
        Get file information from a dataset for a given GUID.

        The dataset's file list is fetched from rucio once and cached in the
        module-level g_files_in_ds_map for subsequent lookups.

        Args:
            dataset_name (str): The name of the dataset.
            guid (str): The GUID of the file.
            max_attempts (int, optional): The maximum number of attempts to get the file list. Defaults to 3.

        Returns:
            tuple: (True, file info dict extended with "lfn" and "dataset") when the
                GUID is found, (False, None) otherwise.
        """
        tmp_log = LogWrapper(_logger, f"get_file_from_dataset-{naive_utcnow().isoformat('/')}")
        tmp_log.debug(f"get_file_from_dataset {dataset_name} {guid}")

        res_for_failure = (False, None)

        # fetch and cache the dataset's file list unless already cached
        global g_files_in_ds_map
        if dataset_name not in g_files_in_ds_map:

            # retry the rucio lookup, waiting between attempts
            for attempt in range(max_attempts):
                try:
                    tmp_log.debug(f"{attempt}/{max_attempts} listFilesInDataset {dataset_name}")
                    file_items, out = rucioAPI.list_files_in_dataset(dataset_name)
                    status = True
                    break
                except Exception:
                    status = False
                    err_type, err_value = sys.exc_info()[:2]
                    out = f"{err_type} {err_value}"
                    time.sleep(60)

            if not status:
                tmp_log.error(out)
                tmp_log.error(f"bad DDM response to get size of {dataset_name}")
                tmp_log.debug("end")
                return res_for_failure

            g_files_in_ds_map[dataset_name] = file_items

        # scan the cached file map for the requested GUID
        # (UUID comparison normalizes different GUID string spellings)
        for tmp_lfn, tmp_val in g_files_in_ds_map[dataset_name].items():
            if uuid.UUID(tmp_val["guid"]) == uuid.UUID(guid):
                ret_map = tmp_val
                ret_map["lfn"] = tmp_lfn
                ret_map["dataset"] = dataset_name
                tmp_log.debug("end")
                return True, ret_map

        tmp_log.debug("end")
        return res_for_failure
0381
0382 def register_dataset_container_with_datasets(
0383 self, container_name: str, files: List, replica_map: Dict, n_sites: int = 1, owner: str = None, max_attempts: int = 3
0384 ) -> Tuple[bool, Dict]:
0385 """
0386 Register a new dataset container with datasets.
0387
0388 Args:
0389 container_name (str): The name of the container.
0390 files (list): The list of files to be included in the dataset.
0391 replica_map (dict): The map of replicas.
0392 n_sites (int, optional): The number of sites. Defaults to 1.
0393 owner (str, optional): The owner of the dataset. Defaults to None.
0394 max_attempts (int, optional): The maximum number of attempts to register the container. Defaults to 3.
0395
0396 Returns:
0397 bool: The status of the registration process.
0398 """
0399 tmp_logger = LogWrapper(_logger, f"register_dataset_container_with_datasets-{naive_utcnow().isoformat('/')}")
0400 tmp_logger.debug(f"register_dataset_container_with_datasets {container_name}")
0401
0402
0403 if owner is not None:
0404 status, user_info = rucioAPI.finger(owner)
0405 if not status:
0406 tmp_logger.debug(f"failed to finger: {user_info}")
0407 else:
0408 owner = user_info["nickname"]
0409 tmp_logger.debug(f"parsed DN={owner}")
0410
0411
0412 files_map = {}
0413 for tmp_file in files:
0414 tmp_locations = sorted(replica_map[tmp_file["dataset"]])
0415 new_locations = []
0416
0417 for tmp_location in tmp_locations:
0418 if not tmp_location.endswith("STAGING"):
0419 new_locations.append(tmp_location)
0420 if not new_locations:
0421 continue
0422 tmp_locations = new_locations
0423 tmp_key = tuple(tmp_locations)
0424 files_map.setdefault(tmp_key, [])
0425
0426 files_map[tmp_key].append(tmp_file)
0427
0428
0429 n_files_per_dataset, _ = divmod(len(files), n_sites)
0430 if n_files_per_dataset == 0:
0431 n_files_per_dataset = 1
0432 max_files_per_dataset = 1000
0433 if n_files_per_dataset >= max_files_per_dataset:
0434 n_files_per_dataset = max_files_per_dataset
0435
0436
0437 dataset_names = []
0438 tmp_index = 1
0439 for tmp_locations, tmp_files in files_map.items():
0440 tmp_sub_index = 0
0441 while tmp_sub_index < len(tmp_files):
0442 tmp_dataset_name = container_name[:-1] + "_%04d" % tmp_index
0443 tmp_ret = self.register_dataset_with_location(
0444 tmp_dataset_name,
0445 tmp_files[tmp_sub_index : tmp_sub_index + n_files_per_dataset],
0446 tmp_locations,
0447 owner=None,
0448 )
0449
0450 if not tmp_ret:
0451 tmp_logger.error(f"failed to register {tmp_dataset_name}")
0452 tmp_logger.debug("end")
0453 return False
0454
0455 dataset_names.append(tmp_dataset_name)
0456 tmp_index += 1
0457 tmp_sub_index += n_files_per_dataset
0458
0459
0460 for attempt in range(max_attempts):
0461 try:
0462 tmp_logger.debug(f"{attempt}/{max_attempts} registerContainer {container_name}")
0463 status = rucioAPI.register_container(container_name, dataset_names)
0464 out = "OK"
0465 break
0466 except Exception:
0467 status = False
0468 err_type, err_value = sys.exc_info()[:2]
0469 out = f"{err_type} {err_value}"
0470 time.sleep(10)
0471
0472 if not status:
0473 tmp_logger.error(out)
0474 tmp_logger.error(f"bad DDM response to register {container_name}")
0475 tmp_logger.debug("end")
0476 return False
0477
0478
0479 tmp_logger.debug(out)
0480 tmp_logger.debug("end")
0481 return True
0482
    def register_dataset_with_location(self, dataset_name: str, files: List, locations: List, owner: str = None, max_attempts: int = 3) -> bool:
        """
        Register a new dataset with locations.

        Three DDM steps run in order: register the dataset with its files,
        close/freeze it, then register each replication location. Every step
        is retried up to max_attempts times.

        Args:
            dataset_name (str): The name of the dataset.
            files (list): File info dicts with "guid", "scope", "lfn", "filesize" and "checksum".
            locations (list): The list of locations where the dataset will be registered.
            owner (str, optional): The owner of the dataset. Defaults to None.
            max_attempts (int, optional): The maximum number of attempts per step. Defaults to 3.

        Returns:
            bool: True when all steps succeeded, False otherwise.
        """
        tmp_logger = LogWrapper(_logger, f"register_dataset_with_location-{naive_utcnow().isoformat('/')}")
        tmp_logger.debug(f"register_dataset_with_location {dataset_name}")

        res_for_failure = False

        # collect per-file metadata for the registration call
        guids = []
        lfns = []
        file_sizes = []
        checksums = []
        for tmp_file in files:
            guids.append(tmp_file["guid"])
            lfns.append(tmp_file["scope"] + ":" + tmp_file["lfn"])
            file_sizes.append(int(tmp_file["filesize"]))
            checksums.append(tmp_file["checksum"])

        # step 1: register the dataset with its files in rucio
        for attempt in range(max_attempts):
            try:
                tmp_logger.debug(f"{attempt}/{max_attempts} registerNewDataset {dataset_name} len={len(files)}")
                out = rucioAPI.register_dataset(dataset_name, lfns, guids, file_sizes, checksums, lifetime=self.dataset_lifetime)
                tmp_logger.debug(out)
                break
            except Exception:
                err_type, err_value = sys.exc_info()[:2]
                tmp_logger.error(f"{err_type} {err_value}")
                # give up after the last attempt
                if attempt + 1 == max_attempts:
                    tmp_logger.error(f"failed to register {dataset_name} in rucio")
                    tmp_logger.debug("end")
                    return res_for_failure
                time.sleep(10)

        # step 2: close/freeze the dataset so no more files can be added
        for attempt in range(max_attempts):
            tmp_logger.debug(f"{attempt}/{max_attempts} freezeDataset {dataset_name}")
            try:
                rucioAPI.close_dataset(dataset_name)
                status = True
            except Exception:
                err_type, err_value = sys.exc_info()[:2]
                out = f"failed to freeze : {err_type} {err_value}"
                status = False
            if not status:
                time.sleep(10)
            else:
                break
        if not status:
            tmp_logger.error(out)
            tmp_logger.error(f"bad DDM response to freeze {dataset_name}")
            tmp_logger.debug("end")
            return res_for_failure

        # step 3: register each location, retrying each one individually
        for tmp_location in locations:
            for attempt in range(max_attempts):
                try:
                    tmp_logger.debug(f"{attempt}/{max_attempts} registerDatasetLocation {dataset_name} {tmp_location}")
                    out = rucioAPI.register_dataset_location(dataset_name, [tmp_location], self.dataset_lifetime, owner)
                    tmp_logger.debug(out)
                    status = True
                    break

                except Exception:
                    status = False
                    err_type, err_value = sys.exc_info()[:2]
                    tmp_logger.error(f"{err_type} {err_value}")
                    # give up after the last attempt
                    if attempt + 1 == max_attempts:
                        tmp_logger.error(f"failed to register {dataset_name} in rucio")
                        tmp_logger.debug("end")
                        return res_for_failure
                    time.sleep(10)

            if not status:
                tmp_logger.error(out)
                tmp_logger.error(f"bad DDM response to register location {dataset_name}")
                tmp_logger.debug("end")
                return res_for_failure
        return True
0575
0576 def get_datasets_by_guids(self, out_map: Dict, guids: List[str], dataset_filters: List[str]) -> Tuple[bool, Dict]:
0577 """
0578 Get datasets by GUIDs.
0579
0580 Args:
0581 out_map (dict): The output map.
0582 guids (list): The list of GUIDs.
0583 dataset_filters (list): The list of dataset filters.
0584
0585 Returns:
0586 tuple: A tuple containing a boolean status and a dictionary.
0587 """
0588 tmp_logger = LogWrapper(_logger, f"get_datasets_by_guids-{naive_utcnow().isoformat('/')}")
0589 tmp_logger.debug(f"get_datasets_by_guids {str(guids)}")
0590
0591 ret_map = {}
0592 try:
0593 for guid in guids:
0594 tmp_dataset_names = []
0595 if guid not in out_map:
0596 tmp_logger.error(f"GUID={guid} not found")
0597 tmp_logger.debug("end")
0598 return False, {}
0599
0600 for tmp_dataset_name in out_map[guid]:
0601 if (
0602 tmp_dataset_name.startswith("panda")
0603 or tmp_dataset_name.startswith("user")
0604 or tmp_dataset_name.startswith("group")
0605 or tmp_dataset_name.startswith("archive")
0606 or re.search("_sub\d+$", tmp_dataset_name) is not None
0607 or re.search("_dis\d+$", tmp_dataset_name) is not None
0608 or re.search("_shadow$", tmp_dataset_name) is not None
0609 ):
0610 continue
0611
0612 if dataset_filters:
0613 flag_match = False
0614 for tmp_filter in dataset_filters:
0615 if fnmatch.fnmatchcase(tmp_dataset_name, tmp_filter):
0616 flag_match = True
0617 break
0618 if not flag_match:
0619 continue
0620
0621 tmp_dataset_names.append(tmp_dataset_name)
0622
0623 if not tmp_dataset_names:
0624 tmp_logger.debug(f"no datasets found for GUID={guid}")
0625 continue
0626
0627 if len(tmp_dataset_names) != 1:
0628 tmp_logger.debug(f"use the first dataset in {str(tmp_dataset_names)} for GUID:{guid}")
0629
0630 ret_map[guid] = tmp_dataset_names[0]
0631 except Exception as e:
0632 tmp_logger.error(f"failed to parse get_datasets_by_guids: {e}")
0633 tmp_logger.debug("end")
0634 return False, {}
0635
0636 return True, ret_map
0637
0638 def list_datasets_by_guids(self, guids: List[str], dataset_filters: List[str], max_attempts: int = 3) -> Tuple[bool, Dict]:
0639 """
0640 List datasets by GUIDs.
0641
0642 Args:
0643 guids (list): The list of GUIDs.
0644 dataset_filters (list): The list of dataset filters.
0645 max_attempts (int, optional): The maximum number of attempts to list the datasets. Defaults to 3.
0646
0647 Returns:
0648 tuple: A tuple containing the status (bool) and the result (dict or str).
0649 """
0650 tmp_logger = LogWrapper(_logger, f"list_datasets_by_guids-{naive_utcnow().isoformat('/')}")
0651 tmp_logger.debug(f"list_datasets_by_guids {str(guids)}")
0652
0653 res_for_failure = (False, {})
0654 res_for_fatal = (False, {"isFatal": True})
0655
0656
0657 for attempt in range(max_attempts):
0658 tmp_logger.debug(f"{attempt}/{max_attempts} listDatasetsByGUIDs GUIDs={str(guids)}")
0659 try:
0660 out = rucioAPI.list_datasets_by_guids(guids)
0661 status = True
0662 break
0663 except Exception:
0664 err_type, err_value = sys.exc_info()[:2]
0665 out = f"failed to get datasets with GUIDs : {err_type} {err_value}"
0666 status = False
0667 time.sleep(10)
0668
0669 if not status:
0670 tmp_logger.error(out)
0671 tmp_logger.error(f"bad DDM response to get size of {str(guids)}")
0672 if "DataIdentifierNotFound" in out:
0673 tmp_logger.error("DataIdentifierNotFound in listDatasetsByGUIDs")
0674 tmp_logger.debug("end")
0675 return res_for_fatal
0676 tmp_logger.debug("end")
0677 return res_for_failure
0678
0679 tmp_logger.debug(out)
0680
0681
0682 status, ret_map = self.get_datasets_by_guids(out, guids, dataset_filters)
0683
0684 if not status:
0685 return res_for_failure
0686
0687 tmp_logger.debug("end")
0688 return True, ret_map
0689
    def convert_evt_run_to_datasets(
        self, event_run_list: List, dataset_type: str, stream_name: str, dataset_filters: List, ami_tag: str, run_evt_guid_map: Dict
    ) -> Tuple[bool, Dict, List]:
        """
        Convert a run/event list to datasets, locations and file information.

        When run_evt_guid_map is empty, GUIDs are first looked up in the EventIndex
        in chunks; the GUIDs are then resolved to datasets, their complete replica
        locations and per-file info.

        Args:
            event_run_list (list): (run number, event number) pairs.
            dataset_type (str): The type of the dataset; appended to "Stream" for the EI token.
            stream_name (str): The name of the stream.
            dataset_filters (list): The list of dataset filters.
            ami_tag (str): The AMI tag.
            run_evt_guid_map (dict): (run, event) -> GUIDs; filled in here when empty.

        Returns:
            tuple: (True, {dataset: complete-replica locations}, [file info dicts]) on
                success; (False, {}, []) on retryable failure and
                (False, {"isFatal": True}, []) on a non-retryable one.
        """
        tmp_logger = LogWrapper(_logger, f"convert_evt_run_to_datasets-{naive_utcnow().isoformat('/')}")
        tmp_logger.debug(f"convert_evt_run_to_datasets type={dataset_type} stream={stream_name} dsPatt={str(dataset_filters)} amitag={ami_tag}")

        # return values for retryable and fatal failures
        failed_ret = False, {}, []
        fatal_ret = False, {"isFatal": True}, []
        stream_ref = "Stream" + dataset_type

        # look up GUIDs in the EventIndex unless a map was supplied by the caller
        if run_evt_guid_map == {}:
            if len(event_run_list) == 0:
                tmp_logger.error("Empty list for run and events was provided")
                tmp_logger.debug("end")
                return failed_ret

            # local import — presumably to avoid a module-level import cycle; confirm
            from .event_lookup_client_ei import EventLookupClientEI

            event_lookup_if = EventLookupClientEI()

            # query the EventIndex in chunks of 500 events
            n_events_per_loop = 500
            i_events_total = 0
            while i_events_total < len(event_run_list):
                tmp_event_run_list = event_run_list[i_events_total : i_events_total + n_events_per_loop]
                tmp_logger.debug(f"EI lookup for {i_events_total}/{len(event_run_list)}")
                i_events_total += n_events_per_loop
                reg_start = naive_utcnow()
                guid_list_elssi, tmp_com, tmp_out, tmp_err = event_lookup_if.do_lookup(
                    tmp_event_run_list,
                    stream=stream_name,
                    tokens=stream_ref,
                    ami_tag=ami_tag,
                )
                reg_time = naive_utcnow() - reg_start
                tmp_logger.debug(f"EI command: {tmp_com}")
                tmp_logger.debug(f"took {reg_time.seconds}.{reg_time.microseconds / 1000:03f} sec for {len(tmp_event_run_list)} events")

                # any stderr output or an empty GUID list counts as a lookup failure
                if tmp_err not in [None, ""] or len(guid_list_elssi) == 0:
                    tmp_logger.debug(tmp_com)
                    tmp_logger.debug(tmp_out)
                    tmp_logger.debug(tmp_err)
                    tmp_logger.error("invalid return from EventIndex")
                    tmp_logger.debug("end")
                    return failed_ret

                # collect the GUIDs of every requested run/event pair
                for run_nr, evt_nr in tmp_event_run_list:
                    param_str = f"Run:{run_nr} Evt:{evt_nr} Stream:{stream_name}"
                    tmp_logger.debug(param_str)
                    tmp_run_evt_key = (int(run_nr), int(evt_nr))

                    # a missing event cannot be fixed by retrying -> fatal
                    if tmp_run_evt_key not in guid_list_elssi or len(guid_list_elssi[tmp_run_evt_key]) == 0:
                        tmp_logger.debug(tmp_com)
                        tmp_logger.debug(tmp_out)
                        tmp_logger.debug(tmp_err)
                        tmp_logger.error(f"no GUIDs were found in EventIndex for {param_str}")
                        tmp_logger.debug("end")
                        return fatal_ret

                    run_evt_guid_map[tmp_run_evt_key] = guid_list_elssi[tmp_run_evt_key]

        # resolve the GUIDs of each run/event pair to datasets, locations and files
        all_datasets = []
        all_files = []
        all_locations = {}
        for tmp_idx in run_evt_guid_map:
            tmp_guids = run_evt_guid_map[tmp_idx]
            run_nr, evt_nr = tmp_idx
            tmp_ds_ret, tmp_dataset_map = self.list_datasets_by_guids(tmp_guids, dataset_filters)

            if not tmp_ds_ret:
                tmp_logger.error(f"failed to convert GUIDs to datasets")
                if "isFatal" in tmp_dataset_map and tmp_dataset_map["isFatal"] is True:
                    tmp_logger.debug("end")
                    return fatal_ret
                tmp_logger.debug("end")
                return failed_ret

            # exactly one dataset must match the run/event pair
            if not tmp_dataset_map:
                tmp_logger.error(f"there is no dataset for Run:{run_nr} Evt:{evt_nr} GUIDs:{str(tmp_guids)}")
                tmp_logger.debug("end")
                return fatal_ret
            if len(tmp_dataset_map) != 1:
                tmp_logger.error(f"there are multiple datasets {str(tmp_dataset_map)} for Run:{run_nr} Evt:{evt_nr} GUIDs:{str(tmp_guids)}")
                tmp_logger.debug("end")
                return fatal_ret

            for tmp_guid, tmp_dataset_name in tmp_dataset_map.items():

                # first time we see this dataset: record it and its complete replicas
                if tmp_dataset_name not in all_datasets:
                    all_datasets.append(tmp_dataset_name)

                    stat_rep, replica_map = self.get_list_dataset_replicas(tmp_dataset_name)

                    if not stat_rep:
                        tmp_logger.error(f"failed to get locations for {tmp_dataset_name}")
                        tmp_logger.debug("end")
                        return failed_ret

                    # keep only locations holding a complete replica
                    tmp_location_list = []
                    for tmp_location in replica_map:

                        ds_stat_dict = replica_map[tmp_location][0]
                        if ds_stat_dict["total"] is not None and ds_stat_dict["total"] == ds_stat_dict["found"]:
                            if tmp_location not in tmp_location_list:
                                tmp_location_list.append(tmp_location)
                    all_locations[tmp_dataset_name] = tmp_location_list

                # per-file info for the GUID
                tmp_file_ret, tmp_file_info = self.get_file_from_dataset(tmp_dataset_name, tmp_guid)

                if not tmp_file_ret:
                    tmp_logger.error(f"failed to get fileinfo for GUID:{tmp_guid} DS:{tmp_dataset_name}")
                    tmp_logger.debug("end")
                    return failed_ret

                all_files.append(tmp_file_info)

        tmp_logger.debug(f"converted to {str(all_datasets)}, {str(all_locations)}, {str(all_files)}")
        tmp_logger.debug("end")
        return True, all_locations, all_files