Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 import copy
0002 import functools
0003 import json
0004 import os
0005 import random
0006 import re
0007 import socket
0008 import time
0009 import traceback
0010 from collections import namedtuple
0011 from contextlib import contextmanager
0012 from dataclasses import MISSING, InitVar, asdict, dataclass, field
0013 from datetime import datetime, timedelta
0014 from typing import Any, Dict, List
0015 
0016 from pandacommon.pandalogger.LogWrapper import LogWrapper
0017 from pandacommon.pandalogger.PandaLogger import PandaLogger
0018 from pandacommon.pandautils.base import SpecBase
0019 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables, naive_utcnow
0020 
0021 from pandaserver.config import panda_config
0022 from pandaserver.dataservice.ddm import rucioAPI
0023 
0024 import polars as pl  # isort:skip
0025 
0026 import idds.common.constants  # isort:skip
0027 import idds.common.utils  # isort:skip
0028 from idds.client.client import Client as iDDS_Client  # isort:skip
0029 
0030 
# main logger
logger = PandaLogger().getLogger(__name__.split(".")[-1])


# ==============================================================

# global DC lock component name (used with lockProcess_PANDA/unlockProcess_PANDA)
GLOBAL_DC_LOCK_NAME = "DataCarousel"

# schema version of database config
DC_CONFIG_SCHEMA_VERSION = 1

# final task statuses
FINAL_TASK_STATUSES = ["done", "finished", "failed", "exhausted", "aborted", "toabort", "aborting", "broken", "tobroken"]

# named tuple for attribute with type
AttributeWithType = namedtuple("AttributeWithType", ["attribute", "type"])

# DDM rule activity map for data carousel (task type -> DDM activity name)
DDM_RULE_ACTIVITY_MAP = {"anal": "Data Carousel Analysis", "prod": "Data Carousel Production"}

# template strings
# source replica expression prefix
SRC_REPLI_EXPR_PREFIX = "rse_type=DISK"
# destination replica expression for datasets to pin (with replica on disk but without rule to stay on datadisk)
TO_PIN_DST_REPLI_EXPR = "type=DATADISK"
# source or destination replica expression for available RSEs (exclude downtime)
AVAIL_REPLI_EXPR_SUFFIX = "&availability_write&availability_read"

# DDM rule lifetime in day to keep
STAGING_LIFETIME_DAYS = 15
DONE_LIFETIME_DAYS = 30

# DDM rule refresh hard lifetime limits (refresh only if lifetime within the range)
TO_REFRESH_MAX_LIFETIME_DAYS = 7
TO_REFRESH_MIN_LIFETIME_HOURS = 2

# maximum quota of files for fair share queue before normal queue
QUEUE_FAIR_SHARE_MAX_QUOTA = 10000

# polars global display config; -1 disables row/column truncation when rendering tables
pl.Config.set_ascii_tables(True)
pl.Config.set_tbl_hide_dataframe_shape(True)
pl.Config.set_tbl_hide_column_data_types(True)
pl.Config.set_tbl_rows(-1)
pl.Config.set_tbl_cols(-1)
pl.Config.set_tbl_width_chars(140)
0078 
0079 # ==============================================================
0080 
0081 
class DataCarouselRequestStatus(object):
    """
    Data carousel request status
    """

    # individual request statuses
    queued = "queued"
    staging = "staging"
    done = "done"
    cancelled = "cancelled"
    retired = "retired"

    # requests still being processed
    active_statuses = [queued, staging]
    # terminal statuses; no further processing expected
    final_statuses = [done, cancelled, retired]
    # statuses counted as unfinished
    unfinished_statuses = [staging, cancelled]
    # statuses of requests that can be reused for the same dataset
    reusable_statuses = [queued, staging, done]
    # statuses from which a request may be resubmitted
    resubmittable_statuses = [staging, cancelled, done, retired]
0098 
0099 
class DataCarouselRequestSpec(SpecBase):
    """
    Data carousel request specification
    """

    # attributes with types; order matches the DB table columns
    attributes_with_types = (
        AttributeWithType("request_id", int),
        AttributeWithType("dataset", str),
        AttributeWithType("source_rse", str),
        AttributeWithType("destination_rse", str),
        AttributeWithType("ddm_rule_id", str),
        AttributeWithType("status", str),
        AttributeWithType("total_files", int),
        AttributeWithType("staged_files", int),
        AttributeWithType("dataset_size", int),
        AttributeWithType("staged_size", int),
        AttributeWithType("creation_time", datetime),
        AttributeWithType("start_time", datetime),
        AttributeWithType("end_time", datetime),
        AttributeWithType("modification_time", datetime),
        AttributeWithType("check_time", datetime),
        AttributeWithType("source_tape", str),
        AttributeWithType("parameters", str),
        AttributeWithType("last_staged_time", datetime),
        AttributeWithType("locked_by", str),
        AttributeWithType("lock_time", datetime),
    )
    # attribute names only, derived from attributes_with_types
    attributes = tuple(attr_with_type.attribute for attr_with_type in attributes_with_types)
    # attributes which have 0 by default
    _zeroAttrs = ()
    # attributes to force update
    _forceUpdateAttrs = ()
    # mapping between sequence and attr
    _seqAttrMap = {"request_id": f"{panda_config.schemaJEDI}.JEDI_DATA_CAROUSEL_REQUEST_ID_SEQ.nextval"}

    @property
    def parameter_map(self) -> dict:
        """
        Dictionary parsed from the parameters attribute (stored as JSON)
        Possible parameters:
            "reuse_rule" (bool): reuse DDM rule instead of submitting new one
            "resub_from" (int): resubmitted from this original request ID
            "prev_src" (str): previous source RSE
            "prev_dst" (str): previous destination RSE
            "excluded_dst_list" (list[str]): list of excluded destination RSEs
            "rule_unfound" (bool): DDM rule not found
            "to_pin" (bool): whether to pin the dataset
            "suggested_dst_list" (list[str]): list of suggested destination RSEs
            "remove_when_done" (bool): remove request and DDM rule asap when request done to save disk space
            "task_id": (int): task_id of the task which initiates the request
            "init_task_gshare": (str): (deprecated) original gshare of the task which initiates the request, for statistics
            "task_gshare": (str): original gshare of the task which initiates the request, for statistics
            "task_type": (str): type of the task (prod, anal) which initiates this request, for statistics
            "task_user": (str): user of the task which initiates this request, for statistics
            "task_group": (str): working group of the task which initiates this request, for statistics

        Returns:
            dict : dict of parameters if it is JSON or empty dict if null
        """
        return {} if self.parameters is None else json.loads(self.parameters)

    @parameter_map.setter
    def parameter_map(self, value_map: dict):
        """
        Serialize the given dictionary to JSON and store it in the parameters attribute

        Args:
            value_map (dict): dict to set the parameter map
        """
        self.parameters = json.dumps(value_map)

    def get_parameter(self, param: str) -> Any:
        """
        Return the value of one parameter; None if the parameter is not set

        Args:
            param (str): parameter name

        Returns:
            Any : value of the parameter; None if parameter not set
        """
        return self.parameter_map.get(param)

    def set_parameter(self, param: str, value):
        """
        Set the value of one parameter and store it in the parameters attribute as JSON

        Args:
            param (str): parameter name
            value (Any): value of the parameter to set; must be JSON-serializable
        """
        # delegate to update_parameters with a single-entry dict
        self.update_parameters({param: value})

    def update_parameters(self, params: dict):
        """
        Update values of parameters with a dict and store in parameters attribute in JSON

        Args:
            params (dict): dict of parameter names and values to set
        """
        merged = self.parameter_map
        merged.update(params)
        self.parameter_map = merged
0211 
0212 
class DataCarouselRequestTransaction(object):
    """
    Data Carousel request transaction object
    This is a wrapper for DataCarouselRequestSpec to provide additional methods about DB transaction
    """

    def __init__(self, dc_req_spec: DataCarouselRequestSpec, db_cur, db_log):
        """
        Constructor

        Args:
            dc_req_spec (DataCarouselRequestSpec): request specification
            db_cur: database cursor for DB operations
            db_log: database logger for logging DB operations
        """
        self.spec = dc_req_spec
        self.db_cur = db_cur
        self.db_log = db_log

    def update_spec(self, dc_req_spec: DataCarouselRequestSpec, to_db: bool = True) -> bool | None:
        """
        Update the request specification with a new one

        Args:
            dc_req_spec (DataCarouselRequestSpec): new request specification to update
            to_db (bool): whether to update the request in DB; default True

        Returns:
            bool|None : True if updated in DB successfully, False if failed to update in DB, None if not updating in DB
        """
        if not to_db:
            # in-memory update only; nothing touched in DB
            self.spec = dc_req_spec
            return None
        # update the request in DB
        comment = " /* DataCarouselRequestTransaction.update_spec */"
        dc_req_spec.modification_time = naive_utcnow()
        sql_update = (
            f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests " f"SET {dc_req_spec.bindUpdateChangesExpression()} " "WHERE request_id=:request_id "
        )
        var_map = dc_req_spec.valuesMap(useSeq=False, onlyChanged=True)
        var_map[":request_id"] = dc_req_spec.request_id
        self.db_cur.execute(sql_update + comment, var_map)
        if self.db_cur.rowcount == 0:
            # no rows updated; request_id not found
            self.db_log.warning(f"request_id={dc_req_spec.request_id} not updated")
            return False
        # row updated; keep the new spec on this transaction object
        self.spec = dc_req_spec
        self.db_log.debug(f"updated request_id={dc_req_spec.request_id} {dc_req_spec.bindUpdateChangesExpression()}")
        return True
0264 
0265 
0266 # ==============================================================
0267 # Dataclasses of configurations #
0268 # ===============================
0269 
0270 
@dataclass
class SourceTapeConfig:
    """
    Configuration parameters of one source physical tape

    Fields:
        active                  (bool)  : whether the tape is active
        max_size                (int)   : maximum number of n_files_queued + nfiles_staging from this tape
        max_staging_ratio       (int)   : maximum ratio percent of nfiles_staging / (n_files_queued + nfiles_staging)
        destination_expression  (str)   : rse_expression for DDM to filter the destination RSE
    """

    # tapes are inactive unless explicitly enabled
    active: bool = False
    max_size: int = 10000
    max_staging_ratio: int = 50
    destination_expression: str = "type=DATADISK&datapolicynucleus=True&freespace>300"
0287 
0288 
0289 @dataclass
0290 class SourceRSEConfig:
0291     """
0292     Dataclass for source RSE configuration parameters
0293 
0294     Fields:
0295         tape                    (str)       : is mapped to this source physical tape
0296         active                  (bool|None) : whether the source RSE is active
0297         max_size                (int|None)  : maximum number of n_files_queued + nfiles_staging from this RSE
0298         max_staging_ratio       (int|None)  : maximum ratio percent of nfiles_staging / (n_files_queued + nfiles_staging)
0299     """
0300 
0301     tape: str
0302     active: bool | None = None
0303     max_size: int | None = None
0304     max_staging_ratio: int | None = None
0305 
0306 
0307 # Main config; must be bottommost of all config dataclasses
@dataclass
class DataCarouselMainConfig:
    """
    Dataclass for DataCarousel main configuration parameters

    Fields:
        source_tapes_config     (dict)  : configuration of source physical tapes, in form of {"TAPE_1": SourceTapeConfig_of_TAPE_1, ...}
        source_rses_config      (dict)  : configuration of source RSEs, each should be mapped to some source tape, in form of {"RSE_1": SourceRSEConfig_of_RSE_1, ...}
        excluded_destinations   (list)  : excluded destination RSEs
        early_access_users      (list)  : PanDA user names for early access to Data Carousel
    """

    source_tapes_config: Dict[str, Any] = field(default_factory=dict)
    source_rses_config: Dict[str, Any] = field(default_factory=dict)
    excluded_destinations: List[str] = field(default_factory=list)
    early_access_users: List[str] = field(default_factory=list)

    def __post_init__(self):
        # attributes holding nested dicts, paired with the dataclass each value-dict converts into
        for attr_name, config_class in (
            ("source_tapes_config", SourceTapeConfig),
            ("source_rses_config", SourceRSEConfig),
        ):
            raw_map = getattr(self, attr_name, None)
            if isinstance(raw_map, dict):
                # convert every value-dict into an instance of the corresponding config dataclass
                setattr(self, attr_name, {key: config_class(**params) for key, params in raw_map.items()})
0338 
0339 
0340 # ==============================================================
0341 # Functions #
0342 # ===========
0343 
0344 
def get_resubmit_request_spec(dc_req_spec: DataCarouselRequestSpec, exclude_prev_dst: bool = False) -> DataCarouselRequestSpec | None:
    """
    Get a new request spec to resubmit according to original request spec

    Args:
        dc_req_spec (DataCarouselRequestSpec): original spec of the request
        exclude_prev_dst (bool): whether to exclude previous destination

    Returns:
        DataCarouselRequestSpec|None : spec of the request to resubmit, or None if failed
    """
    # plain string, not f-string: no placeholders needed here
    tmp_log = LogWrapper(logger, "get_resubmit_request_spec")
    try:
        # make new request spec
        now_time = naive_utcnow()
        dc_req_spec_to_resubmit = DataCarouselRequestSpec()
        # attributes to reset
        dc_req_spec_to_resubmit.staged_files = 0
        dc_req_spec_to_resubmit.staged_size = 0
        dc_req_spec_to_resubmit.status = DataCarouselRequestStatus.queued
        dc_req_spec_to_resubmit.creation_time = now_time
        # get attributes from original request
        # TODO: now copy source_rse and source tape from original; may need to re-choose source in the future
        dc_req_spec_to_resubmit.dataset = dc_req_spec.dataset
        dc_req_spec_to_resubmit.total_files = dc_req_spec.total_files
        dc_req_spec_to_resubmit.dataset_size = dc_req_spec.dataset_size
        dc_req_spec_to_resubmit.source_rse = dc_req_spec.source_rse
        dc_req_spec_to_resubmit.source_tape = dc_req_spec.source_tape
        # parameters according to original requests
        orig_parameter_map = dc_req_spec.parameter_map
        excluded_dst_set = set()
        if exclude_prev_dst:
            # exclude previous destination
            excluded_dst_set.add(dc_req_spec.destination_rse)
        # TODO: mechanism to exclude problematic source or destination RSE (need approach to store historical datasets/RSEs)
        dc_req_spec_to_resubmit.parameter_map = {
            "resub_from": dc_req_spec.request_id,
            "prev_src": dc_req_spec.source_rse,
            "prev_dst": dc_req_spec.destination_rse,
            "excluded_dst_list": list(excluded_dst_set),
            "task_id": orig_parameter_map.get("task_id"),
            "task_gshare": orig_parameter_map.get("task_gshare"),
            "init_task_gshare": orig_parameter_map.get("init_task_gshare"),
        }
        # return
        tmp_log.debug(f"got resubmit request spec for request_id={dc_req_spec.request_id}")
        return dc_req_spec_to_resubmit
    except Exception:
        # log the full traceback and return None explicitly on any failure
        tmp_log.error(f"got error ; {traceback.format_exc()}")
        return None
