Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import datetime
0002 from concurrent.futures import ThreadPoolExecutor
0003 from threading import Lock
0004 from typing import Any, Dict, List
0005 
0006 from pandacommon.pandalogger.LogWrapper import LogWrapper
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009 
0010 from pandaserver.api.v1.common import (
0011     MESSAGE_DATABASE,
0012     TIME_OUT,
0013     TimedMethod,
0014     generate_response,
0015     get_dn,
0016     request_validation,
0017 )
0018 from pandaserver.srvcore.panda_request import PandaRequest
0019 from pandaserver.taskbuffer.DataCarousel import (
0020     DataCarouselInterface,
0021     DataCarouselRequestStatus,
0022 )
0023 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0024 
# Module-level logger; passed to the request_validation decorator and wrapped by LogWrapper in each endpoint below
_logger = PandaLogger().getLogger("api_data_carousel")

# These global variables are initialized in the init_task_buffer function
# and remain None until it has been called
global_task_buffer = None
global_dcif = None
0030 
0031 # These global variables don't depend on DB access and can be initialized here
0032 # global_proxy_cache = panda_proxy_cache.MyProxyInterface()
0033 # global_token_cache = token_cache.TokenCache()
0034 
0035 
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Set up the module-level task buffer and the data carousel interface built on top of it.

    Must be called once before any endpoint in this module is used, since the endpoints rely on these globals.

    Args:
        task_buffer (TaskBuffer): task buffer instance to store and to hand to the data carousel interface
    """
    global global_task_buffer, global_dcif

    global_task_buffer = task_buffer
    # the interface is built on the very same task buffer object that was just stored
    global_dcif = DataCarouselInterface(task_buffer)
0045 
0046 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_staging_destination(req: PandaRequest, request_id: int | None = None, dataset: str | None = None) -> dict:
    """
    Change destination of staging

    The current active staging request will be cancelled, and a new request will be created with the newly selected destination RSE, excluding the original destination.
    After a successful resubmission, iDDS stage-in requests are submitted for the tasks related to the new request; failures of individual submissions are logged and reported in the response message.
    The requests can be specified by request_id or dataset (if both exist, request_id is taken).
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/data_carousel/change_staging_destination

    Args:
        req(PandaRequest): internally generated request object
        request_id (int|None): request_id of the staging request, e.g. `123`
        dataset (str|None): dataset name of the staging request in the format of Rucio DID, e.g. `"mc20_13TeV:mc20_13TeV.700449.Sh_2211_Wtaunu_mW_120_ECMS_BFilter.merge.AOD.e8351_s3681_r13144_r13146_tid36179107_00"`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(_logger, f"change_staging_destination request_id={request_id} dataset={dataset}")
    tmp_logger.debug("Start")
    success, message, data = False, "", None
    dc_req_spec_resubmitted = None
    to_submit_idds = False
    time_start = naive_utcnow()

    # look up the request; request_id takes precedence over dataset
    dc_req_spec = None
    if request_id is not None:
        # specified by request_id
        dc_req_spec = global_dcif.get_request_by_id(request_id)
    elif dataset is not None:
        # specified by dataset
        dc_req_spec = global_dcif.get_request_by_dataset(dataset)

    if dc_req_spec is not None:
        # iDDS submission is switched off here and done below, once per related task
        dc_req_spec_resubmitted, err_msg = global_dcif.resubmit_request(dc_req_spec, submit_idds_request=False, exclude_prev_dst=True)
        if not dc_req_spec_resubmitted or err_msg:
            err_msg = f"failed to resubmit request_id={dc_req_spec.request_id} : {err_msg}"
            tmp_logger.error(err_msg)
            success, message = False, err_msg
        else:
            to_submit_idds = True
    else:
        err_msg = "failed to get corresponding request"
        tmp_logger.error(err_msg)
        success, message = False, err_msg

    if dc_req_spec_resubmitted and dc_req_spec_resubmitted.status == DataCarouselRequestStatus.staging:
        success = True
        data = {"request_id": dc_req_spec.request_id, "new_request_id": dc_req_spec_resubmitted.request_id, "dataset": dc_req_spec_resubmitted.dataset}
        message = "new request resubmitted, destination changed"
        if to_submit_idds:
            new_request_id = dc_req_spec_resubmitted.request_id
            task_id_list = global_dcif._get_related_tasks(new_request_id)
            if task_id_list:
                tmp_logger.debug(f"related tasks: {task_id_list}")
                # submit one iDDS request per task in parallel; keep the futures so that failures are not lost
                with ThreadPoolExecutor() as thread_pool:
                    future_to_task_id = {
                        thread_pool.submit(global_dcif._submit_idds_stagein_request, task_id, dc_req_spec_resubmitted): task_id for task_id in task_id_list
                    }
                # leaving the executor context waits for all submissions, so every future is done here
                failed_task_ids = []
                for future, task_id in future_to_task_id.items():
                    exc = future.exception()
                    if exc is not None:
                        tmp_logger.error(f"failed to submit iDDS request for task_id={task_id} : {exc.__class__.__name__} {exc}")
                        failed_task_ids.append(task_id)
                if failed_task_ids:
                    # the request itself was resubmitted, so keep success but tell the caller what went wrong
                    message += f"; failed to submit iDDS requests for tasks {failed_task_ids}"
                else:
                    tmp_logger.debug("submitted corresponding iDDS requests for related tasks")
                    message += "; submitted iDDS requests"

            else:
                err_msg = "failed to get related tasks; skipped to submit iDDS requests"
                tmp_logger.warning(err_msg)
                message += f"; {err_msg}"
    elif dc_req_spec_resubmitted and not message:
        # resubmission reported no error but the new request is not staging; do not return a failure without explanation
        message = f"new request_id={dc_req_spec_resubmitted.request_id} resubmitted but status={dc_req_spec_resubmitted.status} is not staging"
        tmp_logger.error(message)

    time_delta = naive_utcnow() - time_start
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0119 
0120 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_staging_source(
    req: PandaRequest,
    request_id: int | None = None,
    dataset: str | None = None,
    cancel_fts: bool = False,
    change_src_expr: bool = False,
    source_rse: str | None = None,
) -> dict:
    """
    Change source of staging

    If the request is queued, its source_rse will be rechosen, excluding the original source.
    If the request is staging, the source_replica_expression of its DDM rule is unset so new source can be tried.
    Only effective on queued or staging requests.
    The requests can be specified by request_id or dataset (if both exist, request_id is taken).
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/data_carousel/change_staging_source

    Args:
        req(PandaRequest): internally generated request object
        request_id (int|None): request_id of the staging request, e.g. `123`
        dataset (str|None): dataset name of the staging request in the format of Rucio DID, e.g. `"mc20_13TeV:mc20_13TeV.700449.Sh_2211_Wtaunu_mW_120_ECMS_BFilter.merge.AOD.e8351_s3681_r13144_r13146_tid36179107_00"`
        cancel_fts (bool): whether to cancel current FTS requests on DDM, False by default
        change_src_expr (bool): whether to change source_replica_expression of the DDM rule by replacing old source with new one, instead of just dropping old source
        source_rse (str|None): if set, use this source RSE instead of choosing one randomly, also force change_src_expr to be True; default is None

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(
        _logger,
        f"change_staging_source request_id={request_id} dataset={dataset} cancel_fts={cancel_fts} change_src_expr={change_src_expr} source_rse={source_rse}",
    )
    tmp_logger.debug("Start")
    success, message, data = False, "", None
    time_start = naive_utcnow()

    # look up the request; request_id takes precedence over dataset
    dc_req_spec = None
    if request_id is not None:
        # specified by request_id
        dc_req_spec = global_dcif.get_request_by_id(request_id)
    elif dataset is not None:
        # specified by dataset
        dc_req_spec = global_dcif.get_request_by_dataset(dataset)

    if dc_req_spec is not None:
        # remember the original values; dc_req_spec is replaced by the call below
        status = dc_req_spec.status
        orig_source_rse = dc_req_spec.source_rse
        orig_request_id = dc_req_spec.request_id
        if status not in [DataCarouselRequestStatus.queued, DataCarouselRequestStatus.staging]:
            err_msg = f"request_id={dc_req_spec.request_id} status={status} not queued or staging; skipped"
            tmp_logger.warning(err_msg)
            success, message = False, err_msg
        else:
            ret, dc_req_spec, err_msg = global_dcif.change_request_source_rse(dc_req_spec, cancel_fts, change_src_expr, source_rse)
            if not ret:
                # NOTE(review): the returned spec may be None on failure (not visible from here); fall back to the id looked up before the call
                failed_request_id = orig_request_id if dc_req_spec is None else dc_req_spec.request_id
                err_msg = f"failed to change source request_id={failed_request_id} : {err_msg}"
                tmp_logger.error(err_msg)
                success, message = False, err_msg
            else:
                success = True
                # per the documented contract of this endpoint, an explicit source_rse implies change_src_expr
                src_expr_replaced = change_src_expr or source_rse is not None
                if dc_req_spec.status == DataCarouselRequestStatus.queued or src_expr_replaced:
                    message = f"status={status} changed source_rse from {orig_source_rse} to {dc_req_spec.source_rse}"
                else:
                    message = f"status={status} source replica expression is dropped"
                data = {
                    "request_id": dc_req_spec.request_id,
                    "dataset": dc_req_spec.dataset,
                    "source_rse": dc_req_spec.source_rse,
                    "ddm_rule_id": dc_req_spec.ddm_rule_id,
                }
    else:
        err_msg = "failed to get corresponding request"
        tmp_logger.error(err_msg)
        success, message = False, err_msg

    time_delta = naive_utcnow() - time_start
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0204 
0205 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def force_to_staging(req: PandaRequest, request_id: int | None = None, dataset: str | None = None) -> dict:
    """
    Force to staging

    The request will skip the queue and go to staging immediately (will submit DDM rules).
    Only effective on queued requests.
    The requests can be specified by request_id or dataset (if both exist, request_id is taken).
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/data_carousel/force_to_staging

    Args:
        req(PandaRequest): internally generated request object
        request_id (int|None): request_id of the staging request, e.g. `123`
        dataset (str|None): dataset name of the staging request in the format of Rucio DID, e.g. `"mc20_13TeV:mc20_13TeV.700449.Sh_2211_Wtaunu_mW_120_ECMS_BFilter.merge.AOD.e8351_s3681_r13144_r13146_tid36179107_00"`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(_logger, f"force_to_staging request_id={request_id} dataset={dataset}")
    tmp_logger.debug("Start")
    success, message, data = False, "", None
    time_start = naive_utcnow()

    # look up the request; request_id takes precedence over dataset
    dc_req_spec = None
    if request_id is not None:
        # specified by request_id
        dc_req_spec = global_dcif.get_request_by_id(request_id)
    elif dataset is not None:
        # specified by dataset
        dc_req_spec = global_dcif.get_request_by_dataset(dataset)

    if dc_req_spec is not None:
        # remember the id; dc_req_spec is replaced by the call below
        orig_request_id = dc_req_spec.request_id
        is_ok, err_msg, dc_req_spec = global_dcif.stage_request(dc_req_spec)
        if not is_ok:
            # NOTE(review): the returned spec may be None on failure (not visible from here); fall back to the id looked up before the call
            failed_request_id = orig_request_id if dc_req_spec is None else dc_req_spec.request_id
            err_msg = f"failed to stage request_id={failed_request_id} : {err_msg}"
            tmp_logger.error(err_msg)
            success, message = False, err_msg
        else:
            success = True
            message = f"status has become {dc_req_spec.status}"
            data = {
                "request_id": dc_req_spec.request_id,
                "dataset": dc_req_spec.dataset,
                "status": dc_req_spec.status,
                "ddm_rule_id": dc_req_spec.ddm_rule_id,
            }
    else:
        err_msg = "failed to get corresponding request"
        tmp_logger.error(err_msg)
        success, message = False, err_msg

    time_delta = naive_utcnow() - time_start
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)