# File indexing completed on 2026-04-10 08:39:06
0001 import copy
0002 import functools
0003 import json
0004 import os
0005 import random
0006 import re
0007 import socket
0008 import time
0009 import traceback
0010 from collections import namedtuple
0011 from contextlib import contextmanager
0012 from dataclasses import MISSING, InitVar, asdict, dataclass, field
0013 from datetime import datetime, timedelta
0014 from typing import Any, Dict, List
0015
0016 from pandacommon.pandalogger.LogWrapper import LogWrapper
0017 from pandacommon.pandalogger.PandaLogger import PandaLogger
0018 from pandacommon.pandautils.base import SpecBase
0019 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0020
0021 from pandaserver.config import panda_config
0022 from pandaserver.dataservice.ddm import rucioAPI
0023
0024 import polars as pl
0025
0026 import idds.common.constants
0027 import idds.common.utils
0028 from idds.client.client import Client as iDDS_Client
0029
0030
0031
# module-level logger named after this module's basename
logger = PandaLogger().getLogger(__name__.split(".")[-1])


# name of the global Data Carousel process lock in DB (see _acquire_global_dc_lock)
GLOBAL_DC_LOCK_NAME = "DataCarousel"


# expected schema version of the Data Carousel config stored in DB (checked in _update_dc_config)
DC_CONFIG_SCHEMA_VERSION = 1


# task statuses considered final (not used in this view; presumably consumed by task-side logic — verify)
FINAL_TASK_STATUSES = ["done", "finished", "failed", "exhausted", "aborted", "toabort", "aborting", "broken", "tobroken"]


# pair of a spec attribute name and its python type, used in DataCarouselRequestSpec
AttributeWithType = namedtuple("AttributeWithType", ["attribute", "type"])


# DDM rule activity label per task type; rules with these activities owned by "panda" are treated as staging rules
DDM_RULE_ACTIVITY_MAP = {"anal": "Data Carousel Analysis", "prod": "Data Carousel Production"}


# rse_expression fragments; SRC/TO_PIN are presumably used when composing DDM rules (not used in this view — verify)
SRC_REPLI_EXPR_PREFIX = "rse_type=DISK"

TO_PIN_DST_REPLI_EXPR = "type=DATADISK"

# appended to rse expressions to require both read and write availability (used in _update_rses)
AVAIL_REPLI_EXPR_SUFFIX = "&availability_write&availability_read"


# lifetimes in days, presumably applied to DDM rules of staging/done requests (not used in this view — verify)
STAGING_LIFETIME_DAYS = 15
DONE_LIFETIME_DAYS = 30


# window (max days / min hours) presumably governing rule-lifetime refresh (not used in this view — verify)
TO_REFRESH_MAX_LIFETIME_DAYS = 7
TO_REFRESH_MIN_LIFETIME_HOURS = 2


# hard cap of per-queue fair-share quota (not used in this view — verify)
QUEUE_FAIR_SHARE_MAX_QUOTA = 10000


# polars table rendering options: plain ASCII, no shape/dtype headers, unlimited rows/cols, fixed width
pl.Config.set_ascii_tables(True)
pl.Config.set_tbl_hide_dataframe_shape(True)
pl.Config.set_tbl_hide_column_data_types(True)
pl.Config.set_tbl_rows(-1)
pl.Config.set_tbl_cols(-1)
pl.Config.set_tbl_width_chars(140)
0077 pl.Config.set_tbl_width_chars(140)
0078
0079
0080
0081
class DataCarouselRequestStatus(object):
    """
    Enumeration-like holder of Data Carousel request statuses and the
    commonly used status groups
    """

    # individual statuses
    queued, staging, done, cancelled, retired = "queued", "staging", "done", "cancelled", "retired"

    # requests still being worked on
    active_statuses = [queued, staging]
    # requests in a terminal state
    final_statuses = [done, cancelled, retired]
    # requests that started but did not complete staging
    unfinished_statuses = [staging, cancelled]
    # requests whose existing DDM rule/progress can be reused
    reusable_statuses = [queued, staging, done]
    # requests that may be resubmitted as a fresh request
    resubmittable_statuses = [staging, cancelled, done, retired]
0098
0099
class DataCarouselRequestSpec(SpecBase):
    """
    Data carousel request specification; one instance maps to one row of the
    data_carousel_requests table
    """

    # table columns paired with their python types
    attributes_with_types = (
        AttributeWithType("request_id", int),
        AttributeWithType("dataset", str),
        AttributeWithType("source_rse", str),
        AttributeWithType("destination_rse", str),
        AttributeWithType("ddm_rule_id", str),
        AttributeWithType("status", str),
        AttributeWithType("total_files", int),
        AttributeWithType("staged_files", int),
        AttributeWithType("dataset_size", int),
        AttributeWithType("staged_size", int),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("source_tape", str),
        AttributeWithType("parameters", str),
        AttributeWithType("last_staged_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
    )

    # plain attribute names derived from attributes_with_types
    attributes = tuple(attr_wt.attribute for attr_wt in attributes_with_types)

    # attributes initialized to zero (none for this spec)
    _zeroAttrs = ()

    # attributes always forced into updates (none for this spec)
    _forceUpdateAttrs = ()

    # DB sequence generating request_id
    _seqAttrMap = {"request_id": f"{panda_config.schemaJEDI}.JEDI_DATA_CAROUSEL_REQUEST_ID_SEQ.nextval"}

    @property
    def parameter_map(self) -> dict:
        """
        Get the dictionary parsed by the parameters attribute in JSON
        Possible parameters:
            "reuse_rule" (bool): reuse DDM rule instead of submitting new one
            "resub_from" (int): resubmitted from this original request ID
            "prev_src" (str): previous source RSE
            "prev_dst" (str): previous destination RSE
            "excluded_dst_list" (list[str]): list of excluded destination RSEs
            "rule_unfound" (bool): DDM rule not found
            "to_pin" (bool): whether to pin the dataset
            "suggested_dst_list" (list[str]): list of suggested destination RSEs
            "remove_when_done" (bool): remove request and DDM rule asap when request done to save disk space
            "task_id": (int): task_id of the task which initiates the request
            "init_task_gshare": (str): (deprecated) original gshare of the task which initiates the request, for statistics
            "task_gshare": (str): original gshare of the task which initiates the request, for statistics
            "task_type": (str): type of the task (prod, anal) which initiates this request, for statistics
            "task_user": (str): user of the task which initiates this request, for statistics
            "task_group": (str): working group of the task which initiates this request, for statistics

        Returns:
            dict : dict of parameters if it is JSON or empty dict if null
        """
        return {} if self.parameters is None else json.loads(self.parameters)

    @parameter_map.setter
    def parameter_map(self, value_map: dict):
        """
        Serialize the given dictionary to JSON into the parameters attribute

        Args:
            value_map (dict): dict to set the parameter map
        """
        self.parameters = json.dumps(value_map)

    def get_parameter(self, param: str) -> Any:
        """
        Get the value of one parameter; None when the parameter is not set

        Args:
            param (str): parameter name

        Returns:
            Any : value of the parameter; None if parameter not set
        """
        return self.parameter_map.get(param)

    def set_parameter(self, param: str, value):
        """
        Set one parameter value and store the map back as JSON

        Args:
            param (str): parameter name
            value (Any): value of the parameter to set; must be JSON-serializable
        """
        current_map = self.parameter_map
        current_map[param] = value
        self.parameter_map = current_map

    def update_parameters(self, params: dict):
        """
        Merge a dict into the parameters and store the map back as JSON

        Args:
            params (dict): dict of parameter names and values to set
        """
        current_map = self.parameter_map
        current_map.update(params)
        self.parameter_map = current_map
0211
0212
class DataCarouselRequestTransaction(object):
    """
    Data Carousel request transaction object
    Wraps a DataCarouselRequestSpec together with an open DB cursor to provide
    transaction-scoped update helpers
    """

    def __init__(self, dc_req_spec: DataCarouselRequestSpec, db_cur, db_log):
        """
        Constructor

        Args:
            dc_req_spec (DataCarouselRequestSpec): request specification
            db_cur: database cursor for DB operations
            db_log: database logger for logging DB operations
        """
        self.spec = dc_req_spec
        self.db_cur = db_cur
        self.db_log = db_log

    def update_spec(self, dc_req_spec: DataCarouselRequestSpec, to_db: bool = True) -> bool | None:
        """
        Replace the held request specification, optionally propagating the
        changed columns to DB

        Args:
            dc_req_spec (DataCarouselRequestSpec): new request specification to update
            to_db (bool): whether to update the request in DB; default True

        Returns:
            bool|None : True if updated in DB successfully, False if failed to update in DB, None if not updating in DB
        """
        if not to_db:
            # in-memory update only
            self.spec = dc_req_spec
            return None
        comment = " /* DataCarouselRequestTransaction.update_spec */"
        # touch modification time on every DB update
        dc_req_spec.modification_time = naive_utcnow()
        sql_update = (
            f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests " f"SET {dc_req_spec.bindUpdateChangesExpression()} " "WHERE request_id=:request_id "
        )
        var_map = dc_req_spec.valuesMap(useSeq=False, onlyChanged=True)
        var_map[":request_id"] = dc_req_spec.request_id
        self.db_cur.execute(sql_update + comment, var_map)
        if self.db_cur.rowcount == 0:
            # no row matched; keep the old spec
            self.db_log.warning(f"request_id={dc_req_spec.request_id} not updated")
            return False
        self.spec = dc_req_spec
        self.db_log.debug(f"updated request_id={dc_req_spec.request_id} {dc_req_spec.bindUpdateChangesExpression()}")
        return True
0263 return None
0264
0265
0266
0267
0268
0269
0270
@dataclass
class SourceTapeConfig:
    """
    Configuration parameters of one source physical tape
    """

    # whether the tape is active
    active: bool = False
    # maximum number of n_files_queued + nfiles_staging from this tape
    max_size: int = 10000
    # maximum ratio percent of nfiles_staging / (n_files_queued + nfiles_staging)
    max_staging_ratio: int = 50
    # rse_expression for DDM to filter the destination RSE
    destination_expression: str = "type=DATADISK&datapolicynucleus=True&freespace>300"
0287
0288
0289 @dataclass
0290 class SourceRSEConfig:
0291 """
0292 Dataclass for source RSE configuration parameters
0293
0294 Fields:
0295 tape (str) : is mapped to this source physical tape
0296 active (bool|None) : whether the source RSE is active
0297 max_size (int|None) : maximum number of n_files_queued + nfiles_staging from this RSE
0298 max_staging_ratio (int|None) : maximum ratio percent of nfiles_staging / (n_files_queued + nfiles_staging)
0299 """
0300
0301 tape: str
0302 active: bool | None = None
0303 max_size: int | None = None
0304 max_staging_ratio: int | None = None
0305
0306
0307
@dataclass
class DataCarouselMainConfig:
    """
    Main DataCarousel configuration parameters

    Fields:
        source_tapes_config (dict) : configuration of source physical tapes, in form of {"TAPE_1": SourceTapeConfig_of_TAPE_1, ...}
        source_rses_config (dict) : configuration of source RSEs, each should be mapped to some source tape, in form of {"RSE_1": SourceRSEConfig_of_RSE_1, ...}
        excluded_destinations (list) : excluded destination RSEs
        early_access_users (list) : PanDA user names for early access to Data Carousel
    """

    source_tapes_config: Dict[str, Any] = field(default_factory=dict)
    source_rses_config: Dict[str, Any] = field(default_factory=dict)
    excluded_destinations: List[str] = field(default_factory=list)
    early_access_users: List[str] = field(default_factory=list)

    def __post_init__(self):
        # promote raw per-entry dicts into their dedicated config dataclasses
        for attr_name, config_class in (
            ("source_tapes_config", SourceTapeConfig),
            ("source_rses_config", SourceRSEConfig),
        ):
            raw_map = getattr(self, attr_name, None)
            if isinstance(raw_map, dict):
                setattr(self, attr_name, {key: config_class(**params) for key, params in raw_map.items()})
0338
0339
0340
0341
0342
0343
0344
def get_resubmit_request_spec(dc_req_spec: DataCarouselRequestSpec, exclude_prev_dst: bool = False) -> DataCarouselRequestSpec | None:
    """
    Build a fresh request spec for resubmission based on the original request spec

    Args:
        dc_req_spec (DataCarouselRequestSpec): original spec of the request
        exclude_prev_dst (bool): whether to exclude previous destination

    Returns:
        DataCarouselRequestSpec|None : spec of the request to resubmit, or None if failed
    """
    tmp_log = LogWrapper(logger, f"get_resubmit_request_spec")
    try:
        new_spec = DataCarouselRequestSpec()
        # reset staging progress and mark the new request as freshly queued
        new_spec.staged_files = 0
        new_spec.staged_size = 0
        new_spec.status = DataCarouselRequestStatus.queued
        new_spec.creation_time = naive_utcnow()
        # carry over dataset and source information from the original request
        for attr_name in ("dataset", "total_files", "dataset_size", "source_rse", "source_tape"):
            setattr(new_spec, attr_name, getattr(dc_req_spec, attr_name))
        # optionally blacklist the previous destination
        excluded_dst_set = set()
        if exclude_prev_dst:
            excluded_dst_set.add(dc_req_spec.destination_rse)
        orig_parameter_map = dc_req_spec.parameter_map
        # record provenance and carried-over statistics parameters
        new_spec.parameter_map = {
            "resub_from": dc_req_spec.request_id,
            "prev_src": dc_req_spec.source_rse,
            "prev_dst": dc_req_spec.destination_rse,
            "excluded_dst_list": list(excluded_dst_set),
            "task_id": orig_parameter_map.get("task_id"),
            "task_gshare": orig_parameter_map.get("task_gshare"),
            "init_task_gshare": orig_parameter_map.get("init_task_gshare"),
        }
        tmp_log.debug(f"got resubmit request spec for request_id={dc_req_spec.request_id}")
        return new_spec
    except Exception:
        # log and fall through, implicitly returning None
        tmp_log.error(f"got error ; {traceback.format_exc()}")
0395
0396
0397
0398
0399
0400 class DataCarouselInterface(object):
0401 """
0402 Interface for data carousel methods
0403 """
0404
0405
    def __init__(self, taskbufferIF, *args, **kwargs):
        """
        Constructor

        Args:
            taskbufferIF: TaskBuffer interface used for all DB access
        """
        self.taskBufferIF = taskbufferIF
        # DDM (Rucio) interface
        self.ddmIF = rucioAPI
        # cached RSE lists, refreshed periodically by _update_rses
        self.tape_rses = []
        self.datadisk_rses = []
        self.disk_rses = []
        # cached main configuration, refreshed periodically by _update_dc_config
        self.dc_config_map = None
        # per-cache timestamps of last refresh, keyed by cache nickname
        self._last_update_ts_dict = {}
        # unique process identifier: shortened hostname, process group ID, PID
        self.full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{os.getpid()}"
        # initial fill of the caches
        self._refresh_all_attributes()
0419
    def _refresh_all_attributes(self):
        """
        Refresh by calling all update methods
        """
        # each updater keeps its own cache timestamp and is a no-op until its time limit elapses
        self._update_rses(time_limit_minutes=30)
        self._update_dc_config(time_limit_minutes=5)
0426
0427 @staticmethod
0428 def refresh(func):
0429 """
0430 Decorator to call _refresh_all_attributes before the method
0431 """
0432
0433 @functools.wraps(func)
0434 def wrapper(self, *args, **kwargs):
0435 self._refresh_all_attributes()
0436 return func(self, *args, **kwargs)
0437
0438 return wrapper
0439
0440 def _acquire_global_dc_lock(self, timeout_sec: int = 10, lock_expiration_sec: int = 30) -> str | None:
0441 """
0442 Acquire global Data Carousel lock in DB
0443
0444 Args:
0445 timeout_sec (int): timeout in seconds for blocking to retry
0446 lock_expiration_sec (int): age of lock in seconds to be considered expired and can be acquired immediately
0447
0448 Returns:
0449 str|None : full process ID of this process if got lock; None if timeout
0450 """
0451 tmp_log = LogWrapper(logger, f"_acquire_global_dc_lock pid={self.full_pid}")
0452
0453 start_mono_time = time.monotonic()
0454 while time.monotonic() - start_mono_time <= timeout_sec:
0455
0456 got_lock = self.taskBufferIF.lockProcess_PANDA(
0457 component=GLOBAL_DC_LOCK_NAME,
0458 pid=self.full_pid,
0459 time_limit=lock_expiration_sec / 60.0,
0460 )
0461 if got_lock:
0462
0463 tmp_log.debug(f"got lock")
0464 return self.full_pid
0465 else:
0466
0467 time.sleep(0.05)
0468
0469 tmp_log.debug(f"timed out; skipped")
0470 return None
0471
0472 def _release_global_dc_lock(self, full_pid: str | None):
0473 """
0474 Release global Data Carousel lock in DB
0475
0476 Args:
0477 full_pid (str|None): full process ID which acquired the lock; if None, use self.full_pid
0478 """
0479 tmp_log = LogWrapper(logger, f"_release_global_dc_lock pid={full_pid}")
0480
0481 if full_pid is None:
0482 full_pid = self.full_pid
0483
0484 ret = self.taskBufferIF.unlockProcess_PANDA(
0485 component=GLOBAL_DC_LOCK_NAME,
0486 pid=full_pid,
0487 )
0488 if ret:
0489
0490 tmp_log.debug(f"released lock")
0491 else:
0492
0493 tmp_log.error(f"failed to released lock; skipped")
0494 return ret
0495
0496 @contextmanager
0497 def global_dc_lock(self, timeout_sec: int = 10, lock_expiration_sec: int = 30):
0498 """
0499 Context manager for global Data Carousel lock in DB
0500
0501 Args:
0502 timeout_sec (int): timeout in seconds for blocking to retry
0503 lock_expiration_sec (int): age of lock in seconds to be considered expired and can be acquired immediately
0504
0505 Yields:
0506 str : full process ID of this process if got lock; None if timeout
0507 """
0508
0509
0510 full_pid = self._acquire_global_dc_lock(timeout_sec, lock_expiration_sec)
0511 try:
0512 yield full_pid
0513 finally:
0514 self._release_global_dc_lock(full_pid)
0515
0516 def get_request_by_id(self, request_id: int) -> DataCarouselRequestSpec | None:
0517 """
0518 Get the spec of the request specified by request_id
0519
0520 Args:
0521 request_id (int): request_id of the request
0522
0523 Returns:
0524 DataCarouselRequestSpec|None : spec of the request, or None if failed
0525 """
0526 tmp_log = LogWrapper(logger, f"get_request_by_id request_id={request_id}")
0527 sql = f"SELECT {DataCarouselRequestSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE request_id=:request_id "
0528 var_map = {":request_id": request_id}
0529 res_list = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
0530 if res_list is not None:
0531 if len(res_list) > 1:
0532 tmp_log.error("more than one requests; unexpected")
0533 else:
0534 for res in res_list:
0535 dc_req_spec = DataCarouselRequestSpec()
0536 dc_req_spec.pack(res)
0537 return dc_req_spec
0538 else:
0539 tmp_log.warning("no request found; skipped")
0540 return None
0541
0542 def get_request_by_dataset(self, dataset: str) -> DataCarouselRequestSpec | None:
0543 """
0544 Get the reusable request (not in cancelled or retired) of the dataset
0545
0546 Args:
0547 dataset (str): dataset name
0548
0549 Returns:
0550 DataCarouselRequestSpec|None : spec of the request of dataset, or None if failed
0551 """
0552 tmp_log = LogWrapper(logger, f"get_request_by_dataset dataset={dataset}")
0553 status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.reusable_statuses, prefix=":status")
0554 sql = (
0555 f"SELECT {DataCarouselRequestSpec.columnNames()} "
0556 f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
0557 f"WHERE dataset=:dataset "
0558 f"AND status IN ({status_var_names_str}) "
0559 )
0560 var_map = {":dataset": dataset}
0561 var_map.update(status_var_map)
0562 res_list = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
0563 if res_list is not None:
0564 if len(res_list) > 1:
0565 tmp_log.error("more than one reusable requests; unexpected")
0566 else:
0567 for res in res_list:
0568 dc_req_spec = DataCarouselRequestSpec()
0569 dc_req_spec.pack(res)
0570 return dc_req_spec
0571 else:
0572 tmp_log.warning("no reusable request; skipped")
0573 return None
0574
0575
0576
0577
0578
0579
0580
0581
0582
0583
0584
0585
0586
0587
0588
0589
0590
0591
0592
0593
0594
0595
0596
0597
0598
0599
0600
0601
0602
0603
0604
0605
0606
0607
0608
0609
0610
0611
0612
0613
0614
0615
0616
0617
0618
0619
0620
0621
0622
0623
0624
0625
0626
0627
0628
0629
0630
0631
0632
0633
0634
0635
0636
0637
0638
0639
0640
0641
0642
0643
0644
0645
0646
0647
0648
0649
0650
0651
0652
0653
0654
0655
0656
0657
0658
0659
0660
0661
0662
0663
0664
0665
0666
0667
0668
0669
0670
0671
0672
0673
0674
0675
0676
0677
0678
0679
0680
0681
0682
0683
0684
0685
0686
0687
0688
0689
    @contextmanager
    def request_lock(self, request_id: int, lock_expiration_sec: int = 120):
        """
        Context manager to lock and unlock the Data Carousel request for update into DB

        Args:
            request_id (int): request ID of the Data Carousel request to acquire lock
            lock_expiration_sec (int): age of lock in seconds to be considered expired and can be acquired immediately

        Yields:
            DataCarouselRequestSpec|None : spec of the request if got lock; None if exception or failed
        """
        tmp_log = LogWrapper(logger, f"request_lock pid={self.full_pid} request_id={request_id}")
        got_lock = None
        locked_spec = None
        try:
            # try to take the row-level lock: succeeds when unlocked, already held
            # by this process, or when the existing lock is older than the expiration
            sql_lock = (
                f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
                f"SET locked_by=:locked_by, lock_time=:lock_time "
                f"WHERE request_id=:request_id "
                f"AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time < :min_lock_time) "
            )
            var_map = {
                ":locked_by": self.full_pid,
                ":lock_time": naive_utcnow(),
                ":request_id": request_id,
                ":min_lock_time": naive_utcnow() - timedelta(seconds=lock_expiration_sec),
            }
            row_count = self.taskBufferIF.querySQL(sql_lock, var_map)
            if row_count is None:
                # DB error; got_lock stays None, so no unlock is attempted in finally
                tmp_log.error(f"failed to update DB to lock; skipped")
            elif row_count > 1:
                # request_id should be unique; more than one updated row is an inconsistency
                tmp_log.error(f"more than one requests updated to lock; unexpected")
            elif row_count == 0:
                # another process holds a non-expired lock
                got_lock = False
                tmp_log.debug(f"did not get lock; skipped")
            else:
                got_lock = True
                tmp_log.debug(f"got lock")
            if got_lock:
                # re-read the freshly locked row so the caller sees current values
                locked_spec = self.get_request_by_id(request_id)
                yield locked_spec
            else:
                yield None
        finally:
            if got_lock:
                # release only our own lock (locked_by filter guards against clobbering others)
                sql_unlock = (
                    f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
                    f"SET locked_by=NULL, lock_time=NULL "
                    f"WHERE request_id=:request_id AND locked_by=:locked_by "
                )
                var_map = {
                    ":request_id": request_id,
                    ":locked_by": self.full_pid,
                }
                row_count = self.taskBufferIF.querySQL(sql_unlock, var_map)
                if row_count is None:
                    tmp_log.error(f"failed to update DB to unlock; skipped")
                elif row_count > 1:
                    tmp_log.error(f"more than one requests updated to unlock; unexpected")
                elif row_count == 0:
                    tmp_log.error(f"no request updated to unlock; skipped")
                else:
                    tmp_log.debug(f"released lock")
