File indexing completed on 2026-04-10 08:39:00
0001 import sys
0002 import traceback
0003 from typing import Dict, List
0004
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007
0008 from pandaserver.api.v1.common import (
0009 TimedMethod,
0010 extract_primary_production_working_group,
0011 extract_production_working_groups,
0012 generate_response,
0013 get_dn,
0014 get_email_address,
0015 get_fqan,
0016 has_production_role,
0017 request_validation,
0018 )
0019 from pandaserver.jobdispatcher import Protocol
0020 from pandaserver.srvcore.panda_request import PandaRequest
0021 from pandaserver.taskbuffer import JobUtils
0022 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0023
0024 _logger = PandaLogger().getLogger("api_job")
0025
0026 global_task_buffer = None
0027
0028
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Initialize the module-level task buffer.

    Must be called once, before any other function in this module is used.

    Args:
        task_buffer(TaskBuffer): task buffer instance shared by all API functions in this module
    """
    global global_task_buffer
    global_task_buffer = task_buffer
0035
0036
@request_validation(_logger, secure=True, request_method="GET")
def get_status(req: PandaRequest, job_ids: List[int], timeout: int = 60) -> Dict:
    """
    Get status of a job.

    Gets the status for a job and command to the pilot if any. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_status

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids(list of int): list of PanDA job IDs.
        timeout(int, optional): The timeout value. Defaults to 60.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains a list of tuples with (status, command). When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, f"get_status job_ids={job_ids} timeout={timeout}")
    tmp_logger.debug("Start")

    # checkJobStatus expects a comma-separated string of IDs
    serialized_ids = ",".join(str(identifier) for identifier in job_ids)
    timed_method = TimedMethod(global_task_buffer.checkJobStatus, timeout)
    timed_method.run(serialized_ids)
    result = timed_method.result

    # The timed wrapper reports a timeout through a sentinel token
    if result == Protocol.TimeOutToken:
        tmp_logger.debug("Timed out")
        return generate_response(False, message="time out", data={"code": Protocol.SC_TimeOut})

    # Anything other than a list means the lookup failed
    if not isinstance(result, list):
        tmp_logger.debug("Failed")
        return generate_response(False, message="failed", data={"code": Protocol.SC_Failed})

    tmp_logger.debug(f"Done with data={result}")
    return generate_response(True, data=result)
0078
0079
@request_validation(_logger, secure=True)
def get_description(req: PandaRequest, job_ids: List[int]) -> Dict:
    """
    Get description of a job.

    Gets the description of a job from the main/active schema. The description includes job attributes, job parameters and related file attributes. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_description

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids (list of int): List of PanDA job IDs.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains a list with job descriptions. When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, "get_description")
    tmp_logger.debug("Start")

    try:
        tmp_logger.debug(f"Number of requested PanDA IDs: {len(job_ids)}")
        # Cap the number of IDs per request to protect the database
        max_ids = 5500
        if len(job_ids) > max_ids:
            tmp_logger.error(f"List of PanDA IDs is longer than {max_ids}. Truncating the list.")
            job_ids = job_ids[:max_ids]
    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"Failed deserializing the list of PanDA IDs: {error_type} {error_value}")
        job_ids = []

    tmp_logger.debug(f"Retrieving data for {job_ids}")
    # peekJobs returns None entries for unknown IDs; preserve positions
    ret = [job.to_dict_advanced() if job is not None else None for job in global_task_buffer.peekJobs(job_ids)]
    tmp_logger.debug("Done")

    return generate_response(True, data=ret)
0118
0119
@request_validation(_logger, secure=True)
def get_description_incl_archive(req: PandaRequest, job_ids: List[int]) -> Dict:
    """
    Get description of a job.

    Gets the description of a job, also looking into the secondary/archive schema. The description includes job attributes, job parameters and related file attributes. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_description_incl_archive

    Args:
        req(PandaRequest): internally generated request object containing the env variables.
        job_ids (list of int): List of PanDA job IDs.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains a list with job descriptions. When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, "get_description_including_archive")
    tmp_logger.debug("Start")

    # Cap the number of IDs per request to protect the database
    max_ids = 5500
    try:
        n_requested = len(job_ids)
        tmp_logger.debug(f"Number of requested PanDA IDs: {n_requested}")
        if n_requested > max_ids:
            tmp_logger.error(f"List of PanDA IDs is longer than {max_ids}. Truncating the list.")
            job_ids = job_ids[:max_ids]
    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"Failed deserializing the list of PanDA IDs: {error_type} {error_value}")
        job_ids = []

    tmp_logger.debug(f"Retrieving data for {str(job_ids)}")

    # getFullJobStatus yields None entries for unknown IDs; keep positions aligned
    ret = []
    for job in global_task_buffer.getFullJobStatus(job_ids):
        ret.append(job.to_dict_advanced() if job is not None else None)

    tmp_logger.debug("getFullJobStatus end")
    return generate_response(True, data=ret)