0395 
0396 
0397 # ==============================================================
0398 
0399 
0400 class DataCarouselInterface(object):
0401     """
0402     Interface for data carousel methods
0403     """
0404 
0405     # constructor
0406     def __init__(self, taskbufferIF, *args, **kwargs):
0407         # attributes
0408         self.taskBufferIF = taskbufferIF
0409         self.ddmIF = rucioAPI
0410         self.tape_rses = []
0411         self.datadisk_rses = []
0412         self.disk_rses = []
0413         self.dc_config_map = None
0414         self._last_update_ts_dict = {}
0415         # full pid
0416         self.full_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpgrp()}-{os.getpid()}"
0417         # refresh
0418         self._refresh_all_attributes()
0419 
0420     def _refresh_all_attributes(self):
0421         """
0422         Refresh by calling all update methods
0423         """
0424         self._update_rses(time_limit_minutes=30)
0425         self._update_dc_config(time_limit_minutes=5)
0426 
0427     @staticmethod
0428     def refresh(func):
0429         """
0430         Decorator to call _refresh_all_attributes before the method
0431         """
0432 
0433         @functools.wraps(func)
0434         def wrapper(self, *args, **kwargs):
0435             self._refresh_all_attributes()
0436             return func(self, *args, **kwargs)
0437 
0438         return wrapper
0439 
0440     def _acquire_global_dc_lock(self, timeout_sec: int = 10, lock_expiration_sec: int = 30) -> str | None:
0441         """
0442         Acquire global Data Carousel lock in DB
0443 
0444         Args:
0445             timeout_sec (int): timeout in seconds for blocking to retry
0446             lock_expiration_sec (int): age of lock in seconds to be considered expired and can be acquired immediately
0447 
0448         Returns:
0449             str|None : full process ID of this process if got lock; None if timeout
0450         """
0451         tmp_log = LogWrapper(logger, f"_acquire_global_dc_lock pid={self.full_pid}")
0452         # time the retry loop
0453         start_mono_time = time.monotonic()
0454         while time.monotonic() - start_mono_time <= timeout_sec:
0455             # try to get the lock
0456             got_lock = self.taskBufferIF.lockProcess_PANDA(
0457                 component=GLOBAL_DC_LOCK_NAME,
0458                 pid=self.full_pid,
0459                 time_limit=lock_expiration_sec / 60.0,
0460             )
0461             if got_lock:
0462                 # got the lock; return
0463                 tmp_log.debug(f"got lock")
0464                 return self.full_pid
0465             else:
0466                 # did not get lock; retry
0467                 time.sleep(0.05)
0468         # timeout
0469         tmp_log.debug(f"timed out; skipped")
0470         return None
0471 
0472     def _release_global_dc_lock(self, full_pid: str | None):
0473         """
0474         Release global Data Carousel lock in DB
0475 
0476         Args:
0477             full_pid (str|None): full process ID which acquired the lock; if None, use self.full_pid
0478         """
0479         tmp_log = LogWrapper(logger, f"_release_global_dc_lock pid={full_pid}")
0480         # get full_pid
0481         if full_pid is None:
0482             full_pid = self.full_pid
0483         # try to release the lock
0484         ret = self.taskBufferIF.unlockProcess_PANDA(
0485             component=GLOBAL_DC_LOCK_NAME,
0486             pid=full_pid,
0487         )
0488         if ret:
0489             # released the lock
0490             tmp_log.debug(f"released lock")
0491         else:
0492             # failed to release lock; skip
0493             tmp_log.error(f"failed to released lock; skipped")
0494         return ret
0495 
0496     @contextmanager
0497     def global_dc_lock(self, timeout_sec: int = 10, lock_expiration_sec: int = 30):
0498         """
0499         Context manager for global Data Carousel lock in DB
0500 
0501         Args:
0502             timeout_sec (int): timeout in seconds for blocking to retry
0503             lock_expiration_sec (int): age of lock in seconds to be considered expired and can be acquired immediately
0504 
0505         Yields:
0506             str : full process ID of this process if got lock; None if timeout
0507         """
0508         # tmp_log = LogWrapper(logger, f"global_dc_lock pid={self.full_pid}")
0509         # try to release the lock
0510         full_pid = self._acquire_global_dc_lock(timeout_sec, lock_expiration_sec)
0511         try:
0512             yield full_pid
0513         finally:
0514             self._release_global_dc_lock(full_pid)
0515 
0516     def get_request_by_id(self, request_id: int) -> DataCarouselRequestSpec | None:
0517         """
0518         Get the spec of the request specified by request_id
0519 
0520         Args:
0521             request_id (int): request_id of the request
0522 
0523         Returns:
0524             DataCarouselRequestSpec|None : spec of the request, or None if failed
0525         """
0526         tmp_log = LogWrapper(logger, f"get_request_by_id request_id={request_id}")
0527         sql = f"SELECT {DataCarouselRequestSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE request_id=:request_id "
0528         var_map = {":request_id": request_id}
0529         res_list = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
0530         if res_list is not None:
0531             if len(res_list) > 1:
0532                 tmp_log.error("more than one requests; unexpected")
0533             else:
0534                 for res in res_list:
0535                     dc_req_spec = DataCarouselRequestSpec()
0536                     dc_req_spec.pack(res)
0537                     return dc_req_spec
0538         else:
0539             tmp_log.warning("no request found; skipped")
0540             return None
0541 
0542     def get_request_by_dataset(self, dataset: str) -> DataCarouselRequestSpec | None:
0543         """
0544         Get the reusable request (not in cancelled or retired) of the dataset
0545 
0546         Args:
0547             dataset (str): dataset name
0548 
0549         Returns:
0550             DataCarouselRequestSpec|None : spec of the request of dataset, or None if failed
0551         """
0552         tmp_log = LogWrapper(logger, f"get_request_by_dataset dataset={dataset}")
0553         status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.reusable_statuses, prefix=":status")
0554         sql = (
0555             f"SELECT {DataCarouselRequestSpec.columnNames()} "
0556             f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
0557             f"WHERE dataset=:dataset "
0558             f"AND status IN ({status_var_names_str}) "
0559         )
0560         var_map = {":dataset": dataset}
0561         var_map.update(status_var_map)
0562         res_list = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
0563         if res_list is not None:
0564             if len(res_list) > 1:
0565                 tmp_log.error("more than one reusable requests; unexpected")
0566             else:
0567                 for res in res_list:
0568                     dc_req_spec = DataCarouselRequestSpec()
0569                     dc_req_spec.pack(res)
0570                     return dc_req_spec
0571         else:
0572             tmp_log.warning("no reusable request; skipped")
0573             return None
0574 
0575     # @contextmanager
0576     # def request_transaction_by_id(self, request_id: int):
0577     #     """
0578     #     Context manager to get a transaction for the Data Carousel request by request_id
0579 
0580     #     Args:
0581     #         request_id (int): request ID of the Data Carousel request
0582 
0583     #     Yields:
0584     #         DataCarouselRequestTransaction : request object specified by request_id
0585     #     """
0586     #     tmp_log = LogWrapper(logger, f"request_transaction_by_id pid={self.full_pid} request_id={request_id}")
0587     #     with self.taskBufferIF.transaction(name="DataCarouselRequestTransaction") as (db_cur, db_log):
0588     #         # try to get the request
0589     #         dc_req_spec = None
0590     #         sql_get = (
0591     #             f"SELECT {DataCarouselRequestSpec.columnNames()} " f"FROM {panda_config.schemaJEDI}.data_carousel_requests " f"WHERE request_id=:request_id "
0592     #         )
0593     #         var_map = {":request_id": request_id}
0594     #         res_list = db_cur.execute(sql_get, var_map).fetchall()
0595     #         if res_list is not None:
0596     #             if len(res_list) > 1:
0597     #                 tmp_log.error("more than one requests; unexpected")
0598     #             else:
0599     #                 for res in res_list:
0600     #                     dc_req_spec = DataCarouselRequestSpec()
0601     #                     dc_req_spec.pack(res)
0602     #         dc_req_txn = DataCarouselRequestTransaction(dc_req_spec, db_cur, db_log)
0603     #         # yield and run wrapped function
0604     #         yield dc_req_txn
0605 
0606     # @contextmanager
0607     # def request_lock_transaction_by_id(self, request_id: int, lock_expiration_sec: int = 120):
0608     #     """
0609     #     Context manager to lock and get a transaction for the Data Carousel request for update into DB
0610 
0611     #     Args:
0612     #         request_id (int): request ID of the Data Carousel request to acquire lock
0613     #         lock_expiration_sec (int): age of lock in seconds to be considered expired and can be acquired immediately
0614 
0615     #     Yields:
0616     #         DataCarouselRequestTransaction | None : request object specified by request_id if got lock; None if did not get lock
0617     #     """
0618     #     tmp_log = LogWrapper(logger, f"lock_request_for_update pid={self.full_pid} request_id={request_id}")
0619     #     #
0620     #     got_lock = False
0621     #     try:
0622     #         with self.taskBufferIF.transaction(name="DataCarouselRequestLock") as (db_cur, db_log):
0623     #             # try to get the lock
0624     #             sql_lock = (
0625     #                 f"SELECT request_id FROM {panda_config.schemaJEDI}.data_carousel_requests "
0626     #                 f"WHERE request_id=:request_id "
0627     #                 f"AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time < :min_lock_time) "
0628     #             )
0629     #             var_map = {
0630     #                 ":request_id": request_id,
0631     #                 ":locked_by": self.full_pid,
0632     #                 ":min_lock_time": naive_utcnow() - timedelta(seconds=lock_expiration_sec),
0633     #             }
0634     #             res_list = db_cur.execute(sql_lock, var_map).fetchall()
0635     #             if res_list is not None:
0636     #                 if len(res_list) > 1:
0637     #                     tmp_log.error("more than one requests; unexpected")
0638     #                 elif len(res_list) == 0:
0639     #                     # no rows found; did not get the lock
0640     #                     db_log.debug(f"{self.full_pid} did not get lock for request_id={request_id}")
0641     #                 else:
0642     #                     # got the lock; update locked_by and lock_time
0643     #                     sql_update = (
0644     #                         f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
0645     #                         f"SET locked_by=:locked_by, lock_time=:lock_time "
0646     #                         f"WHERE request_id=:request_id "
0647     #                     )
0648     #                     var_map = {
0649     #                         ":locked_by": self.full_pid,
0650     #                         ":lock_time": naive_utcnow(),
0651     #                         ":request_id": request_id,
0652     #                     }
0653     #                     db_cur.execute(sql_update, var_map)
0654     #                     got_lock = True
0655     #                     db_log.debug(f"{self.full_pid} got lock for request_id={request_id}")
0656     #             else:
0657     #                 # did not get the lock
0658     #                 db_log.debug(f"{self.full_pid} did not get lock for request_id={request_id}")
0659     #         if got_lock:
0660     #             # got the lock
0661     #             tmp_log.debug(f"got lock")
0662     #             with self.request_transaction_by_id(request_id) as dc_req_txn:
0663     #                 # check if request spec is None
0664     #                 if dc_req_txn.spec is None:
0665     #                     tmp_log.error(f"request_id={request_id} not found; skipped")
0666     #                     yield None
0667     #                 else:
0668     #                     # yield and let wrapped function run
0669     #                     yield dc_req_txn
0670     #         else:
0671     #             # did not get the lock
0672     #             tmp_log.debug(f"did not get lock for request_id={request_id}")
0673     #             yield None
0674     #     finally:
0675     #         if got_lock:
0676     #             # release the lock
0677     #             with self.taskBufferIF.transaction(name="DataCarouselRequestUnlock") as (db_cur, db_log):
0678     #                 sql_unlock = (
0679     #                     f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
0680     #                     f"SET locked_by=NULL, lock_time=NULL "
0681     #                     f"WHERE request_id=:request_id AND locked_by=:locked_by "
0682     #                 )
0683     #                 var_map = {
0684     #                     ":request_id": request_id,
0685     #                     ":locked_by": self.full_pid,
0686     #                 }
0687     #                 db_cur.execute(sql_unlock, var_map)
0688     #                 db_log.debug(f"{self.full_pid} released lock for request_id={request_id}")
0689 
    @contextmanager
    def request_lock(self, request_id: int, lock_expiration_sec: int = 120):
        """
        Context manager to lock and unlock the Data Carousel request for update into DB

        The lock is acquired with a single atomic UPDATE that succeeds when the
        request is unlocked, already held by this process (re-entrant for the same
        full_pid), or held under a lock older than lock_expiration_sec (expired
        locks can be taken over). The release in the finally clause runs only when
        the lock was actually acquired, and only takes effect while locked_by
        still matches this process.

        Args:
            request_id (int): request ID of the Data Carousel request to acquire lock
            lock_expiration_sec (int): age of lock in seconds to be considered expired and can be acquired immediately

        Yields:
            DataCarouselRequestSpec|None : spec of the request if got lock; None if exception or failed
        """
        tmp_log = LogWrapper(logger, f"request_lock pid={self.full_pid} request_id={request_id}")
        #
        # got_lock stays None when the UPDATE fails or hits an unexpected row count
        got_lock = None
        locked_spec = None
        try:
            # try to update locked_by and lock_time
            sql_lock = (
                f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
                f"SET locked_by=:locked_by, lock_time=:lock_time "
                f"WHERE request_id=:request_id "
                f"AND (locked_by IS NULL OR locked_by=:locked_by OR lock_time < :min_lock_time) "
            )
            var_map = {
                ":locked_by": self.full_pid,
                ":lock_time": naive_utcnow(),
                ":request_id": request_id,
                # locks older than this timestamp are considered expired and can be taken over
                ":min_lock_time": naive_utcnow() - timedelta(seconds=lock_expiration_sec),
            }
            row_count = self.taskBufferIF.querySQL(sql_lock, var_map)
            if row_count is None:
                # DB error
                tmp_log.error(f"failed to update DB to lock; skipped")
            elif row_count > 1:
                # request_id should be unique; this is never expected
                tmp_log.error(f"more than one requests updated to lock; unexpected")
            elif row_count == 0:
                # no row updated; did not get the lock
                got_lock = False
                tmp_log.debug(f"did not get lock; skipped")
            else:
                # got the lock
                got_lock = True
                tmp_log.debug(f"got lock")
            if got_lock:
                # yield the updated request spec locked
                locked_spec = self.get_request_by_id(request_id)
                yield locked_spec
            else:
                # did not get lock; still need to yield None as a generator
                yield None
        finally:
            if got_lock:
                # release the lock; the locked_by condition makes this a no-op if the
                # lock expired and was taken over by another process in the meantime
                sql_unlock = (
                    f"UPDATE {panda_config.schemaJEDI}.data_carousel_requests "
                    f"SET locked_by=NULL, lock_time=NULL "
                    f"WHERE request_id=:request_id AND locked_by=:locked_by "
                )
                var_map = {
                    ":request_id": request_id,
                    ":locked_by": self.full_pid,
                }
                row_count = self.taskBufferIF.querySQL(sql_unlock, var_map)
                if row_count is None:
                    tmp_log.error(f"failed to update DB to unlock; skipped")
                elif row_count > 1:
                    tmp_log.error(f"more than one requests updated to unlock; unexpected")
                elif row_count == 0:
                    # possibly the lock expired and was re-acquired by another process
                    tmp_log.error(f"no request updated to unlock; skipped")
                else:
                    tmp_log.debug(f"released lock")