0761
0762 def _update_rses(self, time_limit_minutes: int | float = 30):
0763 """
0764 Update RSEs per TAPE and DATADISK cached in this object
0765 Run if cache outdated; else do nothing
0766 Check both DATADISK and DISK (including other DISK sources like SCRATCHDISK or LOCALDISK which are transient)
0767
0768 Args:
0769 time_limit_minutes (int|float): time limit of the cache in minutes
0770 """
0771 tmp_log = LogWrapper(logger, "_update_rses")
0772 nickname = "rses"
0773 try:
0774
0775 now_time = naive_utcnow()
0776 self._last_update_ts_dict.setdefault(nickname, None)
0777 last_update_ts = self._last_update_ts_dict[nickname]
0778 if last_update_ts is None or (now_time - last_update_ts) >= timedelta(minutes=time_limit_minutes):
0779
0780 tape_rses = self.ddmIF.list_rses("rse_type=TAPE")
0781 if tape_rses is not None:
0782 self.tape_rses = list(tape_rses)
0783 datadisk_rses = self.ddmIF.list_rses(f"type=DATADISK{AVAIL_REPLI_EXPR_SUFFIX}")
0784 if datadisk_rses is not None:
0785 self.datadisk_rses = list(datadisk_rses)
0786 disk_rses = self.ddmIF.list_rses(f"rse_type=DISK{AVAIL_REPLI_EXPR_SUFFIX}")
0787 if disk_rses is not None:
0788 self.disk_rses = list(disk_rses)
0789
0790
0791
0792 self._last_update_ts_dict[nickname] = naive_utcnow()
0793 except Exception:
0794 tmp_log.error(f"got error ; {traceback.format_exc()}")
0795
0796 def _update_dc_config(self, time_limit_minutes: int | float = 5):
0797 """
0798 Update Data Carousel configuration from DB
0799 Run if cache outdated; else do nothing
0800
0801 Args:
0802 time_limit_minutes (int|float): time limit of the cache in minutes
0803 """
0804 tmp_log = LogWrapper(logger, "_update_dc_config")
0805 nickname = "main"
0806 try:
0807
0808 now_time = naive_utcnow()
0809 self._last_update_ts_dict.setdefault(nickname, None)
0810 last_update_ts = self._last_update_ts_dict[nickname]
0811 if last_update_ts is None or (now_time - last_update_ts) >= timedelta(minutes=time_limit_minutes):
0812
0813 res_dict = self.taskBufferIF.getConfigValue("data_carousel", f"DATA_CAROUSEL_CONFIG", "jedi", "atlas")
0814 if res_dict is None:
0815 tmp_log.warning(f"got None from DB ; skipped")
0816 return
0817
0818 try:
0819 schema_version = res_dict["metadata"]["schema_version"]
0820 except KeyError:
0821 tmp_log.error(f"failed to get metadata.schema_version ; skipped")
0822 return
0823 else:
0824 if schema_version != DC_CONFIG_SCHEMA_VERSION:
0825 tmp_log.error(f"metadata.schema_version does not match ({schema_version} != {DC_CONFIG_SCHEMA_VERSION}); skipped")
0826 return
0827
0828 dc_config_data_dict = res_dict.get("data")
0829 if dc_config_data_dict is None:
0830 tmp_log.error(f"got empty config data; skipped")
0831 return
0832
0833 self.dc_config_map = DataCarouselMainConfig(**dc_config_data_dict)
0834
0835 self._last_update_ts_dict[nickname] = naive_utcnow()
0836 except Exception:
0837 tmp_log.error(f"got error ; {traceback.format_exc()}")
0838
0839 def _get_related_tasks(self, request_id: int) -> list[int] | None:
0840 """
0841 Get all related tasks to the give request
0842
0843 Args:
0844 request_id (int): request_id of the request
0845
0846 Returns:
0847 list[int]|None : list of jediTaskID of related tasks, or None if failed
0848 """
0849
0850 sql = f"SELECT task_id " f"FROM {panda_config.schemaJEDI}.data_carousel_relations " f"WHERE request_id=:request_id " f"ORDER BY task_id "
0851 var_map = {":request_id": request_id}
0852 res = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
0853 if res is not None:
0854 ret_list = [x[0] for x in res]
0855 return ret_list
0856 else:
0857 return None
0858
0859 def _get_input_ds_from_task_params(self, task_params_map: dict) -> dict:
0860 """
0861 Get input datasets from tasks parameters
0862
0863 Args:
0864 task_params_map (dict): task parameter map
0865
0866 Returns:
0867 dict : map with elements in form of datasets: jobParameters
0868 """
0869 ret_map = {}
0870 for job_param in task_params_map.get("jobParameters", []):
0871 if job_param.get("param_type") in ["input", "pseudo_input"]:
0872
0873 raw_dataset_str = job_param.get("dataset")
0874 jobparam_dataset_list = []
0875 if raw_dataset_str:
0876 jobparam_dataset_list = raw_dataset_str.split(",")
0877 for dataset in jobparam_dataset_list:
0878 ret_map[dataset] = job_param
0879 return ret_map
0880
0881 def _get_full_replicas_per_type(self, dataset: str) -> dict:
0882 """
0883 Get full replicas per type of a dataset
0884
0885 Args:
0886 dataset (str): dataset name
0887
0888 Returns:
0889 dict : map in form of datasets: jobParameters
0890 """
0891
0892 ds_repli_dict = self.ddmIF.convert_list_dataset_replicas(dataset, use_file_lookup=True, skip_incomplete_element=True)
0893 tape_replicas = []
0894 datadisk_replicas = []
0895 disk_replicas = []
0896 for rse in ds_repli_dict:
0897 if rse in self.tape_rses:
0898 tape_replicas.append(rse)
0899 if rse in self.datadisk_rses:
0900 datadisk_replicas.append(rse)
0901 if rse in self.disk_rses:
0902 disk_replicas.append(rse)
0903
0904 ret = {"tape": tape_replicas, "datadisk": datadisk_replicas, "disk": disk_replicas}
0905
0906 return ret
0907
    def _get_filtered_replicas(self, dataset: str) -> tuple[dict, (str | None), bool, dict]:
        """
        Get filtered replicas of a dataset and the staging rule and whether all replicas are without rules

        Args:
            dataset (str): dataset name

        Returns:
            dict : filtered replicas map (rules considered)
            str | None : staging rule, None if not existing
            bool : whether all replicas on datadisk are without rules
            dict : original replicas map
        """
        replicas_map = self._get_full_replicas_per_type(dataset)
        # NOTE(review): len(rules) below assumes list_did_rules returns a list, never None — confirm
        rules = self.ddmIF.list_did_rules(dataset, all_accounts=True)
        rse_expression_list = []
        staging_rule = None
        if rules:
            for rule in rules:
                # a rule owned by the panda account with a Data Carousel activity is the staging rule
                if rule["account"] in ["panda"] and rule["activity"] in DDM_RULE_ACTIVITY_MAP.values():
                    staging_rule = rule
                else:
                    # remember rse_expression of every non-staging rule
                    rse_expression_list.append(rule["rse_expression"])
        filtered_replicas_map = {"tape": [], "datadisk": []}
        # has_datadisk_replica is currently unused below; kept as-is
        has_datadisk_replica = len(replicas_map["datadisk"]) > 0
        has_disk_replica = len(replicas_map["disk"]) > 0
        # keep only tape replicas covered by some non-staging rule
        for replica in replicas_map["tape"]:
            if replica in rse_expression_list:
                filtered_replicas_map["tape"].append(replica)
        # fallback: tape replicas exist but the dataset has no rules at all -> keep all tape replicas
        if len(replicas_map["tape"]) >= 1 and len(filtered_replicas_map["tape"]) == 0 and len(rules) == 0:
            filtered_replicas_map["tape"] = replicas_map["tape"]
        # keep datadisk replicas when a staging rule exists or the replica is covered by some other rule
        for replica in replicas_map["datadisk"]:
            if staging_rule is not None or replica in rse_expression_list:
                filtered_replicas_map["datadisk"].append(replica)
        # NOTE(review): despite the name, this combines *disk* presence with rule-covered *datadisk* emptiness — confirm intended
        all_disk_repli_ruleless = has_disk_replica and len(filtered_replicas_map["datadisk"]) == 0
        return filtered_replicas_map, staging_rule, all_disk_repli_ruleless, replicas_map
