File indexing completed on 2026-04-10 08:39:00
0001 import datetime
0002 import os
0003 import sys
0004 import time
0005 import traceback
0006 from typing import List
0007
0008 from pandacommon.pandalogger.LogWrapper import LogWrapper
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011
0012 from pandaserver.api.v1.common import (
0013 TimedMethod,
0014 generate_response,
0015 get_dn,
0016 has_production_role,
0017 request_validation,
0018 )
0019 from pandaserver.brokerage.SiteMapper import SiteMapper
0020 from pandaserver.config import panda_config
0021 from pandaserver.dataservice.adder_gen import AdderGen
0022 from pandaserver.jobdispatcher import Protocol
0023 from pandaserver.srvcore import CoreUtils
0024 from pandaserver.srvcore.CoreUtils import normalize_cpu_model
0025 from pandaserver.srvcore.panda_request import PandaRequest
0026 from pandaserver.taskbuffer.TaskBuffer import TaskBuffer
0027
# Module loggers: _logger for general API tracing, pilot_logger for the
# pilot-request accounting records (method=...,site=...,type=... lines).
_logger = PandaLogger().getLogger("api_pilot")
pilot_logger = PandaLogger().getLogger("PilotRequests")


# Module-level singletons, populated by init_task_buffer(); every endpoint
# below assumes init_task_buffer() has been called first.
global_task_buffer = None
global_site_mapper_cache = None

# Job states the pilot is allowed to report through update_job()
VALID_JOB_STATES = ["running", "failed", "finished", "holding", "starting", "transferring"]
0036
0037
def init_task_buffer(task_buffer: TaskBuffer) -> None:
    """
    Initialize the task buffer. This method needs to be called before any other method in this module.
    """
    global global_task_buffer, global_site_mapper_cache

    global_task_buffer = task_buffer
    # Site mapper wrapped in a cache with a 10 minute refresh interval
    global_site_mapper_cache = CoreUtils.CachedObject("site_mapper", 60 * 10, _get_site_mapper, _logger)
0047
0048
def _get_site_mapper():
    # Factory for the CachedObject wrapper: returns (success_flag, fresh SiteMapper)
    site_mapper = SiteMapper(global_task_buffer)
    return True, site_mapper
