0001 """
0002 General Adder plugin. Add data to dataset
0003 
0004 """
0005 
0006 import datetime
0007 import json
0008 import re
0009 import sys
0010 import time
0011 import traceback
0012 
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 
0017 import pandaserver.dataservice.ErrorCode
0018 import pandaserver.taskbuffer.ErrorCode
0019 from pandaserver.config import panda_config
0020 from pandaserver.dataservice import closer
0021 from pandaserver.srvcore.CoreUtils import normalize_cpu_model
0022 from pandaserver.taskbuffer import EventServiceUtils, JobUtils, retryModule
0023 
0024 _logger = PandaLogger().getLogger("adder")
0025 
0026 panda_config.setupPlugin()
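# setupPlugin() prepares the VO-specific plugin configuration that
# panda_config.getPlugin("adder_plugins", vo, group) consults later in
# get_plugin_class() below.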


class AdderGen:
    """
    General Adder plugin.
    """

    # constructor
    def __init__(
        self,
        taskBuffer,
        job_id,
        job_status,
        attempt_nr,
        ignore_tmp_error=True,
        siteMapper=None,
        pid=None,
        prelock_pid=None,
        lock_offset=10,
        lock_pool=None,
    ) -> None:
        """
        Initialize the AdderGen.

        :param taskBuffer: The task buffer used for all database operations.
        :param job_id: PandaID of the job to process.
        :param job_status: Job status reported by the pilot.
        :param attempt_nr: Attempt number of the job.
        :param ignore_tmp_error: If True, give up on temporary errors and leave the report for a retry.
        :param siteMapper: The site mapper object.
        :param pid: Identifier of the process holding the lock on the job output report.
        :param prelock_pid: Identifier of the process that pre-locked the report.
        :param lock_offset: Offset used when unlocking the job output report.
        :param lock_pool: Optional pool of locks.
        """
        self.job = None
        self.job_id = job_id
        self.job_status = job_status
        self.taskBuffer = taskBuffer
        self.ignore_tmp_error = ignore_tmp_error
        self.lock_offset = lock_offset
        self.siteMapper = siteMapper
        self.dataset_map = {}
        self.extra_info = {
            "surl": {},
            "nevents": {},
            "lbnr": {},
            "endpoint": {},
            "guid": {},
        }
        self.attempt_nr = attempt_nr
        self.pid = pid
        self.prelock_pid = prelock_pid
        self.data = None
        self.lock_pool = lock_pool
        self.report_dict = None
        self.adder_plugin = None
        self.add_result = None
        self.adder_plugin_class = None
        # logger
        self.logger = LogWrapper(_logger, str(self.job_id))

    # main
    def run(self):
        """
        Run the AdderGen plugin.
        """
        try:
            start_time = naive_utcnow()
            self.logger.debug(f"new start: {self.job_status} attemptNr={self.attempt_nr}")

            # got lock, get the report
            self.get_report()

            # query job
            self.job = self.taskBuffer.peekJobs([self.job_id], fromDefined=False, fromWaiting=False, fromArchived=False, forAnal=True)[0]

            # execute plugin to process job report and update the job
            processed = self.process_job_report()
            self.logger.debug(f"job report was successfully processed and can be deleted now: {processed}")

            duration = naive_utcnow() - start_time
            self.logger.debug(f"end: took {duration.seconds}.{duration.microseconds // 1000:03d} sec in total")

            # remove the job output report only once it has been fully processed
            if processed:
                self.taskBuffer.deleteJobOutputReport(panda_id=self.job_id, attempt_nr=self.attempt_nr)

            del self.data
            del self.report_dict
        except Exception as e:
            err_str = f"{str(e)} {traceback.format_exc()}"
            self.logger.error(err_str)
            duration = naive_utcnow() - start_time
            self.logger.error(f"except: took {duration.seconds}.{duration.microseconds // 1000:03d} sec in total")
            # unlock job output report
            self.taskBuffer.unlockJobOutputReport(
                panda_id=self.job_id,
                attempt_nr=self.attempt_nr,
                pid=self.pid,
                lock_offset=self.lock_offset,
            )

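    # NB: when run() fails, or process_job_report() returns False, the job
    # output report is unlocked rather than deleted, so that another worker
    # can pick it up and retry.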
    # dump file report
    def dump_file_report(self, file_catalog, attempt_nr):
        """
        Dump the file report.

        :param file_catalog: The file catalog.
        :param attempt_nr: The attempt number.
        """
        self.logger.debug("dump file report")

        attempt_nr = 0 if attempt_nr is None else attempt_nr
        if self.job is None:
            self.job = self.taskBuffer.peekJobs([self.job_id], fromDefined=False, fromWaiting=False, forAnal=True)[0]
        if self.job:
            self.taskBuffer.insertJobOutputReport(
                panda_id=self.job_id,
                prod_source_label=self.job.prodSourceLabel,
                job_status=self.job_status,
                attempt_nr=attempt_nr,
                data=file_catalog,
            )

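    # dump_file_report persists the pilot's file catalog via
    # insertJobOutputReport so that the report can be processed, and if
    # necessary retried, asynchronously by run().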
    # get plugin class
    def get_plugin_class(self, tmp_vo, tmp_group):
        """
        Get the plugin class for the given VO and group.

        This method attempts to retrieve a specific plugin class based on the provided
        VO (Virtual Organization) and group. If no specific plugin is found, it defaults
        to using the ATLAS plugin.

        :param tmp_vo: The Virtual Organization identifier.
        :param tmp_group: The group identifier.
        :return: The plugin class.
        """
        # instantiate concrete plugin
        self.adder_plugin_class = panda_config.getPlugin("adder_plugins", tmp_vo, tmp_group)
        if self.adder_plugin_class is None:
            # use ATLAS plugin by default
            from pandaserver.dataservice.adder_atlas_plugin import AdderAtlasPlugin

            self.adder_plugin_class = AdderAtlasPlugin
        self.logger.debug(f"plugin name {self.adder_plugin_class.__name__}")
        return self.adder_plugin_class

    def get_report(self) -> None:
        """
        Get the job output report.
        """
        self.report_dict = self.taskBuffer.getJobOutputReport(panda_id=self.job_id, attempt_nr=self.attempt_nr)
        self.data = self.report_dict.get("data")

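    # NB: get_report assumes the caller has already locked the job output
    # report; run() fetches it right after the "got lock" step.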
    def register_event_service_files(self) -> None:
        """
        Register Event Service (ES) files.
        """
        # instantiate concrete plugin
        self.adder_plugin_class = self.get_plugin_class(self.job.VO, self.job.cloud)
        self.adder_plugin = self.adder_plugin_class(
            self.job,
            taskBuffer=self.taskBuffer,
            siteMapper=self.siteMapper,
            logger=self.logger,
        )
        self.logger.debug("plugin is ready for ES file registration")
        self.adder_plugin.register_event_service_files()

    def check_file_status_in_jedi(self) -> None:
        """
        Check the file status in JEDI.
        """
        if not self.job.isCancelled() and self.job.taskBufferErrorCode not in [pandaserver.taskbuffer.ErrorCode.EC_PilotRetried]:
            file_check_in_jedi = self.taskBuffer.checkInputFileStatusInJEDI(self.job)
            self.logger.debug(f"check file status in JEDI : {file_check_in_jedi}")
            if file_check_in_jedi is None:
                raise RuntimeError("failed to check file status in JEDI")
            if file_check_in_jedi is False:
                # set job status to failed since some file status is wrong in JEDI
                self.job_status = "failed"
                self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                err_str = "inconsistent file status between Panda and JEDI. "
                err_str += "failed to avoid duplicated processing caused by synchronization failure"
                self.job.ddmErrorDiag = err_str
                self.logger.debug(f"set jobStatus={self.job_status} since input is inconsistent between Panda and JEDI")
            elif self.job.jobSubStatus in ["pilot_closed"]:
                # terminated by the pilot
                self.logger.debug("going to closed since terminated by the pilot")
                ret_closed = self.taskBuffer.killJobs([self.job_id], "pilot", "60", True)
                if ret_closed[0] is True:
                    self.logger.debug("end")
                    # remove Catalog
                    self.taskBuffer.deleteJobOutputReport(panda_id=self.job_id, attempt_nr=self.attempt_nr)
                    return
            # check for cloned jobs
            if EventServiceUtils.isJobCloningJob(self.job) and self.job_status == "finished":
                # get semaphore for storeonce
                if EventServiceUtils.getJobCloningType(self.job) == "storeonce":
                    self.taskBuffer.getEventRanges(self.job.PandaID, self.job.jobsetID, self.job.jediTaskID, 1, False, False, None)
                # check semaphore
                check_jc = self.taskBuffer.checkClonedJob(self.job)
                if check_jc is None:
                    raise RuntimeError("failed to check the cloned job")
                # failed to lock semaphore
                if check_jc["lock"] is False:
                    self.job_status = "failed"
                    self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                    self.job.ddmErrorDiag = "failed to lock semaphore for job cloning"
                    self.logger.debug(f"set jobStatus={self.job_status} since did not get semaphore for job cloning")

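    # For cloned jobs only the consumer that wins the semaphore above keeps
    # jobStatus=finished; the losing clones are failed with EC_Adder,
    # presumably to avoid registering the same outputs twice.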
    def execute_plugin(self) -> None:
        """
        Execute the Adder plugin.
        """
        # interaction with DDM
        try:
            # instantiate concrete plugin
            self.adder_plugin_class = self.get_plugin_class(self.job.VO, self.job.cloud)
            self.adder_plugin = self.adder_plugin_class(
                self.job,
                taskBuffer=self.taskBuffer,
                siteMapper=self.siteMapper,
                extra_info=self.extra_info,
                logger=self.logger,
            )
            self.logger.debug("plugin is ready")
            self.adder_plugin.execute()
            self.add_result = self.adder_plugin.result
            self.logger.debug(f"plugin done with {self.add_result.status_code}")
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            self.logger.error(f"failed to execute AdderPlugin for VO={self.job.VO} with {err_type}:{err_value}")
            self.logger.error(f"failed to execute AdderPlugin for VO={self.job.VO} with {traceback.format_exc()}")
            self.add_result = None
            self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
            self.job.ddmErrorDiag = "AdderPlugin failure"

    def finalize_job_status(self) -> None:
        """
        Finalize the job status.
        """
        if self.job.jobStatus == "failed" or self.job_status == "failed":
            self.handle_failed_job()
        else:
            # reset errors
            self.job.jobDispatcherErrorCode = 0
            self.job.jobDispatcherErrorDiag = "NULL"
            # set status
            if self.add_result is not None and self.add_result.merging_files != []:
                # set status for merging
                for file in self.job.Files:
                    if file.lfn in self.add_result.merging_files:
                        file.status = "merging"
                self.job.jobStatus = "merging"
                # propagate transition to prodDB
                self.job.stateChangeTime = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
            elif self.add_result is not None and self.add_result.transferring_files != []:
                # set status for transferring
                for file in self.job.Files:
                    if file.lfn in self.add_result.transferring_files:
                        file.status = "transferring"
                self.job.jobStatus = "transferring"
                self.job.jobSubStatus = None
                # propagate transition to prodDB
                self.job.stateChangeTime = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
            else:
                self.job.jobStatus = "finished"

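    # handle_failed_job collects the job's error sources (pilot, payload, DDM,
    # transformation exit code) and hands them to
    # retryModule.job_failure_postprocessing, which applies the actions
    # configured in the error table.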
    def handle_failed_job(self) -> None:
        """
        Handle failed job status.
        """
        # if the job failed, first take actions according to the error table
        source, error_code, error_diag = None, None, None
        errors = []
        if self.job.pilotErrorCode:
            source = "pilotErrorCode"
            error_code = self.job.pilotErrorCode
            error_diag = self.job.pilotErrorDiag
            errors.append(
                {
                    "source": source,
                    "error_code": error_code,
                    "error_diag": error_diag,
                }
            )
        if self.job.exeErrorCode:
            source = "exeErrorCode"
            error_code = self.job.exeErrorCode
            error_diag = self.job.exeErrorDiag
            errors.append(
                {
                    "source": source,
                    "error_code": error_code,
                    "error_diag": error_diag,
                }
            )
        if self.job.ddmErrorCode:
            source = "ddmErrorCode"
            error_code = self.job.ddmErrorCode
            error_diag = self.job.ddmErrorDiag
            errors.append(
                {
                    "source": source,
                    "error_code": error_code,
                    "error_diag": error_diag,
                }
            )
        if self.job.transExitCode:
            source = "transExitCode"
            error_code = self.job.transExitCode
            error_diag = ""
            errors.append(
                {
                    "source": source,
                    "error_code": error_code,
                    "error_diag": error_diag,
                }
            )

        if source and error_code:
            try:
                self.logger.debug("AdderGen.run will call job_failure_postprocessing")
                retryModule.job_failure_postprocessing(
                    self.taskBuffer,
                    self.job.PandaID,
                    errors,
                    self.job.attemptNr,
                )
                self.logger.debug("job_failure_postprocessing is back")
            except Exception as e:
                self.logger.error(f"job_failure_postprocessing excepted and needs to be investigated ({e}): {traceback.format_exc()}")

        self.job.jobStatus = "failed"
        for file in self.job.Files:
            if file.type in ["output", "log"]:
                if self.add_result is not None and file.lfn in self.add_result.merging_files:
                    file.status = "merging"
                else:
                    file.status = "failed"

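    # process_job_report returns True when the report has been fully handled
    # (or needs no handling) and can be deleted, and False when processing hit
    # a temporary problem and should be retried by another worker.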
    def process_job_report(self) -> bool:
        """
        Check the job status, execute the plugin if the job status is applicable, and update the job in the database.

        :return: True if the report was processed successfully or needs no processing, False otherwise to retry.
        """
        if self.job is None:
            self.logger.debug("job not found in DB")
            return True
        if self.job.jobStatus in ["finished", "failed", "unknown", "merging"]:
            self.logger.error(f"invalid state -> {self.job.jobStatus}")
            return True
        if self.attempt_nr is not None and self.job.attemptNr != self.attempt_nr:
            self.logger.error(f"wrong attemptNr -> job={self.job.attemptNr} <> {self.attempt_nr}")
            return True
        if self.job_status == EventServiceUtils.esRegStatus:
            self.register_event_service_files()
            return True

        # check file status in JEDI
        self.check_file_status_in_jedi()

        # use failed for cancelled/closed jobs
        if self.job.isCancelled():
            self.job_status = "failed"
            # reset error codes to skip the retrial module
            self.job.pilotErrorCode = 0
            self.job.exeErrorCode = 0
            self.job.ddmErrorCode = 0

        # keep old status
        old_job_status = self.job.jobStatus

        # set job status
        if self.job.jobStatus not in ["transferring"]:
            self.job.jobStatus = self.job_status
        self.add_result = None
        self.adder_plugin = None

        # parse the job output report JSON
        parse_result = self.parse_job_output_report()

        # a parse_result below 2 means the parsing either succeeded or encountered a harmless error
        if parse_result < 2:
            self.execute_plugin()

            # ignore temporary errors
            if self.ignore_tmp_error and self.add_result is not None and self.add_result.is_temporary():
                self.logger.debug(f"ignore {self.job.ddmErrorDiag}")
                self.logger.debug("escape")
                # unlock job output report
                self.taskBuffer.unlockJobOutputReport(
                    panda_id=self.job_id,
                    attempt_nr=self.attempt_nr,
                    pid=self.pid,
                    lock_offset=self.lock_offset,
                )
                return False
            # failed
            if self.add_result is None or not self.add_result.is_succeeded():
                self.job.jobStatus = "failed"

        # set file status for failed jobs or failed transferring jobs
        self.logger.debug(f"status after plugin call: job.jobStatus={self.job.jobStatus} jobStatus={self.job_status}")

        self.finalize_job_status()

        # endtime
        if self.job.endTime == "NULL":
            self.job.endTime = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        # output size and number of outputs
        self.job.nOutputDataFiles = 0
        self.job.outputFileBytes = 0
        for tmp_file in self.job.Files:
            if tmp_file.type == "output":
                self.job.nOutputDataFiles += 1
                try:
                    self.job.outputFileBytes += tmp_file.fsize
                except Exception:
                    pass
        # protection
        max_output_file_bytes = 99999999999
        self.job.outputFileBytes = min(self.job.outputFileBytes, max_output_file_bytes)
        # set cancelled state
        if self.job.commandToPilot == "tobekilled" and self.job.jobStatus == "failed":
            self.job.jobStatus = "cancelled"
        # update job
        if old_job_status not in ["cancelled", "closed"]:
            self.logger.debug("updating DB")
            update_result = self.taskBuffer.updateJobs(
                [self.job],
                False,
                oldJobStatusList=[old_job_status],
                extraInfo=self.extra_info,
                async_dataset_update=True,
            )
            self.logger.debug(f"retU: {update_result}")

            # failed
            if not update_result[0]:
                self.logger.error(f"failed to update DB for pandaid={self.job.PandaID}")
                # unlock job output report
                self.taskBuffer.unlockJobOutputReport(
                    panda_id=self.job_id,
                    attempt_nr=self.attempt_nr,
                    pid=self.pid,
                    lock_offset=self.lock_offset,
                )
                return False

            try:
                # updateJobs was successful and it failed a job with taskBufferErrorCode
                self.logger.debug("AdderGen.run will peek the job")
                job_tmp = self.taskBuffer.peekJobs(
                    [self.job.PandaID],
                    fromDefined=False,
                    fromArchived=True,
                    fromWaiting=False,
                )[0]
                self.logger.debug(
                    f"status {job_tmp.jobStatus}, taskBufferErrorCode {job_tmp.taskBufferErrorCode}, taskBufferErrorDiag {job_tmp.taskBufferErrorDiag}"
                )
                if job_tmp.jobStatus == "failed" and job_tmp.taskBufferErrorCode:
                    source = "taskBufferErrorCode"
                    error_code = job_tmp.taskBufferErrorCode
                    error_diag = job_tmp.taskBufferErrorDiag
                    errors = [
                        {
                            "source": source,
                            "error_code": error_code,
                            "error_diag": error_diag,
                        }
                    ]
                    self.logger.debug("AdderGen.run 2 will call job_failure_postprocessing")
                    retryModule.job_failure_postprocessing(
                        self.taskBuffer,
                        job_tmp.PandaID,
                        errors,
                        job_tmp.attemptNr,
                    )
                    self.logger.debug("job_failure_postprocessing 2 is back")
            except IndexError:
                pass
            except Exception as e:
                self.logger.error(f"job_failure_postprocessing 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")

            self.setup_closer()
        return True

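    # setup_closer collects the destination datasets of the job's outputs and
    # runs the Closer on them, which takes care of closing those datasets once
    # all contributing jobs are done.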
    def setup_closer(self) -> None:
        """
        Set up the closer for the job.
        """
        # setup for closer
        if not (EventServiceUtils.isEventServiceJob(self.job) and self.job.isCancelled()):
            destination_dispatch_block_list = []
            guid_list = []
            for file in self.job.Files:
                # ignore inputs
                if file.type == "input":
                    continue
                # skip pseudo datasets
                if file.destinationDBlock in ["", None, "NULL"]:
                    continue
                # start closer for output/log datasets
                if file.destinationDBlock not in destination_dispatch_block_list:
                    destination_dispatch_block_list.append(file.destinationDBlock)
                # collect GUIDs
                if (
                    self.job.prodSourceLabel == "panda"
                    or (
                        self.job.prodSourceLabel in ["rucio_test"] + JobUtils.list_ptest_prod_sources
                        and self.job.processingType
                        in [
                            "pathena",
                            "prun",
                            "gangarobot-rctest",
                            "hammercloud",
                        ]
                    )
                ) and file.type == "output":
                    # extract base LFN since LFN was changed to full LFN for CMS
                    base_lfn = file.lfn.split("/")[-1]
                    guid_list.append(
                        {
                            "lfn": base_lfn,
                            "guid": file.GUID,
                            "type": file.type,
                            "checksum": file.checksum,
                            "md5sum": file.md5sum,
                            "fsize": file.fsize,
                            "scope": file.scope,
                        }
                    )
            if guid_list:
                self.taskBuffer.setGUIDs(guid_list)
            if destination_dispatch_block_list:
                # start Closer
                if self.adder_plugin is not None and hasattr(self.adder_plugin, "dataset_map") and self.adder_plugin.dataset_map != {}:
                    closer_thread = closer.Closer(
                        self.taskBuffer,
                        destination_dispatch_block_list,
                        self.job,
                        dataset_map=self.adder_plugin.dataset_map,
                    )
                else:
                    closer_thread = closer.Closer(self.taskBuffer, destination_dispatch_block_list, self.job)
                self.logger.debug("start Closer")
                closer_thread.run()
                del closer_thread
                self.logger.debug("end Closer")
            # run closer for associated parallel jobs
            if EventServiceUtils.isJobCloningJob(self.job):
                associate_dispatch_block_map = self.taskBuffer.getDestDBlocksWithSingleConsumer(
                    self.job.jediTaskID, self.job.PandaID, destination_dispatch_block_list
                )
                for associate_job_id in associate_dispatch_block_map:
                    associate_dispatch_blocks = associate_dispatch_block_map[associate_job_id]
                    associate_job = self.taskBuffer.peekJobs(
                        [associate_job_id],
                        fromDefined=False,
                        fromArchived=False,
                        fromWaiting=False,
                        forAnal=True,
                    )[0]
                    if associate_job is None:
                        self.logger.debug(f"associated job PandaID={associate_job_id} not found in DB")
                    else:
                        closer_thread = closer.Closer(self.taskBuffer, associate_dispatch_blocks, associate_job)
                        self.logger.debug(f"start Closer for PandaID={associate_job_id}")
                        closer_thread.run()
                        del closer_thread
                        self.logger.debug(f"end Closer for PandaID={associate_job_id}")

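    # The two helpers below persist the worker-node CPU and GPU specs that the
    # pilot includes in the job report JSON (see parse_job_output_report, which
    # calls them on the parsed metadata). Both are best effort: on any problem
    # they only log and return.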
    def update_worker_node(self, json_dict):
        """
        Update worker node specs in the database using the job report JSON.

        :param json_dict: Parsed job report JSON.
        """
        try:
            self.logger.debug("update_worker_node: start")
            wn_specs = json_dict.get("worker_node", {})
            if not wn_specs:
                self.logger.debug("update_worker_node: done. No worker node specs found")
                return

            site = wn_specs.get("site")
            host_name = wn_specs.get("host_name")
            cpu_model = wn_specs.get("cpu_model")
            if not site or not host_name or not cpu_model:
                self.logger.debug(f"update_worker_node: done. Incomplete worker node specs found: site={site}, host_name={host_name}, cpu_model={cpu_model}")
                return

            cpu_model_normalized = normalize_cpu_model(cpu_model)
            panda_queue = wn_specs.get("panda_queue")
            n_logical_cpus = wn_specs.get("n_logical_cpus")
            n_sockets = wn_specs.get("n_sockets")
            cores_per_socket = wn_specs.get("cores_per_socket")
            threads_per_core = wn_specs.get("threads_per_core")
            cpu_architecture = wn_specs.get("cpu_architecture")
            cpu_architecture_level = wn_specs.get("cpu_architecture_level")
            clock_speed = wn_specs.get("clock_speed")
            total_memory = wn_specs.get("total_memory")
            total_local_disk = wn_specs.get("total_local_disk")

            self.taskBuffer.update_worker_node(
                site,
                panda_queue,
                host_name,
                cpu_model,
                cpu_model_normalized,
                n_logical_cpus,
                n_sockets,
                cores_per_socket,
                threads_per_core,
                cpu_architecture,
                cpu_architecture_level,
                clock_speed,
                total_memory,
                total_local_disk,
            )
            self.logger.debug(f"update_worker_node: done for site={site}, host_name={host_name}, cpu_model={cpu_model}")
            return
        except Exception:
            self.logger.error(f"update_worker_node: issue with updating worker node specs: {traceback.format_exc()}")

    def update_worker_node_gpu(self, json_dict):
        """
        Update worker node GPU specs in the database using the job report JSON.

        :param json_dict: Parsed job report JSON.
        """
        try:
            self.logger.debug("update_worker_node_gpu: start")
            wn_gpu_specs = json_dict.get("worker_node_gpus", {})
            if not wn_gpu_specs:
                self.logger.debug("update_worker_node_gpu: done. No worker node GPU specs found")
                return

            site = wn_gpu_specs.get("site")
            host_name = wn_gpu_specs.get("host_name")
            if not site or not host_name:
                self.logger.debug(f"update_worker_node_gpu: done. Incomplete worker node GPU specs found: site={site}, host_name={host_name}")
                return

            vendor = wn_gpu_specs.get("vendor")
            model = wn_gpu_specs.get("model")
            count = wn_gpu_specs.get("count")
            vram = wn_gpu_specs.get("vram")
            architecture = wn_gpu_specs.get("architecture")
            framework = wn_gpu_specs.get("framework")
            framework_version = wn_gpu_specs.get("framework_version")
            driver_version = wn_gpu_specs.get("driver_version")

            self.taskBuffer.update_worker_node_gpu(site, host_name, vendor, model, count, vram, architecture, framework, framework_version, driver_version)
            self.logger.debug(f"update_worker_node_gpu: done for site={site}, host_name={host_name}")
            return
        except Exception:
            self.logger.error(f"update_worker_node_gpu: issue with updating worker node GPU specs: {traceback.format_exc()}")

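    # Illustrative sketch of the two JSON documents parsed below (key names
    # are taken from the parsing code; the values are made up):
    #
    #   self.data (file catalog), one entry per LFN:
    #       {"log.012345._000001.job.log.tgz": {
    #           "guid": "0A1B2C3D-...",
    #           "fsize": 12345,
    #           "adler32": "0a1b2c3d",      # or "md5sum" (32 hex characters)
    #           "surl": "srm://...",        # optional
    #           "full_lfn": "...",          # optional
    #           "endpoint": "SITE_DISK"}}   # optional
    #
    #   self.job.metadata (pilot job report), per-output event counts/GUIDs:
    #       {"files": {"output": [{"subFiles": [
    #           {"name": "...", "nentries": 1000, "file_guid": "..."}]}]},
    #        "worker_node": {...}, "worker_node_gpus": {...}}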
    # parse JSON
    # 0: succeeded, 1: harmless error to exit, 2: fatal error, 3: event service
    def parse_job_output_report(self):
        """
        Parse the JSON data associated with the job to extract file information.

        This method processes the JSON data to retrieve Logical File Names (LFNs),
        Globally Unique Identifiers (GUIDs), file sizes, checksums, and other metadata.
        It updates the job's file information and ensures consistency between the JSON
        data and the job's file records.

        :return: An integer indicating the result of the parsing process.
                 0 - succeeded
                 1 - harmless error to exit
                 2 - fatal error
                 3 - event service
        """
        # get LFN and GUID
        log_out = [f for f in self.job.Files if f.type in ["log", "output"]]

        # no outputs
        if not log_out:
            self.logger.debug("has no outputs")
            self.logger.debug("parse_job_output_report end")
            return 0

        # get input files
        input_lfns = [file.lfn for file in self.job.Files if file.type == "input"]

        # parse JSON
        lfns = []
        guids = []
        fsizes = []
        md5sums = []
        chksums = []
        surls = []
        full_lfn_map = {}
        n_events_map = {}
        guid_map = {}

        try:
            json_dict = json.loads(self.data)
            for lfn in json_dict:
                file_data = json_dict[lfn]
                lfn = str(lfn)
                fsize = None
                md5sum = None
                adler32 = None
                surl = None
                full_lfn = None
                guid = str(file_data["guid"])
                if "fsize" in file_data:
                    fsize = int(file_data["fsize"])
                if "md5sum" in file_data:
                    md5sum = str(file_data["md5sum"])
                    # check that the md5sum is a 32-character hexadecimal number
                    if re.search("^[a-fA-F0-9]{32}$", md5sum) is None:
                        md5sum = None
                if "adler32" in file_data:
                    adler32 = str(file_data["adler32"])
                if "surl" in file_data:
                    surl = str(file_data["surl"])
                if "full_lfn" in file_data:
                    full_lfn = str(file_data["full_lfn"])
                # endpoints
                self.extra_info["endpoint"][lfn] = []
                if "endpoint" in file_data:
                    self.extra_info["endpoint"][lfn] = [file_data["endpoint"]]
                # error check
                if (lfn not in input_lfns) and (fsize is None or (md5sum is None and adler32 is None)):
                    if not EventServiceUtils.isEventServiceMerge(self.job):
                        raise RuntimeError("fsize/md5sum/adler32/surl=None")
                # append
                lfns.append(lfn)
                guids.append(guid)
                fsizes.append(fsize)
                md5sums.append(md5sum)
                surls.append(surl)
                if adler32 is not None:
                    # use adler32 if available
                    chksums.append(f"ad:{adler32}")
                else:
                    chksums.append(f"md5:{md5sum}")
                if full_lfn is not None:
                    full_lfn_map[lfn] = full_lfn
        except Exception:
            exc_type, value, _ = sys.exc_info()
            self.logger.warning("Issue with parsing JSON")
            self.logger.error(f"{exc_type} {value}")
            # set failed anyway
            self.job.jobStatus = "failed"
            # a JSON error happens when the pilot got killed due to the wall-time limit or failures in the wrapper
            if (
                (self.job.pilotErrorCode in [0, "0", "NULL"])
                and (self.job.taskBufferErrorCode not in [pandaserver.taskbuffer.ErrorCode.EC_WorkerDone])
                and (self.job.transExitCode in [0, "0", "NULL"])
            ):
                self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                self.job.ddmErrorDiag = "Could not get GUID/LFN/MD5/FSIZE/SURL from pilot JSON"
            return 2

        # parse metadata to get nEvents
        n_events_from = ""
        json_dict = {}
        try:
            json_dict = json.loads(self.job.metadata)
            for json_file_item in json_dict["files"]["output"]:
                for json_sub_file_item in json_file_item["subFiles"]:
                    lfn = str(json_sub_file_item["name"])
                    try:
                        n_events = int(json_sub_file_item["nentries"])
                        n_events_map[lfn] = n_events
                    except Exception:
                        pass
                    try:
                        guid = str(json_sub_file_item["file_guid"])
                        guid_map[lfn] = guid
                    except Exception:
                        pass
            n_events_from = "json"
        except Exception:
            self.logger.warning("Issue with parsing JSON for nEvents")

        # parse metadata to get worker node specs and GPU specs
        if isinstance(json_dict, dict):
            self.update_worker_node(json_dict)
            self.update_worker_node_gpu(json_dict)

        # use nEvents and GUIDs reported by the pilot if there is no job report
        if self.job.metadata == "NULL" and self.job_status == "finished" and self.job.nEvents > 0 and self.job.prodSourceLabel in ["managed"]:
            for file in self.job.Files:
                if file.type == "output":
                    n_events_map[file.lfn] = self.job.nEvents
            for lfn, guid in zip(lfns, guids):
                guid_map[lfn] = guid
            n_events_from = "pilot"

        self.logger.debug(f"nEventsMap={str(n_events_map)}")
        self.logger.debug(f"nEventsFrom={str(n_events_from)}")
        self.logger.debug(f"guidMap={str(guid_map)}")
        self.logger.debug(f"self.job.jobStatus={self.job.jobStatus} in parse_job_output_report")
        self.logger.debug(f"isES={EventServiceUtils.isEventServiceJob(self.job)} isJumbo={EventServiceUtils.isJumboJob(self.job)}")

        # get lumi block number
        lumi_block_nr = self.job.getLumiBlockNr()

        # copy files for variable number of outputs
        tmp_stat = self.copy_files_for_variable_num_outputs(lfns)
        if not tmp_stat:
            err_msg = "failed to copy files for variable number of outputs"
            self.logger.error(err_msg)
            self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
            self.job.ddmErrorDiag = err_msg
            self.job.jobStatus = "failed"
            return 2

        # check files
        lfns_set = set(lfns)
        file_list = []
        for file in self.job.Files:
            file_list.append(file.lfn)
            if file.type == "input":
                if file.lfn in lfns_set:
                    if self.job.prodSourceLabel in ["user", "panda"] or self.job.is_on_site_merging():
                        # skipped file
                        file.status = "skipped"
                        self.logger.debug(f"skipped input : {file.lfn}")
                    elif self.job.prodSourceLabel in ["managed", "test"] + JobUtils.list_ptest_prod_sources:
                        # failed by pilot
                        file.status = "failed"
                        self.logger.debug(f"failed input : {file.lfn}")
            elif file.type in {"output", "log"}:
                # add only the log file for failed jobs
                if self.job_status == "failed" and file.type != "log":
                    file.status = "failed"
                    continue
                # set failed if it is missing in JSON
                if file.lfn not in lfns_set:
                    if (self.job.jobStatus == "finished" and EventServiceUtils.isEventServiceJob(self.job)) or EventServiceUtils.isJumboJob(self.job):
                        # unset file status for ES jobs
                        pass
                    elif file.isAllowedNoOutput():
                        # allowed not to be produced
                        file.status = "nooutput"
                        self.logger.debug(f"set {file.lfn} to status={file.status}")
                    else:
                        file.status = "failed"
                        self.job.jobStatus = "failed"
                        self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                        self.job.ddmErrorDiag = f"expected output {file.lfn} is missing in pilot JSON"
                        self.logger.error(self.job.ddmErrorDiag)
                    continue

                # look for GUID with LFN
                try:
                    i = lfns.index(file.lfn)
                    file.GUID = guids[i]
                    file.fsize = fsizes[i]
                    file.md5sum = md5sums[i]
                    file.checksum = chksums[i]
                    surl = surls[i]
                    # status
                    file.status = "ready"
                    # change to full LFN
                    if file.lfn in full_lfn_map:
                        file.lfn = full_lfn_map[file.lfn]
                    # add SURL to extraInfo
                    self.extra_info["surl"][file.lfn] = surl
                    # add nevents
                    if file.lfn in n_events_map:
                        self.extra_info["nevents"][file.lfn] = n_events_map[file.lfn]
                except Exception:
                    # status
                    file.status = "failed"
                    exc_type, value, _ = sys.exc_info()
                    self.logger.error(f"{exc_type} {value}")

                # set lumi block number
                if lumi_block_nr is not None and file.status != "failed":
                    self.extra_info["lbnr"][file.lfn] = lumi_block_nr

        self.extra_info["guid"] = guid_map

        # check consistency between JSON and filesTable
        for lfn in lfns:
            if lfn not in file_list:
                self.logger.error(f"{lfn} is not found in filesTable")
                self.job.jobStatus = "failed"
                for tmp_file in self.job.Files:
                    tmp_file.status = "failed"
                self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
                self.job.ddmErrorDiag = f"pilot produced {lfn} inconsistently with jobdef"
                return 2
        # return
        self.logger.debug("parse_job_output_report end")
        return 0

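    # Example of the matching done in copy_files_for_variable_num_outputs
    # (LFNs are made up): an original output LFN "out.root" yields the pattern
    # "^out.root\.*_\d+$", which matches new LFNs such as "out.root._001" and
    # "out.root._002"; an original LFN "regex|out_.*\.root" instead matches
    # any new LFN that the part after "regex|" matches.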
    # copy files for variable number of outputs
    def copy_files_for_variable_num_outputs(self, lfns):
        """
        Copy files for a variable number of outputs.

        This method handles the copying of file records when there is a variable number of output files.
        It identifies new Logical File Names (LFNs) that correspond to the original output files and
        updates the job information accordingly.

        :param lfns: List of new Logical File Names (LFNs).
        :return: True if the operation is successful, False otherwise.
        """
        # get original output files
        original_output_files = {}
        for tmp_file in self.job.Files:
            if tmp_file.type in ["output", "log"]:
                original_output_files[tmp_file.lfn] = tmp_file
        # look for unknown files
        orig_to_new_map = {}
        for new_lfn in lfns:
            if new_lfn not in original_output_files:
                # look for the corresponding original output
                for original_lfn in original_output_files:
                    # match LFNs that have a similar base name but may have additional suffixes
                    tmp_patt = r"^{0}\.*_\d+$".format(original_lfn)
                    # remove any prefix up to and including the first pipe character (|) from the original LFN
                    reg_patt = re.sub(r"^[^|]+\|", "", original_lfn)
                    if re.search(tmp_patt, new_lfn) or (original_lfn.startswith("regex|") and re.search(reg_patt, new_lfn)):
                        self.logger.debug(f"use new LFN {new_lfn} for {original_lfn}")
                        # collect new filenames
                        orig_to_new_map.setdefault(original_lfn, [])
                        orig_to_new_map[original_lfn].append(new_lfn)
                        break
        # copy file records
        for original_lfn, new_lfns in orig_to_new_map.items():
            tmp_stat = self.taskBuffer.copy_file_records(new_lfns, original_output_files[original_lfn])
            if not tmp_stat:
                return False
        # refresh job info
        if orig_to_new_map:
            self.job = self.taskBuffer.peekJobs([self.job_id], fromDefined=False, fromWaiting=False, forAnal=True)[0]
        # return
        return True