0945
    def _get_datasets_from_collection(self, collection: str) -> list[str] | None:
        """
        Get a list of datasets from DDM collection (container or dataset) in order to support inputs of container containing multiple datasets
        If the collection is a dataset, the method returns a list of the sole dataset
        If the collection is a container, the method returns a list of datasets inside the container

        Args:
            collection (str): name of the DDM collection (container or dataset)

        Returns:
            list[str] | None : list of datasets if successful; None if failed with exception
        """
        tmp_log = LogWrapper(logger, f"_get_datasets_from_collections collection={collection}")
        try:
            # normalize the DID string, then re-create the log wrapper with the normalized name
            collection = self.ddmIF.get_did_str(collection)
            tmp_log = LogWrapper(logger, f"_get_datasets_from_collections collection={collection}")
            ret_list = []
            collection_meta = self.ddmIF.get_dataset_metadata(collection, ignore_missing=True)
            if collection_meta is None:
                tmp_log.warning(f"collection metadata not found")
                return None
            elif collection_meta["state"] == "missing":
                # DID registered but marked missing in DDM
                tmp_log.warning(f"DID not found")
                return None
            did_type = collection_meta["did_type"]
            if did_type == "CONTAINER":
                # expand the container into its datasets
                jobparam_dataset_list = self.ddmIF.list_datasets_in_container_JEDI(collection)
                if jobparam_dataset_list is None:
                    # NOTE(review): listing failure falls through and returns [] rather than None — confirm intended
                    tmp_log.warning(f"cannot list datasets in this container")
                else:
                    ret_list = jobparam_dataset_list
            elif did_type == "DATASET":
                # already a dataset; wrap in a single-element list
                ret_list = [collection]
            else:
                tmp_log.warning(f"invalid DID type: {did_type}")
                return None
        except Exception:
            tmp_log.error(f"got error ; {traceback.format_exc()}")
            return None
        return ret_list
0992
0993 def _get_active_source_tapes(self) -> set[str] | None:
0994 """
0995 Get the set of active source physical tapes according to DC config
0996
0997 Returns:
0998 set[str] | None : set of source tapes if successful; None if failed with exception
0999 """
1000 tmp_log = LogWrapper(logger, f"_get_active_source_tapes")
1001 try:
1002 active_source_tapes_set = {tape for tape, tape_config in self.dc_config_map.source_tapes_config.items() if tape_config.active}
1003 except Exception:
1004
1005 tmp_log.error(f"got error ; {traceback.format_exc()}")
1006 return None
1007 else:
1008 return active_source_tapes_set
1009
1010 def _get_active_source_rses(self) -> set[str] | None:
1011 """
1012 Get the set of active source RSEs according to DC config
1013
1014 Returns:
1015 set[str] | None : set of source RSEs if successful; None if failed with exception
1016 """
1017 tmp_log = LogWrapper(logger, f"_get_active_source_tapes")
1018 try:
1019 active_source_tapes = self._get_active_source_tapes()
1020 active_source_rses_set = set()
1021 for rse, rse_config in self.dc_config_map.source_rses_config.items():
1022 try:
1023
1024 if rse_config.tape in active_source_tapes and rse_config.active is not False:
1025 active_source_rses_set.add(rse)
1026 except Exception:
1027
1028 tmp_log.error(f"got error with {rse} ; {traceback.format_exc()}")
1029 continue
1030 except Exception:
1031
1032 tmp_log.error(f"got error ; {traceback.format_exc()}")
1033 return None
1034 else:
1035 return active_source_rses_set
1036
1037 def _get_source_type_of_dataset(
1038 self, dataset: str, active_source_rses_set: set | None = None
1039 ) -> tuple[(str | None), (set | None), (str | None), bool, list]:
1040 """
1041 Get source type and permanent (tape or datadisk) RSEs of a dataset
1042
1043 Args:
1044 dataset (str): dataset name
1045 active_source_rses_set (set | None): active source RSE set to reuse. If None, will get a new one in the method
1046
1047 Returns:
1048 str | None : source type of the dataset, "datadisk" if replica on any datadisk, "tape" if replica only on tapes, None if not found
1049 set | None : set of permanent RSEs, otherwise None
1050 str | None : staging rule if existing, otherwise None
1051 bool : whether to pin the dataset
1052 list : list of suggested destination RSEs; currently only datadisks with full replicas to pin
1053 """
1054 tmp_log = LogWrapper(logger, f"_get_source_type_of_dataset dataset={dataset}")
1055 try:
1056
1057 source_type = None
1058 rse_set = set()
1059 to_pin = False
1060 suggested_destination_rses_set = set()
1061
1062 if active_source_rses_set is None:
1063 active_source_rses_set = self._get_active_source_rses()
1064
1065 filtered_replicas_map, staging_rule, all_disk_repli_ruleless, orig_replicas_map = self._get_filtered_replicas(dataset)
1066
1067 if filtered_replicas_map["datadisk"]:
1068
1069 source_type = "datadisk"
1070
1071 rse_set = {replica for replica in filtered_replicas_map["datadisk"]}
1072 elif filtered_replicas_map["tape"]:
1073
1074 source_type = "tape"
1075
1076 rse_set = {replica for replica in filtered_replicas_map["tape"]}
1077
1078 if active_source_rses_set is not None:
1079 rse_set &= active_source_rses_set
1080
1081 if not rse_set:
1082 source_type = None
1083 tmp_log.warning(f"all its source tapes are inactive")
1084
1085 if all_disk_repli_ruleless:
1086
1087 to_pin = True
1088 suggested_destination_rses_set |= set(orig_replicas_map["datadisk"])
1089 else:
1090
1091 pass
1092
1093 tmp_log.debug(
1094 f"source_type={source_type} rse_set={rse_set} staging_rule={staging_rule} to_pin={to_pin} "
1095 f"suggested_destination_rses_set={suggested_destination_rses_set}"
1096 )
1097 return (source_type, rse_set, staging_rule, to_pin, list(suggested_destination_rses_set))
1098 except Exception as e:
1099
1100 raise e
1101
1102 def _choose_tape_source_rse(self, dataset: str, rse_set: set, staging_rule, no_cern: bool = True) -> tuple[str, (str | None), (str | None)]:
1103 """
1104 Choose a TAPE source RSE
1105 If with exsiting staging rule, then get source RSE from it
1106
1107 Args:
1108 dataset (str): dataset name
1109 rse_set (set): set of TAPE source RSE set to choose from
1110 staging_rule: DDM staging rule
1111 no_cern: skip CERN-PROD RSE whenever possible if True
1112
1113 Returns:
1114 str: the dataset name
1115 str | None : source RSE found or chosen, None if failed
1116 str | None : DDM rule ID of staging rule if existing, otherwise None
1117 """
1118 tmp_log = LogWrapper(logger, f"_choose_tape_source_rse dataset={dataset}")
1119 try:
1120
1121 ddm_rule_id = None
1122 source_rse = None
1123 random_choose = False
1124
1125 if staging_rule:
1126
1127 ddm_rule_id = staging_rule["id"]
1128
1129 source_replica_expression = staging_rule["source_replica_expression"]
1130 if source_replica_expression is not None:
1131 for rse in rse_set:
1132
1133 tmp_match = re.search(rse, source_replica_expression)
1134 if tmp_match is not None:
1135 source_rse = rse
1136 tmp_log.debug(f"got source_rse={source_rse} from rse_set matching ddm rule source_replica_expression={source_replica_expression}")
1137 break
1138 if source_rse is None:
1139
1140 tmp_match = re.search(rf"{SRC_REPLI_EXPR_PREFIX}\|([A-Za-z0-9-_]+)", source_replica_expression)
1141 if tmp_match is not None:
1142 source_rse = tmp_match.group(1)
1143 tmp_log.debug(f"directly got source_rse={source_rse} only from ddm rule source_replica_expression={source_replica_expression}")
1144 if source_rse is None:
1145
1146 tmp_log.error(f"ddm_rule_id={ddm_rule_id} cannot get source_rse from source_replica_expression: {source_replica_expression}")
1147 raise RuntimeError("cannot get source_rse from staging rule's source_replica_expression")
1148 else:
1149 tmp_log.debug(f"already staging with ddm_rule_id={ddm_rule_id} source_rse={source_rse}")
1150 else:
1151
1152 tmp_log.warning(f"already staging with ddm_rule_id={ddm_rule_id} without source_replica_expression; to choose a random source_rse")
1153 random_choose = True
1154
1155 if (rule_expiration_time := staging_rule["expires_at"]) and (rule_expiration_time - naive_utcnow()) < timedelta(days=DONE_LIFETIME_DAYS):
1156 self._refresh_ddm_rule(ddm_rule_id, 86400 * DONE_LIFETIME_DAYS)
1157 tmp_log.debug(f"ddm_rule_id={ddm_rule_id} refreshed rule to be {DONE_LIFETIME_DAYS} days long")
1158 else:
1159
1160 random_choose = True
1161 if random_choose:
1162
1163 rse_list = list(rse_set)
1164
1165 if len(rse_list) == 1:
1166 source_rse = rse_list[0]
1167 elif len(rse_list) == 0:
1168 tmp_log.error("no source_rse found to choose from")
1169 raise RuntimeError("no source_rse found to choose from")
1170 else:
1171 non_CERN_rse_list = [rse for rse in rse_list if "CERN-PROD" not in rse]
1172 if non_CERN_rse_list and no_cern:
1173
1174 source_rse = random.choice(non_CERN_rse_list)
1175 else:
1176 source_rse = random.choice(rse_list)
1177 tmp_log.debug(f"chose source_rse={source_rse}")
1178
1179 return (dataset, source_rse, ddm_rule_id)
1180 except Exception as e:
1181
1182 raise e
1183
1184 def _get_source_tape_from_rse(self, source_rse: str) -> str:
1185 """
1186 Get the source tape of a source RSE
1187
1188 Args:
1189 source_rse (str): name of the source RSE
1190
1191 Returns:
1192 str : source tape
1193 """
1194 try:
1195
1196 source_tape = self.dc_config_map.source_rses_config[source_rse].tape
1197 except KeyError:
1198
1199 source_tape = source_rse
1200 return source_tape
1201
    @refresh
    def get_input_datasets_to_prestage(self, task_id: int, task_params_map: dict, dsname_list: list | None = None) -> tuple[list, dict]:
        """
        Get the input datasets, their source RSEs (tape) of the task which need pre-staging from tapes, and DDM rule ID of existing DDM rule

        Args:
            task_id (int): JEDI task ID of the task params
            task_params_map (dict): task params of the JEDI task
            dsname_list (list|None): if not None, filter only datasets in this list of dataset names to stage, may also including extra datasets if the task is resubmitted/rerefined

        Returns:
            list[tuple]: list of tuples in the form of (dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list)
            dict[str, list]: dict of lists of collections/datasets classified by their staging situation, plus IDs of related existing Data Carousel requests
        """
        tmp_log = LogWrapper(logger, f"get_input_datasets_to_prestage task_id={task_id}")
        try:
            # initialization
            all_input_datasets_set = set()
            jobparam_ds_coll_map = {}
            extra_datasets_set = set()
            raw_coll_list = []
            raw_coll_did_list = []
            coll_on_tape_set = set()
            ret_prestaging_list = []
            ret_map = {
                "pseudo_coll_list": [],
                "unfound_coll_list": [],
                "empty_coll_list": [],
                "tape_coll_did_list": [],
                "no_tape_coll_did_list": [],
                "to_skip_ds_list": [],
                "to_reuse_staging_ds_list": [],
                "to_reuse_staged_ds_list": [],
                "tape_ds_list": [],
                "datadisk_ds_list": [],
                "to_pin_ds_list": [],
                "unfound_ds_list": [],
                "related_dcreq_ids": [],
            }
            # compute active source RSEs once and reuse for every dataset below
            active_source_rses_set = self._get_active_source_rses()
            # resolve the input collections declared in the task params into datasets
            input_collection_map = self._get_input_ds_from_task_params(task_params_map)
            for collection, job_param in input_collection_map.items():
                # pseudo inputs carry no real data to stage
                if job_param.get("param_type") == "pseudo_input":
                    ret_map["pseudo_coll_list"].append(collection)
                    tmp_log.debug(f"collection={collection} is pseudo input ; skipped")
                    continue
                # expand the collection (container or dataset) into its datasets
                raw_coll_list.append(collection)
                raw_coll_did_list.append(self.ddmIF.get_did_str(collection))
                jobparam_dataset_list = self._get_datasets_from_collection(collection)
                if jobparam_dataset_list is None:
                    ret_map["unfound_coll_list"].append(collection)
                    tmp_log.warning(f"collection={collection} not found")
                    continue
                elif not jobparam_dataset_list:
                    ret_map["empty_coll_list"].append(collection)
                    tmp_log.warning(f"collection={collection} is empty")
                    continue
                # remember which collection each dataset came from
                for dataset in jobparam_dataset_list:
                    jobparam_ds_coll_map[dataset] = collection
            # all datasets resolved from job params
            jobparam_datasets_set = set(jobparam_ds_coll_map.keys())
            all_input_datasets_set |= jobparam_datasets_set
            if dsname_list is not None:
                # explicitly requested datasets; names ending with "/" (presumably containers) are excluded
                master_datasets_set = set([dsname for dsname in dsname_list if not dsname.endswith("/")])
                # datasets requested but neither among job-param datasets nor raw collections (e.g. after resubmission/incexec)
                extra_datasets_set = master_datasets_set - jobparam_datasets_set - set(raw_coll_did_list)
                all_input_datasets_set |= extra_datasets_set
            all_input_datasets_list = sorted(list(all_input_datasets_set))
            if extra_datasets_set:
                tmp_log.debug(f"datasets appended for incexec: {sorted(list(extra_datasets_set))}")
            # classify each dataset and decide how to prestage it
            for dataset in all_input_datasets_list:
                # honor the dsname_list filter when given
                if dsname_list is not None and dataset not in dsname_list:
                    ret_map["to_skip_ds_list"].append(dataset)
                    tmp_log.debug(f"dataset={dataset} not in dsname_list ; skipped")
                    continue
                # record any existing Data Carousel request already covering this dataset
                existing_dcreq_id = self.taskBufferIF.get_data_carousel_request_id_by_dataset_JEDI(dataset)
                if existing_dcreq_id is not None:
                    ret_map["related_dcreq_ids"].append(existing_dcreq_id)
                # where the dataset currently lives and whether a staging rule already exists
                source_type, rse_set, staging_rule, to_pin, suggested_dst_list = self._get_source_type_of_dataset(dataset, active_source_rses_set)
                if staging_rule:
                    # an existing staging rule is reused
                    if collection := jobparam_ds_coll_map.get(dataset):
                        coll_on_tape_set.add(collection)
                    _, source_rse, ddm_rule_id = self._choose_tape_source_rse(dataset, rse_set, staging_rule)
                    tmp_log.debug(f"dataset={dataset} has existing ddm_rule_id={ddm_rule_id} ; to reuse it")
                    prestaging_tuple = (dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list)
                    tmp_log.debug(f"got prestaging for existing rule: {prestaging_tuple}")
                    # prestage with the reused rule
                    ret_prestaging_list.append(prestaging_tuple)
                    # classify by rule state: "OK" means the data are already staged
                    if staging_rule.get("state") == "OK":
                        ret_map["to_reuse_staged_ds_list"].append(dataset)
                        tmp_log.debug(f"dataset={dataset} ddm_rule_id={ddm_rule_id} already staged")
                    else:
                        ret_map["to_reuse_staging_ds_list"].append(dataset)
                        tmp_log.debug(f"dataset={dataset} ddm_rule_id={ddm_rule_id} still staging")
                elif source_type == "datadisk":
                    # already on datadisk; no staging needed
                    ret_map["datadisk_ds_list"].append(dataset)
                    tmp_log.debug(f"dataset={dataset} already has replica on datadisks {rse_set} ; skipped")
                    continue
                elif source_type == "tape":
                    # on tape only; pick a source tape RSE to stage from
                    ret_map["tape_ds_list"].append(dataset)
                    tmp_log.debug(f"dataset={dataset} on tapes {rse_set} ; choosing one")
                    if collection := jobparam_ds_coll_map.get(dataset):
                        coll_on_tape_set.add(collection)
                    # no rule here (staging_rule is falsy), so the source RSE is chosen randomly
                    _, source_rse, ddm_rule_id = self._choose_tape_source_rse(dataset, rse_set, staging_rule)
                    prestaging_tuple = (dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list)
                    tmp_log.debug(f"got prestaging: {prestaging_tuple}")
                    # prestage with a new rule to be created later
                    ret_prestaging_list.append(prestaging_tuple)
                    # datasets flagged for pinning are reported separately as well
                    if to_pin:
                        ret_map["to_pin_ds_list"].append(dataset)
                else:
                    # no replica found on any tape or datadisk
                    ret_map["unfound_ds_list"].append(dataset)
                    tmp_log.debug(f"dataset={dataset} has no replica on any tape or datadisk ; skipped")
                    continue
            # classify the original collections by whether any of their datasets sit on tape
            for collection in raw_coll_list:
                collection_did = self.ddmIF.get_did_str(collection)
                if collection in coll_on_tape_set:
                    ret_map["tape_coll_did_list"].append(collection_did)
                else:
                    ret_map["no_tape_coll_did_list"].append(collection_did)
            # summary
            tmp_log.debug(f"got {len(ret_prestaging_list)} input datasets to prestage and {len(ret_map['related_dcreq_ids'])} related existing requests")
            return ret_prestaging_list, ret_map
        except Exception as e:
            tmp_log.error(f"got error ; {traceback.format_exc()}")
            raise e
