File indexing completed on 2026-04-10 08:39:01
0001 """
0002 General Adder plugin. Add data to dataset
0003
0004 """
0005
0006 import datetime
0007 import json
0008 import re
0009 import sys
0010 import time
0011 import traceback
0012
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016
0017 import pandaserver.dataservice.ErrorCode
0018 import pandaserver.taskbuffer.ErrorCode
0019 from pandaserver.config import panda_config
0020 from pandaserver.dataservice import closer
0021 from pandaserver.srvcore.CoreUtils import normalize_cpu_model
0022 from pandaserver.taskbuffer import EventServiceUtils, JobUtils, retryModule
0023
0024 _logger = PandaLogger().getLogger("adder")
0025
0026 panda_config.setupPlugin()
0027
0028
0029 class AdderGen:
0030 """
0031 General Adder plugin.
0032 """
0033
0034
    def __init__(
        self,
        taskBuffer,
        job_id,
        job_status,
        attempt_nr,
        ignore_tmp_error=True,
        siteMapper=None,
        pid=None,
        prelock_pid=None,
        lock_offset=10,
        lock_pool=None,
    ) -> None:
        """
        Initialize the AdderGen.

        :param taskBuffer: TaskBuffer interface used for all DB interactions.
        :param job_id: PandaID of the job whose output report is processed.
        :param job_status: job status reported for this attempt (e.g. "finished" or "failed").
        :param attempt_nr: attempt number used to locate the matching output report.
        :param ignore_tmp_error: if True, temporary adder-plugin errors trigger a retry
            instead of failing the job.
        :param siteMapper: SiteMapper instance passed through to the adder plugin.
        :param pid: process identifier used to lock/unlock the job output report.
        :param prelock_pid: pid that pre-locked the report, if any.
        :param lock_offset: offset passed to unlockJobOutputReport (units per
            taskBuffer implementation — TODO confirm).
        :param lock_pool: pool of locks shared with other workers — presumably for
            cross-worker coordination; verify against caller.
        """
        # job spec is fetched lazily from the DB in run()
        self.job = None
        self.job_id = job_id
        self.job_status = job_status
        self.taskBuffer = taskBuffer
        self.ignore_tmp_error = ignore_tmp_error
        self.lock_offset = lock_offset
        self.siteMapper = siteMapper
        self.dataset_map = {}
        # per-LFN metadata collected while parsing the pilot report;
        # handed to the DB layer and the adder plugin as extraInfo
        self.extra_info = {
            "surl": {},
            "nevents": {},
            "lbnr": {},
            "endpoint": {},
            "guid": {},
        }
        self.attempt_nr = attempt_nr
        self.pid = pid
        self.prelock_pid = prelock_pid
        # raw payload of the job output report (set by get_report)
        self.data = None
        self.lock_pool = lock_pool
        # full job output report record (set by get_report)
        self.report_dict = None
        self.adder_plugin = None
        self.add_result = None
        self.adder_plugin_class = None

        # per-job logger; messages are tagged with the PandaID
        self.logger = LogWrapper(_logger, str(self.job_id))