0158 return generate_response(True, data=ret)
0159
0160
@request_validation(_logger, secure=False, request_method="GET")
def generate_offline_execution_script(req: PandaRequest, job_id: int, days: int = None) -> Dict:
    """
    Get execution script for a job.

    Gets the execution script for a job, including Rucio download of input, ALRB setup, downloading transformation script and running the script.

    API details:
        HTTP Method: GET
        Path: /v1/job/generate_offline_execution_script

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        days (int, optional): how many days back to look the job up. Defaults to None.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field will contain `{"script": script}`. When unsuccessful, the message field contains the error message.
    """
    tmp_logger = LogWrapper(_logger, f"generate_offline_execution_script job_id={job_id} days={days}")
    tmp_logger.debug("Start")
    script = global_task_buffer.getScriptOfflineRunning(job_id, days)

    # getScriptOfflineRunning signals failure through an "ERROR..." string;
    # wrap both outcomes in the standard response envelope like every other endpoint
    if script.startswith("ERROR"):
        tmp_logger.debug(f"Failed to generate script: {script}")
        return generate_response(False, message=script)

    tmp_logger.debug("Done")
    return generate_response(True, data={"script": script})
0190
0191
@request_validation(_logger, secure=True, request_method="GET")
def get_metadata_for_analysis_jobs(req: PandaRequest, task_id: int) -> Dict:
    """
    Get metadata for analysis jobs

    Gets the metadata from the metatable for analysis jobs in `finished` status. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_metadata_for_analysis_jobs

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id (int): JEDI task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the metadata. When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, f"get_metadata_for_analysis_jobs task_id={task_id}")
    tmp_logger.debug("Start")

    metadata = global_task_buffer.getUserJobMetadata(task_id)

    if metadata:
        tmp_logger.debug("Done")
        return generate_response(True, data=metadata)

    # An empty lookup is still reported as success, with an explanatory message
    tmp_logger.debug("No metadata found")
    return generate_response(True, message="No metadata found", data={})
0221
0222
@request_validation(_logger, secure=True, request_method="POST")
def kill(req: PandaRequest, job_ids: List[int], code: int = None, use_email_as_id: bool = False, kill_options: List[str] = None):
    """
    Kill the jobs

    Kills the jobs with the given PanDA IDs. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/job/kill

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids (list): List of PanDA job IDs
        code (int, optional): The kill code. Defaults to None.
            ```
            code
            2: expire
            3: aborted
            4: expire in waiting
            7: retry by server
            8: rebrokerage
            9: force kill
            10: fast rebrokerage in overloaded PQ
            50: kill by JEDI
            51: reassigned by JEDI
            52: force kill by JEDI
            55: killed since task is (almost) done
            60: workload was terminated by the pilot without actual work
            91: kill user jobs with prod role
            99: force kill user jobs with prod role
            ```
        use_email_as_id (bool, optional): Use the email as ID. Defaults to False.
        kill_options (List, optional): Defaults to None, treated as []. Possible options are: `keepUnmerged`, `jobSubStatus=xyz`

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The data field contains a list of bools indicating the success of the kill operations.
    """
    # Avoid the shared mutable default argument ([]) of the original signature
    if kill_options is None:
        kill_options = []

    user = get_dn(req)
    is_production_manager = has_production_role(req)
    fqans = get_fqan(req)

    tmp_logger = LogWrapper(_logger, "kill")
    tmp_logger.debug(f"Start user: {user} code: {code} is_production_manager: {is_production_manager} fqans: {fqans} job_ids: {job_ids}")

    # Optionally identify the user by email address instead of DN
    if use_email_as_id:
        email = get_email_address(user)
        if email:
            user = email

    # Working groups for which the caller holds a production role
    wg_prod_roles = extract_production_working_groups(fqans)

    ret = global_task_buffer.killJobs(job_ids, user, code, is_production_manager, wg_prod_roles, kill_options)
    tmp_logger.debug(f"Done with ret: {ret}")
    return generate_response(True, data=ret)
0283
0284
@request_validation(_logger, secure=True, request_method="POST")
def reassign(req: PandaRequest, job_ids: List[int]):
    """
    Reassign a list of jobs

    Reassigns a list of jobs. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/job/reassign

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids (list): List of PanDA job IDs

    Returns:
        dict: The system response `{"success": True}`.
    """
    tmp_logger = LogWrapper(_logger, f"reassign job_ids={job_ids}")
    tmp_logger.debug("Start")
    global_task_buffer.reassignJobs(job_ids)
    tmp_logger.debug("Done")
    return generate_response(True)
0311
0312
@request_validation(_logger, secure=True, production=True, request_method="POST")
def set_command(req: PandaRequest, job_id: int, command: str):
    """
    Set a pilot command

    Sets a command to the pilot for a job. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/job/set_command

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        command (str): The command for the pilot, e.g. `tobekilled`

    Returns:
        dict: The system response `{"success": success, "message": message}`.
    """
    tmp_logger = LogWrapper(_logger, f"set_command job_id={job_id} command={command}")
    tmp_logger.debug("Start")
    success, message = global_task_buffer.send_command_to_job(job_id, command)
    # f-prefix was missing here, so the placeholders were logged literally
    tmp_logger.debug(f"Done with success={success} message={message}")
    return generate_response(success, message=message)
0337
0338
@request_validation(_logger, secure=True, production=True, request_method="POST")
def set_debug_mode(req: PandaRequest, job_id: int, mode: bool):
    """
    Set the debug mode

    Sets the debug mode for a job. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/job/set_debug_mode

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        mode (bool): True to set debug mode, False to unset debug mode

    Returns:
        dict: The system response `{"success": success, "message": message}`.
    """
    tmp_logger = LogWrapper(_logger, f"set_debug_mode job_id={job_id}")

    user = get_dn(req)
    is_production_manager = has_production_role(req)
    fqans = get_fqan(req)

    # The primary working group determines the caller's permission scope
    working_group = extract_primary_production_working_group(fqans)

    tmp_logger.debug(f"Start user={user} mgr={is_production_manager} wg={working_group} fqans={str(fqans)}")

    message = global_task_buffer.setDebugMode(user, job_id, is_production_manager, mode, working_group)

    # setDebugMode reports success with the literal message "Succeeded";
    # the original condition was inverted (success on any message != "Succeeded")
    success = message == "Succeeded"

    return generate_response(success, message=message)
0377
0378
@request_validation(_logger, secure=True, request_method="POST")
def submit(req: PandaRequest, jobs: str):
    """
    Submit jobs

    Submits a list of job specifications. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/job/submit

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        jobs (str): JSON string with a list of job specs

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the result of storing the jobs.
    """
    tmp_logger = LogWrapper(_logger, "submit")
    user = get_dn(req)
    fqans = get_fqan(req)
    is_production_role = has_production_role(req)
    host = req.get_remote_host()

    # Deserialize the job list and cap it at a sane maximum
    try:
        jobs = JobUtils.load_jobs_json(jobs)
        tmp_logger.debug(f"{user} len:{len(jobs)} is_production_role={is_production_role} FQAN:{fqans}")
        max_jobs = 5000
        if len(jobs) > max_jobs:
            tmp_logger.error(f"Number of jobs exceeded maximum {max_jobs}. Truncating the list.")
            jobs = jobs[:max_jobs]
    except Exception as ex:
        error_message = f"Failed to deserialize jobs: {str(ex)} {traceback.format_exc()}"
        tmp_logger.error(error_message)
        return generate_response(False, message=error_message)

    if not jobs:
        return generate_response(False, message="No jobs were submitted")

    # Per-job validation of permissions and labels
    for tmp_job in jobs:
        # Submitting managed (production) jobs requires a production role
        if tmp_job.prodSourceLabel in ["managed"] and not is_production_role:
            return generate_response(False, message=f"{user} is missing production role for prodSourceLabel={tmp_job.prodSourceLabel} submission")

        # job_label, when set, must be one of the known labels
        if tmp_job.job_label not in [None, "", "NULL"] and tmp_job.job_label not in JobUtils.job_labels:
            return generate_response(False, message=f"job_label={tmp_job.job_label} is not valid")

    sample_job = jobs[0]

    # Determine the VO from the first job; hoisted out of the try block so the
    # except handler can always log the defaulted value
    user_vo = "atlas"
    try:
        if sample_job.VO not in [None, "", "NULL"]:
            user_vo = sample_job.VO
    except (IndexError, AttributeError) as e:
        tmp_logger.error(f"User VO was not found and defaulted to {user_vo}. (Exception {e})")

    if user_vo == "atlas" and not fqans:
        tmp_logger.error(f"Proxy was missing FQANs for {user_vo}")

    # For LSST, the effective user is taken from the job parameters when present
    if user_vo.lower() == "lsst":
        try:
            if sample_job.prodUserName and sample_job.prodUserName.lower() != "none":
                user = sample_job.prodUserName
        except AttributeError:
            tmp_logger.error(f"VO {user_vo} check: username not found in job parameters and defaulted to submitter ({user})")

    ret = global_task_buffer.storeJobs(jobs, user, fqans=fqans, hostname=host, userVO=user_vo)
    tmp_logger.debug(f"{user} -> {len(ret)}")

    if not ret:
        return generate_response(False, message="Failed to submit jobs")

    # storeJobs returns an error string on certain failures
    if isinstance(ret, str):
        return generate_response(False, message=ret)

    return generate_response(True, data=ret)
0463 return generate_response(True, data=ret)