0761 
0762     def _update_rses(self, time_limit_minutes: int | float = 30):
0763         """
0764         Update RSEs per TAPE and DATADISK cached in this object
0765         Run if cache outdated; else do nothing
0766         Check both DATADISK and DISK (including other DISK sources like SCRATCHDISK or LOCALDISK which are transient)
0767 
0768         Args:
0769             time_limit_minutes (int|float): time limit of the cache in minutes
0770         """
0771         tmp_log = LogWrapper(logger, "_update_rses")
0772         nickname = "rses"
0773         try:
0774             # check last update
0775             now_time = naive_utcnow()
0776             self._last_update_ts_dict.setdefault(nickname, None)
0777             last_update_ts = self._last_update_ts_dict[nickname]
0778             if last_update_ts is None or (now_time - last_update_ts) >= timedelta(minutes=time_limit_minutes):
0779                 # get RSEs from DDM
0780                 tape_rses = self.ddmIF.list_rses("rse_type=TAPE")
0781                 if tape_rses is not None:
0782                     self.tape_rses = list(tape_rses)
0783                 datadisk_rses = self.ddmIF.list_rses(f"type=DATADISK{AVAIL_REPLI_EXPR_SUFFIX}")
0784                 if datadisk_rses is not None:
0785                     self.datadisk_rses = list(datadisk_rses)
0786                 disk_rses = self.ddmIF.list_rses(f"rse_type=DISK{AVAIL_REPLI_EXPR_SUFFIX}")
0787                 if disk_rses is not None:
0788                     self.disk_rses = list(disk_rses)
0789                 # tmp_log.debug(f"TAPE: {self.tape_rses} ; DATADISK: {self.datadisk_rses} ; DISK: {self.disk_rses}")
0790                 # tmp_log.debug(f"got {len(self.tape_rses)} tapes , {len(self.datadisk_rses)} datadisks , {len(self.disk_rses)} disks")
0791                 # update last update timestamp
0792                 self._last_update_ts_dict[nickname] = naive_utcnow()
0793         except Exception:
0794             tmp_log.error(f"got error ; {traceback.format_exc()}")
0795 
0796     def _update_dc_config(self, time_limit_minutes: int | float = 5):
0797         """
0798         Update Data Carousel configuration from DB
0799         Run if cache outdated; else do nothing
0800 
0801         Args:
0802             time_limit_minutes (int|float): time limit of the cache in minutes
0803         """
0804         tmp_log = LogWrapper(logger, "_update_dc_config")
0805         nickname = "main"
0806         try:
0807             # check last update
0808             now_time = naive_utcnow()
0809             self._last_update_ts_dict.setdefault(nickname, None)
0810             last_update_ts = self._last_update_ts_dict[nickname]
0811             if last_update_ts is None or (now_time - last_update_ts) >= timedelta(minutes=time_limit_minutes):
0812                 # get DC config from DB
0813                 res_dict = self.taskBufferIF.getConfigValue("data_carousel", f"DATA_CAROUSEL_CONFIG", "jedi", "atlas")
0814                 if res_dict is None:
0815                     tmp_log.warning(f"got None from DB ; skipped")
0816                     return
0817                 # check schema version
0818                 try:
0819                     schema_version = res_dict["metadata"]["schema_version"]
0820                 except KeyError:
0821                     tmp_log.error(f"failed to get metadata.schema_version ; skipped")
0822                     return
0823                 else:
0824                     if schema_version != DC_CONFIG_SCHEMA_VERSION:
0825                         tmp_log.error(f"metadata.schema_version does not match ({schema_version} != {DC_CONFIG_SCHEMA_VERSION}); skipped")
0826                         return
0827                 # get config data
0828                 dc_config_data_dict = res_dict.get("data")
0829                 if dc_config_data_dict is None:
0830                     tmp_log.error(f"got empty config data; skipped")
0831                     return
0832                 # update
0833                 self.dc_config_map = DataCarouselMainConfig(**dc_config_data_dict)
0834                 # update last update timestamp
0835                 self._last_update_ts_dict[nickname] = naive_utcnow()
0836         except Exception:
0837             tmp_log.error(f"got error ; {traceback.format_exc()}")
0838 
0839     def _get_related_tasks(self, request_id: int) -> list[int] | None:
0840         """
0841         Get all related tasks to the give request
0842 
0843         Args:
0844             request_id (int): request_id of the request
0845 
0846         Returns:
0847             list[int]|None : list of jediTaskID of related tasks, or None if failed
0848         """
0849         # tmp_log = LogWrapper(logger, f"_get_related_tasks request_id={request_id}")
0850         sql = f"SELECT task_id " f"FROM {panda_config.schemaJEDI}.data_carousel_relations " f"WHERE request_id=:request_id " f"ORDER BY task_id "
0851         var_map = {":request_id": request_id}
0852         res = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
0853         if res is not None:
0854             ret_list = [x[0] for x in res]
0855             return ret_list
0856         else:
0857             return None
0858 
0859     def _get_input_ds_from_task_params(self, task_params_map: dict) -> dict:
0860         """
0861         Get input datasets from tasks parameters
0862 
0863         Args:
0864             task_params_map (dict): task parameter map
0865 
0866         Returns:
0867             dict : map with elements in form of datasets: jobParameters
0868         """
0869         ret_map = {}
0870         for job_param in task_params_map.get("jobParameters", []):
0871             if job_param.get("param_type") in ["input", "pseudo_input"]:
0872                 # dataset names can be comma-separated
0873                 raw_dataset_str = job_param.get("dataset")
0874                 jobparam_dataset_list = []
0875                 if raw_dataset_str:
0876                     jobparam_dataset_list = raw_dataset_str.split(",")
0877                 for dataset in jobparam_dataset_list:
0878                     ret_map[dataset] = job_param
0879         return ret_map
0880 
0881     def _get_full_replicas_per_type(self, dataset: str) -> dict:
0882         """
0883         Get full replicas per type of a dataset
0884 
0885         Args:
0886             dataset (str): dataset name
0887 
0888         Returns:
0889             dict : map in form of datasets: jobParameters
0890         """
0891         # tmp_log = LogWrapper(logger, f"_get_full_replicas_per_type dataset={dataset}")
0892         ds_repli_dict = self.ddmIF.convert_list_dataset_replicas(dataset, use_file_lookup=True, skip_incomplete_element=True)
0893         tape_replicas = []
0894         datadisk_replicas = []
0895         disk_replicas = []
0896         for rse in ds_repli_dict:
0897             if rse in self.tape_rses:
0898                 tape_replicas.append(rse)
0899             if rse in self.datadisk_rses:
0900                 datadisk_replicas.append(rse)
0901             if rse in self.disk_rses:
0902                 disk_replicas.append(rse)
0903         # return
0904         ret = {"tape": tape_replicas, "datadisk": datadisk_replicas, "disk": disk_replicas}
0905         # tmp_log.debug(f"{ret}")
0906         return ret
0907 
0908     def _get_filtered_replicas(self, dataset: str) -> tuple[dict, (str | None), bool]:
0909         """
0910         Get filtered replicas of a dataset and the staging rule and whether all replicas are without rules
0911 
0912         Args:
0913             dataset (str): dataset name
0914 
0915         Returns:
0916             dict : filtered replicas map (rules considered)
0917             str | None : staging rule, None if not existing
0918             bool : whether all replicas on datadisk are without rules
0919             dict : original replicas map
0920         """
0921         replicas_map = self._get_full_replicas_per_type(dataset)
0922         rules = self.ddmIF.list_did_rules(dataset, all_accounts=True)
0923         rse_expression_list = []
0924         staging_rule = None
0925         if rules:
0926             for rule in rules:
0927                 if rule["account"] in ["panda"] and rule["activity"] in DDM_RULE_ACTIVITY_MAP.values():
0928                     # rule of the dataset from ProdSys or PanDA already exists; reuse it
0929                     staging_rule = rule
0930                 else:
0931                     rse_expression_list.append(rule["rse_expression"])
0932         filtered_replicas_map = {"tape": [], "datadisk": []}
0933         has_datadisk_replica = len(replicas_map["datadisk"]) > 0
0934         has_disk_replica = len(replicas_map["disk"]) > 0
0935         for replica in replicas_map["tape"]:
0936             if replica in rse_expression_list:
0937                 filtered_replicas_map["tape"].append(replica)
0938         if len(replicas_map["tape"]) >= 1 and len(filtered_replicas_map["tape"]) == 0 and len(rules) == 0:
0939             filtered_replicas_map["tape"] = replicas_map["tape"]
0940         for replica in replicas_map["datadisk"]:
0941             if staging_rule is not None or replica in rse_expression_list:
0942                 filtered_replicas_map["datadisk"].append(replica)
0943         all_disk_repli_ruleless = has_disk_replica and len(filtered_replicas_map["datadisk"]) == 0
0944         return filtered_replicas_map, staging_rule, all_disk_repli_ruleless, replicas_map
0945 
0946     def _get_datasets_from_collection(self, collection: str) -> list[str] | None:
0947         """
0948         Get a list of datasets from DDM collection (container or dataset) in order to support inputs of container containing multiple datasets
0949         If the collection is a dataset, the method returns a list of the sole dataset
0950         If the collection is a container, the method returns a list of datasets inside the container
0951 
0952         Args:
0953             collection (str): name of the DDM collection (container or dataset)
0954 
0955         Returns:
0956             list[str] | None : list of datasets if successful; None if failed with exception
0957         """
0958         tmp_log = LogWrapper(logger, f"_get_datasets_from_collections collection={collection}")
0959         try:
0960             # assure the DID format of the collection to be scope:dataset_name
0961             collection = self.ddmIF.get_did_str(collection)
0962             tmp_log = LogWrapper(logger, f"_get_datasets_from_collections collection={collection}")
0963             # check the collection
0964             ret_list = []
0965             collection_meta = self.ddmIF.get_dataset_metadata(collection, ignore_missing=True)
0966             if collection_meta is None:
0967                 # collection metadata not found
0968                 tmp_log.warning(f"collection metadata not found")
0969                 return None
0970             elif collection_meta["state"] == "missing":
0971                 # DID not found
0972                 tmp_log.warning(f"DID not found")
0973                 return None
0974             did_type = collection_meta["did_type"]
0975             if did_type == "CONTAINER":
0976                 # is container, get datasets inside
0977                 jobparam_dataset_list = self.ddmIF.list_datasets_in_container_JEDI(collection)
0978                 if jobparam_dataset_list is None:
0979                     tmp_log.warning(f"cannot list datasets in this container")
0980                 else:
0981                     ret_list = jobparam_dataset_list
0982             elif did_type == "DATASET":
0983                 # is dataset
0984                 ret_list = [collection]
0985             else:
0986                 tmp_log.warning(f"invalid DID type: {did_type}")
0987                 return None
0988         except Exception:
0989             tmp_log.error(f"got error ; {traceback.format_exc()}")
0990             return None
0991         return ret_list
0992 
0993     def _get_active_source_tapes(self) -> set[str] | None:
0994         """
0995         Get the set of active source physical tapes according to DC config
0996 
0997         Returns:
0998             set[str] | None : set of source tapes if successful; None if failed with exception
0999         """
1000         tmp_log = LogWrapper(logger, f"_get_active_source_tapes")
1001         try:
1002             active_source_tapes_set = {tape for tape, tape_config in self.dc_config_map.source_tapes_config.items() if tape_config.active}
1003         except Exception:
1004             # other unexpected errors
1005             tmp_log.error(f"got error ; {traceback.format_exc()}")
1006             return None
1007         else:
1008             return active_source_tapes_set
1009 
1010     def _get_active_source_rses(self) -> set[str] | None:
1011         """
1012         Get the set of active source RSEs according to DC config
1013 
1014         Returns:
1015             set[str] | None : set of source RSEs if successful; None if failed with exception
1016         """
1017         tmp_log = LogWrapper(logger, f"_get_active_source_tapes")
1018         try:
1019             active_source_tapes = self._get_active_source_tapes()
1020             active_source_rses_set = set()
1021             for rse, rse_config in self.dc_config_map.source_rses_config.items():
1022                 try:
1023                     # both physical tape and RSE are active
1024                     if rse_config.tape in active_source_tapes and rse_config.active is not False:
1025                         active_source_rses_set.add(rse)
1026                 except Exception:
1027                     # errors with the rse
1028                     tmp_log.error(f"got error with {rse} ; {traceback.format_exc()}")
1029                     continue
1030         except Exception:
1031             # other unexpected errors
1032             tmp_log.error(f"got error ; {traceback.format_exc()}")
1033             return None
1034         else:
1035             return active_source_rses_set
1036 
1037     def _get_source_type_of_dataset(
1038         self, dataset: str, active_source_rses_set: set | None = None
1039     ) -> tuple[(str | None), (set | None), (str | None), bool, list]:
1040         """
1041         Get source type and permanent (tape or datadisk) RSEs of a dataset
1042 
1043         Args:
1044             dataset (str): dataset name
1045             active_source_rses_set (set | None): active source RSE set to reuse. If None, will get a new one in the method
1046 
1047         Returns:
1048             str | None : source type of the dataset, "datadisk" if replica on any datadisk, "tape" if replica only on tapes, None if not found
1049             set | None : set of permanent RSEs, otherwise None
1050             str | None : staging rule if existing, otherwise None
1051             bool : whether to pin the dataset
1052             list : list of suggested destination RSEs; currently only datadisks with full replicas to pin
1053         """
1054         tmp_log = LogWrapper(logger, f"_get_source_type_of_dataset dataset={dataset}")
1055         try:
1056             # initialize
1057             source_type = None
1058             rse_set = set()
1059             to_pin = False
1060             suggested_destination_rses_set = set()
1061             # get active source rses
1062             if active_source_rses_set is None:
1063                 active_source_rses_set = self._get_active_source_rses()
1064             # get filtered replicas and staging rule of the dataset
1065             filtered_replicas_map, staging_rule, all_disk_repli_ruleless, orig_replicas_map = self._get_filtered_replicas(dataset)
1066             # algorithm
1067             if filtered_replicas_map["datadisk"]:
1068                 # replicas already on datadisk and with rule
1069                 source_type = "datadisk"
1070                 # source datadisk RSEs from DDM
1071                 rse_set = {replica for replica in filtered_replicas_map["datadisk"]}
1072             elif filtered_replicas_map["tape"]:
1073                 # replicas on tape and without rule to pin it on datadisk
1074                 source_type = "tape"
1075                 # source tape RSEs from DDM
1076                 rse_set = {replica for replica in filtered_replicas_map["tape"]}
1077                 # filter out inactive source tape RSEs according to DC config
1078                 if active_source_rses_set is not None:
1079                     rse_set &= active_source_rses_set
1080                 # condiser unfound if no active source tape
1081                 if not rse_set:
1082                     source_type = None
1083                     tmp_log.warning(f"all its source tapes are inactive")
1084                 # dataset pinning
1085                 if all_disk_repli_ruleless:
1086                     # replica on disks but without rule to pin on datadisk; to pin the dataset to datadisk
1087                     to_pin = True
1088                     suggested_destination_rses_set |= set(orig_replicas_map["datadisk"])
1089             else:
1090                 # no replica found on tape nor on datadisk (can be on transient disk); skip
1091                 pass
1092             # return
1093             tmp_log.debug(
1094                 f"source_type={source_type} rse_set={rse_set} staging_rule={staging_rule} to_pin={to_pin} "
1095                 f"suggested_destination_rses_set={suggested_destination_rses_set}"
1096             )
1097             return (source_type, rse_set, staging_rule, to_pin, list(suggested_destination_rses_set))
1098         except Exception as e:
1099             # other unexpected errors
1100             raise e
1101 
1102     def _choose_tape_source_rse(self, dataset: str, rse_set: set, staging_rule, no_cern: bool = True) -> tuple[str, (str | None), (str | None)]:
1103         """
1104         Choose a TAPE source RSE
1105         If with exsiting staging rule, then get source RSE from it
1106 
1107         Args:
1108             dataset (str): dataset name
1109             rse_set (set): set of TAPE source RSE set to choose from
1110             staging_rule: DDM staging rule
1111             no_cern: skip CERN-PROD RSE whenever possible if True
1112 
1113         Returns:
1114             str: the dataset name
1115             str | None : source RSE found or chosen, None if failed
1116             str | None : DDM rule ID of staging rule if existing, otherwise None
1117         """
1118         tmp_log = LogWrapper(logger, f"_choose_tape_source_rse dataset={dataset}")
1119         try:
1120             # initialize
1121             ddm_rule_id = None
1122             source_rse = None
1123             random_choose = False
1124             # whether with existing staging rule
1125             if staging_rule:
1126                 # with existing staging rule ; prepare to reuse it
1127                 ddm_rule_id = staging_rule["id"]
1128                 # extract source RSE from rule
1129                 source_replica_expression = staging_rule["source_replica_expression"]
1130                 if source_replica_expression is not None:
1131                     for rse in rse_set:
1132                         # match tape rses with source_replica_expression
1133                         tmp_match = re.search(rse, source_replica_expression)
1134                         if tmp_match is not None:
1135                             source_rse = rse
1136                             tmp_log.debug(f"got source_rse={source_rse} from rse_set matching ddm rule source_replica_expression={source_replica_expression}")
1137                             break
1138                     if source_rse is None:
1139                         # direct regex search from source_replica_expression; reluctant as source_replica_expression can be messy
1140                         tmp_match = re.search(rf"{SRC_REPLI_EXPR_PREFIX}\|([A-Za-z0-9-_]+)", source_replica_expression)
1141                         if tmp_match is not None:
1142                             source_rse = tmp_match.group(1)
1143                             tmp_log.debug(f"directly got source_rse={source_rse} only from ddm rule source_replica_expression={source_replica_expression}")
1144                     if source_rse is None:
1145                         # still not getting source RSE from rule; unexpected
1146                         tmp_log.error(f"ddm_rule_id={ddm_rule_id} cannot get source_rse from source_replica_expression: {source_replica_expression}")
1147                         raise RuntimeError("cannot get source_rse from staging rule's source_replica_expression")
1148                     else:
1149                         tmp_log.debug(f"already staging with ddm_rule_id={ddm_rule_id} source_rse={source_rse}")
1150                 else:
1151                     # no source_replica_expression of the rule; choose any source
1152                     tmp_log.warning(f"already staging with ddm_rule_id={ddm_rule_id} without source_replica_expression; to choose a random source_rse")
1153                     random_choose = True
1154                 # keep alive the rule
1155                 if (rule_expiration_time := staging_rule["expires_at"]) and (rule_expiration_time - naive_utcnow()) < timedelta(days=DONE_LIFETIME_DAYS):
1156                     self._refresh_ddm_rule(ddm_rule_id, 86400 * DONE_LIFETIME_DAYS)
1157                     tmp_log.debug(f"ddm_rule_id={ddm_rule_id} refreshed rule to be {DONE_LIFETIME_DAYS} days long")
1158             else:
1159                 # no existing staging rule; to choose randomly
1160                 random_choose = True
1161             if random_choose:
1162                 # no existing staging rule or cannot get from source_replica_expression; choose source_rse randomly
1163                 rse_list = list(rse_set)
1164                 # choose source RSE
1165                 if len(rse_list) == 1:
1166                     source_rse = rse_list[0]
1167                 elif len(rse_list) == 0:
1168                     tmp_log.error("no source_rse found to choose from")
1169                     raise RuntimeError("no source_rse found to choose from")
1170                 else:
1171                     non_CERN_rse_list = [rse for rse in rse_list if "CERN-PROD" not in rse]
1172                     if non_CERN_rse_list and no_cern:
1173                         # choose non-CERN-PROD source RSE
1174                         source_rse = random.choice(non_CERN_rse_list)
1175                     else:
1176                         source_rse = random.choice(rse_list)
1177                 tmp_log.debug(f"chose source_rse={source_rse}")
1178             # add to prestage
1179             return (dataset, source_rse, ddm_rule_id)
1180         except Exception as e:
1181             # other unexpected errors
1182             raise e
1183 
1184     def _get_source_tape_from_rse(self, source_rse: str) -> str:
1185         """
1186         Get the source tape of a source RSE
1187 
1188         Args:
1189             source_rse (str): name of the source RSE
1190 
1191         Returns:
1192             str : source tape
1193         """
1194         try:
1195             # source_rse is RSE
1196             source_tape = self.dc_config_map.source_rses_config[source_rse].tape
1197         except KeyError:
1198             # source_rse is physical tape
1199             source_tape = source_rse
1200         return source_tape
1201 
1202     @refresh
1203     def get_input_datasets_to_prestage(self, task_id: int, task_params_map: dict, dsname_list: list | None = None) -> tuple[list, dict]:
1204         """
1205         Get the input datasets, their source RSEs (tape) of the task which need pre-staging from tapes, and DDM rule ID of existing DDM rule
1206 
1207         Args:
1208             task_id (int): JEDI task ID of the task params
1209             task_params_map (dict): task params of the JEDI task
1210             dsname_list (list|None): if not None, filter only datasets in this list of dataset names to stage, may also including extra datasets if the task is resubmitted/rerefined
1211 
1212         Returns:
1213             list[tuple[str, str|None, str|None]]: list of tuples in the form of (dataset, source_rse, ddm_rule_id)
1214             dict[str|list]: dict of list of datasets, including pseudo inputs (meant to be marked as no_staging), already on datadisk (meant to be marked as no_staging), only on tape, and not found
1215         """
1216         tmp_log = LogWrapper(logger, f"get_input_datasets_to_prestage task_id={task_id}")
1217         try:
1218             # initialize
1219             all_input_datasets_set = set()
1220             jobparam_ds_coll_map = {}
1221             extra_datasets_set = set()
1222             raw_coll_list = []
1223             raw_coll_did_list = []
1224             coll_on_tape_set = set()
1225             ret_prestaging_list = []
1226             ret_map = {
1227                 "pseudo_coll_list": [],
1228                 "unfound_coll_list": [],
1229                 "empty_coll_list": [],
1230                 "tape_coll_did_list": [],
1231                 "no_tape_coll_did_list": [],
1232                 "to_skip_ds_list": [],
1233                 "to_reuse_staging_ds_list": [],
1234                 "to_reuse_staged_ds_list": [],
1235                 "tape_ds_list": [],
1236                 "datadisk_ds_list": [],
1237                 "to_pin_ds_list": [],
1238                 "unfound_ds_list": [],
1239                 "related_dcreq_ids": [],
1240             }
1241             # get active source rses
1242             active_source_rses_set = self._get_active_source_rses()
1243             # loop over inputs defined in task's job parameters
1244             input_collection_map = self._get_input_ds_from_task_params(task_params_map)
1245             for collection, job_param in input_collection_map.items():
1246                 # pseudo inputs
1247                 if job_param.get("param_type") == "pseudo_input":
1248                     ret_map["pseudo_coll_list"].append(collection)
1249                     tmp_log.debug(f"collection={collection} is pseudo input ; skipped")
1250                     continue
1251                 # with real inputs
1252                 raw_coll_list.append(collection)
1253                 raw_coll_did_list.append(self.ddmIF.get_did_str(collection))
1254                 jobparam_dataset_list = self._get_datasets_from_collection(collection)
1255                 if jobparam_dataset_list is None:
1256                     ret_map["unfound_coll_list"].append(collection)
1257                     tmp_log.warning(f"collection={collection} not found")
1258                     continue
1259                 elif not jobparam_dataset_list:
1260                     ret_map["empty_coll_list"].append(collection)
1261                     tmp_log.warning(f"collection={collection} is empty")
1262                     continue
1263                 # with contents to consider
1264                 for dataset in jobparam_dataset_list:
1265                     jobparam_ds_coll_map[dataset] = collection
1266             # merge of jobparam_dataset_list and dnsname_list
1267             jobparam_datasets_set = set(jobparam_ds_coll_map.keys())
1268             all_input_datasets_set |= jobparam_datasets_set
1269             if dsname_list is not None:
1270                 # dsname_list is given; filter out extra container slash
1271                 master_datasets_set = set([dsname for dsname in dsname_list if not dsname.endswith("/")])
1272                 # extra dataset not in job parameters when task resubmitted/rerefined
1273                 extra_datasets_set = master_datasets_set - jobparam_datasets_set - set(raw_coll_did_list)
1274                 all_input_datasets_set |= extra_datasets_set
1275             all_input_datasets_list = sorted(list(all_input_datasets_set))
1276             if extra_datasets_set:
1277                 tmp_log.debug(f"datasets appended for incexec: {sorted(list(extra_datasets_set))}")
1278             # check source of each dataset
1279             for dataset in all_input_datasets_list:
1280                 # check if dataset in the required dsname_list
1281                 if dsname_list is not None and dataset not in dsname_list:
1282                     # not in dsname_list; skip
1283                     ret_map["to_skip_ds_list"].append(dataset)
1284                     tmp_log.debug(f"dataset={dataset} not in dsname_list ; skipped")
1285                     continue
1286                 # check if already in existing Data Carousel requests
1287                 existing_dcreq_id = self.taskBufferIF.get_data_carousel_request_id_by_dataset_JEDI(dataset)
1288                 if existing_dcreq_id is not None:
1289                     ret_map["related_dcreq_ids"].append(existing_dcreq_id)
1290                 # get source type and RSEs
1291                 source_type, rse_set, staging_rule, to_pin, suggested_dst_list = self._get_source_type_of_dataset(dataset, active_source_rses_set)
1292                 if staging_rule:
1293                     # reuse existing DDM rule
1294                     if collection := jobparam_ds_coll_map.get(dataset):
1295                         coll_on_tape_set.add(collection)
1296                     _, source_rse, ddm_rule_id = self._choose_tape_source_rse(dataset, rse_set, staging_rule)
1297                     tmp_log.debug(f"dataset={dataset} has existing ddm_rule_id={ddm_rule_id} ; to reuse it")
1298                     prestaging_tuple = (dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list)
1299                     tmp_log.debug(f"got prestaging for existing rule: {prestaging_tuple}")
1300                     # add to prestage
1301                     ret_prestaging_list.append(prestaging_tuple)
1302                     # whether already staged
1303                     if staging_rule.get("state") == "OK":
1304                         # already staged
1305                         ret_map["to_reuse_staged_ds_list"].append(dataset)
1306                         tmp_log.debug(f"dataset={dataset} ddm_rule_id={ddm_rule_id} already staged")
1307                     else:
1308                         # still staging
1309                         ret_map["to_reuse_staging_ds_list"].append(dataset)
1310                         tmp_log.debug(f"dataset={dataset} ddm_rule_id={ddm_rule_id} still staging")
1311                 elif source_type == "datadisk":
1312                     # replicas already on datadisk; skip
1313                     ret_map["datadisk_ds_list"].append(dataset)
1314                     tmp_log.debug(f"dataset={dataset} already has replica on datadisks {rse_set} ; skipped")
1315                     continue
1316                 elif source_type == "tape":
1317                     # replicas only on tape
1318                     ret_map["tape_ds_list"].append(dataset)
1319                     tmp_log.debug(f"dataset={dataset} on tapes {rse_set} ; choosing one")
1320                     if collection := jobparam_ds_coll_map.get(dataset):
1321                         coll_on_tape_set.add(collection)
1322                     # choose source RSE
1323                     _, source_rse, ddm_rule_id = self._choose_tape_source_rse(dataset, rse_set, staging_rule)
1324                     prestaging_tuple = (dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list)
1325                     tmp_log.debug(f"got prestaging: {prestaging_tuple}")
1326                     # add to prestage
1327                     ret_prestaging_list.append(prestaging_tuple)
1328                     # dataset to pin
1329                     if to_pin:
1330                         ret_map["to_pin_ds_list"].append(dataset)
1331                 else:
1332                     # no replica found on tape nor on datadisk; skip
1333                     ret_map["unfound_ds_list"].append(dataset)
1334                     tmp_log.debug(f"dataset={dataset} has no replica on any tape or datadisk ; skipped")
1335                     continue
1336             # collection DID without datasets on tape
1337             for collection in raw_coll_list:
1338                 collection_did = self.ddmIF.get_did_str(collection)
1339                 if collection in coll_on_tape_set:
1340                     ret_map["tape_coll_did_list"].append(collection_did)
1341                 else:
1342                     ret_map["no_tape_coll_did_list"].append(collection_did)
1343             # return
1344             tmp_log.debug(f"got {len(ret_prestaging_list)} input datasets to prestage and {len(ret_map['related_dcreq_ids'])} related existing requests")
1345             return ret_prestaging_list, ret_map
1346         except Exception as e:
1347             tmp_log.error(f"got error ; {traceback.format_exc()}")
1348             raise e
1349 
1350     def _fill_total_files_and_size(self, dc_req_spec: DataCarouselRequestSpec) -> bool | None:
1351         """
1352         Fill total files and dataset size of the Data Carousel request spec
1353 
1354         Args:
1355             dc_req_spec (DataCarouselRequestSpec): Data Carousel request spec
1356 
1357         Returns:
1358             bool|None : True if successful, None if getting None from DDM, False if error occurs
1359         """
1360         try:
1361             tmp_log = LogWrapper(logger, f"_fill_total_files_and_size request_id={dc_req_spec.request_id}")
1362             # get dataset metadata
1363             dataset_meta = self.ddmIF.get_dataset_metadata(dc_req_spec.dataset)
1364             # fill
1365             dc_req_spec.total_files = dataset_meta["length"]
1366             dc_req_spec.dataset_size = dataset_meta["bytes"]
1367             if dc_req_spec.total_files is not None:
1368                 return True
1369             else:
1370                 return None
1371         except Exception as e:
1372             tmp_log.error(f"failed to fill total files and size; {e}")
1373             return False
1374 
    def submit_data_carousel_requests(
        self, task_id: int, prestaging_list: list[tuple[str, str | None, str | None, bool, list | None]], options: dict | None = None, submit_idds_request: bool = True
    ) -> bool:
        """
        Submit data carousel requests for a task

        Args:
            task_id (int): JEDI task ID
            prestaging_list (list[tuple[str, str|None, str|None, bool, list|None]]): list of tuples in the form of (dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list)
            options (dict|None): extra options for submission
            submit_idds_request (bool): whether to submit iDDS staging requests for the datasets already with DDM rules; default True

        Returns:
            bool : True if the DB insertion succeeded (a count was returned), False otherwise
        """
        tmp_log = LogWrapper(logger, f"submit_data_carousel_requests task_id={task_id}")
        n_req_to_submit = len(prestaging_list)
        log_str = f"to submit {len(prestaging_list)} requests"
        if options:
            log_str = f"options={options} " + log_str
        tmp_log.debug(log_str)
        # fill dc request spec for each input dataset
        dc_req_spec_list = []
        now_time = naive_utcnow()
        for dataset, source_rse, ddm_rule_id, to_pin, suggested_dst_list in prestaging_list:
            dc_req_spec = DataCarouselRequestSpec()
            dc_req_spec.dataset = dataset
            # query DDM for total file count and byte size of the dataset
            self._fill_total_files_and_size(dc_req_spec)
            dc_req_spec.staged_files = 0
            dc_req_spec.staged_size = 0
            dc_req_spec.ddm_rule_id = ddm_rule_id
            dc_req_spec.source_rse = source_rse
            dc_req_spec.source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
            if to_pin:
                # to pin the dataset; set to_pin in parameter
                dc_req_spec.set_parameter("to_pin", True)
            dc_req_spec.status = DataCarouselRequestStatus.queued
            dc_req_spec.creation_time = now_time
            if dc_req_spec.ddm_rule_id:
                # already with DDM rule; go to staging directly
                dc_req_spec.status = DataCarouselRequestStatus.staging
                dc_req_spec.start_time = now_time
                dc_req_spec.set_parameter("reuse_rule", True)
                # to submit iDDS staging requests about the task and existing DDM rule
                if submit_idds_request:
                    # get all tasks related to this request
                    self._submit_idds_stagein_request(task_id, dc_req_spec)
                    tmp_log.debug(f"submitted corresponding iDDS request for this task and existing ddm_rule_id={dc_req_spec.ddm_rule_id}")
            if suggested_dst_list:
                dc_req_spec.set_parameter("suggested_dst_list", suggested_dst_list)
            # options
            if options:
                if options.get("remove_when_done"):
                    # remove rule when done
                    dc_req_spec.set_parameter("remove_when_done", True)
                if task_type := options.get("task_type"):
                    dc_req_spec.set_parameter("task_type", task_type)
            # append to list
            dc_req_spec_list.append(dc_req_spec)
        # insert dc requests for the task
        n_req_inserted = self.taskBufferIF.insert_data_carousel_requests_JEDI(task_id, dc_req_spec_list)
        tmp_log.info(f"submitted {n_req_inserted}/{n_req_to_submit} requests")
        ret = n_req_inserted is not None
        # return
        return ret
