File indexing completed on 2026-04-20 07:59:00
0001 import json
0002 import os
0003
0004 from act.atlas.aCTDBPanda import aCTDBPanda
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.file_spec import FileSpec
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009 from pandaharvester.harvestercore.worker_errors import WorkerErrors
0010
0011 from .base_stager import BaseStager
0012
0013
# Logger shared by every ACTStager instance; per-call loggers derive from it.
baseLogger = core_utils.setup_logger("act_stager")

# File name of the job report expected in each worker's access point, read
# from the payload_interaction section of the harvester configuration.
jsonJobReport = harvester_config.payload_interaction.jobReportFile
0018
0019
0020
0021
class ACTStager(BaseStager):
    """Stager plugin whose stage-out is driven by aCT.

    trigger_stage_out only registers a dummy file; check_stage_out_status polls
    the aCT database for the worker's aCT job and finalises the job once that
    aCT job has reached a final state.
    """

    def __init__(self, **kwarg):
        BaseStager.__init__(self, **kwarg)

        # Logger for connection problems; it is also handed to the aCT DB client.
        self.log = core_utils.make_logger(baseLogger, "aCT stager", method_name="__init__")
        # Handle to the aCT database. Stays None if the connection fails; in that
        # case check_stage_out_status retries the connection on its next call.
        self.actDB = self._connect_act_db()

    def _connect_act_db(self):
        """Open a new aCT database handle, returning None (and logging) on failure."""
        try:
            return aCTDBPanda(self.log)
        except Exception as e:
            self.log.error(f"Could not connect to aCT database: {str(e)}")
            return None

    def check_stage_out_status(self, jobspec):
        """Check the status of stage-out procedure.
        Checks the aCT job status and, once the aCT job is in a final state
        ("done", "donefailed" or "donecancelled"), marks all not-yet-done output
        files as finished. For "donefailed" the job itself is set to failed with
        the aCT error message as pilot error. All error handling and
        post-processing needs to be done here.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: transfer success, False: fatal transfer failure,
                 None: on-going or temporary failure) and error dialog
        :rtype: (bool, string)
        """
        # The aCT job is identified by the batchID of the job's first worker.
        workSpec = jobspec.get_workspec_list()[0]

        tmpLog = core_utils.make_logger(
            baseLogger, f"workerID={workSpec.workerID}", method_name="check_stage_out_status"
        )

        if self.actDB is None:
            # An earlier connection attempt failed; retry so the stager can
            # recover once the aCT database is reachable instead of staying
            # stuck until harvester is restarted.
            self.actDB = self._connect_act_db()
            if self.actDB is None:
                return None, "Failed to query aCT DB"

        try:
            tmpLog.debug(f"Querying aCT for id {workSpec.batchID}")
            columns = ["actpandastatus", "error"]
            actjobs = self.actDB.getJobs(f"id={workSpec.batchID}", columns)
        except Exception as e:
            tmpLog.error(f"Failed to query aCT DB: {str(e)}")
            return None, "Failed to query aCT DB"

        if not actjobs:
            tmpLog.error(f"Job with id {workSpec.batchID} not found in aCT")
            return False, "Job not found in aCT"

        actstatus = actjobs[0]["actpandastatus"]

        if actstatus == "done":
            # Load the job report left in the access point into the jobspec.
            self.post_processing(workSpec, jobspec)
        elif actstatus == "donefailed":
            self.post_processing(workSpec, jobspec)
            # Runs after post_processing so the failure overrides any status
            # taken from the job report.
            errorMsg = actjobs[0]["error"] or "Unknown error"
            error_code = WorkerErrors.error_codes.get("GENERAL_ERROR")
            jobspec.status = "failed"
            jobspec.set_pilot_error(error_code, errorMsg)
            tmpLog.info(f"Job {jobspec.PandaID} failed with error {errorMsg}")
        elif actstatus == "donecancelled":
            # No report processing or error reporting for cancelled jobs.
            pass
        else:
            # aCT job has not reached a final state yet.
            return None, "still staging"

        tmpLog.info(f"ID {workSpec.batchID} completed in state {actstatus}")

        # Files are released as finished for every final aCT state; success or
        # failure of the job is carried by jobspec.status instead.
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
        return True, ""

    def trigger_stage_out(self, jobspec):
        """Trigger the stage-out procedure for the job.
        Create a dummy output file to force harvester to wait until aCT
        job is done

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: success, False: fatal failure, None: temporary failure)
                 and error dialog
        :rtype: (bool, string)
        """
        fileSpec = FileSpec()
        fileSpec.PandaID = jobspec.PandaID
        fileSpec.taskID = jobspec.taskID
        fileSpec.lfn = f"dummy.{jobspec.PandaID}"
        fileSpec.scope = "dummy"
        fileSpec.fileType = "output"
        # NOTE(review): this file is typed "output" and check_stage_out_status
        # finds files via get_output_file_specs(), yet it is registered with
        # add_in_file(). Confirm against JobSpec whether add_out_file() is the
        # intended call before changing it.
        jobspec.add_in_file(fileSpec)

        return True, ""

    def zip_output(self, jobspec):
        """No-op: this stager does not zip outputs; always reports success."""
        return True, ""

    def post_processing(self, workspec, jobspec):
        """
        Take the jobReport placed by aCT in the access point and fill metadata
        attributes of the jobspec.

        A missing, unreadable or malformed report (including one whose top
        level is not a JSON object) is logged at debug level and ignored,
        leaving the jobspec untouched.

        :param workspec: worker whose access point holds the job report
        :param jobspec: job whose attributes and status are updated
        """
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="post_processing")

        jsonFilePath = os.path.join(workspec.get_access_point(), jsonJobReport)
        tmpLog.debug(f"looking for job report file {jsonFilePath}")
        try:
            with open(jsonFilePath, encoding="utf-8") as jsonFile:
                # Size from the open descriptor: cannot fail if the file is
                # removed after it has been read.
                reportSize = os.fstat(jsonFile.fileno()).st_size
                jobreport = json.load(jsonFile)
        except Exception as e:
            # Best effort: a missing or broken report is not an error here.
            # Exception (not BaseException) lets KeyboardInterrupt/SystemExit through.
            tmpLog.debug(f"no job report at {jsonFilePath}: {e}")
            return

        if not isinstance(jobreport, dict):
            tmpLog.debug(f"ignoring job report at {jsonFilePath}: expected a JSON object, got {type(jobreport).__name__}")
            return

        tmpLog.debug(f"got {reportSize / 1024} kB of job report")
        tmpLog.debug(f"pilot info for {jobspec.PandaID}: {jobreport}")

        # Store the report as the job's attributes and take the job status from
        # its "state" field, defaulting to "failed" when the field is absent.
        state = jobreport.get("state", "failed")
        jobspec.set_attributes({jobspec.PandaID: jobreport})
        jobspec.set_one_attribute("jobStatus", state)
        jobspec.status = state