File indexing completed on 2026-04-10 08:39:02
0001
0002
0003 """
0004 entry point
0005
0006 """
0007
0008 import datetime
0009 import decimal
0010 import gzip
0011 import io
0012 import json
0013 import os
0014 import signal
0015 import sys
0016 import tempfile
0017 import traceback
0018 from collections import defaultdict
0019 from urllib.parse import parse_qsl
0020
0021 from pandacommon.pandalogger.LogWrapper import LogWrapper
0022 from pandacommon.pandalogger.PandaLogger import PandaLogger
0023 from pandacommon.pandautils.PandaUtils import naive_utcnow
0024 from pandacommon.pandautils.thread_utils import GenericThread
0025 from werkzeug.datastructures import CombinedMultiDict, EnvironHeaders
0026 from werkzeug.formparser import parse_form_data
0027
0028 import pandaserver.taskbuffer.ErrorCode
0029 from pandaserver.api.v1 import credential_management_api as cred_api_v1
0030 from pandaserver.api.v1 import data_carousel_api as data_carousel_api_v1
0031 from pandaserver.api.v1 import event_api as event_api_v1
0032 from pandaserver.api.v1 import file_server_api as file_server_api_v1
0033 from pandaserver.api.v1 import harvester_api as harvester_api_v1
0034 from pandaserver.api.v1 import idds_api as idds_api_v1
0035 from pandaserver.api.v1 import job_api as job_api_v1
0036 from pandaserver.api.v1 import metaconfig_api as metaconfig_api_v1
0037 from pandaserver.api.v1 import pilot_api as pilot_api_v1
0038 from pandaserver.api.v1 import statistics_api as statistics_api_v1
0039 from pandaserver.api.v1 import system_api as system_api_v1
0040 from pandaserver.api.v1 import task_api as task_api_v1
0041 from pandaserver.api.v1 import workflow_api as workflow_api_v1
0042 from pandaserver.api.v1.common import extract_allowed_methods
0043 from pandaserver.config import panda_config
0044
0045
0046 from pandaserver.jobdispatcher.JobDispatcher import (
0047 ackCommands,
0048 checkEventsAvailability,
0049 checkJobStatus,
0050 get_access_token,
0051 get_events_status,
0052 get_max_worker_id,
0053 get_token_key,
0054 getCommands,
0055 getEventRanges,
0056 getJob,
0057 getKeyPair,
0058 getProxy,
0059 getResourceTypes,
0060 getStatus,
0061 jobDispatcher,
0062 updateEventRange,
0063 updateEventRanges,
0064 updateJob,
0065 updateJobsInBulk,
0066 updateWorkerPilotStatus,
0067 )
0068 from pandaserver.srvcore import CoreUtils
0069
0070
0071 from pandaserver.srvcore.allowed_methods import allowed_methods
0072 from pandaserver.srvcore.panda_request import PandaRequest
0073 from pandaserver.taskbuffer.Initializer import initializer
0074 from pandaserver.taskbuffer.TaskBuffer import taskBuffer
0075
0076
0077 from pandaserver.taskbuffer.Utils import (
0078 delete_checkpoint,
0079 deleteFile,
0080 fetchLog,
0081 getAttr,
0082 getServer,
0083 getVomsAttr,
0084 isAlive,
0085 put_checkpoint,
0086 put_file_recovery_request,
0087 put_workflow_request,
0088 putEventPickingRequest,
0089 putFile,
0090 touchFile,
0091 updateLog,
0092 uploadLog,
0093 )
0094 from pandaserver.userinterface import Client
0095
0096
0097 from pandaserver.userinterface.UserIF import (
0098 addHarvesterDialogs,
0099 avalancheTask,
0100 changeTaskAttributePanda,
0101 changeTaskModTimePanda,
0102 changeTaskPriority,
0103 changeTaskSplitRulePanda,
0104 checkSandboxFile,
0105 enableJumboJobs,
0106 execute_idds_workflow_command,
0107 finishTask,
0108 get_ban_users,
0109 get_files_in_datasets,
0110 get_job_statistics_per_site_label_resource,
0111 get_user_secrets,
0112 getFullJobStatus,
0113 getJediTaskDetails,
0114 getJediTasksInTimeRange,
0115 getJobStatisticsPerSite,
0116 getJobStatus,
0117 getJumboJobDatasets,
0118 getPandaIDsWithTaskID,
0119 getScriptOfflineRunning,
0120 getTaskParamsMap,
0121 getTaskStatus,
0122 getUserJobMetadata,
0123 getWorkerStats,
0124 harvesterIsAlive,
0125 increaseAttemptNrPanda,
0126 insertSandboxFileInfo,
0127 insertTaskParams,
0128 killJobs,
0129 killTask,
0130 killUnfinishedJobs,
0131 pauseTask,
0132 reactivateTask,
0133 reassignJobs,
0134 reassignShare,
0135 reassignTask,
0136 relay_idds_command,
0137 release_task,
0138 reloadInput,
0139 reportWorkerStats_jobtype,
0140 resumeTask,
0141 retryTask,
0142 send_command_to_job,
0143 set_user_secret,
0144 setDebugMode,
0145 setNumSlotsForWP,
0146 submitJobs,
0147 sweepPQ,
0148 updateServiceMetrics,
0149 updateWorkers,
0150 userIF,
0151 )
0152
# Module-level logger used by every helper and request handled in this file.
_logger = PandaLogger().getLogger("Entry")