0051
0052
@request_validation(_logger, secure=True, request_method="POST")
def acquire_jobs(
    req: PandaRequest,
    site_name: str,
    timeout: int = 60,
    memory: int = None,
    disk_space: int = None,
    prod_source_label: str = None,
    node: str = None,
    computing_element: str = None,
    prod_user_id: str = None,
    get_proxy_key: str = None,
    task_id: int = None,
    n_jobs: int = 1,
    background: bool = None,
    resource_type: str = None,
    harvester_id: str = None,
    worker_id: int = None,
    scheduler_id: str = None,
    job_type: str = None,
    via_topic: bool = None,
    remaining_time=None,
) -> dict:
    """
    Acquire jobs

    Acquire jobs for the pilot. The jobs are reserved, the job status is updated and the jobs are returned. Requires a secure connection.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/acquire_jobs

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        site_name(str): The PanDA queue name
        timeout(int, optional): Request timeout in seconds. Optional and defaults to 60.
        memory(int, optional): Memory limit for the job. Optional and defaults to `None`.
        disk_space(int, optional): Disk space limit for the job. Optional and defaults to `None`.
        prod_source_label(str, optional): Prodsourcelabel, e.g. `user`, `managed`, `unified`... Optional and defaults to `None`.
        node(str, optional): Identifier of the worker node/slot. Optional and defaults to `None`.
        computing_element(str, optional): Computing element. Optional and defaults to `None`.
        prod_user_id(str, optional): User ID of the job. Optional and defaults to `None`.
        get_proxy_key(str, optional): Flag to request a proxy key. Optional and defaults to `None`.
        task_id(int, optional): JEDI task ID of the job. Optional and defaults to `None`.
        n_jobs(int, optional): Number of jobs for bulk requests. Optional and defaults to 1.
        background(bool, optional): Background flag. Optional and defaults to `None`.
        resource_type(str, optional): Resource type of the job, e.g. `SCORE`, `MCORE`,.... Optional and defaults to `None`.
        harvester_id(str, optional): Harvester ID, used to update the worker entry in the DB. Optional and defaults to `None`.
        worker_id(int, optional): Worker ID, used to update the worker entry in the DB. Optional and defaults to `None`.
        scheduler_id(str, optional): Scheduler, e.g. harvester ID. Optional and defaults to `None`.
        job_type(str, optional): Job type, e.g. `user`, `unified`, ... This is necessary on top of the `prodsourcelabel`
            to disambiguate the cases of test jobs that can be production or analysis. Optional and defaults to `None`.
        via_topic(bool, optional): Topic for message broker. Optional and defaults to `None`.
        remaining_time(int, optional): Remaining walltime. Optional and defaults to `None`.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The data is a list of job dictionaries.
            When failed, the message contains the error message.
    """

    tmp_logger = LogWrapper(_logger, f"acquire_jobs {naive_utcnow().isoformat('/')}")
    tmp_logger.debug(f"Start for {site_name}")

    # Distinguished name of the caller, taken from the request environment
    real_dn = get_dn(req)

    # Whether the caller carries a production role in the VOMS FQANs
    is_production_manager = has_production_role(req)

    # Only callers with the production role may request non-user jobs
    if not is_production_manager and prod_source_label not in ["user"]:
        tmp_logger.warning("invalid role")
        tmp_msg = "no production/pilot role in VOMS FQANs or non pilot owner"
        return generate_response(False, message=tmp_msg)

    # Non-privileged callers can only acquire their own jobs
    if not is_production_manager:
        prod_user_id = real_dn

    # Proxy keys are only handed out to production managers; normalize the flag to a bool
    if get_proxy_key and is_production_manager:
        get_proxy_key = True
    else:
        get_proxy_key = False

    # Sanitize the resource limits: clamp negatives to 0 and fall back to 0 on
    # non-numeric input (max() raises TypeError for None)
    try:
        memory = max(0, memory)
    except (ValueError, TypeError):
        memory = 0

    try:
        disk_space = max(0, disk_space)
    except (ValueError, TypeError):
        disk_space = 0

    try:
        remaining_time = max(0, remaining_time)
    except (ValueError, TypeError):
        remaining_time = 0

    tmp_logger.debug(
        f"{site_name}, n_jobs={n_jobs}, memory={memory}, disk={disk_space}, source_label={prod_source_label}, "
        f"node={node}, ce={computing_element}, user={prod_user_id}, proxy={get_proxy_key}, "
        f"task_id={task_id}, DN={real_dn}, role={is_production_manager}, "
        f"bg={background}, rt={resource_type}, harvester_id={harvester_id}, worker_id={worker_id}, "
        f"scheduler_id={scheduler_id}, job_type={job_type}, via_topic={via_topic} remaining_time={remaining_time}"
    )

    # One accounting record per requested job slot
    for i in range(n_jobs):
        slot_suffix = f"_{i}" if n_jobs > 1 else ""
        pilot_logger.info(f"method=getJob,site={site_name},node={node}{slot_suffix},type={prod_source_label}")

    t_start = time.time()

    # Refresh the cached site mapper (10 min TTL) and read the site properties
    global_site_mapper_cache.update()
    is_grandly_unified = global_site_mapper_cache.cachedObj.getSite(site_name).is_grandly_unified()
    in_test_status = global_site_mapper_cache.cachedObj.getSite(site_name).status == "test"

    # Queues in test status only serve test jobs, whatever label was requested
    if in_test_status and prod_source_label in ["user", "managed", "unified"]:
        new_label = "test"
        tmp_logger.debug(f"prod_source_label changed from {prod_source_label} to {new_label}")
        prod_source_label = new_label

    # Reserve the jobs in the database, guarded by the request timeout
    timed_method = TimedMethod(global_task_buffer.getJobs, timeout)
    timed_method.run(
        n_jobs,
        site_name,
        prod_source_label,
        memory,
        disk_space,
        node,
        timeout,
        computing_element,
        prod_user_id,
        task_id,
        background,
        resource_type,
        harvester_id,
        worker_id,
        scheduler_id,
        job_type,
        is_grandly_unified,
        via_topic,
        remaining_time,
    )

    # The database call did not come back within the timeout
    if timed_method.result == Protocol.TimeOutToken:
        message = "Timed out"
        tmp_logger.debug(message)
        return generate_response(False, message=message)

    # getJobs returns a flat list shaped [job1, ..., jobN, n_sent, proxy_key, secrets_map]
    # (positional contract — presumably stable; confirm against TaskBuffer.getJobs)
    jobs = []
    if isinstance(timed_method.result, list):
        result = timed_method.result
        secrets_map = result.pop()
        proxy_key = result[-1]
        n_sent = result[-2]
        jobs = result[:-2]

    if not jobs:
        message = "No jobs in PanDA"
        tmp_logger.debug(message)
        pilot_logger.info(f"method=noJob,site={site_name},node={node},type={prod_source_label}")
        return generate_response(False, message=message)

    # Serialize each job into a pilot-consumable dictionary
    response_list = []
    for tmp_job in jobs:
        try:

            response = Protocol.Response(Protocol.SC_Success)
            response.appendJob(tmp_job, global_site_mapper_cache)
        except Exception as e:
            tmp_msg = f"failed to get jobs with {str(e)}"
            tmp_logger.error(f"{tmp_msg}\n{traceback.format_exc()}")
            raise

        # Number of jobs handed out in this bulk, for pilot-side bookkeeping
        response.appendNode("nSent", n_sent)

        if get_proxy_key:
            response.setProxyKey(proxy_key)

        # Attach the owner's secrets when the job opted in and a non-empty entry exists
        if tmp_job.use_secrets() and tmp_job.prodUserName in secrets_map and secrets_map[tmp_job.prodUserName]:
            response.appendNode("secrets", secrets_map[tmp_job.prodUserName])

        # Attach pilot-wide secrets when configured for this instance
        pilot_secrets = secrets_map.get(panda_config.pilot_secrets, None)
        if pilot_secrets:
            response.appendNode("pilotSecrets", secrets_map[panda_config.pilot_secrets])

        response_list.append(response.data)

    # Wrap the individual job payloads into a single response envelope
    # NOTE(review): n_jobs defaults to 1 and is never None here, so this branch
    # always runs; the final return relies on the `response` built in it.
    if n_jobs is not None:
        try:
            response = Protocol.Response(Protocol.SC_Success)
            response.appendNode("jobs", response_list)
        except Exception as e:
            tmp_msg = f"Failed to make response with {str(e)}"
            tmp_logger.error(f"{tmp_msg}\n{traceback.format_exc()}")
            raise

    tmp_logger.debug(f"Done for {site_name} {node}")

    t_end = time.time()
    t_delta = t_end - t_start
    tmp_logger.info(f"site_name={site_name} took timing={t_delta}s in_test={in_test_status}")
    return generate_response(True, data=response.data)
