Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 # See the task state diagram for a better understanding of some of the actions https://panda-wms.readthedocs.io/en/latest/terminology/terminology.html#task
0002 
0003 import datetime
0004 import json
0005 import re
0006 from typing import Any, Dict, List
0007 
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 
0012 from pandaserver.api.v1.common import (
0013     MESSAGE_TASK_ID,
0014     extract_production_working_groups,
0015     generate_response,
0016     get_dn,
0017     get_email_address,
0018     get_fqan,
0019     has_production_role,
0020     request_validation,
0021 )
0022 from pandaserver.srvcore.CoreUtils import clean_user_id, make_reassign_comment
0023 from pandaserver.srvcore.panda_request import PandaRequest
0024 from pandaserver.taskbuffer import task_split_rules
0025 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0026 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0027 
0028 _logger = PandaLogger().getLogger("api_task")
0029 
0030 global_task_buffer = None
0031 
0032 
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Register the task buffer used by this module.

    The API functions below access the task buffer through the module-level
    `global_task_buffer`, so this has to be called before any of them is used.

    Args:
        task_buffer(TaskBuffer): task buffer instance to store at module level
    """
    global global_task_buffer
    global_task_buffer = task_buffer
0039 
0040 
@request_validation(_logger, secure=True, request_method="POST")
def retry(
    req: PandaRequest,
    task_id: int,
    new_parameters: Dict | str = None,
    no_child_retry: bool = False,
    discard_events: bool = False,
    disable_staging_mode: bool = False,
    keep_gshare_priority: bool = False,
    ignore_hard_exhausted: bool = False,
) -> Dict[str, Any]:
    """
    Task retry

    Retry a given task e.g. in exhausted state. Requires a secure connection without a production role to retry own tasks and with a production role to retry others' tasks.

    API details:
        HTTP Method: POST
        Path: /v1/task/retry

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        new_parameters(Dict, optional): a dictionary (or its JSON-encoded string) with the new parameters for rerunning the task. The new parameters are merged with the existing ones.
                                        The parameters are the attributes in the JediTaskSpec object (https://github.com/PanDAWMS/panda-jedi/blob/master/pandajedi/jedicore/JediTaskSpec.py).
        no_child_retry(bool, optional): if True, the child tasks are not retried. Defaults to False
        discard_events(bool, optional): if True, events will be discarded. Defaults to False
        disable_staging_mode(bool, optional): if True, the task skips staging state and directly goes to subsequent state. Defaults to False
        keep_gshare_priority(bool, optional): if True, the task keeps current gshare and priority. Defaults to False
        ignore_hard_exhausted(bool, optional): if True, the task ignores the limits for hard exhausted state and can be retried even if it is very faulty.
                                               Only honored with a production role. Defaults to False

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The return code is in the data field, 0 for success, others for failure.
              `success` is currently always True for this endpoint, so callers have to inspect the return code in `data`.
    """
    tmp_logger = LogWrapper(_logger, f"retry < task_id={task_id} >")
    tmp_logger.debug("Start")

    user = get_dn(req)
    production_role = has_production_role(req)

    # retry with new parameters
    if new_parameters:
        try:
            # The parameters may arrive JSON-encoded. Decode inside the try block so that
            # malformed JSON is reported through the response instead of raising.
            if isinstance(new_parameters, str):
                new_parameters = json.loads(new_parameters)

            # get original parameters
            old_parameters_json = global_task_buffer.getTaskParamsPanda(task_id)
            old_parameters = json.loads(old_parameters_json)

            # update with new values
            old_parameters.update(new_parameters)
            final_task_parameters_json = json.dumps(old_parameters)

            # retry with new parameters
            ret = global_task_buffer.insertTaskParamsPanda(
                final_task_parameters_json,
                user,
                production_role,
                [],
                properErrorCode=True,
                allowActiveTask=True,
            )
        except Exception as e:
            error_message = f"new parameter conversion failed with {str(e)}"
            tmp_logger.error(error_message)
            ret = 1, error_message
    else:
        # disable ignore_hard_exhausted if not production_role
        if not production_role and ignore_hard_exhausted:
            ignore_hard_exhausted = False
        # get command qualifier
        qualifier = JediTaskSpec.get_retry_command_qualifiers(no_child_retry, discard_events, disable_staging_mode, keep_gshare_priority, ignore_hard_exhausted)
        qualifier = " ".join(qualifier)
        # normal retry
        ret = global_task_buffer.sendCommandTaskPanda(
            task_id,
            user,
            production_role,
            "retry",
            properErrorCode=True,
            comQualifier=qualifier,
        )

    if ret[0] == 5:
        # retry failed analysis jobs
        job_ids = global_task_buffer.getJobdefIDsForFailedJob(task_id)
        clean_id = clean_user_id(user)
        for job_id in job_ids:
            global_task_buffer.finalizePendingJobs(clean_id, job_id)
        global_task_buffer.increaseAttemptNrPanda(task_id, 5)
        return_str = f"retry has been triggered for failed jobs while the task is still {ret[1]}"
        # code 5 is replaced by 0 for a normal retry and by 3 for a retry with new parameters
        if not new_parameters:
            ret = 0, return_str
        else:
            ret = 3, return_str

    tmp_logger.debug("Done")

    data, message = ret
    # NOTE(review): success is unconditionally True here, unlike the sibling endpoints that use
    # `data == 0`. Left as is because non-zero codes (e.g. 3 above) are also set on paths that
    # did trigger a retry - confirm the intended mapping before changing it.
    success = True
    return generate_response(success, message, data)
0141 
0142 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def resume(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task resume

    Send the `resume` command for a task: a paused or throttled task goes back to its previous active state.
    It can also be used to push a task in staging state to the next state. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/resume

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. `success` is True when the return code is 0.
              The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"resume < jediTaskID={task_id} >")
    log.debug("Start")

    requester = get_dn(req)
    prod_role = has_production_role(req)

    # the ID has to be convertible to an integer before the command is sent
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(task_id, requester, prod_role, "resume", properErrorCode=True)

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0182 
0183 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def release(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task release

    Send the `release` command for a task, which skips iDDS for staging. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/release

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. `success` is True when the return code is 0.
              The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"release < task_id={task_id} >")
    log.debug("Start")

    requester = get_dn(req)
    prod_role = has_production_role(req)

    # reject IDs that cannot be converted to an integer
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(task_id, requester, prod_role, "release", properErrorCode=True)

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0223 
0224 
# reassign task to site/cloud/nucleus
@request_validation(_logger, secure=True, request_method="POST")
def reassign(req: PandaRequest, task_id: int, site: str = None, cloud: str = None, nucleus: str = None, mode: str = None):
    """
    Task reassign

    Send the `reassign` command for a task. The destination (site, nucleus or cloud) and the mode are passed on
    through the command comment. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/reassign

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        site(str, optional): site name
        cloud(str, optional): cloud name
        nucleus(str, optional): nucleus name
        mode(str, optional): `kill` (kills all jobs, default), `soft` (kills queued jobs) or `nokill` (doesn't kill jobs)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. `success` is True when the return code is 0.
              The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"reassign < task_id={task_id} >")
    log.debug("Start")

    # reject IDs that cannot be converted to an integer
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    requester = get_dn(req)
    prod_role = has_production_role(req)

    # the destination and mode travel in the command comment
    return_code, message = global_task_buffer.sendCommandTaskPanda(
        task_id,
        requester,
        prod_role,
        "reassign",
        comComment=make_reassign_comment(site, cloud, nucleus, mode),
        properErrorCode=True,
    )

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0279 
0280 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def pause(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task pause

    Send the `pause` command for a task. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/pause

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. `success` is True when the return code is 0.
              The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"pause < task_id={task_id} >")
    log.debug("Start")

    # reject IDs that cannot be converted to an integer
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    requester = get_dn(req)
    prod_role = has_production_role(req)

    return_code, message = global_task_buffer.sendCommandTaskPanda(task_id, requester, prod_role, "pause", properErrorCode=True)

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0319 
0320 
@request_validation(_logger, secure=True, request_method="POST")
def kill(req: PandaRequest, task_id: int = None, broadcast: bool = False) -> Dict[str, Any]:
    """
    Task kill

    Kill a given task. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/kill

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID. A missing or non-integer value is rejected with an error response.
        broadcast(bool, optional): broadcast kill command to pilots to kill the jobs

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
    """
    tmp_logger = LogWrapper(_logger, f"kill < task_id={task_id} >")
    tmp_logger.debug("Start")

    # check task_id
    # int(None) raises TypeError rather than ValueError, and task_id defaults to None,
    # so both exceptions have to be handled to return a proper error response
    try:
        task_id = int(task_id)
    except (TypeError, ValueError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    user = get_dn(req)
    is_production_role = has_production_role(req)

    ret = global_task_buffer.sendCommandTaskPanda(
        task_id,
        user,
        is_production_role,
        "kill",
        properErrorCode=True,
        broadcast=broadcast,
    )
    data, message = ret
    success = data == 0

    tmp_logger.debug("Done")

    return generate_response(success, message, data)
0367 
0368 
@request_validation(_logger, secure=True, request_method="POST")
def kill_unfinished_jobs(req: PandaRequest, task_id: int, code: int = None, use_email_as_id: bool = False):
    """
    Kill all unfinished jobs in a task

    Looks up the PanDA IDs belonging to the task and asks the task buffer to kill them. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/kill_unfinished_jobs

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id(int): JEDI task ID
        code(int, optional): The kill code. Defaults to None.
            ```
            code
            2: expire
            3: aborted
            4: expire in waiting
            7: retry by server
            8: rebrokerage
            9: force kill
            10: fast rebrokerage in overloaded PQ
            50: kill by JEDI
            51: reassigned by JEDI
            52: force kill by JEDI
            55: killed since task is (almost) done
            60: workload was terminated by the pilot without actual work
            91: kill user jobs with prod role
            99: force kill user jobs with prod role
            ```
        use_email_as_id(bool, optional): Use the email as ID, when one can be resolved for the user. Defaults to False.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The data field contains a list of bools indicating the success of the kill operations.
    """
    log = LogWrapper(_logger, "kill_unfinished_jobs")

    # identity and roles of the requester
    user = get_dn(req)
    fqans = get_fqan(req)
    is_production_manager = has_production_role(req)

    # swap the DN for the email address only when one is found
    if use_email_as_id:
        user = get_email_address(user, log) or user

    log.debug(f"Start user: {user} code: {code} is_production_manager: {is_production_manager} fqans: {fqans} task_id: {task_id}")

    # working groups for which the requester holds a production role
    production_working_groups = extract_production_working_groups(fqans)

    # kill every job attached to the task
    panda_ids = global_task_buffer.getPandaIDsWithTaskID(task_id)
    kill_results = global_task_buffer.killJobs(panda_ids, user, code, is_production_manager, production_working_groups, [])

    log.debug(f"Done with ret: {kill_results}")
    return generate_response(True, data=kill_results)
0431 
0432 
@request_validation(_logger, secure=True, request_method="POST")
def finish(req: PandaRequest, task_id: int, soft: bool = False, broadcast: bool = False) -> Dict[str, Any]:
    """
    Task finish

    Send the `finish` command for a task. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/finish

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        soft(bool, optional): soft finish; passed on as the `soft` command qualifier
        broadcast(bool, optional): broadcast finish command to pilots

    Returns:
        dict: The system response. `success` is True when the return code is 0. The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"finish < task_id={task_id} soft={soft} broadcast={broadcast} >")
    log.debug("Start")

    # a soft finish is expressed through the command qualifier
    qualifier = "soft" if soft else None

    requester = get_dn(req)
    prod_role = has_production_role(req)

    # reject IDs that cannot be converted to an integer
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(
        task_id,
        requester,
        prod_role,
        "finish",
        properErrorCode=True,
        comQualifier=qualifier,
        broadcast=broadcast,
    )

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0485 
0486 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def reactivate(req: PandaRequest, task_id: int, keep_attempt_nr: bool = False, trigger_job_generation: bool = False) -> Dict[str, Any]:
    """
    Reactivate task

    Reactivate a given task, i.e. recycle a finished/done task. A reactivated task will generate new jobs and then go to done/finished.
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/reactivate

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        keep_attempt_nr(bool, optional): keep the original attempt number
        trigger_job_generation(bool, optional): trigger the job generation

    Returns:
        dict: The system response. `success` is True when the return code is 0. The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"reactivate < task_id={task_id} >")
    log.debug("Start")

    # reject IDs that cannot be converted to an integer
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.reactivateTask(task_id, keep_attempt_nr, trigger_job_generation)

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0524 
0525 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def avalanche(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Task avalanche

    Send the `avalanche` command for a task. This triggers the avalanche for tasks in scouting state or dynamically
    reconfigures the task to skip over the scouting state. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/avalanche

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response. `success` is True when the return code is 0. The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"avalanche < task_id={task_id} >")
    log.debug("Start")

    requester = get_dn(req)
    prod_role = has_production_role(req)

    # reject IDs that cannot be converted to an integer
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    return_code, message = global_task_buffer.sendCommandTaskPanda(task_id, requester, prod_role, "avalanche", properErrorCode=True)

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0564 
0565 
@request_validation(_logger, secure=True, request_method="POST")
def reload_input(req: PandaRequest, task_id: int, ignore_hard_exhausted: bool = False) -> Dict[str, Any]:
    """
    Reload input

    Request to reload the input for a given task by sending the `incexec` command. Requires a secure connection.
    The `ignore_hard_exhausted` option is only honored when the requester has a production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/reload_input

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        ignore_hard_exhausted(bool, optional): ignore the limits for hard exhausted

    Returns:
        dict: The system response. `success` is True when the return code is 0. The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"reload_input < task_id={task_id} >")
    log.debug("Start")

    requester = get_dn(req)
    prod_role = has_production_role(req)

    # reject IDs that cannot be converted to an integer
    try:
        task_id = int(task_id)
    except ValueError:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # the flag is dropped for requesters without production role
    if ignore_hard_exhausted and not prod_role:
        ignore_hard_exhausted = False

    # the qualifiers are shipped in the command comment as the second element of a JSON list
    qualifiers = JediTaskSpec.get_retry_command_qualifiers(ignore_hard_exhausted=ignore_hard_exhausted)
    comment = json.dumps([{}, qualifiers])

    return_code, message = global_task_buffer.sendCommandTaskPanda(task_id, requester, prod_role, "incexec", comComment=comment, properErrorCode=True)

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0613 
0614 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def reassign_global_share(req: PandaRequest, task_id_list: List[int], share: str, reassign_running_jobs: bool) -> Dict[str, Any]:
    """
    Reassign the global share of a task

    Moves the given tasks to another global share. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/reassign_global_share

    Args:
        req(PandaRequest): internally generated request object
        task_id_list(list): List of JEDI task IDs to reassign
        share(str): destination share
        reassign_running_jobs(bool): whether you want to reassign existing running jobs

    Returns:
        dict: The system response. `success` is True when the return code is 0. The return code is placed in the data field, 0 for success, others for failure.
    """
    log = LogWrapper(_logger, f"reassign_global_share < task_id_list={task_id_list} share={share} reassign_running_jobs={reassign_running_jobs} >")
    log.debug("Start")

    # both the task list and the share name must have the expected type
    parameters_ok = isinstance(task_id_list, list) and isinstance(share, str)
    if not parameters_ok:
        log.error("Failed due to invalid task list")
        return generate_response(False, message="wrong parameters: task_ids must be list and share must be string")

    return_code, message = global_task_buffer.reassignShare(task_id_list, share, reassign_running_jobs)

    log.debug("Done")

    return generate_response(return_code == 0, message, return_code)
0649 
0650 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def enable_jumbo_jobs(req: PandaRequest, task_id: int, jumbo_jobs_total: int, jumbo_jobs_per_site: int = None):
    """
    Enable Jumbo jobs

    Enables the Jumbo jobs for a given task ID. When jumbo jobs were enabled successfully with a total greater than zero,
    the task avalanche is requested as well. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/enable_jumbo_jobs

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        jumbo_jobs_total(int): Total number of jumbo jobs
        jumbo_jobs_per_site(int): Number of jumbo jobs per site. Defaults to `jumbo_jobs_total`.

    Returns:
        dict: The system response. True for success, False for failure, and an error message. Return code in the data field, 0 for success, others for failure.
              The response only reflects the result of enabling the jumbo jobs, not of the avalanche request.
    """

    tmp_logger = LogWrapper(_logger, f"enable_jumbo_jobs < task_id={task_id} jumbo_jobs_total={jumbo_jobs_total} n_jumbo_jobs_per_site={jumbo_jobs_per_site} >")
    tmp_logger.debug("Start")

    if not jumbo_jobs_per_site:
        jumbo_jobs_per_site = jumbo_jobs_total

    code, message = global_task_buffer.enableJumboJobs(task_id, jumbo_jobs_total, jumbo_jobs_per_site)
    if jumbo_jobs_total > 0 and code == 0:
        tmp_logger.debug("Calling task avalanche")
        # avalanche is an API endpoint itself and needs the request object as first argument;
        # calling it with the task ID only would bind the task ID to `req`
        avalanche_response = avalanche(req, task_id=task_id)
        tmp_logger.debug(f"Task avalanche returned: {avalanche_response}")

    success = code == 0

    tmp_logger.debug("Done")
    return generate_response(success, message, code)
0687 
0688 
@request_validation(_logger, secure=True, request_method="GET")
def get_jumbo_job_datasets(req: PandaRequest, from_offset: int, to_offset: int = 0) -> Dict:
    """
    Get jumbo job datasets

    Gets a map of the jumbo-job-enabled tasks to their datasets, filtering by the last modification time (now - from_offset to now - to_offset).
    Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_jumbo_job_datasets

    Args:
        req(PandaRequest): internally generated request object
        from_offset(int): `now - from_offset` in days will serve as the floor for modification time (Previously called n_days)
        to_offset(int, optional): `now - to_offset` in days will serve as the ceiling for modification time. Defaults to 0. (Previously called grace_period)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              The data field carries whatever the task buffer returned, i.e. the dictionary of JEDI task IDs to datasets.
    """
    log = LogWrapper(_logger, "get_jumbo_job_datasets")
    log.debug("Start")

    datasets_by_task = global_task_buffer.getJumboJobDatasets(from_offset, to_offset)

    log.debug("Done")
    return generate_response(True, data=datasets_by_task)
0717 
0718 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def enable_job_cloning(
    req: PandaRequest,
    jedi_task_id: int,
    mode: str = None,
    multiplicity: int = None,
    num_sites: int = None,
) -> Dict[str, Any]:
    """
    Enable job cloning

    Turns on job cloning for a task by delegating to the task buffer. Requires secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/enable_job_cloning

    Args:
        req(PandaRequest): internally generated request object
        jedi_task_id(int): JEDI Task ID
        mode(str, optional): mode of operation, runonce or storeonce
        multiplicity(int, optional): number of clones to be created for each target
        num_sites(int, optional): number of sites to be used for each target

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When there was an error, the message field contains the description.
    """
    log = LogWrapper(_logger, f"enable_job_cloning < jedi_task_id={jedi_task_id} >")
    log.debug("Start")

    # the task buffer reports back a (success, message) pair
    outcome = global_task_buffer.enable_job_cloning(jedi_task_id, mode, multiplicity, num_sites)
    is_ok, description = outcome

    log.debug("Done")
    return generate_response(is_ok, description)
0752 
0753 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def disable_job_cloning(req: PandaRequest, jedi_task_id: int) -> Dict[str, Any]:
    """
    Disable job cloning

    Turns off job cloning for a task by delegating to the task buffer. Requires secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/disable_job_cloning

    Args:
        req(PandaRequest): internally generated request object
        jedi_task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When there was an error, the message field contains the description.
    """
    log = LogWrapper(_logger, f"disable_job_cloning < jedi_task_id={jedi_task_id} >")
    log.debug("Start")

    # the task buffer reports back a (success, message) pair
    outcome = global_task_buffer.disable_job_cloning(jedi_task_id)
    is_ok, description = outcome

    log.debug("Done")
    return generate_response(is_ok, description)
0778 
0779 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def increase_attempts(req: PandaRequest, task_id: int, increase: int) -> Dict[str, Any]:
    """
    Increase possible task attempts

    Increase possible task attempts. Requires secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/increase_attempts

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        increase(int): number of attempts to increase. Must be an integer greater than or equal to zero.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              The data field carries the code returned by the task buffer; success is True only when that code is 0.
              When the inputs fail validation, only the message field is set by this function.
    """
    tmp_logger = LogWrapper(_logger, f"increase_attempt_number task_id={task_id}")
    tmp_logger.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # check value for increase: anything that cannot be read as an integer >= 0 is rejected.
    # Zero is accepted by this check, so the error text says "non-negative" rather than "positive".
    try:
        increase = int(increase)
        if increase < 0:
            raise ValueError
    except Exception:
        message = f"increase must be a non-negative integer, got {increase}"
        tmp_logger.error(message)
        return generate_response(False, message=message)

    code, message = global_task_buffer.increaseAttemptNrPanda(task_id, increase)
    success = code == 0

    tmp_logger.debug("Done")

    return generate_response(success, message, code)
0825 
0826 
@request_validation(_logger, secure=True, request_method="GET")
def get_status(req, task_id) -> Dict[str, Any]:
    """
    Get task status

    Get the status of a given task. Requires secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_status

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains the status of the task.
              When there was an error, the message field contains the description.
    """
    tmp_logger = LogWrapper(_logger, f"get_status < task_id={task_id} >")
    tmp_logger.debug("Start")

    # int() raises TypeError (not ValueError) for None or containers and OverflowError for infinite floats,
    # so all three are treated as an invalid task ID
    try:
        task_id = int(task_id)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    ret = global_task_buffer.getTaskStatus(task_id)
    if not ret:
        # an empty/None answer from the task buffer is reported as a missing task
        tmp_logger.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    # the status is the first element of the returned sequence
    status = ret[0]

    tmp_logger.debug("Done")

    return generate_response(True, data=status)
0864 
0865 
@request_validation(_logger, request_method="GET", secure=True)
def get_details(req: PandaRequest, task_id: int, include_parameters: bool = False, include_status: bool = False) -> Dict[str, Any]:
    """
    Get task details

    Get the details of a given task. Requires secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_details

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI Task ID
        include_parameters(bool, optional): flag to include task parameter information (Previously fullFlag)
        include_status(bool, optional): flag to include status information (Previously withTaskInfo)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains the task details.
              When there was an error, the message field contains the description.
    """
    tmp_logger = LogWrapper(_logger, f"get_details  < task_id={task_id} include_parameters={include_parameters} include_status={include_status} >")
    tmp_logger.debug("Start")

    # validate the task ID up front, as the other task endpoints in this module do
    try:
        task_id = int(task_id)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    details = global_task_buffer.getJediTaskDetails(task_id, include_parameters, include_status)
    if not details:
        # an empty answer does not tell apart "no such task" from "lookup failed"
        tmp_logger.error("Task not found or error retrieving the details")
        return generate_response(False, message="Task not found or error retrieving the details")

    tmp_logger.debug("Done")

    return generate_response(True, data=details)
0899 
0900 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_attribute(req: PandaRequest, task_id: int, attribute_name: str, value: int) -> Dict[str, Any]:
    """
    Change a task attribute

    Change a task attribute within the list of valid attributes ("ramCount", "wallTime", "cpuTime", "coreCount").
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_attribute

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        attribute_name(str): attribute to change
        value(int): value to set to the attribute

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure.
              On success the message reports how many tasks were changed; on failure it contains the error description.
              This endpoint does not pass a value for the data field.
    """
    tmp_logger = LogWrapper(_logger, f"change_attribute < task_id={task_id} attribute_name={attribute_name} value={value} >")
    tmp_logger.debug("Start")

    # check task_id
    # int() raises TypeError for None or containers and OverflowError for infinite floats, not only ValueError
    try:
        task_id = int(task_id)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # check if attribute_name is valid
    valid_attributes = ["ramCount", "wallTime", "cpuTime", "coreCount"]
    if attribute_name not in valid_attributes:
        tmp_logger.error("Failed due to invalid attribute_name")
        return generate_response(False, message=f"{attribute_name} is not a valid attribute. Valid attributes are {valid_attributes}")

    n_tasks_changed = global_task_buffer.changeTaskAttributePanda(task_id, attribute_name, value)
    if n_tasks_changed is None:  # method excepted
        tmp_logger.error("Failed due to exception while changing the attribute")
        return generate_response(False, message="Exception while changing the attribute")

    if n_tasks_changed == 0:  # no tasks were changed should mean that it doesn't exist
        tmp_logger.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    tmp_logger.debug("Done")
    return generate_response(True, message=f"{n_tasks_changed} tasks changed")
0950 
0951 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_modification_time(req: PandaRequest, task_id: int, hour_offset: int) -> Dict[str, Any]:
    """
    Change task modification time

    Change the modification time for a task to `now() + hour_offset`. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_modification_time

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        hour_offset(int): number of hours to add to the current time. Use a negative value (e.g. -12) to trigger task brokerage.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure.
              On success the message reports how many tasks were changed; on failure it contains the error description.
              This endpoint does not pass a value for the data field.
    """
    tmp_logger = LogWrapper(_logger, f"change_modification_time < task_id={task_id} hour_offset={hour_offset} >")
    tmp_logger.debug("Start")

    # check task_id, as the other change_* endpoints in this module do
    try:
        task_id = int(task_id)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # check offset
    # timedelta() raises TypeError for a non-numeric offset and OverflowError for one that is too large;
    # adding an oversized delta to a datetime raises OverflowError as well. ValueError alone misses these.
    # NOTE(review): datetime.now() is naive local time while this module also imports naive_utcnow;
    # confirm which clock modificationTime is expected to use.
    try:
        new_modification_time = datetime.datetime.now() + datetime.timedelta(hours=hour_offset)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid hour_offset")
        return generate_response(False, message=f"failed to convert {hour_offset} to time")

    n_tasks_changed = global_task_buffer.changeTaskAttributePanda(task_id, "modificationTime", new_modification_time)
    if n_tasks_changed is None:  # method excepted
        tmp_logger.error("Failed due to exception while changing the attribute")
        return generate_response(False, message="Exception while changing the attribute")

    if n_tasks_changed == 0:  # no tasks were changed should mean that it doesn't exist
        tmp_logger.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    tmp_logger.debug("Done")
    return generate_response(True, message=f"{n_tasks_changed} tasks changed")
0992 
0993 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_priority(req: PandaRequest, task_id: int, priority: int) -> Dict[str, Any]:
    """
    Change priority

    Change the priority of a given task. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_priority

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        priority(int): new priority for the task

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure.
              On success the message reports how many tasks were changed; on failure it contains the error description.
              This endpoint does not pass a value for the data field.
    """
    tmp_logger = LogWrapper(_logger, f"change_priority < task_id={task_id} priority={priority} >")
    tmp_logger.debug("Start")

    # int() raises TypeError for None or containers and OverflowError for infinite floats, not only ValueError,
    # so both checks below catch all three

    # check task_id
    try:
        task_id = int(task_id)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # check priority
    try:
        priority = int(priority)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid priority")
        return generate_response(False, message="priority must be an integer")

    n_tasks_changed = global_task_buffer.changeTaskPriorityPanda(task_id, priority)

    if n_tasks_changed is None:  # method excepted
        tmp_logger.error("Failed due to exception while changing the priority")
        return generate_response(False, message="Exception while changing the priority")

    if n_tasks_changed == 0:  # no tasks were changed should mean that it doesn't exist
        tmp_logger.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    tmp_logger.debug("Done")

    return generate_response(True, message=f"{n_tasks_changed} tasks changed")
1044 
1045 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def change_split_rule(req: PandaRequest, task_id: int, attribute_name: str, value: str) -> Dict[str, Any]:
    """
    Change the split rule

    Change the split rule for a task by modifying or adding an `attribute_name=value pair`. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/task/change_split_rule

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        attribute_name(str): split rule attribute to change. The allowed attributes are defined in `task_split_rules.changeable_split_rule_tags`
        value(str): value to set to the attribute

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure.
              On success the message reports how many tasks were changed; on failure it contains the error description.
              The data field is only set by this endpoint (to 2) when the attribute name is not allowed.
    """
    tmp_logger = LogWrapper(_logger, f"change_split_rule < task_id={task_id} attribute_name={attribute_name} value={value} >")
    tmp_logger.debug("Start")

    # check task_id
    # int() raises TypeError for None or containers and OverflowError for infinite floats, not only ValueError
    try:
        task_id = int(task_id)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # check attribute
    if attribute_name not in task_split_rules.changeable_split_rule_tags:
        tmp_logger.error("Failed due to invalid attribute_name")
        return generate_response(
            False, message=f"{attribute_name} is not a valid attribute. Valid attributes are {task_split_rules.changeable_split_rule_tags}", data=2
        )

    n_tasks_changed = global_task_buffer.changeTaskSplitRulePanda(task_id, attribute_name, value)
    if n_tasks_changed is None:  # method excepted
        tmp_logger.error("Failed due to exception while changing the split rule")
        return generate_response(False, message="Exception while changing the split rule")

    if n_tasks_changed == 0:  # no tasks were changed should mean that it doesn't exist
        tmp_logger.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    tmp_logger.debug("Done")

    return generate_response(True, message=f"{n_tasks_changed} tasks changed")
1096 
1097 
@request_validation(_logger, secure=True, request_method="GET")
def get_tasks_modified_since(req, since: str, dn: str = None, full: bool = False, min_task_id: int = None, prod_source_label: str = "user") -> Dict[str, Any]:
    """
    Get tasks modified since

    Return the tasks whose `modificationtime` is later than `since`. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_tasks_modified_since

    Args:
        req(PandaRequest): internally generated request object
        since(str): time in the format `%Y-%m-%d %H:%M:%S`, e.g. `2024-12-18 14:30:45`. The tasks with `modificationtime > since` will be returned
        dn(str, optional): user DN. When empty, the DN is taken from the request
        full(bool, optional): flag to include full task information. If `full=False` the basic fields are `jediTaskID, modificationTime, status, processingType, transUses, transHome, architecture, reqID, creationDate, site, cloud, taskName`
        min_task_id(int, optional): minimum task ID
        prod_source_label(str, optional): task type (e.g. `user`, `managed`, `test`, etc.)

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              This endpoint always reports success; the data field holds whatever the task buffer returned for the query.
    """
    log = LogWrapper(_logger, "get_tasks_modified_since")
    log.debug("Start")

    # fall back to the requester's own DN when none was supplied
    dn = dn or get_dn(req)

    log.debug(f"parameters dn:{dn} since:{since} full:{full} min_task_id:{min_task_id} prod_source_label:{prod_source_label}")

    modified_tasks = global_task_buffer.getJediTasksInTimeRange(dn, since, full, min_task_id, prod_source_label)

    log.debug("Done")
    return generate_response(True, data=modified_tasks)
1133 
1134 
@request_validation(_logger, secure=True, request_method="GET")
def get_datasets_and_files(req, task_id, dataset_types: List[str] = ("input", "pseudo_input")) -> Dict[str, Any]:
    """
    Get datasets and files

    List the files of the datasets that belong to a given task, optionally restricted to some dataset types. The return format is:
    ```
    [
        {
            "dataset": {
                "name": dataset_name,
                "id": dataset_id
            },
            "files": [
                {
                    "lfn": lfn,
                    "scope": file_scope,
                    "id": file_id,
                    "status": status
                },
                ...
            ]
        },
        ...
    ]
    ```
    Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_datasets_and_files

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID
        dataset_types(List, optional): list of dataset types, defaults to `["input", "pseudo_input"]`

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the data field holds the structure shown above.
              On failure (lookup returned None, or an empty list) the message field contains the description.
    """
    log = LogWrapper(_logger, f"get_datasets_and_files < task_id={task_id} dataset_types={dataset_types} >")
    log.debug("Start")

    files_by_dataset = global_task_buffer.get_files_in_datasets(task_id, dataset_types)

    # Work out whether the answer is a failure: None is reported as a database
    # exception, an empty list as "nothing found".
    failure = None
    if files_by_dataset is None:
        failure = ("Failed due to exception while gathering files", "Database exception while gathering files")
    elif files_by_dataset == []:
        failure = ("Failed due to no data found for the task", "No data found for the task")

    if failure is not None:
        log_text, reply = failure
        log.error(log_text)
        return generate_response(False, message=reply)

    log.debug("Done")
    return generate_response(True, data=files_by_dataset)
1190 
1191 
@request_validation(_logger, secure=True, request_method="GET")
def get_job_ids(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get job IDs

    Get a list with the job IDs `[job_id, ...]` (in any status) associated to a given task. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_job_ids

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains the job IDs returned by the task buffer.
              When the task ID is invalid, the message field contains the description.
    """
    tmp_logger = LogWrapper(_logger, f"get_job_ids < task_id={task_id} >")
    tmp_logger.debug("Start")

    # int() raises TypeError for None or containers and OverflowError for infinite floats, not only ValueError
    try:
        task_id = int(task_id)
    except (TypeError, ValueError, OverflowError):
        tmp_logger.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    job_id_list = global_task_buffer.getPandaIDsWithTaskID(task_id)

    tmp_logger.debug("Done")

    return generate_response(True, data=job_id_list)
1224 
1225 
@request_validation(_logger, secure=True, request_method="POST")
def submit(req: PandaRequest, task_parameters: Dict, parent_tid: int = None) -> Dict[str, Any]:
    """
    Register task

    Insert the task parameters to register a task. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/task/submit

    Args:
        req(PandaRequest): internally generated request object
        task_parameters(dict): Dictionary with all the required task parameters. The parameters are the attributes in the JediTaskSpec object (https://github.com/PanDAWMS/panda-jedi/blob/master/pandajedi/jedicore/JediTaskSpec.py).
        parent_tid(int, optional): Parent task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              Return codes 0 and 3 from the task buffer are treated as success; the data field then contains the
              task ID parsed from the message (None if the message does not carry one).
              For any other code, success is False and the data field contains that code.
    """
    tmp_log = LogWrapper(_logger, f"submit {naive_utcnow().isoformat('/')}")
    tmp_log.debug("Start")

    user = get_dn(req)
    is_production_role = has_production_role(req)
    fqans = get_fqan(req)

    tmp_log.debug(f"user={user} is_production_role={is_production_role} FQAN:{str(fqans)} parent_tid={parent_tid}")
    ret = global_task_buffer.insertTaskParamsPanda(task_parameters, user, is_production_role, fqans, properErrorCode=True, parent_tid=parent_tid, decode=False)

    code, message = ret
    success = code in (0, 3)
    if not success:
        return generate_response(False, message=message, data=code)

    # Extract the task ID from the message.
    # The pattern only captures digits, so int() cannot fail on the captured group; the only way the
    # extraction can fail is when the message has no "jediTaskID=<digits>" token (or is not a string).
    task_id = None
    match = re.search(r"jediTaskID=(\d+)", message) if isinstance(message, str) else None
    if match:
        task_id = int(match.group(1))
        tmp_log.debug(f"Created task with task_id: {task_id}")
    else:
        tmp_log.error("Failed to extract the task ID from the message")

    tmp_log.debug("Done")

    return generate_response(True, message=message, data=task_id)
1275 
1276 
@request_validation(_logger, request_method="GET")
def get_task_parameters(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get task parameters

    Return, as a dictionary, the parameters that were used to create a task.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_task_parameters

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              On success the data field contains the decoded task parameters.
              On failure (invalid task ID, nothing stored for the task, or undecodable JSON) the message field contains the description.

    """
    log = LogWrapper(_logger, f"get_task_parameters < task_id={task_id} >")
    log.debug("Start")

    # the task ID must be convertible to an integer
    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # the parameters are stored as a JSON string; an empty answer is reported as a missing task
    raw_parameters = global_task_buffer.getTaskParamsMap(task_id)
    if not raw_parameters:
        log.error("Failed due to task not found")
        return generate_response(False, message="Task not found")

    # turn the JSON string into a dictionary
    try:
        decoded_parameters = json.loads(raw_parameters)
    except json.JSONDecodeError as e:
        log.error("Failed due to error decoding the task parameters")
        return generate_response(False, message=f"Error decoding the task parameters: {str(e)}")
    else:
        log.debug("Done")
        return generate_response(True, data=decoded_parameters)
1322 
1323 
@request_validation(_logger, secure=True, request_method="GET")
def get_detailed_info(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get detailed task info

    Return every task field, together with jobParamsTemplate and taskParams, for a given task.
    The record is read without locking. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_detailed_info

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): JEDI task ID

    Returns:
        dict: The system response ``{"success": success, "message": message, "data": data}``.
              On success the ``data`` field contains a dictionary with all JediTaskSpec
              attributes plus additional information.
              On failure ``success`` is False and ``message`` contains the error description.
    """
    log = LogWrapper(_logger, f"get_detailed_info < task_id={task_id} >")
    log.debug("Start")

    # reject anything that cannot be turned into an integer task ID
    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    details = global_task_buffer.get_task_details_json(task_id)
    if details is not None:
        log.debug("Done")
        return generate_response(True, data=details)

    # None does not tell apart a missing task from a failed lookup
    not_found_text = "Task not found or error retrieving task info"
    log.error(not_found_text)
    return generate_response(False, message=not_found_text)
1362 
1363 
@request_validation(_logger, secure=True, request_method="GET")
def get_parent_detailed_info(req: PandaRequest, task_id: int) -> Dict[str, Any]:
    """
    Get detailed parent task info

    Starting from a child task, look up its parent task and return only the
    parent's detailed information. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_parent_detailed_info

    Args:
        req(PandaRequest): internally generated request object
        task_id(int): child JEDI task ID

    Returns:
        dict: The system response ``{"success": success, "message": message, "data": data}``.
              On success the ``data`` field contains the parent task details.
              On failure ``success`` is False with a message distinguishing:
              child-not-found, no-parent, parent-not-found, or retrieval error.
    """
    log = LogWrapper(_logger, f"get_parent_detailed_info < child_task_id={task_id} >")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    # A single task-buffer call both resolves the parent and fetches its details.
    lookup = global_task_buffer.get_task_details_json(task_id, resolve_parent=True, include_resolve_status=True)
    resolve_status, parent_task_id, task_info = lookup
    log.debug(f"resolve_parent status={resolve_status} child={task_id} parent={parent_task_id}")

    # common tail of every log line below
    ids_suffix = f"child={task_id} parent={parent_task_id}"

    # (resolve status, log text, message sent back to the caller)
    specific_failures = (
        ("child_not_found", "Child task not found", "Child task not found"),
        ("no_parent", "No valid parent task", "No parent task found"),
        ("parent_not_found", "Parent task not found", "Parent task not found"),
    )
    for status, log_text, reply in specific_failures:
        if resolve_status == status:
            log.error(f"{log_text} {ids_suffix}")
            return generate_response(False, message=reply)

    # remaining failure statuses, or a missing payload regardless of status
    if task_info is None or resolve_status in ("target_not_found", "error"):
        log.error(f"Parent task retrieval failed status={resolve_status} {ids_suffix}")
        return generate_response(False, message="Parent task details not found or retrieval error")

    log.debug(f"Done {ids_suffix}")
    return generate_response(True, data=task_info)
1418 
1419 
@request_validation(_logger, secure=True, request_method="GET")
def get_scout_job_descriptions(req: PandaRequest, task_id: int) -> Dict:
    """
    Get scout job descriptions for a task.

    Asks the task buffer for the PanDA job IDs of the task's scout jobs, fetches
    the full job records for those IDs and returns their descriptions. Per the
    original documentation, the lookup includes the archive and scout jobs are
    identified by the ``sj`` token in the comma-separated ``specialHandling`` field.
    Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_scout_job_descriptions

    Args:
        req(PandaRequest): internally generated request object containing the env variables.
        task_id (int): JEDI task ID.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains a list of scout job descriptions.
              If no jobs are found or no scout jobs exist, returns success=True with empty list.
              At most 5500 job IDs are looked up; longer lists are truncated.
    """
    log = LogWrapper(_logger, f"get_scout_job_descriptions task_id={task_id}")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    scout_ids = global_task_buffer.getPandaIDsWithTaskID(task_id, scout_only=True)
    if not scout_ids:
        log.debug("No scout jobs found for task")
        return generate_response(True, data=[])

    log.debug(f"Found {len(scout_ids)} scout job IDs for task")

    # cap the number of IDs sent to the job lookup
    id_limit = 5500
    if len(scout_ids) > id_limit:
        log.error(f"List of PanDA IDs is longer than {id_limit}. Truncating the list.")
        scout_ids = scout_ids[:id_limit]

    # entries that come back as None are skipped
    scout_jobs = []
    for job_spec in global_task_buffer.getFullJobStatus(scout_ids):
        if job_spec is None:
            continue
        scout_jobs.append(job_spec.to_dict_advanced())

    log.debug(f"Done with {len(scout_jobs)} scout jobs")
    return generate_response(True, data=scout_jobs)
1469 
1470 
@request_validation(_logger, secure=True, request_method="GET")
def get_job_descriptions(req: PandaRequest, task_id: int, unsuccessful_only: bool = False) -> Dict:
    """
    Get job descriptions for a task.

    Asks the task buffer for the PanDA job IDs of the task, fetches the full job
    records for those IDs and returns their descriptions. Per the original
    documentation, the lookup includes the archive and the optional filter keeps
    only unsuccessful jobs (status ``failed``, ``cancelled``, or ``closed``).
    Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/task/get_job_descriptions

    Args:
        req(PandaRequest): internally generated request object containing the env variables.
        task_id (int): JEDI task ID.
        unsuccessful_only (bool): When True, return only failed, cancelled, or closed jobs.
            Defaults to False.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`.
              When successful, the data field contains a list of job descriptions.
              If no jobs are found, returns success=True with empty list.
              At most 5500 job IDs are looked up; longer lists are truncated.
    """
    log = LogWrapper(_logger, f"get_job_descriptions task_id={task_id} unsuccessful_only={unsuccessful_only}")
    log.debug("Start")

    try:
        task_id = int(task_id)
    except Exception:
        log.error("Failed due to invalid task_id")
        return generate_response(False, message=MESSAGE_TASK_ID)

    panda_ids = global_task_buffer.getPandaIDsWithTaskID(task_id, unsuccessful_only=unsuccessful_only)
    if not panda_ids:
        log.debug("No jobs found for task")
        return generate_response(True, data=[])

    log.debug(f"Found {len(panda_ids)} job IDs for task")

    # cap the number of IDs sent to the job lookup
    id_limit = 5500
    if len(panda_ids) > id_limit:
        log.error(f"List of PanDA IDs is longer than {id_limit}. Truncating the list.")
        panda_ids = panda_ids[:id_limit]

    # entries that come back as None are skipped
    descriptions = []
    for job_spec in global_task_buffer.getFullJobStatus(panda_ids):
        if job_spec is None:
            continue
        descriptions.append(job_spec.to_dict_advanced())

    log.debug(f"Done with {len(descriptions)} jobs")
    return generate_response(True, data=descriptions)