File indexing completed on 2026-04-10 08:39:00
0001
0002
0003 import datetime
0004 import json
0005 import re
0006 from typing import Any, Dict, List
0007
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011
0012 from pandaserver.api.v1.common import (
0013 MESSAGE_TASK_ID,
0014 extract_production_working_groups,
0015 generate_response,
0016 get_dn,
0017 get_email_address,
0018 get_fqan,
0019 has_production_role,
0020 request_validation,
0021 )
0022 from pandaserver.srvcore.CoreUtils import clean_user_id, make_reassign_comment
0023 from pandaserver.srvcore.panda_request import PandaRequest
0024 from pandaserver.taskbuffer import task_split_rules
0025 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0026 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0027
# Module logger shared by every task API endpoint in this module (passed to request_validation and LogWrapper).
_logger = PandaLogger().getLogger("api_task")

# Shared TaskBuffer handle used by all endpoints; stays None until init_task_buffer() is called.
global_task_buffer = None
0031
0032
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Register the TaskBuffer shared by every endpoint of this module.

    All endpoints read the module-level `global_task_buffer`, so this has to run once
    before any of them is invoked.

    Args:
        task_buffer(TaskBuffer): the task buffer instance to share
    """
    global global_task_buffer
    global_task_buffer = task_buffer
0039
0040
@request_validation(_logger, secure=True, request_method="POST")
def retry(
    req: PandaRequest,
    task_id: int,
    new_parameters: Dict | str = None,
    no_child_retry: bool = False,
    discard_events: bool = False,
    disable_staging_mode: bool = False,
    keep_gshare_priority: bool = False,
    ignore_hard_exhausted: bool = False,
) -> Dict[str, Any]:
    """
    Task retry

    Retry a given task e.g. in exhausted state. Requires a secure connection without a production role to retry own tasks and with a production role to retry others' tasks.

    API details:
        HTTP Method: POST
        Path: /v1/task/retry

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        new_parameters(Dict | str, optional): a dictionary (or its JSON encoding) with the new parameters for rerunning the task. The new parameters are merged with the existing ones.
            The parameters are the attributes in the JediTaskSpec object (https://github.com/PanDAWMS/panda-jedi/blob/master/pandajedi/jedicore/JediTaskSpec.py).
        no_child_retry(bool, optional): if True, the child tasks are not retried. Defaults to False
        discard_events(bool, optional): if True, events will be discarded. Defaults to False
        disable_staging_mode(bool, optional): if True, the task skips staging state and directly goes to subsequent state. Defaults to False
        keep_gshare_priority(bool, optional): if True, the task keeps current gshare and priority. Defaults to False
        ignore_hard_exhausted(bool, optional): if True, the task ignores the limits for hard exhausted state and can be retried even if it is very faulty.
            Only honored for callers with a production role. Defaults to False

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
            Code 1 means the new parameters could not be decoded/merged; code 3 means only the failed jobs were retried and the new parameters were not applied.
    """
    tmp_logger = LogWrapper(_logger, f"retry < task_id={task_id} >")
    tmp_logger.debug("Start")

    # validate the task ID the same way as the other task endpoints
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    user = get_dn(req)
    production_role = has_production_role(req)

    if new_parameters:
        try:
            # new_parameters may arrive JSON-encoded; decode it inside the try so that malformed
            # JSON yields the error tuple below instead of an unhandled exception
            if isinstance(new_parameters, str):
                new_parameters = json.loads(new_parameters)

            # merge the new values over the task's current parameters
            old_parameters = json.loads(global_task_buffer.getTaskParamsPanda(task_id))
            old_parameters.update(new_parameters)
            final_task_parameters_json = json.dumps(old_parameters)

            # resubmit the task with the merged parameters
            ret = global_task_buffer.insertTaskParamsPanda(
                final_task_parameters_json,
                user,
                production_role,
                [],
                properErrorCode=True,
                allowActiveTask=True,
            )
        except Exception as e:
            tmp_logger.error(f"new parameter conversion failed with {str(e)}")
            ret = 1, f"new parameter conversion failed with {str(e)}"
    else:
        # only production-role callers may bypass the hard-exhausted limits
        if not production_role and ignore_hard_exhausted:
            ignore_hard_exhausted = False

        qualifier = " ".join(
            JediTaskSpec.get_retry_command_qualifiers(no_child_retry, discard_events, disable_staging_mode, keep_gshare_priority, ignore_hard_exhausted)
        )

        ret = global_task_buffer.sendCommandTaskPanda(
            task_id,
            user,
            production_role,
            "retry",
            properErrorCode=True,
            comQualifier=qualifier,
        )

    # Return code 5: the task itself was not retried and ret[1] carries its current status
    # (see the message below). In that case retry the failed jobs directly.
    if ret[0] == 5:
        job_ids = global_task_buffer.getJobdefIDsForFailedJob(task_id)
        clean_id = clean_user_id(user)
        for job_id in job_ids:
            global_task_buffer.finalizePendingJobs(clean_id, job_id)
        global_task_buffer.increaseAttemptNrPanda(task_id, 5)
        return_str = f"retry has been triggered for failed jobs while the task is still {ret[1]}"
        # when new parameters were requested they were not applied, so report a non-zero code
        if not new_parameters:
            ret = 0, return_str
        else:
            ret = 3, return_str

    tmp_logger.debug("Done")

    data, message = ret
    # success reflects the return code, consistent with the other task endpoints and the docstring
    success = data == 0
    return generate_response(success, message, data)
0141
0142
@request_validation(_logger, secure=True, production=True, request_method="POST")
def resume(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task resume

    Bring a paused or throttled task back to the active state it had before. For a task in staging,
    this kicks it on to its next state. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/resume

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"resume < jediTaskID={task_id} >")
    log.debug("Start")

    requester_dn = get_dn(req)
    production = has_production_role(req)

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(jedi_task_id, requester_dn, production, "resume", properErrorCode=True)
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0182
0183
@request_validation(_logger, secure=True, production=True, request_method="POST")
def release(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task release

    Let a task bypass iDDS for its staging step. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/release

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"release < task_id={task_id} >")
    log.debug("Start")

    requester_dn = get_dn(req)
    production = has_production_role(req)

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(jedi_task_id, requester_dn, production, "release", properErrorCode=True)
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0223
0224
0225
@request_validation(_logger, secure=True, request_method="POST")
def reassign(req: PandaRequest, task_id: int, site: str = None, cloud: str = None, nucleus: str = None, mode: str = None):
    """
    Task reassign

    Move a task to another site, nucleus or cloud, depending on which of those parameters is given.
    Needs a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/reassign

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        site(str, optional): destination site name
        cloud(str, optional): destination cloud name
        nucleus(str, optional): destination nucleus name
        mode(str, optional): `kill` (kills all jobs, default), `soft` (kills queued jobs) or `nokill` (doesn't kill jobs)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"reassign < task_id={task_id} >")
    log.debug("Start")

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    requester_dn = get_dn(req)
    production = has_production_role(req)

    # destination and mode travel to the command as its comment
    reassign_comment = make_reassign_comment(site, cloud, nucleus, mode)

    return_code, message = global_task_buffer.sendCommandTaskPanda(
        jedi_task_id,
        requester_dn,
        production,
        "reassign",
        comComment=reassign_comment,
        properErrorCode=True,
    )
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0279
0280
@request_validation(_logger, secure=True, production=True, request_method="POST")
def pause(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task pause

    Put a task on hold. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/pause

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"pause < task_id={task_id} >")
    log.debug("Start")

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    requester_dn = get_dn(req)
    production = has_production_role(req)

    return_code, message = global_task_buffer.sendCommandTaskPanda(jedi_task_id, requester_dn, production, "pause", properErrorCode=True)
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0319
0320
@request_validation(_logger, secure=True, request_method="POST")
def kill(req: PandaRequest, task_id: int = None, broadcast: bool = False) -> Dict[str, Any]:
    """
    Task kill

    Kill a given task. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/kill

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID. The signature defaults it to None, but a valid ID is required:
            a missing or non-numeric value produces a failure response.
        broadcast(bool, optional): broadcast kill command to pilots to kill the jobs

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    tmp_logger = LogWrapper(_logger, f"kill < task_id={task_id} >")
    tmp_logger.debug("Start")

    # task_id defaults to None and int(None) raises TypeError (not ValueError), so both
    # exceptions must be caught to return the standard invalid-task-ID response
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    user = get_dn(req)
    is_production_role = has_production_role(req)

    ret = global_task_buffer.sendCommandTaskPanda(
        task_id,
        user,
        is_production_role,
        "kill",
        properErrorCode=True,
        broadcast=broadcast,
    )
    data, message = ret
    success = data == 0

    tmp_logger.debug("Done")

    return generate_response(success, message, data)
0367
0368
@request_validation(_logger, secure=True, request_method="POST")
def kill_unfinished_jobs(req: PandaRequest, task_id: int, code: int = None, use_email_as_id: bool = False) -> Dict[str, Any]:
    """
    Kill all unfinished jobs in a task

    Kills all unfinished jobs in a task. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/kill_unfinished_jobs

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id(int): JEDI task ID
        code(int, optional): The kill code. Defaults to None.
            ```
            code
            2: expire
            3: aborted
            4: expire in waiting
            7: retry by server
            8: rebrokerage
            9: force kill
            10: fast rebrokerage in overloaded PQ
            50: kill by JEDI
            51: reassigned by JEDI
            52: force kill by JEDI
            55: killed since task is (almost) done
            60: workload was terminated by the pilot without actual work
            91: kill user jobs with prod role
            99: force kill user jobs with prod role
            ```
        use_email_as_id(bool, optional): Use the email as ID. Defaults to False.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The data field contains a list of bools indicating the success of the kill operations.
            An invalid task_id returns success False with the standard task ID error message.
    """
    tmp_logger = LogWrapper(_logger, "kill_unfinished_jobs")

    # validate the task ID like the other task endpoints before looking up its jobs
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    user = get_dn(req)
    fqans = get_fqan(req)
    is_production_manager = has_production_role(req)

    # optionally identify the requester by e-mail address; keep the DN if no address is found
    if use_email_as_id:
        email = get_email_address(user, tmp_logger)
        if email:
            user = email

    tmp_logger.debug(f"Start user: {user} code: {code} is_production_manager: {is_production_manager} fqans: {fqans} task_id: {task_id}")

    # working groups for which the requester holds a production role
    wg_prod_roles = extract_production_working_groups(fqans)

    # collect the PanDA IDs belonging to the task and kill them
    job_ids = global_task_buffer.getPandaIDsWithTaskID(task_id)

    ret = global_task_buffer.killJobs(job_ids, user, code, is_production_manager, wg_prod_roles, [])
    tmp_logger.debug(f"Done with ret: {ret}")
    return generate_response(True, data=ret)
0431
0432
@request_validation(_logger, secure=True, request_method="POST")
def finish(req: PandaRequest, task_id: int, soft: bool = False, broadcast: bool = False) -> Dict[str, Any]:
    """
    Task finish

    Ask for a task to be finished. Needs a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/finish

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        soft(bool, optional): soft finish (sent as the "soft" command qualifier)
        broadcast(bool, optional): broadcast finish command to pilots

    Returns:
        dict: The system response. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"finish < task_id={task_id} soft={soft} broadcast={broadcast} >")
    log.debug("Start")

    finish_qualifier = "soft" if soft else None

    requester_dn = get_dn(req)
    production = has_production_role(req)

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(
        jedi_task_id,
        requester_dn,
        production,
        "finish",
        properErrorCode=True,
        comQualifier=finish_qualifier,
        broadcast=broadcast,
    )
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0485
0486
@request_validation(_logger, secure=True, production=True, request_method="POST")
def reactivate(req: PandaRequest, task_id: int, keep_attempt_nr: bool = False, trigger_job_generation: bool = False) -> Dict[str, Any]:
    """
    Reactivate task

    Recycle a finished/done task: once reactivated it generates new jobs and then returns to done/finished.
    Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/reactivate

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        keep_attempt_nr(bool, optional): keep the original attempt number
        trigger_job_generation(bool, optional): trigger the job generation

    Returns:
        dict: The system response. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"reactivate < task_id={task_id} >")
    log.debug("Start")

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.reactivateTask(jedi_task_id, keep_attempt_nr, trigger_job_generation)
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0524
0525
@request_validation(_logger, secure=True, production=True, request_method="POST")
def avalanche(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task avalanche

    Start the avalanche for a task in scouting, or reconfigure the task on the fly so that it skips
    scouting. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/avalanche

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"avalanche < task_id={task_id} >")
    log.debug("Start")

    requester_dn = get_dn(req)
    production = has_production_role(req)

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(jedi_task_id, requester_dn, production, "avalanche", properErrorCode=True)
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0564
0565
@request_validation(_logger, secure=True, request_method="POST")
def reload_input(req: PandaRequest, task_id: int, ignore_hard_exhausted: bool = False) -> Dict[str, Any]:
    """
    Reload input

    Request to reload the input for a given task. Requires a secure connection.
    A production role is not required, but `ignore_hard_exhausted` is only honored for callers that have one.

    API details:
        HTTP Method: POST
        Path: /v1/task/reload_input

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        ignore_hard_exhausted(bool, optional): ignore the limits for hard exhausted (production role only; silently
            reset to False otherwise)

    Returns:
        dict: The system response. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    tmp_logger = LogWrapper(_logger, f"reload_input < task_id={task_id} >")
    tmp_logger.debug("Start")

    user = get_dn(req)
    is_production_role = has_production_role(req)

    try:
        task_id = int(task_id)
    except ValueError:
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # only production-role callers may bypass the hard-exhausted limits
    if not is_production_role and ignore_hard_exhausted:
        ignore_hard_exhausted = False

    # The reload is sent as an "incexec" command whose comment is [parameters, qualifiers].
    # NOTE(review): the empty dict presumably means "no task-parameter changes"; confirm against the JEDI incexec handler.
    com_qualifier = JediTaskSpec.get_retry_command_qualifiers(ignore_hard_exhausted=ignore_hard_exhausted)
    com_comment = json.dumps([{}, com_qualifier])

    ret = global_task_buffer.sendCommandTaskPanda(task_id, user, is_production_role, "incexec", comComment=com_comment, properErrorCode=True)
    data, message = ret
    success = data == 0

    tmp_logger.debug("Done")

    return generate_response(success, message, data)
0613
0614
@request_validation(_logger, secure=True, production=True, request_method="POST")
def reassign_global_share(req: PandaRequest, task_id_list: List[int], share: str, reassign_running_jobs: bool) -> Dict[str, Any]:
    """
    Reassign the global share of a task

    Move one or more tasks to a different global share. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/reassign_global_share

    Args:
        req(PandaRequest): internally generated request object
        task_id_list(list): List of JEDI task IDs to reassign
        share(str): destination share
        reassign_running_jobs(bool): whether you want to reassign existing running jobs

    Returns:
        dict: The system response. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"reassign_global_share < task_id_list={task_id_list} share={share} reassign_running_jobs={reassign_running_jobs} >")
    log.debug("Start")

    arguments_valid = isinstance(task_id_list, list) and isinstance(share, str)
    if not arguments_valid:
        log.error("Failed due to invalid task list")
        return generate_response(False, message="wrong parameters: task_ids must be list and share must be string")

    return_code, message = global_task_buffer.reassignShare(task_id_list, share, reassign_running_jobs)
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0649
0650
@request_validation(_logger, secure=True, production=True, request_method="POST")
def enable_jumbo_jobs(req: PandaRequest, task_id: int, jumbo_jobs_total: int, jumbo_jobs_per_site: int = None) -> Dict[str, Any]:
    """
    Enable Jumbo jobs

    Enables the Jumbo jobs for a given task ID. When jumbo jobs are enabled (`jumbo_jobs_total > 0`),
    an avalanche command is also sent for the task. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/enable_jumbo_jobs

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        jumbo_jobs_total(int): Total number of jumbo jobs
        jumbo_jobs_per_site(int): Number of jumbo jobs per site. Defaults to `jumbo_jobs_total`.

    Returns:
        dict: The system response. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
            The result reflects enabling jumbo jobs; a failing follow-up avalanche is only logged.
    """
    tmp_logger = LogWrapper(_logger, f"enable_jumbo_jobs < task_id={task_id} jumbo_jobs_total={jumbo_jobs_total} n_jumbo_jobs_per_site={jumbo_jobs_per_site} >")
    tmp_logger.debug("Start")

    # validate the task ID like the other task endpoints
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    if not jumbo_jobs_per_site:
        jumbo_jobs_per_site = jumbo_jobs_total

    code, message = global_task_buffer.enableJumboJobs(task_id, jumbo_jobs_total, jumbo_jobs_per_site)
    if jumbo_jobs_total > 0 and code == 0:
        tmp_logger.debug("Calling task avalanche")
        # avalanche() is a decorated endpoint taking (req, task_id); the previous call
        # avalanche(task_id) passed the task ID in place of the request. Send the command directly.
        avalanche_code, avalanche_message = global_task_buffer.sendCommandTaskPanda(
            task_id, get_dn(req), has_production_role(req), "avalanche", properErrorCode=True
        )
        if avalanche_code != 0:
            tmp_logger.error(f"Task avalanche failed with code={avalanche_code}: {avalanche_message}")

    success = code == 0

    tmp_logger.debug("Done")
    return generate_response(success, message, code)
0687
0688
@request_validation(_logger, secure=True, request_method="GET")
def get_jumbo_job_datasets(req: PandaRequest, from_offset: int, to_offset: int = 0) -> Dict:
    """
    Get jumbo job datasets

    Map every jumbo-job-enabled task to its datasets, restricted to tasks whose last modification lies
    between `now - from_offset` and `now - to_offset`. Needs a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_jumbo_job_datasets

    Args:
        req(PandaRequest): internally generated request object
        from_offset(int): `now - from_offset` in days will serve as the floor for modification time (Previously called n_days)
        to_offset(int, optional): `now - to_offset` in days will serve as the ceiling for modification time. Defaults to 0. (Previously called grace_period)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains the dictionary of JEDI task IDs to datasets.
    """
    log = LogWrapper(_logger, "get_jumbo_job_datasets")
    log.debug("Start")

    datasets_by_task = global_task_buffer.getJumboJobDatasets(from_offset, to_offset)

    log.debug("Done")
    return generate_response(True, data=datasets_by_task)
0717
0718
@request_validation(_logger, secure=True, production=True, request_method="POST")
def enable_job_cloning(
    req: PandaRequest,
    jedi_task_id: int,
    mode: str = None,
    multiplicity: int = None,
    num_sites: int = None,
) -> Dict[str, Any]:
    """
    Enable job cloning

    Switch on job cloning for a task. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/enable_job_cloning

    Args:
        req(PandaRequest): internally generated request object
        jedi_task_id(int): JEDI Task ID
        mode(str, optional): mode of operation, runonce or storeonce
        multiplicity(int, optional): number of clones to be created for each target
        num_sites(int, optional): number of sites to be used for each target

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When there was an error, the message field contains the description.
    """
    log = LogWrapper(_logger, f"enable_job_cloning < jedi_task_id={jedi_task_id} >")
    log.debug("Start")

    succeeded, message = global_task_buffer.enable_job_cloning(jedi_task_id, mode, multiplicity, num_sites)

    log.debug("Done")
    return generate_response(succeeded, message)
0752
0753
@request_validation(_logger, secure=True, production=True, request_method="POST")
def disable_job_cloning(req: PandaRequest, jedi_task_id: int) -> Dict[str, Any]:
    """
    Disable job cloning

    Switch off job cloning for a task. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/disable_job_cloning

    Args:
        req(PandaRequest): internally generated request object
        jedi_task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When there was an error, the message field contains the description.
    """
    log = LogWrapper(_logger, f"disable_job_cloning < jedi_task_id={jedi_task_id} >")
    log.debug("Start")

    succeeded, message = global_task_buffer.disable_job_cloning(jedi_task_id)

    log.debug("Done")
    return generate_response(succeeded, message)
0778
0779
@request_validation(_logger, secure=True, production=True, request_method="POST")
def increase_attempts(req: PandaRequest, task_id: int, increase: int) -> Dict[str, Any]:
    """
    Increase possible task attempts

    Raise the number of attempts a task is allowed. Needs a secure connection and the production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/increase_attempts

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        increase(int): number of attempts to add; must convert to an integer >= 0

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When there was an error, the message field contains the description and the data field contains the code.
    """
    log = LogWrapper(_logger, f"increase_attempt_number task_id={task_id}")
    log.debug("Start")

    try:
        jedi_task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # the original value is kept for the error message when conversion fails
    try:
        increase = int(increase)
    except Exception:
        increase_valid = False
    else:
        increase_valid = increase >= 0

    if not increase_valid:
        message = f"increase must be a positive integer, got {increase}"
        log.error(message)
        return generate_response(False, message=message)

    return_code, message = global_task_buffer.increaseAttemptNrPanda(jedi_task_id, increase)
    succeeded = return_code == 0

    log.debug("Done")
    return generate_response(succeeded, message, return_code)
0825
0826
@request_validation(_logger, secure=True, request_method="GET")
def get_status(req, task_id):
    """
    Get task status

    Look up the current status of a task. Needs a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_status

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains the status of the task.
              When there was an error, the message field contains the description.
    """
    log = LogWrapper(_logger, f"get_status < task_id={task_id} >")
    log.debug("Start")

    try:
        jedi_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    status_row = global_task_buffer.getTaskStatus(jedi_task_id)
    if not status_row:
        return generate_response(False, message="Task not found")

    # the status is the first element of the returned row
    task_status = status_row[0]

    log.debug("Done")
    return generate_response(True, data=task_status)
0864
0865
@request_validation(_logger, request_method="GET", secure=True)
def get_details(req: PandaRequest, task_id: int, include_parameters: bool = False, include_status: bool = False) -> Dict[str, Any]:
    """
    Get task details

    Get the details of a given task. Requires secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_details

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        include_parameters(bool, optional): flag to include task parameter information (Previously fullFlag)
        include_status(bool, optional): flag to include status information (Previously withTaskInfo)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains the task details.
              When there was an error (including an invalid task_id), the message field contains the description.
    """
    tmp_logger = LogWrapper(_logger, f"get_details < task_id={task_id} include_parameters={include_parameters} include_status={include_status} >")
    tmp_logger.debug("Start")

    # validate the task ID like the other task endpoints before querying the database
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    details = global_task_buffer.getJediTaskDetails(task_id, include_parameters, include_status)
    if not details:
        tmp_logger.error("Task not found or error retrieving the details")
        return generate_response(False, message="Task not found or error retrieving the details")

    tmp_logger.debug("Done")

    return generate_response(True, data=details)
0898 return generate_response(True, data=details)
0899
0900
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_attribute(req: PandaRequest, task_id: int, attribute_name: str, value: int) -> Dict[str, Any]:
    """
    Change a task attribute

    Set one of the modifiable task attributes ("ramCount", "wallTime", "cpuTime", "coreCount") to a new value.
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_attribute

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        attribute_name(str): name of the attribute to modify
        value(int): new value for the attribute

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the message field reports how many tasks were changed.
              On failure (invalid task_id, unsupported attribute, exception in the task buffer, or unknown task)
              success is False and the message field describes the problem.
    """
    log = LogWrapper(_logger, f"change_attribute < task_id={task_id} attribute_name={attribute_name} value={value} >")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # Whitelist of attributes this endpoint is allowed to touch
    allowed_attributes = ["ramCount", "wallTime", "cpuTime", "coreCount"]
    if attribute_name not in allowed_attributes:
        log.error("Failed due to invalid attribute_name")
        return generate_response(False, message=f"{attribute_name} is not a valid attribute. Valid attributes are {allowed_attributes}")

    changed_count = global_task_buffer.changeTaskAttributePanda(task_id, attribute_name, value)

    # None -> the task buffer hit an exception; 0 -> no task matched the ID
    if changed_count is not None and changed_count != 0:
        log.debug("Done")
        return generate_response(True, message=f"{changed_count} tasks changed")

    if changed_count is None:
        log.error("Failed due to exception while changing the attribute")
        return generate_response(False, message="Exception while changing the attribute")

    log.error("Failed due to task not found")
    return generate_response(False, message="Task not found")
0950
0951
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_modification_time(req: PandaRequest, task_id: int, hour_offset: int) -> Dict[str, Any]:
    """
    Change task modification time

    Change the modification time for a task to `now() + hour_offset`, where `now()` is the current UTC time
    (naive datetime). Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_modification_time

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        hour_offset(int): number of hours to add to the current time. Use a negative value (e.g. -12) to trigger task brokerage.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the message field reports how many tasks were changed.
              On failure (invalid task_id, invalid hour_offset, exception in the task buffer, or unknown task)
              success is False and the message field describes the problem.
    """
    tmp_logger = LogWrapper(_logger, f"change_modification_time < task_id={task_id} hour_offset={hour_offset} >")
    tmp_logger.debug("Start")

    # Validate the ID like the sibling endpoints do
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # The offset may arrive as a string; timedelta() rejects that with TypeError (not ValueError), which previously
    # escaped the handler. float() accepts numeric strings and keeps fractional offsets working; NaN raises
    # ValueError and infinite/huge offsets raise OverflowError. Use UTC, consistent with naive_utcnow used elsewhere.
    try:
        new_modification_time = naive_utcnow() + datetime.timedelta(hours=float(hour_offset))
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid hour_offset")
        return generate_response(False, message=f"failed to convert {hour_offset} to time")

    n_tasks_changed = global_task_buffer.changeTaskAttributePanda(task_id, "modificationTime", new_modification_time)
    # None -> the task buffer hit an exception; 0 -> no task matched the ID
    if n_tasks_changed is None:
        tmp_logger.error("Failed due to exception while changing the attribute")
        return generate_response(False, message="Exception while changing the attribute")

    if n_tasks_changed == 0:
        tmp_logger.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    tmp_logger.debug("Done")
    return generate_response(True, message=f"{n_tasks_changed} tasks changed")
0992
0993
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_priority(req: PandaRequest, task_id: int, priority: int):
    """
    Change priority

    Assign a new priority to a given task. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_priority

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        priority(int): new priority for the task

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the message field reports how many tasks were changed.
              On failure (non-integer task_id or priority, exception in the task buffer, or unknown task)
              success is False and the message field describes the problem.
    """
    log = LogWrapper(_logger, f"change_priority < task_id={task_id} priority={priority} >")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    try:
        priority = int(priority)
    except ValueError:
        log.error("Failed due to invalid priority")
        return generate_response(False, message="priority must be an integer")

    changed_count = global_task_buffer.changeTaskPriorityPanda(task_id, priority)

    # None -> the task buffer hit an exception; 0 -> no task matched the ID
    if changed_count is None:
        log.error("Failed due to exception while changing the priority")
        return generate_response(False, message="Exception while changing the priority")
    if changed_count == 0:
        log.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    log.debug("Done")
    return generate_response(True, message=f"{changed_count} tasks changed")
1044
1045
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_split_rule(req: PandaRequest, task_id: int, attribute_name: str, value: str) -> Dict[str, Any]:
    """
    Change the split rule

    Modify an existing split rule of a task, or add it, by setting an `attribute_name=value pair`.
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_split_rule

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        attribute_name(str): split rule attribute to change. The allowed attributes are defined in `task_split_rules.changeable_split_rule_tags`
        value(str): value to set to the attribute

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the message field reports how many tasks were changed.
              On failure success is False and the message field describes the problem; for a non-changeable
              attribute the data field is set to 2.
    """
    log = LogWrapper(_logger, f"change_split_rule < task_id={task_id} attribute_name={attribute_name} value={value} >")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    allowed_tags = task_split_rules.changeable_split_rule_tags
    if attribute_name not in allowed_tags:
        log.error("Failed due to invalid attribute_name")
        invalid_message = f"{attribute_name} is not a valid attribute. Valid attributes are {allowed_tags}"
        return generate_response(False, message=invalid_message, data=2)

    changed_count = global_task_buffer.changeTaskSplitRulePanda(task_id, attribute_name, value)

    # None -> the task buffer hit an exception; 0 -> no task matched the ID
    if changed_count is None:
        log.error("Failed due to exception while changing the split rule")
        return generate_response(False, message="Exception while changing the split rule")
    if changed_count == 0:
        log.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    log.debug("Done")
    return generate_response(True, message=f"{changed_count} tasks changed")
1096
1097
@request_validation(_logger, secure=True, request_method="GET")
def get_tasks_modified_since(req, since: str, dn: str = None, full: bool = False, min_task_id: int = None, prod_source_label: str = "user") -> Dict[str, Any]:
    """
    Get tasks modified since

    List the tasks whose `modificationtime > since`. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_tasks_modified_since

    Args:
        req(PandaRequest): internally generated request object
        since(str): time in the format `%Y-%m-%d %H:%M:%S`, e.g. `2024-12-18 14:30:45`. The tasks with `modificationtime > since` will be returned
        dn(str, optional): user DN. When empty, the DN of the requesting client is used
        full(bool, optional): flag to include full task information. If `full=False` the basic fields are `jediTaskID, modificationTime, status, processingType, transUses, transHome, architecture, reqID, creationDate, site, cloud, taskName`
        min_task_id(int, optional): minimum task ID
        prod_source_label(str, optional): task type (e.g. `user`, `managed`, `test`, etc.)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              success is always True and the data field holds whatever the task buffer returned for the query.
    """
    log = LogWrapper(_logger, "get_tasks_modified_since")
    log.debug("Start")

    # Fall back to the caller's own DN when none was supplied
    user_dn = dn or get_dn(req)

    log.debug(f"parameters dn:{user_dn} since:{since} full:{full} min_task_id:{min_task_id} prod_source_label:{prod_source_label}")

    matching_tasks = global_task_buffer.getJediTasksInTimeRange(user_dn, since, full, min_task_id, prod_source_label)

    log.debug("Done")
    return generate_response(True, data=matching_tasks)
1133
1134
@request_validation(_logger, secure=True, request_method="GET")
def get_datasets_and_files(req, task_id, dataset_types: List[str] = ("input", "pseudo_input")) -> Dict[str, Any]:
    """
    Get datasets and files

    List the files of the datasets attached to a given task, optionally restricted to some dataset types.
    The data field has the following shape:
    ```
    [
        {
            "dataset": {
                "name": dataset_name,
                "id": dataset_id
            },
            "files": [
                {
                    "lfn": lfn,
                    "scope": file_scope,
                    "id": file_id,
                    "status": status
                },
                ...
            ]
        },
        ...
    ]
    ```
    Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_datasets_and_files

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        dataset_types(List, optional): list of dataset types, defaults to `["input", "pseudo_input"]`

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the data field holds the structure above. success is False when the task buffer
              reports a database exception or when no data is found for the task.
    """
    log = LogWrapper(_logger, f"get_datasets_and_files < task_id={task_id} dataset_types={dataset_types} >")
    log.debug("Start")

    dataset_files = global_task_buffer.get_files_in_datasets(task_id, dataset_types)

    # None signals a database exception; an empty list means the task has nothing matching
    if dataset_files is None:
        log.error("Failed due to exception while gathering files")
        return generate_response(False, message="Database exception while gathering files")
    if dataset_files == []:
        log.error("Failed due to no data found for the task")
        return generate_response(False, message="No data found for the task")

    log.debug("Done")
    return generate_response(True, data=dataset_files)
1190
1191
@request_validation(_logger, secure=True, request_method="GET")
def get_job_ids(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get job IDs

    Return the list of job IDs `[job_id, ...]`, in any status, that belong to a given task. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_job_ids

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the data field holds the job IDs returned by the task buffer.
              success is False only when task_id is not an integer.
    """
    log = LogWrapper(_logger, f"get_job_ids < task_id={task_id} >")
    log.debug("Start")

    try:
        numeric_task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    panda_ids = global_task_buffer.getPandaIDsWithTaskID(numeric_task_id)

    log.debug("Done")
    return generate_response(True, data=panda_ids)
1224
1225
@request_validation(_logger, secure=True, request_method="POST")
def submit(req: PandaRequest, task_parameters: Dict, parent_tid: int = None) -> Dict[str, Any]:
    """
    Register task

    Store the task parameters so that a new task gets registered. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/submit

    Args:
        req(PandaRequest): internally generated request object
        task_parameters(dict): Dictionary with all the required task parameters. The parameters are the attributes in the JediTaskSpec object (https://github.com/PanDAWMS/panda-jedi/blob/master/pandajedi/jedicore/JediTaskSpec.py).
        parent_tid(int, optional): Parent task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              success is True when the task buffer returns code 0 or 3; the data field then holds the task ID
              parsed from a `jediTaskID=<n>` token in the message, or None if no such token is present.
              Otherwise success is False and the data field holds the task buffer's return code.
    """
    tmp_log = LogWrapper(_logger, f"submit {naive_utcnow().isoformat('/')}")
    tmp_log.debug("Start")

    # Identity of the submitter, taken from the request credentials
    user = get_dn(req)
    is_production_role = has_production_role(req)
    fqans = get_fqan(req)

    tmp_log.debug(f"user={user} is_production_role={is_production_role} FQAN:{fqans!s} parent_tid={parent_tid}")
    return_code, response_message = global_task_buffer.insertTaskParamsPanda(
        task_parameters,
        user,
        is_production_role,
        fqans,
        properErrorCode=True,
        parent_tid=parent_tid,
        decode=False,
    )

    # Return codes 0 and 3 are the ones treated as a successful registration
    if return_code not in (0, 3):
        return generate_response(False, message=response_message, data=return_code)

    # The new task ID is only available embedded in the message text
    created_task_id = None
    id_match = re.search(r"jediTaskID=(\d+)", response_message)
    if id_match is not None:
        try:
            created_task_id = int(id_match.group(1))
        except ValueError:
            tmp_log.error("Failed to extract the task ID from the message")
            created_task_id = None
        else:
            tmp_log.debug(f"Created task with task_id: {created_task_id}")

    tmp_log.debug("Done")
    return generate_response(True, message=response_message, data=created_task_id)
1275
1276
@request_validation(_logger, request_method="GET")
def get_task_parameters(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get task parameters

    Return the dictionary of parameters that was used to create a task.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_task_parameters

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the data field holds the decoded JSON task parameters.
              success is False for a non-integer task_id, an unknown task, or parameters that are not valid JSON.
    """
    log = LogWrapper(_logger, f"get_task_parameters < task_id={task_id} >")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # The parameters are stored as a JSON string; an empty result means the task is unknown
    raw_parameters = global_task_buffer.getTaskParamsMap(task_id)
    if not raw_parameters:
        log.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    try:
        decoded_parameters = json.loads(raw_parameters)
    except json.JSONDecodeError as decode_error:
        log.error("Failed due to error decoding the task parameters")
        return generate_response(False, message=f"Error decoding the task parameters: {decode_error!s}")

    log.debug("Done")
    return generate_response(True, data=decoded_parameters)
1322
1323
@request_validation(_logger, secure=True, request_method="GET")
def get_detailed_info(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get detailed task info

    Return every task field together with jobParamsTemplate and taskParams for a given task.
    The record is read without locking. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_detailed_info

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID

    Returns:
        dict: The system response ``{"success": success, "message": message, "data": data}``.
              On success the ``data`` field holds what ``get_task_details_json`` returned for the task.
              ``success`` is False for a non-integer task_id or when no task info could be obtained.
    """
    log = LogWrapper(_logger, f"get_detailed_info < task_id={task_id} >")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    detailed_info = global_task_buffer.get_task_details_json(task_id)

    # None covers both a missing task and a retrieval error
    if detailed_info is not None:
        log.debug("Done")
        return generate_response(True, data=detailed_info)

    log.error("Task not found or error retrieving task info")
    return generate_response(False, message="Task not found or error retrieving task info")
1362
1363
@request_validation(_logger, secure=True, request_method="GET")
def get_parent_detailed_info(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get detailed parent task info

    Starting from a child task, look up its parent task and return only the parent's
    detailed information. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_parent_detailed_info

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): child JEDI task ID

    Returns:
        dict: The system response ``{"success": success, "message": message, "data": data}``.
              On success the ``data`` field holds the parent task details.
              On failure ``success`` is False and the message tells apart:
              child-not-found, no-parent, parent-not-found, or retrieval error.
    """
    log = LogWrapper(_logger, f"get_parent_detailed_info < child_task_id={task_id} >")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    resolve_status, parent_task_id, task_info = global_task_buffer.get_task_details_json(
        task_id,
        resolve_parent=True,
        include_resolve_status=True,
    )
    log.debug(f"resolve_parent status={resolve_status} child={task_id} parent={parent_task_id}")

    # (resolve status, log text prefix, response message) for the explicitly reported lookup failures
    lookup_failures = (
        ("child_not_found", "Child task not found", "Child task not found"),
        ("no_parent", "No valid parent task", "No parent task found"),
        ("parent_not_found", "Parent task not found", "Parent task not found"),
    )
    for failure_status, log_prefix, response_message in lookup_failures:
        if resolve_status == failure_status:
            log.error(f"{log_prefix} child={task_id} parent={parent_task_id}")
            return generate_response(False, message=response_message)

    retrieval_failed = resolve_status in ("target_not_found", "error") or task_info is None
    if retrieval_failed:
        log.error(f"Parent task retrieval failed status={resolve_status} child={task_id} parent={parent_task_id}")
        return generate_response(False, message="Parent task details not found or retrieval error")

    log.debug(f"Done child={task_id} parent={parent_task_id}")
    return generate_response(True, data=task_info)
1418
1419
@request_validation(_logger, secure=True, request_method="GET")
def get_scout_job_descriptions(req: PandaRequest, task_id: int) -> Dict:
    """
    Get scout job descriptions for a task.

    Ask the task buffer for the PanDA IDs of the scout jobs of a JEDI task (``scout_only=True``),
    fetch their full job records and return them as dictionaries. At most 5500 IDs are processed;
    longer lists are truncated. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_scout_job_descriptions

    Args:
        req(PandaRequest): internally generated request object containing the env variables.
        task_id (int): JEDI task ID.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains a list of scout job descriptions.
              If no scout jobs are found, returns success=True with an empty list.
    """
    log = LogWrapper(_logger, f"get_scout_job_descriptions task_id={task_id}")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    scout_ids = global_task_buffer.getPandaIDsWithTaskID(task_id, scout_only=True)
    if not scout_ids:
        log.debug("No scout jobs found for task")
        return generate_response(True, data=[])

    log.debug(f"Found {len(scout_ids)} scout job IDs for task")

    # Upper bound on the number of jobs fetched in a single request
    id_limit = 5500
    if len(scout_ids) > id_limit:
        log.error(f"List of PanDA IDs is longer than {id_limit}. Truncating the list.")
        scout_ids = scout_ids[:id_limit]

    # Entries may come back as None; those are skipped
    scout_jobs = []
    for job_spec in global_task_buffer.getFullJobStatus(scout_ids):
        if job_spec is None:
            continue
        scout_jobs.append(job_spec.to_dict_advanced())

    log.debug(f"Done with {len(scout_jobs)} scout jobs")
    return generate_response(True, data=scout_jobs)
1469
1470
@request_validation(_logger, secure=True, request_method="GET")
def get_job_descriptions(req: PandaRequest, task_id: int, unsuccessful_only: bool = False) -> Dict:
    """
    Get job descriptions for a task.

    Ask the task buffer for the PanDA IDs of the jobs of a JEDI task, fetch their full job records
    and return them as dictionaries. When ``unsuccessful_only`` is set, the task buffer is asked only
    for unsuccessful jobs (status ``failed``, ``cancelled``, or ``closed``). At most 5500 IDs are
    processed; longer lists are truncated. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_job_descriptions

    Args:
        req(PandaRequest): internally generated request object containing the env variables.
        task_id (int): JEDI task ID.
        unsuccessful_only (bool): When True, return only unsuccessful jobs. Defaults to False.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains a list of job descriptions.
              If no jobs are found, returns success=True with an empty list.
    """
    log = LogWrapper(_logger, f"get_job_descriptions task_id={task_id} unsuccessful_only={unsuccessful_only}")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    panda_ids = global_task_buffer.getPandaIDsWithTaskID(task_id, unsuccessful_only=unsuccessful_only)
    if not panda_ids:
        log.debug("No jobs found for task")
        return generate_response(True, data=[])

    log.debug(f"Found {len(panda_ids)} job IDs for task")

    # Upper bound on the number of jobs fetched in a single request
    id_limit = 5500
    if len(panda_ids) > id_limit:
        log.error(f"List of PanDA IDs is longer than {id_limit}. Truncating the list.")
        panda_ids = panda_ids[:id_limit]

    # Entries may come back as None; those are skipped
    job_dicts = []
    for job_spec in global_task_buffer.getFullJobStatus(panda_ids):
        if job_spec is None:
            continue
        job_dicts.append(job_spec.to_dict_advanced())

    log.debug(f"Done with {len(job_dicts)} jobs")
    return generate_response(True, data=job_dicts)
1520 return generate_response(True, data=jobs)