Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 #!/usr/bin/python
0002 
0003 """
0004 entry point
0005 
0006 """
0007 
0008 import datetime
0009 import decimal
0010 import gzip
0011 import io
0012 import json
0013 import os
0014 import signal
0015 import sys
0016 import tempfile
0017 import traceback
0018 from collections import defaultdict
0019 from urllib.parse import parse_qsl
0020 
0021 from pandacommon.pandalogger.LogWrapper import LogWrapper
0022 from pandacommon.pandalogger.PandaLogger import PandaLogger
0023 from pandacommon.pandautils.PandaUtils import naive_utcnow
0024 from pandacommon.pandautils.thread_utils import GenericThread
0025 from werkzeug.datastructures import CombinedMultiDict, EnvironHeaders
0026 from werkzeug.formparser import parse_form_data
0027 
0028 import pandaserver.taskbuffer.ErrorCode
0029 from pandaserver.api.v1 import credential_management_api as cred_api_v1
0030 from pandaserver.api.v1 import data_carousel_api as data_carousel_api_v1
0031 from pandaserver.api.v1 import event_api as event_api_v1
0032 from pandaserver.api.v1 import file_server_api as file_server_api_v1
0033 from pandaserver.api.v1 import harvester_api as harvester_api_v1
0034 from pandaserver.api.v1 import idds_api as idds_api_v1
0035 from pandaserver.api.v1 import job_api as job_api_v1
0036 from pandaserver.api.v1 import metaconfig_api as metaconfig_api_v1
0037 from pandaserver.api.v1 import pilot_api as pilot_api_v1
0038 from pandaserver.api.v1 import statistics_api as statistics_api_v1
0039 from pandaserver.api.v1 import system_api as system_api_v1
0040 from pandaserver.api.v1 import task_api as task_api_v1
0041 from pandaserver.api.v1 import workflow_api as workflow_api_v1
0042 from pandaserver.api.v1.common import extract_allowed_methods
0043 from pandaserver.config import panda_config
0044 
0045 # pylint: disable=W0611
0046 from pandaserver.jobdispatcher.JobDispatcher import (
0047     ackCommands,
0048     checkEventsAvailability,
0049     checkJobStatus,
0050     get_access_token,
0051     get_events_status,
0052     get_max_worker_id,
0053     get_token_key,
0054     getCommands,
0055     getEventRanges,
0056     getJob,
0057     getKeyPair,
0058     getProxy,
0059     getResourceTypes,
0060     getStatus,
0061     jobDispatcher,
0062     updateEventRange,
0063     updateEventRanges,
0064     updateJob,
0065     updateJobsInBulk,
0066     updateWorkerPilotStatus,
0067 )
0068 from pandaserver.srvcore import CoreUtils
0069 
0070 # IMPORTANT: Add any new methods here to allow them to be called from the web I/F
0071 from pandaserver.srvcore.allowed_methods import allowed_methods
0072 from pandaserver.srvcore.panda_request import PandaRequest
0073 from pandaserver.taskbuffer.Initializer import initializer
0074 from pandaserver.taskbuffer.TaskBuffer import taskBuffer
0075 
0076 # pylint: disable=W0611
0077 from pandaserver.taskbuffer.Utils import (
0078     delete_checkpoint,
0079     deleteFile,
0080     fetchLog,
0081     getAttr,
0082     getServer,
0083     getVomsAttr,
0084     isAlive,
0085     put_checkpoint,
0086     put_file_recovery_request,
0087     put_workflow_request,
0088     putEventPickingRequest,
0089     putFile,
0090     touchFile,
0091     updateLog,
0092     uploadLog,
0093 )
0094 from pandaserver.userinterface import Client
0095 
0096 # pylint: disable=W0611
0097 from pandaserver.userinterface.UserIF import (
0098     addHarvesterDialogs,
0099     avalancheTask,
0100     changeTaskAttributePanda,
0101     changeTaskModTimePanda,
0102     changeTaskPriority,
0103     changeTaskSplitRulePanda,
0104     checkSandboxFile,
0105     enableJumboJobs,
0106     execute_idds_workflow_command,
0107     finishTask,
0108     get_ban_users,
0109     get_files_in_datasets,
0110     get_job_statistics_per_site_label_resource,
0111     get_user_secrets,
0112     getFullJobStatus,
0113     getJediTaskDetails,
0114     getJediTasksInTimeRange,
0115     getJobStatisticsPerSite,
0116     getJobStatus,
0117     getJumboJobDatasets,
0118     getPandaIDsWithTaskID,
0119     getScriptOfflineRunning,
0120     getTaskParamsMap,
0121     getTaskStatus,
0122     getUserJobMetadata,
0123     getWorkerStats,
0124     harvesterIsAlive,
0125     increaseAttemptNrPanda,
0126     insertSandboxFileInfo,
0127     insertTaskParams,
0128     killJobs,
0129     killTask,
0130     killUnfinishedJobs,
0131     pauseTask,
0132     reactivateTask,
0133     reassignJobs,
0134     reassignShare,
0135     reassignTask,
0136     relay_idds_command,
0137     release_task,
0138     reloadInput,
0139     reportWorkerStats_jobtype,
0140     resumeTask,
0141     retryTask,
0142     send_command_to_job,
0143     set_user_secret,
0144     setDebugMode,
0145     setNumSlotsForWP,
0146     submitJobs,
0147     sweepPQ,
0148     updateServiceMetrics,
0149     updateWorkers,
0150     userIF,
0151 )
0152 
# module-level logger; application() wraps it per request with a LogWrapper
_logger = PandaLogger().getLogger("Entry")