# Version string substituted by parse_script_name() when a URL asks for "latest".
# NOTE(review): module_mapping() keys its versions with a "v" prefix ("v0", "v1"),
# which this value does not carry — confirm the two stay compatible.
LATEST = "1"


# One table per v1 API module, produced by extract_allowed_methods().
# module_mapping() exposes each table as "allowed_methods" and validate_method()
# tests the requested method name for membership in it.
cred_api_v1_methods = extract_allowed_methods(cred_api_v1)
data_carousel_api_v1_methods = extract_allowed_methods(data_carousel_api_v1)
event_api_v1_methods = extract_allowed_methods(event_api_v1)
file_server_api_v1_methods = extract_allowed_methods(file_server_api_v1)
harvester_api_v1_methods = extract_allowed_methods(harvester_api_v1)
idds_api_v1_methods = extract_allowed_methods(idds_api_v1)
job_api_v1_methods = extract_allowed_methods(job_api_v1)
metaconfig_api_v1_methods = extract_allowed_methods(metaconfig_api_v1)
pilot_api_v1_methods = extract_allowed_methods(pilot_api_v1)
statistics_api_v1_methods = extract_allowed_methods(statistics_api_v1)
system_api_v1_methods = extract_allowed_methods(system_api_v1)
task_api_v1_methods = extract_allowed_methods(task_api_v1)
workflow_api_v1_methods = extract_allowed_methods(workflow_api_v1)
0172
0173
# One-off start-up hook from pandaserver.taskbuffer.Initializer
# (what exactly it prepares is not visible from this file).
initializer.init()


# Identifier passed to the task buffer as ``requester``; built from this module's
# name and the path of the file it was loaded from.
requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
taskBuffer.init(
    panda_config.dbhost,
    panda_config.dbpasswd,
    nDBConnection=panda_config.nDBConnection,
    useTimeout=True,
    requester=requester_id,
)

# Hand the shared task buffer to the API modules only when nDBConnection is non-zero.
# NOTE(review): idds_api_v1 and system_api_v1 are imported and routed by module_mapping()
# but receive no init_task_buffer() call here — confirm this is intentional.
if panda_config.nDBConnection != 0:

    cred_api_v1.init_task_buffer(taskBuffer)
    data_carousel_api_v1.init_task_buffer(taskBuffer)
    event_api_v1.init_task_buffer(taskBuffer)
    file_server_api_v1.init_task_buffer(taskBuffer)
    harvester_api_v1.init_task_buffer(taskBuffer)

    job_api_v1.init_task_buffer(taskBuffer)
    metaconfig_api_v1.init_task_buffer(taskBuffer)
    pilot_api_v1.init_task_buffer(taskBuffer)
    statistics_api_v1.init_task_buffer(taskBuffer)

    task_api_v1.init_task_buffer(taskBuffer)
    workflow_api_v1.init_task_buffer(taskBuffer)

    # NOTE(review): the two calls below are assumed to sit under the same
    # nDBConnection guard as the API modules above — confirm the intended nesting.

    # legacy job dispatcher singleton
    jobDispatcher.init(taskBuffer)

    # legacy user interface singleton
    userIF.init(taskBuffer)