0080
0081
0082 def run(self):
0083 """
0084 Run the AdderGen plugin.
0085 """
0086 try:
0087 start_time = naive_utcnow()
0088 self.logger.debug(f"new start: {self.job_status} attemptNr={self.attempt_nr}")
0089
0090
0091 self.get_report()
0092
0093
0094 self.job = self.taskBuffer.peekJobs([self.job_id], fromDefined=False, fromWaiting=False, fromArchived=False, forAnal=True)[0]
0095
0096
0097 processed = self.process_job_report()
0098 self.logger.debug(f"job report was successfully processed and can be deleted now: {processed}")
0099
0100 duration = naive_utcnow() - start_time
0101 self.logger.debug("end: took %s.%03d sec in total" % (duration.seconds, duration.microseconds / 1000))
0102
0103
0104 if processed:
0105 self.taskBuffer.deleteJobOutputReport(panda_id=self.job_id, attempt_nr=self.attempt_nr)
0106
0107 del self.data
0108 del self.report_dict
0109 except Exception as e:
0110 err_str = f"{str(e)} {traceback.format_exc()}"
0111 self.logger.error(err_str)
0112 duration = naive_utcnow() - start_time
0113 self.logger.error("except: took %s.%03d sec in total" % (duration.seconds, duration.microseconds / 1000))
0114
0115 self.taskBuffer.unlockJobOutputReport(
0116 panda_id=self.job_id,
0117 attempt_nr=self.attempt_nr,
0118 pid=self.pid,
0119 lock_offset=self.lock_offset,
0120 )
0121
0122
0123 def dump_file_report(self, file_catalog, attempt_nr):
0124 """
0125 Dump the file report.
0126
0127 :param file_catalog: The file catalog.
0128 :param attempt_nr: The attempt number.
0129 """
0130 self.logger.debug("dump file report")
0131
0132 attempt_nr = 0 if attempt_nr is None else attempt_nr
0133 if self.job is None:
0134 self.job = self.taskBuffer.peekJobs([self.job_id], fromDefined=False, fromWaiting=False, forAnal=True)[0]
0135 if self.job:
0136 self.taskBuffer.insertJobOutputReport(
0137 panda_id=self.job_id,
0138 prod_source_label=self.job.prodSourceLabel,
0139 job_status=self.job_status,
0140 attempt_nr=attempt_nr,
0141 data=file_catalog,
0142 )
0143
0144
0145 def get_plugin_class(self, tmp_vo, tmp_group):
0146 """
0147 Get the plugin class for the given VO and group.
0148
0149 This method attempts to retrieve a specific plugin class based on the provided
0150 VO (Virtual Organization) and group. If no specific plugin is found, it defaults
0151 to using the ATLAS plugin.
0152
0153 :param tmp_vo: The Virtual Organization identifier.
0154 :param tmp_group: The group identifier.
0155 :return: The plugin class.
0156 """
0157
0158 self.adder_plugin_class = panda_config.getPlugin("adder_plugins", tmp_vo, tmp_group)
0159 if self.adder_plugin_class is None:
0160
0161 from pandaserver.dataservice.adder_atlas_plugin import AdderAtlasPlugin
0162
0163 self.adder_plugin_class = AdderAtlasPlugin
0164 self.logger.debug(f"plugin name {self.adder_plugin_class.__name__}")
0165 return self.adder_plugin_class
0166
0167 def get_report(self) -> None:
0168 """
0169 Get the job output report.
0170 """
0171 self.report_dict = self.taskBuffer.getJobOutputReport(panda_id=self.job_id, attempt_nr=self.attempt_nr)
0172 self.data = self.report_dict.get("data")
0173
0174 def register_event_service_files(self) -> None:
0175 """
0176 Register Event Service (ES) files.
0177 """
0178
0179 self.adder_plugin_class = self.get_plugin_class(self.job.VO, self.job.cloud)
0180 self.adder_plugin = self.adder_plugin_class(
0181 self.job,
0182 taskBuffer=self.taskBuffer,
0183 siteMapper=self.siteMapper,
0184 logger=self.logger,
0185 )
0186 self.logger.debug("plugin is ready for ES file registration")
0187 self.adder_plugin.register_event_service_files()
0188
    def check_file_status_in_jedi(self) -> None:
        """
        Cross-check the job's input file status against JEDI.

        Marks the job as failed when the file status is inconsistent between
        Panda and JEDI, moves the job to closed when it was terminated by the
        pilot, and checks the cloning semaphore for job-cloning jobs. May
        mutate ``self.job_status`` and the job's ddm error fields.

        :raises RuntimeError: when the JEDI status check or the cloned-job
            check cannot be performed.
        """
        # skip the check for cancelled jobs and for pilot-retried jobs
        if not self.job.isCancelled() and self.job.taskBufferErrorCode not in [pandaserver.taskbuffer.ErrorCode.EC_PilotRetried]:
            file_check_in_jedi = self.taskBuffer.checkInputFileStatusInJEDI(self.job)
            self.logger.debug(f"check file status in JEDI : {file_check_in_jedi}")
            if file_check_in_jedi is None:
                raise RuntimeError("failed to check file status in JEDI")
            if file_check_in_jedi is False:
                # inconsistency between Panda and JEDI -> fail the job to avoid
                # duplicated processing of the same inputs
                self.job_status = "failed"
                self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                err_str = "inconsistent file status between Panda and JEDI. "
                err_str += "failed to avoid duplicated processing caused by synchronization failure"
                self.job.ddmErrorDiag = err_str
                self.logger.debug(f"set jobStatus={self.job_status} since input is inconsistent between Panda and JEDI")
            elif self.job.jobSubStatus in ["pilot_closed"]:
                # the job was terminated by the pilot -> move it to closed
                self.logger.debug("going to closed since terminated by the pilot")
                ret_closed = self.taskBuffer.killJobs([self.job_id], "pilot", "60", True)
                if ret_closed[0] is True:
                    self.logger.debug("end")
                    # the output report is no longer needed once the job is closed
                    self.taskBuffer.deleteJobOutputReport(panda_id=self.job_id, attempt_nr=self.attempt_nr)
                    return

        # special handling for job-cloning jobs reported as finished
        if EventServiceUtils.isJobCloningJob(self.job) and self.job_status == "finished":
            # for the storeonce scheme, touch the event ranges first
            # NOTE(review): the return value of getEventRanges is ignored here —
            # presumably only the side effect matters; confirm semantics
            if EventServiceUtils.getJobCloningType(self.job) == "storeonce":
                self.taskBuffer.getEventRanges(self.job.PandaID, self.job.jobsetID, self.job.jediTaskID, 1, False, False, None)

            # try to acquire the cloning semaphore
            check_jc = self.taskBuffer.checkClonedJob(self.job)
            if check_jc is None:
                raise RuntimeError("failed to check the cloned job")

            # only the consumer holding the semaphore may finish; the others fail
            if check_jc["lock"] is False:
                self.job_status = "failed"
                self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                self.job.ddmErrorDiag = "failed to lock semaphore for job cloning"
                self.logger.debug(f"set jobStatus={self.job_status} since did not get semaphore for job cloning")
