Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 finish transferring jobs
0003 
0004 """
0005 
0006 import datetime
0007 import json
0008 import re
0009 import sys
0010 import threading
0011 from typing import List
0012 
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 
0017 # logger
0018 _logger = PandaLogger().getLogger("finisher")
0019 
0020 
0021 class Finisher(threading.Thread):
0022     """
0023     A class used to finish transferring jobs
0024 
0025     Attributes
0026     ----------
0027     dataset : DatasetSpec
0028         The dataset to be transferred
0029     taskBuffer : TaskBuffer
0030         The task buffer that contains the jobs
0031     job : JobSpec, optional
0032         The job to be transferred (default is None)
0033     site : str, optional
0034         The site where the job is to be transferred (default is None)
0035 
0036     Methods
0037     -------
0038     run():
0039         Starts the thread to finish transferring jobs
0040     """
0041 
0042     # constructor
0043     def __init__(self, taskBuffer, dataset, job: str = None, site: str = None):
0044         """
0045         Constructs all the necessary attributes for the Finisher object.
0046 
0047         Parameters
0048         ----------
0049             taskBuffer : TaskBuffer
0050                 The task buffer that contains the jobs
0051             dataset : DatasetSpec
0052                 The dataset to be transferred
0053             job : JobSpec, optional
0054                 The job to be transferred (default is None)
0055             site : str, optional
0056                 The site where the job is to be transferred (default is None)
0057         """
0058         threading.Thread.__init__(self)
0059         self.dataset = dataset
0060         self.task_buffer = taskBuffer
0061         self.job = job
0062         self.site = site
0063 
0064     def create_json_doc(self, job, failed_files: List[str], no_out_files: List[str]):
0065         """
0066         This function creates a JSON document for the jobs.
0067 
0068         Parameters:
0069         job (JobSpec): The job specification.
0070         failed_files (list): List of failed files.
0071         no_out_files (list): List of files with no output.
0072 
0073         Returns:
0074         str: The created JSON document as a string.
0075         """
0076         json_dict = {}
0077         for file in job.Files:
0078             if file.type in ["output", "log"]:
0079                 # skip failed or no-output files
0080                 if file.lfn in failed_files + no_out_files:
0081                     continue
0082                 file_dict = {
0083                     "guid": file.GUID,
0084                     "fsize": file.fsize,
0085                     "full_lfn": file.lfn,
0086                 }
0087                 if file.checksum.startswith("ad:"):
0088                     file_dict["adler32"] = re.sub("^ad:", "", file.checksum)
0089                 else:
0090                     file_dict["md5sum"] = re.sub("^md5:", "", file.checksum)
0091                 json_dict[file.lfn] = file_dict
0092         return json.dumps(json_dict)
0093 
0094     def update_job_output_report(self, job, failed_files: List[str], no_out_files: List[str]):
0095         """
0096         This function updates the job output report.
0097 
0098         Parameters:
0099         job (JobSpec): The job specification.
0100         failed_files (list): List of failed files.
0101         no_out_files (list): List of files with no output.
0102         """
0103         json_data = self.create_json_doc(job, failed_files, no_out_files)
0104         record_status = "finished" if not failed_files else "failed"
0105         tmp_ret = self.task_buffer.updateJobOutputReport(
0106             panda_id=job.PandaID,
0107             attempt_nr=job.attemptNr,
0108             data=json_data,
0109         )
0110         if not tmp_ret:
0111             self.task_buffer.insertJobOutputReport(
0112                 panda_id=job.PandaID,
0113                 prod_source_label=job.prodSourceLabel,
0114                 job_status=record_status,
0115                 attempt_nr=job.attemptNr,
0116                 data=json_data,
0117             )
0118 
0119     def check_file_status(self, job):
0120         """
0121         This function checks the status of the files for the job.
0122 
0123         Parameters:
0124         job (JobSpec): The job specification.
0125 
0126         Returns:
0127         tuple: A tuple containing three elements:
0128             - bool: True if all files are ready, False otherwise.
0129             - list: A list of failed files.
0130             - list: A list of files with no output.
0131         """
0132         tmp_log = LogWrapper(_logger, f"check_file_status-{naive_utcnow().isoformat('/')}")
0133         failed_files = []
0134         no_out_files = []
0135         for file in job.Files:
0136             if file.type in ("output", "log"):
0137                 if file.status == "failed":
0138                     failed_files.append(file.lfn)
0139                 elif file.status == "nooutput":
0140                     no_out_files.append(file.lfn)
0141                 elif file.status != "ready":
0142                     tmp_log.debug(f"Job: {job.PandaID} file:{file.lfn} {file.status} != ready")
0143                     return False, failed_files, no_out_files
0144         return True, failed_files, no_out_files
0145 
0146     # main
0147     def run(self):
0148         """
0149         Starts the thread to finish transferring jobs
0150         """
0151         tmp_log = LogWrapper(_logger, f"run-{naive_utcnow().isoformat('/')}")
0152         # start
0153         try:
0154             by_call_back = False
0155             if self.job is not None:
0156                 tmp_log.debug(f"start: {self.job.PandaID}")
0157                 panda_ids = [self.job.PandaID]
0158                 jobs = [self.job]
0159             else:
0160                 by_call_back = True
0161                 tmp_log.debug(f"start: {self.dataset.name}")
0162                 panda_ids = self.task_buffer.updateOutFilesReturnPandaIDs(self.dataset.name)
0163                 # set flag for T2 cleanup
0164                 self.dataset.status = "cleanup"
0165                 self.task_buffer.updateDatasets([self.dataset])
0166                 jobs = self.task_buffer.peekJobs(panda_ids, fromDefined=False, fromArchived=False, fromWaiting=False)
0167 
0168             tmp_log.debug(f"IDs: {panda_ids}")
0169             if len(panda_ids) != 0:
0170                 # loop over all jobs
0171                 for job in jobs:
0172                     if job is None or job.jobStatus != "transferring":
0173                         continue
0174                     job_ready, failed_files, no_out_files = self.check_file_status(job)
0175                     # finish job
0176                     if job_ready:
0177                         if by_call_back:
0178                             tmp_log.debug(f"Job: {job.PandaID} all files ready")
0179                         else:
0180                             tmp_log.debug(f"Job: {job.PandaID} all files checked with catalog")
0181                         # create JSON
0182                         try:
0183                             self.update_job_output_report(job, failed_files, no_out_files)
0184                         except Exception:
0185                             exc_type, value, _ = sys.exc_info()
0186                             tmp_log.error(f"Job: {job.PandaID} {exc_type} {value}")
0187                     tmp_log.debug(f"Job: {job.PandaID} status: {job.jobStatus}")
0188             if self.job is None:
0189                 tmp_log.debug(f"end: {self.dataset.name}")
0190             else:
0191                 tmp_log.debug(f"end: {self.job.PandaID}")
0192         except Exception:
0193             exc_type, value, _ = sys.exc_info()
0194             tmp_log.error(f"run() : {exc_type} {value}")