0207
0208
# Cached ban list consulted by pre_validate_request(). Which callable feeds the cache
# depends on whether nDBConnection is non-zero.
# NOTE(review): 600 is presumably the cache lifetime in seconds — confirm against CoreUtils.CachedObject.
if panda_config.nDBConnection != 0:
    # fed by the task buffer
    ban_user_list = CoreUtils.CachedObject("ban_list", 600, taskBuffer.get_ban_users, _logger)
else:
    # fed by the Client module instead
    ban_user_list = CoreUtils.CachedObject("ban_list", 600, Client.get_banned_users, _logger)
0215
0216
def pre_validate_request(panda_request):
    """
    Run the checks that must pass before a request is executed.

    :param panda_request: request object exposing ``authenticated``, ``message`` and ``subprocess_env``
    :return: an error message string when the request must be rejected, otherwise None
    """
    # reject requests whose token authentication did not succeed
    if not panda_request.authenticated:
        return f"Token authentication failed. {panda_request.message}"

    # without a client DN there is nobody to look up in the ban list
    client_dn = panda_request.subprocess_env.get("SSL_CLIENT_S_DN", None)
    if not client_dn:
        return None

    # the ban list is matched against the cleaned user id
    user_id = CoreUtils.clean_user_id(client_dn)
    if user_id in ban_user_list:
        return f"{user_id} is banned"

    return None
0232
0233
def read_body(environ, content_length):
    """
    Read ``content_length`` bytes of request body from ``environ["wsgi.input"]``.

    The stream is consumed in pieces of at most 1 MiB.

    :param environ: WSGI environment; ``wsgi.input`` is only accessed when there is something to read
    :param content_length: number of bytes expected
    :return: the body as bytes (empty when ``content_length`` is not positive)
    :raises OSError: if the stream ends before ``content_length`` bytes were received
    """
    max_piece = 1024 * 1024
    remaining = content_length
    pieces = []

    while remaining > 0:
        piece = environ["wsgi.input"].read(min(remaining, max_piece))
        if not piece:
            # the client stopped sending before the announced length was reached
            break
        pieces.append(piece)
        remaining -= len(piece)

    if remaining > 0:
        raise OSError(f"partial read from client. {remaining} bytes remaining")

    return b"".join(pieces)
0248
0249
def parse_qsl_parameters(environ, body, request_method):
    """
    Extract request parameters from the query string or from a form-encoded body.

    ``environ`` is modified in place: ``wsgi.input`` is replaced with an in-memory stream
    over ``body``, ``CONTENT_LENGTH`` is set to its length and ``wsgi.headers`` is populated.

    :param environ: WSGI environment
    :param body: request body already read from the client, as bytes
    :param request_method: HTTP method of the request
    :return: dictionary with the parameters
    """
    # make the already-consumed body readable again for the form parser
    environ["wsgi.input"] = io.BytesIO(body)
    environ["CONTENT_LENGTH"] = str(len(body))
    environ["wsgi.headers"] = EnvironHeaders(environ)

    if request_method not in ["GET", "HEAD"]:
        # everything else carries its parameters in the body: form fields plus uploaded files
        _, form_fields, uploads = parse_form_data(environ)
        return dict(CombinedMultiDict([form_fields, uploads]))

    # GET and HEAD carry their parameters in the query string
    grouped = {}
    for name, value in parse_qsl(environ.get("QUERY_STRING", ""), keep_blank_values=True):
        grouped.setdefault(name, []).append(value)

    # a key given once maps to its value, a repeated key maps to the list of values
    return {name: (values[0] if len(values) == 1 else values) for name, values in grouped.items()}
0274
0275
def parse_json_parameters_legacy(body):
    """
    Decode the parameters of a legacy JSON request.

    The body is always gunzipped before being parsed as JSON. Values that are exactly
    ``True`` or ``False`` are replaced by the strings "True" and "False".

    :param body: gzip-compressed JSON document, as bytes
    :return: the decoded parameters
    """
    params = json.loads(gzip.decompress(body))

    # iterate over a snapshot of the keys since values are replaced on the fly
    for name in list(params):
        value = params[name]
        if value is True or value is False:
            params[name] = str(value)

    return params
0289
0290
def parse_json_parameters(body, content_encoding):
    """
    Decode the parameters of a JSON request.

    :param body: raw request body, as bytes
    :param content_encoding: value of the Content-Encoding header; the body is gunzipped
                             only when this is exactly "gzip"
    :return: the parsed JSON document
    """
    payload = gzip.decompress(body) if content_encoding == "gzip" else body
    return json.loads(payload)