# API version substituted when a refactored-API URL asks for version "latest"
LATEST = "1"

# generate the allowed methods dynamically with all function names present in the API modules,
# excluding functions imported from other modules or the init_task_buffer function.
# module_mapping() pairs each API module with its list and validate_method() checks requests against it.
cred_api_v1_methods = extract_allowed_methods(cred_api_v1)
data_carousel_api_v1_methods = extract_allowed_methods(data_carousel_api_v1)
event_api_v1_methods = extract_allowed_methods(event_api_v1)
file_server_api_v1_methods = extract_allowed_methods(file_server_api_v1)
harvester_api_v1_methods = extract_allowed_methods(harvester_api_v1)
idds_api_v1_methods = extract_allowed_methods(idds_api_v1)
job_api_v1_methods = extract_allowed_methods(job_api_v1)
metaconfig_api_v1_methods = extract_allowed_methods(metaconfig_api_v1)
pilot_api_v1_methods = extract_allowed_methods(pilot_api_v1)
statistics_api_v1_methods = extract_allowed_methods(statistics_api_v1)
system_api_v1_methods = extract_allowed_methods(system_api_v1)
task_api_v1_methods = extract_allowed_methods(task_api_v1)
workflow_api_v1_methods = extract_allowed_methods(workflow_api_v1)
0172 
# initialize oracledb using dummy connection
initializer.init()

# initialize TaskBuffer with the database settings from panda_config;
# requester_id is built from this module's name and file and passed as the requester
requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
taskBuffer.init(
    panda_config.dbhost,
    panda_config.dbpasswd,
    nDBConnection=panda_config.nDBConnection,
    useTimeout=True,
    requester=requester_id,
)

# The API modules, JobDispatcher and UserIF only receive the TaskBuffer when nDBConnection != 0.
# NOTE(review): presumably nDBConnection == 0 means this process runs without DB connections
# (the ban list below is then fetched remotely) — confirm against the deployment configuration.
if panda_config.nDBConnection != 0:
    # initialize all the API modules
    cred_api_v1.init_task_buffer(taskBuffer)
    data_carousel_api_v1.init_task_buffer(taskBuffer)
    event_api_v1.init_task_buffer(taskBuffer)
    file_server_api_v1.init_task_buffer(taskBuffer)
    harvester_api_v1.init_task_buffer(taskBuffer)
    # IDDS API does not need to be initialized. idds_api_v1.init_task_buffer(taskBuffer)
    job_api_v1.init_task_buffer(taskBuffer)
    metaconfig_api_v1.init_task_buffer(taskBuffer)
    pilot_api_v1.init_task_buffer(taskBuffer)
    statistics_api_v1.init_task_buffer(taskBuffer)
    # System API does not need to be initialized. system_api_v1.init_task_buffer(taskBuffer)
    task_api_v1.init_task_buffer(taskBuffer)
    workflow_api_v1.init_task_buffer(taskBuffer)

    # initialize JobDispatcher
    jobDispatcher.init(taskBuffer)

    # initialize UserIF
    userIF.init(taskBuffer)

