Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import json
0002 import os
0003 
0004 from act.atlas.aCTDBPanda import aCTDBPanda
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.file_spec import FileSpec
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009 from pandaharvester.harvestercore.worker_errors import WorkerErrors
0010 
0011 from .base_stager import BaseStager
0012 
# Module-level logger; the per-call loggers created in this plugin are all
# derived from it via core_utils.make_logger
baseLogger = core_utils.setup_logger("act_stager")

# File name of the JSON job report, read from the harvester configuration
# (payload_interaction.jobReportFile); post_processing joins it with the
# worker's access point to locate the report
jsonJobReport = harvester_config.payload_interaction.jobReportFile
0018 
0019 # aCT stager plugin
0020 
0021 
class ACTStager(BaseStager):
    """Stager plugin for jobs whose stage-out is handled by aCT.

    No files are transferred by this plugin. It registers a dummy output file
    so that harvester keeps polling, and reports the stage-out as complete
    once the aCT job has reached a final state in the aCT database.
    """

    # constructor
    def __init__(self, **kwarg):
        """Initialise the base stager and open the aCT database connection.

        A connection failure is logged and leaves ``self.actDB`` set to
        ``None``; status checks then report a temporary failure.
        """
        BaseStager.__init__(self, **kwarg)

        # Set up aCT DB connection
        self.log = core_utils.make_logger(baseLogger, "aCT stager", method_name="__init__")
        try:
            self.actDB = aCTDBPanda(self.log)
        except Exception as e:
            self.log.error(f"Could not connect to aCT database: {e}")
            self.actDB = None

    # check status
    def check_stage_out_status(self, jobspec):
        """Check the status of stage-out procedure.
        Checks aCT job status and sets output file status to finished or failed
        once aCT jobs is done. All error handling and post-processing needs to
        be done here.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: transfer success, False: fatal transfer failure,
                 None: on-going or temporary failure) and error dialog
        :rtype: (bool, string)
        """

        workSpec = jobspec.get_workspec_list()[0]
        # make logger
        tmpLog = core_utils.make_logger(
            baseLogger,
            f"workerID={workSpec.workerID}",
            method_name="check_stage_out_status",
        )

        if self.actDB is None:
            # The connection failure was already logged as an error in the
            # constructor; do not repeat it on every cycle, just try again later
            tmpLog.debug("No aCT DB connection available")
            return None, "Failed to query aCT DB"

        try:
            tmpLog.debug(f"Querying aCT for id {workSpec.batchID}")
            columns = ["actpandastatus", "error"]
            actjobs = self.actDB.getJobs(f"id={workSpec.batchID}", columns)
        except Exception as e:
            tmpLog.error(f"Failed to query aCT DB: {e}")
            # try again later
            return None, "Failed to query aCT DB"

        if not actjobs:
            tmpLog.error(f"Job with id {workSpec.batchID} not found in aCT")
            return False, "Job not found in aCT"

        actstatus = actjobs[0]["actpandastatus"]
        # Only check for final states
        if actstatus == "done":
            # Do post processing
            self.post_processing(workSpec, jobspec)
        elif actstatus == "donefailed":
            # Call post processing to collect attributes set by aCT for failed jobs.
            # This must run before the status is forced to failed below, because
            # post_processing itself sets jobspec.status from the job report.
            self.post_processing(workSpec, jobspec)
            # Set error reported by aCT
            errorMsg = actjobs[0]["error"] or "Unknown error"
            error_code = WorkerErrors.error_codes.get("GENERAL_ERROR")
            jobspec.status = "failed"
            # The workspec cannot be updated from here, so the error is
            # recorded on the job only
            jobspec.set_pilot_error(error_code, errorMsg)
            tmpLog.info(f"Job {jobspec.PandaID} failed with error {errorMsg}")
        elif actstatus == "donecancelled":
            # Nothing to do
            pass
        else:
            # Still staging
            return None, "still staging"

        tmpLog.info(f"ID {workSpec.batchID} completed in state {actstatus}")

        # Set dummy output file to finished
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
        return True, ""

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Trigger the stage-out procedure for the job.
        Create a dummy output file to force harvester to wait until aCT
        job is done

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: success, False: fatal failure, None: temporary failure)
                 and error dialog
        :rtype: (bool, string)
        """
        fileSpec = FileSpec()
        fileSpec.PandaID = jobspec.PandaID
        fileSpec.taskID = jobspec.taskID
        fileSpec.lfn = f"dummy.{jobspec.PandaID}"
        fileSpec.scope = "dummy"
        fileSpec.fileType = "output"
        # NOTE(review): the file is typed "output" but registered through
        # add_in_file - confirm against JobSpec that this is intended
        jobspec.add_in_file(fileSpec)

        return True, ""

    # zip output files
    def zip_output(self, jobspec):
        """Dummy: nothing is zipped by this plugin, always reports success."""
        return True, ""

    def post_processing(self, workspec, jobspec):
        """
        Take the jobReport placed by aCT in the access point and fill metadata
        attributes of the jobspec.

        If the report cannot be opened or parsed the jobspec is left untouched.
        Otherwise the whole report is stored as the job attributes keyed by
        PandaID, and both the ``jobStatus`` attribute and ``jobspec.status``
        are set to the report's ``state`` ("failed" when the key is absent).

        :param workspec: worker whose access point holds the job report
        :param jobspec: job specifications to update
        """

        # get logger
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="post_processing")
        # look for job report
        jsonFilePath = os.path.join(workspec.get_access_point(), jsonJobReport)
        tmpLog.debug(f"looking for job report file {jsonFilePath}")
        try:
            with open(jsonFilePath) as jsonFile:
                # Size is taken from the open handle so the file is not
                # looked up a second time after it has been read
                reportSize = os.fstat(jsonFile.fileno()).st_size
                jobreport = json.load(jsonFile)
        except Exception as e:
            # Assume no job report available means true pilot or push mode
            # If job report is not available in full push mode aCT would have failed the job
            tmpLog.debug(f"no job report at {jsonFilePath}: {e}")
            return

        tmpLog.debug(f"got {reportSize / 1024} kB of job report")
        tmpLog.debug(f"pilot info for {jobspec.PandaID}: {jobreport}")

        # Set info for final heartbeat and final status
        finalState = jobreport.get("state", "failed")
        jobspec.set_attributes({jobspec.PandaID: jobreport})
        jobspec.set_one_attribute("jobStatus", finalState)
        jobspec.status = finalState