0230
0231 def execute_plugin(self) -> None:
0232 """
0233 Execute the Adder plugin.
0234 """
0235
0236 try:
0237
0238 self.adder_plugin_class = self.get_plugin_class(self.job.VO, self.job.cloud)
0239 self.adder_plugin = self.adder_plugin_class(
0240 self.job,
0241 taskBuffer=self.taskBuffer,
0242 siteMapper=self.siteMapper,
0243 extra_info=self.extra_info,
0244 logger=self.logger,
0245 )
0246 self.logger.debug("plugin is ready")
0247 self.adder_plugin.execute()
0248 self.add_result = self.adder_plugin.result
0249 self.logger.debug(f"plugin done with {self.add_result.status_code}")
0250 except Exception:
0251 err_type, err_value = sys.exc_info()[:2]
0252 self.logger.error(f"failed to execute AdderPlugin for VO={self.job.VO} with {err_type}:{err_value}")
0253 self.logger.error(f"failed to execute AdderPlugin for VO={self.job.VO} with {traceback.format_exc()}")
0254 self.add_result = None
0255 self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
0256 self.job.ddmErrorDiag = "AdderPlugin failure"
0257
0258 def finalize_job_status(self) -> None:
0259 """
0260 Finalize the job status.
0261 """
0262 if self.job.jobStatus == "failed" or self.job_status == "failed":
0263 self.handle_failed_job()
0264 else:
0265
0266 self.job.jobDispatcherErrorCode = 0
0267 self.job.jobDispatcherErrorDiag = "NULL"
0268
0269 if self.add_result is not None and self.add_result.merging_files != []:
0270
0271 for file in self.job.Files:
0272 if file.lfn in self.add_result.merging_files:
0273 file.status = "merging"
0274 self.job.jobStatus = "merging"
0275
0276 self.job.stateChangeTime = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
0277 elif self.add_result is not None and self.add_result.transferring_files != []:
0278
0279 for file in self.job.Files:
0280 if file.lfn in self.add_result.transferring_files:
0281 file.status = "transferring"
0282 self.job.jobStatus = "transferring"
0283 self.job.jobSubStatus = None
0284
0285 self.job.stateChangeTime = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
0286 else:
0287 self.job.jobStatus = "finished"
0288
0289 def handle_failed_job(self) -> None:
0290 """
0291 Handle failed job status.
0292 """
0293
0294 source, error_code, error_diag = None, None, None
0295 errors = []
0296 if self.job.pilotErrorCode:
0297 source = "pilotErrorCode"
0298 error_code = self.job.pilotErrorCode
0299 error_diag = self.job.pilotErrorDiag
0300 errors.append(
0301 {
0302 "source": source,
0303 "error_code": error_code,
0304 "error_diag": error_diag,
0305 }
0306 )
0307 if self.job.exeErrorCode:
0308 source = "exeErrorCode"
0309 error_code = self.job.exeErrorCode
0310 error_diag = self.job.exeErrorDiag
0311 errors.append(
0312 {
0313 "source": source,
0314 "error_code": error_code,
0315 "error_diag": error_diag,
0316 }
0317 )
0318 if self.job.ddmErrorCode:
0319 source = "ddmErrorCode"
0320 error_code = self.job.ddmErrorCode
0321 error_diag = self.job.ddmErrorDiag
0322 errors.append(
0323 {
0324 "source": source,
0325 "error_code": error_code,
0326 "error_diag": error_diag,
0327 }
0328 )
0329 if self.job.transExitCode:
0330 source = "transExitCode"
0331 error_code = self.job.transExitCode
0332 error_diag = ""
0333 errors.append(
0334 {
0335 "source": source,
0336 "error_code": error_code,
0337 "error_diag": error_diag,
0338 }
0339 )
0340
0341 if source and error_code:
0342 try:
0343 self.logger.debug("AdderGen.run will call job_failure_postprocessing")
0344 retryModule.job_failure_postprocessing(
0345 self.taskBuffer,
0346 self.job.PandaID,
0347 errors,
0348 self.job.attemptNr,
0349 )
0350 self.logger.debug("job_failure_postprocessing is back")
0351 except Exception as e:
0352 self.logger.error(f"job_failure_postprocessing excepted and needs to be investigated ({e}): {traceback.format_exc()}")
0353
0354 self.job.jobStatus = "failed"
0355 for file in self.job.Files:
0356 if file.type in ["output", "log"]:
0357 if self.add_result is not None and file.lfn in self.add_result.merging_files:
0358 file.status = "merging"
0359 else:
0360 file.status = "failed"
0361
    def process_job_report(self) -> bool:
        """
        Check the job status, execute the plugin if job status is applicable, and update job in the database.

        :return: True if successful or unnecessary, False otherwise to retry.
        """
        # nothing to do when the job is missing or already in a final state
        if self.job is None:
            self.logger.debug("job not found in DB")
            return True
        if self.job.jobStatus in ["finished", "failed", "unknown", "merging"]:
            self.logger.error(f"invalid state -> {self.job.jobStatus}")
            return True
        # the report belongs to a different attempt of the job
        if self.attempt_nr is not None and self.job.attemptNr != self.attempt_nr:
            self.logger.error(f"wrong attemptNr -> job={self.job.attemptNr} <> {self.attempt_nr}")
            return True
        # Event Service file registration is delegated to the plugin
        if self.job_status == EventServiceUtils.esRegStatus:
            self.register_event_service_files()
            return True

        # cross-check input file status with JEDI; may flip self.job_status to failed
        self.check_file_status_in_jedi()

        # cancelled jobs are recorded as failed without error codes
        if self.job.isCancelled():
            self.job_status = "failed"
            self.job.pilotErrorCode = 0
            self.job.exeErrorCode = 0
            self.job.ddmErrorCode = 0

        # remember the pre-update status for the optimistic DB update below
        old_job_status = self.job.jobStatus

        # keep "transferring" as-is; otherwise adopt the reported status
        if self.job.jobStatus not in ["transferring"]:
            self.job.jobStatus = self.job_status
        self.add_result = None
        self.adder_plugin = None

        # parse the pilot's output report (see parse_job_output_report for codes)
        parse_result = self.parse_job_output_report()

        # run the adder plugin unless parsing failed fatally (2) or worse
        if parse_result < 2:
            self.execute_plugin()

        # temporary plugin error: unlock the report and let the caller retry later
        if self.ignore_tmp_error and self.add_result is not None and self.add_result.is_temporary():
            self.logger.debug(f"ignore {self.job.ddmErrorDiag} ")
            self.logger.debug("escape")
            self.taskBuffer.unlockJobOutputReport(
                panda_id=self.job_id,
                attempt_nr=self.attempt_nr,
                pid=self.pid,
                lock_offset=self.lock_offset,
            )
            return False

        # any other unsuccessful plugin outcome fails the job
        if self.add_result is None or not self.add_result.is_succeeded():
            self.job.jobStatus = "failed"

        self.logger.debug(f"status after plugin call :job.jobStatus={self.job.jobStatus} jobStatus={self.job_status}")

        self.finalize_job_status()

        # set the end time if the pilot did not provide one
        if self.job.endTime == "NULL":
            self.job.endTime = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

        # accumulate output statistics (count and total size)
        self.job.nOutputDataFiles = 0
        self.job.outputFileBytes = 0
        for tmp_file in self.job.Files:
            if tmp_file.type == "output":
                self.job.nOutputDataFiles += 1
                try:
                    self.job.outputFileBytes += tmp_file.fsize
                except Exception:
                    # fsize may be unset or non-numeric; skip such files
                    pass
        # cap the total to protect the DB column from overflow
        max_output_file_bytes = 99999999999
        self.job.outputFileBytes = min(self.job.outputFileBytes, max_output_file_bytes)

        # jobs killed on request end up cancelled rather than failed
        if self.job.commandToPilot == "tobekilled" and self.job.jobStatus == "failed":
            self.job.jobStatus = "cancelled"

        # keep cancelled/closed jobs untouched; otherwise persist the changes
        if old_job_status in ["cancelled", "closed"]:
            pass
        else:
            self.logger.debug("updating DB")
            update_result = self.taskBuffer.updateJobs(
                [self.job],
                False,
                oldJobStatusList=[old_job_status],
                extraInfo=self.extra_info,
                async_dataset_update=True,
            )
            self.logger.debug(f"retU: {update_result}")

            # on update failure, unlock the report so the caller retries later
            if not update_result[0]:
                self.logger.error(f"failed to update DB for pandaid={self.job.PandaID}")
                self.taskBuffer.unlockJobOutputReport(
                    panda_id=self.job_id,
                    attempt_nr=self.attempt_nr,
                    pid=self.pid,
                    lock_offset=self.lock_offset,
                )
                return False

            try:
                # re-peek the (now possibly archived) job to pick up errors
                # set during the DB update, and run retry post-processing
                self.logger.debug("AdderGen.run will peek the job")
                job_tmp = self.taskBuffer.peekJobs(
                    [self.job.PandaID],
                    fromDefined=False,
                    fromArchived=True,
                    fromWaiting=False,
                )[0]
                self.logger.debug(
                    f"status {job_tmp.jobStatus}, taskBufferErrorCode {job_tmp.taskBufferErrorCode}, taskBufferErrorDiag {job_tmp.taskBufferErrorDiag}"
                )
                if job_tmp.jobStatus == "failed" and job_tmp.taskBufferErrorCode:
                    source = "taskBufferErrorCode"
                    error_code = job_tmp.taskBufferErrorCode
                    error_diag = job_tmp.taskBufferErrorDiag
                    errors = [
                        {
                            "source": source,
                            "error_code": error_code,
                            "error_diag": error_diag,
                        }
                    ]
                    self.logger.debug("AdderGen.run 2 will call job_failure_postprocessing")
                    retryModule.job_failure_postprocessing(
                        self.taskBuffer,
                        job_tmp.PandaID,
                        errors,
                        job_tmp.attemptNr,
                    )
                    self.logger.debug("job_failure_postprocessing 2 is back")
            except IndexError:
                # job not found — nothing to post-process
                pass
            except Exception as e:
                self.logger.error(f"job_failure_postprocessing 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")

            # close the destination datasets of this job's outputs
            self.setup_closer()
        return True
