Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import copy
0002 import datetime
0003 import fnmatch
0004 import itertools
0005 import json
0006 import multiprocessing
0007 import os
0008 import os.path
0009 import re
0010 import shutil
0011 import subprocess
0012 import tarfile
0013 import uuid
0014 from concurrent.futures import ThreadPoolExecutor as Pool
0015 from os import scandir, walk
0016 from shutil import which
0017 from urllib.parse import urlencode
0018 
0019 from pandaharvester.harvesterconfig import harvester_config
0020 from pandaharvester.harvestercore import core_utils
0021 from pandaharvester.harvestercore.work_spec import WorkSpec
0022 
0023 from .base_messenger import BaseMessenger
0024 
# Name of the file, placed under a job's access point, that stores the path
# of the task-level work directory (written by setup_access_points and read
# back by get_task_access_point)
taskWorkDirPathFile = "task_workdir_path.txt"

# Name of the JSON file with post-processing job attributes; its content is
# merged into the job attributes in get_work_attributes
postProcessAttrs = "post_process_job_attrs.json"

# Suffix appended to a JSON file name when the file is picked up for reading
suffixReadJson = ".read"

# module-level logger; can be replaced through set_logger()
_logger = core_utils.setup_logger("shared_file_messenger")
0036 
0037 
def set_logger(master_logger):
    """Replace the module-level logger used by this module with ``master_logger``."""
    global _logger
    _logger = master_logger
0041 
0042 
0043 # filter for log.tgz
def filter_log_tgz(extra=None):
    """Build the ``find`` name tests that select files for the log tarball.

    The default glob patterns are ``*.log``, ``*.txt``, ``*.xml``, ``*.json``
    and ``log*``; patterns given in ``extra`` are appended after them.

    :param extra: optional iterable of additional glob patterns
    :return: string of ``-name "<glob>" `` tests joined with ``-o ``
    """
    name_globs = ["*.log", "*.txt", "*.xml", "*.json", "log*"]
    if extra is not None:
        name_globs.extend(extra)
    name_tests = ('-name "{}" '.format(glob) for glob in name_globs)
    return "-o ".join(name_tests)
