Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import datetime
0002 import json
0003 from concurrent.futures import ThreadPoolExecutor
0004 from threading import Lock
0005 from typing import Any, Dict, List
0006 
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010 
0011 from pandaserver.api.v1.common import (
0012     MESSAGE_DATABASE,
0013     TIME_OUT,
0014     TimedMethod,
0015     generate_response,
0016     get_dn,
0017     has_production_role,
0018     request_validation,
0019 )
0020 from pandaserver.srvcore.panda_request import PandaRequest
0021 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0022 from pandaserver.workflow.workflow_core import WorkflowInterface
0023 
0024 _logger = PandaLogger().getLogger("api_workflow")
0025 
0026 # These global variables are initialized in the init_task_buffer method
0027 global_task_buffer = None
0028 global_wfif = None
0029 
0030 # These global variables don't depend on DB access and can be initialized here
0031 # global_proxy_cache = panda_proxy_cache.MyProxyInterface()
0032 # global_token_cache = token_cache.TokenCache()
0033 
0034 
0035 def init_task_buffer(task_buffer: TaskBuffer) -> None:
0036     """
0037     Initialize the task buffer and other interfaces. This method needs to be called before any other method in this module.
0038     """
0039     global global_task_buffer
0040     global_task_buffer = task_buffer
0041 
0042     global global_wfif
0043     global_wfif = WorkflowInterface(global_task_buffer)
0044 
0045 
0046 @request_validation(_logger, secure=True, production=False, request_method="POST")
0047 def submit_workflow_raw_request(req: PandaRequest, params: dict | str) -> dict:
0048     """
0049     Submit raw request of PanDA native workflow.
0050 
0051     API details:
0052         HTTP Method: POST
0053         Path: /v1/workflow/submit_workflow_raw_request
0054 
0055     Args:
0056         req(PandaRequest): internally generated request object containing the env variables
0057         params (dict|str): dictionary or JSON of parameters of the raw request
0058 
0059     Returns:
0060         dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
0061     """
0062 
0063     user_dn = get_dn(req)
0064     prodsourcelabel = "user"
0065 
0066     # FIXME: only for analysis temporarily
0067     # if has_production_role(req):
0068     #     prodsourcelabel = "managed"
0069 
0070     tmp_logger = LogWrapper(_logger, f'submit_workflow_raw_request prodsourcelabel={prodsourcelabel} user_dn="{user_dn}" ')
0071     tmp_logger.debug("Start")
0072     success, message, data = False, "", None
0073     time_start = naive_utcnow()
0074 
0075     if isinstance(params, str):
0076         try:
0077             params = json.loads(params)
0078         except Exception as exc:
0079             message = f"Failed to parse params: {params} {str(exc)}"
0080             tmp_logger.error(message)
0081             return generate_response(success, message, data)
0082 
0083     workflow_id = global_wfif.register_workflow(prodsourcelabel, user_dn, raw_request_params=params)
0084 
0085     if workflow_id is not None:
0086         success = True
0087         data = {"workflow_id": workflow_id}
0088     else:
0089         message = "Failed to submit raw workflow request"
0090 
0091     time_delta = naive_utcnow() - time_start
0092     tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")
0093 
0094     return generate_response(success, message, data)
0095 
0096 
0097 @request_validation(_logger, secure=True, production=False, request_method="POST")
0098 def submit_workflow(req: PandaRequest, workflow_definition: dict) -> dict:
0099     """
0100     Submit a PanDA native workflow.
0101 
0102     API details:
0103         HTTP Method: POST
0104         Path: /v1/workflow/submit_workflow
0105 
0106     Args:
0107         req(PandaRequest): internally generated request object containing the env variables
0108         workflow_definition (dict): dictionary of workflow definition
0109 
0110     Returns:
0111         dict: dictionary `{'success': True/False, 'message': 'Description of error', 'data': <requested data>}`
0112     """
0113 
0114     user_dn = get_dn(req)
0115     prodsourcelabel = "user"
0116     if has_production_role(req):
0117         prodsourcelabel = "managed"
0118     workflow_name = workflow_definition.get("workflow_name", None)
0119 
0120     tmp_logger = LogWrapper(_logger, f'submit_workflow prodsourcelabel={prodsourcelabel} user_dn="{user_dn}" workflow_name={workflow_name}')
0121     tmp_logger.debug("Start")
0122     success, message, data = False, "", None
0123     time_start = naive_utcnow()
0124 
0125     workflow_id = global_wfif.register_workflow(prodsourcelabel, user_dn, workflow_name, workflow_definition)
0126 
0127     if workflow_id is not None:
0128         success = True
0129         data = {"workflow_id": workflow_id}
0130     else:
0131         message = "Failed to submit workflow"
0132 
0133     time_delta = naive_utcnow() - time_start
0134     tmp_logger.debug(f"Done. Took {time_delta.seconds}.{time_delta.microseconds // 1000:03d} sec")
0135 
0136     return generate_response(success, message, data)