File indexing completed on 2026-04-09 08:38:39
0001 import datetime
0002 import json
0003 import traceback
0004
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007 from pandacommon.pandautils.PandaUtils import naive_utcnow
0008
0009 from pandaserver.api.v1.common import generate_response, request_validation
0010 from pandaserver.srvcore.CoreUtils import clean_user_id
0011
0012 try:
0013 import idds.common.constants
0014 import idds.common.utils
0015 from idds.client.client import Client as iDDS_Client
0016 from idds.client.clientmanager import ClientManager as iDDS_ClientManager
0017 except ImportError:
0018 pass
0019
0020 _logger = PandaLogger().getLogger("api_idds")
0021
0022
0023
def decode_idds_enum(d):
    """
    Resolve a serialized iDDS constant marker to the object it names.

    Meant to be used as a ``json.loads`` ``object_hook``. A dict carrying the
    ``"__idds_const__"`` key is replaced by the attribute reached by following its
    dotted path from ``idds.common.constants``; any other dict is returned untouched.

    Args:
        d (dict): a JSON object decoded by the JSON parser.

    Returns:
        The attribute of ``idds.common.constants`` named by the path, or ``d`` itself
        when the marker key is absent.

    Raises:
        ValueError: if a path component is empty or starts with an underscore.
        AttributeError: if the path does not exist under ``idds.common.constants``.
    """
    if "__idds_const__" not in d:
        return d

    dotted_path = d["__idds_const__"]
    components = dotted_path.split(".")

    # The path is taken from a client-supplied payload. Refuse private/dunder components
    # up front so the lookup cannot walk into interpreter internals such as
    # ``__class__`` or ``__globals__``.
    for component in components:
        if not component or component.startswith("_"):
            raise ValueError(f"invalid iDDS constant path: {dotted_path}")

    obj = idds.common.constants
    for component in components:
        obj = getattr(obj, component)
    return obj
0033
0034
@request_validation(_logger, secure=True, request_method="POST")
def relay_idds_command(req, command_name: str, args: str = None, kwargs: str = None, manager: bool = False, json_outputs: bool = False):
    """
    Relay a command to iDDS.

    Creates an iDDS client (or client manager), looks up ``command_name`` on it and calls it
    with the decoded positional and keyword arguments.

    Args:
        req: request object; only ``req.subprocess_env`` is read here, to obtain ``SSL_CLIENT_S_DN``.
        command_name (str): name of the client method to call. A value containing ``+`` is split
            into ``<command>+<host>`` to select the iDDS host explicitly; otherwise
            ``idds.common.utils.get_rest_host()`` provides the host.
        args (str): JSON-encoded list of positional arguments. Empty/None means no arguments.
        kwargs (str): JSON-encoded object of keyword arguments. Empty/None means no arguments.
        manager (bool): use ``iDDS_ClientManager`` instead of ``iDDS_Client``.
        json_outputs (bool): call ``setup_json_outputs()`` on the client; only honoured when
            ``manager`` is also set.

    Returns:
        The value built by ``generate_response``: ``True`` with the command's return value as data
        on success, ``False`` with an error message on failure. If ``generate_response`` raises for
        the return value, the output of ``idds.common.utils.json_dumps((True, ret))`` is returned instead.
    """
    tmp_log = LogWrapper(
        _logger,
        f"relay_idds_command-{naive_utcnow().isoformat('/')}",
    )

    try:
        if "+" in command_name:
            command_name, idds_host = command_name.split("+")
        else:
            idds_host = idds.common.utils.get_rest_host()

        client_class = iDDS_ClientManager if manager else iDDS_Client
        c = client_class(idds_host)

        # command_name is client-supplied: only public, callable attributes of the client are
        # accepted, so private/dunder attributes and plain data attributes cannot be invoked.
        is_public = not command_name.startswith("_")
        if not (is_public and callable(getattr(c, command_name, None))):
            tmp_str = f"{command_name} is not a command of iDDS {c.__class__.__name__}"
            tmp_log.error(tmp_str)
            return generate_response(False, tmp_str)

        if args:
            try:
                args = idds.common.utils.json_loads(args)
            except Exception as e:
                # fall back to the plain JSON parser with the local enum decoder
                tmp_log.warning(f"failed to load args json with {str(e)}")
                args = json.loads(args, object_hook=decode_idds_enum)
            # anything else would be splatted element-wise (dict keys, string characters)
            if not isinstance(args, (list, tuple)):
                tmp_str = f"args must be a JSON list, got {type(args).__name__}"
                tmp_log.error(tmp_str)
                return generate_response(False, tmp_str)
        else:
            args = []

        if kwargs:
            try:
                kwargs = idds.common.utils.json_loads(kwargs)
            except Exception as e:
                # fall back to the plain JSON parser with the local enum decoder
                tmp_log.warning(f"failed to load kwargs json with {str(e)}")
                kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
            if not isinstance(kwargs, dict):
                tmp_str = f"kwargs must be a JSON object, got {type(kwargs).__name__}"
                tmp_log.error(tmp_str)
                return generate_response(False, tmp_str)
        else:
            kwargs = {}

        if json_outputs and manager:
            c.setup_json_outputs()

        # forward the identity of the caller, taken from the client certificate DN when present
        dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
        if dn:
            c.set_original_user(user_name=clean_user_id(dn))

        tmp_log.debug(f"execute: class={c.__class__.__name__} com={command_name} host={idds_host} args={str(args)[:200]} kwargs={str(kwargs)[:200]}")
        # looked up at call time, after the client has been fully configured
        ret = getattr(c, command_name)(*args, **kwargs)
        tmp_log.debug(f"ret: {str(ret)[:200]}")

        try:
            return generate_response(True, "", ret)
        except Exception:
            # presumably for return values generate_response cannot encode; use the iDDS serializer
            return idds.common.utils.json_dumps((True, ret))

    except Exception as e:
        tmp_str = f"failed to execute command with {str(e)}"
        tmp_log.error(f"{tmp_str} {traceback.format_exc()}")
        return generate_response(False, tmp_str)