1349
1350 def _fill_total_files_and_size(self, dc_req_spec: DataCarouselRequestSpec) -> bool | None:
1351 """
1352 Fill total files and dataset size of the Data Carousel request spec
1353
1354 Args:
1355 dc_req_spec (DataCarouselRequestSpec): Data Carousel request spec
1356
1357 Returns:
1358 bool|None : True if successful, None if getting None from DDM, False if error occurs
1359 """
1360 try:
1361 tmp_log = LogWrapper(logger, f"_fill_total_files_and_size request_id={dc_req_spec.request_id}")
1362
1363 dataset_meta = self.ddmIF.get_dataset_metadata(dc_req_spec.dataset)
1364
1365 dc_req_spec.total_files = dataset_meta["length"]
1366 dc_req_spec.dataset_size = dataset_meta["bytes"]
1367 if dc_req_spec.total_files is not None:
1368 return True
1369 else:
1370 return None
1371 except Exception as e:
1372 tmp_log.error(f"failed to fill total files and size; {e}")
1373 return False
1374
1375 def submit_data_carousel_requests(
1376 self, task_id: int, prestaging_list: list[tuple[str, str | None, str | None]], options: dict | None = None, submit_idds_request: bool = True
1377 ) -> bool | None:
1378 """
1379 Submit data carousel requests for a task
1380
1381 Args:
1382 task_id (int): JEDI task ID
1383 prestaging_list (list[tuple[str, str|None, str|None, bool, list|None]]): list of tuples in the form of (dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list)
1384 options (dict|None): extra options for submission
1385 submit_idds_request (bool): whether to submit iDDS staging requests for the datasets already with DDM rules; default True
1386
1387 Returns:
1388 bool | None : True if submission successful, or None if failed
1389 """
1390 tmp_log = LogWrapper(logger, f"submit_data_carousel_requests task_id={task_id}")
1391 n_req_to_submit = len(prestaging_list)
1392 log_str = f"to submit {len(prestaging_list)} requests"
1393 if options:
1394 log_str = f"options={options} " + log_str
1395 tmp_log.debug(log_str)
1396
1397 dc_req_spec_list = []
1398 now_time = naive_utcnow()
1399 for dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list in prestaging_list:
1400 dc_req_spec = DataCarouselRequestSpec()
1401 dc_req_spec.dataset = dataset
1402 self._fill_total_files_and_size(dc_req_spec)
1403 dc_req_spec.staged_files = 0
1404 dc_req_spec.staged_size = 0
1405 dc_req_spec.ddm_rule_id = ddm_rule_id
1406 dc_req_spec.source_rse = source_rse
1407 dc_req_spec.source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
1408 if to_pin:
1409
1410 dc_req_spec.set_parameter("to_pin", True)
1411 dc_req_spec.status = DataCarouselRequestStatus.queued
1412 dc_req_spec.creation_time = now_time
1413 if dc_req_spec.ddm_rule_id:
1414
1415 dc_req_spec.status = DataCarouselRequestStatus.staging
1416 dc_req_spec.start_time = now_time
1417 dc_req_spec.set_parameter("reuse_rule", True)
1418
1419 if submit_idds_request:
1420
1421 self._submit_idds_stagein_request(task_id, dc_req_spec)
1422 tmp_log.debug(f"submitted corresponding iDDS request for this task and existing ddm_rule_id={dc_req_spec.ddm_rule_id}")
1423 if suggested_dst_list:
1424 dc_req_spec.set_parameter("suggested_dst_list", suggested_dst_list)
1425
1426 if options:
1427 if options.get("remove_when_done"):
1428
1429 dc_req_spec.set_parameter("remove_when_done", True)
1430 if task_type := options.get("task_type"):
1431 dc_req_spec.set_parameter("task_type", task_type)
1432
1433 dc_req_spec_list.append(dc_req_spec)
1434
1435 n_req_inserted = self.taskBufferIF.insert_data_carousel_requests_JEDI(task_id, dc_req_spec_list)
1436 tmp_log.info(f"submitted {n_req_inserted}/{n_req_to_submit} requests")
1437 ret = n_req_inserted is not None
1438
1439 return ret
1440
1441 def add_data_carousel_relations(self, task_id: int, request_ids: list[int] | None) -> bool:
1442 """
1443 Add Data Carousel relations for a task
1444
1445 Args:
1446 task_id (int): JEDI task ID
1447 request_ids (list[int]|None): list of Data Carousel request IDs to relate; None to skip
1448
1449 Returns:
1450 bool : True if successful, or False if failed
1451 """
1452 tmp_log = LogWrapper(logger, f"add_data_carousel_relations task_id={task_id}")
1453 if request_ids is None:
1454 tmp_log.debug(f"request_ids is None ; skipped")
1455 return True
1456 n_req_to_add = len(request_ids)
1457 tmp_log.debug(f"to add {n_req_to_add} relations")
1458 n_req_added = self.taskBufferIF.insert_data_carousel_relations_JEDI(task_id, request_ids)
1459 tmp_log.info(f"added {n_req_added}/{n_req_to_add} relations")
1460 return n_req_added is not None
1461
1462 def _get_dc_requests_table_dataframe(self) -> pl.DataFrame | None:
1463 """
1464 Get Data Carousel requests table as dataframe for statistics
1465
1466 Returns:
1467 polars.DataFrame|None : dataframe of current Data Carousel requests table if successful, or None if failed
1468 """
1469 sql = f"SELECT {','.join(DataCarouselRequestSpec.attributes)} " f"FROM {panda_config.schemaJEDI}.data_carousel_requests " f"ORDER BY request_id "
1470 var_map = {}
1471 res = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
1472 if res is not None:
1473 dc_req_df = pl.DataFrame(res, schema=DataCarouselRequestSpec.attributes_with_types, orient="row")
1474 return dc_req_df
1475 else:
1476 return None
1477
1478 def _get_source_tapes_config_dataframe(self) -> pl.DataFrame:
1479 """
1480 Get source tapes config as dataframe
1481
1482 Returns:
1483 polars.DataFrame : dataframe of source tapes config
1484 """
1485 tmp_list = []
1486 for k, v in self.dc_config_map.source_tapes_config.items():
1487 tmp_dict = {"source_tape": k}
1488 tmp_dict.update(asdict(v))
1489 tmp_list.append(tmp_dict)
1490 source_tapes_config_df = pl.DataFrame(tmp_list)
1491 return source_tapes_config_df
1492
1493 def _get_source_rses_config_dataframe(self) -> pl.DataFrame:
1494 """
1495 Get source RSEs config as dataframe
1496
1497 Returns:
1498 polars.DataFrame : dataframe of source RSEs config
1499 """
1500 tmp_list = []
1501 for k, v in self.dc_config_map.source_rses_config.items():
1502 tmp_dict = {"source_rse": k}
1503 tmp_dict.update(asdict(v))
1504 tmp_list.append(tmp_dict)
1505 source_rses_config_df = pl.DataFrame(tmp_list)
1506 return source_rses_config_df
1507
1508 def _get_source_tape_stats_dataframe(self) -> pl.DataFrame | None:
1509 """
1510 Get statistics of source tapes as dataframe
1511
1512 Returns:
1513 polars.DataFrame : dataframe of statistics of source tapes if successful, or None if failed
1514 """
1515
1516 dc_req_df = self._get_dc_requests_table_dataframe()
1517 if dc_req_df is None:
1518 return None
1519 dc_req_df = dc_req_df.filter(pl.col("status") == DataCarouselRequestStatus.staging)
1520
1521 dc_req_df = dc_req_df.with_columns(
1522 to_pin_str=pl.col("parameters").str.json_path_match(r"$.to_pin").fill_null(False),
1523 task_gshare=pl.col("parameters").str.json_path_match(r"$.task_gshare"),
1524 init_task_gshare=pl.col("parameters").str.json_path_match(r"$.init_task_gshare"),
1525 )
1526
1527 source_tapes_config_df = self._get_source_tapes_config_dataframe()
1528
1529
1530
1531 dc_req_full_df = dc_req_df.filter(pl.col("to_pin_str") != "true")
1532
1533 source_rse_stats_df = (
1534 dc_req_full_df.select(
1535 "source_tape",
1536 "total_files",
1537 "staged_files",
1538 (pl.col("total_files") - pl.col("staged_files")).alias("staging_files"),
1539 )
1540 .group_by("source_tape")
1541 .sum()
1542 )
1543
1544 df = source_tapes_config_df.join(source_rse_stats_df, on="source_tape", how="left")
1545 df = df.with_columns(
1546 pl.col("total_files").fill_null(strategy="zero"),
1547 pl.col("staged_files").fill_null(strategy="zero"),
1548 pl.col("staging_files").fill_null(strategy="zero"),
1549 )
1550 df = df.with_columns(quota_size=(pl.col("max_size") - pl.col("staging_files")))
1551 source_tape_stats_df = df
1552
1553 source_rse_gshare_stats_df = (
1554 dc_req_full_df.select(
1555 "source_tape",
1556 "init_task_gshare",
1557 "total_files",
1558 "staged_files",
1559 (pl.col("total_files") - pl.col("staged_files")).alias("staging_files"),
1560 )
1561 .group_by(["source_tape", "init_task_gshare"])
1562 .sum()
1563 .sort(["source_tape", "init_task_gshare"], nulls_last=True)
1564 ).with_columns(
1565 pl.col("total_files").fill_null(strategy="zero"),
1566 pl.col("staged_files").fill_null(strategy="zero"),
1567 pl.col("staging_files").fill_null(strategy="zero"),
1568 )
1569
1570 return source_tape_stats_df, source_rse_gshare_stats_df
1571
1572 def _get_gshare_stats(self) -> dict:
1573 """
1574 Get current gshare stats
1575
1576 Returns:
1577 dict : dictionary of gshares
1578 """
1579
1580 gshare_status = self.taskBufferIF.getGShareStatus()
1581
1582 gshare_dict = dict()
1583
1584 for idx, leaf in enumerate(gshare_status):
1585 rank = idx + 1
1586 gshare = leaf["name"]
1587 gshare_dict[gshare] = {
1588 "gshare": gshare,
1589 "rank": rank,
1590 "queuing_hs": leaf["queuing"],
1591 "running_hs": leaf["running"],
1592 "target_hs": leaf["target"],
1593 "usage_perc": leaf["running"] / leaf["target"] if leaf["target"] > 0 else 999999,
1594 "queue_perc": leaf["queuing"] / leaf["target"] if leaf["target"] > 0 else 999999,
1595 }
1596
1597 return gshare_dict
1598
1599 def _queued_requests_tasks_to_dataframe(self, queued_requests: list | None) -> pl.DataFrame:
1600 """
1601 Transfrom Data Carousel queue requests and their tasks into dataframe
1602
1603 Args:
1604 queued_requests (list|None): list of tuples in form of (queued_request, [taskspec1, taskspec2, ...])
1605
1606 Returns:
1607 polars.DataFrame : dataframe of queued requests
1608 """
1609
1610
1611
1612 gshare_dict = self._get_gshare_stats()
1613 gshare_rank_dict = {k: v["rank"] for k, v in gshare_dict.items()}
1614
1615 tmp_list = []
1616 for dc_req_spec, task_specs in queued_requests:
1617 for task_spec in task_specs:
1618 tmp_dict = {
1619 "request_id": dc_req_spec.request_id,
1620 "dataset": dc_req_spec.dataset,
1621 "source_rse": dc_req_spec.source_rse,
1622 "source_tape": dc_req_spec.source_tape,
1623 "to_pin": dc_req_spec.get_parameter("to_pin"),
1624 "total_files": dc_req_spec.total_files,
1625 "dataset_size": dc_req_spec.dataset_size,
1626 "jediTaskID": task_spec.jediTaskID,
1627 "taskType": task_spec.taskType,
1628 "userName": task_spec.userName,
1629 "workingGroup": task_spec.workingGroup,
1630 "gshare": task_spec.gshare,
1631 "gshare_rank": gshare_rank_dict.get(task_spec.gshare, 999),
1632 "task_priority": task_spec.currentPriority if task_spec.currentPriority else (task_spec.taskPriority if task_spec.taskPriority else 1000),
1633 }
1634 tmp_list.append(tmp_dict)
1635 df = pl.DataFrame(
1636 tmp_list,
1637 schema={
1638 "request_id": pl.datatypes.Int64,
1639 "dataset": pl.datatypes.String,
1640 "source_rse": pl.datatypes.String,
1641 "source_tape": pl.datatypes.String,
1642 "to_pin": pl.datatypes.Boolean,
1643 "total_files": pl.datatypes.Int64,
1644 "dataset_size": pl.datatypes.Int64,
1645 "jediTaskID": pl.datatypes.Int64,
1646 "taskType": pl.datatypes.String,
1647 "userName": pl.datatypes.String,
1648 "workingGroup": pl.datatypes.String,
1649 "gshare": pl.datatypes.String,
1650 "gshare_rank": pl.datatypes.Int64,
1651 "task_priority": pl.datatypes.Int64,
1652 },
1653 )
1654
1655 df = df.with_columns(
1656 pl.col("to_pin").fill_null(value=False),
1657 pl.col("total_files").fill_null(strategy="zero"),
1658 pl.col("dataset_size").fill_null(strategy="zero"),
1659 )
1660
1661
1662
1663 queued_requests_tasks_df = df
1664 return queued_requests_tasks_df
1665
    @refresh
    def get_requests_to_stage(self, *args, **kwargs) -> list[tuple[DataCarouselRequestSpec, dict]]:
        """
        Get the queued requests which should proceed to get staging

        Args:
            *args, **kwargs: accepted for interface compatibility; unused in this method

        Returns:
            list[tuple[DataCarouselRequestSpec, dict]] : list of requests to stage and dict of extra parameters to set before to stage
        """
        tmp_log = LogWrapper(logger, "get_requests_to_stage")
        ret_list = []
        queued_requests = self.taskBufferIF.get_data_carousel_queued_requests_JEDI()
        if queued_requests is None or not queued_requests:
            # nothing queued (or DB query failed)
            tmp_log.debug(f"no requests to stage or to pin ; skipped")
            return ret_list
        # per-tape stats (with quota_size) and per-(tape, gshare) staging stats
        source_tape_stats_df, source_rse_gshare_stats_df = self._get_source_tape_stats_dataframe()
        tmp_log.debug(f"source_rse_gshare_stats_df: \n{source_rse_gshare_stats_df.select(['source_tape', 'init_task_gshare', 'total_files', 'staging_files'])}")
        source_tape_stats_dict_list = source_tape_stats_df.to_dicts()
        # map request_id back to its spec for building the return list
        request_id_spec_map = {dc_req_spec.request_id: dc_req_spec for dc_req_spec, _ in queued_requests}
        # one row per (request, task) pair
        queued_requests_tasks_df = self._queued_requests_tasks_to_dataframe(queued_requests)
        # order: to-pin first, then by gshare rank asc, task priority desc, request_id asc
        df = queued_requests_tasks_df.sort(["to_pin", "gshare_rank", "task_priority", "request_id"], descending=[True, False, True, False], nulls_last=True)
        # keep one row per request (the highest-ranked task of each request)
        df = df.unique(subset=["request_id"], keep="first", maintain_order=True)
        # handle each source tape independently against its own quota
        queued_requests_df = df
        for source_tape_stats_dict in source_tape_stats_dict_list:
            source_tape = source_tape_stats_dict["source_tape"]
            quota_size = source_tape_stats_dict["quota_size"]
            # requests queued for this tape
            tmp_df = queued_requests_df.filter(pl.col("source_tape") == source_tape)
            # split pin-only requests (not counted against quota) from real staging requests
            to_pin_df = tmp_df.filter(pl.col("to_pin"))
            tmp_queued_df = tmp_df.filter(pl.col("to_pin").not_())
            # pin requests carry zero cumulative counters for the printout below
            to_pin_df = to_pin_df.with_columns(cum_total_files=pl.lit(0, dtype=pl.datatypes.Int64), cum_dataset_size=pl.lit(0, dtype=pl.datatypes.Int64))
            # fair-share pass: split the tape's quota evenly across queued gshares
            if True and not tmp_queued_df.is_empty() and (fair_share_init_quota := min(QUEUE_FAIR_SHARE_MAX_QUOTA, max(quota_size, 0))) > 0:
                queued_gshare_list = tmp_queued_df.select("gshare").unique().to_dict()["gshare"]
                tmp_source_rse_gshare_stats_list = (
                    source_rse_gshare_stats_df.filter((pl.col("source_tape") == source_tape)).select("init_task_gshare", "staging_files").to_dicts()
                )
                # files already staging per gshare on this tape
                gshare_staging_files_map = {x["init_task_gshare"]: x["staging_files"] for x in tmp_source_rse_gshare_stats_list}
                n_queued_gshares = len(queued_gshare_list)
                n_staging_files_of_gshares = sum([gshare_staging_files_map.get(gshare, 0) for gshare in queued_gshare_list])
                # even split of the quota; the "virtual" variant also counts files already staging
                fair_share_quota_per_gshare = fair_share_init_quota // n_queued_gshares
                virtual_fair_share_quota_per_gshare = (fair_share_init_quota + n_staging_files_of_gshares) // n_queued_gshares
                # only gshares still below their virtual share get new staging
                to_stage_gshare_list = [
                    gshare for gshare in queued_gshare_list if gshare_staging_files_map.get(gshare, 0) < virtual_fair_share_quota_per_gshare
                ]
                n_gshares_to_stage = len(to_stage_gshare_list)
                # accumulators: chosen within fair share, leftovers, and extras chosen from leftovers
                fair_share_queued_df = None
                unchosen_queued_df = None
                more_fair_shared_queued_df = tmp_queued_df.clear()
                for gshare in to_stage_gshare_list:
                    # cumulative file count within the gshare, offset by files already staging
                    n_staging_files = gshare_staging_files_map.get(gshare, 0)
                    per_gshare_df = tmp_queued_df.filter(pl.col("gshare") == gshare)
                    per_gshare_df = per_gshare_df.with_columns(
                        cum_tot_files_in_gshare=(pl.col("total_files").cum_sum() + pl.lit(n_staging_files, dtype=pl.datatypes.Int64))
                    )
                    if fair_share_queued_df is None:
                        fair_share_queued_df = per_gshare_df.clear()
                    if unchosen_queued_df is None:
                        unchosen_queued_df = per_gshare_df.clear()
                    fair_share_queued_df.extend(per_gshare_df.filter(pl.col("cum_tot_files_in_gshare") <= fair_share_quota_per_gshare))
                    unchosen_queued_df.extend(per_gshare_df.filter(pl.col("cum_tot_files_in_gshare") > fair_share_quota_per_gshare))
                # distribute the quota left over from under-filled gshares
                n_fair_share_files_to_stage = fair_share_queued_df.select("total_files").sum().to_dict()["total_files"][0]
                fair_share_remaining_quota = fair_share_init_quota - n_fair_share_files_to_stage
                if fair_share_remaining_quota > 0:
                    unchosen_queued_df = unchosen_queued_df.sort(["cum_tot_files_in_gshare", "gshare_rank"], descending=[False, False])
                    unchosen_queued_df = unchosen_queued_df.with_columns(tmp_cum_total_files=pl.col("total_files").cum_sum())
                    more_fair_shared_queued_df = unchosen_queued_df.filter(pl.col("tmp_cum_total_files") <= fair_share_remaining_quota)
                    # a gshare entitled to stage but with no request chosen yet gets one lucky request
                    not_yet_staging_gshares_set = (
                        set(to_stage_gshare_list)
                        - set(fair_share_queued_df.select("gshare").unique().to_dict()["gshare"])
                        - set(more_fair_shared_queued_df.select("gshare").unique().to_dict()["gshare"])
                    )
                    if not_yet_staging_gshares_set:
                        lucky_gshare = random.choice(list(not_yet_staging_gshares_set))
                        tmp_lucky_df = unchosen_queued_df.filter(
                            (pl.col("gshare") == lucky_gshare) & (pl.col("tmp_cum_total_files") > fair_share_remaining_quota)
                        ).head(1)
                        more_fair_shared_queued_df.extend(tmp_lucky_df)
                # fair-share picks first, then extras, then the rest; de-dup keeps that precedence
                tmp_queued_df = pl.concat(
                    [fair_share_queued_df.select(tmp_queued_df.columns), more_fair_shared_queued_df.select(tmp_queued_df.columns), tmp_queued_df]
                ).unique(subset=["request_id"], keep="first", maintain_order=True)
            # cumulative counters over the (re)ordered queue, used for the quota cut below
            tmp_queued_df = tmp_queued_df.with_columns(cum_total_files=pl.col("total_files").cum_sum(), cum_dataset_size=pl.col("dataset_size").cum_sum())
            # counts for logging
            n_queued = len(tmp_queued_df)
            n_to_pin = len(to_pin_df)
            n_total = n_queued + n_to_pin
            # prepare a table for the debug printout
            tmp_to_print_df = None
            if n_total:
                tmp_to_print_df = pl.concat([to_pin_df, tmp_queued_df])
                tmp_to_print_df = tmp_to_print_df.select(
                    ["request_id", "source_rse", "jediTaskID", "gshare", "gshare_rank", "task_priority", "total_files", "cum_total_files", "to_pin"]
                )
                tmp_to_print_df = tmp_to_print_df.with_columns(gshare_and_rank=pl.concat_str([pl.col("gshare"), pl.col("gshare_rank")], separator=" : "))
            # requests fitting within the quota, plus one request crossing the boundary when quota remains
            to_stage_df = pl.concat(
                [
                    tmp_queued_df.filter(pl.col("cum_total_files") < quota_size),
                    tmp_queued_df.filter((pl.col("cum_total_files") >= quota_size) & (quota_size > 0)).head(1),
                ]
            )
            # fields propagated as extra parameters on each request
            temp_key_list = ["request_id", "jediTaskID", "gshare", "taskType", "userName", "workingGroup"]
            # build return entries for pin requests
            to_pin_request_list = to_pin_df.select(temp_key_list).to_dicts()
            to_stage_request_list = to_stage_df.select(temp_key_list).to_dicts()
            for request_dict in to_pin_request_list:
                request_id = request_dict["request_id"]
                extra_params = {
                    "task_id": request_dict["jediTaskID"],
                    "task_gshare": request_dict["gshare"],
                    # task ownership info
                    "task_user": request_dict["userName"],
                    "task_group": request_dict["workingGroup"],
                }
                dc_req_spec = request_id_spec_map.get(request_id)
                if dc_req_spec:
                    ret_list.append((dc_req_spec, extra_params))
            # build return entries for staging requests
            to_stage_count = 0
            for request_dict in to_stage_request_list:
                request_id = request_dict["request_id"]
                extra_params = {
                    "task_id": request_dict["jediTaskID"],
                    "task_gshare": request_dict["gshare"],
                    "init_task_gshare": request_dict["gshare"],
                    # task ownership info
                    "task_user": request_dict["userName"],
                    "task_group": request_dict["workingGroup"],
                }
                dc_req_spec = request_id_spec_map.get(request_id)
                if dc_req_spec:
                    ret_list.append((dc_req_spec, extra_params))
                    to_stage_count += 1
            # debug printout of the per-tape decision, marking each row pin/stage/skipped
            if tmp_to_print_df is not None and not (to_pin_df.is_empty() and to_stage_df.is_empty()):
                tmp_to_print_df = tmp_to_print_df.with_columns(
                    result=pl.when(pl.col("request_id").is_in(to_pin_df["request_id"]))
                    .then(pl.lit("pin"))
                    .when(pl.col("request_id").is_in(to_stage_df["request_id"]))
                    .then(pl.lit("stage"))
                    .otherwise(pl.lit(" "))
                )
                tmp_to_print_df = tmp_to_print_df.select(
                    ["request_id", "source_rse", "jediTaskID", "gshare_and_rank", "task_priority", "total_files", "cum_total_files", "result"]
                )
                tmp_log.debug(f" source_tape={source_tape} , quota_size={quota_size} : \n{tmp_to_print_df}")
            if n_total:
                tmp_log.debug(f"source_tape={source_tape} got {to_stage_count}/{n_queued} requests to stage, {n_to_pin} requests to pin")
        tmp_log.debug(f"totally got {len(ret_list)} requests to stage or to pin")
        # return
        return ret_list