0272
0273
@request_validation(_logger, secure=True, request_method="GET")
def get_job_status(req: PandaRequest, job_ids: List[int], timeout: int = 60) -> dict:
    """
    Get job status

    Gets the status for a list of jobs. Requires a secure connection.

    API details:
        HTTP Method: GET
        Path: /v1/pilot/get_job_status

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        job_ids(list): list of job IDs.
        timeout(int, optional): The timeout value. Defaults to 60.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. The data is a list of job status dictionaries, in the format
        ```
        [{"job_id": <job_id_requested>, "status": "not found", "attempt_number": 0}), {"job_id": <job_id>, "status": <status>, "attempt_number": <attempt_nr>}]
        ```
    """

    tmp_logger = LogWrapper(_logger, f"get_job_status {job_ids}")
    tmp_logger.debug("Start")

    # Look the jobs up in the active and archived tables, guarded by the timeout
    timed_method = TimedMethod(global_task_buffer.peekJobs, timeout)
    timed_method.run(job_ids, fromDefined=False, fromActive=True, fromArchived=True, forAnal=False, use_json=False)

    # The database call did not come back within the timeout
    if timed_method.result == Protocol.TimeOutToken:
        message = "Timed out"
        tmp_logger.debug(message)
        return generate_response(False, message=message)

    # Anything other than a list means the lookup itself failed
    if not isinstance(timed_method.result, list):
        message = "Database error"
        tmp_logger.debug(message)
        return generate_response(False, message=message)

    # peekJobs returns one entry (job spec or None) per requested ID, in order
    job_status_list = [
        {"job_id": requested_id, "status": "not found", "attempt_number": 0}
        if not job_spec
        else {"job_id": job_spec.PandaID, "status": job_spec.jobStatus, "attempt_number": job_spec.attemptNr}
        for job_spec, requested_id in zip(timed_method.result, job_ids)
    ]

    tmp_logger.debug(f"Done: job_status_list={job_status_list}")

    return generate_response(True, data=job_status_list)
