Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 08:38:39

0001 import datetime
0002 import json
0003 import traceback
0004 
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007 from pandacommon.pandautils.PandaUtils import naive_utcnow
0008 
0009 from pandaserver.api.v1.common import generate_response, request_validation
0010 from pandaserver.srvcore.CoreUtils import clean_user_id
0011 
0012 try:
0013     import idds.common.constants
0014     import idds.common.utils
0015     from idds.client.client import Client as iDDS_Client
0016     from idds.client.clientmanager import ClientManager as iDDS_ClientManager
0017 except ImportError:
0018     pass
0019 
0020 _logger = PandaLogger().getLogger("api_idds")
0021 
0022 
0023 # json decoder for idds constants
0024 def decode_idds_enum(d):
0025     if "__idds_const__" in d:
0026         items = d["__idds_const__"].split(".")
0027         obj = idds.common.constants
0028         for item in items:
0029             obj = getattr(obj, item)
0030         return obj
0031     else:
0032         return d
0033 
0034 
0035 @request_validation(_logger, secure=True, request_method="POST")
0036 def relay_idds_command(req, command_name: str, args: str = None, kwargs: str = None, manager: bool = False, json_outputs: bool = False):
0037     tmp_log = LogWrapper(
0038         _logger,
0039         f"relay_idds_command-{naive_utcnow().isoformat('/')}",
0040     )
0041 
0042     try:
0043         if "+" in command_name:
0044             command_name, idds_host = command_name.split("+")
0045         else:
0046             idds_host = idds.common.utils.get_rest_host()
0047 
0048         if manager:
0049             c = iDDS_ClientManager(idds_host)
0050         else:
0051             c = iDDS_Client(idds_host)
0052 
0053         if not hasattr(c, command_name):
0054             tmp_str = f"{command_name} is not a command of iDDS {c.__class__.__name__}"
0055             tmp_log.error(tmp_str)
0056             return generate_response(False, tmp_str)
0057 
0058         if args:
0059             try:
0060                 args = idds.common.utils.json_loads(args)
0061             except Exception as e:
0062                 tmp_log.warning(f"failed to load args json with {str(e)}")
0063                 args = json.loads(args, object_hook=decode_idds_enum)
0064         else:
0065             args = []
0066 
0067         if kwargs:
0068             try:
0069                 kwargs = idds.common.utils.json_loads(kwargs)
0070             except Exception as e:
0071                 tmp_log.warning(f"failed to load kwargs json with {str(e)}")
0072                 kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
0073         else:
0074             kwargs = {}
0075 
0076         # json outputs
0077         if json_outputs and manager:
0078             c.setup_json_outputs()
0079 
0080         # set original username
0081         dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
0082         if dn:
0083             c.set_original_user(user_name=clean_user_id(dn))
0084 
0085         tmp_log.debug(f"execute: class={c.__class__.__name__} com={command_name} host={idds_host} args={str(args)[:200]} kwargs={str(kwargs)[:200]}")
0086         ret = getattr(c, command_name)(*args, **kwargs)
0087         tmp_log.debug(f"ret: {str(ret)[:200]}")
0088 
0089         try:
0090             return generate_response(True, "", ret)
0091         except Exception:
0092             # TODO: I don't know how to handle this
0093             return idds.common.utils.json_dumps((True, ret))
0094 
0095     except Exception as e:
0096         tmp_str = f"failed to execute command with {str(e)}"
0097         tmp_log.error(f"{tmp_str} {traceback.format_exc()}")
0098         return generate_response(False, tmp_str)
0099 
0100 
0101 # relay iDDS workflow command with ownership check
0102 @request_validation(_logger, secure=True, request_method="POST")
0103 def execute_idds_workflow_command(req, command_name: str, kwargs: str = None, json_outputs: bool = False):
0104     tmp_log = LogWrapper(
0105         _logger,
0106         f"execute_idds_workflow_command-{naive_utcnow().isoformat('/')}",
0107     )
0108     try:
0109         if kwargs:
0110             try:
0111                 kwargs = idds.common.utils.json_loads(kwargs)
0112             except Exception:
0113                 kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
0114         else:
0115             kwargs = {}
0116 
0117         if "+" in command_name:
0118             command_name, idds_host = command_name.split("+")
0119         else:
0120             idds_host = idds.common.utils.get_rest_host()
0121 
0122         # check permission
0123         if command_name in ["get_status"]:
0124             check_owner = False
0125         elif command_name in ["abort", "suspend", "resume", "retry", "finish"]:
0126             check_owner = True
0127         else:
0128             tmp_message = f"{command_name} is unsupported"
0129             tmp_log.error(tmp_message)
0130             return generate_response(False, tmp_message)
0131 
0132         # check owner
0133         c = iDDS_ClientManager(idds_host)
0134         if json_outputs:
0135             c.setup_json_outputs()
0136         dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
0137         if check_owner:
0138             # requester
0139             if not dn:
0140                 tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
0141                 tmp_log.error(tmp_message)
0142                 return generate_response(False, tmp_message)
0143             requester = clean_user_id(dn)
0144 
0145             # get request_id
0146             request_id = kwargs.get("request_id")
0147             if request_id is None:
0148                 tmp_message = "request_id is missing"
0149                 tmp_log.error(tmp_message)
0150                 return generate_response(False, tmp_message)
0151 
0152             # get request
0153             req = c.get_requests(request_id=request_id)
0154             if not req:
0155                 tmp_message = f"request {request_id} is not found"
0156                 tmp_log.error(tmp_message)
0157                 return generate_response(False, tmp_message)
0158 
0159             user_name = req[0].get("username")
0160             if user_name and user_name != requester:
0161                 tmp_message = f"request {request_id} is not owned by {requester}"
0162                 tmp_log.error(tmp_message)
0163                 return generate_response(False, tmp_message)
0164 
0165         # set original username
0166         if dn:
0167             c.set_original_user(user_name=clean_user_id(dn))
0168 
0169         # execute command
0170         tmp_log.debug(f"com={command_name} host={idds_host} kwargs={str(kwargs)}")
0171         ret = getattr(c, command_name)(**kwargs)
0172         tmp_log.debug(str(ret))
0173 
0174         if isinstance(ret, dict) and "message" in ret:
0175             return generate_response(True, ret["message"], ret["status"])
0176 
0177         return generate_response(True, "", ret)
0178     except Exception as e:
0179         tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
0180         return generate_response(False, f"server failed with {str(e)}")