1836
1837 def _choose_destination_rse(self, expression: str, suggested_dst_list: list | None = None, excluded_dst_list: list | None = None) -> str | None:
1838 """
1839 Choose a destination (datadisk) RSE based on the destination RSE expression
1840
1841 Args:
1842 expression (set): destination RSE expression
1843 suggested_dst_list (list|None): list of suggested destination list; RSEs in the list will be prioritized
1844 excluded_dst_list (list|None): list of excluded destination RSEs; RSEs in the list will be excluded from the choice
1845
1846 Returns:
1847 str | None : destination RSE chosen, None if failed
1848 """
1849 tmp_log = LogWrapper(logger, f"_choose_destination_rse expression={expression} suggested_dst_list={suggested_dst_list}")
1850 try:
1851
1852 destination_rse = None
1853 rse_set = set()
1854
1855 the_rses = self.ddmIF.list_rses(expression)
1856 if the_rses is not None:
1857 rse_set = set(the_rses)
1858
1859 if excluded_destinations_set := set(self.dc_config_map.excluded_destinations):
1860 rse_set -= excluded_destinations_set
1861
1862 if excluded_dst_list:
1863 excluded_dst_set = set(excluded_dst_list)
1864 rse_set -= excluded_dst_set
1865
1866 if suggested_dst_list and (prioritized_rse_set := set(suggested_dst_list) & rse_set):
1867 rse_set = prioritized_rse_set
1868
1869 rse_list = list(rse_set)
1870
1871
1872 if rse_list:
1873 if len(rse_list) == 1:
1874 destination_rse = rse_list[0]
1875 else:
1876 destination_rse = random.choice(rse_list)
1877
1878 else:
1879 tmp_log.warning(f"no destination_rse match; skipped")
1880
1881 return destination_rse
1882 except Exception as e:
1883
1884 tmp_log.error(f"got error ; {traceback.format_exc()}")
1885 return None
1886
1887 def _choose_destination_rse_for_request(self, dc_req_spec: DataCarouselRequestSpec) -> str | None:
1888 """
1889 Choose destination RSE for the request
1890
1891 Args:
1892 dc_req_spec (DataCarouselRequestSpec): spec of the request
1893
1894 Returns:
1895 str|None : DDM destination RSE if successful, or None if failed
1896 """
1897 tmp_log = LogWrapper(logger, f"_choose_destination_rse_for_request request_id={dc_req_spec.request_id}")
1898
1899
1900 try:
1901 source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
1902 except Exception:
1903
1904 tmp_log.error(f"got error ; {traceback.format_exc()}")
1905 return None
1906
1907 try:
1908 source_tape_config = self.dc_config_map.source_tapes_config[source_tape]
1909 except (KeyError, AttributeError):
1910
1911 tmp_log.warning(f"failed to get destination_expression from config; skipped ; {traceback.format_exc()}")
1912 return None
1913 except Exception:
1914
1915 tmp_log.error(f"got error ; {traceback.format_exc()}")
1916 return None
1917
1918 if dc_req_spec.get_parameter("to_pin"):
1919
1920 tmp_log.debug(f"has to_pin")
1921 tmp_dst_expr = TO_PIN_DST_REPLI_EXPR
1922 else:
1923
1924 tmp_dst_expr = source_tape_config.destination_expression
1925
1926 excluded_dst_list = dc_req_spec.get_parameter("excluded_dst_list")
1927 if excluded_dst_list:
1928
1929 for excluded_dst_rse in excluded_dst_list:
1930 tmp_dst_expr += f"\\{excluded_dst_rse}"
1931
1932 tmp_dst_expr += AVAIL_REPLI_EXPR_SUFFIX
1933
1934 suggested_dst_list = dc_req_spec.get_parameter("suggested_dst_list")
1935
1936 destination_rse = self._choose_destination_rse(expression=tmp_dst_expr, suggested_dst_list=suggested_dst_list, excluded_dst_list=excluded_dst_list)
1937 if destination_rse is not None:
1938 tmp_log.debug(f"chose destination RSE to be {destination_rse}")
1939 else:
1940 tmp_log.error(f"failed to choose destination RSE; skipped")
1941 return None
1942
1943 return destination_rse
1944
1945 def _submit_ddm_rule(self, dc_req_spec: DataCarouselRequestSpec, destination_rse: str | None = None) -> tuple[str | None, str | None]:
1946 """
1947 Submit DDM replication rule to stage the dataset of the request
1948
1949 Args:
1950 dc_req_spec (DataCarouselRequestSpec): spec of the request
1951 destination_rse (str|None): predetermined destination RSE to stage; if None, will be chosen randomly
1952
1953 Returns:
1954 str | None : DDM rule_id of the new rule if submission successful, or None if failed
1955 str | None : destination RSE chosen
1956 """
1957 tmp_log = LogWrapper(logger, f"_submit_ddm_rule request_id={dc_req_spec.request_id}")
1958
1959 tmp_dst_expr = None
1960 expression = None
1961 lifetime_days = 45
1962 weight = None
1963 source_replica_expression = None
1964
1965 if dc_req_spec.source_rse:
1966 source_replica_expression = f"{SRC_REPLI_EXPR_PREFIX}|{dc_req_spec.source_rse}"
1967 else:
1968
1969 tmp_log.warning(f"source_rse is None ; skipped")
1970 return None, None
1971
1972 if destination_rse is None:
1973 destination_rse = self._choose_destination_rse_for_request(dc_req_spec)
1974 if destination_rse is not None:
1975
1976 expression = str(destination_rse)
1977 else:
1978
1979 tmp_log.error(f"failed to get destination RSE; skipped")
1980 return None, None
1981
1982 ddm_rule_activity = DDM_RULE_ACTIVITY_MAP["prod"]
1983 if task_type := dc_req_spec.get_parameter("task_type"):
1984 ddm_rule_activity = DDM_RULE_ACTIVITY_MAP.get(task_type, ddm_rule_activity)
1985
1986 ddm_rule_id = self.ddmIF.make_staging_rule(
1987 dataset_name=dc_req_spec.dataset,
1988 expression=expression,
1989 activity=ddm_rule_activity,
1990 lifetime=lifetime_days,
1991 weight=weight,
1992 notify="P",
1993 source_replica_expression=source_replica_expression,
1994 )
1995
1996 return ddm_rule_id, destination_rse
1997
1998 def _submit_idds_stagein_request(self, task_id: int, dc_req_spec: DataCarouselRequestSpec) -> Any:
1999 """
2000 Submit corresponding iDDS stage-in request for given Data Carousel request and task
2001 Currently only used for manual testing or after resubmitting Data Carousel requests
2002
2003 Args:
2004 task_id (int): jediTaskID of the task
2005 dc_req_spec (DataCarouselRequestSpec): request to submit iDDS stage-in request
2006
2007 Returns:
2008 Any : iDDS requests ID returned from iDDS
2009 """
2010 tmp_log = LogWrapper(logger, f"_submit_idds_stagein_request request_id={dc_req_spec.request_id}")
2011
2012 dataset = dc_req_spec.dataset
2013 rule_id = dc_req_spec.ddm_rule_id
2014 ds_str_list = dataset.split(":")
2015 tmp_scope = ds_str_list[0]
2016 tmp_name = ds_str_list[1]
2017
2018 c = iDDS_Client(idds.common.utils.get_rest_host())
2019 req = {
2020 "scope": tmp_scope,
2021 "name": tmp_name,
2022 "requester": "panda",
2023 "request_type": idds.common.constants.RequestType.StageIn,
2024 "transform_tag": idds.common.constants.RequestType.StageIn.value,
2025 "status": idds.common.constants.RequestStatus.New,
2026 "priority": 0,
2027 "lifetime": 30,
2028 "request_metadata": {
2029 "workload_id": task_id,
2030 "rule_id": rule_id,
2031 },
2032 }
2033 tmp_log.debug(f"iDDS request: {req}")
2034 ret = c.add_request(**req)
2035 tmp_log.debug(f"done submit; iDDS_requestID={ret}")
2036
2037 return ret
2038
2039 @refresh
2040 def stage_request(
2041 self, dc_req_spec: DataCarouselRequestSpec, extra_params: dict | None = None, destination_rse: str | None = None, submit_idds_request=True
2042 ) -> tuple[bool, str | None, DataCarouselRequestSpec]:
2043 """
2044 Stage the dataset of the request and update request status to staging
2045
2046 Args:
2047 dc_req_spec (DataCarouselRequestSpec): spec of the request
2048 extra_params (dict|None): extra parameters of the request to set; None if nothing to set
2049 destination_rse (str|None): predetermined destination RSE to stage the dataset; None if to choose randomly later
2050 submit_idds_request (bool): whether to submit IDDS request for the dataset; default is True
2051
2052 Returns:
2053 bool : True for success, False otherwise
2054 str|None : error message if any, None otherwise
2055 DataCarouselRequestSpec : updated spec of the request
2056 """
2057 tmp_log = LogWrapper(logger, f"stage_request request_id={dc_req_spec.request_id}")
2058 is_ok = False
2059 err_msg = None
2060
2061 with self.request_lock(dc_req_spec.request_id) as locked_spec:
2062 if not locked_spec:
2063
2064 err_msg = "did not get lock; skipped"
2065 tmp_log.warning(err_msg)
2066 return is_ok, err_msg, dc_req_spec
2067
2068 dc_req_spec = locked_spec
2069
2070 if dc_req_spec.status != DataCarouselRequestStatus.queued:
2071 err_msg = f"status={dc_req_spec.status} not queued; skipped"
2072 tmp_log.warning(err_msg)
2073 return is_ok, err_msg, dc_req_spec
2074
2075 active_related_tasks = self.taskBufferIF.get_related_tasks_of_data_carousel_request_JEDI(
2076 dc_req_spec.request_id, status_exclusion_list=FINAL_TASK_STATUSES
2077 )
2078 if not active_related_tasks:
2079 try:
2080
2081 self.cancel_request(dc_req_spec, reason="queued_while_no_active_tasks")
2082 tmp_log.debug(f"cancelled since no active related tasks")
2083 except Exception:
2084 tmp_log.warning(f"failed to cancel ; {traceback.format_exc()}")
2085 err_msg = f"no active related tasks; skipped"
2086 tmp_log.warning(err_msg)
2087 return is_ok, err_msg, dc_req_spec
2088
2089 if dc_req_spec.total_files is None:
2090 _got = self._fill_total_files_and_size(dc_req_spec)
2091 if not _got:
2092 err_msg = f"total_files and dataset_size are still None; skipped"
2093 tmp_log.warning(err_msg)
2094 return is_ok, err_msg, dc_req_spec
2095
2096 if (ddm_rule_id := dc_req_spec.ddm_rule_id) is not None:
2097
2098 tmp_log.debug(f"dataset={dc_req_spec.dataset} already has active DDM rule ddm_rule_id={ddm_rule_id}")
2099 else:
2100
2101 ddm_rule_id, destination_rse = self._submit_ddm_rule(dc_req_spec, destination_rse=destination_rse)
2102 if ddm_rule_id:
2103
2104 dc_req_spec.ddm_rule_id = ddm_rule_id
2105 tmp_log.debug(f"submitted DDM rule ddm_rule_id={ddm_rule_id}")
2106 else:
2107
2108 err_msg = f"failed to submitted DDM rule ; skipped"
2109 tmp_log.warning(err_msg)
2110 return is_ok, err_msg, dc_req_spec
2111
2112 if extra_params:
2113 dc_req_spec.update_parameters(extra_params)
2114
2115 now_time = naive_utcnow()
2116 dc_req_spec.status = DataCarouselRequestStatus.staging
2117 dc_req_spec.start_time = now_time
2118 ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
2119 if ret:
2120 tmp_log.info(f"updated DB about staging; status={dc_req_spec.status}")
2121 is_ok = True
2122
2123 tmp_log.info(
2124 f"started staging "
2125 f"dataset={dc_req_spec.dataset} source_tape={dc_req_spec.source_tape} source_rse={dc_req_spec.source_rse} "
2126 f"destination_rse={destination_rse} ddm_rule_id={dc_req_spec.ddm_rule_id} "
2127 f"total_files={dc_req_spec.total_files} dataset_size={dc_req_spec.dataset_size} "
2128 f"task_id={dc_req_spec.get_parameter('task_id')} task_type={dc_req_spec.get_parameter('task_type')} "
2129 f"task_user={dc_req_spec.get_parameter('task_user')} task_group={dc_req_spec.get_parameter('task_group')} "
2130 f"to_pin={dc_req_spec.get_parameter('to_pin')}"
2131 )
2132
2133 if is_ok and submit_idds_request:
2134
2135 task_id_list = self._get_related_tasks(dc_req_spec.request_id)
2136 if task_id_list:
2137 tmp_log.debug(f"related tasks: {task_id_list}")
2138 for task_id in task_id_list:
2139 try:
2140 self._submit_idds_stagein_request(task_id, dc_req_spec)
2141 tmp_log.debug(f"submitted corresponding iDDS request for related task {task_id}")
2142 except Exception as e:
2143 tmp_log.warning(f"got error while submitting iDDS request; skipped : {traceback.format_exc()}")
2144 else:
2145 tmp_log.warning(f"failed to get related tasks; skipped to submit iDDS requests")
2146
2147 return is_ok, err_msg, dc_req_spec
2148
2149 def _refresh_ddm_rule(self, rule_id: str, lifetime: int) -> bool:
2150 """
2151 Refresh lifetime of the DDM rule
2152
2153 Args:
2154 rule_id (str): DDM rule ID
2155 lifetime (int): lifetime in seconds to set
2156
2157 Returns:
2158 bool : True for success, False otherwise
2159 """
2160 set_map = {"lifetime": lifetime}
2161 ret = self.ddmIF.update_rule_by_id(rule_id, set_map)
2162 return ret
2163
2164 def cancel_request(self, dc_req_spec: DataCarouselRequestSpec, by: str = "manual", reason: str | None = None) -> bool | None:
2165 """
2166 Cancel a request
2167 Set corresponding DDM rule to be about to expired (in 5 sec)
2168
2169 Args:
2170 dc_req_spec (DataCarouselRequestSpec): spec of the request to cancel
2171 by (str): annotation of the caller of this method; default is "manual"
2172 reason (str|None): annotation of the reason for cancelling
2173
2174 Returns:
2175 bool|None : True for success, None otherwise
2176 """
2177 tmp_log = LogWrapper(logger, f"cancel_request request_id={dc_req_spec.request_id} by={by}" + (f" reason={reason}" if reason else " "))
2178
2179 ret = self.taskBufferIF.cancel_data_carousel_request_JEDI(dc_req_spec.request_id)
2180 if ret:
2181 tmp_log.debug(f"cancelled")
2182 elif ret == 0:
2183 tmp_log.debug(f"already terminated; skipped")
2184 else:
2185 tmp_log.error(f"failed to cancel")
2186
2187 if dc_req_spec.ddm_rule_id:
2188 short_time = 5
2189 self._refresh_ddm_rule(dc_req_spec.ddm_rule_id, short_time)
2190
2191 return ret
2192
2193 def retire_request(self, dc_req_spec: DataCarouselRequestSpec, by: str = "manual", reason: str | None = None) -> bool | None:
2194 """
2195 Retire a done request so that it will not be reused
2196 Set corresponding DDM rule to be about to expired (in 5 sec)
2197
2198 Args:
2199 dc_req_spec (DataCarouselRequestSpec): spec of the request to cancel
2200 by (str): annotation of the caller of this method; default is "manual"
2201 reason (str|None): annotation of the reason for retiring
2202
2203 Returns:
2204 bool|None : True for success, None otherwise
2205 """
2206 tmp_log = LogWrapper(logger, f"retire_request request_id={dc_req_spec.request_id} by={by}" + (f" reason={reason}" if reason else " "))
2207
2208 ret = self.taskBufferIF.retire_data_carousel_request_JEDI(dc_req_spec.request_id)
2209 if ret:
2210 tmp_log.debug(f"retired")
2211 elif ret == 0:
2212 tmp_log.debug(f"cannot retire; skipped")
2213 else:
2214 tmp_log.error(f"failed to retire")
2215
2216 return ret
2217
    def _check_ddm_rule_of_request(self, dc_req_spec: DataCarouselRequestSpec, by: str = "unknown") -> tuple[bool, str | None, dict | None]:
        """
        Check if the DDM rule of the request is valid.
        If rule not found, update the request and try to cancel or retire it.

        Args:
            dc_req_spec (DataCarouselRequestSpec): spec of the request
            by (str): annotation of the caller of this method; default is "unknown"

        Returns:
            bool : True if valid, False otherwise
            str|None : DDM rule ID
            dict|None : DDM rule data if valid, None otherwise
        """
        tmp_log = LogWrapper(logger, f"_check_ddm_rule_of_request request_id={dc_req_spec.request_id} ddm_rule_id={dc_req_spec.ddm_rule_id}")
        is_valid = False
        # get_rule_by_id is tri-state here: False when the rule does not exist,
        # None on query failure, otherwise the rule data
        ddm_rule_id = dc_req_spec.ddm_rule_id
        the_rule = self.ddmIF.get_rule_by_id(ddm_rule_id)
        if the_rule is False:
            # rule vanished; record the fact on the request and terminate it
            with self.request_lock(dc_req_spec.request_id) as locked_spec:
                if not locked_spec:
                    # another process holds the request; leave it for the next cycle
                    tmp_log.warning(f"did not get lock; skipped")
                    return is_valid, ddm_rule_id, None
                # work on the locked, up-to-date spec
                dc_req_spec = locked_spec
                dc_req_spec.set_parameter("rule_unfound", True)
                tmp_log.warning(f"rule not found")
                tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
                if tmp_ret:
                    tmp_log.debug(f"updated DB about rule not found")
                else:
                    tmp_log.error(f"failed to update DB ; skipped")
                # staging requests get cancelled, done requests get retired
                if dc_req_spec.status == DataCarouselRequestStatus.staging:
                    self.cancel_request(dc_req_spec, by=by, reason="rule_unfound")
                elif dc_req_spec.status == DataCarouselRequestStatus.done:
                    self.retire_request(dc_req_spec, by=by, reason="rule_unfound")
        elif the_rule is None:
            # transient DDM query failure; leave the request as is
            tmp_log.error(f"failed to get rule ; skipped")
        else:
            # got actual rule data; the rule is valid
            is_valid = True
        # the_rule is the rule dict when valid, False/None otherwise
        return is_valid, ddm_rule_id, the_rule