0511
0512 def setup_closer(self) -> None:
0513 """
0514 Setup closer for the job.
0515 """
0516
0517 if not (EventServiceUtils.isEventServiceJob(self.job) and self.job.isCancelled()):
0518 destination_dispatch_block_list = []
0519 guid_list = []
0520 for file in self.job.Files:
0521
0522 if file.type == "input":
0523 continue
0524
0525 if file.destinationDBlock in ["", None, "NULL"]:
0526 continue
0527
0528 if file.destinationDBlock not in destination_dispatch_block_list:
0529 destination_dispatch_block_list.append(file.destinationDBlock)
0530
0531 if (
0532 self.job.prodSourceLabel == "panda"
0533 or (
0534 self.job.prodSourceLabel in ["rucio_test"] + JobUtils.list_ptest_prod_sources
0535 and self.job.processingType
0536 in [
0537 "pathena",
0538 "prun",
0539 "gangarobot-rctest",
0540 "hammercloud",
0541 ]
0542 )
0543 ) and file.type == "output":
0544
0545 base_lfn = file.lfn.split("/")[-1]
0546 guid_list.append(
0547 {
0548 "lfn": base_lfn,
0549 "guid": file.GUID,
0550 "type": file.type,
0551 "checksum": file.checksum,
0552 "md5sum": file.md5sum,
0553 "fsize": file.fsize,
0554 "scope": file.scope,
0555 }
0556 )
0557 if guid_list:
0558 self.taskBuffer.setGUIDs(guid_list)
0559 if destination_dispatch_block_list:
0560
0561 if self.adder_plugin is not None and hasattr(self.adder_plugin, "dataset_map") and self.adder_plugin.dataset_map != {}:
0562 closer_thread = closer.Closer(
0563 self.taskBuffer,
0564 destination_dispatch_block_list,
0565 self.job,
0566 dataset_map=self.adder_plugin.dataset_map,
0567 )
0568 else:
0569 closer_thread = closer.Closer(self.taskBuffer, destination_dispatch_block_list, self.job)
0570 self.logger.debug("start Closer")
0571 closer_thread.run()
0572 del closer_thread
0573 self.logger.debug("end Closer")
0574
0575 if EventServiceUtils.isJobCloningJob(self.job):
0576 associate_dispatch_block_map = self.taskBuffer.getDestDBlocksWithSingleConsumer(
0577 self.job.jediTaskID, self.job.PandaID, destination_dispatch_block_list
0578 )
0579 for associate_job_id in associate_dispatch_block_map:
0580 associate_dispatch_blocks = associate_dispatch_block_map[associate_job_id]
0581 associate_job = self.taskBuffer.peekJobs(
0582 [associate_job_id],
0583 fromDefined=False,
0584 fromArchived=False,
0585 fromWaiting=False,
0586 forAnal=True,
0587 )[0]
0588 if self.job is None:
0589 self.logger.debug(f"associated job PandaID={associate_job_id} not found in DB")
0590 else:
0591 closer_thread = closer.Closer(self.taskBuffer, associate_dispatch_blocks, associate_job)
0592 self.logger.debug(f"start Closer for PandaID={associate_job_id}")
0593 closer_thread.run()
0594 del closer_thread
0595 self.logger.debug(f"end Closer for PandaID={associate_job_id}")
0596
0597 def update_worker_node(self, json_dict):
0598 try:
0599 self.logger.debug(f"update_worker_node: start")
0600 wn_specs = json_dict.get("worker_node", {})
0601 if not wn_specs:
0602 self.logger.debug(f"update_worker_node: done. No worker node specs found")
0603 return
0604
0605 site = wn_specs.get("site")
0606 host_name = wn_specs.get("host_name")
0607 cpu_model = wn_specs.get("cpu_model")
0608 if not site or not host_name or not cpu_model:
0609 self.logger.debug(f"update_worker_node: done. Incomplete worker node specs found: site={site}, host_name={host_name}, cpu_model={cpu_model}")
0610 return
0611
0612 cpu_model_normalized = normalize_cpu_model(cpu_model)
0613 panda_queue = wn_specs.get("panda_queue")
0614 n_logical_cpus = wn_specs.get("n_logical_cpus")
0615 n_sockets = wn_specs.get("n_sockets")
0616 cores_per_socket = wn_specs.get("cores_per_socket")
0617 threads_per_core = wn_specs.get("threads_per_core")
0618 cpu_architecture = wn_specs.get("cpu_architecture")
0619 cpu_architecture_level = wn_specs.get("cpu_architecture_level")
0620 clock_speed = wn_specs.get("clock_speed")
0621 total_memory = wn_specs.get("total_memory")
0622 total_local_disk = wn_specs.get("total_local_disk")
0623
0624 self.taskBuffer.update_worker_node(
0625 site,
0626 panda_queue,
0627 host_name,
0628 cpu_model,
0629 cpu_model_normalized,
0630 n_logical_cpus,
0631 n_sockets,
0632 cores_per_socket,
0633 threads_per_core,
0634 cpu_architecture,
0635 cpu_architecture_level,
0636 clock_speed,
0637 total_memory,
0638 total_local_disk,
0639 )
0640 self.logger.debug(f"update_worker_node: done for site={site}, host_name={host_name}, cpu_model={cpu_model}")
0641 return
0642 except Exception:
0643 self.logger.error(f"update_worker_node: issue with updating worker node specs: {traceback.format_exc()}")
0644
0645 def update_worker_node_gpu(self, json_dict):
0646 try:
0647 self.logger.debug(f"update_worker_node_gpu: start")
0648 wn_gpu_specs = json_dict.get("worker_node_gpus", {})
0649 if not wn_gpu_specs:
0650 self.logger.debug(f"update_worker_node_gpu: done. No worker node GPU specs found")
0651 return
0652
0653 site = wn_gpu_specs.get("site")
0654 host_name = wn_gpu_specs.get("host_name")
0655 if not site or not host_name:
0656 self.logger.debug(f"update_worker_node_gpu: done. Incomplete worker node GPU specs found: site={site}, host_name={host_name}")
0657 return
0658
0659 vendor = wn_gpu_specs.get("vendor")
0660 model = wn_gpu_specs.get("model")
0661 count = wn_gpu_specs.get("count")
0662 vram = wn_gpu_specs.get("vram")
0663 architecture = wn_gpu_specs.get("architecture")
0664 framework = wn_gpu_specs.get("framework")
0665 framework_version = wn_gpu_specs.get("framework_version")
0666 driver_version = wn_gpu_specs.get("driver_version")
0667
0668 self.taskBuffer.update_worker_node_gpu(site, host_name, vendor, model, count, vram, architecture, framework, framework_version, driver_version)
0669 self.logger.debug(f"update_worker_node_gpu: done for site={site}, host_name={host_name}")
0670 return
0671 except Exception:
0672 self.logger.error(f"update_worker_node_gpu: issue with updating worker node GPU specs: {traceback.format_exc()}")
0673
0674
0675
    def parse_job_output_report(self):
        """
        Parse the JSON data associated with the job to extract file information.

        This method processes the JSON data to retrieve Logical File Names (LFNs),
        Globally Unique Identifiers (GUIDs), file sizes, checksums, and other metadata.
        It updates the job's file information and ensures consistency between the JSON
        data and the job's file records.

        :return: An integer indicating the result of the parsing process.
                 0 - succeeded
                 1 - harmless error to exit
                 2 - fatal error
                 3 - event service
        """
        # nothing to parse when the job has no log/output files
        log_out = [f for f in self.job.Files if f.type in ["log", "output"]]

        if not log_out:
            self.logger.debug("has no outputs")
            self.logger.debug("parse_job_output_report end")
            return 0

        # input LFNs, used to tell inputs apart from outputs in the pilot report
        input_lfns = [file.lfn for file in self.job.Files if file.type == "input"]

        # per-file metadata collected from the pilot report (parallel lists)
        lfns = []
        guids = []
        fsizes = []
        md5sums = []
        chksums = []
        surls = []
        full_lfn_map = {}
        n_events_map = {}
        guid_map = {}

        try:
            # the report payload maps each LFN to its file metadata
            json_dict = json.loads(self.data)
            for lfn in json_dict:
                file_data = json_dict[lfn]
                lfn = str(lfn)
                fsize = None
                md5sum = None
                adler32 = None
                surl = None
                full_lfn = None
                guid = str(file_data["guid"])
                if "fsize" in file_data:
                    fsize = int(file_data["fsize"])
                if "md5sum" in file_data:
                    md5sum = str(file_data["md5sum"])
                    # drop malformed md5 checksums (must be 32 hex chars)
                    if re.search("^[a-fA-F0-9]{32}$", md5sum) is None:
                        md5sum = None
                if "adler32" in file_data:
                    adler32 = str(file_data["adler32"])
                if "surl" in file_data:
                    surl = str(file_data["surl"])
                if "full_lfn" in file_data:
                    full_lfn = str(file_data["full_lfn"])

                # storage endpoint reported by the pilot, if any
                self.extra_info["endpoint"][lfn] = []
                if "endpoint" in file_data:
                    self.extra_info["endpoint"][lfn] = [file_data["endpoint"]]

                # non-input files must come with a size and at least one checksum
                # (Event Service merge jobs are exempt)
                if (lfn not in input_lfns) and (fsize is None or (md5sum is None and adler32 is None)):
                    if not EventServiceUtils.isEventServiceMerge(self.job):
                        raise RuntimeError("fsize/md5sum/adler32/surl=None")

                lfns.append(lfn)
                guids.append(guid)
                fsizes.append(fsize)
                md5sums.append(md5sum)
                surls.append(surl)
                if adler32 is not None:
                    # prefer adler32 over md5
                    chksums.append(f"ad:{adler32}")
                else:
                    chksums.append(f"md5:{md5sum}")
                if full_lfn is not None:
                    full_lfn_map[lfn] = full_lfn
        except Exception:
            exc_type, value, _ = sys.exc_info()
            self.logger.warning("Issue with parsing JSON")
            self.logger.error(f"{exc_type} {value}")

            self.job.jobStatus = "failed"
            # fatal only when no other error source already explains the bad report;
            # otherwise fall through and keep processing with what we have
            if (
                (self.job.pilotErrorCode in [0, "0", "NULL"])
                and (self.job.taskBufferErrorCode not in [pandaserver.taskbuffer.ErrorCode.EC_WorkerDone])
                and (self.job.transExitCode in [0, "0", "NULL"])
            ):
                self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                self.job.ddmErrorDiag = "Could not get GUID/LFN/MD5/FSIZE/SURL from pilot JSON"
                return 2

        # extract per-output nEvents and GUIDs from the job metadata
        n_events_from = ""
        json_dict = {}
        try:
            json_dict = json.loads(self.job.metadata)
            for json_file_item in json_dict["files"]["output"]:
                for json_sub_file_item in json_file_item["subFiles"]:
                    lfn = str(json_sub_file_item["name"])
                    try:
                        n_events = int(json_sub_file_item["nentries"])
                        n_events_map[lfn] = n_events
                    except Exception:
                        # nentries missing or non-numeric
                        pass
                    try:
                        guid = str(json_sub_file_item["file_guid"])
                        guid_map[lfn] = guid
                    except Exception:
                        pass
            n_events_from = "json"
        except Exception:
            self.logger.warning("Issue with parsing JSON for nEvents")
            pass

        # update worker node (GPU) specs reported in the metadata
        if isinstance(json_dict, dict):
            self.update_worker_node(json_dict)
            self.update_worker_node_gpu(json_dict)

        # fall back to the job-level nEvents for production jobs without metadata
        if self.job.metadata == "NULL" and self.job_status == "finished" and self.job.nEvents > 0 and self.job.prodSourceLabel in ["managed"]:
            for file in self.job.Files:
                if file.type == "output":
                    n_events_map[file.lfn] = self.job.nEvents
            for lfn, guid in zip(lfns, guids):
                guid_map[lfn] = guid
            n_events_from = "pilot"

        self.logger.debug(f"nEventsMap={str(n_events_map)}")
        self.logger.debug(f"nEventsFrom={str(n_events_from)}")
        self.logger.debug(f"guidMap={str(guid_map)}")
        self.logger.debug(f"self.job.jobStatus={self.job.jobStatus} in parse_job_output_report")
        self.logger.debug(f"isES={EventServiceUtils.isEventServiceJob(self.job)} isJumbo={EventServiceUtils.isJumboJob(self.job)}")

        # lumi block number, if defined for this job
        lumi_block_nr = self.job.getLumiBlockNr()

        # create file records for dynamically numbered (variable) outputs
        tmp_stat = self.copy_files_for_variable_num_outputs(lfns)
        if not tmp_stat:
            err_msg = "failed to copy files for variable number of outputs"
            self.logger.error(err_msg)
            self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
            self.job.ddmErrorDiag = err_msg
            self.job.jobStatus = "failed"
            return 2

        # reconcile the job's file records with the pilot report
        lfns_set = set(lfns)
        file_list = []
        for file in self.job.Files:
            file_list.append(file.lfn)
            if file.type == "input":
                if file.lfn in lfns_set:
                    if self.job.prodSourceLabel in ["user", "panda"] or self.job.is_on_site_merging():
                        # inputs listed in the report were skipped by the pilot
                        file.status = "skipped"
                        self.logger.debug(f"skipped input : {file.lfn}")
                    elif self.job.prodSourceLabel in ["managed", "test"] + JobUtils.list_ptest_prod_sources:
                        # for production labels such inputs count as failed
                        file.status = "failed"
                        self.logger.debug(f"failed input : {file.lfn}")
            elif file.type in {"output", "log"}:
                # failed jobs fail their outputs, but the log is still processed
                if self.job_status == "failed" and file.type != "log":
                    file.status = "failed"
                    continue

                # expected output missing from the pilot report
                if file.lfn not in lfns_set:
                    if (self.job.jobStatus == "finished" and EventServiceUtils.isEventServiceJob(self.job)) or EventServiceUtils.isJumboJob(self.job):
                        # ES/jumbo jobs may legitimately produce a subset of outputs
                        pass
                    elif file.isAllowedNoOutput():
                        # output declared as optional
                        file.status = "nooutput"
                        self.logger.debug(f"set {file.lfn} to status={file.status}")
                    else:
                        file.status = "failed"
                        self.job.jobStatus = "failed"
                        self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                        self.job.ddmErrorDiag = f"expected output {file.lfn} is missing in pilot JSON"
                        self.logger.error(self.job.ddmErrorDiag)
                    continue

                # copy the reported metadata into the file record
                try:
                    i = lfns.index(file.lfn)
                    file.GUID = guids[i]
                    file.fsize = fsizes[i]
                    file.md5sum = md5sums[i]
                    file.checksum = chksums[i]
                    surl = surls[i]

                    file.status = "ready"

                    # switch to the full LFN when provided by the pilot
                    if file.lfn in full_lfn_map:
                        file.lfn = full_lfn_map[file.lfn]

                    self.extra_info["surl"][file.lfn] = surl

                    if file.lfn in n_events_map:
                        self.extra_info["nevents"][file.lfn] = n_events_map[file.lfn]
                except Exception:
                    # metadata lookup failed -> the file cannot be registered
                    file.status = "failed"
                    exc_type, value, _ = sys.exc_info()
                    self.logger.error(f"{exc_type} {value}")

                # attach the lumi block number to usable files
                if lumi_block_nr is not None and file.status != "failed":
                    self.extra_info["lbnr"][file.lfn] = lumi_block_nr

        self.extra_info["guid"] = guid_map

        # the pilot must not report LFNs unknown to the job definition
        for lfn in lfns:
            if lfn not in file_list:
                self.logger.error(f"{lfn} is not found in filesTable")
                self.job.jobStatus = "failed"
                for tmp_file in self.job.Files:
                    tmp_file.status = "failed"
                self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                self.job.ddmErrorDiag = f"pilot produced {lfn} inconsistently with jobdef"
                return 2

        self.logger.debug("parse_job_output_report end")
        return 0
