Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import sys
0002 import traceback
0003 from typing import Dict, List
0004 
0005 from pandacommon.pandalogger.LogWrapper import LogWrapper
0006 from pandacommon.pandalogger.PandaLogger import PandaLogger
0007 
0008 from pandaserver.api.v1.common import (
0009     TimedMethod,
0010     extract_primary_production_working_group,
0011     extract_production_working_groups,
0012     generate_response,
0013     get_dn,
0014     get_email_address,
0015     get_fqan,
0016     has_production_role,
0017     request_validation,
0018 )
0019 from pandaserver.jobdispatcher import Protocol
0020 from pandaserver.srvcore.panda_request import PandaRequest
0021 from pandaserver.taskbuffer import JobUtils
0022 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0023 
0024 _logger = PandaLogger().getLogger("api_job")
0025 
0026 global_task_buffer = None
0027 
0028 
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Store the TaskBuffer instance used by every endpoint in this module.

    Must be called once at startup, before any other function here is invoked.
    """
    global global_task_buffer
    global_task_buffer = task_buffer
0035 
0036 
@request_validation(_logger, secure=True, request_method="GET")
def get_status(req: PandaRequest, job_ids: List[int], timeout: int = 60) -> Dict:
    """
    Get status of a job.

    Gets the status for a job and command to the pilot if any. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_status

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids(list of int): list of PanDA job IDs.
        timeout(int, optional): The timeout value. Defaults to 60.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains a list of tuples with (status, command). When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, f"get_status job_ids={job_ids} timeout={timeout}")
    tmp_logger.debug("Start")

    # The task buffer method expects a comma separated list of job IDs
    job_ids_str = ",".join([str(job_id) for job_id in job_ids])
    timed_method = TimedMethod(global_task_buffer.checkJobStatus, timeout)
    timed_method.run(job_ids_str)

    # Time out
    if timed_method.result == Protocol.TimeOutToken:
        tmp_logger.debug("Timed out")
        return generate_response(False, message="time out", data={"code": Protocol.SC_TimeOut})

    # No result: anything other than a list means the lookup failed
    if not isinstance(timed_method.result, list):
        tmp_logger.debug("Failed")
        return generate_response(False, message="failed", data={"code": Protocol.SC_Failed})

    # Success
    data = timed_method.result
    tmp_logger.debug(f"Done with data={data}")
    return generate_response(True, data=data)
0078 
0079 
@request_validation(_logger, secure=True)
def get_description(req: PandaRequest, job_ids: List[int]) -> Dict:
    """
    Get description of a job.

    Gets the description of a job from the main/active schema. The description includes job attributes, job parameters and related file attributes. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_description

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids (list of int): List of PanDA job IDs.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains a list with job descriptions. When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, "get_description")
    tmp_logger.debug("Start")

    try:
        tmp_logger.debug(f"Number of requested PanDA IDs: {len(job_ids)}")
        # Cap the request size to protect the database from oversized IN clauses
        max_ids = 5500
        if len(job_ids) > max_ids:
            tmp_logger.error(f"List of PanDA IDs is longer than {max_ids}. Truncating the list.")
            job_ids = job_ids[:max_ids]
    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"Failed deserializing the list of PanDA IDs: {error_type} {error_value}")
        job_ids = []

    tmp_logger.debug(f"Retrieving data for {job_ids}")
    # peekJobs returns None for IDs it cannot resolve; keep the None placeholders
    ret = [job.to_dict_advanced() if job is not None else None for job in global_task_buffer.peekJobs(job_ids)]
    tmp_logger.debug("Done")

    return generate_response(True, data=ret)
0118 
0119 
@request_validation(_logger, secure=True)
def get_description_incl_archive(req: PandaRequest, job_ids: List[int]) -> Dict:
    """
    Get description of a job.

    Gets the description of a job, also looking into the secondary/archive schema. The description includes job attributes, job parameters and related file attributes. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_description_incl_archive

    Args:
        req(PandaRequest): internally generated request object containing the env variables.
        job_ids (list of int): List of PanDA job IDs.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains a list with job descriptions. When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, "get_description_including_archive")
    tmp_logger.debug("Start")

    try:
        tmp_logger.debug(f"Number of requested PanDA IDs: {len(job_ids)}")
        # Cap the request size to protect the database from oversized IN clauses
        max_ids = 5500
        if len(job_ids) > max_ids:
            tmp_logger.error(f"List of PanDA IDs is longer than {max_ids}. Truncating the list.")
            job_ids = job_ids[:max_ids]
    except Exception:
        error_type, error_value, _ = sys.exc_info()
        tmp_logger.error(f"Failed deserializing the list of PanDA IDs: {error_type} {error_value}")
        job_ids = []

    tmp_logger.debug(f"Retrieving data for {str(job_ids)}")

    # getFullJobStatus also searches the archive schema; None marks unresolved IDs
    ret = [job.to_dict_advanced() if job is not None else None for job in global_task_buffer.getFullJobStatus(job_ids)]

    tmp_logger.debug("getFullJobStatus end")
    return generate_response(True, data=ret)