0325
0326
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_job(
    req: PandaRequest,
    job_id: int,
    job_status: str,
    job_output_report: str = "",
    node: str = None,
    cpu_consumption_time: int = None,
    cpu_consumption_unit: str = None,
    scheduler_id: str = None,
    pilot_id: str = None,
    site_name: str = None,
    pilot_log: str = "",
    meta_data: str = "",
    cpu_conversion_factor: float | int = None,
    trans_exit_code: int = None,
    pilot_error_code: int = None,
    pilot_error_diag: str = None,
    exe_error_code: int = None,
    exe_error_diag: str = None,
    pilot_timing: str = None,
    start_time: str = None,
    end_time: str = None,
    n_events: int = None,
    n_input_files: int = None,
    batch_id: str = None,
    attempt_nr: int = None,
    job_metrics: str = None,
    stdout: str = "",
    job_sub_status: str = None,
    core_count: int = None,
    max_rss: int | float = None,
    max_vmem: int | float = None,
    max_swap: int | float = None,
    max_pss: int | float = None,
    avg_rss: int | float = None,
    avg_vmem: int | float = None,
    avg_swap: int | float = None,
    avg_pss: int | float = None,
    tot_rchar: int | float = None,
    tot_wchar: int | float = None,
    tot_rbytes: int | float = None,
    tot_wbytes: int | float = None,
    rate_rchar: int | float = None,
    rate_wchar: int | float = None,
    rate_rbytes: int | float = None,
    rate_wbytes: int | float = None,
    corrupted_files: str = None,
    mean_core_count: int = None,
    cpu_architecture_level: str = None,
    grid: str = None,
    source_site: str = None,
    destination_site: str = None,
    timeout: int = 60,
):
    """
    Update job

    Updates the details for a job, stores the metadata and excerpts from the pilot log. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_job

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_id (int): PanDA job ID
        job_status(str, optional): Job status
        job_sub_status(str, optional): Job sub status. Optional, defaults to `None`
        start_time(str, optional): Job start time in format `"%Y-%m-%d %H:%M:%S"`. Optional, defaults to `None`
        end_time(str, optional): Job end time in format `"%Y-%m-%d %H:%M:%S"`. Optional, defaults to `None`
        pilot_timing(str, optional): String with pilot timings. Optional, defaults to `None`
        site_name(str, optional): PanDA queue name. Optional, defaults to `None`
        node(str, optional): Identifier for worker node/slot. Optional, defaults to `None`
        scheduler_id(str, optional): Scheduler ID, such as harvester instance. Optional, defaults to `None`
        pilot_id(str, optional): Pilot ID. Optional, defaults to `None`
        batch_id(str, optional): Batch ID. Optional, defaults to `None`
        trans_exit_code(int, optional): Transformation exit code. Optional, defaults to `None`
        pilot_error_code(int, optional): Pilot error code. Optional, defaults to `None`
        pilot_error_diag(str, optional): Pilot error message. Optional, defaults to `None`
        exe_error_code(int, optional): Execution error code. Optional, defaults to `None`
        exe_error_diag(str, optional): Execution error message. Optional, defaults to `None`
        n_events(int, optional): Number of events. Optional, defaults to `None`
        n_input_files(int, optional): Number of input files. Optional, defaults to `None`
        attempt_nr(int, optional): Job attempt number. Optional, defaults to `None`
        cpu_consumption_time(int, optional): CPU consumption time. Optional, defaults to `None`
        cpu_consumption_unit(str, optional): CPU consumption unit, being used for updating some CPU details. Optional, defaults to `None`
        cpu_conversion_factor(float, optional): CPU conversion factor. Optional defaults to `None`
        core_count(int, optional): Number of cores of the job. Optional, defaults to `None`
        mean_core_count(int, optional): Mean core count. Optional, defaults to `None`
        max_rss(int, optional): Measured max RSS memory. Optional, defaults to `None`
        max_vmem(int, optional): Measured max Virtual memory. Optional, defaults to `None`
        max_swap(int, optional): Measured max swap memory. Optional, defaults to `None`
        max_pss(int, optional): Measured max PSS memory. Optional, defaults to `None`
        avg_rss(int, optional): Measured average RSS. Optional, defaults to `None`
        avg_vmem(int, optional): Measured average Virtual memory. Optional, defaults to `None`
        avg_swap(int, optional): Measured average swap memory. Optional, defaults to `None`
        avg_pss(int, optional): Measured average PSS. Optional, defaults to `None`
        tot_rchar(int, optional): Measured total read characters. Optional, defaults to `None`
        tot_wchar(int, optional): Measured total written characters. Optional, defaults to `None`
        tot_rbytes(int, optional): Measured total read bytes. Optional, defaults to `None`
        tot_wbytes(int, optional): Measured total written bytes. Optional, defaults to `None`
        rate_rchar(int, optional): Measured rate for read characters. Optional, defaults to `None`
        rate_wchar(int, optional): Measured rate for written characters. Optional, defaults to `None`
        rate_rbytes(int, optional): Measured rate for read bytes. Optional, defaults to `None`
        rate_wbytes(int, optional): Measured rate for written bytes. Optional, defaults to `None`
        corrupted_files(str, optional): List of corrupted files in comma separated format. Optional, defaults to `None`
        cpu_architecture_level(str, optional): CPU architecture level (e.g. `x86_64-v3`). Optional, defaults to `None`
        grid(str, optional): Grid type. Optional, defaults to `None`
        source_site(str, optional): Source site name. Optional, defaults to `None`
        destination_site(str, optional): Destination site name. Optional, defaults to `None`
        job_metrics(str, optional): Job metrics. Optional, defaults to `None`
        job_output_report(str, optional): Job output report. Optional, defaults to `""`
        pilot_log(str, optional): Pilot log excerpt. Optional, defaults to `""`
        meta_data(str, optional): Job metadata. Optional, defaults to `""`
        stdout(str, optional): Standard output. Optional, defaults to `""`
        timeout(int, optional): Timeout for the operation in seconds. Optional, defaults to 60

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. Data will contain a dictionary with the pilot secrets and the command to the pilot.
        ```
        {"pilotSecrets": <pilot_secrets>, "command": <command>}
        ```
    """
    tmp_logger = LogWrapper(_logger, f"update_job PandaID={job_id} PID={os.getpid()}")
    tmp_logger.debug("Start")

    # Full dump of the incoming update (log/metadata excerpts truncated to 1 KB)
    _logger.debug(
        f"update_job({job_id}, {job_status}, {trans_exit_code}, {pilot_error_code}, {pilot_error_diag}, {node},"
        f"cpu_consumption_time={cpu_consumption_time}, {cpu_consumption_unit}, {cpu_architecture_level}, "
        f"{scheduler_id}, {pilot_id}, {site_name}, {n_events}, {n_input_files}, {cpu_conversion_factor}, "
        f"{exe_error_code}, {exe_error_diag}, {pilot_timing}, {start_time}, {end_time}, {batch_id}, "
        f"attempt_nr:{attempt_nr}, job_sub_status:{job_sub_status}, core_count:{core_count}, "
        f"max_rss={max_rss}, max_vmem={max_vmem}, max_swap={max_swap}, "
        f"max_pss={max_pss}, avg_rss={avg_rss}, avg_vmem={avg_vmem}, avg_swap={avg_swap}, avg_pss={avg_pss}, "
        f"tot_rchar={tot_rchar}, tot_wchar={tot_wchar}, tot_rbytes={tot_rbytes}, tot_wbytes={tot_wbytes}, rate_rchar={rate_rchar}, "
        f"rate_wchar={rate_wchar}, rate_rbytes={rate_rbytes}, rate_wbytes={rate_wbytes}, mean_core_count={mean_core_count}, "
        f"grid={grid}, source_site={source_site}, destination_site={destination_site}, "
        f"corrupted_files={corrupted_files}\n==job_output_report==\n{job_output_report}\n==LOG==\n{pilot_log[:1024]}\n==Meta==\n{meta_data[:1024]}\n"
        f"==Metrics==\n{job_metrics}\n==stdout==\n{stdout})"
    )

    pilot_logger.debug(f"method=updateJob,site={site_name},node={node},type=None")

    # Legacy guard: some pilots send the literal string "NULL" for a missing ID
    if job_id == "NULL":
        response = Protocol.Response(Protocol.SC_Invalid)
        return generate_response(success=False, message="job_id is NULL", data=response.data)

    # Reject states the pilot is not allowed to set
    if job_status not in VALID_JOB_STATES:
        message = f"Invalid job status: {job_status}"
        tmp_logger.warning(message)
        response = Protocol.Response(Protocol.SC_Invalid)
        return generate_response(success=False, message=message, data=response.data)

    # Build the DB update parameters: (column name, raw value, cast/truncation).
    # The lambda casts also truncate to the respective DB column widths.
    param = {}
    fields = [
        ("cpuConsumptionTime", cpu_consumption_time, int),
        ("cpuConsumptionUnit", cpu_consumption_unit, str),
        ("cpu_architecture_level", cpu_architecture_level, lambda x: str(x)[:20]),
        ("modificationHost", node, lambda x: str(x)[:128]),
        ("transExitCode", trans_exit_code, int),
        ("pilotErrorCode", pilot_error_code, int),
        ("pilotErrorDiag", pilot_error_diag, lambda x: str(x)[:500]),
        ("jobMetrics", job_metrics, lambda x: str(x)[:500]),
        ("schedulerID", scheduler_id, str),
        ("pilotID", pilot_id, lambda x: str(x)[:200]),
        ("batchID", batch_id, lambda x: str(x)[:80]),
        ("exeErrorCode", exe_error_code, int),
        ("exeErrorDiag", exe_error_diag, lambda x: str(x)[:500]),
        ("cpuConversion", cpu_conversion_factor, float),
        ("pilotTiming", pilot_timing, str),
        ("nEvents", n_events, int),
        ("nInputFiles", n_input_files, int),
        ("jobSubStatus", job_sub_status, str),
        ("actualCoreCount", core_count, int),
        ("meanCoreCount", mean_core_count, float),
        ("maxRSS", max_rss, int),
        ("maxVMEM", max_vmem, int),
        ("maxSWAP", max_swap, int),
        ("maxPSS", max_pss, int),
        ("avgRSS", avg_rss, lambda x: int(float(x))),
        ("avgVMEM", avg_vmem, lambda x: int(float(x))),
        ("avgSWAP", avg_swap, lambda x: int(float(x))),
        ("avgPSS", avg_pss, lambda x: int(float(x))),
        ("corruptedFiles", corrupted_files, str),
        ("grid", grid, str),
        ("sourceSite", source_site, str),
        ("destinationSite", destination_site, str),
    ]

    # Apply the casts; a value that fails to cast is logged and skipped, not fatal
    for key, value, cast in fields:
        if value not in [None, ""]:
            try:
                param[key] = cast(value)
            except Exception:
                tmp_logger.error(f"Invalid {key}={value} for updateJob")

    # I/O counters, scaled down by 1024 and capped (presumably to fit the
    # DB column width of 10 digits -- confirm against the jobs table schema)
    file_metrics = [
        ("totRCHAR", tot_rchar),
        ("totWCHAR", tot_wchar),
        ("totRBYTES", tot_rbytes),
        ("totWBYTES", tot_wbytes),
        ("rateRCHAR", rate_rchar),
        ("rateWCHAR", rate_wchar),
        ("rateRBYTES", rate_rbytes),
        ("rateWBYTES", rate_wbytes),
    ]

    for key, value in file_metrics:
        if value is not None:
            try:
                value = int(value) / 1024
                param[key] = min(10**10 - 1, value)
            except Exception:
                tmp_logger.error(f"Invalid {key}={value} for updateJob")

    # Parse the pilot-reported timestamps; invalid strings are logged and skipped
    for key, value in [("startTime", start_time), ("endTime", end_time)]:
        if value is not None:
            try:
                param[key] = datetime.datetime(*time.strptime(value, "%Y-%m-%d %H:%M:%S")[:6])
            except Exception:
                tmp_logger.error(f"Invalid {key}={value} for updateJob")

    # Normalize the attempt number; fall back to None on bad input
    if attempt_nr is not None:
        try:
            attempt_nr = int(attempt_nr)
        except Exception:
            attempt_nr = None

    tmp_logger.debug("executing")

    # Store the pilot log excerpt; failure to store is non-fatal
    if pilot_log != "":
        tmp_logger.debug("Saving pilot log")
        try:
            global_task_buffer.storePilotLog(int(job_id), pilot_log)
            tmp_logger.debug("Saving pilot log DONE")
        except Exception:
            tmp_logger.debug("Saving pilot log FAILED")

    # Store the job metadata; a failed insert aborts the update
    if meta_data != "":
        ret = global_task_buffer.addMetadata([job_id], [meta_data], [job_status])
        if len(ret) > 0 and not ret[0]:
            message = "Failed to add meta_data"
            tmp_logger.debug(message)
            # NOTE(review): success is True here; the failure is signalled via the
            # StatusCode/ErrorDiag in data, which is what the pilot inspects
            return generate_response(True, message, data={"StatusCode": Protocol.SC_Failed, "ErrorDiag": message})

    # Store the stdout excerpt
    if stdout != "":
        global_task_buffer.addStdOut(job_id, stdout)

    # Map the reported state to the internally stored state: terminal states go
    # through "holding" first (output registration happens asynchronously)
    tmp_status = job_status
    update_state_change = False
    if job_status in ("failed", "finished"):
        tmp_status = "holding"
        update_state_change = True
        param["jobDispatcherErrorDiag"] = None
    elif job_status in ("holding", "transferring"):
        param["jobDispatcherErrorDiag"] = f"set to {job_status} by the pilot at {naive_utcnow().strftime('%Y-%m-%d %H:%M:%S')}"

    # Updates to "holding" must not be interrupted, so the timeout is disabled
    timeout = None if job_status == "holding" else timeout
    timed_method = TimedMethod(global_task_buffer.updateJobStatus, timeout)
    timed_method.run(job_id, tmp_status, param, update_state_change, attempt_nr)

    # Timed out: report via StatusCode in data (success flag stays True, see above)
    if timed_method.result == Protocol.TimeOutToken:
        message = "Timed out"
        tmp_logger.error(message)
        response = Protocol.Response(Protocol.SC_TimeOut)
        return generate_response(True, message=message, data=response.data)

    # Falsy result means the database update failed
    if not timed_method.result:
        message = "Database error"
        tmp_logger.error(message)
        response = Protocol.Response(Protocol.SC_Failed)
        return generate_response(True, message=message, data=response.data)

    data = {"StatusCode": Protocol.SC_Success}
    result = timed_method.result

    # Pilot secrets, when updateJobStatus returned them in a dict
    secrets = result.get("secrets") if isinstance(result, dict) else None
    if secrets:
        data["pilotSecrets"] = secrets

    # Command for the pilot: either the "command" entry of a dict result or the
    # bare result itself; anything that is not a string becomes None
    command = result.get("command") if isinstance(result, dict) else result
    data["command"] = command if isinstance(command, str) else None

    # For genuinely finished/failed jobs, persist the output file report so the
    # Adder can register the outputs asynchronously
    if job_status in ("failed", "finished") and result not in ("badattemptnr", "alreadydone"):
        adder_gen = AdderGen(global_task_buffer, job_id, job_status, attempt_nr)
        adder_gen.dump_file_report(job_output_report, attempt_nr)
        del adder_gen

    tmp_logger.debug(f"Done. data={data}")
    return generate_response(True, data=data)
