File indexing completed on 2026-04-10 08:39:02
0001 """
0002 finish transferring jobs
0003
0004 """
0005
0006 import datetime
0007 import json
0008 import re
0009 import sys
0010 import threading
0011 from typing import List
0012
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016
0017
0018 _logger = PandaLogger().getLogger("finisher")
0019
0020
class Finisher(threading.Thread):
    """
    A class used to finish transferring jobs

    Attributes
    ----------
    dataset : DatasetSpec
        The dataset to be transferred
    task_buffer : TaskBuffer
        The task buffer that contains the jobs
    job : JobSpec, optional
        The job to be transferred (default is None)
    site : str, optional
        The site where the job is to be transferred (default is None)

    Methods
    -------
    run():
        Starts the thread to finish transferring jobs
    """

    def __init__(self, taskBuffer, dataset, job=None, site=None):
        """
        Constructs all the necessary attributes for the Finisher object.

        Parameters
        ----------
        taskBuffer : TaskBuffer
            The task buffer that contains the jobs
        dataset : DatasetSpec
            The dataset to be transferred
        job : JobSpec, optional
            The job to be transferred (default is None)
        site : str, optional
            The site where the job is to be transferred (default is None)
        """
        threading.Thread.__init__(self)
        self.dataset = dataset
        self.task_buffer = taskBuffer
        self.job = job
        self.site = site

    def create_json_doc(self, job, failed_files: List[str], no_out_files: List[str]) -> str:
        """
        This function creates a JSON document for the jobs.

        Only "output"/"log" files are included; files listed in failed_files
        or no_out_files are skipped.

        Parameters:
            job (JobSpec): The job specification.
            failed_files (list): List of LFNs of failed files.
            no_out_files (list): List of LFNs of files with no output.

        Returns:
            str: The created JSON document as a string.
        """
        # build the skip set once instead of concatenating the two lists
        # for every file in the loop
        skip_lfns = set(failed_files) | set(no_out_files)
        json_dict = {}
        for tmp_file in job.Files:
            if tmp_file.type not in ("output", "log"):
                continue
            if tmp_file.lfn in skip_lfns:
                continue
            file_dict = {
                "guid": tmp_file.GUID,
                "fsize": tmp_file.fsize,
                "full_lfn": tmp_file.lfn,
            }
            # checksum is prefixed with "ad:" (adler32) or "md5:" (md5)
            # NOTE(review): assumes checksum is a non-None string here — confirm upstream
            if tmp_file.checksum.startswith("ad:"):
                file_dict["adler32"] = re.sub("^ad:", "", tmp_file.checksum)
            else:
                file_dict["md5sum"] = re.sub("^md5:", "", tmp_file.checksum)
            json_dict[tmp_file.lfn] = file_dict
        return json.dumps(json_dict)

    def update_job_output_report(self, job, failed_files: List[str], no_out_files: List[str]):
        """
        This function updates the job output report.

        Tries to update an existing report first; if no report exists
        (update returns a falsy value), inserts a new one instead.

        Parameters:
            job (JobSpec): The job specification.
            failed_files (list): List of failed files.
            no_out_files (list): List of files with no output.
        """
        json_data = self.create_json_doc(job, failed_files, no_out_files)
        # the record is "failed" as soon as any file failed; the status is
        # only used when a brand-new report has to be inserted
        record_status = "finished" if not failed_files else "failed"
        tmp_ret = self.task_buffer.updateJobOutputReport(
            panda_id=job.PandaID,
            attempt_nr=job.attemptNr,
            data=json_data,
        )
        if not tmp_ret:
            self.task_buffer.insertJobOutputReport(
                panda_id=job.PandaID,
                prod_source_label=job.prodSourceLabel,
                job_status=record_status,
                attempt_nr=job.attemptNr,
                data=json_data,
            )

    def check_file_status(self, job):
        """
        This function checks the status of the files for the job.

        Parameters:
            job (JobSpec): The job specification.

        Returns:
            tuple: A tuple containing three elements:
                - bool: True if all files are ready, False otherwise.
                - list: A list of failed files.
                - list: A list of files with no output.
        """
        tmp_log = LogWrapper(_logger, f"check_file_status-{naive_utcnow().isoformat('/')}")
        failed_files = []
        no_out_files = []
        for tmp_file in job.Files:
            if tmp_file.type in ("output", "log"):
                if tmp_file.status == "failed":
                    failed_files.append(tmp_file.lfn)
                elif tmp_file.status == "nooutput":
                    no_out_files.append(tmp_file.lfn)
                elif tmp_file.status != "ready":
                    # bail out early: one non-ready file means the job is not done
                    tmp_log.debug(f"Job: {job.PandaID} file:{tmp_file.lfn} {tmp_file.status} != ready")
                    return False, failed_files, no_out_files
        return True, failed_files, no_out_files

    def run(self):
        """
        Starts the thread to finish transferring jobs
        """
        tmp_log = LogWrapper(_logger, f"run-{naive_utcnow().isoformat('/')}")

        try:
            by_call_back = False
            if self.job is not None:
                # explicit single-job mode
                tmp_log.debug(f"start: {self.job.PandaID}")
                panda_ids = [self.job.PandaID]
                jobs = [self.job]
            else:
                # dataset-callback mode: collect the IDs of jobs whose output
                # files belong to this dataset
                by_call_back = True
                tmp_log.debug(f"start: {self.dataset.name}")
                panda_ids = self.task_buffer.updateOutFilesReturnPandaIDs(self.dataset.name)

                self.dataset.status = "cleanup"
                self.task_buffer.updateDatasets([self.dataset])
                jobs = self.task_buffer.peekJobs(panda_ids, fromDefined=False, fromArchived=False, fromWaiting=False)

            tmp_log.debug(f"IDs: {panda_ids}")
            if panda_ids:
                for job in jobs:
                    # peekJobs may return None placeholders; only jobs still
                    # in "transferring" are eligible
                    if job is None or job.jobStatus != "transferring":
                        continue
                    job_ready, failed_files, no_out_files = self.check_file_status(job)

                    if job_ready:
                        if by_call_back:
                            tmp_log.debug(f"Job: {job.PandaID} all files ready")
                        else:
                            tmp_log.debug(f"Job: {job.PandaID} all files checked with catalog")

                        try:
                            self.update_job_output_report(job, failed_files, no_out_files)
                        except Exception:
                            exc_type, value, _ = sys.exc_info()
                            tmp_log.error(f"Job: {job.PandaID} {exc_type} {value}")
                    tmp_log.debug(f"Job: {job.PandaID} status: {job.jobStatus}")
            if self.job is None:
                tmp_log.debug(f"end: {self.dataset.name}")
            else:
                tmp_log.debug(f"end: {self.job.PandaID}")
        except Exception:
            exc_type, value, _ = sys.exc_info()
            tmp_log.error(f"run() : {exc_type} {value}")