0049 
0050 
0051 # tar a single directory
def tar_directory(dir_name, jobspec_filename, tar_name=None, max_depth=None, extra_files=None, sub_tarball_name=None):
    """Archive the log-like files of one directory into a gzipped tarball.

    If ``sub_tarball_name`` is given and a file with that name already exists
    in ``dir_name``, that file is simply moved to the tarball path. Otherwise
    the files selected by ``filter_log_tgz(extra_files)`` are collected with
    ``find`` (entries matching ``jobspec_filename`` are dropped with
    ``grep -v``) and packed with ``tar``, using ``pigz`` when it is available.

    All file-system paths and the job-spec name are shell-quoted, so names
    containing spaces or shell metacharacters are handled correctly.

    :param dir_name: directory to archive
    :param jobspec_filename: pattern handed to ``grep -v`` to exclude the job spec file
    :param tar_name: path of the tarball; defaults to
                     ``<parent of dir_name>/<basename of dir_name>.subdir.tar.gz``
    :param max_depth: optional ``-maxdepth`` value for ``find``
    :param extra_files: optional extra glob patterns passed to ``filter_log_tgz``
    :param sub_tarball_name: optional name of a pre-made tarball inside ``dir_name``
    :return: tuple ``(command, return code, stdout, stderr)`` of the shell command
    """
    import shlex  # function-scope import so the file's import block is untouched

    def quote(value):
        # str() keeps accepting the non-str values the old f-strings accepted
        return shlex.quote(str(value))

    if tar_name is None:
        tarFilePath = os.path.join(os.path.dirname(dir_name), f"{os.path.basename(dir_name)}.subdir.tar.gz")
    else:
        tarFilePath = tar_name
    # check if sub-tarball already exists
    com = None
    if sub_tarball_name is not None:
        subTarballPath = os.path.join(dir_name, sub_tarball_name)
        if os.path.exists(subTarballPath):
            com = f"mv {quote(subTarballPath)} {quote(tarFilePath)}"
    # make sub-tarball
    if com is None:
        # "&&" instead of ";": if the cd fails, find must not run in (and
        # archive) whatever the current working directory happens to be
        com = f"cd {quote(dir_name)} && "
        com += "find . "
        if max_depth is not None:
            com += f"-maxdepth {max_depth} "
        com += r"-type f \( " + filter_log_tgz(extra_files) + r"\) "
        com += r'| grep -v {0} | tr "\n" "\0" | '.format(quote(jobspec_filename))
        com += "tar "
        if which("pigz") is None:
            com += "-z "
        else:
            com += "-I pigz "
        com += f"-c -f {quote(tarFilePath)} --null -T -"
    p = subprocess.Popen(com, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdOut, stdErr = p.communicate()
    retCode = p.returncode
    return com, retCode, stdOut, stdErr
0081 
0082 
0083 # scan files in a directory
def scan_files_in_dir(dir_name, patterns=None, zip_patterns=None):
    """Walk ``dir_name`` and describe the files found as event outputs.

    A file whose name matches one of ``zip_patterns`` is opened with
    ``tarfile`` and yields one entry per archive member (type ``zip_output``).
    Any other file yields a single entry (type ``es_output``) and, when
    ``patterns`` is given, is kept only if its name matches one of them.
    Patterns are regular expressions applied with ``re.search``.

    :param dir_name: directory to walk recursively
    :param patterns: optional regular expressions for plain output files
    :param zip_patterns: optional regular expressions for zipped output files
    :return: list of dicts with keys path, fsize, guid, chksum, eventStatus,
             type and eventRangeID (the text after the last "." of the file
             or member name)
    """

    def _matches_any(regex_list, name):
        return any(re.search(regex, name) is not None for regex in regex_list)

    found = []
    for root, _subdirs, filenames in walk(dir_name):
        for filename in filenames:
            # zipped files are selected by zip_patterns only
            is_zipped = bool(zip_patterns) and _matches_any(zip_patterns, filename)
            # plain files must match one of the patterns when patterns are given
            if not is_zipped and patterns and not _matches_any(patterns, filename):
                continue
            pfn = os.path.join(root, filename)
            base_info = {
                "path": pfn,
                "fsize": os.stat(pfn).st_size,
                "guid": str(uuid.uuid4()),
                "chksum": core_utils.calc_adler32(pfn),
                "eventStatus": "finished",
            }
            if is_zipped:
                # the actual event file names are the members of the archive
                with tarfile.open(pfn) as archive:
                    lfns = [os.path.basename(member.name) for member in archive.getmembers()]
                base_info["type"] = "zip_output"
            else:
                lfns = [os.path.basename(pfn)]
                base_info["type"] = "es_output"
            for lfn in lfns:
                entry = copy.copy(base_info)
                entry["eventRangeID"] = lfn.split(".")[-1]
                found.append(entry)
    return found
0131 
0132 
0133 # messenger with shared file system
0134 class SharedFileMessenger(BaseMessenger):
0135     # constructor
    def __init__(self, **kwarg):
        """Set the default values of the messenger options, then run the base constructor with ``kwarg``."""
        # format of the job spec file written by feed_jobs: "cgi" writes
        # url-encoded parameters, anything else writes JSON
        self.jobSpecFileFormat = "json"
        # flag handed to jobSpec.get_job_params() in feed_jobs
        self.stripJobParams = False
        # whether post_processing scans the access point for leftover files
        self.scanInPostProcess = False
        # patterns used by the leftover scan for plain and zipped files
        self.leftOverPatterns = None
        self.leftOverZipPatterns = None
        # whether post_processing works in a <PandaID> sub directory of the access point
        self.postProcessInSubDir = False
        # optional sub directory name looked for under each first-level directory in the leftover scan
        self.outputSubDir = None
        # optional name of a pre-made tarball, passed to tar_directory as sub_tarball_name
        self.subTarballName = None
        # thread pool size for tarring sub directories; the CPU count is used when not set
        self.maxWorkersForZip = None
        BaseMessenger.__init__(self, **kwarg)
0147 
0148     # get access point
0149     def get_access_point(self, workspec, panda_id):
0150         if workspec.mapType == WorkSpec.MT_MultiJobs:
0151             accessPoint = os.path.join(workspec.get_access_point(), str(panda_id))
0152         else:
0153             accessPoint = workspec.get_access_point()
0154         return accessPoint
0155 
0156     # get task access point
0157     def get_task_access_point(self, workspec, jobspec):
0158         subAccessPoint = self.get_access_point(workspec, jobspec.PandaID)
0159         tmp_file = os.path.join(subAccessPoint, taskWorkDirPathFile)
0160         if os.path.exists(tmp_file):
0161             with open(tmp_file) as f:
0162                 return f.read()
0163         if jobspec.jobParams and "onSiteMerging" in jobspec.jobParams:
0164             return os.path.join(self.taskWorkBaseDir, str(jobspec.taskID))
0165         return None
0166 
0167     # get attributes of a worker which should be propagated to job(s).
0168     #  * the worker needs to put a json under the access point
0169     def get_work_attributes(self, workspec):
0170         # get logger
0171         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_work_attributes")
0172         allRetDict = dict()
0173         numofreads = 0
0174         sw_readreports = core_utils.get_stopwatch()
0175         for pandaID in workspec.pandaid_list:
0176             # look for the json just under the access point
0177             accessPoint = self.get_access_point(workspec, pandaID)
0178             jsonFilePath = os.path.join(accessPoint, self.jsonAttrsFileName)
0179             tmpLog.debug(f"looking for attributes file {jsonFilePath}")
0180             retDict = dict()
0181             if not os.path.exists(jsonFilePath):
0182                 # not found
0183                 tmpLog.debug("not found attributes file")
0184             else:
0185                 try:
0186                     with open(jsonFilePath) as jsonFile:
0187                         retDict = json.load(jsonFile)
0188                 except Exception:
0189                     tmpLog.debug(f"failed to load {jsonFilePath}")
0190             # look for job report
0191             jsonFilePath = os.path.join(accessPoint, self.jsonJobReport)
0192             tmpLog.debug(f"looking for job report file {jsonFilePath}")
0193             sw_checkjobrep = core_utils.get_stopwatch()
0194             if not os.path.exists(jsonFilePath):
0195                 # not found
0196                 tmpLog.debug("not found job report file")
0197             else:
0198                 try:
0199                     sw_readrep = core_utils.get_stopwatch()
0200                     with open(jsonFilePath) as jsonFile:
0201                         tmpDict = json.load(jsonFile)
0202                     retDict["metaData"] = tmpDict
0203                     tmpLog.debug(f"got {os.stat(jsonFilePath).st_size / 1024} kB of job report. {sw_readrep.get_elapsed_time()} sec.")
0204                     numofreads += 1
0205                 except Exception:
0206                     tmpLog.debug(f"failed to load {jsonFilePath}")
0207             tmpLog.debug(f"Check file and read file time: {sw_checkjobrep.get_elapsed_time()} sec.")
0208             # loop for post-processing job attributes
0209             jsonFilePath = os.path.join(accessPoint, postProcessAttrs)
0210             tmpLog.debug(f"looking for post-processing job attributes file {jsonFilePath}")
0211             if not os.path.exists(jsonFilePath):
0212                 # not found
0213                 tmpLog.debug("not found post-processing job attributes file")
0214             else:
0215                 try:
0216                     with open(jsonFilePath) as jsonFile:
0217                         tmpDict = json.load(jsonFile)
0218                     retDict.update(tmpDict)
0219                 except Exception:
0220                     tmpLog.debug(f"failed to load {jsonFilePath}")
0221             allRetDict[pandaID] = retDict
0222 
0223         tmpLog.debug(f"Reading {numofreads} job report files {sw_readreports.get_elapsed_time()}")
0224         return allRetDict
0225 
0226     # get files to stage-out.
0227     #  * the worker needs to put a json under the access point
0228     def get_files_to_stage_out(self, workspec):
0229         # get logger
0230         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_files_to_stage_out")
0231         fileDict = dict()
0232         # look for the json just under the access point
0233         for pandaID in workspec.pandaid_list:
0234             # look for the json just under the access point
0235             accessPoint = self.get_access_point(workspec, pandaID)
0236             jsonFilePath = os.path.join(accessPoint, self.jsonOutputsFileName)
0237             readJsonPath = jsonFilePath + suffixReadJson
0238             # first look for json.read which is not yet acknowledged
0239             tmpLog.debug(f"looking for output file {readJsonPath}")
0240             if os.path.exists(readJsonPath):
0241                 pass
0242             else:
0243                 tmpLog.debug(f"looking for output file {jsonFilePath}")
0244                 if not os.path.exists(jsonFilePath):
0245                     # not found
0246                     tmpLog.debug("not found")
0247                     continue
0248                 try:
0249                     tmpLog.debug("found")
0250                     # rename to prevent from being overwritten
0251                     os.rename(jsonFilePath, readJsonPath)
0252                 except Exception:
0253                     tmpLog.error("failed to rename json")
0254                     continue
0255             # load json
0256             toSkip = False
0257             loadDict = None
0258             try:
0259                 with open(readJsonPath) as jsonFile:
0260                     loadDict = json.load(jsonFile)
0261             except Exception:
0262                 tmpLog.error("failed to load json")
0263                 toSkip = True
0264             # test validity of data format (ie it should be a Dictionary)
0265             if not toSkip:
0266                 if not isinstance(loadDict, dict):
0267                     tmpLog.error("loaded data is not a dictionary")
0268                     toSkip = True
0269             # collect files and events
0270             nData = 0
0271             if not toSkip:
0272                 sizeMap = dict()
0273                 chksumMap = dict()
0274                 eventsList = dict()
0275                 for tmpPandaID, tmpEventMapList in loadDict.items():
0276                     tmpPandaID = int(tmpPandaID)
0277                     # test if tmpEventMapList is a list
0278                     if not isinstance(tmpEventMapList, list):
0279                         tmpLog.error("loaded data item is not a list")
0280                         toSkip = True
0281                         break
0282                     for tmpEventInfo in tmpEventMapList:
0283                         try:
0284                             nData += 1
0285                             if "eventRangeID" in tmpEventInfo:
0286                                 tmpEventRangeID = tmpEventInfo["eventRangeID"]
0287                             else:
0288                                 tmpEventRangeID = None
0289                             if "path" in tmpEventInfo:
0290                                 tmpFileDict = dict()
0291                                 pfn = tmpEventInfo["path"]
0292                                 lfn = os.path.basename(pfn)
0293                                 tmpFileDict["path"] = pfn
0294                                 if pfn not in sizeMap:
0295                                     if "fsize" in tmpEventInfo:
0296                                         sizeMap[pfn] = tmpEventInfo["fsize"]
0297                                     else:
0298                                         sizeMap[pfn] = os.stat(pfn).st_size
0299                                 tmpFileDict["fsize"] = sizeMap[pfn]
0300                                 tmpFileDict["type"] = tmpEventInfo["type"]
0301                                 if tmpEventInfo["type"] in ["log", "output", "checkpoint"]:
0302                                     # disable zipping
0303                                     tmpFileDict["isZip"] = 0
0304                                 elif tmpEventInfo["type"] == "zip_output":
0305                                     # already zipped
0306                                     tmpFileDict["isZip"] = 1
0307                                 elif "isZip" in tmpEventInfo:
0308                                     tmpFileDict["isZip"] = tmpEventInfo["isZip"]
0309                                 # guid
0310                                 if "guid" in tmpEventInfo:
0311                                     tmpFileDict["guid"] = tmpEventInfo["guid"]
0312                                 else:
0313                                     tmpFileDict["guid"] = str(uuid.uuid4())
0314                                 # get checksum
0315                                 if pfn not in chksumMap:
0316                                     if "chksum" in tmpEventInfo:
0317                                         chksumMap[pfn] = tmpEventInfo["chksum"]
0318                                     else:
0319                                         chksumMap[pfn] = core_utils.calc_adler32(pfn)
0320                                 tmpFileDict["chksum"] = chksumMap[pfn]
0321                                 if tmpPandaID not in fileDict:
0322                                     fileDict[tmpPandaID] = dict()
0323                                 if lfn not in fileDict[tmpPandaID]:
0324                                     fileDict[tmpPandaID][lfn] = []
0325                                 fileDict[tmpPandaID][lfn].append(tmpFileDict)
0326                                 # skip if unrelated to events
0327                                 if tmpFileDict["type"] not in ["es_output", "zip_output"]:
0328                                     continue
0329                                 tmpFileDict["eventRangeID"] = tmpEventRangeID
0330                             if tmpPandaID not in eventsList:
0331                                 eventsList[tmpPandaID] = list()
0332                             eventsList[tmpPandaID].append({"eventRangeID": tmpEventRangeID, "eventStatus": tmpEventInfo["eventStatus"]})
0333                         except Exception:
0334                             core_utils.dump_error_message(tmpLog)
0335                 # dump events
0336                 if not toSkip:
0337                     if len(eventsList) > 0:
0338                         curName = os.path.join(accessPoint, self.jsonEventsUpdateFileName)
0339                         newName = curName + ".new"
0340                         f = open(newName, "w")
0341                         json.dump(eventsList, f)
0342                         f.close()
0343                         os.rename(newName, curName)
0344             # remove empty file
0345             if toSkip or nData == 0:
0346                 try:
0347                     os.remove(readJsonPath)
0348                 except Exception:
0349                     pass
0350             tmpLog.debug(f"got {nData} files for PandaID={pandaID}")
0351         return fileDict
0352 
0353     # check if job is requested.
0354     # * the worker needs to put a json under the access point
0355     def job_requested(self, workspec):
0356         # get logger
0357         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="job_requested")
0358         # look for the json just under the access point
0359         jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonJobRequestFileName)
0360         tmpLog.debug(f"looking for job request file {jsonFilePath}")
0361         if not os.path.exists(jsonFilePath):
0362             # not found
0363             tmpLog.debug("not found")
0364             return False
0365         # read nJobs
0366         try:
0367             with open(jsonFilePath) as jsonFile:
0368                 tmpDict = json.load(jsonFile)
0369                 nJobs = tmpDict["nJobs"]
0370         except Exception:
0371             # request 1 job by default
0372             nJobs = 1
0373         tmpLog.debug(f"requesting {nJobs} jobs")
0374         return nJobs
0375 
0376     # feed jobs
0377     # * worker_jobspec.json is put under the access point
0378     def feed_jobs(self, workspec, jobspec_list):
0379         # get logger
0380         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="feed_jobs")
0381         retVal = True
0382         # get PFC
0383         pfc = core_utils.make_pool_file_catalog(jobspec_list)
0384         pandaIDs = []
0385         for jobSpec in jobspec_list:
0386             accessPoint = self.get_access_point(workspec, jobSpec.PandaID)
0387             jobSpecFilePath = os.path.join(accessPoint, self.jobSpecFileName)
0388             xmlFilePath = os.path.join(accessPoint, self.xmlPoolCatalogFileName)
0389             tmpLog.debug(f"feeding jobs to {jobSpecFilePath}")
0390             try:
0391                 # put job spec file
0392                 with open(jobSpecFilePath, "w") as jobSpecFile:
0393                     jobParams = jobSpec.get_job_params(self.stripJobParams)
0394                     if self.jobSpecFileFormat == "cgi":
0395                         jobSpecFile.write(urlencode(jobParams))
0396                     else:
0397                         json.dump({jobSpec.PandaID: jobParams}, jobSpecFile)
0398                 # put PFC.xml
0399                 with open(xmlFilePath, "w") as pfcFile:
0400                     pfcFile.write(pfc)
0401                 # make symlink
0402                 for fileSpec in jobSpec.inFiles:
0403                     if fileSpec.path is None:
0404                         continue
0405                     dstPath = os.path.join(accessPoint, fileSpec.lfn)
0406                     if fileSpec.path != dstPath:
0407                         # test if symlink exists if so remove it
0408                         if os.path.exists(dstPath):
0409                             os.unlink(dstPath)
0410                             tmpLog.debug(f"removing existing symlink {dstPath}")
0411                         os.symlink(fileSpec.path, dstPath)
0412                 pandaIDs.append(jobSpec.PandaID)
0413             except Exception:
0414                 core_utils.dump_error_message(tmpLog)
0415                 retVal = False
0416         # put PandaIDs file
0417         try:
0418             jsonFilePath = os.path.join(workspec.get_access_point(), self.pandaIDsFile)
0419             with open(jsonFilePath, "w") as jsonPandaIDsFile:
0420                 json.dump(pandaIDs, jsonPandaIDsFile)
0421         except Exception:
0422             core_utils.dump_error_message(tmpLog)
0423             retVal = False
0424         # remove request file
0425         try:
0426             reqFilePath = os.path.join(workspec.get_access_point(), self.jsonJobRequestFileName)
0427             os.remove(reqFilePath)
0428         except Exception:
0429             pass
0430         tmpLog.debug("done")
0431         return retVal
0432 
0433     # request events.
0434     # * the worker needs to put a json under the access point
0435     def events_requested(self, workspec):
0436         # get logger
0437         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="events_requested")
0438         # look for the json just under the access point
0439         jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonEventsRequestFileName)
0440         tmpLog.debug(f"looking for event request file {jsonFilePath}")
0441         if not os.path.exists(jsonFilePath):
0442             # not found
0443             tmpLog.debug("not found")
0444             return {}
0445         try:
0446             with open(jsonFilePath) as jsonFile:
0447                 retDict = json.load(jsonFile)
0448         except Exception:
0449             tmpLog.debug("failed to load json")
0450             return {}
0451         tmpLog.debug("found")
0452         return retDict
0453 
0454     # feed events
0455     # * worker_events.json is put under the access point
0456     def feed_events(self, workspec, events_dict):
0457         # get logger
0458         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="feed_events")
0459         retVal = True
0460         if workspec.mapType in [WorkSpec.MT_OneToOne, WorkSpec.MT_MultiWorkers]:
0461             # put the json just under the access point
0462             jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonEventsFeedFileName)
0463             tmpLog.debug(f"feeding events to {jsonFilePath}")
0464             try:
0465                 with open(jsonFilePath, "w") as jsonFile:
0466                     json.dump(events_dict, jsonFile)
0467             except Exception:
0468                 core_utils.dump_error_message(tmpLog)
0469                 retVal = False
0470         elif workspec.mapType == WorkSpec.MT_MultiJobs:
0471             # TOBEFIXED
0472             pass
0473         # remove request file
0474         try:
0475             jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonEventsRequestFileName)
0476             os.remove(jsonFilePath)
0477         except Exception:
0478             pass
0479         tmpLog.debug("done")
0480         return retVal
0481 
0482     # update events.
0483     # * the worker needs to put a json under the access point
0484     def events_to_update(self, workspec):
0485         # get logger
0486         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="events_to_update")
0487         # look for the json just under the access point
0488         retDict = dict()
0489         for pandaID in workspec.pandaid_list:
0490             # look for the json just under the access point
0491             accessPoint = self.get_access_point(workspec, pandaID)
0492 
0493             jsonFilePath = os.path.join(accessPoint, self.jsonEventsUpdateFileName)
0494             readJsonPath = jsonFilePath + suffixReadJson
0495             # first look for json.read which is not yet acknowledged
0496             tmpLog.debug(f"looking for event update file {readJsonPath}")
0497             if os.path.exists(readJsonPath):
0498                 pass
0499             else:
0500                 tmpLog.debug(f"looking for event update file {jsonFilePath}")
0501                 if not os.path.exists(jsonFilePath):
0502                     # not found
0503                     tmpLog.debug("not found")
0504                     continue
0505                 try:
0506                     # rename to prevent from being overwritten
0507                     os.rename(jsonFilePath, readJsonPath)
0508                 except Exception:
0509                     tmpLog.error("failed to rename json")
0510                     continue
0511             # load json
0512             nData = 0
0513             try:
0514                 with open(readJsonPath) as jsonFile:
0515                     tmpOrigDict = json.load(jsonFile)
0516                     newDict = dict()
0517                     # change the key from str to int
0518                     for tmpPandaID, tmpDict in tmpOrigDict.items():
0519                         tmpPandaID = int(tmpPandaID)
0520                         retDict[tmpPandaID] = tmpDict
0521                         nData += len(tmpDict)
0522             except Exception:
0523                 tmpLog.error("failed to load json")
0524             # delete empty file
0525             if nData == 0:
0526                 try:
0527                     os.remove(readJsonPath)
0528                 except Exception:
0529                     pass
0530             tmpLog.debug(f"got {nData} events for PandaID={pandaID}")
0531         return retDict
0532 
0533     # acknowledge events and files
0534     # * delete json.read files
0535     def acknowledge_events_files(self, workspec):
0536         # get logger
0537         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="acknowledge_events_files")
0538         # remove request file
0539         for pandaID in workspec.pandaid_list:
0540             accessPoint = self.get_access_point(workspec, pandaID)
0541             try:
0542                 jsonFilePath = os.path.join(accessPoint, self.jsonEventsUpdateFileName)
0543                 jsonFilePath += suffixReadJson
0544                 jsonFilePath_rename = jsonFilePath + "." + core_utils.naive_utcnow().strftime("%Y-%m-%d_%H_%M_%S.%f")
0545                 os.rename(jsonFilePath, jsonFilePath_rename)
0546             except Exception:
0547                 pass
0548             try:
0549                 jsonFilePath = os.path.join(accessPoint, self.jsonOutputsFileName)
0550                 jsonFilePath += suffixReadJson
0551                 jsonFilePath_rename = jsonFilePath + "." + core_utils.naive_utcnow().strftime("%Y-%m-%d_%H_%M_%S.%f")
0552                 os.rename(jsonFilePath, jsonFilePath_rename)
0553             except Exception:
0554                 pass
0555         tmpLog.debug("done")
0556         return
0557 
0558     # setup access points
0559     def setup_access_points(self, workspec_list):
0560         try:
0561             for workSpec in workspec_list:
0562                 accessPoint = workSpec.get_access_point()
0563                 # delete leftover
0564                 if os.path.exists(accessPoint) and workSpec.isNew:
0565                     shutil.rmtree(accessPoint, ignore_errors=True)
0566                 # make the dir if missing
0567                 if not os.path.exists(accessPoint):
0568                     os.makedirs(accessPoint)
0569                 jobSpecs = workSpec.get_jobspec_list()
0570                 if jobSpecs is not None:
0571                     for jobSpec in jobSpecs:
0572                         subAccessPoint = self.get_access_point(workSpec, jobSpec.PandaID)
0573                         if accessPoint != subAccessPoint:
0574                             if not os.path.exists(subAccessPoint):
0575                                 os.mkdir(subAccessPoint)
0576                         # task level work dir for on-site merging
0577                         taskAccessDir = self.get_task_access_point(workSpec, jobSpec)
0578                         if taskAccessDir:
0579                             if not os.path.exists(taskAccessDir):
0580                                 os.mkdir(taskAccessDir)
0581                             with open(os.path.join(subAccessPoint, taskWorkDirPathFile), "w") as f:
0582                                 f.write(taskAccessDir)
0583             return True
0584         except Exception:
0585             # get logger
0586             tmpLog = core_utils.make_logger(_logger, method_name="setup_access_points")
0587             core_utils.dump_error_message(tmpLog)
0588             return False
0589 
0590     # filter for log.tar.gz
0591     def filter_log_tgz(self, name):
0592         for tmpPatt in ["*.log", "*.txt", "*.xml", "*.json", "log*"]:
0593             if fnmatch.fnmatch(name, tmpPatt):
0594                 return True
0595         return False
0596 
0597     # post-processing (archiving log files and collecting job metrics)
0598     def post_processing(self, workspec, jobspec_list, map_type):
0599         # get logger
0600         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="post_processing")
0601         try:
0602             for jobSpec in jobspec_list:
0603                 # check if log is already there
0604                 hasLog = False
0605                 for fileSpec in jobSpec.outFiles:
0606                     if fileSpec.fileType == "log":
0607                         hasLog = True
0608                         break
0609                 fileDict = dict()
0610                 accessPoint = self.get_access_point(workspec, jobSpec.PandaID)
0611                 origAccessPoint = accessPoint
0612                 if self.postProcessInSubDir:
0613                     accessPoint = os.path.join(accessPoint, str(jobSpec.PandaID))
0614                 # make log
0615                 if not hasLog:
0616                     logFileInfo = jobSpec.get_logfile_info()
0617                     # make log.tar.gz
0618                     logFilePath = os.path.join(accessPoint, logFileInfo["lfn"])
0619                     if map_type == WorkSpec.MT_MultiWorkers:
0620                         # append suffix
0621                         logFilePath += f"._{workspec.workerID}"
0622                     tmpLog.debug(f"making {logFilePath}")
0623                     dirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))]
0624                     # tar sub dirs
0625                     tmpLog.debug(f"tar for {len(dirs)} sub dirs")
0626                     with Pool(max_workers=self.maxWorkersForZip if self.maxWorkersForZip else multiprocessing.cpu_count()) as pool:
0627                         retValList = pool.map(
0628                             lambda x, y: tar_directory(x, self.jobSpecFileName, sub_tarball_name=y), dirs, itertools.repeat(self.subTarballName)
0629                         )
0630                         for dirName, (comStr, retCode, stdOut, stdErr) in zip(dirs, retValList):
0631                             if retCode != 0:
0632                                 tmpLog.warning(f"failed to sub-tar {dirName} with {comStr} -> {stdOut}:{stdErr}")
0633                     # tar main dir
0634                     tmpLog.debug("tar for main dir")
0635                     comStr, retCode, stdOut, stdErr = tar_directory(accessPoint, self.jobSpecFileName, logFilePath, 1, ["*.subdir.tar.gz"])
0636                     tmpLog.debug("used command : " + comStr)
0637                     if retCode != 0:
0638                         tmpLog.warning(f"failed to tar {accessPoint} with {comStr} -> {stdOut}:{stdErr}")
0639                     # make file dict
0640                     fileDict.setdefault(jobSpec.PandaID, [])
0641                     fileDict[jobSpec.PandaID].append({"path": logFilePath, "type": "log", "isZip": 0})
0642                 # look for leftovers
0643                 if self.scanInPostProcess:
0644                     tmpLog.debug(f"scanning leftovers in {accessPoint}")
0645                     # set the directory paths to scan for left over files
0646                     dirs = []
0647                     if self.outputSubDir is None:
0648                         dirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))]
0649                     else:
0650                         # loop over directories first level from accessPoint and then add subdirectory name.
0651                         upperdirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))]
0652                         dirs = [os.path.join(dirname, self.outputSubDir) for dirname in upperdirs if os.path.isdir(os.path.join(dirname, self.outputSubDir))]
0653                     patterns = []
0654                     patterns_zip = []
0655                     for tmp_patterns, tmp_left_over_patterns in [[patterns, self.leftOverPatterns], [patterns_zip, self.leftOverZipPatterns]]:
0656                         if tmp_left_over_patterns is None:
0657                             continue
0658                         for scanPat in tmp_left_over_patterns:
0659                             # replace placeholders
0660                             if "%PANDAID" in scanPat:
0661                                 scanPat = scanPat.replace("%PANDAID", str(jobSpec.PandaID))
0662                             if "%TASKID" in scanPat:
0663                                 scanPat = scanPat.replace("%TASKID", str(jobSpec.taskID))
0664                             if "%OUTPUT_FILE" in scanPat:
0665                                 logFileName = jobSpec.get_logfile_info()["lfn"]
0666                                 for outputName in jobSpec.get_output_file_attributes().keys():
0667                                     if outputName == logFileName:
0668                                         continue
0669                                     tmp_patterns.append(scanPat.replace("%OUTPUT_FILE", outputName))
0670                             else:
0671                                 tmp_patterns.append(scanPat)
0672                     # scan files
0673                     nLeftOvers = 0
0674                     with Pool(max_workers=self.maxWorkersForZip if self.maxWorkersForZip else multiprocessing.cpu_count()) as pool:
0675                         retValList = pool.map(scan_files_in_dir, dirs, [patterns] * len(dirs), [patterns_zip] * len(dirs))
0676                         for retVal in retValList:
0677                             fileDict.setdefault(jobSpec.PandaID, [])
0678                             fileDict[jobSpec.PandaID] += retVal
0679                             nLeftOvers += len(retVal)
0680                     tmpLog.debug(f"got {nLeftOvers} leftovers")
0681                 # look into task-level work state file
0682                 taskAccessDir = self.get_task_access_point(workspec, jobSpec)
0683                 if taskAccessDir:
0684                     doneInputs = set()
0685                     taskWorkStatePath = os.path.join(taskAccessDir, self.taskWorkStateFile)
0686                     if os.path.exists(taskWorkStatePath):
0687                         nInTaskState = 0
0688                         with open(taskWorkStatePath) as f:
0689                             try:
0690                                 tmpData = json.load(f)
0691                                 if "merged" in tmpData:
0692                                     output_lfns = set()
0693                                     fileDict.setdefault(jobSpec.PandaID, [])
0694                                     for tmpIn, tmpOuts in tmpData["merged"].items():
0695                                         for tmpLFN, tmpFileDict in tmpOuts.items():
0696                                             if tmpLFN in output_lfns:
0697                                                 continue
0698                                             output_lfns.add(tmpLFN)
0699                                             nInTaskState += 1
0700                                             pfn = tmpFileDict["path"]
0701                                             if "fsize" not in tmpFileDict:
0702                                                 tmpFileDict["fsize"] = os.stat(pfn).st_size
0703                                             tmpFileDict["type"] = "output"
0704                                             if "guid" not in tmpFileDict:
0705                                                 tmpFileDict["guid"] = str(uuid.uuid4())
0706                                             if "chksum" not in tmpFileDict:
0707                                                 tmpFileDict["chksum"] = core_utils.calc_adler32(pfn)
0708                                             fileDict.setdefault(jobSpec.PandaID, [])
0709                                             fileDict[jobSpec.PandaID].append(tmpFileDict)
0710                                         doneInputs.add(tmpIn)
0711                             except Exception:
0712                                 core_utils.dump_error_message(tmpLog)
0713                                 tmpLog.error(f"failed to parse task-level work state file {taskWorkStatePath}")
0714                                 raise
0715                         tmpLog.debug(f"got {nInTaskState} output files from task state file")
0716                     # skipped files
0717                     skippedInputs = [fileSpec.lfn for fileSpec in jobSpec.inFiles if fileSpec.lfn not in doneInputs]
0718                     with open(os.path.join(accessPoint, postProcessAttrs), "w") as f:
0719                         json.dump({"skippedInputs": skippedInputs}, f)
0720                     tmpLog.debug(f"set {len(skippedInputs)} input files to skip")
0721                 # make json to stage-out
0722                 if len(fileDict) > 0:
0723                     jsonFilePath = os.path.join(origAccessPoint, self.jsonOutputsFileName)
0724                     with open(jsonFilePath, "w") as jsonFile:
0725                         json.dump(fileDict, jsonFile)
0726                 tmpLog.debug("done")
0727             return True
0728         except Exception:
0729             core_utils.dump_error_message(tmpLog)
0730             return None
0731 
0732     # get PandaIDs for pull model
0733     def get_panda_ids(self, workspec):
0734         # get logger
0735         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_panda_ids")
0736         # look for the json just under the access point
0737         jsonFilePath = os.path.join(workspec.get_access_point(), self.pandaIDsFile)
0738         tmpLog.debug(f"looking for PandaID file {jsonFilePath}")
0739         retVal = []
0740         if not os.path.exists(jsonFilePath):
0741             # not found
0742             tmpLog.debug("not found")
0743             return retVal
0744         try:
0745             with open(jsonFilePath) as jsonFile:
0746                 retVal = json.load(jsonFile)
0747         except Exception:
0748             tmpLog.debug("failed to load json")
0749             return retVal
0750         tmpLog.debug("found")
0751         return retVal
0752 
0753     # check if requested to kill the worker itself
0754     def kill_requested(self, workspec):
0755         # get logger
0756         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="kill_requested")
0757         # look for the json just under the access point
0758         jsonFilePath = os.path.join(workspec.get_access_point(), self.killWorkerFile)
0759         tmpLog.debug(f"looking for kill request file {jsonFilePath}")
0760         if not os.path.exists(jsonFilePath):
0761             # not found
0762             tmpLog.debug("not found")
0763             return False
0764         tmpLog.debug("kill requested")
0765         return True
0766 
0767     # check if the worker is alive
0768     def is_alive(self, workspec, time_limit):
0769         # get logger
0770         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="is_alive")
0771         # json file
0772         jsonFilePath = os.path.join(workspec.get_access_point(), self.heartbeatFile)
0773         tmpLog.debug(f"looking for heartbeat file {jsonFilePath}")
0774         if not os.path.exists(jsonFilePath):  # no heartbeat file was found
0775             tmpLog.debug(f"startTime: {workspec.startTime}, now: {core_utils.naive_utcnow()}")
0776             if not workspec.startTime:
0777                 # the worker didn't even have time to start
0778                 tmpLog.debug("heartbeat not found, but no startTime yet for worker")
0779                 return True
0780             elif core_utils.naive_utcnow() - workspec.startTime < datetime.timedelta(minutes=time_limit):
0781                 # the worker is too young and maybe didn't have time to generate the heartbeat
0782                 tmpLog.debug("heartbeat not found, but worker too young")
0783                 return True
0784             else:
0785                 # the worker is old and the heartbeat should be expected
0786                 tmpLog.debug("not found")
0787                 return None
0788         try:
0789             mtime = core_utils.naive_utcfromtimestamp(os.path.getmtime(jsonFilePath))
0790             tmpLog.debug(f"last modification time : {mtime}")
0791             if core_utils.naive_utcnow() - mtime > datetime.timedelta(minutes=time_limit):
0792                 tmpLog.debug("too old")
0793                 return False
0794             tmpLog.debug("OK")
0795             return True
0796         except Exception:
0797             tmpLog.debug("failed to get mtime")
0798             return None
0799 
0800     # clean up. Called by sweeper agent to clean up stuff made by messenger for the worker
0801     # for shared_file_messenger, remove the worker's access point directory
0802     def clean_up(self, workspec):
0803         # get logger
0804         tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="clean_up")
0805         # Remove from top directory of access point of worker
0806         errStr = ""
0807         worker_accessPoint = workspec.get_access_point()
0808         if os.path.isdir(worker_accessPoint):
0809             try:
0810                 shutil.rmtree(worker_accessPoint)
0811             except Exception as _e:
0812                 errStr = f"failed to remove directory {worker_accessPoint} : {_e}"
0813                 tmpLog.error(errStr)
0814             else:
0815                 tmpLog.debug("done")
0816                 return (True, errStr)
0817         elif not os.path.exists(worker_accessPoint):
0818             tmpLog.debug("accessPoint directory already gone. Skipped")
0819             return (None, errStr)
0820         else:
0821             errStr = f"{worker_accessPoint} is not a directory"
0822             tmpLog.error(errStr)
0823         return (False, errStr)