0636
0637
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_jobs_bulk(req, job_list: List, harvester_id: str = None):
    """
    Update jobs in bulk

    Bulk method to update the details for jobs, store the metadata and excerpt from the pilot log. Internally, this method loops over
    the jobs and calls `update_job` for each job. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_jobs_bulk

    Args:
        req(PandaRequest): internally generated request object containing the env variables
        job_list(list): list of job dictionaries to update. The mandatory and optional keys for each job dictionary are the same as the arguments for `update_job`.
        harvester_id (str, optional): Harvester ID. Optional, defaults to `None`.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. Data will contain a dictionary with the pilot secrets and the command to the pilot.
        ```
        {"pilotSecrets": <pilot_secrets>, "command": <command>}
        ```
    """
    tmp_logger = LogWrapper(_logger, f"update_jobs_bulk harvester_id={harvester_id}")
    tmp_logger.debug("Start")
    t_start = naive_utcnow()

    success = False
    message = ""
    data = []

    try:
        for job_dict in job_list:
            # job_id and job_status are mandatory positional arguments of
            # update_job; remove them before forwarding the rest as kwargs
            job_id = job_dict.pop("job_id")
            status = job_dict.pop("job_status")

            # metadata may arrive as a structured object; update_job expects a string
            if "meta_data" in job_dict:
                job_dict["meta_data"] = str(job_dict["meta_data"])

            tmp_ret = update_job(req, job_id, status, **job_dict)
            data.append(tmp_ret)
            success = True
    except Exception:
        err_type, err_value = sys.exc_info()[:2]
        message = f"failed with {err_type.__name__} {err_value}"
        # BUG FIX: an exception part-way through must not leave success=True
        # from an earlier iteration while data is reset to []
        success = False
        data = []
        tmp_logger.error(f"{message}\n{traceback.format_exc()}")

    t_delta = naive_utcnow() - t_start
    tmp_logger.debug(f"Done. Took {t_delta.seconds}.{t_delta.microseconds // 1000:03d} sec")
    return generate_response(success, message, data)