1440 
1441     def add_data_carousel_relations(self, task_id: int, request_ids: list[int] | None) -> bool:
1442         """
1443         Add Data Carousel relations for a task
1444 
1445         Args:
1446             task_id (int): JEDI task ID
1447             request_ids (list[int]|None): list of Data Carousel request IDs to relate; None to skip
1448 
1449         Returns:
1450             bool : True if successful, or False if failed
1451         """
1452         tmp_log = LogWrapper(logger, f"add_data_carousel_relations task_id={task_id}")
1453         if request_ids is None:
1454             tmp_log.debug(f"request_ids is None ; skipped")
1455             return True
1456         n_req_to_add = len(request_ids)
1457         tmp_log.debug(f"to add {n_req_to_add} relations")
1458         n_req_added = self.taskBufferIF.insert_data_carousel_relations_JEDI(task_id, request_ids)
1459         tmp_log.info(f"added {n_req_added}/{n_req_to_add} relations")
1460         return n_req_added is not None
1461 
1462     def _get_dc_requests_table_dataframe(self) -> pl.DataFrame | None:
1463         """
1464         Get Data Carousel requests table as dataframe for statistics
1465 
1466         Returns:
1467             polars.DataFrame|None : dataframe of current Data Carousel requests table if successful, or None if failed
1468         """
1469         sql = f"SELECT {','.join(DataCarouselRequestSpec.attributes)} " f"FROM {panda_config.schemaJEDI}.data_carousel_requests " f"ORDER BY request_id "
1470         var_map = {}
1471         res = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
1472         if res is not None:
1473             dc_req_df = pl.DataFrame(res, schema=DataCarouselRequestSpec.attributes_with_types, orient="row")
1474             return dc_req_df
1475         else:
1476             return None
1477 
1478     def _get_source_tapes_config_dataframe(self) -> pl.DataFrame:
1479         """
1480         Get source tapes config as dataframe
1481 
1482         Returns:
1483             polars.DataFrame : dataframe of source tapes config
1484         """
1485         tmp_list = []
1486         for k, v in self.dc_config_map.source_tapes_config.items():
1487             tmp_dict = {"source_tape": k}
1488             tmp_dict.update(asdict(v))
1489             tmp_list.append(tmp_dict)
1490         source_tapes_config_df = pl.DataFrame(tmp_list)
1491         return source_tapes_config_df
1492 
1493     def _get_source_rses_config_dataframe(self) -> pl.DataFrame:
1494         """
1495         Get source RSEs config as dataframe
1496 
1497         Returns:
1498             polars.DataFrame : dataframe of source RSEs config
1499         """
1500         tmp_list = []
1501         for k, v in self.dc_config_map.source_rses_config.items():
1502             tmp_dict = {"source_rse": k}
1503             tmp_dict.update(asdict(v))
1504             tmp_list.append(tmp_dict)
1505         source_rses_config_df = pl.DataFrame(tmp_list)
1506         return source_rses_config_df
1507 
    def _get_source_tape_stats_dataframe(self) -> tuple[pl.DataFrame, pl.DataFrame] | None:
        """
        Get statistics of source tapes as dataframes

        Returns:
            tuple[polars.DataFrame, polars.DataFrame] | None : tuple of (per-tape stats dataframe, per-tape-per-gshare stats dataframe) if successful, or None if failed
        """
        # get Data Carousel requests dataframe of staging requests
        dc_req_df = self._get_dc_requests_table_dataframe()
        if dc_req_df is None:
            return None
        dc_req_df = dc_req_df.filter(pl.col("status") == DataCarouselRequestStatus.staging)
        # get columns from parameters; note that str.json_path_match() always casts to string
        dc_req_df = dc_req_df.with_columns(
            to_pin_str=pl.col("parameters").str.json_path_match(r"$.to_pin").fill_null(False),
            task_gshare=pl.col("parameters").str.json_path_match(r"$.task_gshare"),
            init_task_gshare=pl.col("parameters").str.json_path_match(r"$.init_task_gshare"),
        )
        # get source tapes and RSEs config dataframes
        source_tapes_config_df = self._get_source_tapes_config_dataframe()
        # source_rses_config_df = self._get_source_rses_config_dataframe()
        # dataframe of staging requests with physical tapes, ignoring to_pin requests
        # dc_req_full_df = dc_req_df.join(source_rses_config_df.select("source_rse", "tape"), on="source_rse", how="left")
        dc_req_full_df = dc_req_df.filter(pl.col("to_pin_str") != "true")
        # dataframe of source RSE stats; add staging_files as the remaining files to finish
        source_rse_stats_df = (
            dc_req_full_df.select(
                "source_tape",
                "total_files",
                "staged_files",
                (pl.col("total_files") - pl.col("staged_files")).alias("staging_files"),
            )
            .group_by("source_tape")
            .sum()
        )
        # make dataframe of source tapes stats; add quota_size
        df = source_tapes_config_df.join(source_rse_stats_df, on="source_tape", how="left")
        # tapes with no staging request get zero counters instead of nulls
        df = df.with_columns(
            pl.col("total_files").fill_null(strategy="zero"),
            pl.col("staged_files").fill_null(strategy="zero"),
            pl.col("staging_files").fill_null(strategy="zero"),
        )
        # quota_size = configured max_size minus files still staging at the tape
        df = df.with_columns(quota_size=(pl.col("max_size") - pl.col("staging_files")))
        source_tape_stats_df = df
        # another dataframe of source RSE stats broken down by gshare
        source_rse_gshare_stats_df = (
            dc_req_full_df.select(
                "source_tape",
                "init_task_gshare",
                "total_files",
                "staged_files",
                (pl.col("total_files") - pl.col("staged_files")).alias("staging_files"),
            )
            .group_by(["source_tape", "init_task_gshare"])
            .sum()
            .sort(["source_tape", "init_task_gshare"], nulls_last=True)
        ).with_columns(
            pl.col("total_files").fill_null(strategy="zero"),
            pl.col("staged_files").fill_null(strategy="zero"),
            pl.col("staging_files").fill_null(strategy="zero"),
        )
        # return final dataframes
        return source_tape_stats_df, source_rse_gshare_stats_df