0301
0302
def parse_parameters(api_module, json_app, json_body, content_encoding, environ, body, request_method):
    """
    Pick the parameter parser matching the API generation and the request encoding.

    :param api_module: API module name from the URL; decides between new and legacy handling
    :param json_app: legacy switch, True when the legacy request should be parsed as gzipped JSON
    :param json_body: new-API switch, True when the body should be parsed as JSON
    :param content_encoding: Content-Encoding of the request, forwarded to the new-API JSON parser
    :param environ: WSGI environment
    :param body: request body, as bytes
    :param request_method: HTTP method of the request
    :return: the parsed parameters
    """
    if is_new_api(api_module):
        # new API: JSON body, optionally compressed
        if json_body:
            return parse_json_parameters(body, content_encoding)
    elif json_app:
        # legacy API: JSON body that is always gzipped
        return parse_json_parameters_legacy(body)

    # both generations fall back to query string / form parsing
    return parse_qsl_parameters(environ, body, request_method)
0320
0321
def is_new_api(api_module):
    """
    Tell whether a request targets the new API.

    :param api_module: API module name extracted from the URL
    :return: False for the legacy module "panda", True for anything else
    """
    legacy_module = "panda"
    return api_module != legacy_module
0324
0325
def parse_script_name(environ):
    """
    Split SCRIPT_NAME into the method, API module and API version being requested.

    Two URL layouts are recognised:
      * ``/server/panda/<method>``          -> legacy API: module "panda", version "v0"
      * ``/api/<version>/<module>/<method>`` -> new API

    The version "latest" is replaced by the module-level ``LATEST``. The result is always
    returned in the "v"-prefixed form ("v1") used as key by module_mapping(), so a bare
    ``LATEST`` such as "1" still resolves to an existing version.

    :param environ: WSGI environment
    :return: tuple ``(method_name, api_module, version)``; ``("", "", "v0")`` when SCRIPT_NAME
             is absent or does not match either layout
    """
    method_name = ""
    api_module = ""
    version = "v0"

    if "SCRIPT_NAME" in environ:
        script_name = environ["SCRIPT_NAME"]
        fields = script_name.split("/")

        # legacy API: /server/panda/<method>
        if script_name.startswith("/server/panda/") and len(fields) == 4:
            api_module = "panda"
            method_name = fields[-1]

        # new API: /api/<version>/<module>/<method>
        elif script_name.startswith("/api/") and len(fields) == 5:
            method_name = fields[-1]
            api_module = fields[-2]
            version = fields[-3]
            if version == "latest":
                # versions are keyed as "v<N>"; add the prefix when LATEST does not carry it
                version = LATEST if LATEST.startswith("v") else f"v{LATEST}"

        else:
            _logger.error(f"Could not parse script name: {script_name}")

    return method_name, api_module, version
0352
0353
def module_mapping(version, api_module):
    """
    Look up the implementation module and the allowed method names for an API module.

    :param version: API version key ("v0" for the legacy API, "v1" for the new one)
    :param api_module: API module name from the URL
    :return: dictionary with the keys "module" and "allowed_methods", or None (after logging
             an error) when the version/module combination is unknown. The legacy entry has
             no module object ("module" is None).
    """
    # URL name -> (module object, allowed method names) for the v1 API
    v1_modules = {
        "creds": (cred_api_v1, cred_api_v1_methods),
        "data_carousel": (data_carousel_api_v1, data_carousel_api_v1_methods),
        "event": (event_api_v1, event_api_v1_methods),
        "file_server": (file_server_api_v1, file_server_api_v1_methods),
        "harvester": (harvester_api_v1, harvester_api_v1_methods),
        "idds": (idds_api_v1, idds_api_v1_methods),
        "job": (job_api_v1, job_api_v1_methods),
        "metaconfig": (metaconfig_api_v1, metaconfig_api_v1_methods),
        "pilot": (pilot_api_v1, pilot_api_v1_methods),
        "statistics": (statistics_api_v1, statistics_api_v1_methods),
        "system": (system_api_v1, system_api_v1_methods),
        "task": (task_api_v1, task_api_v1_methods),
        "workflow": (workflow_api_v1, workflow_api_v1_methods),
    }
    versions = {
        "v0": {"panda": {"module": None, "allowed_methods": allowed_methods}},
        "v1": {name: {"module": module, "allowed_methods": methods} for name, (module, methods) in v1_modules.items()},
    }

    entry = versions.get(version, {}).get(api_module)
    if entry is None:
        _logger.error(f"Could not find module {api_module} in API version {version}")
    return entry
