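"""Shared file system messenger for PanDA Harvester: exchanges job and event information with payloads via json files under each worker's access point."""
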
import copy
import datetime
import fnmatch
import itertools
import json
import multiprocessing
import os
import os.path
import re
import shutil
import subprocess
import tarfile
import uuid
from concurrent.futures import ThreadPoolExecutor as Pool
from os import scandir, walk
from shutil import which
from urllib.parse import urlencode

from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.work_spec import WorkSpec

from .base_messenger import BaseMessenger

# filename placed in a job's access point to record the path of the task-level work directory
taskWorkDirPathFile = "task_workdir_path.txt"

# json with job attributes produced by post-processing
postProcessAttrs = "post_process_job_attrs.json"

# suffix appended to a json file once it has been read
suffixReadJson = ".read"

# logger
_logger = core_utils.setup_logger("shared_file_messenger")


def set_logger(master_logger):
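    """Replace the module-level logger with the given master logger."""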
    global _logger
    _logger = master_logger


def filter_log_tgz(extra=None):
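    """Build a find-command name filter matching log-like files, plus any extra patterns."""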
    patt = ["*.log", "*.txt", "*.xml", "*.json", "log*"]
    if extra is not None:
        patt += extra
    return "-o ".join([f'-name "{i}" ' for i in patt])


def tar_directory(dir_name, jobspec_filename, tar_name=None, max_depth=None, extra_files=None, sub_tarball_name=None):
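    """Make a gzipped tarball of log-like files under dir_name, excluding the job spec file.

    If sub_tarball_name already exists in dir_name, that pre-made tarball is moved into
    place instead. Returns a tuple of (command, return code, stdout, stderr).
    """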
    if tar_name is None:
        tarFilePath = os.path.join(os.path.dirname(dir_name), f"{os.path.basename(dir_name)}.subdir.tar.gz")
    else:
        tarFilePath = tar_name
    # if the payload already made a sub-tarball, just move it into place
    com = None
    if sub_tarball_name is not None:
        subTarballPath = os.path.join(dir_name, sub_tarball_name)
        if os.path.exists(subTarballPath):
            com = f"mv {subTarballPath} {tarFilePath}"
    # otherwise collect matching files with find and tar them
    if com is None:
        com = f"cd {dir_name}; "
        com += "find . "
        if max_depth is not None:
            com += f"-maxdepth {max_depth} "
        com += r"-type f \( " + filter_log_tgz(extra_files) + r"\) "
        com += r'| grep -v {0} | tr "\n" "\0" | '.format(jobspec_filename)
        com += "tar "
        # use pigz for parallel compression when available
        if which("pigz") is None:
            com += "-z "
        else:
            com += "-I pigz "
        com += f"-c -f {tarFilePath} --null -T -"
    p = subprocess.Popen(com, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdOut, stdErr = p.communicate()
    retCode = p.returncode
    return com, retCode, stdOut, stdErr


def scan_files_in_dir(dir_name, patterns=None, zip_patterns=None):
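    """Recursively scan dir_name and return a list of file-info dictionaries.

    Files matching zip_patterns are treated as zipped event-service outputs and expanded
    into one entry per contained LFN; otherwise patterns select plain es_output files.
    """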
    fileList = []
    for root, dirs, filenames in walk(dir_name):
        for filename in filenames:
            # check if the file is zipped
            is_zipped = False
            if zip_patterns:
                matched = False
                for pattern in zip_patterns:
                    if re.search(pattern, filename) is not None:
                        matched = True
                        break
                if matched:
                    is_zipped = True
            # check the filename against the plain patterns
            if not is_zipped and patterns:
                matched = False
                for pattern in patterns:
                    if re.search(pattern, filename) is not None:
                        matched = True
                        break
                if not matched:
                    continue
            # make the file info
            tmpFileDict = dict()
            pfn = os.path.join(root, filename)
            tmpFileDict["path"] = pfn
            tmpFileDict["fsize"] = os.stat(pfn).st_size
            tmpFileDict["guid"] = str(uuid.uuid4())
            tmpFileDict["chksum"] = core_utils.calc_adler32(pfn)
            tmpFileDict["eventStatus"] = "finished"
            if is_zipped:
                # extract the LFNs of the zipped files
                lfns = []
                with tarfile.open(pfn) as f:
                    for tar_info in f.getmembers():
                        lfns.append(os.path.basename(tar_info.name))
                tmpFileDict["type"] = "zip_output"
            else:
                lfns = [os.path.basename(pfn)]
                tmpFileDict["type"] = "es_output"
            # one entry per LFN; the trailing field of the LFN is the event range ID
            for lfn in lfns:
                tmpDict = copy.copy(tmpFileDict)
                tmpDict["eventRangeID"] = lfn.split(".")[-1]
                fileList.append(tmpDict)
    return fileList


class SharedFileMessenger(BaseMessenger):
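    """Messenger using a shared file system.

    Jobs, events, and control messages are exchanged with payloads as json files under
    each worker's access point. Attributes such as the json file names (jsonAttrsFileName,
    jsonOutputsFileName, etc.) are expected to be supplied through the plugin configuration.
    """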

    # constructor
    def __init__(self, **kwarg):
        # default values, which can be overridden by the plugin configuration
        self.jobSpecFileFormat = "json"
        self.stripJobParams = False
        self.scanInPostProcess = False
        self.leftOverPatterns = None
        self.leftOverZipPatterns = None
        self.postProcessInSubDir = False
        self.outputSubDir = None
        self.subTarballName = None
        self.maxWorkersForZip = None
        BaseMessenger.__init__(self, **kwarg)

    # get access point
    def get_access_point(self, workspec, panda_id):
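        """Return the access point for a job: a per-PandaID sub-directory for multi-job workers."""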
        if workspec.mapType == WorkSpec.MT_MultiJobs:
            accessPoint = os.path.join(workspec.get_access_point(), str(panda_id))
        else:
            accessPoint = workspec.get_access_point()
        return accessPoint

    # get the task-level access point
    def get_task_access_point(self, workspec, jobspec):
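        """Return the task-level work directory for a job, or None if it is not defined."""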
        subAccessPoint = self.get_access_point(workspec, jobspec.PandaID)
        tmp_file = os.path.join(subAccessPoint, taskWorkDirPathFile)
        if os.path.exists(tmp_file):
            with open(tmp_file) as f:
                return f.read()
        if jobspec.jobParams and "onSiteMerging" in jobspec.jobParams:
            return os.path.join(self.taskWorkBaseDir, str(jobspec.taskID))
        return None

    # get attributes of a worker which should be propagated to job(s)
    def get_work_attributes(self, workspec):
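        """Collect worker attributes for each PandaID from the attribute, job report, and post-processing json files."""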
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_work_attributes")
        allRetDict = dict()
        numofreads = 0
        sw_readreports = core_utils.get_stopwatch()
        for pandaID in workspec.pandaid_list:
            # look for the attributes json just under the access point
            accessPoint = self.get_access_point(workspec, pandaID)
            jsonFilePath = os.path.join(accessPoint, self.jsonAttrsFileName)
            tmpLog.debug(f"looking for attributes file {jsonFilePath}")
            retDict = dict()
            if not os.path.exists(jsonFilePath):
                # not found
                tmpLog.debug("not found attributes file")
            else:
                try:
                    with open(jsonFilePath) as jsonFile:
                        retDict = json.load(jsonFile)
                except Exception:
                    tmpLog.debug(f"failed to load {jsonFilePath}")
            # look for the job report
            jsonFilePath = os.path.join(accessPoint, self.jsonJobReport)
            tmpLog.debug(f"looking for job report file {jsonFilePath}")
            sw_checkjobrep = core_utils.get_stopwatch()
            if not os.path.exists(jsonFilePath):
                # not found
                tmpLog.debug("not found job report file")
            else:
                try:
                    sw_readrep = core_utils.get_stopwatch()
                    with open(jsonFilePath) as jsonFile:
                        tmpDict = json.load(jsonFile)
                    retDict["metaData"] = tmpDict
                    tmpLog.debug(f"got {os.stat(jsonFilePath).st_size / 1024} kB of job report. {sw_readrep.get_elapsed_time()} sec.")
                    numofreads += 1
                except Exception:
                    tmpLog.debug(f"failed to load {jsonFilePath}")
            tmpLog.debug(f"Check file and read file time: {sw_checkjobrep.get_elapsed_time()} sec.")
            # look for the post-processing job attributes
            jsonFilePath = os.path.join(accessPoint, postProcessAttrs)
            tmpLog.debug(f"looking for post-processing job attributes file {jsonFilePath}")
            if not os.path.exists(jsonFilePath):
                # not found
                tmpLog.debug("not found post-processing job attributes file")
            else:
                try:
                    with open(jsonFilePath) as jsonFile:
                        tmpDict = json.load(jsonFile)
                    retDict.update(tmpDict)
                except Exception:
                    tmpLog.debug(f"failed to load {jsonFilePath}")
            allRetDict[pandaID] = retDict
        tmpLog.debug(f"Reading {numofreads} job report files {sw_readreports.get_elapsed_time()}")
        return allRetDict

    # get files to stage out
    def get_files_to_stage_out(self, workspec):
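        """Look for output json files and build a map of {PandaID: {LFN: [file info]}} for stage-out."""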
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_files_to_stage_out")
        fileDict = dict()
        for pandaID in workspec.pandaid_list:
            # look for the json just under the access point
            accessPoint = self.get_access_point(workspec, pandaID)
            jsonFilePath = os.path.join(accessPoint, self.jsonOutputsFileName)
            readJsonPath = jsonFilePath + suffixReadJson
            # first look for a .read json which has been read but not yet acknowledged
            tmpLog.debug(f"looking for output file {readJsonPath}")
            if not os.path.exists(readJsonPath):
                tmpLog.debug(f"looking for output file {jsonFilePath}")
                if not os.path.exists(jsonFilePath):
                    # not found
                    tmpLog.debug("not found")
                    continue
                try:
                    tmpLog.debug("found")
                    # rename to prevent the file from being overwritten
                    os.rename(jsonFilePath, readJsonPath)
                except Exception:
                    tmpLog.error("failed to rename json")
                    continue
            # load the json
            toSkip = False
            loadDict = None
            try:
                with open(readJsonPath) as jsonFile:
                    loadDict = json.load(jsonFile)
            except Exception:
                tmpLog.error("failed to load json")
                toSkip = True
            # check the data format, which should be a dictionary
            if not toSkip:
                if not isinstance(loadDict, dict):
                    tmpLog.error("loaded data is not a dictionary")
                    toSkip = True
            # collect files and events
            nData = 0
            if not toSkip:
                sizeMap = dict()
                chksumMap = dict()
                eventsList = dict()
                for tmpPandaID, tmpEventMapList in loadDict.items():
                    tmpPandaID = int(tmpPandaID)
                    # each item should be a list
                    if not isinstance(tmpEventMapList, list):
                        tmpLog.error("loaded data item is not a list")
                        toSkip = True
                        break
                    for tmpEventInfo in tmpEventMapList:
                        try:
                            nData += 1
                            if "eventRangeID" in tmpEventInfo:
                                tmpEventRangeID = tmpEventInfo["eventRangeID"]
                            else:
                                tmpEventRangeID = None
                            if "path" in tmpEventInfo:
                                tmpFileDict = dict()
                                pfn = tmpEventInfo["path"]
                                lfn = os.path.basename(pfn)
                                tmpFileDict["path"] = pfn
                                if pfn not in sizeMap:
                                    if "fsize" in tmpEventInfo:
                                        sizeMap[pfn] = tmpEventInfo["fsize"]
                                    else:
                                        sizeMap[pfn] = os.stat(pfn).st_size
                                tmpFileDict["fsize"] = sizeMap[pfn]
                                tmpFileDict["type"] = tmpEventInfo["type"]
                                if tmpEventInfo["type"] in ["log", "output", "checkpoint"]:
                                    # non-event files are never zipped
                                    tmpFileDict["isZip"] = 0
                                elif tmpEventInfo["type"] == "zip_output":
                                    # already zipped
                                    tmpFileDict["isZip"] = 1
                                elif "isZip" in tmpEventInfo:
                                    tmpFileDict["isZip"] = tmpEventInfo["isZip"]
                                # guid
                                if "guid" in tmpEventInfo:
                                    tmpFileDict["guid"] = tmpEventInfo["guid"]
                                else:
                                    tmpFileDict["guid"] = str(uuid.uuid4())
                                # checksum
                                if pfn not in chksumMap:
                                    if "chksum" in tmpEventInfo:
                                        chksumMap[pfn] = tmpEventInfo["chksum"]
                                    else:
                                        chksumMap[pfn] = core_utils.calc_adler32(pfn)
                                tmpFileDict["chksum"] = chksumMap[pfn]
                                if tmpPandaID not in fileDict:
                                    fileDict[tmpPandaID] = dict()
                                if lfn not in fileDict[tmpPandaID]:
                                    fileDict[tmpPandaID][lfn] = []
                                fileDict[tmpPandaID][lfn].append(tmpFileDict)
                                # skip files unrelated to events
                                if tmpFileDict["type"] not in ["es_output", "zip_output"]:
                                    continue
                                tmpFileDict["eventRangeID"] = tmpEventRangeID
                            # collect the event status
                            if tmpPandaID not in eventsList:
                                eventsList[tmpPandaID] = list()
                            eventsList[tmpPandaID].append({"eventRangeID": tmpEventRangeID, "eventStatus": tmpEventInfo["eventStatus"]})
                        except Exception:
                            core_utils.dump_error_message(tmpLog)
                # dump events
                if not toSkip:
                    if len(eventsList) > 0:
                        curName = os.path.join(accessPoint, self.jsonEventsUpdateFileName)
                        newName = curName + ".new"
                        with open(newName, "w") as f:
                            json.dump(eventsList, f)
                        os.rename(newName, curName)
            # remove empty or invalid files
            if toSkip or nData == 0:
                try:
                    os.remove(readJsonPath)
                except Exception:
                    pass
            tmpLog.debug(f"got {nData} files for PandaID={pandaID}")
        return fileDict

    # check if a job is requested
    def job_requested(self, workspec):
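        """Check the job request file and return the number of requested jobs, or False if there is no request."""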
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="job_requested")
        # look for the json just under the access point
        jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonJobRequestFileName)
        tmpLog.debug(f"looking for job request file {jsonFilePath}")
        if not os.path.exists(jsonFilePath):
            # not found
            tmpLog.debug("not found")
            return False
        try:
            with open(jsonFilePath) as jsonFile:
                tmpDict = json.load(jsonFile)
                nJobs = tmpDict["nJobs"]
        except Exception:
            # request one job by default
            nJobs = 1
        tmpLog.debug(f"requesting {nJobs} jobs")
        return nJobs

    # feed jobs
    def feed_jobs(self, workspec, jobspec_list):
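        """Write job specs, the pool file catalog, and input-file symlinks into each job's access point."""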
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="feed_jobs")
        retVal = True
        # make the pool file catalog
        pfc = core_utils.make_pool_file_catalog(jobspec_list)
        pandaIDs = []
        for jobSpec in jobspec_list:
            accessPoint = self.get_access_point(workspec, jobSpec.PandaID)
            jobSpecFilePath = os.path.join(accessPoint, self.jobSpecFileName)
            xmlFilePath = os.path.join(accessPoint, self.xmlPoolCatalogFileName)
            tmpLog.debug(f"feeding jobs to {jobSpecFilePath}")
            try:
                # write the job spec in json or cgi format
                with open(jobSpecFilePath, "w") as jobSpecFile:
                    jobParams = jobSpec.get_job_params(self.stripJobParams)
                    if self.jobSpecFileFormat == "cgi":
                        jobSpecFile.write(urlencode(jobParams))
                    else:
                        json.dump({jobSpec.PandaID: jobParams}, jobSpecFile)
                # write the pool file catalog
                with open(xmlFilePath, "w") as pfcFile:
                    pfcFile.write(pfc)
                # make symlinks to the input files
                for fileSpec in jobSpec.inFiles:
                    if fileSpec.path is None:
                        continue
                    dstPath = os.path.join(accessPoint, fileSpec.lfn)
                    if fileSpec.path != dstPath:
                        # remove the destination first to avoid a stale link
                        if os.path.exists(dstPath):
                            tmpLog.debug(f"removing existing symlink {dstPath}")
                            os.unlink(dstPath)
                        os.symlink(fileSpec.path, dstPath)
                pandaIDs.append(jobSpec.PandaID)
            except Exception:
                core_utils.dump_error_message(tmpLog)
                retVal = False
        # write the list of PandaIDs
        try:
            jsonFilePath = os.path.join(workspec.get_access_point(), self.pandaIDsFile)
            with open(jsonFilePath, "w") as jsonPandaIDsFile:
                json.dump(pandaIDs, jsonPandaIDsFile)
        except Exception:
            core_utils.dump_error_message(tmpLog)
            retVal = False
        # remove the job request file
        try:
            reqFilePath = os.path.join(workspec.get_access_point(), self.jsonJobRequestFileName)
            os.remove(reqFilePath)
        except Exception:
            pass
        tmpLog.debug("done")
        return retVal

    # request events
    def events_requested(self, workspec):
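        """Return the content of the event request file, or an empty dictionary if absent or unreadable."""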
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="events_requested")
        # look for the json just under the access point
        jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonEventsRequestFileName)
        tmpLog.debug(f"looking for event request file {jsonFilePath}")
        if not os.path.exists(jsonFilePath):
            # not found
            tmpLog.debug("not found")
            return {}
        try:
            with open(jsonFilePath) as jsonFile:
                retDict = json.load(jsonFile)
        except Exception:
            tmpLog.debug("failed to load json")
            return {}
        tmpLog.debug("found")
        return retDict

    # feed events
    def feed_events(self, workspec, events_dict):
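        """Write event ranges to the events feed file and remove the event request file."""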
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="feed_events")
        retVal = True
        if workspec.mapType in [WorkSpec.MT_OneToOne, WorkSpec.MT_MultiWorkers]:
            # put the json just under the access point
            jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonEventsFeedFileName)
            tmpLog.debug(f"feeding events to {jsonFilePath}")
            try:
                with open(jsonFilePath, "w") as jsonFile:
                    json.dump(events_dict, jsonFile)
            except Exception:
                core_utils.dump_error_message(tmpLog)
                retVal = False
        elif workspec.mapType == WorkSpec.MT_MultiJobs:
            # feeding events to multi-job workers is not implemented
            pass
        # remove the request file
        try:
            jsonFilePath = os.path.join(workspec.get_access_point(), self.jsonEventsRequestFileName)
            os.remove(jsonFilePath)
        except Exception:
            pass
        tmpLog.debug("done")
        return retVal

    # get events to update
    def events_to_update(self, workspec):
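        """Collect event status updates from each PandaID's event update json."""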
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="events_to_update")
        retDict = dict()
        for pandaID in workspec.pandaid_list:
            # look for the json just under the access point
            accessPoint = self.get_access_point(workspec, pandaID)
            jsonFilePath = os.path.join(accessPoint, self.jsonEventsUpdateFileName)
            readJsonPath = jsonFilePath + suffixReadJson
            # first look for a .read json which has been read but not yet acknowledged
            tmpLog.debug(f"looking for event update file {readJsonPath}")
            if not os.path.exists(readJsonPath):
                tmpLog.debug(f"looking for event update file {jsonFilePath}")
                if not os.path.exists(jsonFilePath):
                    # not found
                    tmpLog.debug("not found")
                    continue
                try:
                    # rename to prevent the file from being overwritten
                    os.rename(jsonFilePath, readJsonPath)
                except Exception:
                    tmpLog.error("failed to rename json")
                    continue
            # load the json
            nData = 0
            try:
                with open(readJsonPath) as jsonFile:
                    tmpOrigDict = json.load(jsonFile)
                    # change the keys from str to int
                    for tmpPandaID, tmpDict in tmpOrigDict.items():
                        tmpPandaID = int(tmpPandaID)
                        retDict[tmpPandaID] = tmpDict
                        nData += len(tmpDict)
            except Exception:
                tmpLog.error("failed to load json")
            # delete the file if empty
            if nData == 0:
                try:
                    os.remove(readJsonPath)
                except Exception:
                    pass
            tmpLog.debug(f"got {nData} events for PandaID={pandaID}")
        return retDict

    # acknowledge events and files
    def acknowledge_events_files(self, workspec):
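        """Rename processed .read json files with a timestamp suffix so they are not read again."""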
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="acknowledge_events_files")
        # rename the .read json files
        for pandaID in workspec.pandaid_list:
            accessPoint = self.get_access_point(workspec, pandaID)
            try:
                jsonFilePath = os.path.join(accessPoint, self.jsonEventsUpdateFileName)
                jsonFilePath += suffixReadJson
                jsonFilePath_rename = jsonFilePath + "." + core_utils.naive_utcnow().strftime("%Y-%m-%d_%H_%M_%S.%f")
                os.rename(jsonFilePath, jsonFilePath_rename)
            except Exception:
                pass
            try:
                jsonFilePath = os.path.join(accessPoint, self.jsonOutputsFileName)
                jsonFilePath += suffixReadJson
                jsonFilePath_rename = jsonFilePath + "." + core_utils.naive_utcnow().strftime("%Y-%m-%d_%H_%M_%S.%f")
                os.rename(jsonFilePath, jsonFilePath_rename)
            except Exception:
                pass
        tmpLog.debug("done")
        return

    # setup access points
    def setup_access_points(self, workspec_list):
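        """Create the directory tree under each worker's access point; return True on success."""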
        try:
            for workSpec in workspec_list:
                accessPoint = workSpec.get_access_point()
                # wipe out any leftover access point for a new worker
                if os.path.exists(accessPoint) and workSpec.isNew:
                    shutil.rmtree(accessPoint, ignore_errors=True)
                # make the access point if missing
                if not os.path.exists(accessPoint):
                    os.makedirs(accessPoint)
                jobSpecs = workSpec.get_jobspec_list()
                if jobSpecs is not None:
                    for jobSpec in jobSpecs:
                        # make a per-job sub-directory
                        subAccessPoint = self.get_access_point(workSpec, jobSpec.PandaID)
                        if accessPoint != subAccessPoint:
                            if not os.path.exists(subAccessPoint):
                                os.mkdir(subAccessPoint)
                        # make a task-level work directory and record its path
                        taskAccessDir = self.get_task_access_point(workSpec, jobSpec)
                        if taskAccessDir:
                            if not os.path.exists(taskAccessDir):
                                os.mkdir(taskAccessDir)
                            with open(os.path.join(subAccessPoint, taskWorkDirPathFile), "w") as f:
                                f.write(taskAccessDir)
            return True
        except Exception:
            # get logger
            tmpLog = core_utils.make_logger(_logger, method_name="setup_access_points")
            core_utils.dump_error_message(tmpLog)
            return False

    # filter for log.tar.gz
    def filter_log_tgz(self, name):
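        """Return True if the file name matches one of the patterns to be included in the log tarball."""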
        for tmpPatt in ["*.log", "*.txt", "*.xml", "*.json", "log*"]:
            if fnmatch.fnmatch(name, tmpPatt):
                return True
        return False

    # post-processing when the worker is done
    def post_processing(self, workspec, jobspec_list, map_type):
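        """Make the log tarball, scan for leftover outputs, and write the output json for stage-out.

        Returns True on success, or None on failure so that the caller can retry.
        """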
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="post_processing")
        try:
            for jobSpec in jobspec_list:
                # check if the job already produced a log file
                hasLog = False
                for fileSpec in jobSpec.outFiles:
                    if fileSpec.fileType == "log":
                        hasLog = True
                        break
                fileDict = dict()
                accessPoint = self.get_access_point(workspec, jobSpec.PandaID)
                origAccessPoint = accessPoint
                if self.postProcessInSubDir:
                    accessPoint = os.path.join(accessPoint, str(jobSpec.PandaID))
                # make the log tarball if the payload didn't
                if not hasLog:
                    logFileInfo = jobSpec.get_logfile_info()
                    logFilePath = os.path.join(accessPoint, logFileInfo["lfn"])
                    if map_type == WorkSpec.MT_MultiWorkers:
                        # append the worker ID so that workers for the same job don't overwrite each other's log
                        logFilePath += f"._{workspec.workerID}"
                    tmpLog.debug(f"making {logFilePath}")
                    dirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))]
                    # tar the sub-directories in parallel
                    tmpLog.debug(f"tar for {len(dirs)} sub dirs")
                    with Pool(max_workers=self.maxWorkersForZip if self.maxWorkersForZip else multiprocessing.cpu_count()) as pool:
                        retValList = pool.map(
                            lambda x, y: tar_directory(x, self.jobSpecFileName, sub_tarball_name=y), dirs, itertools.repeat(self.subTarballName)
                        )
                        for dirName, (comStr, retCode, stdOut, stdErr) in zip(dirs, retValList):
                            if retCode != 0:
                                tmpLog.warning(f"failed to sub-tar {dirName} with {comStr} -> {stdOut}:{stdErr}")
                    # tar the top level of the access point including the sub-tarballs
                    tmpLog.debug("tar for main dir")
                    comStr, retCode, stdOut, stdErr = tar_directory(accessPoint, self.jobSpecFileName, logFilePath, 1, ["*.subdir.tar.gz"])
                    tmpLog.debug("used command : " + comStr)
                    if retCode != 0:
                        tmpLog.warning(f"failed to tar {accessPoint} with {comStr} -> {stdOut}:{stdErr}")
                    # add the log file to the output list
                    fileDict.setdefault(jobSpec.PandaID, [])
                    fileDict[jobSpec.PandaID].append({"path": logFilePath, "type": "log", "isZip": 0})
                # look for leftover files
                if self.scanInPostProcess:
                    tmpLog.debug(f"scanning leftovers in {accessPoint}")
                    # set the directories to scan
                    dirs = []
                    if self.outputSubDir is None:
                        dirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))]
                    else:
                        # scan only the output sub-directory of each sub-directory
                        upperdirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))]
                        dirs = [os.path.join(dirname, self.outputSubDir) for dirname in upperdirs if os.path.isdir(os.path.join(dirname, self.outputSubDir))]
                    # expand placeholders in the leftover patterns
                    patterns = []
                    patterns_zip = []
                    for tmp_patterns, tmp_left_over_patterns in [[patterns, self.leftOverPatterns], [patterns_zip, self.leftOverZipPatterns]]:
                        if tmp_left_over_patterns is None:
                            continue
                        for scanPat in tmp_left_over_patterns:
                            if "%PANDAID" in scanPat:
                                scanPat = scanPat.replace("%PANDAID", str(jobSpec.PandaID))
                            if "%TASKID" in scanPat:
                                scanPat = scanPat.replace("%TASKID", str(jobSpec.taskID))
                            if "%OUTPUT_FILE" in scanPat:
                                logFileName = jobSpec.get_logfile_info()["lfn"]
                                for outputName in jobSpec.get_output_file_attributes().keys():
                                    if outputName == logFileName:
                                        continue
                                    tmp_patterns.append(scanPat.replace("%OUTPUT_FILE", outputName))
                            else:
                                tmp_patterns.append(scanPat)
                    # scan the directories in parallel
                    nLeftOvers = 0
                    with Pool(max_workers=self.maxWorkersForZip if self.maxWorkersForZip else multiprocessing.cpu_count()) as pool:
                        retValList = pool.map(scan_files_in_dir, dirs, [patterns] * len(dirs), [patterns_zip] * len(dirs))
                        for retVal in retValList:
                            fileDict.setdefault(jobSpec.PandaID, [])
                            fileDict[jobSpec.PandaID] += retVal
                            nLeftOvers += len(retVal)
                    tmpLog.debug(f"got {nLeftOvers} leftovers")
                # collect merged outputs from the task-level work state file
                taskAccessDir = self.get_task_access_point(workspec, jobSpec)
                if taskAccessDir:
                    doneInputs = set()
                    taskWorkStatePath = os.path.join(taskAccessDir, self.taskWorkStateFile)
                    if os.path.exists(taskWorkStatePath):
                        nInTaskState = 0
                        with open(taskWorkStatePath) as f:
                            try:
                                tmpData = json.load(f)
                                if "merged" in tmpData:
                                    output_lfns = set()
                                    fileDict.setdefault(jobSpec.PandaID, [])
                                    for tmpIn, tmpOuts in tmpData["merged"].items():
                                        for tmpLFN, tmpFileDict in tmpOuts.items():
                                            # skip duplicated outputs
                                            if tmpLFN in output_lfns:
                                                continue
                                            output_lfns.add(tmpLFN)
                                            nInTaskState += 1
                                            pfn = tmpFileDict["path"]
                                            if "fsize" not in tmpFileDict:
                                                tmpFileDict["fsize"] = os.stat(pfn).st_size
                                            tmpFileDict["type"] = "output"
                                            if "guid" not in tmpFileDict:
                                                tmpFileDict["guid"] = str(uuid.uuid4())
                                            if "chksum" not in tmpFileDict:
                                                tmpFileDict["chksum"] = core_utils.calc_adler32(pfn)
                                            fileDict[jobSpec.PandaID].append(tmpFileDict)
                                        doneInputs.add(tmpIn)
                            except Exception:
                                core_utils.dump_error_message(tmpLog)
                                tmpLog.error(f"failed to parse task-level work state file {taskWorkStatePath}")
                                raise
                        tmpLog.debug(f"got {nInTaskState} output files from task state file")
                    # record the input files that were skipped
                    skippedInputs = [fileSpec.lfn for fileSpec in jobSpec.inFiles if fileSpec.lfn not in doneInputs]
                    with open(os.path.join(accessPoint, postProcessAttrs), "w") as f:
                        json.dump({"skippedInputs": skippedInputs}, f)
                    tmpLog.debug(f"set {len(skippedInputs)} input files to skip")
                # write the output json
                if len(fileDict) > 0:
                    jsonFilePath = os.path.join(origAccessPoint, self.jsonOutputsFileName)
                    with open(jsonFilePath, "w") as jsonFile:
                        json.dump(fileDict, jsonFile)
            tmpLog.debug("done")
            return True
        except Exception:
            core_utils.dump_error_message(tmpLog)
            return None

    # get a list of PandaIDs
    def get_panda_ids(self, workspec):
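        """Return the list of PandaIDs recorded by feed_jobs under the worker's access point."""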
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="get_panda_ids")
        # look for the json just under the access point
        jsonFilePath = os.path.join(workspec.get_access_point(), self.pandaIDsFile)
        tmpLog.debug(f"looking for PandaID file {jsonFilePath}")
        retVal = []
        if not os.path.exists(jsonFilePath):
            # not found
            tmpLog.debug("not found")
            return retVal
        try:
            with open(jsonFilePath) as jsonFile:
                retVal = json.load(jsonFile)
        except Exception:
            tmpLog.debug("failed to load json")
            return retVal
        tmpLog.debug("found")
        return retVal

    # check if the worker is requested to be killed
    def kill_requested(self, workspec):
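        """Return True if the kill request file exists under the worker's access point."""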
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="kill_requested")
        # look for the json just under the access point
        jsonFilePath = os.path.join(workspec.get_access_point(), self.killWorkerFile)
        tmpLog.debug(f"looking for kill request file {jsonFilePath}")
        if not os.path.exists(jsonFilePath):
            # not found
            tmpLog.debug("not found")
            return False
        tmpLog.debug("kill requested")
        return True

    # check if the worker is alive based on the heartbeat file
    def is_alive(self, workspec, time_limit):
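        """Check worker liveness via the heartbeat file: True if alive, False if stale, None if undetermined."""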
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="is_alive")
        # look for the heartbeat file just under the access point
        jsonFilePath = os.path.join(workspec.get_access_point(), self.heartbeatFile)
        tmpLog.debug(f"looking for heartbeat file {jsonFilePath}")
        if not os.path.exists(jsonFilePath):
            tmpLog.debug(f"startTime: {workspec.startTime}, now: {core_utils.naive_utcnow()}")
            if not workspec.startTime:
                # the worker has not started yet
                tmpLog.debug("heartbeat not found, but no startTime yet for worker")
                return True
            elif core_utils.naive_utcnow() - workspec.startTime < datetime.timedelta(minutes=time_limit):
                # the worker is too young and may not have made a heartbeat yet
                tmpLog.debug("heartbeat not found, but worker too young")
                return True
            else:
                # not found
                tmpLog.debug("not found")
                return None
        try:
            # check the last modification time
            mtime = core_utils.naive_utcfromtimestamp(os.path.getmtime(jsonFilePath))
            tmpLog.debug(f"last modification time : {mtime}")
            if core_utils.naive_utcnow() - mtime > datetime.timedelta(minutes=time_limit):
                tmpLog.debug("too old")
                return False
            tmpLog.debug("OK")
            return True
        except Exception:
            tmpLog.debug("failed to get mtime")
            return None

    # clean up the worker's access point
    def clean_up(self, workspec):
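        """Remove the worker's access point directory and return a (status, error string) tuple."""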
        # get logger
        tmpLog = core_utils.make_logger(_logger, f"workerID={workspec.workerID}", method_name="clean_up")
        # remove the access point
        errStr = ""
        worker_accessPoint = workspec.get_access_point()
        if os.path.isdir(worker_accessPoint):
            try:
                shutil.rmtree(worker_accessPoint)
            except Exception as _e:
                errStr = f"failed to remove directory {worker_accessPoint} : {_e}"
                tmpLog.error(errStr)
            else:
                tmpLog.debug("done")
                return (True, errStr)
        elif not os.path.exists(worker_accessPoint):
            tmpLog.debug("access point directory already gone. Skipped")
            return (None, errStr)
        else:
            errStr = f"{worker_accessPoint} is not a directory"
            tmpLog.error(errStr)
        # removal failed or the path was not a directory
        return (False, errStr)