Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import datetime
0002 import os
0003 import sys
0004 import time
0005 import traceback
0006 from typing import List
0007 
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 
0012 from pandaserver.api.v1.common import (
0013     TimedMethod,
0014     generate_response,
0015     get_dn,
0016     has_production_role,
0017     request_validation,
0018 )
0019 from pandaserver.brokerage.SiteMapper import SiteMapper
0020 from pandaserver.config import panda_config
0021 from pandaserver.dataservice.adder_gen import AdderGen
0022 from pandaserver.jobdispatcher import Protocol
0023 from pandaserver.srvcore import CoreUtils
0024 from pandaserver.srvcore.CoreUtils import normalize_cpu_model
0025 from pandaserver.srvcore.panda_request import PandaRequest
0026 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0027 
# logger of this API module
_logger = PandaLogger().getLogger("api_pilot")
# dedicated logger receiving one line per pilot request (getJob, noJob, updateJob)
pilot_logger = PandaLogger().getLogger("PilotRequests")


# module-level handles; both stay None until init_task_buffer() is called
global_task_buffer = None
global_site_mapper_cache = None

# job states that update_job accepts as `job_status`
VALID_JOB_STATES = [
    "running",
    "failed",
    "finished",
    "holding",
    "starting",
    "transferring",
]
0036 
0037 
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Store the task buffer at module level and create the site mapper cache.

    This has to be called before any other function in this module, since they all rely on the
    module-level task buffer and/or the site mapper cache.

    Args:
        task_buffer(TaskBuffer): The task buffer instance to be used by this module.
    """
    global global_task_buffer, global_site_mapper_cache

    global_task_buffer = task_buffer

    # 10 * 60 is presumably the cache lifetime in seconds (10 minutes) - TODO confirm against CoreUtils.CachedObject
    cache_argument = 10 * 60
    global_site_mapper_cache = CoreUtils.CachedObject("site_mapper", cache_argument, _get_site_mapper, _logger)
0047 
0048 
def _get_site_mapper():
    """
    Build a fresh SiteMapper on top of the module-level task buffer.

    Used as the callback handed to the site mapper cache in `init_task_buffer`.

    Returns:
        tuple: `(True, site_mapper)`, where `site_mapper` is the newly created SiteMapper.
    """
    site_mapper = SiteMapper(global_task_buffer)
    return True, site_mapper
0051 
0052 
def _non_negative_or_zero(value):
    """Return `max(0, value)`, or 0 when the value cannot be compared with an integer (e.g. `None`)."""
    try:
        return max(0, value)
    except (ValueError, TypeError):
        return 0


@request_validation(_logger, secure=True, request_method="POST")
def acquire_jobs(
    req: PandaRequest,
    site_name: str,
    timeout: int = 60,
    memory: int = None,
    disk_space: int = None,
    prod_source_label: str = None,
    node: str = None,
    computing_element: str = None,
    prod_user_id: str = None,
    get_proxy_key: str = None,
    task_id: int = None,
    n_jobs: int = 1,
    background: bool = None,
    resource_type: str = None,
    harvester_id: str = None,
    worker_id: int = None,
    scheduler_id: str = None,
    job_type: str = None,
    via_topic: bool = None,
    remaining_time=None,
) -> dict:
    """
    Acquire jobs

    Acquire jobs for the pilot. The jobs are reserved, the job status is updated and the jobs are returned. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/acquire_jobs

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        site_name(str): The PanDA queue name
        timeout(int, optional): Request timeout in seconds. Optional and defaults to 60.
        memory(int, optional): Memory limit for the job. Negative or missing values are treated as 0. Optional and defaults to `None`.
        disk_space(int, optional): Disk space limit for the job. Negative or missing values are treated as 0. Optional and defaults to `None`.
        prod_source_label(str, optional): Prodsourcelabel, e.g. `user`, `managed`, `unified`... Optional and defaults to `None`.
                                          Requests without production role are only accepted for `user`.
        node(str, optional): Identifier of the worker node/slot. Optional and defaults to `None`.
        computing_element(str, optional): Computing element. Optional and defaults to `None`.
        prod_user_id(str, optional): User ID of the job. Overwritten with the requester DN when the request has no production role.
                                     Optional and defaults to `None`.
        get_proxy_key(str, optional): Flag to request a proxy key; any truthy value enables it, and it is only honoured
                                      for requests with production role. Optional and defaults to `None`.
        task_id(int, optional): JEDI task ID of the job. Optional and defaults to `None`.
        n_jobs(int, optional): Number of jobs for bulk requests. Optional and defaults to 1.
        background(bool, optional): Background flag. Optional and defaults to `None`.
        resource_type(str, optional): Resource type of the job, e.g. `SCORE`, `MCORE`,.... Optional and defaults to `None`.
        harvester_id(str, optional): Harvester ID, used to update the worker entry in the DB. Optional and defaults to `None`.
        worker_id(int, optional): Worker ID, used to update the worker entry in the DB. Optional and defaults to `None`.
        scheduler_id(str, optional): Scheduler, e.g. harvester ID. Optional and defaults to `None`.
        job_type(str, optional): Job type, e.g. `user`, `unified`, ... This is necessary on top of the `prodsourcelabel`
                                 to disambiguate the cases of test jobs that can be production or analysis. Optional and defaults to `None`.
        via_topic(bool, optional): Topic for message broker. Optional and defaults to `None`.
        remaining_time(int, optional): Remaining walltime. Negative or missing values are treated as 0. Optional and defaults to `None`.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The data is a list of job dictionaries.
              When failed, the message contains the error message.

    Raises:
        Exception: Any exception raised while building the per-job or the bulk response is logged and re-raised.
    """

    tmp_logger = LogWrapper(_logger, f"acquire_jobs {naive_utcnow().isoformat('/')}")
    tmp_logger.debug(f"Start for {site_name}")

    # get DN and FQANs
    real_dn = get_dn(req)

    # check production role
    is_production_manager = has_production_role(req)

    # production jobs should only be retrieved with production role
    if not is_production_manager and prod_source_label not in ["user"]:
        tmp_logger.warning("invalid role")
        tmp_msg = "no production/pilot role in VOMS FQANs or non pilot owner"
        return generate_response(False, message=tmp_msg)

    # set DN for non-production user, the user can only get their own jobs
    if not is_production_manager:
        prod_user_id = real_dn

    # allow get_proxy_key only for production role
    get_proxy_key = bool(get_proxy_key and is_production_manager)

    # clamp the numeric limits to non-negative values; anything not comparable with an int becomes 0
    memory = _non_negative_or_zero(memory)
    disk_space = _non_negative_or_zero(disk_space)
    remaining_time = _non_negative_or_zero(remaining_time)

    tmp_logger.debug(
        f"{site_name}, n_jobs={n_jobs}, memory={memory}, disk={disk_space}, source_label={prod_source_label}, "
        f"node={node}, ce={computing_element}, user={prod_user_id}, proxy={get_proxy_key}, "
        f"task_id={task_id}, DN={real_dn}, role={is_production_manager}, "
        f"bg={background}, rt={resource_type}, harvester_id={harvester_id}, worker_id={worker_id}, "
        f"scheduler_id={scheduler_id}, job_type={job_type}, via_topic={via_topic} remaining_time={remaining_time}"
    )

    # log the acquire_jobs as it's used for site activity metrics (one line per requested job slot)
    for i in range(n_jobs):
        slot_suffix = f"_{i}" if n_jobs > 1 else ""
        pilot_logger.info(f"method=getJob,site={site_name},node={node}{slot_suffix},type={prod_source_label}")

    t_start = time.time()

    # refresh the cached site mapper and look the site up once
    global_site_mapper_cache.update()
    site_spec = global_site_mapper_cache.cachedObj.getSite(site_name)
    is_grandly_unified = site_spec.is_grandly_unified()
    in_test_status = site_spec.status == "test"

    # queues in test status only get test jobs: change the label
    if in_test_status and prod_source_label in ["user", "managed", "unified"]:
        new_label = "test"
        tmp_logger.debug(f"prod_source_label changed from {prod_source_label} to {new_label}")
        prod_source_label = new_label

    # wrapper function for timeout
    timed_method = TimedMethod(global_task_buffer.getJobs, timeout)
    timed_method.run(
        n_jobs,
        site_name,
        prod_source_label,
        memory,
        disk_space,
        node,
        timeout,
        computing_element,
        prod_user_id,
        task_id,
        background,
        resource_type,
        harvester_id,
        worker_id,
        scheduler_id,
        job_type,
        is_grandly_unified,
        via_topic,
        remaining_time,
    )

    # Time-out
    if timed_method.result == Protocol.TimeOutToken:
        message = "Timed out"
        tmp_logger.debug(message)
        return generate_response(False, message=message)

    # Try to get the jobs. A list result is laid out as [job, ..., n_sent, proxy_key, secrets_map]
    jobs = []
    if isinstance(timed_method.result, list):
        result = timed_method.result
        secrets_map = result.pop()
        proxy_key = result[-1]
        n_sent = result[-2]
        jobs = result[:-2]

    if not jobs:
        message = "No jobs in PanDA"
        tmp_logger.debug(message)
        pilot_logger.info(f"method=noJob,site={site_name},node={node},type={prod_source_label}")
        return generate_response(False, message=message)

    # the pilot secrets are the same for every job, so look them up once
    pilot_secrets = secrets_map.get(panda_config.pilot_secrets, None)

    # add each job to the list
    response_list = []
    for tmp_job in jobs:
        try:
            # The response is nothing but a dictionary with the job information
            job_response = Protocol.Response(Protocol.SC_Success)
            job_response.appendJob(tmp_job, global_site_mapper_cache)
        except Exception as e:
            tmp_msg = f"failed to get jobs with {str(e)}"
            tmp_logger.error(f"{tmp_msg}\n{traceback.format_exc()}")
            raise

        # append n_sent
        job_response.appendNode("nSent", n_sent)

        # set proxy key
        if get_proxy_key:
            job_response.setProxyKey(proxy_key)

        # set user secrets
        if tmp_job.use_secrets() and tmp_job.prodUserName in secrets_map and secrets_map[tmp_job.prodUserName]:
            job_response.appendNode("secrets", secrets_map[tmp_job.prodUserName])

        # set pilot secrets
        if pilot_secrets:
            job_response.appendNode("pilotSecrets", pilot_secrets)

        response_list.append(job_response.data)

    # the jobs are always returned as a bulk response; n_jobs cannot be None here because
    # range(n_jobs) above would already have failed
    try:
        response = Protocol.Response(Protocol.SC_Success)
        response.appendNode("jobs", response_list)
    except Exception as e:
        tmp_msg = f"Failed to make response with {str(e)}"
        tmp_logger.error(f"{tmp_msg}\n{traceback.format_exc()}")
        raise

    tmp_logger.debug(f"Done for {site_name} {node}")

    t_end = time.time()
    t_delta = t_end - t_start
    tmp_logger.info(f"site_name={site_name} took timing={t_delta}s in_test={in_test_status}")
    return generate_response(True, data=response.data)
0272 
0273 
@request_validation(_logger, secure=True, request_method="GET")
def get_job_status(req: PandaRequest, job_ids: List[int], timeout: int = 60) -> dict:
    """
    Get job status

    Looks up the current status of each requested job. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/pilot/get_job_status

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        job_ids(list): list of job IDs.
        timeout(int, optional): The timeout value. Defaults to 60.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. On timeout or when the lookup does not
            return a list, `success` is `False` and the message is `"Timed out"` or `"Database error"`. Otherwise the data is a list
            with one status dictionary per requested job, in the format
            ```
            [{"job_id": <job_id_requested>, "status": "not found", "attempt_number": 0}, {"job_id": <job_id>, "status": <status>, "attempt_number": <attempt_nr>}]
            ```
    """

    tmp_logger = LogWrapper(_logger, f"get_job_status {job_ids}")
    tmp_logger.debug("Start")

    # peek the jobs through the timeout wrapper
    timed_method = TimedMethod(global_task_buffer.peekJobs, timeout)
    timed_method.run(job_ids, fromDefined=False, fromActive=True, fromArchived=True, forAnal=False, use_json=False)
    peeked = timed_method.result

    # bail out on timeout or on anything that is not a list of jobs
    failure_message = None
    if peeked == Protocol.TimeOutToken:
        failure_message = "Timed out"
    elif not isinstance(peeked, list):
        failure_message = "Database error"

    if failure_message is not None:
        tmp_logger.debug(failure_message)
        return generate_response(False, message=failure_message)

    # results are paired with the requested IDs by position; empty entries are reported as "not found"
    job_status_list = [
        {"job_id": job.PandaID, "status": job.jobStatus, "attempt_number": job.attemptNr}
        if job
        else {"job_id": job_id_requested, "status": "not found", "attempt_number": 0}
        for job, job_id_requested in zip(peeked, job_ids)
    ]

    tmp_logger.debug(f"Done: job_status_list={job_status_list}")

    return generate_response(True, data=job_status_list)
0325 
0326 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_job(
    req: PandaRequest,
    job_id: int,
    job_status: str,
    job_output_report: str = "",
    node: str = None,
    cpu_consumption_time: int = None,
    cpu_consumption_unit: str = None,
    scheduler_id: str = None,
    pilot_id: str = None,
    site_name: str = None,
    pilot_log: str = "",
    meta_data: str = "",
    cpu_conversion_factor: float | int = None,
    trans_exit_code: int = None,
    pilot_error_code: int = None,
    pilot_error_diag: str = None,
    exe_error_code: int = None,
    exe_error_diag: str = None,
    pilot_timing: str = None,
    start_time: str = None,
    end_time: str = None,
    n_events: int = None,
    n_input_files: int = None,
    batch_id: str = None,
    attempt_nr: int = None,
    job_metrics: str = None,
    stdout: str = "",
    job_sub_status: str = None,
    core_count: int = None,
    max_rss: int | float = None,
    max_vmem: int | float = None,
    max_swap: int | float = None,
    max_pss: int | float = None,
    avg_rss: int | float = None,
    avg_vmem: int | float = None,
    avg_swap: int | float = None,
    avg_pss: int | float = None,
    tot_rchar: int | float = None,
    tot_wchar: int | float = None,
    tot_rbytes: int | float = None,
    tot_wbytes: int | float = None,
    rate_rchar: int | float = None,
    rate_wchar: int | float = None,
    rate_rbytes: int | float = None,
    rate_wbytes: int | float = None,
    corrupted_files: str = None,
    mean_core_count: int = None,
    cpu_architecture_level: str = None,
    grid: str = None,
    source_site: str = None,
    destination_site: str = None,
    timeout: int = 60,
) -> dict:
    """
    Update job

    Updates the details for a job, stores the metadata and excerpts from the pilot log. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_job

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        job_status(str): Job status. Must be one of `VALID_JOB_STATES`
        job_sub_status(str, optional): Job sub status. Optional, defaults to `None`
        start_time(str, optional): Job start time in format `"%Y-%m-%d %H:%M:%S"`. Optional, defaults to `None`
        end_time(str, optional): Job end time in format `"%Y-%m-%d %H:%M:%S"`. Optional, defaults to `None`
        pilot_timing(str, optional): String with pilot timings. Optional, defaults to `None`
        site_name(str, optional): PanDA queue name. Optional, defaults to `None`
        node(str, optional): Identifier for worker node/slot. Optional, defaults to `None`
        scheduler_id(str, optional): Scheduler ID, such as harvester instance. Optional, defaults to `None`
        pilot_id(str, optional): Pilot ID. Optional, defaults to `None`
        batch_id(str, optional): Batch ID. Optional, defaults to `None`
        trans_exit_code(int, optional): Transformation exit code. Optional, defaults to `None`
        pilot_error_code(int, optional): Pilot error code. Optional, defaults to `None`
        pilot_error_diag(str, optional): Pilot error message. Optional, defaults to `None`
        exe_error_code(int, optional): Execution error code. Optional, defaults to `None`
        exe_error_diag(str, optional): Execution error message. Optional, defaults to `None`
        n_events(int, optional): Number of events. Optional, defaults to `None`
        n_input_files(int, optional): Number of input files. Optional, defaults to `None`
        attempt_nr(int, optional): Job attempt number. Optional, defaults to `None`
        cpu_consumption_time(int, optional): CPU consumption time. Optional, defaults to `None`
        cpu_consumption_unit(str, optional): CPU consumption unit, being used for updating some CPU details. Optional, defaults to `None`
        cpu_conversion_factor(float, optional): CPU conversion factor. Optional defaults to `None`
        core_count(int, optional): Number of cores of the job. Optional, defaults to `None`
        mean_core_count(int, optional): Mean core count. Optional, defaults to `None`
        max_rss(int, optional): Measured max RSS memory. Optional, defaults to `None`
        max_vmem(int, optional): Measured max Virtual memory. Optional, defaults to `None`
        max_swap(int, optional): Measured max swap memory. Optional, defaults to `None`
        max_pss(int, optional): Measured max PSS memory. Optional, defaults to `None`
        avg_rss(int, optional): Measured average RSS. Optional, defaults to `None`
        avg_vmem(int, optional): Measured average Virtual memory. Optional, defaults to `None`
        avg_swap(int, optional): Measured average swap memory. Optional, defaults to `None`
        avg_pss(int, optional): Measured average PSS. Optional, defaults to `None`
        tot_rchar(int, optional): Measured total read characters. Optional, defaults to `None`
        tot_wchar(int, optional): Measured total written characters. Optional, defaults to `None`
        tot_rbytes(int, optional): Measured total read bytes. Optional, defaults to `None`
        tot_wbytes(int, optional): Measured total written bytes. Optional, defaults to `None`
        rate_rchar(int, optional): Measured rate for read characters. Optional, defaults to `None`
        rate_wchar(int, optional): Measured rate for written characters. Optional, defaults to `None`
        rate_rbytes(int, optional): Measured rate for read bytes. Optional, defaults to `None`
        rate_wbytes(int, optional): Measured rate for written bytes. Optional, defaults to `None`
        corrupted_files(str, optional): List of corrupted files in comma separated format. Optional, defaults to `None`
        cpu_architecture_level(str, optional): CPU architecture level (e.g. `x86_64-v3`). Optional, defaults to `None`
        grid(str, optional): Grid type. Optional, defaults to `None`
        source_site(str, optional): Source site name. Optional, defaults to `None`
        destination_site(str, optional): Destination site name. Optional, defaults to `None`
        job_metrics(str, optional): Job metrics. Optional, defaults to `None`
        job_output_report(str, optional): Job output report. Optional, defaults to `""`
        pilot_log(str, optional): Pilot log excerpt. Optional, defaults to `""`
        meta_data(str, optional): Job metadata. Optional, defaults to `""`
        stdout(str, optional): Standard output. Optional, defaults to `""`
        timeout(int, optional): Timeout for the operation in seconds. Not applied when `job_status` is `holding`. Optional, defaults to 60

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. On a successful update, data contains the status
              code, the command to the pilot and, when available, the pilot secrets
              ```
                {"StatusCode": <status_code>, "pilotSecrets": <pilot_secrets>, "command": <command>}
              ```
              A NULL job ID or an invalid job status return `success=False`. Metadata failures, timeouts and database errors
              return `success=True` with the failure reported through the `StatusCode` in data.
    """
    tmp_logger = LogWrapper(_logger, f"update_job PandaID={job_id} PID={os.getpid()}")
    tmp_logger.debug("Start")

    _logger.debug(
        f"update_job({job_id}, {job_status}, {trans_exit_code}, {pilot_error_code}, {pilot_error_diag}, {node},"
        f"cpu_consumption_time={cpu_consumption_time}, {cpu_consumption_unit}, {cpu_architecture_level}, "
        f"{scheduler_id}, {pilot_id}, {site_name}, {n_events}, {n_input_files}, {cpu_conversion_factor}, "
        f"{exe_error_code}, {exe_error_diag}, {pilot_timing}, {start_time}, {end_time}, {batch_id}, "
        f"attempt_nr:{attempt_nr}, job_sub_status:{job_sub_status}, core_count:{core_count}, "
        f"max_rss={max_rss}, max_vmem={max_vmem}, max_swap={max_swap}, "
        f"max_pss={max_pss}, avg_rss={avg_rss}, avg_vmem={avg_vmem}, avg_swap={avg_swap}, avg_pss={avg_pss}, "
        f"tot_rchar={tot_rchar}, tot_wchar={tot_wchar}, tot_rbytes={tot_rbytes}, tot_wbytes={tot_wbytes}, rate_rchar={rate_rchar}, "
        f"rate_wchar={rate_wchar}, rate_rbytes={rate_rbytes}, rate_wbytes={rate_wbytes}, mean_core_count={mean_core_count}, "
        f"grid={grid}, source_site={source_site}, destination_site={destination_site}, "
        f"corrupted_files={corrupted_files}\n==job_output_report==\n{job_output_report}\n==LOG==\n{pilot_log[:1024]}\n==Meta==\n{meta_data[:1024]}\n"
        f"==Metrics==\n{job_metrics}\n==stdout==\n{stdout})"
    )

    pilot_logger.debug(f"method=updateJob,site={site_name},node={node},type=None")

    # aborting message
    if job_id == "NULL":
        response = Protocol.Response(Protocol.SC_Invalid)
        return generate_response(success=False, message="job_id is NULL", data=response.data)

    # check the job status is valid
    if job_status not in VALID_JOB_STATES:
        message = f"Invalid job status: {job_status}"
        tmp_logger.warning(message)
        response = Protocol.Response(Protocol.SC_Invalid)
        return generate_response(success=False, message=message, data=response.data)

    # create the job parameter map: (key in the parameter map, incoming value, cast/truncation to apply)
    param = {}
    fields = [
        ("cpuConsumptionTime", cpu_consumption_time, int),
        ("cpuConsumptionUnit", cpu_consumption_unit, str),
        ("cpu_architecture_level", cpu_architecture_level, lambda x: str(x)[:20]),
        ("modificationHost", node, lambda x: str(x)[:128]),
        ("transExitCode", trans_exit_code, int),
        ("pilotErrorCode", pilot_error_code, int),
        ("pilotErrorDiag", pilot_error_diag, lambda x: str(x)[:500]),
        ("jobMetrics", job_metrics, lambda x: str(x)[:500]),
        ("schedulerID", scheduler_id, str),
        ("pilotID", pilot_id, lambda x: str(x)[:200]),
        ("batchID", batch_id, lambda x: str(x)[:80]),
        ("exeErrorCode", exe_error_code, int),
        ("exeErrorDiag", exe_error_diag, lambda x: str(x)[:500]),
        ("cpuConversion", cpu_conversion_factor, float),
        ("pilotTiming", pilot_timing, str),
        ("nEvents", n_events, int),
        ("nInputFiles", n_input_files, int),
        ("jobSubStatus", job_sub_status, str),
        ("actualCoreCount", core_count, int),
        ("meanCoreCount", mean_core_count, float),
        ("maxRSS", max_rss, int),
        ("maxVMEM", max_vmem, int),
        ("maxSWAP", max_swap, int),
        ("maxPSS", max_pss, int),
        ("avgRSS", avg_rss, lambda x: int(float(x))),
        ("avgVMEM", avg_vmem, lambda x: int(float(x))),
        ("avgSWAP", avg_swap, lambda x: int(float(x))),
        ("avgPSS", avg_pss, lambda x: int(float(x))),
        ("corruptedFiles", corrupted_files, str),
        ("grid", grid, str),
        ("sourceSite", source_site, str),
        ("destinationSite", destination_site, str),
    ]

    # Iterate through fields, apply transformations and add to `param`.
    # Missing values (None or "") are skipped; values that fail the cast are logged and left out.
    for key, value, cast in fields:
        if value not in [None, ""]:
            try:
                param[key] = cast(value)
            except Exception:
                tmp_logger.error(f"Invalid {key}={value} for updateJob")

    # Special handling for file size metrics
    file_metrics = [
        ("totRCHAR", tot_rchar),
        ("totWCHAR", tot_wchar),
        ("totRBYTES", tot_rbytes),
        ("totWBYTES", tot_wbytes),
        ("rateRCHAR", rate_rchar),
        ("rateWCHAR", rate_wchar),
        ("rateRBYTES", rate_rbytes),
        ("rateWBYTES", rate_wbytes),
    ]

    for key, value in file_metrics:
        if value is not None:
            try:
                value_kb = int(value) / 1024  # Convert to kB
                param[key] = min(10**10 - 1, value_kb)  # Limit to 10 digits
            except Exception:
                tmp_logger.error(f"Invalid {key}={value} for updateJob")

    # Convert timestamps
    for key, value in [("startTime", start_time), ("endTime", end_time)]:
        if value is not None:
            try:
                param[key] = datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
            except Exception:
                tmp_logger.error(f"Invalid {key}={value} for updateJob")

    # Handle attempt_nr separately: it is passed to the update call as its own argument, not through `param`
    if attempt_nr is not None:
        try:
            attempt_nr = int(attempt_nr)
        except Exception:
            attempt_nr = None

    tmp_logger.debug("executing")

    # store the pilot log; this is best effort, a failure does not abort the job update
    if pilot_log != "":
        tmp_logger.debug("Saving pilot log")
        try:
            global_task_buffer.storePilotLog(int(job_id), pilot_log)
            tmp_logger.debug("Saving pilot log DONE")
        except Exception:
            tmp_logger.error(f"Saving pilot log FAILED\n{traceback.format_exc()}")

    # add meta_data
    if meta_data != "":
        ret = global_task_buffer.addMetadata([job_id], [meta_data], [job_status])
        if len(ret) > 0 and not ret[0]:
            message = "Failed to add meta_data"
            tmp_logger.debug(message)
            return generate_response(True, message, data={"StatusCode": Protocol.SC_Failed, "ErrorDiag": message})

    # add stdout
    if stdout != "":
        global_task_buffer.addStdOut(job_id, stdout)

    # update the job: final states reported by the pilot are first stored as "holding"
    tmp_status = job_status
    update_state_change = False
    if job_status in ("failed", "finished"):
        tmp_status = "holding"
        update_state_change = True  # update stateChangeTime to prevent Watcher from finding this job
        param["jobDispatcherErrorDiag"] = None
    elif job_status in ("holding", "transferring"):
        param["jobDispatcherErrorDiag"] = f"set to {job_status} by the pilot at {naive_utcnow().strftime('%Y-%m-%d %H:%M:%S')}"

    # update the job status in the database; no timeout is applied when the pilot reports "holding"
    timeout = None if job_status == "holding" else timeout
    timed_method = TimedMethod(global_task_buffer.updateJobStatus, timeout)
    timed_method.run(job_id, tmp_status, param, update_state_change, attempt_nr)

    # time-out
    if timed_method.result == Protocol.TimeOutToken:
        message = "Timed out"
        tmp_logger.error(message)
        response = Protocol.Response(Protocol.SC_TimeOut)
        return generate_response(True, message=message, data=response.data)

    # no result
    if not timed_method.result:
        message = "Database error"
        tmp_logger.error(message)
        response = Protocol.Response(Protocol.SC_Failed)
        return generate_response(True, message=message, data=response.data)

    # generate the response with the result, which is either a dictionary or the command itself
    data = {"StatusCode": Protocol.SC_Success}
    result = timed_method.result
    result_is_dict = isinstance(result, dict)

    # set the secrets
    secrets = result.get("secrets") if result_is_dict else None
    if secrets:
        data["pilotSecrets"] = secrets

    # set the command to the pilot
    command = result.get("command") if result_is_dict else result
    data["command"] = command if isinstance(command, str) else None

    # add output to dataset for failed/finished jobs with correct results.
    # The check is done on the extracted command so that it also applies when the result is a dictionary.
    if job_status in ("failed", "finished") and command not in ("badattemptnr", "alreadydone"):
        adder_gen = AdderGen(global_task_buffer, job_id, job_status, attempt_nr)
        adder_gen.dump_file_report(job_output_report, attempt_nr)
        del adder_gen

    tmp_logger.debug(f"Done. data={data}")
    return generate_response(True, data=data)
0636 
0637 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_jobs_bulk(req, job_list: List, harvester_id: str = None):
    """
    Update jobs in bulk

    Bulk method to update the details for jobs, store the metadata and excerpt from the pilot log. Internally, this method loops over
    the jobs and calls `update_job` for each job. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_jobs_bulk

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_list(list): list of job dictionaries to update. The mandatory and optional keys for each job dictionary are the same as the arguments for `update_job`.
                        `job_id` and `job_status` are mandatory. The dictionaries are not modified.
        harvester_id (str, optional): Harvester ID, only used for logging. Optional, defaults to `None`.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. On success, data is a list with the
              `update_job` response for each job, in the order of `job_list`. If any job raises an exception (e.g. a missing mandatory
              key), the loop stops, `success` is `False`, the message describes the exception and data is an empty list.
    """
    tmp_logger = LogWrapper(_logger, f"update_jobs_bulk harvester_id={harvester_id}")
    tmp_logger.debug("Start")
    t_start = naive_utcnow()

    success = False
    message = ""
    data = []

    try:
        for job_dict in job_list:
            # work on a shallow copy so that the caller's dictionaries are left untouched
            job_kwargs = dict(job_dict)

            # the two mandatory entries are passed positionally, everything else as keyword arguments
            job_id = job_kwargs.pop("job_id")
            status = job_kwargs.pop("job_status")

            if "meta_data" in job_kwargs:
                job_kwargs["meta_data"] = str(job_kwargs["meta_data"])

            data.append(update_job(req, job_id, status, **job_kwargs))
        success = True
    except Exception as exc:
        message = f"failed with {type(exc).__name__} {exc}"
        # the partial results are dropped: the response either carries all jobs or none
        data = []
        tmp_logger.error(f"{message}\n{traceback.format_exc()}")

    t_delta = naive_utcnow() - t_start
    tmp_logger.debug(f"Done. Took {t_delta.seconds}.{t_delta.microseconds // 1000:03d} sec")
    return generate_response(success, message, data)
0692 
0693 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_worker_status(req: PandaRequest, worker_id, harvester_id, status, timeout=60, node_id=None):
    """
    Update worker status

    Records the worker status as reported by the pilot. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_worker_status

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        worker_id (str): The worker ID.
        harvester_id (str): The harvester ID.
        status (str): The status of the worker. Must be either 'started' or 'finished'.
        timeout (int, optional): The timeout value handed to `TimedMethod`. Defaults to 60.
        node_id (str, optional): The node ID. Defaults to None.

    Returns:
        dict: The system response generated by `generate_response`. Success is False, with an explanatory message, when the status
              is not valid, the update fails or the update times out.
    """
    log_prefix = f"updateWorkerPilotStatus worker_id={worker_id} harvester_id={harvester_id} status={status} node_id={node_id} PID={os.getpid()}"
    tmp_logger = LogWrapper(_logger, log_prefix)
    tmp_logger.debug("Start")

    # only these states may be reported by the pilot
    accepted_states = ("started", "finished")
    if status not in accepted_states:
        rejection = f"Invalid worker state. The worker state has to be in {accepted_states}"
        tmp_logger.debug(rejection)
        return generate_response(False, rejection)

    # run the task buffer update under the requested timeout
    updater = TimedMethod(global_task_buffer.updateWorkerPilotStatus, timeout)
    updater.run(worker_id, harvester_id, status, node_id)
    outcome = updater.result

    # the empty-result check deliberately comes before the timeout check
    if not outcome:
        failure = "Failed to update worker status"
    elif outcome == Protocol.TimeOutToken:
        failure = "Updating worker status timed out"
    else:
        failure = None

    if failure is not None:
        tmp_logger.error(failure)
        return generate_response(False, failure)

    tmp_logger.debug("Done")
    return generate_response(True)
0745 
0746 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_worker_node(
    req: PandaRequest,
    site: str,
    host_name: str,
    cpu_model: str,
    panda_queue: str = None,
    n_logical_cpus: int = None,
    n_sockets: int = None,
    cores_per_socket: int = None,
    threads_per_core: int = None,
    cpu_architecture: str = None,
    cpu_architecture_level: str = None,
    clock_speed: float = None,
    total_memory: int = None,
    total_local_disk: int = None,
    timeout: int = 60,
):
    """
    Update worker node

    Registers a worker node in the worker node map, or refreshes its `last_seen` time when the node is already known.
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_worker_node

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        site(str): Site name (e.g. ATLAS site name, not PanDA queue).
        host_name(str): Host name. In the case of reporting in format `slot@worker_node.example.com`, the slot ID will be parsed out.
        cpu_model(str): CPU model, e.g. `AMD EPYC 7351`. Both the raw value and its normalized form are passed to the task buffer.
        panda_queue(str, optional): PanDA queue the worker node is associated to. Optional, defaults to `None`.
        n_logical_cpus(int, optional): Number of logical CPUs: n_sockets * cores_per_socket * threads_per_core.
                             When SMT is enabled, this is the number of threads. Otherwise it is the number of cores. Optional, defaults to `None`.
        n_sockets(int, optional): Number of sockets. Optional, defaults to `None`.
        cores_per_socket(int, optional): Number of cores per socket. Optional, defaults to `None`.
        threads_per_core(int, optional): Number of threads per core. When SMT is disabled, this is 1. Otherwise a number > 1. Optional, defaults to `None`.
        cpu_architecture(str, optional): CPU architecture, e.g. `x86_64`. Optional, defaults to `None`.
        cpu_architecture_level(str, optional): CPU architecture level, e.g. `x86-64-v3`. Optional, defaults to `None`.
        clock_speed(float, optional): Clock speed in MHz. Optional, defaults to `None`.
        total_memory(int, optional): Total memory in MB. Optional, defaults to `None`.
        total_local_disk(int, optional): Total disk space in GB. Optional, defaults to `None`.
        timeout(int, optional): The timeout value handed to `TimedMethod`. Defaults to 60.

    Returns:
        dict: The system response  `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message.
    """
    tmp_logger = LogWrapper(_logger, f"update_worker_node site={site} panda_queue={panda_queue} host_name={host_name} cpu_model={cpu_model}")
    tmp_logger.debug("Start")

    # positional arguments for the task buffer call; the order is significant
    node_attributes = (
        site,
        panda_queue,
        host_name,
        cpu_model,
        normalize_cpu_model(cpu_model),
        n_logical_cpus,
        n_sockets,
        cores_per_socket,
        threads_per_core,
        cpu_architecture,
        cpu_architecture_level,
        clock_speed,
        total_memory,
        total_local_disk,
    )

    updater = TimedMethod(global_task_buffer.update_worker_node, timeout)
    updater.run(*node_attributes)
    outcome = updater.result

    # the timeout token has to be ruled out before the result is unpacked
    if outcome == Protocol.TimeOutToken:
        timeout_message = "Updating worker node timed out"
        tmp_logger.error(timeout_message)
        return generate_response(False, timeout_message)

    success, message = outcome

    tmp_logger.debug("Done")
    return generate_response(success, message)
0827 
0828 
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_worker_node_gpu(
    req: PandaRequest,
    site: str,
    host_name: str,
    vendor: str,
    model: str,
    count: int,
    vram: int = None,
    architecture: str = None,
    framework: str = None,
    framework_version: str = None,
    driver_version: str = None,
    timeout: int = 60,
):
    """
    Update GPUs for a worker node

    Registers the GPUs attached to a worker node in the worker node map, or refreshes their `last_seen` time when already known.
    Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_worker_node_gpu

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        site(str): Site name (e.g. ATLAS site name, not PanDA queue).
        host_name(str): Host name. In the case of reporting in format `slot@worker_node.example.com`, the slot ID will be parsed out.
        vendor(str): GPU vendor, e.g. `NVIDIA`.
        model(str): GPU model, e.g. `A100 80GB`.
        count(int): Number of GPUs of this type in the worker node.
        vram(int, optional): VRAM memory in MB. Defaults to `None`.
        architecture(str, optional): GPU architecture, e.g. `Tesla`, `Ampere`... Defaults to `None`.
        framework(str, optional): Driver framework available, e.g. `CUDA`. Defaults to `None`.
        framework_version(str, optional): Version of the driver framework, e.g. `12.2`. Defaults to `None`.
        driver_version(str, optional): Version of the driver, e.g. `575.51.03`. Defaults to `None`.
        timeout(int, optional): The timeout value handed to `TimedMethod`. Defaults to `60`.

    Returns:
        dict: The system response  `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message.
    """
    tmp_logger = LogWrapper(_logger, f"update_worker_node_gpu site={site} host_name={host_name} vendor={vendor} model={model}")
    tmp_logger.debug("Start")

    # positional arguments for the task buffer call; the order is significant
    gpu_attributes = (
        site,
        host_name,
        vendor,
        model,
        count,
        vram,
        architecture,
        framework,
        framework_version,
        driver_version,
    )

    updater = TimedMethod(global_task_buffer.update_worker_node_gpu, timeout)
    updater.run(*gpu_attributes)
    outcome = updater.result

    # the timeout token has to be ruled out before the result is unpacked
    if outcome == Protocol.TimeOutToken:
        timeout_message = "Updating worker node GPU timed out"
        tmp_logger.error(timeout_message)
        return generate_response(False, timeout_message)

    success, message = outcome

    tmp_logger.debug("Done")
    return generate_response(success, message)