File indexing completed on 2026-04-10 08:39:00
0001 import datetime
0002 import json
0003 from concurrent.futures import ThreadPoolExecutor
0004 from threading import Lock
0005 from typing import Any, Dict, List
0006
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010
0011 from pandaserver.api.v1.common import (
0012 MESSAGE_DATABASE,
0013 TIME_OUT,
0014 TimedMethod,
0015 generate_response,
0016 get_dn,
0017 has_production_role,
0018 request_validation,
0019 )
0020 from pandaserver.srvcore.panda_request import PandaRequest
0021 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0022 from pandaserver.workflow.workflow_core import WorkflowInterface
0023
0024 _logger = PandaLogger().getLogger("api_workflow")
0025
0026
0027 global_task_buffer = None
0028 global_wfif = None
0029
0030
0031
0032
0033
0034
0035 def init_task_buffer(task_buffer: TaskBuffer) -> None:
0036 """
0037 Initialize the task buffer and other interfaces. This method needs to be called before any other method in this module.
0038 """
0039 global global_task_buffer
0040 global_task_buffer = task_buffer
0041
0042 global global_wfif
0043 global_wfif = WorkflowInterface(global_task_buffer)
0044
0045
0046 @request_validation(_logger, secure=True, production=False, request_method="POST")
0047 def submit_workflow_raw_request(req: PandaRequest, params: dict | str) -> dict:
0048 """
0049 Submit raw request of PanDA native workflow.
0050
0051 API details:
0052 HTTP Method: POST
0053 Path: /v1/workflow/submit_workflow_raw_request
0054
0055 Args:
0056 req(PandaRequest): internally generated request object containing the env variables
0057 params (dict|str): dictionary or JSON of parameters of the raw request
0058
0059 Returns:
0060 dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
0061 """
0062
0063 user_dn = get_dn(req)
0064 prodsourcelabel = "user"
0065
0066
0067
0068
0069
0070 tmp_logger = LogWrapper(_logger, f'submit_workflow_raw_request prodsourcelabel={prodsourcelabel} user_dn="{user_dn}" ')
0071 tmp_logger.debug("Start")
0072 success, message, data = False, "", None
0073 time_start = naive_utcnow()
0074
0075 if isinstance(params, str):
0076 try:
0077 params = json.loads(params)
0078 except Exception as exc:
0079 message = f"Failed to parse params: {params} {str(exc)}"
0080 tmp_logger.error(message)
0081 return generate_response(success, message, data)
0082
0083 workflow_id = global_wfif.register_workflow(prodsourcelabel, user_dn, raw_request_params=params)
0084
0085 if workflow_id is not None:
0086 success = True
0087 data = {"workflow_id": workflow_id}
0088 else:
0089 message = "Failed to submit raw workflow request"
0090
0091 time_delta = naive_utcnow() - time_start
0092 tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")
0093
0094 return generate_response(success, message, data)
0095
0096
0097 @request_validation(_logger, secure=True, production=False, request_method="POST")
0098 def submit_workflow(req: PandaRequest, workflow_definition: dict) -> dict:
0099 """
0100 Submit a PanDA native workflow.
0101
0102 API details:
0103 HTTP Method: POST
0104 Path: /v1/workflow/submit_workflow
0105
0106 Args:
0107 req(PandaRequest): internally generated request object containing the env variables
0108 workflow_definition (dict): dictionary of workflow definition
0109
0110 Returns:
0111 dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
0112 """
0113
0114 user_dn = get_dn(req)
0115 prodsourcelabel = "user"
0116 if has_production_role(req):
0117 prodsourcelabel = "managed"
0118 workflow_name = workflow_definition.get("workflow_name", None)
0119
0120 tmp_logger = LogWrapper(_logger, f'submit_workflow prodsourcelabel={prodsourcelabel} user_dn="{user_dn}" workflow_name={workflow_name}')
0121 tmp_logger.debug("Start")
0122 success, message, data = False, "", None
0123 time_start = naive_utcnow()
0124
0125 workflow_id = global_wfif.register_workflow(prodsourcelabel, user_dn, workflow_name, workflow_definition)
0126
0127 if workflow_id is not None:
0128 success = True
0129 data = {"workflow_id": workflow_id}
0130 else:
0131 message = "Failed to submit workflow"
0132
0133 time_delta = naive_utcnow() - time_start
0134 tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")
0135
0136 return generate_response(success, message, data)