# ban list: user DNs rejected by pre_validate_request(), wrapped in a CoreUtils.CachedObject.
# NOTE(review): 600 looks like the cache refresh interval in seconds — confirm in CoreUtils.CachedObject
if panda_config.nDBConnection != 0:
    # get ban list directly from the database
    ban_user_list = CoreUtils.CachedObject("ban_list", 600, taskBuffer.get_ban_users, _logger)
else:
    # get ban list from remote
    ban_user_list = CoreUtils.CachedObject("ban_list", 600, Client.get_banned_users, _logger)
0215 
0216 
def pre_validate_request(panda_request):
    """
    Run the checks a request must pass before its method is executed.

    :param panda_request: PandaRequest wrapping the WSGI environment
    :return: an error message when the request must be rejected, otherwise None
    """
    # reject requests whose authentication failed
    if not panda_request.authenticated:
        return f"Token authentication failed. {panda_request.message}"

    # reject requests whose client DN is on the ban list
    client_dn = panda_request.subprocess_env.get("SSL_CLIENT_S_DN", None)
    if not client_dn:
        return None
    client_dn = CoreUtils.clean_user_id(client_dn)
    if client_dn in ban_user_list:
        return f"{client_dn} is banned"
    return None
0232 
0233 
def read_body(environ, content_length):
    """
    Read ``content_length`` bytes of request body from ``environ["wsgi.input"]``.

    The stream is read in chunks of at most 1 MiB. Chunks are collected in a list
    and joined once at the end, so the cost is linear in the body size (repeated
    ``bytes +=`` would copy the accumulated buffer for every chunk).

    :param environ: WSGI environment; ``wsgi.input`` is only accessed when content_length > 0
    :param content_length: number of bytes announced by the client
    :return: the body as bytes (``b""`` when content_length <= 0)
    :raises OSError: if the stream ends before content_length bytes were read
    """
    if content_length <= 0:
        return b""

    stream = environ["wsgi.input"]
    chunks = []
    remaining = content_length
    while remaining > 0:
        chunk = stream.read(min(remaining, 1024 * 1024))
        if not chunk:
            break
        remaining -= len(chunk)
        chunks.append(chunk)
    if remaining > 0:
        # OSError is caught in the main function and forces killing the process
        raise OSError(f"partial read from client. {remaining} bytes remaining")

    return b"".join(chunks)
0248 
0249 
def parse_qsl_parameters(environ, body, request_method):
    """
    Parse the parameters of a non-JSON request.

    ``environ`` is modified in place so the already-consumed body can be parsed:
    ``wsgi.input`` becomes an in-memory stream over ``body``, ``CONTENT_LENGTH`` is
    reset to its length and ``wsgi.headers`` is set from the environment.

    GET/HEAD: parameters come from the URL query string (blank values kept). A key
    given once maps to its value; a repeated key maps to the list of its values.
    Other methods: the body is parsed as form data and the form fields and
    uploaded files are combined into one dict.

    :return: dict of parameter name to value
    """
    environ["wsgi.input"] = io.BytesIO(body)
    environ["CONTENT_LENGTH"] = str(len(body))
    environ["wsgi.headers"] = EnvironHeaders(environ)

    if request_method not in ["GET", "HEAD"]:
        # POST, PUT and others: combine form fields and file uploads
        _, form, files = parse_form_data(environ)
        return dict(CombinedMultiDict([form, files]))

    grouped = defaultdict(list)
    for name, value in parse_qsl(environ.get("QUERY_STRING", ""), keep_blank_values=True):
        grouped[name].append(value)

    params = {}
    for name, values in grouped.items():
        params[name] = values[0] if len(values) == 1 else values
    return params
0274 
0275 
def parse_json_parameters_legacy(body):
    """
    Parse the JSON body of a request to the legacy API.

    The body is gzip-decompressed unconditionally (the legacy API never looked at
    the Content-Encoding header). Top-level boolean values are replaced by the
    strings "True"/"False"; nested values are left untouched.

    :param body: gzip-compressed JSON document (bytes)
    :return: the decoded parameters
    """
    decoded = json.loads(gzip.decompress(body))
    for name in list(decoded):
        value = decoded[name]
        # identity checks: only real booleans are patched, not 0/1
        if value is True or value is False:
            decoded[name] = str(value)
    return decoded