2268
2269 def refresh_ddm_rule_of_request(self, dc_req_spec: DataCarouselRequestSpec, lifetime_days: int, force_refresh: bool = False, by: str = "unknown") -> bool:
2270 """
2271 Refresh lifetime of the DDM rule of one request
2272
2273 Args:
2274 dc_req_spec (DataCarouselRequestSpec): spec of the request
2275 lifetime_days (int): lifetime in days to set
2276 force_refresh (bool): force to refresh regardless of to_refresh max/min lifetime
2277 by (str): annotation of the caller of this method; default is "watchdog"
2278
2279 Returns:
2280 bool : True for success, False otherwise
2281 """
2282 tmp_log = LogWrapper(logger, f"refresh_ddm_rule_of_request request_id={dc_req_spec.request_id}")
2283
2284 ret = False
2285
2286 is_valid, ddm_rule_id, the_rule = self._check_ddm_rule_of_request(dc_req_spec, by=by)
2287 if not is_valid:
2288
2289 tmp_log.error(f"ddm_rule_id={ddm_rule_id} rule not valid; skipped")
2290 return ret
2291
2292 rule_lifetime = None
2293 if the_rule["expires_at"]:
2294 now_time = naive_utcnow()
2295 rule_lifetime = the_rule["expires_at"] - now_time
2296
2297 if (
2298 rule_lifetime is None
2299 or force_refresh
2300 or (rule_lifetime < timedelta(days=TO_REFRESH_MAX_LIFETIME_DAYS) and rule_lifetime > timedelta(hours=TO_REFRESH_MIN_LIFETIME_HOURS))
2301 ):
2302 ret = self._refresh_ddm_rule(ddm_rule_id, 86400 * lifetime_days)
2303
2304 else:
2305
2306
2307
2308
2309 pass
2310
2311 return ret
2312
    def keep_alive_ddm_rules(self, by: str = "watchdog"):
        """
        Keep alive DDM rules of requests of active tasks; also check all requests if their DDM rules are valid

        Args:
            by (str): annotation of the caller of this method; default is "watchdog"
        """
        tmp_log = LogWrapper(logger, "keep_alive_ddm_rules")
        # all requests, and the subset whose related tasks are still active
        all_requests_map, _ = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI()
        active_tasked_requests_map, _ = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(status_exclusion_list=FINAL_TASK_STATUSES)
        for dc_req_spec in all_requests_map.values():
            try:
                # only staging and done requests have rules worth keeping alive
                if dc_req_spec.status not in [DataCarouselRequestStatus.staging, DataCarouselRequestStatus.done]:
                    continue
                if dc_req_spec.request_id in active_tasked_requests_map:
                    # request still serves an active task: extend its rule lifetime
                    days = None
                    if dc_req_spec.status == DataCarouselRequestStatus.staging:
                        days = STAGING_LIFETIME_DAYS
                    elif dc_req_spec.status == DataCarouselRequestStatus.done:
                        days = DONE_LIFETIME_DAYS
                    if days is not None:
                        ret = self.refresh_ddm_rule_of_request(dc_req_spec, lifetime_days=days, by=by)
                        if ret:
                            tmp_log.debug(
                                f"request_id={dc_req_spec.request_id} status={dc_req_spec.status} ddm_rule_id={dc_req_spec.ddm_rule_id} refreshed lifetime to be {days} days long"
                            )
                        else:
                            # refresh either skipped (lifetime outside the refresh
                            # window) or failed; nothing more to do here
                            pass
                else:
                    # no active task: just validate the rule (this may cancel/retire
                    # the request if the rule has vanished); result not used further
                    is_valid, ddm_rule_id, _ = self._check_ddm_rule_of_request(dc_req_spec, by=by)
            except Exception:
                # keep iterating over the remaining requests on per-request errors
                tmp_log.error(f"request_id={dc_req_spec.request_id} got error ; {traceback.format_exc()}")