0159 
0160 
@request_validation(_logger, secure=False, request_method="GET")
def generate_offline_execution_script(req: PandaRequest, job_id: int, days: int = None) -> Dict:
    """
    Get execution script for a job.

    Gets the execution script for a job, including Rucio download of input, ALRB setup, downloading transformation script and running the script. Does not require a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/generate_offline_execution_script

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        days (int, optional): how many days back to look for the job. Defaults to None.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field will contain `{"script": script}`. When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, f"generate_offline_execution_script job_id={job_id} days={days}")
    tmp_logger.debug("Start")
    script = global_task_buffer.getScriptOfflineRunning(job_id, days)

    # getScriptOfflineRunning signals failure with a string starting with "ERROR";
    # wrap it in the standard response envelope instead of returning the raw string
    if script.startswith("ERROR"):
        tmp_logger.debug(f"Failed to generate script: {script}")
        return generate_response(False, message=script)

    tmp_logger.debug("Done")
    return generate_response(True, data={"script": script})
0190 
0191 
@request_validation(_logger, secure=True, request_method="GET")
def get_metadata_for_analysis_jobs(req: PandaRequest, task_id: int) -> Dict:
    """
    Get metadata for analysis jobs

    Gets the metadata from the metatable for analysis jobs in `finished` status. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/job/get_metadata_for_analysis_jobs

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        task_id (int): JEDI task ID

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. When successful, the data field contains the metadata. When unsuccessful, the message field contains the error message and data an error code.
    """
    tmp_logger = LogWrapper(_logger, f"get_metadata_for_analysis_jobs task_id={task_id}")
    tmp_logger.debug("Start")

    metadata = global_task_buffer.getUserJobMetadata(task_id)

    # An empty result is still a successful call — report it with an empty payload
    if metadata:
        tmp_logger.debug("Done")
        return generate_response(True, data=metadata)

    tmp_logger.debug("No metadata found")
    return generate_response(True, message="No metadata found", data={})
0221 
0222 
@request_validation(_logger, secure=True, request_method="POST")
def kill(req: PandaRequest, job_ids: List[int], code: int = None, use_email_as_id: bool = False, kill_options: List[str] = None):
    """
    Kill the jobs

    Kills the jobs with the given PanDA IDs. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/job/kill

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids (list): List of PanDA job IDs
        code (int, optional): The kill code. Defaults to None.
            ```
            code
            2: expire
            3: aborted
            4: expire in waiting
            7: retry by server
            8: rebrokerage
            9: force kill
            10: fast rebrokerage in overloaded PQ
            50: kill by JEDI
            51: reassigned by JEDI
            52: force kill by JEDI
            55: killed since task is (almost) done
            60: workload was terminated by the pilot without actual work
            91: kill user jobs with prod role
            99: force kill user jobs with prod role
            ```
        use_email_as_id (bool, optional): Use the email as ID. Defaults to False.
        kill_options (List, optional): Defaults to None (treated as no options). Possible options are: `keepUnmerged`, `jobSubStatus=xyz`

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The data field contains a list of bools indicating the success of the kill operations.
    """
    # Avoid the mutable-default-argument pitfall: normalize None to a fresh list
    if kill_options is None:
        kill_options = []

    # retrieve the user information
    user = get_dn(req)
    is_production_manager = has_production_role(req)
    fqans = get_fqan(req)

    tmp_logger = LogWrapper(_logger, "kill")
    tmp_logger.debug(f"Start user: {user} code: {code} is_production_manager: {is_production_manager} fqans: {fqans} job_ids: {job_ids}")

    # Get the user's email address if use_email_as_id is set
    if use_email_as_id:
        email = get_email_address(user)
        if email:
            user = email

    # Extract working groups with production role from FQANs
    wg_prod_roles = extract_production_working_groups(fqans)

    # kill jobs
    ret = global_task_buffer.killJobs(job_ids, user, code, is_production_manager, wg_prod_roles, kill_options)
    tmp_logger.debug(f"Done with ret: {ret}")
    return generate_response(True, data=ret)
0283 
0284 
@request_validation(_logger, secure=True, request_method="POST")
def reassign(req: PandaRequest, job_ids: List[int]):
    """
    Reassign a list of jobs

    Reassigns a list of jobs. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/job/reassign

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_ids (list): List of PanDA job IDs

    Returns:
        dict: The system response `{"success": True}`.
    """
    # TODO: think about the default values for for_pending and first_submission. The resolve_true and resolve_false functions are confusing me.

    tmp_logger = LogWrapper(_logger, f"reassign job_ids={job_ids}")
    tmp_logger.debug("Start")

    # taskBuffer.reassignJobs always returns True, so the outcome is unconditional success
    global_task_buffer.reassignJobs(job_ids)

    tmp_logger.debug("Done")
    return generate_response(True)
0311 
0312 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def set_command(req: PandaRequest, job_id: int, command: str):
    """
    Set a pilot command

    Sets a command to the pilot for a job. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/job/set_command

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        command (str): The command for the pilot, e.g. `tobekilled`

    Returns:
        dict: The system response `{"success": success, "message": message}`.
    """
    tmp_logger = LogWrapper(_logger, f"set_command job_id={job_id} command={command}")
    tmp_logger.debug("Start")
    success, message = global_task_buffer.send_command_to_job(job_id, command)
    # Bug fix: the log line was missing the f prefix, so placeholders were never interpolated
    tmp_logger.debug(f"Done with success={success} message={message}")
    return generate_response(success, message=message)
0337 
0338 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def set_debug_mode(req: PandaRequest, job_id: int, mode: bool):
    """
    Set the debug mode

    Sets the debug mode for a job. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/job/set_debug_mode

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        mode (bool): True to set debug mode, False to unset debug mode

    Returns:
        dict: The system response `{"success": success, "message": message}`.
    """

    tmp_logger = LogWrapper(_logger, f"set_debug_mode job_id={job_id}")

    user = get_dn(req)
    is_production_manager = has_production_role(req)
    fqans = get_fqan(req)

    # get the primary working group with prod role
    working_group = extract_primary_production_working_group(fqans)

    tmp_logger.debug(f"Start user={user} mgr={is_production_manager} wg={working_group} fqans={str(fqans)}")

    message = global_task_buffer.setDebugMode(user, job_id, is_production_manager, mode, working_group)

    # Bug fix: the success flag was inverted — setDebugMode reports "Succeeded" on success,
    # but the original code returned success=True only when the message was NOT "Succeeded"
    success = message == "Succeeded"

    return generate_response(success, message=message)
0377 
0378 
@request_validation(_logger, secure=True, request_method="POST")
def submit(req: PandaRequest, jobs: str):
    """
    Submit jobs

    Submits a list of jobs to the PanDA server. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/job/submit

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        jobs (str): JSON string with a list of job specs

    Returns:
        dict: The system response `{"success": success, "message": message}`.
    """
    tmp_logger = LogWrapper(_logger, "submit")
    user = get_dn(req)
    fqans = get_fqan(req)
    is_production_role = has_production_role(req)
    host = req.get_remote_host()

    # deserialize job specs
    try:
        jobs = JobUtils.load_jobs_json(jobs)
        tmp_logger.debug(f"{user} len:{len(jobs)} is_production_role={is_production_role} FQAN:{fqans}")
        # Cap the submission size to protect the server
        max_jobs = 5000
        if len(jobs) > max_jobs:
            tmp_logger.error(f"Number of jobs exceeded maximum {max_jobs}. Truncating the list.")
            jobs = jobs[:max_jobs]
    except Exception as ex:
        error_message = f"Failed to deserialize jobs: {str(ex)} {traceback.format_exc()}"
        tmp_logger.error(error_message)
        return generate_response(False, message=error_message)

    if not jobs:
        return generate_response(False, message="No jobs were submitted")

    # check prodSourceLabel and job_label are correct
    for tmp_job in jobs:
        # check production jobs are submitted with production role
        if tmp_job.prodSourceLabel in ["managed"] and not is_production_role:
            return generate_response(False, message=f"{user} is missing production role for prodSourceLabel={tmp_job.prodSourceLabel} submission")

        # validate the job_label
        if tmp_job.job_label not in [None, "", "NULL"] and tmp_job.job_label not in JobUtils.job_labels:
            return generate_response(False, message=f"job_label={tmp_job.job_label} is not valid")

    sample_job = jobs[0]

    # get user VO; default to atlas when the job spec doesn't carry one
    try:
        user_vo = "atlas"
        if sample_job.VO not in [None, "", "NULL"]:
            user_vo = sample_job.VO
    except (IndexError, AttributeError) as e:
        tmp_logger.error(f"User VO was not found and defaulted to {user_vo}. (Exception {e})")

    # atlas jobs require FQANs
    if user_vo == "atlas" and not fqans:
        tmp_logger.error(f"Proxy was missing FQANs for {user_vo}")

    # LSST: try to overwrite with pipeline username
    if user_vo.lower() == "lsst":
        try:
            if sample_job.prodUserName and sample_job.prodUserName.lower() != "none":
                user = sample_job.prodUserName
        except AttributeError:
            tmp_logger.error(f"VO {user_vo} check: username not found in job parameters and defaulted to submitter ({user})")

    # store jobs
    ret = global_task_buffer.storeJobs(jobs, user, fqans=fqans, hostname=host, userVO=user_vo)

    # There was no response. Guard before the len() below: ret can be None,
    # and the original code crashed on len(None)
    if not ret:
        return generate_response(False, message="Failed to submit jobs")

    tmp_logger.debug(f"{user} -> {len(ret)}")

    # There was a string for response, i.e. an error message
    if isinstance(ret, str):
        return generate_response(False, message=ret)

    return generate_response(True, data=ret)
0462 
0463     return generate_response(True, data=ret)