File indexing completed on 2026-04-10 08:39:00
0001 import datetime
0002 from concurrent.futures import ThreadPoolExecutor
0003 from threading import Lock
0004 from typing import Any, Dict, List
0005
0006 from pandacommon.pandalogger.LogWrapper import LogWrapper
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 from pandacommon.pandautils.PandaUtils import naive_utcnow
0009
0010 from pandaserver.api.v1.common import (
0011 MESSAGE_DATABASE,
0012 TIME_OUT,
0013 TimedMethod,
0014 generate_response,
0015 get_dn,
0016 request_validation,
0017 )
0018 from pandaserver.srvcore.panda_request import PandaRequest
0019 from pandaserver.taskbuffer.DataCarousel import (
0020 DataCarouselInterface,
0021 DataCarouselRequestStatus,
0022 )
0023 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0024
# Module-level logger for all data-carousel API handlers
_logger = PandaLogger().getLogger("api_data_carousel")


# Shared singletons populated by init_task_buffer(); None until that is called.
# global_task_buffer: TaskBuffer used for DB access; global_dcif: DataCarouselInterface built on it.
global_task_buffer = None
global_dcif = None
0030
0031
0032
0033
0034
0035
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Initialize the task buffer and other interfaces. This method needs to be called before any other method in this module.
    """
    # bind the shared task buffer and build the Data Carousel interface on top of it
    global global_task_buffer, global_dcif
    global_task_buffer = task_buffer
    global_dcif = DataCarouselInterface(global_task_buffer)
0045
0046
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_staging_destination(req: PandaRequest, request_id: int | None = None, dataset: str | None = None) -> dict:
    """
    Change destination of staging

    The current active staging request will be cancelled, and a new request will be created with the newly selected destination RSE, excluding the original destination.
    The requests can be specified by request_id or dataset (if both exist, request_id is taken).
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/data_carousel/change_staging_destination

    Args:
        req(PandaRequest): internally generated request object
        request_id (int|None): request_id of the staging request, e.g. `123`
        dataset (str|None): dataset name of the staging request in the format of Rucio DID, e.g. `"mc20_13TeV:mc20_13TeV.700449.Sh_2211_Wtaunu_mW_120_ECMS_BFilter.merge.AOD.e8351_s3681_r13144_r13146_tid36179107_00"`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(_logger, f"change_staging_destination request_id={request_id} dataset={dataset}")
    tmp_logger.debug("Start")
    success, message, data = False, "", None
    dc_req_spec_resubmitted = None
    to_submit_idds = False
    time_start = naive_utcnow()

    # look up the request spec; request_id takes precedence over dataset
    dc_req_spec = None
    if request_id is not None:
        dc_req_spec = global_dcif.get_request_by_id(request_id)
    elif dataset is not None:
        dc_req_spec = global_dcif.get_request_by_dataset(dataset)

    if dc_req_spec is not None:
        # cancel the current request and create a new one, excluding the previous destination;
        # iDDS submission is deferred until the new request is confirmed to be staging
        dc_req_spec_resubmitted, err_msg = global_dcif.resubmit_request(dc_req_spec, submit_idds_request=False, exclude_prev_dst=True)
        if not dc_req_spec_resubmitted or err_msg:
            err_msg = f"failed to resubmit request_id={dc_req_spec.request_id} : {err_msg}"
            tmp_logger.error(err_msg)
            success, message = False, err_msg
        else:
            to_submit_idds = True
    else:
        err_msg = "failed to get corresponding request"
        tmp_logger.error(err_msg)
        success, message = False, err_msg

    # only a resubmitted request that reached the staging status counts as success
    if dc_req_spec_resubmitted and dc_req_spec_resubmitted.status == DataCarouselRequestStatus.staging:
        success = True
        data = {"request_id": dc_req_spec.request_id, "new_request_id": dc_req_spec_resubmitted.request_id, "dataset": dc_req_spec_resubmitted.dataset}
        message = "new request resubmitted, destination changed"
        if to_submit_idds:
            new_request_id = dc_req_spec_resubmitted.request_id
            task_id_list = global_dcif._get_related_tasks(new_request_id)
            if task_id_list:
                tmp_logger.debug(f"related tasks: {task_id_list}")
                # submit one iDDS stage-in request per related task in parallel
                with ThreadPoolExecutor() as thread_pool:
                    thread_pool.map((lambda task_id: global_dcif._submit_idds_stagein_request(task_id, dc_req_spec_resubmitted)), task_id_list)
                tmp_logger.debug("submitted corresponding iDDS requests for related tasks")
                message += "; submitted iDDS requests"

            else:
                err_msg = "failed to get related tasks; skipped to submit iDDS requests"
                tmp_logger.warning(err_msg)
                message += f"; {err_msg}"

    time_delta = naive_utcnow() - time_start
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0119
0120
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_staging_source(
    req: PandaRequest,
    request_id: int | None = None,
    dataset: str | None = None,
    cancel_fts: bool = False,
    change_src_expr: bool = False,
    source_rse: str | None = None,
) -> dict:
    """
    Change source of staging

    If the request is queued, its source_rse will be rechosen, excluding the original source.
    If the request is staging, the source_replica_expression of its DDM rule is unset so new source can be tried.
    Only effective on queued or staging requests.
    The requests can be specified by request_id or dataset (if both exist, request_id is taken).
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/data_carousel/change_staging_source

    Args:
        req(PandaRequest): internally generated request object
        request_id (int|None): request_id of the staging request, e.g. `123`
        dataset (str|None): dataset name of the staging request in the format of Rucio DID, e.g. `"mc20_13TeV:mc20_13TeV.700449.Sh_2211_Wtaunu_mW_120_ECMS_BFilter.merge.AOD.e8351_s3681_r13144_r13146_tid36179107_00"`
        cancel_fts (bool): whether to cancel current FTS requests on DDM, False by default
        change_src_expr (bool): whether to change source_replica_expression of the DDM rule by replacing old source with new one, instead of just dropping old source
        source_rse (str|None): if set, use this source RSE instead of choosing one randomly, also force change_src_expr to be True; default is None

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(
        _logger,
        f"change_staging_source request_id={request_id} dataset={dataset} cancel_fts={cancel_fts} change_src_expr={change_src_expr} source_rse={source_rse}",
    )
    tmp_logger.debug("Start")
    success, message, data = False, "", None
    time_start = naive_utcnow()

    # look up the request spec; request_id takes precedence over dataset
    dc_req_spec = None
    if request_id is not None:
        dc_req_spec = global_dcif.get_request_by_id(request_id)
    elif dataset is not None:
        dc_req_spec = global_dcif.get_request_by_dataset(dataset)

    if dc_req_spec is not None:
        # remember the pre-change status/source for the response message
        status = dc_req_spec.status
        orig_source_rse = dc_req_spec.source_rse
        if status not in [DataCarouselRequestStatus.queued, DataCarouselRequestStatus.staging]:
            err_msg = f"request_id={dc_req_spec.request_id} status={status} not queued or staging; skipped"
            tmp_logger.warning(err_msg)
            success, message = False, err_msg
        else:
            ret, dc_req_spec, err_msg = global_dcif.change_request_source_rse(dc_req_spec, cancel_fts, change_src_expr, source_rse)
            if not ret:
                err_msg = f"failed to change source request_id={dc_req_spec.request_id} : {err_msg}"
                tmp_logger.error(err_msg)
                success, message = False, err_msg
            else:
                success = True
                # queued requests (or explicit expression change) get a new source_rse;
                # staging requests otherwise just have the source replica expression dropped
                if dc_req_spec.status == DataCarouselRequestStatus.queued or change_src_expr:
                    message = f"status={status} changed source_rse from {orig_source_rse} to {dc_req_spec.source_rse}"
                else:
                    message = f"status={status} source replica expression is dropped"
                data = {
                    "request_id": dc_req_spec.request_id,
                    "dataset": dc_req_spec.dataset,
                    "source_rse": dc_req_spec.source_rse,
                    "ddm_rule_id": dc_req_spec.ddm_rule_id,
                }
    else:
        err_msg = "failed to get corresponding request"
        tmp_logger.error(err_msg)
        success, message = False, err_msg

    time_delta = naive_utcnow() - time_start
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)
0204
0205
@request_validation(_logger, secure=True, production=True, request_method="POST")
def force_to_staging(req: PandaRequest, request_id: int | None = None, dataset: str | None = None) -> dict:
    """
    Force to staging

    The request will skip the queue and go to staging immediately (will submit DDM rules).
    Only effective on queued requests.
    The requests can be specified by request_id or dataset (if both exist, request_id is taken).
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/data_carousel/force_to_staging

    Args:
        req(PandaRequest): internally generated request object
        request_id (int|None): request_id of the staging request, e.g. `123`
        dataset (str|None): dataset name of the staging request in the format of Rucio DID, e.g. `"mc20_13TeV:mc20_13TeV.700449.Sh_2211_Wtaunu_mW_120_ECMS_BFilter.merge.AOD.e8351_s3681_r13144_r13146_tid36179107_00"`

    Returns:
        dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
    """
    tmp_logger = LogWrapper(_logger, f"force_to_staging request_id={request_id} dataset={dataset}")
    tmp_logger.debug("Start")
    success, message, data = False, "", None
    time_start = naive_utcnow()

    # look up the request spec; request_id takes precedence over dataset
    dc_req_spec = None
    if request_id is not None:
        dc_req_spec = global_dcif.get_request_by_id(request_id)
    elif dataset is not None:
        dc_req_spec = global_dcif.get_request_by_dataset(dataset)

    if dc_req_spec is not None:
        # stage immediately, bypassing the queue; the returned spec carries the updated status
        is_ok, err_msg, dc_req_spec = global_dcif.stage_request(dc_req_spec)
        if not is_ok:
            err_msg = f"failed to stage request_id={dc_req_spec.request_id} : {err_msg}"
            tmp_logger.error(err_msg)
            success, message = False, err_msg
        else:
            success = True
            message = f"status has become {dc_req_spec.status}"
            data = {
                "request_id": dc_req_spec.request_id,
                "dataset": dc_req_spec.dataset,
                "status": dc_req_spec.status,
                "ddm_rule_id": dc_req_spec.ddm_rule_id,
            }
    else:
        err_msg = "failed to get corresponding request"
        tmp_logger.error(err_msg)
        success, message = False, err_msg

    time_delta = naive_utcnow() - time_start
    tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")

    return generate_response(success, message, data)