1571 
1572     def _get_gshare_stats(self) -> dict:
1573         """
1574         Get current gshare stats
1575 
1576         Returns:
1577             dict : dictionary of gshares
1578         """
1579         # get share and hs info
1580         gshare_status = self.taskBufferIF.getGShareStatus()
1581         # initialize
1582         gshare_dict = dict()
1583         # rank and data
1584         for idx, leaf in enumerate(gshare_status):
1585             rank = idx + 1
1586             gshare = leaf["name"]
1587             gshare_dict[gshare] = {
1588                 "gshare": gshare,
1589                 "rank": rank,
1590                 "queuing_hs": leaf["queuing"],
1591                 "running_hs": leaf["running"],
1592                 "target_hs": leaf["target"],
1593                 "usage_perc": leaf["running"] / leaf["target"] if leaf["target"] > 0 else 999999,
1594                 "queue_perc": leaf["queuing"] / leaf["target"] if leaf["target"] > 0 else 999999,
1595             }
1596         # return
1597         return gshare_dict
1598 
1599     def _queued_requests_tasks_to_dataframe(self, queued_requests: list | None) -> pl.DataFrame:
1600         """
1601         Transfrom Data Carousel queue requests and their tasks into dataframe
1602 
1603         Args:
1604             queued_requests (list|None): list of tuples in form of (queued_request, [taskspec1, taskspec2, ...])
1605 
1606         Returns:
1607             polars.DataFrame : dataframe of queued requests
1608         """
1609         # get source RSEs config for tape mapping
1610         # source_rses_config_df = self._get_source_rses_config_dataframe()
1611         # get current gshare rank
1612         gshare_dict = self._get_gshare_stats()
1613         gshare_rank_dict = {k: v["rank"] for k, v in gshare_dict.items()}
1614         # make dataframe of queued requests and their tasks
1615         tmp_list = []
1616         for dc_req_spec, task_specs in queued_requests:
1617             for task_spec in task_specs:
1618                 tmp_dict = {
1619                     "request_id": dc_req_spec.request_id,
1620                     "dataset": dc_req_spec.dataset,
1621                     "source_rse": dc_req_spec.source_rse,
1622                     "source_tape": dc_req_spec.source_tape,
1623                     "to_pin": dc_req_spec.get_parameter("to_pin"),
1624                     "total_files": dc_req_spec.total_files,
1625                     "dataset_size": dc_req_spec.dataset_size,
1626                     "jediTaskID": task_spec.jediTaskID,
1627                     "taskType": task_spec.taskType,
1628                     "userName": task_spec.userName,
1629                     "workingGroup": task_spec.workingGroup,
1630                     "gshare": task_spec.gshare,
1631                     "gshare_rank": gshare_rank_dict.get(task_spec.gshare, 999),
1632                     "task_priority": task_spec.currentPriority if task_spec.currentPriority else (task_spec.taskPriority if task_spec.taskPriority else 1000),
1633                 }
1634                 tmp_list.append(tmp_dict)
1635         df = pl.DataFrame(
1636             tmp_list,
1637             schema={
1638                 "request_id": pl.datatypes.Int64,
1639                 "dataset": pl.datatypes.String,
1640                 "source_rse": pl.datatypes.String,
1641                 "source_tape": pl.datatypes.String,
1642                 "to_pin": pl.datatypes.Boolean,
1643                 "total_files": pl.datatypes.Int64,
1644                 "dataset_size": pl.datatypes.Int64,
1645                 "jediTaskID": pl.datatypes.Int64,
1646                 "taskType": pl.datatypes.String,
1647                 "userName": pl.datatypes.String,
1648                 "workingGroup": pl.datatypes.String,
1649                 "gshare": pl.datatypes.String,
1650                 "gshare_rank": pl.datatypes.Int64,
1651                 "task_priority": pl.datatypes.Int64,
1652             },
1653         )
1654         # fill null
1655         df = df.with_columns(
1656             pl.col("to_pin").fill_null(value=False),
1657             pl.col("total_files").fill_null(strategy="zero"),
1658             pl.col("dataset_size").fill_null(strategy="zero"),
1659         )
1660         # join to add physical tape
1661         # df = df.join(source_rses_config_df.select("source_rse", "tape"), on="source_rse", how="left")
1662         # return final dataframe
1663         queued_requests_tasks_df = df
1664         return queued_requests_tasks_df
1665 
    @refresh
    def get_requests_to_stage(self, *args, **kwargs) -> list[tuple[DataCarouselRequestSpec, dict]]:
        """
        Get the queued requests which should proceed to get staging

        Args:
            *args : unused; accepted for interface compatibility
            **kwargs : unused; accepted for interface compatibility

        Returns:
            list[tuple[DataCarouselRequestSpec, dict]] : list of requests to stage and dict of extra parameters to set before to stage
        """
        tmp_log = LogWrapper(logger, "get_requests_to_stage")
        ret_list = []
        queued_requests = self.taskBufferIF.get_data_carousel_queued_requests_JEDI()
        if queued_requests is None or not queued_requests:
            tmp_log.debug(f"no requests to stage or to pin ; skipped")
            return ret_list
        # get stats of tapes
        source_tape_stats_df, source_rse_gshare_stats_df = self._get_source_tape_stats_dataframe()
        # tmp_log.debug(f"source_tape_stats_df: \n{source_tape_stats_df}")
        tmp_log.debug(f"source_rse_gshare_stats_df: \n{source_rse_gshare_stats_df.select(['source_tape', 'init_task_gshare', 'total_files', 'staging_files'])}")
        source_tape_stats_dict_list = source_tape_stats_df.to_dicts()
        # map of request_id and dc_req_spec of queued requests
        request_id_spec_map = {dc_req_spec.request_id: dc_req_spec for dc_req_spec, _ in queued_requests}
        # get dataframe of queued requests and tasks
        queued_requests_tasks_df = self._queued_requests_tasks_to_dataframe(queued_requests)
        # sort queued requests : by to_pin, gshare_rank, task_priority, jediTaskID, request_id
        df = queued_requests_tasks_df.sort(["to_pin", "gshare_rank", "task_priority", "request_id"], descending=[True, False, True, False], nulls_last=True)
        # get unique requests with the sorted order
        df = df.unique(subset=["request_id"], keep="first", maintain_order=True)
        # evaluate per tape
        queued_requests_df = df
        for source_tape_stats_dict in source_tape_stats_dict_list:
            source_tape = source_tape_stats_dict["source_tape"]
            quota_size = source_tape_stats_dict["quota_size"]
            # dataframe of the physical tape
            tmp_df = queued_requests_df.filter(pl.col("source_tape") == source_tape)
            # split with to_pin and not to_pin
            to_pin_df = tmp_df.filter(pl.col("to_pin"))
            tmp_queued_df = tmp_df.filter(pl.col("to_pin").not_())
            # fill dummy cumulative sum (0) for reqeusts to pin
            to_pin_df = to_pin_df.with_columns(cum_total_files=pl.lit(0, dtype=pl.datatypes.Int64), cum_dataset_size=pl.lit(0, dtype=pl.datatypes.Int64))
            # fair share for gshares
            if True and not tmp_queued_df.is_empty() and (fair_share_init_quota := min(QUEUE_FAIR_SHARE_MAX_QUOTA, max(quota_size, 0))) > 0:
                queued_gshare_list = tmp_queued_df.select("gshare").unique().to_dict()["gshare"]
                tmp_source_rse_gshare_stats_list = (
                    source_rse_gshare_stats_df.filter((pl.col("source_tape") == source_tape)).select("init_task_gshare", "staging_files").to_dicts()
                )
                gshare_staging_files_map = {x["init_task_gshare"]: x["staging_files"] for x in tmp_source_rse_gshare_stats_list}
                n_queued_gshares = len(queued_gshare_list)
                n_staging_files_of_gshares = sum([gshare_staging_files_map.get(gshare, 0) for gshare in queued_gshare_list])
                # fair share quota per gshare and the virtual one including remaing staging files
                fair_share_quota_per_gshare = fair_share_init_quota // n_queued_gshares
                virtual_fair_share_quota_per_gshare = (fair_share_init_quota + n_staging_files_of_gshares) // n_queued_gshares
                # list of gshares to stage by exluding gshares already having remaining staging files exceeding fair_share_quota_per_gshare
                to_stage_gshare_list = [
                    gshare for gshare in queued_gshare_list if gshare_staging_files_map.get(gshare, 0) < virtual_fair_share_quota_per_gshare
                ]
                n_gshares_to_stage = len(to_stage_gshare_list)
                # initialize dataframe with schema
                fair_share_queued_df = None
                unchosen_queued_df = None
                more_fair_shared_queued_df = tmp_queued_df.clear()
                # fair share quota to distribute to each gshare
                # NOTE(review): if to_stage_gshare_list is empty, fair_share_queued_df stays None and the .select() below raises AttributeError — confirm upstream guarantees at least one eligible gshare
                for gshare in to_stage_gshare_list:
                    # remaining_fair_share_quota_of_gshare = fair_share_quota_per_gshare - gshare_staging_files_map.get(gshare, 0)
                    n_staging_files = gshare_staging_files_map.get(gshare, 0)
                    per_gshare_df = tmp_queued_df.filter(pl.col("gshare") == gshare)
                    per_gshare_df = per_gshare_df.with_columns(
                        cum_tot_files_in_gshare=(pl.col("total_files").cum_sum() + pl.lit(n_staging_files, dtype=pl.datatypes.Int64))
                    )
                    if fair_share_queued_df is None:
                        fair_share_queued_df = per_gshare_df.clear()
                    if unchosen_queued_df is None:
                        unchosen_queued_df = per_gshare_df.clear()
                    fair_share_queued_df.extend(per_gshare_df.filter(pl.col("cum_tot_files_in_gshare") <= fair_share_quota_per_gshare))
                    unchosen_queued_df.extend(per_gshare_df.filter(pl.col("cum_tot_files_in_gshare") > fair_share_quota_per_gshare))
                # remaining fair share quota to distribute again
                n_fair_share_files_to_stage = fair_share_queued_df.select("total_files").sum().to_dict()["total_files"][0]
                fair_share_remaining_quota = fair_share_init_quota - n_fair_share_files_to_stage
                if fair_share_remaining_quota > 0:
                    unchosen_queued_df = unchosen_queued_df.sort(["cum_tot_files_in_gshare", "gshare_rank"], descending=[False, False])
                    unchosen_queued_df = unchosen_queued_df.with_columns(tmp_cum_total_files=pl.col("total_files").cum_sum())
                    more_fair_shared_queued_df = unchosen_queued_df.filter(pl.col("tmp_cum_total_files") <= fair_share_remaining_quota)
                    # get one more request among gshares which are not yet staging (may exceed fair_share_remaining_quota)
                    not_yet_staging_gshares_set = (
                        set(to_stage_gshare_list)
                        - set(fair_share_queued_df.select("gshare").unique().to_dict()["gshare"])
                        - set(more_fair_shared_queued_df.select("gshare").unique().to_dict()["gshare"])
                    )
                    if not_yet_staging_gshares_set:
                        # random pick so no particular gshare is systematically favored
                        lucky_gshare = random.choice(list(not_yet_staging_gshares_set))
                        tmp_lucky_df = unchosen_queued_df.filter(
                            (pl.col("gshare") == lucky_gshare) & (pl.col("tmp_cum_total_files") > fair_share_remaining_quota)
                        ).head(1)
                        more_fair_shared_queued_df.extend(tmp_lucky_df)
                # fill back to all queued requests
                tmp_queued_df = pl.concat(
                    [fair_share_queued_df.select(tmp_queued_df.columns), more_fair_shared_queued_df.select(tmp_queued_df.columns), tmp_queued_df]
                ).unique(subset=["request_id"], keep="first", maintain_order=True)
            # get cumulative sum of queued files per physical tape
            tmp_queued_df = tmp_queued_df.with_columns(cum_total_files=pl.col("total_files").cum_sum(), cum_dataset_size=pl.col("dataset_size").cum_sum())
            # number of queued requests at the physical tape
            n_queued = len(tmp_queued_df)
            n_to_pin = len(to_pin_df)
            n_total = n_queued + n_to_pin
            # print dataframe in log
            tmp_to_print_df = None
            if n_total:
                tmp_to_print_df = pl.concat([to_pin_df, tmp_queued_df])
                tmp_to_print_df = tmp_to_print_df.select(
                    ["request_id", "source_rse", "jediTaskID", "gshare", "gshare_rank", "task_priority", "total_files", "cum_total_files", "to_pin"]
                )
                tmp_to_print_df = tmp_to_print_df.with_columns(gshare_and_rank=pl.concat_str([pl.col("gshare"), pl.col("gshare_rank")], separator=" : "))
            # filter requests to respect the tape quota size; at most one request can reach or exceed quota size if quota size > 0
            to_stage_df = pl.concat(
                [
                    tmp_queued_df.filter(pl.col("cum_total_files") < quota_size),
                    tmp_queued_df.filter((pl.col("cum_total_files") >= quota_size) & (quota_size > 0)).head(1),
                ]
            )
            # append the requests to ret_list
            temp_key_list = ["request_id", "jediTaskID", "gshare", "taskType", "userName", "workingGroup"]
            # to_pin_request_id_list = to_pin_df.select(["request_id"]).to_dict(as_series=False)["request_id"]
            to_pin_request_list = to_pin_df.select(temp_key_list).to_dicts()
            to_stage_request_list = to_stage_df.select(temp_key_list).to_dicts()
            for request_dict in to_pin_request_list:
                request_id = request_dict["request_id"]
                extra_params = {
                    "task_id": request_dict["jediTaskID"],
                    "task_gshare": request_dict["gshare"],
                    # "task_type": request_dict["taskType"],
                    "task_user": request_dict["userName"],
                    "task_group": request_dict["workingGroup"],
                }
                dc_req_spec = request_id_spec_map.get(request_id)
                if dc_req_spec:
                    ret_list.append((dc_req_spec, extra_params))
            to_stage_count = 0
            for request_dict in to_stage_request_list:
                request_id = request_dict["request_id"]
                extra_params = {
                    "task_id": request_dict["jediTaskID"],
                    "task_gshare": request_dict["gshare"],
                    "init_task_gshare": request_dict["gshare"],
                    # "task_type": request_dict["taskType"],
                    "task_user": request_dict["userName"],
                    "task_group": request_dict["workingGroup"],
                }
                dc_req_spec = request_id_spec_map.get(request_id)
                if dc_req_spec:
                    ret_list.append((dc_req_spec, extra_params))
                    to_stage_count += 1
            if tmp_to_print_df is not None and not (to_pin_df.is_empty() and to_stage_df.is_empty()):
                # mark each printed row with the decision taken for it
                tmp_to_print_df = tmp_to_print_df.with_columns(
                    result=pl.when(pl.col("request_id").is_in(to_pin_df["request_id"]))
                    .then(pl.lit("pin"))
                    .when(pl.col("request_id").is_in(to_stage_df["request_id"]))
                    .then(pl.lit("stage"))
                    .otherwise(pl.lit(" "))
                )
                tmp_to_print_df = tmp_to_print_df.select(
                    ["request_id", "source_rse", "jediTaskID", "gshare_and_rank", "task_priority", "total_files", "cum_total_files", "result"]
                )
                tmp_log.debug(f"  source_tape={source_tape} , quota_size={quota_size} : \n{tmp_to_print_df}")
            if n_total:
                tmp_log.debug(f"source_tape={source_tape} got {to_stage_count}/{n_queued} requests to stage, {n_to_pin} requests to pin")
        tmp_log.debug(f"totally got {len(ret_list)} requests to stage or to pin")
        # return
        return ret_list