0378
0379
def validate_method(method_name, api_module, version):
    """
    Check that a method may be called through the given API module and version.

    :param method_name: name of the requested method
    :param api_module: API module name from the URL
    :param version: API version key
    :return: True when the module is known and lists the method as allowed, otherwise False
    """
    entry = module_mapping(version, api_module)
    if not entry:
        # unknown version or module
        return False
    return method_name in entry["allowed_methods"]
0387
0388
0389
def encode_special_cases(obj):
    """
    ``default`` hook for ``json.dumps`` covering the non-JSON types returned by the API.

    * ``datetime.datetime`` -> ``{"__datetime__": <ISO 8601 string>}``
    * ``decimal.Decimal``   -> ``int`` when the value is a finite whole number, otherwise ``float``
      (infinities become ``float("inf")`` / ``float("-inf")``, any NaN becomes ``float("nan")``)

    :param obj: object the JSON encoder could not serialise on its own
    :return: a JSON-serialisable replacement
    :raises TypeError: for any other type
    """
    if isinstance(obj, datetime.datetime):
        return {"__datetime__": obj.isoformat()}

    if isinstance(obj, decimal.Decimal):
        # NaN (quiet or signalling) cannot be compared or turned into an int
        if obj.is_nan():
            return float("nan")
        # an infinity equals its own integral value but int() would raise OverflowError on it
        if obj.is_finite() and obj == obj.to_integral_value():
            return int(obj)
        return float(obj)

    raise TypeError(f"Type not serializable for {obj} ({type(obj)})")