0692
0693
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_worker_status(req: PandaRequest, worker_id, harvester_id, status, timeout=60, node_id=None):
    """
    Update worker status

    Updates the status of a worker with the information seen by the pilot. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_worker_status

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        worker_id (str): The worker ID.
        harvester_id (str): The harvester ID.
        status (str): The status of the worker. Must be either 'started' or 'finished'.
        timeout (int, optional): The timeout value. Defaults to 60.
        node_id (str, optional): The node ID. Defaults to None.

    Returns:
        str: The result of the status update or an error message.
    """
    tmp_logger = LogWrapper(
        _logger,
        f"updateWorkerPilotStatus worker_id={worker_id} harvester_id={harvester_id} status={status} node_id={node_id} PID={os.getpid()}",
    )
    tmp_logger.debug("Start")

    # Only the two states reported by the pilot are accepted
    valid_worker_states = ("started", "finished")
    if status not in valid_worker_states:
        message = f"Invalid worker state. The worker state has to be in {valid_worker_states}"
        tmp_logger.debug(message)
        return generate_response(False, message)

    timed_method = TimedMethod(global_task_buffer.updateWorkerPilotStatus, timeout)
    timed_method.run(worker_id, harvester_id, status, node_id)

    # Check the timeout token before the generic falsy check (consistent with the
    # other endpoints in this module), so a timeout is always reported as such
    if timed_method.result == Protocol.TimeOutToken:
        message = "Updating worker status timed out"
        tmp_logger.error(message)
        return generate_response(False, message)

    # A falsy result means the database update failed
    if not timed_method.result:
        message = "Failed to update worker status"
        tmp_logger.error(message)
        return generate_response(False, message)

    tmp_logger.debug("Done")
    return generate_response(True)