1836 
1837     def _choose_destination_rse(self, expression: str, suggested_dst_list: list | None = None, excluded_dst_list: list | None = None) -> str | None:
1838         """
1839         Choose a destination (datadisk) RSE based on the destination RSE expression
1840 
1841         Args:
1842             expression (set): destination RSE expression
1843             suggested_dst_list (list|None): list of suggested destination list; RSEs in the list will be prioritized
1844             excluded_dst_list (list|None): list of excluded destination RSEs; RSEs in the list will be excluded from the choice
1845 
1846         Returns:
1847             str | None : destination RSE chosen, None if failed
1848         """
1849         tmp_log = LogWrapper(logger, f"_choose_destination_rse expression={expression} suggested_dst_list={suggested_dst_list}")
1850         try:
1851             # initialize
1852             destination_rse = None
1853             rse_set = set()
1854             # get RSEs from DDM
1855             the_rses = self.ddmIF.list_rses(expression)
1856             if the_rses is not None:
1857                 rse_set = set(the_rses)
1858             # exclude RSEs in excluded_destinations from config
1859             if excluded_destinations_set := set(self.dc_config_map.excluded_destinations):
1860                 rse_set -= excluded_destinations_set
1861             # exclude RSEs in excluded_dst_list
1862             if excluded_dst_list:
1863                 excluded_dst_set = set(excluded_dst_list)
1864                 rse_set -= excluded_dst_set
1865             # prioritize RSEs in suggested_dst_list
1866             if suggested_dst_list and (prioritized_rse_set := set(suggested_dst_list) & rse_set):
1867                 rse_set = prioritized_rse_set
1868             # get the list
1869             rse_list = list(rse_set)
1870             # tmp_log.debug(f"choosing destination_rse from {rse_list}")
1871             # choose destination RSE
1872             if rse_list:
1873                 if len(rse_list) == 1:
1874                     destination_rse = rse_list[0]
1875                 else:
1876                     destination_rse = random.choice(rse_list)
1877                 # tmp_log.debug(f"chose destination_rse={destination_rse}")
1878             else:
1879                 tmp_log.warning(f"no destination_rse match; skipped")
1880             # return
1881             return destination_rse
1882         except Exception as e:
1883             # other unexpected errors
1884             tmp_log.error(f"got error ; {traceback.format_exc()}")
1885             return None
1886 
1887     def _choose_destination_rse_for_request(self, dc_req_spec: DataCarouselRequestSpec) -> str | None:
1888         """
1889         Choose destination RSE for the request
1890 
1891         Args:
1892             dc_req_spec (DataCarouselRequestSpec): spec of the request
1893 
1894         Returns:
1895             str|None : DDM destination RSE if successful, or None if failed
1896         """
1897         tmp_log = LogWrapper(logger, f"_choose_destination_rse_for_request request_id={dc_req_spec.request_id}")
1898         # initialize
1899         # get source physical tape
1900         try:
1901             source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
1902         except Exception:
1903             # other unexpected errors
1904             tmp_log.error(f"got error ; {traceback.format_exc()}")
1905             return None
1906         # parameters about this tape source from DC config
1907         try:
1908             source_tape_config = self.dc_config_map.source_tapes_config[source_tape]
1909         except (KeyError, AttributeError):
1910             # no destination_expression for this tape; skipped
1911             tmp_log.warning(f"failed to get destination_expression from config; skipped ; {traceback.format_exc()}")
1912             return None
1913         except Exception:
1914             # other unexpected errors
1915             tmp_log.error(f"got error ; {traceback.format_exc()}")
1916             return None
1917         # destination expression
1918         if dc_req_spec.get_parameter("to_pin"):
1919             # to pin; use the simple to pin destination
1920             tmp_log.debug(f"has to_pin")
1921             tmp_dst_expr = TO_PIN_DST_REPLI_EXPR
1922         else:
1923             # destination_expression from DC config
1924             tmp_dst_expr = source_tape_config.destination_expression
1925         # excluded destination RSEs
1926         excluded_dst_list = dc_req_spec.get_parameter("excluded_dst_list")
1927         if excluded_dst_list:
1928             # adjust destination_expression according to excluded_dst_list
1929             for excluded_dst_rse in excluded_dst_list:
1930                 tmp_dst_expr += f"\\{excluded_dst_rse}"
1931         # add expression suffix for available RSEs
1932         tmp_dst_expr += AVAIL_REPLI_EXPR_SUFFIX
1933         # get suggested_dst_list
1934         suggested_dst_list = dc_req_spec.get_parameter("suggested_dst_list")
1935         # choose a destination RSE
1936         destination_rse = self._choose_destination_rse(expression=tmp_dst_expr, suggested_dst_list=suggested_dst_list, excluded_dst_list=excluded_dst_list)
1937         if destination_rse is not None:
1938             tmp_log.debug(f"chose destination RSE to be {destination_rse}")
1939         else:
1940             tmp_log.error(f"failed to choose destination RSE; skipped")
1941             return None
1942         # return
1943         return destination_rse
1944 
1945     def _submit_ddm_rule(self, dc_req_spec: DataCarouselRequestSpec, destination_rse: str | None = None) -> tuple[str | None, str | None]:
1946         """
1947         Submit DDM replication rule to stage the dataset of the request
1948 
1949         Args:
1950             dc_req_spec (DataCarouselRequestSpec): spec of the request
1951             destination_rse (str|None): predetermined destination RSE to stage; if None, will be chosen randomly
1952 
1953         Returns:
1954             str | None : DDM rule_id of the new rule if submission successful, or None if failed
1955             str | None : destination RSE chosen
1956         """
1957         tmp_log = LogWrapper(logger, f"_submit_ddm_rule request_id={dc_req_spec.request_id}")
1958         # initialize
1959         tmp_dst_expr = None
1960         expression = None
1961         lifetime_days = 45
1962         weight = None
1963         source_replica_expression = None
1964         # source replica expression
1965         if dc_req_spec.source_rse:
1966             source_replica_expression = f"{SRC_REPLI_EXPR_PREFIX}|{dc_req_spec.source_rse}"
1967         else:
1968             # no source_rse; unexpected
1969             tmp_log.warning(f"source_rse is None ; skipped")
1970             return None, None
1971         # get destination expression RSE
1972         if destination_rse is None:
1973             destination_rse = self._choose_destination_rse_for_request(dc_req_spec)
1974         if destination_rse is not None:
1975             # got a destination RSE
1976             expression = str(destination_rse)
1977         else:
1978             # no match of destination RSE; return None and stay queued
1979             tmp_log.error(f"failed to get destination RSE; skipped")
1980             return None, None
1981         # get task type for DDM rule activity
1982         ddm_rule_activity = DDM_RULE_ACTIVITY_MAP["prod"]
1983         if task_type := dc_req_spec.get_parameter("task_type"):
1984             ddm_rule_activity = DDM_RULE_ACTIVITY_MAP.get(task_type, ddm_rule_activity)
1985         # submit ddm staging rule
1986         ddm_rule_id = self.ddmIF.make_staging_rule(
1987             dataset_name=dc_req_spec.dataset,
1988             expression=expression,
1989             activity=ddm_rule_activity,
1990             lifetime=lifetime_days,
1991             weight=weight,
1992             notify="P",
1993             source_replica_expression=source_replica_expression,
1994         )
1995         # return
1996         return ddm_rule_id, destination_rse
1997 
1998     def _submit_idds_stagein_request(self, task_id: int, dc_req_spec: DataCarouselRequestSpec) -> Any:
1999         """
2000         Submit corresponding iDDS stage-in request for given Data Carousel request and task
2001         Currently only used for manual testing or after resubmitting Data Carousel requests
2002 
2003         Args:
2004             task_id (int): jediTaskID of the task
2005             dc_req_spec (DataCarouselRequestSpec): request to submit iDDS stage-in request
2006 
2007         Returns:
2008             Any : iDDS requests ID returned from iDDS
2009         """
2010         tmp_log = LogWrapper(logger, f"_submit_idds_stagein_request request_id={dc_req_spec.request_id}")
2011         # dataset and rule_id
2012         dataset = dc_req_spec.dataset
2013         rule_id = dc_req_spec.ddm_rule_id
2014         ds_str_list = dataset.split(":")
2015         tmp_scope = ds_str_list[0]
2016         tmp_name = ds_str_list[1]
2017         # iDDS request
2018         c = iDDS_Client(idds.common.utils.get_rest_host())
2019         req = {
2020             "scope": tmp_scope,
2021             "name": tmp_name,
2022             "requester": "panda",
2023             "request_type": idds.common.constants.RequestType.StageIn,
2024             "transform_tag": idds.common.constants.RequestType.StageIn.value,
2025             "status": idds.common.constants.RequestStatus.New,
2026             "priority": 0,
2027             "lifetime": 30,
2028             "request_metadata": {
2029                 "workload_id": task_id,
2030                 "rule_id": rule_id,
2031             },
2032         }
2033         tmp_log.debug(f"iDDS request: {req}")
2034         ret = c.add_request(**req)
2035         tmp_log.debug(f"done submit; iDDS_requestID={ret}")
2036         # return
2037         return ret
2038 
    @refresh
    def stage_request(
        self, dc_req_spec: DataCarouselRequestSpec, extra_params: dict | None = None, destination_rse: str | None = None, submit_idds_request=True
    ) -> tuple[bool, str | None, DataCarouselRequestSpec]:
        """
        Stage the dataset of the request and update request status to staging

        The spec is re-fetched and updated under the per-request lock so concurrent
        workers cannot double-submit DDM rules; the iDDS stage-in requests are
        submitted only after the lock is released.

        Args:
            dc_req_spec (DataCarouselRequestSpec): spec of the request
            extra_params (dict|None): extra parameters of the request to set; None if nothing to set
            destination_rse (str|None): predetermined destination RSE to stage the dataset; None if to choose randomly later
            submit_idds_request (bool): whether to submit IDDS request for the dataset; default is True

        Returns:
            bool : True for success, False otherwise
            str|None : error message if any, None otherwise
            DataCarouselRequestSpec : updated spec of the request
        """
        tmp_log = LogWrapper(logger, f"stage_request request_id={dc_req_spec.request_id}")
        is_ok = False
        err_msg = None
        # renew dc_req_spec from DB under the request lock
        with self.request_lock(dc_req_spec.request_id) as locked_spec:
            if not locked_spec:
                # not getting lock; skip
                err_msg = "did not get lock; skipped"
                tmp_log.warning(err_msg)
                return is_ok, err_msg, dc_req_spec
            # got locked spec; use the fresh DB copy from here on
            dc_req_spec = locked_spec
            # skip if not queued (may have been picked up by another worker meanwhile)
            if dc_req_spec.status != DataCarouselRequestStatus.queued:
                err_msg = f"status={dc_req_spec.status} not queued; skipped"
                tmp_log.warning(err_msg)
                return is_ok, err_msg, dc_req_spec
            # check if still with active related tasks; if not, skip
            active_related_tasks = self.taskBufferIF.get_related_tasks_of_data_carousel_request_JEDI(
                dc_req_spec.request_id, status_exclusion_list=FINAL_TASK_STATUSES
            )
            if not active_related_tasks:
                try:
                    # cancel the request since staging would be pointless
                    self.cancel_request(dc_req_spec, reason="queued_while_no_active_tasks")
                    tmp_log.debug(f"cancelled since no active related tasks")
                except Exception:
                    # best effort; the request stays queued if cancellation fails
                    tmp_log.warning(f"failed to cancel ; {traceback.format_exc()}")
                err_msg = f"no active related tasks; skipped"
                tmp_log.warning(err_msg)
                return is_ok, err_msg, dc_req_spec
            # retry to get DDM dataset metadata and skip if total_files is still None
            if dc_req_spec.total_files is None:
                _got = self._fill_total_files_and_size(dc_req_spec)
                if not _got:
                    err_msg = f"total_files and dataset_size are still None; skipped"
                    tmp_log.warning(err_msg)
                    return is_ok, err_msg, dc_req_spec
            # check existing DDM rule of the dataset
            if (ddm_rule_id := dc_req_spec.ddm_rule_id) is not None:
                # DDM rule exists; no need to submit
                tmp_log.debug(f"dataset={dc_req_spec.dataset} already has active DDM rule ddm_rule_id={ddm_rule_id}")
            else:
                # no existing rule; submit DDM rule
                ddm_rule_id, destination_rse = self._submit_ddm_rule(dc_req_spec, destination_rse=destination_rse)
                if ddm_rule_id:
                    # DDM rule submitted; update ddm_rule_id
                    dc_req_spec.ddm_rule_id = ddm_rule_id
                    tmp_log.debug(f"submitted DDM rule ddm_rule_id={ddm_rule_id}")
                else:
                    # failed to submit
                    err_msg = f"failed to submitted DDM rule ; skipped"
                    tmp_log.warning(err_msg)
                    return is_ok, err_msg, dc_req_spec
            # update extra parameters
            if extra_params:
                dc_req_spec.update_parameters(extra_params)
            # update request to be staging; record the staging start time
            now_time = naive_utcnow()
            dc_req_spec.status = DataCarouselRequestStatus.staging
            dc_req_spec.start_time = now_time
            ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
            if ret:
                tmp_log.info(f"updated DB about staging; status={dc_req_spec.status}")
                is_ok = True
            # log for monitoring
            tmp_log.info(
                f"started staging "
                f"dataset={dc_req_spec.dataset} source_tape={dc_req_spec.source_tape} source_rse={dc_req_spec.source_rse} "
                f"destination_rse={destination_rse} ddm_rule_id={dc_req_spec.ddm_rule_id} "
                f"total_files={dc_req_spec.total_files} dataset_size={dc_req_spec.dataset_size} "
                f"task_id={dc_req_spec.get_parameter('task_id')} task_type={dc_req_spec.get_parameter('task_type')} "
                f"task_user={dc_req_spec.get_parameter('task_user')} task_group={dc_req_spec.get_parameter('task_group')} "
                f"to_pin={dc_req_spec.get_parameter('to_pin')}"
            )
        # to iDDS staging requests (outside the lock)
        if is_ok and submit_idds_request:
            # get all tasks related to this request
            task_id_list = self._get_related_tasks(dc_req_spec.request_id)
            if task_id_list:
                tmp_log.debug(f"related tasks: {task_id_list}")
                for task_id in task_id_list:
                    try:
                        self._submit_idds_stagein_request(task_id, dc_req_spec)
                        tmp_log.debug(f"submitted corresponding iDDS request for related task {task_id}")
                    except Exception as e:
                        # one failed iDDS submission does not block the others
                        tmp_log.warning(f"got error while submitting iDDS request; skipped : {traceback.format_exc()}")
            else:
                tmp_log.warning(f"failed to get related tasks; skipped to submit iDDS requests")
        # return
        return is_ok, err_msg, dc_req_spec
2148 
2149     def _refresh_ddm_rule(self, rule_id: str, lifetime: int) -> bool:
2150         """
2151         Refresh lifetime of the DDM rule
2152 
2153         Args:
2154             rule_id (str): DDM rule ID
2155             lifetime (int): lifetime in seconds to set
2156 
2157         Returns:
2158             bool : True for success, False otherwise
2159         """
2160         set_map = {"lifetime": lifetime}
2161         ret = self.ddmIF.update_rule_by_id(rule_id, set_map)
2162         return ret
2163 
2164     def cancel_request(self, dc_req_spec: DataCarouselRequestSpec, by: str = "manual", reason: str | None = None) -> bool | None:
2165         """
2166         Cancel a request
2167         Set corresponding DDM rule to be about to expired (in 5 sec)
2168 
2169         Args:
2170             dc_req_spec (DataCarouselRequestSpec): spec of the request to cancel
2171             by (str): annotation of the caller of this method; default is "manual"
2172             reason (str|None): annotation of the reason for cancelling
2173 
2174         Returns:
2175             bool|None : True for success, None otherwise
2176         """
2177         tmp_log = LogWrapper(logger, f"cancel_request request_id={dc_req_spec.request_id} by={by}" + (f" reason={reason}" if reason else " "))
2178         # cancel
2179         ret = self.taskBufferIF.cancel_data_carousel_request_JEDI(dc_req_spec.request_id)
2180         if ret:
2181             tmp_log.debug(f"cancelled")
2182         elif ret == 0:
2183             tmp_log.debug(f"already terminated; skipped")
2184         else:
2185             tmp_log.error(f"failed to cancel")
2186         # expire DDM rule
2187         if dc_req_spec.ddm_rule_id:
2188             short_time = 5
2189             self._refresh_ddm_rule(dc_req_spec.ddm_rule_id, short_time)
2190         # return
2191         return ret
2192 
2193     def retire_request(self, dc_req_spec: DataCarouselRequestSpec, by: str = "manual", reason: str | None = None) -> bool | None:
2194         """
2195         Retire a done request so that it will not be reused
2196         Set corresponding DDM rule to be about to expired (in 5 sec)
2197 
2198         Args:
2199             dc_req_spec (DataCarouselRequestSpec): spec of the request to cancel
2200             by (str): annotation of the caller of this method; default is "manual"
2201             reason (str|None): annotation of the reason for retiring
2202 
2203         Returns:
2204             bool|None : True for success, None otherwise
2205         """
2206         tmp_log = LogWrapper(logger, f"retire_request request_id={dc_req_spec.request_id} by={by}" + (f" reason={reason}" if reason else " "))
2207         # retire
2208         ret = self.taskBufferIF.retire_data_carousel_request_JEDI(dc_req_spec.request_id)
2209         if ret:
2210             tmp_log.debug(f"retired")
2211         elif ret == 0:
2212             tmp_log.debug(f"cannot retire; skipped")
2213         else:
2214             tmp_log.error(f"failed to retire")
2215         # return
2216         return ret
2217 
    def _check_ddm_rule_of_request(self, dc_req_spec: DataCarouselRequestSpec, by: str = "unknown") -> tuple[bool, str | None, dict | None]:
        """
        Check if the DDM rule of the request is valid.
        If rule not found, update the request and try to cancel or retire it.

        Args:
            dc_req_spec (DataCarouselRequestSpec): spec of the request
            by (str): annotation of the caller of this method; default is "unknown"

        Returns:
            bool : True if valid, False otherwise
            str|None : DDM rule ID
            dict|None : DDM rule data if valid, None otherwise
        """
        tmp_log = LogWrapper(logger, f"_check_ddm_rule_of_request request_id={dc_req_spec.request_id} ddm_rule_id={dc_req_spec.ddm_rule_id}")
        is_valid = False
        # get DDM rule; False means not found, None means the lookup itself failed
        ddm_rule_id = dc_req_spec.ddm_rule_id
        the_rule = self.ddmIF.get_rule_by_id(ddm_rule_id)
        if the_rule is False:
            # rule not found; record it on the request under the request lock
            with self.request_lock(dc_req_spec.request_id) as locked_spec:
                if not locked_spec:
                    # not getting lock; skip
                    tmp_log.warning(f"did not get lock; skipped")
                    return is_valid, ddm_rule_id, None
                # got locked spec; use the fresh DB copy
                dc_req_spec = locked_spec
                dc_req_spec.set_parameter("rule_unfound", True)
                tmp_log.warning(f"rule not found")
                tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
                if tmp_ret:
                    tmp_log.debug(f"updated DB about rule not found")
                else:
                    tmp_log.error(f"failed to update DB ; skipped")
            # try to cancel or retire request (outside the lock), depending on its status
            if dc_req_spec.status == DataCarouselRequestStatus.staging:
                # requests staging but DDM rule not found; to cancel
                self.cancel_request(dc_req_spec, by=by, reason="rule_unfound")
            elif dc_req_spec.status == DataCarouselRequestStatus.done:
                # requests done but DDM rule not found; to retire
                self.retire_request(dc_req_spec, by=by, reason="rule_unfound")
        elif the_rule is None:
            # got error when getting the rule
            tmp_log.error(f"failed to get rule ; skipped")
        else:
            # rule found
            is_valid = True
            # tmp_log.debug(f"rule is valid")
        # return
        return is_valid, ddm_rule_id, the_rule