0399
0400
0401
def application(environ, start_response):
    """
    WSGI entry point: route a request to the API method named in the URL and return its result.

    Processing steps:
      1. derive method, API module and version from SCRIPT_NAME
      2. answer 403 when the method is not in the allowed list of the module
      3. resolve the callable (attribute of the v1 module, or a module-level name for the legacy API);
         answer 500 when it cannot be resolved
      4. build the PandaRequest, pre-validate it (403 on failure), read the body, parse the parameters
      5. call the method with the request object and the parameters as keyword arguments
      6. normalise the result and send it back with the matching status and content type

    Any exception raised in steps 4-5 is logged together with the full environment and answered
    with 500; when ``panda_config.dumpBadRequest`` is set the request body is also dumped to a
    temporary file. An OSError additionally sends SIGINT to the current process.

    :param environ: WSGI environment
    :param start_response: WSGI start_response callable
    :return: list with a single element holding the response body
    """
    # get method name, module and version from the URL
    method_name, api_module, version = parse_script_name(environ)

    tmp_log = LogWrapper(_logger, f"PID={os.getpid()} module={api_module} method={method_name} version={version}", seeMem=True)
    # WSGI allows CONTENT_LENGTH to be absent or an empty string; both mean "no body"
    cont_length = int(environ.get("CONTENT_LENGTH") or 0)
    request_method = environ.get("REQUEST_METHOD", None)

    # see if we are on the new or the legacy API
    new_api = is_new_api(api_module)

    # json_app: the exchange is JSON (for the new API the Accept header counts as well)
    # json_body: the request body itself is JSON, which additionally requires PUT or POST
    if new_api:
        json_app = environ.get("CONTENT_TYPE", None) == "application/json" or environ.get("HTTP_ACCEPT", None) == "application/json"
    else:
        json_app = environ.get("CONTENT_TYPE", None) == "application/json"
    json_body = environ.get("CONTENT_TYPE", None) == "application/json" and request_method in ["PUT", "POST"]

    # content encoding of the body, e.g. gzip
    content_encoding = environ.get("HTTP_CONTENT_ENCODING")

    tmp_log.debug(f"""start content-length={cont_length} json={json_app} origin={environ.get("HTTP_ORIGIN", None)}""")

    start_time = naive_utcnow()
    return_type = None

    # check the method name is allowed, otherwise return 403
    if not validate_method(method_name, api_module, version):
        error_message = f"method {method_name} is forbidden"
        tmp_log.error(error_message)
        start_response("403 Forbidden", [("Content-Type", "text/plain")])
        return [f"ERROR : {error_message}".encode()]

    # get the method object to be executed
    try:
        if new_api:
            module = module_mapping(version, api_module)["module"]
            tmp_method = getattr(module, method_name)
        else:
            # legacy methods are imported into this module's namespace
            tmp_method = globals()[method_name]
    except Exception:
        error_message = f"method {method_name} is undefined in {api_module} {version}"
        tmp_log.error(error_message)
        start_response("500 INTERNAL SERVER ERROR", [("Content-Type", "text/plain")])
        return [f"ERROR : {error_message}".encode()]

    # defined up front so the failure handler below can dump it even when reading never started
    body = b""

    try:
        # generate a request object from the environment
        panda_request = PandaRequest(environ, tmp_log)

        # pre-validate the request (authentication, ban list)
        error_message = pre_validate_request(panda_request)
        if error_message:
            tmp_log.error(error_message)
            start_response("403 Forbidden", [("Content-Type", "text/plain")])
            return [f"ERROR : {error_message}".encode()]

        # read the body
        body = read_body(environ, cont_length)

        # parse the parameters
        params = parse_parameters(api_module, json_app, json_body, content_encoding, environ, body, request_method)

        if panda_config.entryVerbose:
            tmp_log.debug(f"with {str(list(params))}")

        # execute the method: request object first, parameters as keyword arguments
        param_list = [panda_request]
        exec_result = tmp_method(*param_list, **params)

        # a result of the form {"type": ..., "content": ...} carries its own return type
        if isinstance(exec_result, dict) and "type" in exec_result and "content" in exec_result:
            return_type = exec_result["type"]
            exec_result = exec_result["content"]

        # convert bool to string
        # NOTE: the membership test compares by equality, so the integers 0 and 1 are stringified as well
        if exec_result in [True, False]:
            exec_result = str(exec_result)

        # new API: serialise the response as JSON or str depending on json_app
        if new_api:
            if json_app:
                exec_result = json.dumps(exec_result, default=encode_special_cases)
            elif not isinstance(exec_result, str):
                exec_result = str(exec_result)

    except Exception as exc:
        tmp_log.error(f"execution failure : {str(exc)}\n {traceback.format_exc()}")
        if hasattr(panda_config, "dumpBadRequest") and panda_config.dumpBadRequest:
            # best effort: keep a copy of the offending body and record its path in the environment
            try:
                with tempfile.NamedTemporaryFile(delete=False, prefix="req_dump_") as file_object:
                    environ["WSGI_INPUT_DUMP"] = file_object.name
                    file_object.write(body)
                    os.chmod(file_object.name, 0o775)
            except Exception:
                tmp_log.error(traceback.format_exc())
                pass
        error_string = "\n".join(f"{tmp_key} : {str(tmp_value)}" for tmp_key, tmp_value in environ.items())
        tmp_log.error(error_string)

        # return internal server error
        start_response("500 INTERNAL SERVER ERROR", [("Content-Type", "text/plain")])

        # an OSError (e.g. a partial read from the client) makes the process signal itself
        if isinstance(exc, OSError):
            tmp_log.warning("force restart due")
            os.kill(os.getpid(), signal.SIGINT)

        return [str(exc).encode()]

    if panda_config.entryVerbose:
        tmp_log.debug("done")

    # log execution time and return length
    duration = naive_utcnow() - start_time
    tmp_log.info(
        f"exec_time={duration.seconds}.{duration.microseconds // 1000:03d} sec, return_type={return_type} real_type={type(exec_result).__name__} len={len(str(exec_result))} B"
    )

    # error codes returned by the method are translated into HTTP statuses
    if exec_result == pandaserver.taskbuffer.ErrorCode.EC_NotFound:
        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return ["not found".encode()]

    if exec_result == pandaserver.taskbuffer.ErrorCode.EC_Forbidden:
        start_response("403 Forbidden", [("Content-Type", "text/plain")])
        return ["forbidden".encode()]

    if return_type == "json":
        start_response("200 OK", [("Content-Type", "application/json")])
    else:
        start_response("200 OK", [("Content-Type", "text/plain")])

    # WSGI bodies must be bytes
    if isinstance(exec_result, str):
        exec_result = exec_result.encode()

    return [exec_result]
0539 return [exec_result]