0745
0746
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_worker_node(
    req: PandaRequest,
    site: str,
    host_name: str,
    cpu_model: str,
    panda_queue: str = None,
    n_logical_cpus: int = None,
    n_sockets: int = None,
    cores_per_socket: int = None,
    threads_per_core: int = None,
    cpu_architecture: str = None,
    cpu_architecture_level: str = None,
    clock_speed: float = None,
    total_memory: int = None,
    total_local_disk: int = None,
    timeout: int = 60,
):
    """
    Update worker node

    Updates a worker node in the worker node map. When already found, it updates the `last_seen` time. When not found, it adds the worker node. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_worker_node

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        site(str): Site name (e.g. ATLAS site name, not PanDA queue).
        host_name(str): Host name. In the case of reporting in format `slot@worker_node.example.com`, the slot ID will be parsed out.
        cpu_model(str): CPU model, e.g. `AMD EPYC 7351`.
        panda_queue(str, optional): PanDA queue the worker node is associated to. Optional, defaults to `None`.
        n_logical_cpus(int, optional): Number of logical CPUs: n_sockets * cores_per_socket * threads_per_core.
            When SMT is enabled, this is the number of threads. Otherwise it is the number of cores. Optional, defaults to `None`.
        n_sockets(int, optional): Number of sockets. Optional, defaults to `None`.
        cores_per_socket(int, optional): Number of cores per socket. Optional, defaults to `None`.
        threads_per_core(int, optional): Number of threads per core. When SMT is disabled, this is 1. Otherwise a number > 1. Optional, defaults to `None`.
        cpu_architecture(str, optional): CPU architecture, e.g. `x86_64`. Optional, defaults to `None`.
        cpu_architecture_level(str, optional): CPU architecture level, e.g. `x86-64-v3`. Optional, defaults to `None`.
        clock_speed(float, optional): Clock speed in MHz. Optional, defaults to `None`.
        total_memory(int, optional): Total memory in MB. Optional, defaults to `None`.
        total_local_disk(int, optional): Total disk space in GB. Optional, defaults to `None`.
        timeout(int, optional): The timeout value. Defaults to 60.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message.
    """
    tmp_logger = LogWrapper(_logger, f"update_worker_node site={site} panda_queue={panda_queue} host_name={host_name} cpu_model={cpu_model}")
    tmp_logger.debug("Start")

    # Normalized form of the CPU model, computed with the project helper
    cpu_model_normalized = normalize_cpu_model(cpu_model)

    timed_method = TimedMethod(global_task_buffer.update_worker_node, timeout)
    timed_method.run(
        site,
        panda_queue,
        host_name,
        cpu_model,
        cpu_model_normalized,
        n_logical_cpus,
        n_sockets,
        cores_per_socket,
        threads_per_core,
        cpu_architecture,
        cpu_architecture_level,
        clock_speed,
        total_memory,
        total_local_disk,
    )

    if timed_method.result == Protocol.TimeOutToken:
        message = "Updating worker node timed out"
        tmp_logger.error(message)
        return generate_response(False, message)

    # BUG FIX: guard against a falsy result (e.g. None on a database error) --
    # unpacking it directly would raise TypeError instead of a clean error response
    if not timed_method.result:
        message = "Failed to update worker node"
        tmp_logger.error(message)
        return generate_response(False, message)

    success, message = timed_method.result

    tmp_logger.debug("Done")
    return generate_response(success, message)