0911
0912
0913 def copy_files_for_variable_num_outputs(self, lfns):
0914 """
0915 Copy files for variable number of outputs.
0916
0917 This method handles the copying of file records when there is a variable number of output files.
0918 It identifies new Logical File Names (LFNs) that correspond to the original output files and
0919 updates the job information accordingly.
0920
0921 :param lfns: List of new Logical File Names (LFNs).
0922 :return: True if the operation is successful, False otherwise.
0923 """
0924
0925 original_output_files = {}
0926 for tmp_file in self.job.Files:
0927 if tmp_file.type in ["output", "log"]:
0928 original_output_files[tmp_file.lfn] = tmp_file
0929
0930 orig_to_new_map = {}
0931 for new_lfn in lfns:
0932 if new_lfn not in original_output_files:
0933
0934 for original_lfn in original_output_files:
0935
0936 tmp_patt = r"^{0}\.*_\d+$".format(original_lfn)
0937
0938 reg_patt = re.sub(r"^[^|]+\|", "", original_lfn)
0939 if re.search(tmp_patt, new_lfn) or (original_lfn.startswith("regex|") and re.search(reg_patt, new_lfn)):
0940 self.logger.debug(f"use new LFN {new_lfn} for {original_lfn}")
0941
0942 orig_to_new_map.setdefault(original_lfn, [])
0943 orig_to_new_map[original_lfn].append(new_lfn)
0944 break
0945
0946 for original_lfn, new_lfn in orig_to_new_map.items():
0947 tmp_stat = self.taskBuffer.copy_file_records(new_lfn, original_output_files[original_lfn])
0948 if not tmp_stat:
0949 return False
0950
0951 if orig_to_new_map:
0952 self.job = self.taskBuffer.peekJobs([self.job_id], fromDefined=False, fromWaiting=False, forAnal=True)[0]
0953
0954 return True