0289 
0290 
def parse_json_parameters(body, content_encoding):
    """
    Parse the JSON body of a request to the refactored API.

    :param body: raw request body (bytes)
    :param content_encoding: value of the Content-Encoding header, or None when absent
    :return: the de-serialized JSON document
    """
    # Content codings are case-insensitive and "x-gzip" is an alias of "gzip" (RFC 9110, section 8.4.1)
    if content_encoding and content_encoding.strip().lower() in ("gzip", "x-gzip"):
        body = gzip.decompress(body)

    # de-serialize the body
    return json.loads(body)
0301 
0302 
def parse_parameters(api_module, json_app, json_body, content_encoding, environ, body, request_method):
    """
    Decode the request parameters using the rules of the API the request targets.

    * legacy API: any request flagged ``json_app`` (even a GET) is decoded with
      parse_json_parameters_legacy;
    * refactored API: JSON PUT/POST bodies (``json_body``) are decoded with
      parse_json_parameters, honouring ``content_encoding``;
    * everything else goes through parse_qsl_parameters (query string or form data).

    :return: the decoded parameters, later passed as keyword arguments to the API method
    """
    legacy = not is_new_api(api_module)
    if legacy and json_app:
        return parse_json_parameters_legacy(body)
    if json_body and not legacy:
        return parse_json_parameters(body, content_encoding)
    return parse_qsl_parameters(environ, body, request_method)
0320 
0321 
def is_new_api(api_module):
    """
    Tell whether a request targets the refactored API.

    Only the module name "panda" (legacy /server/panda/ URLs) is the legacy API;
    every other name, including the empty name of an unparsable URL, is not.
    """
    legacy_module = "panda"
    return not api_module == legacy_module
0324 
0325 
def parse_script_name(environ):
    """
    Extract (method_name, api_module, version) from ``environ["SCRIPT_NAME"]``.

    * legacy URL ``/server/panda/<method>``        -> (method, "panda", "v0")
    * refactored URL ``/api/<version>/<module>/<method>`` -> (method, module, version),
      where version "latest" is replaced by LATEST
    * anything else -> ("", "", "v0"); an error is logged when SCRIPT_NAME is present
    """
    if "SCRIPT_NAME" not in environ:
        return "", "", "v0"

    script_name = environ["SCRIPT_NAME"]
    parts = script_name.split("/")

    if script_name.startswith("/server/panda/") and len(parts) == 4:
        return parts[3], "panda", "v0"

    if script_name.startswith("/api/") and len(parts) == 5:
        _, _, version, api_module, method_name = parts
        return method_name, api_module, LATEST if version == "latest" else version

    _logger.error(f"Could not parse script name: {script_name}")
    return "", "", "v0"
0352 
0353 
def module_mapping(version, api_module):
    """
    Look up the entry serving ``api_module`` in API ``version``.

    :return: dict with "module" (the API module object, or None for the legacy
             "panda" entry whose methods are looked up in this module's globals)
             and "allowed_methods" (names that may be called); None when the
             version/module combination is unknown, in which case an error is logged
    """
    legacy_api = {
        "panda": {"module": None, "allowed_methods": allowed_methods},  # legacy API uses globals instead of a particular module
    }
    refactored_api_v1 = {
        "creds": {"module": cred_api_v1, "allowed_methods": cred_api_v1_methods},
        "data_carousel": {"module": data_carousel_api_v1, "allowed_methods": data_carousel_api_v1_methods},
        "event": {"module": event_api_v1, "allowed_methods": event_api_v1_methods},
        "file_server": {"module": file_server_api_v1, "allowed_methods": file_server_api_v1_methods},
        "harvester": {"module": harvester_api_v1, "allowed_methods": harvester_api_v1_methods},
        "idds": {"module": idds_api_v1, "allowed_methods": idds_api_v1_methods},
        "job": {"module": job_api_v1, "allowed_methods": job_api_v1_methods},
        "metaconfig": {"module": metaconfig_api_v1, "allowed_methods": metaconfig_api_v1_methods},
        "pilot": {"module": pilot_api_v1, "allowed_methods": pilot_api_v1_methods},
        "statistics": {"module": statistics_api_v1, "allowed_methods": statistics_api_v1_methods},
        "system": {"module": system_api_v1, "allowed_methods": system_api_v1_methods},
        "task": {"module": task_api_v1, "allowed_methods": task_api_v1_methods},
        "workflow": {"module": workflow_api_v1, "allowed_methods": workflow_api_v1_methods},
    }
    versions = {"v0": legacy_api, "v1": refactored_api_v1}

    entry = versions.get(version, {}).get(api_module)
    if entry is None:
        _logger.error(f"Could not find module {api_module} in API version {version}")
    return entry