0827
0828
@request_validation(_logger, secure=True, production=True, request_method="POST")
def update_worker_node_gpu(
    req: PandaRequest,
    site: str,
    host_name: str,
    vendor: str,
    model: str,
    count: int,
    vram: int = None,
    architecture: str = None,
    framework: str = None,
    framework_version: str = None,
    driver_version: str = None,
    timeout: int = 60,
):
    """
    Update GPUs for a worker node

    Updates the GPUs associated to a worker node in the worker node map. When already found, it updates the `last_seen` time. Requires a secure connection and production role.

    API details:
        HTTP Method: POST
        Path: /v1/pilot/update_worker_node_gpu

    Args:
        req(PandaRequest): Internally generated request object containing the environment variables.
        site(str): Site name (e.g. ATLAS site name, not PanDA queue).
        host_name(str): Host name. In the case of reporting in format `slot@worker_node.example.com`, the slot ID will be parsed out.
        vendor(str): GPU vendor, e.g. `NVIDIA`.
        model(str): GPU model, e.g. `A100 80GB`.
        count(int): Number of GPUs of this type in the worker node.
        vram(int, optional): VRAM memory in MB. Defaults to `None`.
        architecture(str, optional): GPU architecture, e.g. `Tesla`, `Ampere`... Defaults to `None`.
        framework(str, optional): Driver framework available, e.g. `CUDA`. Defaults to `None`.
        framework_version(str, optional): Version of the driver framework, e.g. `12.2`. Defaults to `None`.
        driver_version(str, optional): Version of the driver, e.g. `575.51.03`. Defaults to `None`.
        timeout(int, optional): The timeout value. Defaults to `60`.

    Returns:
        dict: The system response `{"success": success, "message": message, "data": data}`. True for success, False for failure, and an error message.
    """
    tmp_logger = LogWrapper(_logger, f"update_worker_node_gpu site={site} host_name={host_name} vendor={vendor} model={model}")
    tmp_logger.debug("Start")

    # Run the database update in a separate thread so the call can be bounded by `timeout`
    timed_method = TimedMethod(global_task_buffer.update_worker_node_gpu, timeout)
    timed_method.run(
        site,
        host_name,
        vendor,
        model,
        count,
        vram,
        architecture,
        framework,
        framework_version,
        driver_version,
    )

    if timed_method.result == Protocol.TimeOutToken:
        message = "Updating worker node GPU timed out"
        tmp_logger.error(message)
        return generate_response(False, message)

    # On the non-timeout path the task buffer returns a (success, message) tuple
    success, message = timed_method.result

    tmp_logger.debug("Done")
    return generate_response(success, message)