2268 
2269     def refresh_ddm_rule_of_request(self, dc_req_spec: DataCarouselRequestSpec, lifetime_days: int, force_refresh: bool = False, by: str = "unknown") -> bool:
2270         """
2271         Refresh lifetime of the DDM rule of one request
2272 
2273         Args:
2274             dc_req_spec (DataCarouselRequestSpec): spec of the request
2275             lifetime_days (int): lifetime in days to set
2276             force_refresh (bool): force to refresh regardless of to_refresh max/min lifetime
2277             by (str): annotation of the caller of this method; default is "watchdog"
2278 
2279         Returns:
2280             bool : True for success, False otherwise
2281         """
2282         tmp_log = LogWrapper(logger, f"refresh_ddm_rule_of_request request_id={dc_req_spec.request_id}")
2283         # initialize
2284         ret = False
2285         # check if the rule is valid
2286         is_valid, ddm_rule_id, the_rule = self._check_ddm_rule_of_request(dc_req_spec, by=by)
2287         if not is_valid:
2288             # rule not valid; skipped
2289             tmp_log.error(f"ddm_rule_id={ddm_rule_id} rule not valid; skipped")
2290             return ret
2291         # rule lifetime
2292         rule_lifetime = None
2293         if the_rule["expires_at"]:
2294             now_time = naive_utcnow()
2295             rule_lifetime = the_rule["expires_at"] - now_time
2296         # trigger renewal if force_refresh or when lifetime within the range
2297         if (
2298             rule_lifetime is None
2299             or force_refresh
2300             or (rule_lifetime < timedelta(days=TO_REFRESH_MAX_LIFETIME_DAYS) and rule_lifetime > timedelta(hours=TO_REFRESH_MIN_LIFETIME_HOURS))
2301         ):
2302             ret = self._refresh_ddm_rule(ddm_rule_id, 86400 * lifetime_days)
2303             # tmp_log.debug(f"status={dc_req_spec.status} ddm_rule_id={ddm_rule_id} refreshed lifetime to be {lifetime_days} days long")
2304         else:
2305             # rule_lifetime_days = rule_lifetime.total_seconds() / 86400
2306             # tmp_log.debug(
2307             #     f"ddm_rule_id={ddm_rule_id} not to refresh as lifetime {rule_lifetime_days:.2f}d not within range {TO_REFRESH_MAX_LIFETIME_DAYS}d to {TO_REFRESH_MIN_LIFETIME_HOURS}h"
2308             # )
2309             pass
2310         # return
2311         return ret
2312 
2313     def keep_alive_ddm_rules(self, by: str = "watchdog"):
2314         """
2315         Keep alive DDM rules of requests of active tasks; also check all requests if their DDM rules are valid
2316 
2317         Args:
2318             by (str): annotation of the caller of this method; default is "watchdog"
2319         """
2320         tmp_log = LogWrapper(logger, "keep_alive_ddm_rules")
2321         # get all requests
2322         all_requests_map, _ = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI()
2323         # get requests of active tasks
2324         active_tasked_requests_map, _ = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(status_exclusion_list=FINAL_TASK_STATUSES)
2325         for dc_req_spec in all_requests_map.values():
2326             try:
2327                 if dc_req_spec.status not in [DataCarouselRequestStatus.staging, DataCarouselRequestStatus.done]:
2328                     # skip requests without need to keep rules alive
2329                     continue
2330                 if dc_req_spec.request_id in active_tasked_requests_map:
2331                     # requests of active tasks; decide lifetime in days
2332                     days = None
2333                     if dc_req_spec.status == DataCarouselRequestStatus.staging:
2334                         # for requests staging
2335                         days = STAGING_LIFETIME_DAYS
2336                     elif dc_req_spec.status == DataCarouselRequestStatus.done:
2337                         # for requests done
2338                         days = DONE_LIFETIME_DAYS
2339                     # trigger renewal
2340                     if days is not None:
2341                         ret = self.refresh_ddm_rule_of_request(dc_req_spec, lifetime_days=days, by=by)
2342                         if ret:
2343                             tmp_log.debug(
2344                                 f"request_id={dc_req_spec.request_id} status={dc_req_spec.status} ddm_rule_id={dc_req_spec.ddm_rule_id} refreshed lifetime to be {days} days long"
2345                             )
2346                         else:
2347                             # tmp_log.debug(
2348                             #     f"request_id={dc_req_spec.request_id} status={dc_req_spec.status} ddm_rule_id={dc_req_spec.ddm_rule_id} not to renew ; skipped"
2349                             # )
2350                             pass
2351                 else:
2352                     # requests of non-active tasks; check if the rule is valid
2353                     is_valid, ddm_rule_id, _ = self._check_ddm_rule_of_request(dc_req_spec, by=by)
2354                     # if not is_valid:
2355                     #     tmp_log.warning(f"ddm_rule_id={ddm_rule_id} rule not valid")
2356             except Exception:
2357                 tmp_log.error(f"request_id={dc_req_spec.request_id} got error ; {traceback.format_exc()}")
2358 
2359     def _update_staged_files(self, dc_req_spec: DataCarouselRequestSpec, task_id_list: list[int] | None = None) -> bool | None:
2360         """
2361         Update status of files in DB Jedi_Dataset_Contents for a request done staging
2362 
2363         Args:
2364             dc_req_spec (DataCarouselRequestSpec): spec of the request
2365             task_id_list (list[int]|None): list of jediTaskID to update; if None, will get all related tasks
2366 
2367         Returns:
2368             bool|None : True for success, None otherwise
2369         """
2370         tmp_log = LogWrapper(logger, f"_update_staged_files request_id={dc_req_spec.request_id}")
2371         try:
2372             # get scope of dataset
2373             dataset = dc_req_spec.dataset
2374             scope, dsname = self.ddmIF.extract_scope(dataset)
2375             # get lfn of files in the dataset from DDM
2376             lfn_set = self.ddmIF.get_files_in_dataset(dataset, ignore_unknown=True, lfn_only=True)
2377             # make filenames_dict for updateInputFilesStaged_JEDI
2378             filenames_dict = {}
2379             dummy_value_tuple = (None, None)
2380             for lfn in lfn_set:
2381                 filenames_dict[lfn] = dummy_value_tuple
2382             # get all related tasks to update staged files
2383             if task_id_list is None:
2384                 task_id_list = self._get_related_tasks(dc_req_spec.request_id)
2385             if task_id_list:
2386                 # tmp_log.debug(f"related tasks: {task_id_list}")
2387                 n_done_tasks = 0
2388                 for task_id in task_id_list:
2389                     ret = self.taskBufferIF.updateInputFilesStaged_JEDI(task_id, scope, filenames_dict, by="DataCarousel", check_scope=False)
2390                     if ret is None:
2391                         tmp_log.warning(f"failed to update files for task_id={task_id} ; skipped")
2392                     else:
2393                         n_done_tasks += 1
2394                 tmp_log.debug(f"updated staged files for {n_done_tasks}/{len(task_id_list)} related tasks")
2395             else:
2396                 tmp_log.warning(f"failed to get related tasks; skipped")
2397             # return
2398             return True
2399         except Exception:
2400             tmp_log.error(f"got error ; {traceback.format_exc()}")
2401             return None
2402 
2403     def check_staging_requests(self):
2404         """
2405         Check staging requests
2406         """
2407         tmp_log = LogWrapper(logger, "check_staging_requests")
2408         dc_req_specs = self.taskBufferIF.get_data_carousel_staging_requests_JEDI()
2409         if dc_req_specs is None:
2410             tmp_log.warning(f"failed to query requests to check ; skipped")
2411         elif not dc_req_specs:
2412             tmp_log.debug(f"got no requests to check ; skipped")
2413         for dc_req_spec in dc_req_specs:
2414             try:
2415                 to_update = False
2416                 # get DDM rule
2417                 ddm_rule_id = dc_req_spec.ddm_rule_id
2418                 the_rule = self.ddmIF.get_rule_by_id(ddm_rule_id)
2419                 if the_rule is False:
2420                     with self.request_lock(dc_req_spec.request_id) as locked_spec:
2421                         if not locked_spec:
2422                             # not getting lock; skip
2423                             tmp_log.warning(f"did not get lock; skipped")
2424                             continue
2425                         # got locked spec
2426                         dc_req_spec = locked_spec
2427                         # rule not found
2428                         dc_req_spec.set_parameter("rule_unfound", True)
2429                         tmp_log.error(f"request_id={dc_req_spec.request_id} ddm_rule_id={ddm_rule_id} rule not found")
2430                         tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
2431                         if tmp_ret:
2432                             tmp_log.debug(f"request_id={dc_req_spec.request_id} updated DB about rule not found")
2433                         else:
2434                             tmp_log.error(f"request_id={dc_req_spec.request_id} failed to update DB ; skipped")
2435                     # try to cancel or retire request
2436                     if dc_req_spec.status == DataCarouselRequestStatus.staging:
2437                         # requests staging but DDM rule not found; to cancel
2438                         self.cancel_request(dc_req_spec, by="watchdog", reason="rule_unfound")
2439                     elif dc_req_spec.status == DataCarouselRequestStatus.done:
2440                         # requests done but DDM rule not found; to retire
2441                         self.retire_request(dc_req_spec, by="watchdog", reason="rule_unfound")
2442                     continue
2443                 elif the_rule is None:
2444                     # got error when getting the rule
2445                     tmp_log.error(f"request_id={dc_req_spec.request_id} failed to get rule of ddm_rule_id={ddm_rule_id} ; skipped")
2446                     continue
2447                 # Got rule; check further
2448                 with self.request_lock(dc_req_spec.request_id) as locked_spec:
2449                     if not locked_spec:
2450                         # not getting lock; skip
2451                         tmp_log.warning(f"did not get lock; skipped")
2452                         continue
2453                     # got locked spec
2454                     dc_req_spec = locked_spec
2455                     # Destination RSE
2456                     if dc_req_spec.destination_rse is None:
2457                         the_replica_locks = self.ddmIF.list_replica_locks_by_id(ddm_rule_id)
2458                         try:
2459                             the_first_file = the_replica_locks[0]
2460                         except IndexError:
2461                             tmp_log.warning(
2462                                 f"request_id={dc_req_spec.request_id} no file from replica lock of ddm_rule_id={ddm_rule_id} ; destination_rse not updated"
2463                             )
2464                         except TypeError:
2465                             tmp_log.warning(
2466                                 f"request_id={dc_req_spec.request_id} error listing replica lock of ddm_rule_id={ddm_rule_id} ; destination_rse not updated"
2467                             )
2468                         else:
2469                             # fill in destination RSE
2470                             destination_rse = the_first_file["rse"]
2471                             dc_req_spec.destination_rse = destination_rse
2472                             tmp_log.debug(f"request_id={dc_req_spec.request_id} filled destination_rse={destination_rse} of ddm_rule_id={ddm_rule_id}")
2473                             to_update = True
2474                     # current staged files
2475                     now_time = naive_utcnow()
2476                     current_staged_files = int(the_rule["locks_ok_cnt"])
2477                     new_staged_files = current_staged_files - dc_req_spec.staged_files
2478                     if new_staged_files > 0:
2479                         # have more staged files than before; update request according to DDM rule
2480                         dc_req_spec.staged_files = current_staged_files
2481                         dc_req_spec.staged_size = int(dc_req_spec.dataset_size * dc_req_spec.staged_files / dc_req_spec.total_files)
2482                         dc_req_spec.last_staged_time = now_time
2483                         to_update = True
2484                     else:
2485                         tmp_log.debug(f"request_id={dc_req_spec.request_id} got {new_staged_files} new staged files")
2486                     # check completion of staging
2487                     if dc_req_spec.staged_files == dc_req_spec.total_files:
2488                         # all files staged; process request to done
2489                         dc_req_spec.status = DataCarouselRequestStatus.done
2490                         dc_req_spec.end_time = now_time
2491                         dc_req_spec.staged_size = dc_req_spec.dataset_size
2492                         to_update = True
2493                     # update to DB if attribute updated
2494                     if to_update:
2495                         ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
2496                         if ret is not None:
2497                             # updated DB about staging
2498                             tmp_log.info(
2499                                 f"request_id={dc_req_spec.request_id} got {new_staged_files} new staged files; updated DB about staging ; status={dc_req_spec.status}"
2500                             )
2501                             # more for done requests
2502                             if dc_req_spec.status == DataCarouselRequestStatus.done:
2503                                 # force to keep alive the rule
2504                                 tmp_ret = self.refresh_ddm_rule_of_request(dc_req_spec, lifetime_days=DONE_LIFETIME_DAYS, force_refresh=True, by="watchdog")
2505                                 if tmp_ret:
2506                                     tmp_log.debug(
2507                                         f"request_id={dc_req_spec.request_id} status={dc_req_spec.status} ddm_rule_id={dc_req_spec.ddm_rule_id} refreshed lifetime to be {DONE_LIFETIME_DAYS} days long"
2508                                     )
2509                                 # update staged files in DB for done requests
2510                                 tmp_ret = self._update_staged_files(dc_req_spec)
2511                                 if tmp_ret:
2512                                     tmp_log.debug(f"request_id={dc_req_spec.request_id} done; updated staged files")
2513                                 else:
2514                                     tmp_log.warning(f"request_id={dc_req_spec.request_id} done; failed to update staged files ; skipped")
2515                         else:
2516                             tmp_log.error(f"request_id={dc_req_spec.request_id} failed to update DB for ddm_rule_id={ddm_rule_id} ; skipped")
2517                             continue
2518             except Exception:
2519                 tmp_log.error(f"request_id={dc_req_spec.request_id} got error ; {traceback.format_exc()}")
2520 
2521     def _resume_task(self, task_id: int) -> bool:
2522         """
2523         Resume task from staging (to staged-pending)
2524 
2525         Args:
2526             task_id (int): JEDI task ID
2527 
2528         Returns:
2529             bool : True for success, False otherwise
2530         """
2531         tmp_log = LogWrapper(logger, "_resume_task")
2532         # send resume command
2533         ret_val, ret_str = self.taskBufferIF.sendCommandTaskPanda(task_id, "Data Carousel. Resumed from staging", True, "resume", properErrorCode=True)
2534         # check if ok
2535         if ret_val == 0:
2536             return True
2537         else:
2538             tmp_log.warning(f"task_id={task_id} failed to resume the task: error_code={ret_val} {ret_str}")
2539             return False
2540 
2541     def resume_tasks_from_staging(self):
2542         """
2543         Get tasks with enough staged files and resume them
2544         """
2545         tmp_log = LogWrapper(logger, "resume_tasks_from_staging")
2546         ret_requests_map, ret_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(status_filter_list=["staging"])
2547         n_resumed_tasks = 0
2548         for task_id, request_id_list in ret_relation_map.items():
2549             to_resume = False
2550             try:
2551                 # _, task_spec = self.taskBufferIF.getTaskWithID_JEDI(task_id, fullFlag=False)
2552                 # if not task_spec:
2553                 #     # task not found
2554                 #     tmp_log.error(f"task_id={task_id} task not found; skipped")
2555                 #     continue
2556                 for request_id in request_id_list:
2557                     dc_req_spec = ret_requests_map[request_id]
2558                     # if task_spec.taskType == "prod":
2559                     #     # condition for production tasks: resume if one file staged
2560                     #     if dc_req_spec.status == DataCarouselRequestStatus.done or (dc_req_spec.staged_files and dc_req_spec.staged_files > 0):
2561                     #         # got at least one data carousel request done for the task, to resume
2562                     #         to_resume = True
2563                     #         break
2564                     # elif task_spec.taskType == "anal":
2565                     #     # condition for analysis tasks
2566                     #     # FIXME: temporary conservative condition for analysis tasks: resume if one dataset staged
2567                     #     if dc_req_spec.status == DataCarouselRequestStatus.done:
2568                     #         # got at least one entire dataset staged, to resume
2569                     #         to_resume = True
2570                     #         break
2571                     # resume as soon as DDM rules are created
2572                     if dc_req_spec.ddm_rule_id and dc_req_spec.status in [
2573                         DataCarouselRequestStatus.staging,
2574                         DataCarouselRequestStatus.done,
2575                         DataCarouselRequestStatus.retired,
2576                     ]:
2577                         to_resume = True
2578                         break
2579                 if to_resume:
2580                     # resume the task
2581                     ret_val = self._resume_task(task_id)
2582                     if ret_val:
2583                         n_resumed_tasks += 1
2584                         tmp_log.debug(f"task_id={task_id} resumed the task")
2585                     else:
2586                         tmp_log.warning(f"task_id={task_id} failed to resume the task; skipped")
2587             except Exception:
2588                 tmp_log.error(f"task_id={task_id} got error ; {traceback.format_exc()}")
2589         # summary
2590         tmp_log.debug(f"resumed {n_resumed_tasks} tasks")
2591 
2592     def _get_orphan_requests(self) -> list[DataCarouselRequestSpec] | None:
2593         """
2594         Get orphan requests, which are active and without any tasks related
2595 
2596         Returns:
2597             list[DataCarouselRequestSpec]|None : list of orphan requests, or None if failure
2598         """
2599         tmp_log = LogWrapper(logger, f"_get_orphan_requests")
2600         status_var_names_str, status_var_map = get_sql_IN_bind_variables(DataCarouselRequestStatus.active_statuses, prefix=":status")
2601         sql = (
2602             f"SELECT {DataCarouselRequestSpec.columnNames()} "
2603             f"FROM {panda_config.schemaJEDI}.data_carousel_requests "
2604             f"WHERE request_id NOT IN "
2605             f"(SELECT rel.request_id FROM {panda_config.schemaJEDI}.data_carousel_relations rel)"
2606             f"AND status IN ({status_var_names_str}) "
2607         )
2608         var_map = {}
2609         var_map.update(status_var_map)
2610         res_list = self.taskBufferIF.querySQL(sql, var_map, arraySize=99999)
2611         if res_list:
2612             ret_list = []
2613             for res in res_list:
2614                 dc_req_spec = DataCarouselRequestSpec()
2615                 dc_req_spec.pack(res)
2616                 ret_list.append(dc_req_spec)
2617             tmp_log.debug(f"got {len(ret_list)} orphan request")
2618             return ret_list
2619         else:
2620             tmp_log.debug("no orphan request; skipped")
2621             return None
2622 
    def clean_up_requests(
        self, done_age_limit_days: int | float = DONE_LIFETIME_DAYS, outdated_age_limit_days: int | float = DONE_LIFETIME_DAYS, by: str = "watchdog"
    ):
        """
        Clean up terminated and outdated requests

        Requests whose related tasks are all terminated are deleted (done/cancelled/retired)
        or first cancelled (queued/staging) so they can be cleaned up in a later cycle.
        DDM rules of deleted requests are removed as well. Orphan requests (active but with
        no related task) are cancelled, and outdated requests are purged from the DB.

        Args:
            done_age_limit_days (int|float): age limit in days for requests done and without active tasks
            outdated_age_limit_days (int|float): age limit in days for outdated requests
            by (str): annotation of the caller of this method; default is "watchdog"
        """
        tmp_log = LogWrapper(logger, "clean_up_requests")
        try:
            # initialize sets of request_ids to delete, per terminal status
            done_requests_set = set()
            cancelled_requests_set = set()
            retired_requests_set = set()
            # get requests of terminated and active tasks
            terminated_tasks_requests_map, terminated_tasks_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(
                status_filter_list=FINAL_TASK_STATUSES
            )
            active_tasks_requests_map, active_tasks_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(
                status_exclusion_list=FINAL_TASK_STATUSES
            )
            now_time = naive_utcnow()
            # set of requests of terminated tasks
            request_ids_of_terminated_tasks = set()
            for request_id_list in terminated_tasks_relation_map.values():
                request_ids_of_terminated_tasks |= set(request_id_list)
            # set of requests of active tasks
            request_ids_of_active_tasks = set()
            for request_id_list in active_tasks_relation_map.values():
                request_ids_of_active_tasks |= set(request_id_list)
            # loop over requests of terminated tasks
            for request_id in request_ids_of_terminated_tasks:
                if request_id in active_tasks_requests_map:
                    # the request is also mapped to some active task, not to be cleaned up; skipped
                    continue
                dc_req_spec = terminated_tasks_requests_map[request_id]
                if dc_req_spec.status == DataCarouselRequestStatus.done and (
                    (dc_req_spec.end_time and dc_req_spec.end_time < now_time - timedelta(days=done_age_limit_days))
                    or dc_req_spec.get_parameter("rule_unfound")
                    or dc_req_spec.get_parameter("remove_when_done")
                ):
                    # requests done, and old enough or DDM rule not found or to remove when done; to clean up
                    done_requests_set.add(request_id)
                elif dc_req_spec.status == DataCarouselRequestStatus.queued:
                    # requests queued while related tasks all terminated; to cancel (to clean up in next cycle)
                    self.cancel_request(dc_req_spec, by=by, reason="queued_while_all_tasks_ended")
                elif dc_req_spec.status == DataCarouselRequestStatus.staging:
                    # requests staging while related tasks all terminated; to cancel (to clean up in next cycle)
                    self.cancel_request(dc_req_spec, by=by, reason="staging_while_all_tasks_ended")
                elif dc_req_spec.status == DataCarouselRequestStatus.cancelled:
                    # requests cancelled; to clean up
                    cancelled_requests_set.add(request_id)
                elif dc_req_spec.status == DataCarouselRequestStatus.retired:
                    # requests retired; to clean up
                    retired_requests_set.add(request_id)
            # loop over requests of active tasks to cancel bad ones
            for request_id in request_ids_of_active_tasks:
                dc_req_spec = active_tasks_requests_map[request_id]
                if dc_req_spec.status == DataCarouselRequestStatus.staging and dc_req_spec.get_parameter("rule_unfound"):
                    # requests staging but DDM rule not found; to cancel
                    self.cancel_request(dc_req_spec, by=by, reason="rule_unfound")
            # cancel orphan requests (active but not related to any task)
            orphan_requests = self._get_orphan_requests()
            if orphan_requests:
                for dc_req_spec in orphan_requests:
                    self.cancel_request(dc_req_spec, by=by, reason="orphan")
            # delete ddm rules of terminated requests of terminated tasks
            for request_id in done_requests_set | cancelled_requests_set | retired_requests_set:
                dc_req_spec = terminated_tasks_requests_map[request_id]
                ddm_rule_id = dc_req_spec.ddm_rule_id
                if ddm_rule_id:
                    try:
                        ret = self.ddmIF.delete_replication_rule(ddm_rule_id)
                        if ret is False:
                            # rule already gone on DDM; nothing to delete
                            tmp_log.warning(f"request_id={request_id} ddm_rule_id={ddm_rule_id} rule not found ; skipped")
                        else:
                            tmp_log.debug(f"request_id={request_id} ddm_rule_id={ddm_rule_id} deleted DDM rule")
                    except Exception:
                        # keep going; failure to delete one rule must not block the cleanup of others
                        tmp_log.error(f"request_id={request_id} ddm_rule_id={ddm_rule_id} failed to delete DDM rule; {traceback.format_exc()}")
            # delete terminated requests of terminated tasks
            if done_requests_set or cancelled_requests_set or retired_requests_set:
                if done_requests_set:
                    # done requests
                    ret = self.taskBufferIF.delete_data_carousel_requests_JEDI(list(done_requests_set))
                    if ret is None:
                        tmp_log.warning(f"failed to delete done requests; skipped")
                    else:
                        tmp_log.debug(f"deleted {ret} done requests older than {done_age_limit_days} days or rule_unfound or remove_when_done")
                if cancelled_requests_set:
                    # cancelled requests
                    ret = self.taskBufferIF.delete_data_carousel_requests_JEDI(list(cancelled_requests_set))
                    if ret is None:
                        tmp_log.warning(f"failed to delete cancelled requests; skipped")
                    else:
                        tmp_log.debug(f"deleted {ret} cancelled requests")
                if retired_requests_set:
                    # retired requests
                    ret = self.taskBufferIF.delete_data_carousel_requests_JEDI(list(retired_requests_set))
                    if ret is None:
                        tmp_log.warning(f"failed to delete retired requests; skipped")
                    else:
                        tmp_log.debug(f"deleted {ret} retired requests")
            else:
                tmp_log.debug(f"no terminated requests to delete; skipped")
            # clean up outdated requests
            ret_outdated = self.taskBufferIF.clean_up_data_carousel_requests_JEDI(time_limit_days=outdated_age_limit_days)
            if ret_outdated is None:
                tmp_log.warning(f"failed to delete outdated requests; skipped")
            else:
                tmp_log.debug(f"deleted {ret_outdated} outdated requests older than {outdated_age_limit_days} days")
        except Exception:
            tmp_log.error(f"got error ; {traceback.format_exc()}")