0378 
0379 
def validate_method(method_name, api_module, version):
    """
    Check whether ``method_name`` may be called through ``api_module`` in ``version``.

    Applies to both the legacy and the refactored API: the method must be in the
    "allowed_methods" of the entry returned by module_mapping. Unknown
    version/module combinations are rejected.

    :return: True if the call is allowed, False otherwise
    """
    entry = module_mapping(version, api_module)
    if not entry:
        return False
    return method_name in entry["allowed_methods"]
0387 
0388 
# json.dumps default hook: datetime -> ISO string under a marker key, Decimal -> int or float
def encode_special_cases(obj):
    """
    ``default`` hook for json.dumps covering types the json module cannot serialize.

    * datetime.datetime -> {"__datetime__": obj.isoformat()}
    * decimal.Decimal   -> int when it holds a finite integral value, float otherwise
      (so Infinity/NaN become float('inf')/float('nan') instead of crashing)

    :raises TypeError: for any other type, as json.dumps expects from a default hook
    """
    if isinstance(obj, datetime.datetime):
        return {"__datetime__": obj.isoformat()}
    if isinstance(obj, decimal.Decimal):
        # Infinity equals its own integral value but int() raises OverflowError on it,
        # so only finite integral values are converted to int
        if obj.is_finite() and obj == obj.to_integral_value():
            return int(obj)
        return float(obj)
    raise TypeError(f"Type not serializable for {obj} ({type(obj)})")