0099
0100
0101
@request_validation(_logger, secure=True, request_method="POST")
def execute_idds_workflow_command(req, command_name: str, kwargs: str = None, json_outputs: bool = False):
    """
    Execute a workflow command on iDDS through the client manager.

    Only ``get_status``, ``abort``, ``suspend``, ``resume``, ``retry`` and ``finish`` are accepted.
    All of them except ``get_status`` require the caller to own the target request.

    Args:
        req: request object; only ``req.subprocess_env`` is read here, to obtain ``SSL_CLIENT_S_DN``.
        command_name (str): one of the supported commands. A value containing ``+`` is split into
            ``<command>+<host>`` to select the iDDS host explicitly; otherwise
            ``idds.common.utils.get_rest_host()`` provides the host.
        kwargs (str): JSON-encoded object of keyword arguments for the command. The ownership check
            reads ``request_id`` from it.
        json_outputs (bool): call ``setup_json_outputs()`` on the client manager.

    Returns:
        The value built by ``generate_response``. On success the data is the command's return value,
        or, when that value is a dict containing ``"message"``, its ``"message"`` and ``"status"``
        entries are used as message and data. On failure the flag is ``False`` with an error message.
    """
    tmp_log = LogWrapper(
        _logger,
        f"execute_idds_workflow_command-{naive_utcnow().isoformat('/')}",
    )
    try:
        if kwargs:
            try:
                kwargs = idds.common.utils.json_loads(kwargs)
            except Exception as e:
                # fall back to the plain JSON parser; log the first failure like relay_idds_command does
                tmp_log.warning(f"failed to load kwargs json with {str(e)}")
                kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
            # kwargs.get() and ** expansion below both need a dict
            if not isinstance(kwargs, dict):
                tmp_message = f"kwargs must be a JSON object, got {type(kwargs).__name__}"
                tmp_log.error(tmp_message)
                return generate_response(False, tmp_message)
        else:
            kwargs = {}

        if "+" in command_name:
            command_name, idds_host = command_name.split("+")
        else:
            idds_host = idds.common.utils.get_rest_host()

        # only whitelisted commands are accepted; state-changing ones need an ownership check
        if command_name == "get_status":
            check_owner = False
        elif command_name in ("abort", "suspend", "resume", "retry", "finish"):
            check_owner = True
        else:
            tmp_message = f"{command_name} is unsupported"
            tmp_log.error(tmp_message)
            return generate_response(False, tmp_message)

        c = iDDS_ClientManager(idds_host)
        if json_outputs:
            c.setup_json_outputs()

        dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
        requester = clean_user_id(dn) if dn else None

        if check_owner:
            # the requester identity is mandatory for state-changing commands
            if not dn:
                tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
                tmp_log.error(tmp_message)
                return generate_response(False, tmp_message)

            request_id = kwargs.get("request_id")
            if request_id is None:
                tmp_message = "request_id is missing"
                tmp_log.error(tmp_message)
                return generate_response(False, tmp_message)

            # kept in its own name so the HTTP request object ``req`` is not overwritten
            idds_requests = c.get_requests(request_id=request_id)
            if not idds_requests:
                tmp_message = f"request {request_id} is not found"
                tmp_log.error(tmp_message)
                return generate_response(False, tmp_message)

            # a request without a recorded username is not restricted to any requester
            user_name = idds_requests[0].get("username")
            if user_name and user_name != requester:
                tmp_message = f"request {request_id} is not owned by {requester}"
                tmp_log.error(tmp_message)
                return generate_response(False, tmp_message)

        # forward the identity of the caller when the client certificate DN is available
        if dn:
            c.set_original_user(user_name=requester)

        tmp_log.debug(f"com={command_name} host={idds_host} kwargs={str(kwargs)}")
        ret = getattr(c, command_name)(**kwargs)
        tmp_log.debug(str(ret))

        # NOTE(review): assumes a dict with "message" also carries "status"; a missing key is
        # reported through the generic failure handler below
        if isinstance(ret, dict) and "message" in ret:
            return generate_response(True, ret["message"], ret["status"])

        return generate_response(True, "", ret)
    except Exception as e:
        tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
        return generate_response(False, f"server failed with {str(e)}")