2358
2359 def _update_staged_files(self, dc_req_spec: DataCarouselRequestSpec, task_id_list: list[int] | None = None) -> bool | None:
2360 """
2361 Update status of files in DB Jedi_Dataset_Contents for a request done staging
2362
2363 Args:
2364 dc_req_spec (DataCarouselRequestSpec): spec of the request
2365 task_id_list (list[int]|None): list of jediTaskID to update; if None, will get all related tasks
2366
2367 Returns:
2368 bool|None : True for success, None otherwise
2369 """
2370 tmp_log = LogWrapper(logger, f"_update_staged_files request_id={dc_req_spec.request_id}")
2371 try:
2372
2373 dataset = dc_req_spec.dataset
2374 scope, dsname = self.ddmIF.extract_scope(dataset)
2375
2376 lfn_set = self.ddmIF.get_files_in_dataset(dataset, ignore_unknown=True, lfn_only=True)
2377
2378 filenames_dict = {}
2379 dummy_value_tuple = (None, None)
2380 for lfn in lfn_set:
2381 filenames_dict[lfn] = dummy_value_tuple
2382
2383 if task_id_list is None:
2384 task_id_list = self._get_related_tasks(dc_req_spec.request_id)
2385 if task_id_list:
2386
2387 n_done_tasks = 0
2388 for task_id in task_id_list:
2389 ret = self.taskBufferIF.updateInputFilesStaged_JEDI(task_id, scope, filenames_dict, by="DataCarousel", check_scope=False)
2390 if ret is None:
2391 tmp_log.warning(f"failed to update files for task_id={task_id} ; skipped")
2392 else:
2393 n_done_tasks += 1
2394 tmp_log.debug(f"updated staged files for {n_done_tasks}/{len(task_id_list)} related tasks")
2395 else:
2396 tmp_log.warning(f"failed to get related tasks; skipped")
2397
2398 return True
2399 except Exception:
2400 tmp_log.error(f"got error ; {traceback.format_exc()}")
2401 return None
2402
2403 def check_staging_requests(self):
2404 """
2405 Check staging requests
2406 """
2407 tmp_log = LogWrapper(logger, "check_staging_requests")
2408 dc_req_specs = self.taskBufferIF.get_data_carousel_staging_requests_JEDI()
2409 if dc_req_specs is None:
2410 tmp_log.warning(f"failed to query requests to check ; skipped")
2411 elif not dc_req_specs:
2412 tmp_log.debug(f"got no requests to check ; skipped")
2413 for dc_req_spec in dc_req_specs:
2414 try:
2415 to_update = False
2416
2417 ddm_rule_id = dc_req_spec.ddm_rule_id
2418 the_rule = self.ddmIF.get_rule_by_id(ddm_rule_id)
2419 if the_rule is False:
2420 with self.request_lock(dc_req_spec.request_id) as locked_spec:
2421 if not locked_spec:
2422
2423 tmp_log.warning(f"did not get lock; skipped")
2424 continue
2425
2426 dc_req_spec = locked_spec
2427
2428 dc_req_spec.set_parameter("rule_unfound", True)
2429 tmp_log.error(f"request_id={dc_req_spec.request_id} ddm_rule_id={ddm_rule_id} rule not found")
2430 tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
2431 if tmp_ret:
2432 tmp_log.debug(f"request_id={dc_req_spec.request_id} updated DB about rule not found")
2433 else:
2434 tmp_log.error(f"request_id={dc_req_spec.request_id} failed to update DB ; skipped")
2435
2436 if dc_req_spec.status == DataCarouselRequestStatus.staging:
2437
2438 self.cancel_request(dc_req_spec, by="watchdog", reason="rule_unfound")
2439 elif dc_req_spec.status == DataCarouselRequestStatus.done:
2440
2441 self.retire_request(dc_req_spec, by="watchdog", reason="rule_unfound")
2442 continue
2443 elif the_rule is None:
2444
2445 tmp_log.error(f"request_id={dc_req_spec.request_id} failed to get rule of ddm_rule_id={ddm_rule_id} ; skipped")
2446 continue
2447
2448 with self.request_lock(dc_req_spec.request_id) as locked_spec:
2449 if not locked_spec:
2450
2451 tmp_log.warning(f"did not get lock; skipped")
2452 continue
2453
2454 dc_req_spec = locked_spec
2455
2456 if dc_req_spec.destination_rse is None:
2457 the_replica_locks = self.ddmIF.list_replica_locks_by_id(ddm_rule_id)
2458 try:
2459 the_first_file = the_replica_locks[0]
2460 except IndexError:
2461 tmp_log.warning(
2462 f"request_id={dc_req_spec.request_id} no file from replica lock of ddm_rule_id={ddm_rule_id} ; destination_rse not updated"
2463 )
2464 except TypeError:
2465 tmp_log.warning(
2466 f"request_id={dc_req_spec.request_id} error listing replica lock of ddm_rule_id={ddm_rule_id} ; destination_rse not updated"
2467 )
2468 else:
2469
2470 destination_rse = the_first_file["rse"]
2471 dc_req_spec.destination_rse = destination_rse
2472 tmp_log.debug(f"request_id={dc_req_spec.request_id} filled destination_rse={destination_rse} of ddm_rule_id={ddm_rule_id}")
2473 to_update = True
2474
2475 now_time = naive_utcnow()
2476 current_staged_files = int(the_rule["locks_ok_cnt"])
2477 new_staged_files = current_staged_files - dc_req_spec.staged_files
2478 if new_staged_files > 0:
2479
2480 dc_req_spec.staged_files = current_staged_files
2481 dc_req_spec.staged_size = int(dc_req_spec.dataset_size * dc_req_spec.staged_files / dc_req_spec.total_files)
2482 dc_req_spec.last_staged_time = now_time
2483 to_update = True
2484 else:
2485 tmp_log.debug(f"request_id={dc_req_spec.request_id} got {new_staged_files} new staged files")
2486
2487 if dc_req_spec.staged_files == dc_req_spec.total_files:
2488
2489 dc_req_spec.status = DataCarouselRequestStatus.done
2490 dc_req_spec.end_time = now_time
2491 dc_req_spec.staged_size = dc_req_spec.dataset_size
2492 to_update = True
2493
2494 if to_update:
2495 ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
2496 if ret is not None:
2497
2498 tmp_log.info(
2499 f"request_id={dc_req_spec.request_id} got {new_staged_files} new staged files; updated DB about staging ; status={dc_req_spec.status}"
2500 )
2501
2502 if dc_req_spec.status == DataCarouselRequestStatus.done:
2503
2504 tmp_ret = self.refresh_ddm_rule_of_request(dc_req_spec, lifetime_days=DONE_LIFETIME_DAYS, force_refresh=True, by="watchdog")
2505 if tmp_ret:
2506 tmp_log.debug(
2507 f"request_id={dc_req_spec.request_id} status={dc_req_spec.status} ddm_rule_id={dc_req_spec.ddm_rule_id} refreshed lifetime to be {DONE_LIFETIME_DAYS} days long"
2508 )
2509
2510 tmp_ret = self._update_staged_files(dc_req_spec)
2511 if tmp_ret:
2512 tmp_log.debug(f"request_id={dc_req_spec.request_id} done; updated staged files")
2513 else:
2514 tmp_log.warning(f"request_id={dc_req_spec.request_id} done; failed to update staged files ; skipped")
2515 else:
2516 tmp_log.error(f"request_id={dc_req_spec.request_id} failed to update DB for ddm_rule_id={ddm_rule_id} ; skipped")
2517 continue
2518 except Exception:
2519 tmp_log.error(f"request_id={dc_req_spec.request_id} got error ; {traceback.format_exc()}")
2520
2521 def _resume_task(self, task_id: int) -> bool:
2522 """
2523 Resume task from staging (to staged-pending)
2524
2525 Args:
2526 task_id (int): JEDI task ID
2527
2528 Returns:
2529 bool : True for success, False otherwise
2530 """
2531 tmp_log = LogWrapper(logger, "_resume_task")
2532
2533 ret_val, ret_str = self.taskBufferIF.sendCommandTaskPanda(task_id, "Data Carousel. Resumed from staging", True, "resume", properErrorCode=True)
2534
2535 if ret_val == 0:
2536 return True
2537 else:
2538 tmp_log.warning(f"task_id={task_id} failed to resume the task: error_code={ret_val} {ret_str}")
2539 return False
2540
2541 def resume_tasks_from_staging(self):
2542 """
2543 Get tasks with enough staged files and resume them
2544 """
2545 tmp_log = LogWrapper(logger, "resume_tasks_from_staging")
2546 ret_requests_map, ret_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(status_filter_list=["staging"])
2547 n_resumed_tasks = 0
2548 for task_id, request_id_list in ret_relation_map.items():
2549 to_resume = False
2550 try:
2551
2552
2553
2554
2555
2556 for request_id in request_id_list:
2557 dc_req_spec = ret_requests_map[request_id]
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572 if dc_req_spec.ddm_rule_id and dc_req_spec.status in [
2573 DataCarouselRequestStatus.staging,
2574 DataCarouselRequestStatus.done,
2575 DataCarouselRequestStatus.retired,
2576 ]:
2577 to_resume = True
2578 break
2579 if to_resume:
2580
2581 ret_val = self._resume_task(task_id)
2582 if ret_val:
2583 n_resumed_tasks += 1
2584 tmp_log.debug(f"task_id={task_id} resumed the task")
2585 else:
2586 tmp_log.warning(f"task_id={task_id} failed to resume the task; skipped")
2587 except Exception:
2588 tmp_log.error(f"task_id={task_id} got error ; {traceback.format_exc()}")
2589
2590 tmp_log.debug(f"resumed {n_resumed_tasks} tasks")
2591
2592 def _get_orphan_requests(self) -> list[DataCarouselRequestSpec] | None:
2593 """
2594 Get orphan requests, which are active and without any tasks related
2595
2596 Returns:
2597 list[DataCarouselRequestSpec]|None : list of orphan requests, or None if failure
2598 """
2599 tmp_log = LogWrapper(logger, f"_get_orphan_requests")
2600 status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.active_statuses, prefix=":status")
2601 sql = (
2602 f"SELECT {DataCarouselRequestSpec.columnNames()} "
2603 f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
2604 f"WHERE request_id NOT IN "
2605 f"(SELECT rel.request_id FROM {panda_config.schemaJEDI}.data_carousel_relations rel)"
2606 f"AND status IN ({status_var_names_str}) "
2607 )
2608 var_map = {}
2609 var_map.update(status_var_map)
2610 res_list = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
2611 if res_list:
2612 ret_list = []
2613 for res in res_list:
2614 dc_req_spec = DataCarouselRequestSpec()
2615 dc_req_spec.pack(res)
2616 ret_list.append(dc_req_spec)
2617 tmp_log.debug(f"got {len(ret_list)} orphan request")
2618 return ret_list
2619 else:
2620 tmp_log.debug("no orphan request; skipped")
2621 return None
2622
    def clean_up_requests(
        self, done_age_limit_days: int | float = DONE_LIFETIME_DAYS, outdated_age_limit_days: int | float = DONE_LIFETIME_DAYS, by: str = "watchdog"
    ):
        """
        Clean up terminated and outdated requests

        Args:
            done_age_limit_days (int|float): age limit in days for requests done and without active tasks
            outdated_age_limit_days (int|float): age limit in days for outdated requests
            by (str): annotation of the caller of this method; default is "watchdog"
        """
        tmp_log = LogWrapper(logger, "clean_up_requests")
        try:
            # request_ids to delete, grouped by terminal status
            done_requests_set = set()
            cancelled_requests_set = set()
            retired_requests_set = set()
            # requests related to terminated tasks, and requests related to active tasks
            terminated_tasks_requests_map, terminated_tasks_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(
                status_filter_list=FINAL_TASK_STATUSES
            )
            active_tasks_requests_map, active_tasks_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(
                status_exclusion_list=FINAL_TASK_STATUSES
            )
            now_time = naive_utcnow()
            # flatten the relation maps into sets of request_ids
            request_ids_of_terminated_tasks = set()
            for request_id_list in terminated_tasks_relation_map.values():
                request_ids_of_terminated_tasks |= set(request_id_list)
            request_ids_of_active_tasks = set()
            for request_id_list in active_tasks_relation_map.values():
                request_ids_of_active_tasks |= set(request_id_list)
            # classify requests all of whose related tasks have terminated
            for request_id in request_ids_of_terminated_tasks:
                if request_id in active_tasks_requests_map:
                    # still referenced by some active task; keep it alive
                    continue
                dc_req_spec = terminated_tasks_requests_map[request_id]
                if dc_req_spec.status == DataCarouselRequestStatus.done and (
                    (dc_req_spec.end_time and dc_req_spec.end_time < now_time - timedelta(days=done_age_limit_days))
                    or dc_req_spec.get_parameter("rule_unfound")
                    or dc_req_spec.get_parameter("remove_when_done")
                ):
                    # done and old enough, or explicitly flagged for removal
                    done_requests_set.add(request_id)
                elif dc_req_spec.status == DataCarouselRequestStatus.queued:
                    # still queued although all tasks ended; cancel it
                    self.cancel_request(dc_req_spec, by=by, reason="queued_while_all_tasks_ended")
                elif dc_req_spec.status == DataCarouselRequestStatus.staging:
                    # still staging although all tasks ended; cancel it
                    self.cancel_request(dc_req_spec, by=by, reason="staging_while_all_tasks_ended")
                elif dc_req_spec.status == DataCarouselRequestStatus.cancelled:
                    cancelled_requests_set.add(request_id)
                elif dc_req_spec.status == DataCarouselRequestStatus.retired:
                    retired_requests_set.add(request_id)
            # requests of active tasks whose DDM rule has vanished get cancelled
            for request_id in request_ids_of_active_tasks:
                dc_req_spec = active_tasks_requests_map[request_id]
                if dc_req_spec.status == DataCarouselRequestStatus.staging and dc_req_spec.get_parameter("rule_unfound"):
                    self.cancel_request(dc_req_spec, by=by, reason="rule_unfound")
            # cancel orphan requests (active but with no related task at all)
            orphan_requests = self._get_orphan_requests()
            if orphan_requests:
                for dc_req_spec in orphan_requests:
                    self.cancel_request(dc_req_spec, by=by, reason="orphan")
            # delete the DDM rules of all requests about to be removed
            for request_id in done_requests_set | cancelled_requests_set | retired_requests_set:
                dc_req_spec = terminated_tasks_requests_map[request_id]
                ddm_rule_id = dc_req_spec.ddm_rule_id
                if ddm_rule_id:
                    try:
                        # delete_replication_rule returns False when the rule is already gone
                        ret = self.ddmIF.delete_replication_rule(ddm_rule_id)
                        if ret is False:
                            tmp_log.warning(f"request_id={request_id} ddm_rule_id={ddm_rule_id} rule not found ; skipped")
                        else:
                            tmp_log.debug(f"request_id={request_id} ddm_rule_id={ddm_rule_id} deleted DDM rule")
                    except Exception:
                        # a failed rule deletion must not block deleting the request rows
                        tmp_log.error(f"request_id={request_id} ddm_rule_id={ddm_rule_id} failed to delete DDM rule; {traceback.format_exc()}")
            # delete the request rows from the DB, per category
            if done_requests_set or cancelled_requests_set or retired_requests_set:
                if done_requests_set:
                    ret = self.taskBufferIF.delete_data_carousel_requests_JEDI(list(done_requests_set))
                    if ret is None:
                        tmp_log.warning(f"failed to delete done requests; skipped")
                    else:
                        tmp_log.debug(f"deleted {ret} done requests older than {done_age_limit_days} days or rule_unfound or remove_when_done")
                if cancelled_requests_set:
                    ret = self.taskBufferIF.delete_data_carousel_requests_JEDI(list(cancelled_requests_set))
                    if ret is None:
                        tmp_log.warning(f"failed to delete cancelled requests; skipped")
                    else:
                        tmp_log.debug(f"deleted {ret} cancelled requests")
                if retired_requests_set:
                    ret = self.taskBufferIF.delete_data_carousel_requests_JEDI(list(retired_requests_set))
                    if ret is None:
                        tmp_log.warning(f"failed to delete retired requests; skipped")
                    else:
                        tmp_log.debug(f"deleted {ret} retired requests")
            else:
                tmp_log.debug(f"no terminated requests to delete; skipped")
            # finally purge outdated requests beyond the age limit
            ret_outdated = self.taskBufferIF.clean_up_data_carousel_requests_JEDI(time_limit_days=outdated_age_limit_days)
            if ret_outdated is None:
                tmp_log.warning(f"failed to delete outdated requests; skipped")
            else:
                tmp_log.debug(f"deleted {ret_outdated} outdated requests older than {outdated_age_limit_days} days")
        except Exception:
            tmp_log.error(f"got error ; {traceback.format_exc()}")