0399 
0400 
# This is the starting point for all WSGI requests
def application(environ, start_response):
    """
    WSGI entry point: dispatch a request to a legacy or refactored API method.

    Steps:
    1. parse the URL into (method, module, version);
    2. reject methods not in the allowed list (403) and resolve the method object (500 if missing);
    3. authenticate and check the ban list (403);
    4. read the body, parse the parameters and call the method with the
       PandaRequest as first positional argument and the parameters as keyword arguments;
    5. serialize the result; ErrorCode.EC_NotFound / EC_Forbidden results become 404 / 403.

    :param environ: WSGI environment
    :param start_response: WSGI start_response callable
    :return: a one-element list with the encoded response body
    """
    # Parse the script name to retrieve method, module and version
    method_name, api_module, version = parse_script_name(environ)

    tmp_log = LogWrapper(_logger, f"PID={os.getpid()} module={api_module} method={method_name} version={version}", seeMem=True)
    # PEP 3333: CONTENT_LENGTH may be empty as well as absent; treat both as 0
    cont_length = int(environ.get("CONTENT_LENGTH") or 0)
    request_method = environ.get("REQUEST_METHOD", None)  # GET, POST, PUT, DELETE

    # see if we are on the new or old APIs
    new_api = is_new_api(api_module)

    # json app means the content type is application/json (for the new API, also an Accept of application/json),
    # while json body requires additionally to be a PUT or POST request, where the body is json encoded
    content_type = environ.get("CONTENT_TYPE", None)
    if new_api:
        json_app = content_type == "application/json" or environ.get("HTTP_ACCEPT", None) == "application/json"
    else:
        json_app = content_type == "application/json"
    json_body = content_type == "application/json" and request_method in ["PUT", "POST"]

    # Content encoding specifies whether the body is compressed through gzip or others.
    # No encoding usually means the body is not compressed
    content_encoding = environ.get("HTTP_CONTENT_ENCODING")

    tmp_log.debug(f"""start content-length={cont_length} json={json_app} origin={environ.get("HTTP_ORIGIN", None)}""")

    start_time = naive_utcnow()
    return_type = None

    # check method name is allowed, otherwise return 403
    if not validate_method(method_name, api_module, version):
        error_message = f"method {method_name} is forbidden"
        tmp_log.error(error_message)
        start_response("403 Forbidden", [("Content-Type", "text/plain")])
        return [f"ERROR : {error_message}".encode()]

    # get the method object to be executed
    try:
        if new_api:
            module = module_mapping(version, api_module)["module"]
            tmp_method = getattr(module, method_name)
        else:
            # legacy methods are looked up in this module's globals (the functions imported at the top of the file)
            tmp_method = globals()[method_name]
    except Exception:
        error_message = f"method {method_name} is undefined in {api_module} {version}"
        tmp_log.error(error_message)
        start_response("500 INTERNAL SERVER ERROR", [("Content-Type", "text/plain")])
        return [f"ERROR : {error_message}".encode()]

    # bound before the try so the bad-request dump never references an unbound name
    body = None
    try:
        # generate a request object with the environment and the logger
        panda_request = PandaRequest(environ, tmp_log)

        # pre-validate the request
        error_message = pre_validate_request(panda_request)
        if error_message:
            tmp_log.error(error_message)
            start_response("403 Forbidden", [("Content-Type", "text/plain")])
            return [f"ERROR : {error_message}".encode()]

        # read the body of the request
        body = read_body(environ, cont_length)

        # parse the parameters
        params = parse_parameters(api_module, json_app, json_body, content_encoding, environ, body, request_method)

        if panda_config.entryVerbose:
            tmp_log.debug(f"with {str(list(params))}")

        # execute the method, passing along the request and the decoded parameters
        param_list = [panda_request]
        exec_result = tmp_method(*param_list, **params)

        # extract return type
        if isinstance(exec_result, dict) and "type" in exec_result and "content" in exec_result:
            return_type = exec_result["type"]
            exec_result = exec_result["content"]

        # convert bool to string
        # NOTE(review): `in` compares with ==, so 0/1 (and 0.0/1.0) are also turned into strings here
        if exec_result in [True, False]:
            exec_result = str(exec_result)

        # convert the response to JSON or str depending on HTTP_ACCEPT and CONTENT_TYPE
        if new_api:
            if json_app:
                exec_result = json.dumps(exec_result, default=encode_special_cases)
            elif not isinstance(exec_result, str):
                exec_result = str(exec_result)

    except Exception as exc:
        tmp_log.error(f"execution failure : {str(exc)}\n {traceback.format_exc()}")
        # optionally dump the raw body; skip when the failure happened before the body was read
        if getattr(panda_config, "dumpBadRequest", False) and body is not None:
            try:
                with tempfile.NamedTemporaryFile(delete=False, prefix="req_dump_") as file_object:
                    environ["WSGI_INPUT_DUMP"] = file_object.name
                    file_object.write(body)
                    os.chmod(file_object.name, 0o775)
            except Exception:
                # best effort: a failed dump must not hide the original error
                tmp_log.error(traceback.format_exc())
        error_string = "\n".join(f"{tmp_key} : {str(tmp_value)}" for tmp_key, tmp_value in environ.items())
        tmp_log.error(error_string)

        # return internal server error
        start_response("500 INTERNAL SERVER ERROR", [("Content-Type", "text/plain")])
        # force kill to release memory
        if isinstance(exc, OSError):
            tmp_log.warning("force restart due to OSError")
            os.kill(os.getpid(), signal.SIGINT)

        return [str(exc).encode()]

    if panda_config.entryVerbose:
        tmp_log.debug("done")

    # log execution time and return length
    duration = naive_utcnow() - start_time
    tmp_log.info(
        f"exec_time={duration.seconds}.{duration.microseconds // 1000:03d} sec, return_type={return_type} real_type={type(exec_result).__name__} len={len(str(exec_result))} B"
    )

    # start the response and return result
    if exec_result == pandaserver.taskbuffer.ErrorCode.EC_NotFound:
        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return ["not found".encode()]

    if exec_result == pandaserver.taskbuffer.ErrorCode.EC_Forbidden:
        start_response("403 Forbidden", [("Content-Type", "text/plain")])
        return ["forbidden".encode()]

    if return_type == "json":
        start_response("200 OK", [("Content-Type", "application/json")])
    else:
        start_response("200 OK", [("Content-Type", "text/plain")])

    if isinstance(exec_result, str):
        exec_result = exec_result.encode()

    return [exec_result]