2738 
2739     def rescue_pending_tasks_with_done_requests(self):
2740         """
2741         Rescue pending tasks which have data carousel requests in done status by updating staged files in DB
2742         Usually these tasks reuse data carousel requests done previously
2743         """
2744         tmp_log = LogWrapper(logger, "rescue_pending_tasks_with_done_requests")
2745         # get requests of pending tasks
2746         pending_tasked_requests_map, pending_tasked_relation_map = self.taskBufferIF.get_data_carousel_requests_by_task_status_JEDI(
2747             status_filter_list=["pending"]
2748         )
2749         # create inverse relation map: request_id -> list of task_id
2750         pending_tasked_inverse_relation_map = dict()
2751         for task_id, request_id_list in pending_tasked_relation_map.items():
2752             for request_id in request_id_list:
2753                 pending_tasked_inverse_relation_map.setdefault(request_id, [])
2754                 pending_tasked_inverse_relation_map[request_id].append(task_id)
2755         # loop over requests
2756         for dc_req_spec in pending_tasked_requests_map.values():
2757             try:
2758                 if dc_req_spec.status == DataCarouselRequestStatus.done:
2759                     # requests in done status of pending tasks; trigger update staged files
2760                     task_id_list = pending_tasked_inverse_relation_map.get(dc_req_spec.request_id, [])
2761                     if task_id_list:
2762                         self._update_staged_files(dc_req_spec, task_id_list=task_id_list)
2763                         tmp_log.debug(f"request_id={dc_req_spec.request_id} status={dc_req_spec.status} updated staged files for pending tasks: {task_id_list}")
2764             except Exception:
2765                 tmp_log.error(f"request_id={dc_req_spec.request_id} got error ; {traceback.format_exc()}")
2766 
    def resubmit_request(
        self, orig_dc_req_spec: DataCarouselRequestSpec, submit_idds_request=True, exclude_prev_dst: bool = False, by: str = "manual", reason: str | None = None
    ) -> tuple[DataCarouselRequestSpec | None, str | None]:
        """
        Resubmit a request by ending the old request and submitting a new request
        The request status must be in staging, done, cancelled, retired
        A staging request will be cancelled, a done request will be retired. Cancelled and retired requests are intact
        Return the spec of newly resubmitted request

        Args:
            orig_dc_req_spec (DataCarouselRequestSpec): original spec of the request to resubmit from
            submit_idds_request (bool): whether to submit corresponding iDDS request; default is True
            exclude_prev_dst (bool): whether to exclude previous destination
            by (str): annotation of the caller of this method; default is "manual"
            reason (str|None): annotation of the reason for resubmitting

        Returns:
            DataCarouselRequestSpec|None : spec of the resubmitted request spec if success, None otherwise
            str|None : error message if any, None otherwise
        """
        tmp_log = LogWrapper(
            logger,
            f"resubmit_request orig_request_id={orig_dc_req_spec.request_id} exclude_prev_dst={exclude_prev_dst} by={by}"
            + (f" reason={reason}" if reason else " "),
        )
        # initialized
        dc_req_spec_resubmitted = None
        err_msg = None
        # get lock on the original request; the whole resubmission happens under it
        with self.request_lock(orig_dc_req_spec.request_id) as locked_spec:
            if not locked_spec:
                # not getting lock; skip
                err_msg = "did not get lock; skipped"
                tmp_log.warning(err_msg)
                return None, err_msg
            # got locked spec
            orig_dc_req_spec = locked_spec
            # dummy spec (not persisted) used only to probe for a destination RSE
            dummy_dc_req_spec_to_resubmit = get_resubmit_request_spec(orig_dc_req_spec, exclude_prev_dst)
            # check and choose available destination RSE before touching the DB
            destination_rse = self._choose_destination_rse_for_request(dummy_dc_req_spec_to_resubmit)
            if destination_rse is None:
                err_msg = f"no other available destination RSE for request_id={orig_dc_req_spec.request_id}; skipped"
                tmp_log.warning(err_msg)
                return None, err_msg
            # resubmit; returns new spec on success, False if not resubmittable, None on DB error
            dc_req_spec_resubmitted = self.taskBufferIF.resubmit_data_carousel_request_JEDI(orig_dc_req_spec.request_id, exclude_prev_dst)
            if dc_req_spec_resubmitted:
                new_request_id = dc_req_spec_resubmitted.request_id
                tmp_log.debug(f"resubmitted request_id={new_request_id}")
                # expire DDM rule of original request by shrinking its lifetime
                if orig_dc_req_spec.ddm_rule_id:
                    short_time = 5
                    self._refresh_ddm_rule(orig_dc_req_spec.ddm_rule_id, short_time)
                # stage the resubmitted request immediately without queuing
                is_ok, _, dc_req_spec_resubmitted = self.stage_request(
                    dc_req_spec_resubmitted, destination_rse=destination_rse, submit_idds_request=submit_idds_request
                )
                if is_ok:
                    tmp_log.debug(f"staged resubmitted request_id={new_request_id}")
                else:
                    err_msg = f"failed to stage resubmitted request_id={new_request_id}; skipped"
                    tmp_log.warning(err_msg)
            elif dc_req_spec_resubmitted is False:
                err_msg = f"request not found or not resubmittable; skipped"
                tmp_log.warning(err_msg)
            else:
                err_msg = f"failed to resubmit"
                tmp_log.error(err_msg)
        # return
        return dc_req_spec_resubmitted, err_msg
2838 
2839     # def _unset_ddm_rule_source_rse(self, rule_id: str) -> bool:
2840     #     """
2841     #     Unset source_replica_expression of a DDM rule
2842 
2843     #     Args:
2844     #         rule_id (str): DDM rule ID
2845 
2846     #     Returns:
2847     #         bool : True for success, False otherwise
2848     #     """
2849     #     set_map = {"source_replica_expression": None}
2850     #     ret = self.ddmIF.update_rule_by_id(rule_id, set_map)
2851     #     return ret
2852 
2853     def change_request_source_rse(
2854         self, dc_req_spec: DataCarouselRequestSpec, cancel_fts: bool = False, change_src_expr: bool = False, source_rse: str | None = None
2855     ) -> tuple[bool | None, DataCarouselRequestSpec, str | None]:
2856         """
2857         Change source RSE
2858         If the request is staging, unset source_replica_expression of its DDM rule
2859         If still queued, re-choose the source_rse to be another tape
2860         Skipped if the request is not staging or queued
2861 
2862         Args:
2863             dc_req_spec (DataCarouselRequestSpec): spec of the request
2864             cancel_fts (bool): whether to cancel current FTS requests on DDM
2865             change_src_expr (bool): whether to change source_replica_expression of the DDM rule by replacing old source with new one, instead of just dropping old source
2866             source_rse (str|None): if set, use this source RSE instead of choosing one randomly, also force change_src_expr to be True; default is None
2867 
2868         Returns:
2869             bool|None : True for success, False for failure, None if skipped
2870             DataCarouselRequestSpec|None: spec of the request after changing source
2871             str|None: error message if any, None otherwise
2872         """
2873         tmp_log = LogWrapper(logger, f"change_request_source_rse request_id={dc_req_spec.request_id}")
2874         if source_rse:
2875             change_src_expr = True
2876         ret = None
2877         err_msg = None
2878         rse_set = set()
2879         # re-choose source_rse for queued request
2880         active_source_rses_set = self._get_active_source_rses()
2881         dataset = dc_req_spec.dataset
2882         source_type, rse_set_orig, staging_rule, to_pin, suggested_dst_list = self._get_source_type_of_dataset(dataset, active_source_rses_set)
2883         replicas_map = self._get_full_replicas_per_type(dataset)
2884         # exclude original source RSE if possible
2885         if dc_req_spec.status == DataCarouselRequestStatus.staging and not rse_set_orig:
2886             # for already staging request, DDM rule already exists, choose source RSE from unfiltered tape replicas if no filtered RSE
2887             rse_set |= set(replicas_map["tape"])
2888         if rse_set_orig:
2889             rse_set |= rse_set_orig
2890         if (
2891             staging_rule
2892             and (source_replica_expression := staging_rule.get("source_replica_expression"))
2893             and source_replica_expression == f"{SRC_REPLI_EXPR_PREFIX}|{dc_req_spec.source_rse}"
2894         ):
2895             # exclude source RSE already in source_replica_expression
2896             rse_set.discard(dc_req_spec.source_rse)
2897         # check status of the request
2898         if dc_req_spec.status == DataCarouselRequestStatus.queued:
2899             # exclude old source RSE for queued request
2900             rse_set.discard(dc_req_spec.source_rse)
2901             if not rse_set:
2902                 # no availible source RSE
2903                 err_msg = f"dataset={dataset} has no other source RSE available; skipped"
2904                 tmp_log.warning(err_msg)
2905                 ret = False
2906             elif source_type == "datadisk":
2907                 # replicas already on datadisk; skip
2908                 err_msg = f"dataset={dataset} already has replica on datadisks {rse_set} ; skipped"
2909                 tmp_log.debug(err_msg)
2910                 ret = False
2911             elif source_type == "tape":
2912                 # replicas only on tape
2913                 new_source_rse = None
2914                 if source_rse:
2915                     if source_rse not in rse_set:
2916                         # source_rse not in available RSEs
2917                         err_msg = f"dataset={dataset} source_rse={source_rse} not in available RSEs {rse_set} ; skipped"
2918                         tmp_log.warning(err_msg)
2919                         ret = False
2920                     else:
2921                         # use source_rse
2922                         new_source_rse = source_rse
2923                 else:
2924                     # choose source RSE
2925                     tmp_log.debug(f"dataset={dataset} on tapes {rse_set} ; choosing one")
2926                     _, new_source_rse, ddm_rule_id = self._choose_tape_source_rse(dataset, rse_set, staging_rule)
2927                     if not new_source_rse:
2928                         # failed to choose source RSE
2929                         err_msg = f"dataset={dataset} failed to choose source RSE ; skipped"
2930                         tmp_log.warning(err_msg)
2931                         ret = False
2932                 # fill new attributes for queued request
2933                 if ret is not False and new_source_rse:
2934                     with self.request_lock(dc_req_spec.request_id) as locked_spec:
2935                         if not locked_spec:
2936                             # not getting lock; skip
2937                             err_msg = "did not get lock; skipped"
2938                             tmp_log.warning(err_msg)
2939                             return False, dc_req_spec, err_msg
2940                         # got locked spec
2941                         dc_req_spec = locked_spec
2942                         # fill attributes
2943                         dc_req_spec.source_rse = new_source_rse
2944                         dc_req_spec.ddm_rule_id = ddm_rule_id
2945                         dc_req_spec.source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
2946                         if to_pin:
2947                             dc_req_spec.set_parameter("to_pin", True)
2948                         if suggested_dst_list:
2949                             dc_req_spec.set_parameter("suggested_dst_list", suggested_dst_list)
2950                         # update DB
2951                         tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
2952                         if tmp_ret:
2953                             tmp_log.info(
2954                                 f"updated DB about change source of queued request; source_rse={dc_req_spec.source_rse} , ddm_rule_id={dc_req_spec.ddm_rule_id} , to_pin={to_pin} , suggested_dst_list={suggested_dst_list}"
2955                             )
2956                             ret = True
2957                         else:
2958                             err_msg = f"failed to update DB ; skipped"
2959                             tmp_log.error(err_msg)
2960                             ret = False
2961             else:
2962                 # no replica found on tape nor on datadisk; skip
2963                 err_msg = f"dataset={dataset} has no replica on any tape or datadisk ; skipped"
2964                 tmp_log.debug(err_msg)
2965                 ret = False
2966         elif dc_req_spec.status == DataCarouselRequestStatus.staging:
2967             # unset source_replica_expression of DDM rule for staging request
2968             set_map = {"source_replica_expression": None}
2969             if change_src_expr:
2970                 # change source_replica_expression by replacing old source with new one
2971                 if not rse_set:
2972                     # no available source RSE
2973                     err_msg = f"dataset={dataset} has no other source RSE available; skipped"
2974                     tmp_log.warning(err_msg)
2975                     ret = False
2976                 else:
2977                     new_source_rse = None
2978                     if source_rse:
2979                         if source_rse not in rse_set:
2980                             # source_rse not in available RSEs
2981                             err_msg = f"dataset={dataset} source_rse={source_rse} not in available RSEs {rse_set} ; skipped"
2982                             tmp_log.warning(err_msg)
2983                             ret = False
2984                         else:
2985                             # use source_rse
2986                             new_source_rse = source_rse
2987                     else:
2988                         # choose source RSE
2989                         tmp_log.debug(f"dataset={dataset} on tapes {rse_set} ; choosing one")
2990                         new_source_rse = random.choice(list(rse_set))
2991                         if not new_source_rse:
2992                             # failed to choose source RSE
2993                             err_msg = f"dataset={dataset} failed to choose source RSE ; skipped"
2994                             tmp_log.warning(err_msg)
2995                             ret = False
2996                     if ret is not False and new_source_rse:
2997                         with self.request_lock(dc_req_spec.request_id) as locked_spec:
2998                             if not locked_spec:
2999                                 # not getting lock; skip
3000                                 err_msg = "did not get lock; skipped"
3001                                 tmp_log.warning(err_msg)
3002                                 return False, dc_req_spec, err_msg
3003                             else:
3004                                 # got locked spec
3005                                 dc_req_spec = locked_spec
3006                                 # fill new attributes for staging request
3007                                 dc_req_spec.source_rse = new_source_rse
3008                                 dc_req_spec.source_tape = self._get_source_tape_from_rse(dc_req_spec.source_rse)
3009                                 # update source_replica_expression
3010                                 set_map["source_replica_expression"] = f"{SRC_REPLI_EXPR_PREFIX}|{dc_req_spec.source_rse}"
3011                                 # update DB
3012                                 tmp_ret = self.taskBufferIF.update_data_carousel_request_JEDI(dc_req_spec)
3013                                 if tmp_ret is not None:
3014                                     tmp_log.info(
3015                                         f"updated DB about change source of staging request; source_rse={dc_req_spec.source_rse} , ddm_rule_id={dc_req_spec.ddm_rule_id}"
3016                                     )
3017                                     ret = True
3018                                 else:
3019                                     err_msg = f"failed to update DB ; skipped"
3020                                     tmp_log.error(err_msg)
3021                                     ret = False
3022             # update DDM rule
3023             if ret is not False:
3024                 tmp_ret = self.ddmIF.update_rule_by_id(dc_req_spec.ddm_rule_id, set_map)
3025                 if tmp_ret:
3026                     tmp_log.debug(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={set_map} done")
3027                     ret = True
3028                 else:
3029                     err_msg = f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={set_map} failed to update DDM rule ; skipped"
3030                     tmp_log.error(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={set_map} got {tmp_ret}; skipped")
3031                     ret = False
3032             # cancel FTS requests in separate DDM rule update calls (otherwise DDM will not cancel FTS)
3033             if cancel_fts:
3034                 cancel_fts_success = True
3035                 # call first time to cancel requests
3036                 _set_map = {"cancel_requests": True, "state": "STUCK"}
3037                 tmp_ret = self.ddmIF.update_rule_by_id(dc_req_spec.ddm_rule_id, _set_map)
3038                 if tmp_ret:
3039                     tmp_log.debug(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} done")
3040                 else:
3041                     tmp_log.warning(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} got {tmp_ret} ; skipped")
3042                     cancel_fts_success = False
3043                 # call second time to boost rule
3044                 _set_map = {"boost_rule": True}
3045                 tmp_ret = self.ddmIF.update_rule_by_id(dc_req_spec.ddm_rule_id, _set_map)
3046                 if tmp_ret:
3047                     tmp_log.debug(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} done")
3048                 else:
3049                     tmp_log.warning(f"ddm_rule_id={dc_req_spec.ddm_rule_id} params={_set_map} got {tmp_ret} ; skipped")
3050                     cancel_fts_success = False
3051                 if not cancel_fts_success:
3052                     err_msg = f"ddm_rule_id={dc_req_spec.ddm_rule_id} failed to cancel FTS requests ; skipped"
3053         return ret, dc_req_spec, err_msg