2738
2739 def rescue_pending_tasks_with_done_requests(self):
2740 """
2741 Rescue pending tasks which have data carousel requests in done status by updating staged files in DB
2742 Usually these tasks reuse data carousel requests done previously
2743 """
2744 tmp_log = LogWrapper(logger, "rescue_pending_tasks_with_done_requests")
2745
2746 pending_tasked_requests_map, pending_tasked_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(
2747 status_filter_list=["pending"]
2748 )
2749
2750 pending_tasked_inverse_relation_map = dict()
2751 for task_id, request_id_list in pending_tasked_relation_map.items():
2752 for request_id in request_id_list:
2753 pending_tasked_inverse_relation_map.setdefault(request_id, [])
2754 pending_tasked_inverse_relation_map[request_id].append(task_id)
2755
2756 for dc_req_spec in pending_tasked_requests_map.values():
2757 try:
2758 if dc_req_spec.status == DataCarouselRequestStatus.done:
2759
2760 task_id_list = pending_tasked_inverse_relation_map.get(dc_req_spec.request_id, [])
2761 if task_id_list:
2762 self._update_staged_files(dc_req_spec, task_id_list=task_id_list)
2763 tmp_log.debug(f"request_id={dc_req_spec.request_id} status={dc_req_spec.status} updated staged files for pending tasks: {task_id_list}")
2764 except Exception:
2765 tmp_log.error(f"request_id={dc_req_spec.request_id} got error ; {traceback.format_exc()}")
2766
2767 def resubmit_request(
2768 self, orig_dc_req_spec: DataCarouselRequestSpec, submit_idds_request=True, exclude_prev_dst: bool = False, by: str = "manual", reason: str | None = None
2769 ) -> tuple[DataCarouselRequestSpec | None, str | None]:
2770 """
2771 Resubmit a request by ending the old request and submitting a new request
2772 The request status must be in staging, done, cancelled, retired
2773 A staging request will be cancelled, a done request will be retired. Cancelled and retired requests are intact
2774 Return the spec of newly resubmitted request
2775
2776 Args:
2777 orig_dc_req_spec (DataCarouselRequestSpec): original spec of the request to resubmit from
2778 submit_idds_request (bool): whether to submit corresponding iDDS request; default is True
2779 exclude_prev_dst (bool): whether to exclude previous destination
2780 by (str): annotation of the caller of this method; default is "manual"
2781 reason (str|None): annotation of the reason for resubmitting
2782
2783 Returns:
2784 DataCarouselRequestSpec|None : spec of the resubmitted reqeust spec if success, None otherwise
2785 str|None : error message if any, None otherwise
2786 """
2787 tmp_log = LogWrapper(
2788 logger,
2789 f"resubmit_request orig_request_id={orig_dc_req_spec.request_id} exclude_prev_dst={exclude_prev_dst} by={by}"
2790 + (f" reason={reason}" if reason else " "),
2791 )
2792
2793 dc_req_spec_resubmitted = None
2794 err_msg = None
2795
2796 with self.request_lock(orig_dc_req_spec.request_id) as locked_spec:
2797 if not locked_spec:
2798
2799 err_msg = "did not get lock; skipped"
2800 tmp_log.warning(err_msg)
2801 return None, err_msg
2802
2803 orig_dc_req_spec = locked_spec
2804
2805 dummy_dc_req_spec_to_resubmit = get_resubmit_request_spec(orig_dc_req_spec, exclude_prev_dst)
2806
2807 destination_rse = self._choose_destination_rse_for_request(dummy_dc_req_spec_to_resubmit)
2808 if destination_rse is None:
2809 err_msg = f"no other available destination RSE for request_id={orig_dc_req_spec.request_id}; skipped"
2810 tmp_log.warning(err_msg)
2811 return None, err_msg
2812
2813 dc_req_spec_resubmitted = self.taskBufferIF.resubmit_data_carousel_request_JEDI(orig_dc_req_spec.request_id, exclude_prev_dst)
2814 if dc_req_spec_resubmitted:
2815 new_request_id = dc_req_spec_resubmitted.request_id
2816 tmp_log.debug(f"resubmitted request_id={new_request_id}")
2817
2818 if orig_dc_req_spec.ddm_rule_id:
2819 short_time = 5
2820 self._refresh_ddm_rule(orig_dc_req_spec.ddm_rule_id, short_time)
2821
2822 is_ok, _, dc_req_spec_resubmitted = self.stage_request(
2823 dc_req_spec_resubmitted, destination_rse=destination_rse, submit_idds_request=submit_idds_request
2824 )
2825 if is_ok:
2826 tmp_log.debug(f"staged resubmitted request_id={new_request_id}")
2827 else:
2828 err_msg = f"failed to stage resubmitted request_id={new_request_id}; skipped"
2829 tmp_log.warning(err_msg)
2830 elif dc_req_spec_resubmitted is False:
2831 err_msg = f"request not found or not resubmittable; skipped"
2832 tmp_log.warning(err_msg)
2833 else:
2834 err_msg = f"failed to resubmit"
2835 tmp_log.error(err_msg)
2836
2837 return dc_req_spec_resubmitted, err_msg
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853 def change_request_source_rse(
2854 self, dc_req_spec: DataCarouselRequestSpec, cancel_fts: bool = False, change_src_expr: bool = False, source_rse: str | None = None
2855 ) -> tuple[bool | None, DataCarouselRequestSpec, str | None]:
2856 """
2857 Change source RSE
2858 If the request is staging, unset source_replica_expression of its DDM rule
2859 If still queued, re-choose the source_rse to be another tape
2860 Skipped if the request is not staging or queued
2861
2862 Args:
2863 dc_req_spec (DataCarouselRequestSpec): spec of the request
2864 cancel_fts (bool): whether to cancel current FTS requests on DDM
2865 change_src_expr (bool): whether to change source_replica_expression of the DDM rule by replacing old source with new one, instead of just dropping old source
2866 source_rse (str|None): if set, use this source RSE instead of choosing one randomly, also force change_src_expr to be True; default is None
2867
2868 Returns:
2869 bool|None : True for success, False for failure, None if skipped
2870 DataCarouselRequestSpec|None: spec of the request after changing source
2871 str|None: error message if any, None otherwise
2872 """
2873 tmp_log = LogWrapper(logger, f"change_request_source_rse request_id={dc_req_spec.request_id}")
2874 if source_rse:
2875 change_src_expr = True
2876 ret = None
2877 err_msg = None
2878 rse_set = set()
2879
2880 active_source_rses_set = self._get_active_source_rses()
2881 dataset = dc_req_spec.dataset
2882 source_type, rse_set_orig, staging_rule, to_pin, suggested_dst_list = self._get_source_type_of_dataset(dataset, active_source_rses_set)
2883 replicas_map = self._get_full_replicas_per_type(dataset)
2884
2885 if dc_req_spec.status == DataCarouselRequestStatus.staging and not rse_set_orig:
2886
2887 rse_set |= set(replicas_map["tape"])
2888 if rse_set_orig:
2889 rse_set |= rse_set_orig
2890 if (
2891 staging_rule
2892 and (source_replica_expression := staging_rule.get("source_replica_expression"))
2893 and source_replica_expression == f"{SRC_REPLI_EXPR_PREFIX}|{dc_req_spec.source_rse}"
2894 ):
2895
2896 rse_set.discard(dc_req_spec.source_rse)
2897
2898 if dc_req_spec.status == DataCarouselRequestStatus.queued:
2899
2900 rse_set.discard(dc_req_spec.source_rse)
2901 if not rse_set:
2902
2903 err_msg = f"dataset={dataset} has no other source RSE available; skipped"
2904 tmp_log.warning(err_msg)
2905 ret = False
2906 elif source_type == "datadisk":
2907
2908 err_msg = f"dataset={dataset} already has replica on datadisks {rse_set} ; skipped"
2909 tmp_log.debug(err_msg)
2910 ret = False
2911 elif source_type == "tape":
2912
2913 new_source_rse = None
2914 if source_rse:
2915 if source_rse not in rse_set:
2916
2917 err_msg = f"dataset={dataset} source_rse={source_rse} not in available RSEs {rse_set} ; skipped"
2918 tmp_log.warning(err_msg)
2919 ret = False
2920 else:
2921
2922 new_source_rse = source_rse
2923 else:
2924
2925 tmp_log.debug(f"dataset={dataset} on tapes {rse_set} ; choosing one")
2926 _, new_source_rse, ddm_rule_id = self._choose_tape_source_rse(dataset, rse_set, staging_rule)
2927 if not new_source_rse:
2928
2929 err_msg = f"dataset={dataset} failed to choose source RSE ; skipped"
2930 tmp_log.warning(err_msg)
2931 ret = False
2932
2933 if ret is not False and new_source_rse:
2934 with self.request_lock(dc_req_spec.request_id) as locked_spec:
2935 if not locked_spec:
2936
2937 err_msg = "did not get lock; skipped"
2938 tmp_log.warning(err_msg)
2939 return False, dc_req_spec, err_msg
2940
2941 dc_req_spec = locked_spec
2942
2943 dc_req_spec.source_rse = new_source_rse
2944 dc_req_spec.ddm_rule_id = ddm_rule_id
2945 dc_req_spec.source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
2946 if to_pin:
2947 dc_req_spec.set_parameter("to_pin", True)
2948 if suggested_dst_list:
2949 dc_req_spec.set_parameter("suggested_dst_list", suggested_dst_list)
2950
2951 tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
2952 if tmp_ret:
2953 tmp_log.info(
2954 f"updated DB about change source of queued request; source_rse={dc_req_spec.source_rse} , ddm_rule_id={dc_req_spec.ddm_rule_id} , to_pin={to_pin} , suggested_dst_list={suggested_dst_list}"
2955 )
2956 ret = True
2957 else:
2958 err_msg = f"failed to update DB ; skipped"
2959 tmp_log.error(err_msg)
2960 ret = False
2961 else:
2962
2963 err_msg = f"dataset={dataset} has no replica on any tape or datadisk ; skipped"
2964 tmp_log.debug(err_msg)
2965 ret = False
2966 elif dc_req_spec.status == DataCarouselRequestStatus.staging:
2967
2968 set_map = {"source_replica_expression": None}
2969 if change_src_expr:
2970
2971 if not rse_set:
2972
2973 err_msg = f"dataset={dataset} has no other source RSE available; skipped"
2974 tmp_log.warning(err_msg)
2975 ret = False
2976 else:
2977 new_source_rse = None
2978 if source_rse:
2979 if source_rse not in rse_set:
2980
2981 err_msg = f"dataset={dataset} source_rse={source_rse} not in available RSEs {rse_set} ; skipped"
2982 tmp_log.warning(err_msg)
2983 ret = False
2984 else:
2985
2986 new_source_rse = source_rse
2987 else:
2988
2989 tmp_log.debug(f"dataset={dataset} on tapes {rse_set} ; choosing one")
2990 new_source_rse = random.choice(list(rse_set))
2991 if not new_source_rse:
2992
2993 err_msg = f"dataset={dataset} failed to choose source RSE ; skipped"
2994 tmp_log.warning(err_msg)
2995 ret = False
2996 if ret is not False and new_source_rse:
2997 with self.request_lock(dc_req_spec.request_id) as locked_spec:
2998 if not locked_spec:
2999
3000 err_msg = "did not get lock; skipped"
3001 tmp_log.warning(err_msg)
3002 return False, dc_req_spec, err_msg
3003 else:
3004
3005 dc_req_spec = locked_spec
3006
3007 dc_req_spec.source_rse = new_source_rse
3008 dc_req_spec.source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
3009
3010 set_map["source_replica_expression"] = f"{SRC_REPLI_EXPR_PREFIX}|{dc_req_spec.source_rse}"
3011
3012 tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
3013 if tmp_ret is not None:
3014 tmp_log.info(
3015 f"updated DB about change source of staging request; source_rse={dc_req_spec.source_rse} , ddm_rule_id={dc_req_spec.ddm_rule_id}"
3016 )
3017 ret = True
3018 else:
3019 err_msg = f"failed to update DB ; skipped"
3020 tmp_log.error(err_msg)
3021 ret = False
3022
3023 if ret is not False:
3024 tmp_ret = self.ddmIF.update_rule_by_id(dc_req_spec.ddm_rule_id, set_map)
3025 if tmp_ret:
3026 tmp_log.debug(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={set_map} done")
3027 ret = True
3028 else:
3029 err_msg = f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={set_map} failed to update DDM rule ; skipped"
3030 tmp_log.error(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={set_map} got {tmp_ret}; skipped")
3031 ret = False
3032
3033 if cancel_fts:
3034 cancel_fts_success = True
3035
3036 _set_map = {"cancel_requests": True, "state": "STUCK"}
3037 tmp_ret = self.ddmIF.update_rule_by_id(dc_req_spec.ddm_rule_id, _set_map)
3038 if tmp_ret:
3039 tmp_log.debug(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} done")
3040 else:
3041 tmp_log.warning(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} got {tmp_ret} ; skipped")
3042 cancel_fts_success = False
3043
3044 _set_map = {"boost_rule": True}
3045 tmp_ret = self.ddmIF.update_rule_by_id(dc_req_spec.ddm_rule_id, _set_map)
3046 if tmp_ret:
3047 tmp_log.debug(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} done")
3048 else:
3049 tmp_log.warning(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} got {tmp_ret} ; skipped")
3050 cancel_fts_success = False
3051 if not cancel_fts_success:
3052 err_msg = f"ddm_rule_id={dc_req_spec.ddm_rule_id} failed to cancel FTS requests ; skipped"
3053 return ret, dc_req_spec, err_msg