File indexing completed on 2026-04-10 08:39:01
0001 import datetime
0002 import os
0003 import re
0004 import sys
0005 import threading
0006 import time
0007 import traceback
0008
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread
0012
0013 import pandaserver.taskbuffer.ErrorCode
0014 from pandaserver.brokerage.SiteMapper import SiteMapper
0015 from pandaserver.config import panda_config
0016 from pandaserver.dataservice import DataServiceUtils
0017 from pandaserver.dataservice.closer import Closer
0018 from pandaserver.dataservice.DataServiceUtils import select_scope
0019 from pandaserver.dataservice.ddm import rucioAPI
0020 from pandaserver.dataservice.finisher import Finisher
0021 from pandaserver.taskbuffer import EventServiceUtils
0022
0023 _logger = PandaLogger().getLogger("datasetManager")
0024
0025 TRANSFER_TIMEOUT_HI_PRIORITY = 2
0026 TRANSFER_TIMEOUT_LO_PRIORITY = 6
0027
0028
def main(tbuf=None, **kwargs):
    """Run one dataset-manager daemon cycle.

    :param tbuf: optional pre-initialized task buffer (accepted for daemon
        launcher compatibility; not read in the visible code)
    :param kwargs: extra launcher arguments (unused here)
    """
    _logger.debug("===================== start =====================")
0031
0032
def _memoryCheck(str):
    """Log the memory footprint (VSZ/RSS) of the current process.

    Reads /proc/<pid>/status and logs Name, VmSize and VmRSS together with the
    caller-supplied tag. Any failure is logged and swallowed so that memory
    reporting can never break the main flow.

    :param str: free-text tag included in the log line
                (NOTE: the name shadows the builtin ``str``; kept for
                backward compatibility with existing positional callers)
    """
    try:
        proc_status = f"/proc/{os.getpid()}/status"
        name = ""
        vmSize = ""
        vmRSS = ""
        # "with" guarantees the file is closed even if parsing raises
        # (the original leaked the handle on the exception path)
        with open(proc_status) as procfile:
            for line in procfile:
                if line.startswith("Name:"):
                    name = line.split()[-1]
                elif line.startswith("VmSize:"):
                    # join value and unit, e.g. "123456kB"
                    vmSize = "".join(line.split()[1:])
                elif line.startswith("VmRSS:"):
                    vmRSS = "".join(line.split()[1:])
        _logger.debug(f"MemCheck - {os.getpid()} Name={name} VSZ={vmSize} RSS={vmRSS} : {str}")
    except Exception:
        type, value, traceBack = sys.exc_info()
        _logger.error(f"memoryCheck() : {type} {value}")
        _logger.debug(f"MemCheck - {os.getpid()} unknown : {str}")
    return
0062
# report memory usage at the beginning of the cycle
_memoryCheck("start")

from pandaserver.taskbuffer.TaskBuffer import taskBuffer

# unique requester ID so DB connections can be attributed to this module
requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
taskBuffer.init(
    panda_config.dbhost,
    panda_config.dbpasswd,
    nDBConnection=1,
    useTimeout=True,
    requester=requester_id,
)


# site/nucleus lookup helper used by the finisher/activator threads below
siteMapper = SiteMapper(taskBuffer)
0080
0081
class ListWithLock:
    """A minimal thread-safe list supporting membership test and unique append."""

    def __init__(self):
        self.lock = threading.Lock()
        self.list = []

    def __contains__(self, item):
        # serialize the membership check against concurrent appends
        with self.lock:
            return self.list.__contains__(item)

    def append(self, item):
        # add the item only if absent; report whether it was actually added
        with self.lock:
            if item in self.list:
                return False
            self.list.append(item)
            return True


# dis datasets already queued for deletion in this cycle (shared across threads)
deletedDisList = ListWithLock()
0104
0105
def setTobeDeletedToDis(subDsName):
    """Flag dispatch (_dis) datasets associated with a _sub dataset for deletion.

    Each associated _disNNNN dataset is set to "deleting" when it has no
    remaining files, or "shortening" otherwise, unless it is already being
    deleted. User/group datasets and non-sub names are ignored. Each dis
    dataset is processed at most once per cycle via deletedDisList. Errors
    are logged and swallowed.

    :param subDsName: name of the _sub dataset
    """
    try:
        # ignore user/group datasets and anything that is not a _subNNNN dataset
        # (raw strings: "\d" in a plain literal is an invalid escape sequence)
        if subDsName.startswith("user") or subDsName.startswith("group") or re.search(r"_sub\d+$", subDsName) is None:
            return
        # get associated dis datasets
        disNameList = taskBuffer.getAssociatedDisDatasets(subDsName)
        _logger.debug(f"setTobeDeletedToDis : sub:{subDsName} has dis:{str(disNameList)}")
        # loop over all dis datasets
        for tmpDisName in disNameList:
            # skip dis datasets another thread already queued in this cycle
            if not deletedDisList.append(tmpDisName):
                continue
            # only dispatch datasets
            if re.search(r"_dis\d+$", tmpDisName) is None:
                continue
            # get the dataset record
            _logger.debug(f"setTobeDeletedToDis : try to get {tmpDisName} in DB")
            tmpDS = taskBuffer.queryDatasetWithMap({"name": tmpDisName})
            if tmpDS is None:
                _logger.error(f"setTobeDeletedToDis : cannot get {tmpDisName} in DB")
                continue
            # already deleted
            if tmpDS.status in ["tobedeleted", "deleted"]:
                _logger.debug(f"setTobeDeletedToDis : skip {tmpDisName} since status={tmpDS.status}")
                continue
            if tmpDS.currentfiles == 0:
                # empty dis dataset can be deleted outright
                tmpDS.status = "deleting"
                excStatus = "deleted"
            else:
                # non-empty dis dataset is only shortened
                tmpDS.status = "shortening"
                excStatus = "shortened"
            # update only when nobody else completed the transition already
            retU = taskBuffer.updateDatasets(
                [tmpDS],
                withLock=True,
                withCriteria="status<>:crStatus",
                criteriaMap={":crStatus": excStatus},
            )
            _logger.debug(f"setTobeDeletedToDis : set {tmpDS.status} to {tmpDisName} with {str(retU)}")
    except Exception:
        errType, errValue = sys.exc_info()[:2]
        _logger.error(f"setTobeDeletedToDis : {subDsName} {errType} {errValue}")
0152 _logger.error(f"setTobeDeletedToDis : {subDsName} {errType} {errValue}")
0153
0154
class ThreadPool:
    """Tracks live worker threads so the main flow can wait for all of them."""

    def __init__(self):
        self.lock = threading.Lock()
        self.list = []

    def add(self, obj):
        # register a worker under the lock
        with self.lock:
            self.list.append(obj)

    def remove(self, obj):
        # deregister a finished worker under the lock
        with self.lock:
            self.list.remove(obj)

    def join(self):
        # snapshot under the lock, then join outside it so workers
        # can still call add/remove while we wait
        with self.lock:
            snapshot = tuple(self.list)
        for worker in snapshot:
            worker.join()
0176
0177
class CloserThr(threading.Thread):
    """Worker that closes a batch of "tobeclosed" output datasets in Rucio
    and marks them completed in the DB."""

    def __init__(self, lock, proxyLock, datasets, pool):
        threading.Thread.__init__(self)
        self.datasets = datasets  # list of (vuid, name, modificationdate) tuples
        self.lock = lock  # semaphore limiting concurrent closer threads
        self.proxyLock = proxyLock  # serializes taskBuffer (DB) access
        self.pool = pool
        self.pool.add(self)

    def run(self):
        self.lock.acquire()
        try:
            # loop over all datasets in the batch
            for vuid, name, modDate in self.datasets:
                _logger.debug(f"Close {modDate} {name}")
                dsExists = True
                # these dataset families are not registered in Rucio by the server
                if name.startswith("user.") or name.startswith("group.") or name.startswith("hc_test.") or name.startswith("panda.um."):
                    dsExists = False
                if dsExists:
                    # close the dataset in Rucio if it is registered there
                    status, out = rucioAPI.get_metadata(name)
                    if status is True:
                        if out is not None:
                            try:
                                rucioAPI.close_dataset(name)
                                status = True
                            except Exception:
                                errtype, errvalue = sys.exc_info()[:2]
                                out = f"failed to freeze : {errtype} {errvalue}"
                                status = False
                        else:
                            # dataset not found in Rucio
                            status, out = True, ""
                            dsExists = False
                else:
                    status, out = True, ""
                if not status:
                    _logger.error(f"{name} failed to close with {out}")
                else:
                    # mark the dataset completed in the DB
                    self.proxyLock.acquire()
                    varMap = {}
                    varMap[":vuid"] = vuid
                    varMap[":newstatus"] = "completed"
                    varMap[":oldstatus"] = "tobeclosed"
                    taskBuffer.querySQLS(
                        "UPDATE ATLAS_PANDA.Datasets SET status=:newstatus,modificationdate=CURRENT_DATE WHERE vuid=:vuid AND status=:oldstatus",
                        varMap,
                    )
                    self.proxyLock.release()
                    # flag associated dis datasets for deletion
                    setTobeDeletedToDis(name)
                    # nothing more to do when the dataset is not in Rucio
                    if not dsExists:
                        continue
                    # erase the dataset when it turned out to be empty
                    status, out = rucioAPI.get_number_of_files(name)
                    if status is not True:
                        if status is False:
                            _logger.error(out)
                    else:
                        _logger.debug(out)
                        try:
                            nFile = int(out)
                            if nFile == 0:
                                _logger.debug(f"erase {name}")
                                status, out = rucioAPI.erase_dataset(name)
                                _logger.debug(f"OK with {name}")
                        except Exception:
                            # best-effort: out may not be an integer
                            pass
        except Exception:
            # NOTE(review): swallows all errors silently; consider logging
            pass
        self.pool.remove(self)
        self.lock.release()
0252
0253
_logger.debug("==== close datasets ====")
# process datasets last modified between 3 days and 1 minute ago
timeLimitU = naive_utcnow() - datetime.timedelta(minutes=1)
timeLimitL = naive_utcnow() - datetime.timedelta(days=3)
closeLock = threading.Semaphore(5)
closeProxyLock = threading.Lock()
closeThreadPool = ThreadPool()
maxRows = 100000
while True:
    # throttle before touching the DB
    closeLock.acquire()
    closeProxyLock.acquire()
    varMap = {}
    varMap[":modificationdateU"] = timeLimitU
    varMap[":modificationdateL"] = timeLimitL
    varMap[":type"] = "output"
    varMap[":status"] = "tobeclosed"
    sqlQuery = f"type=:type AND status=:status AND (modificationdate BETWEEN :modificationdateL AND :modificationdateU) AND rownum <= {maxRows}"
    res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
    if res is None:
        _logger.debug(f"# of datasets to be closed: {res}")
    else:
        _logger.debug(f"# of datasets to be closed: {len(res)}")
    if res is None or len(res) == 0:
        closeProxyLock.release()
        closeLock.release()
        break
    closeProxyLock.release()
    closeLock.release()
    # fan out in chunks of 500 datasets per CloserThr
    iRows = 0
    nRows = 500
    while iRows < len(res):
        closerThr = CloserThr(closeLock, closeProxyLock, res[iRows : iRows + nRows], closeThreadPool)
        closerThr.start()
        iRows += nRows
    closeThreadPool.join()
    # stop when the last query returned less than a full page
    if len(res) < maxRows:
        break
0294
0295
class Freezer(threading.Thread):
    """Worker that freezes a batch of _sub output datasets once all their
    files reached a terminal state."""

    def __init__(self, lock, proxyLock, datasets, pool):
        threading.Thread.__init__(self)
        self.datasets = datasets  # list of (vuid, name, modificationdate) tuples
        self.lock = lock  # semaphore limiting concurrent freezer threads
        self.proxyLock = proxyLock  # serializes taskBuffer (DB) access
        self.pool = pool
        self.pool.add(self)

    def run(self):
        self.lock.acquire()
        try:
            for vuid, name, modDate in self.datasets:
                _logger.debug(f"Freezer start {modDate} {name}")
                # get all file rows pointing at this destination dataset
                self.proxyLock.acquire()
                retF, resF = taskBuffer.querySQLS(
                    "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID,status FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock ",
                    {":destinationDBlock": name},
                )
                self.proxyLock.release()
                if isinstance(retF, int) and retF < 0:
                    _logger.error("SQL error")
                else:
                    # the dataset is done only when every file is in a terminal state
                    allFinished = True
                    onePandaID = None
                    for tmpPandaID, tmpFileStatus in resF:
                        onePandaID = tmpPandaID
                        if tmpFileStatus not in [
                            "ready",
                            "failed",
                            "skipped",
                            "merging",
                            "finished",
                        ]:
                            allFinished = False
                            break
                    # for event-service merge jobs also check sibling sub datasets in the jobset
                    if allFinished:
                        self.proxyLock.acquire()
                        tmpJobs = taskBuffer.getFullJobStatus([onePandaID])
                        self.proxyLock.release()
                        if len(tmpJobs) > 0 and tmpJobs[0] is not None:
                            if EventServiceUtils.isEventServiceMerge(tmpJobs[0]):
                                self.proxyLock.acquire()
                                cThr = Closer(taskBuffer, [], tmpJobs[0])
                                allFinished = cThr.checkSubDatasetsInJobset()
                                self.proxyLock.release()
                                _logger.debug(f"closer checked sub datasets in the jobset for {name} : {allFinished}")
                    if allFinished:
                        _logger.debug(f"freeze {name} ")
                        dsExists = True
                        # these dataset families are not registered in Rucio by the server
                        if name.startswith("user.") or name.startswith("group.") or name.startswith("hc_test.") or name.startswith("panda.um."):
                            dsExists = False
                        if name.startswith("panda.um."):
                            # pre-merge dataset: hand remaining merging/failed files to the JEDI closer
                            self.proxyLock.acquire()
                            retMer, resMer = taskBuffer.querySQLS(
                                "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock AND status IN (:statusM,:statusF) ",
                                {
                                    ":destinationDBlock": name,
                                    ":statusM": "merging",
                                    ":statusF": "failed",
                                },
                            )
                            self.proxyLock.release()
                            if resMer is not None and len(resMer) > 0:
                                mergeID = resMer[0][0]
                                # get the merging job to find its destination datasets
                                self.proxyLock.acquire()
                                mergingJobs = taskBuffer.peekJobs(
                                    [mergeID],
                                    fromDefined=False,
                                    fromArchived=False,
                                    fromWaiting=False,
                                )
                                self.proxyLock.release()
                                mergeJob = mergingJobs[0]
                                if mergeJob is not None:
                                    tmpDestDBlocks = []
                                    # collect destination datasets of the merge job's output/log files
                                    for tmpFile in mergeJob.Files:
                                        if tmpFile.type in ["output", "log"]:
                                            if tmpFile.destinationDBlock not in tmpDestDBlocks:
                                                tmpDestDBlocks.append(tmpFile.destinationDBlock)
                                    # run the closer and move on to the next dataset
                                    _logger.debug(f"start JEDI closer for {name} ")
                                    self.proxyLock.acquire()
                                    cThr = Closer(taskBuffer, tmpDestDBlocks, mergeJob)
                                    cThr.run()
                                    self.proxyLock.release()
                                    _logger.debug(f"end JEDI closer for {name} ")
                                    continue
                                else:
                                    _logger.debug(f"failed to get merging job for {name} ")
                            else:
                                _logger.debug(f"failed to get merging file for {name} ")
                            status, out = True, ""
                        elif dsExists:
                            # close the dataset in Rucio if it is registered there
                            status, out = rucioAPI.get_metadata(name)
                            if status is True:
                                if out is not None:
                                    try:
                                        rucioAPI.close_dataset(name)
                                        status = True
                                    except Exception:
                                        errtype, errvalue = sys.exc_info()[:2]
                                        out = f"failed to freeze : {errtype} {errvalue}"
                                        status = False
                                else:
                                    # dataset not found in Rucio
                                    status, out = True, ""
                                    dsExists = False
                        else:
                            status, out = True, ""
                        if not status:
                            _logger.error(f"{name} failed to freeze with {out}")
                        else:
                            # mark the dataset completed in the DB
                            self.proxyLock.acquire()
                            varMap = {}
                            varMap[":vuid"] = vuid
                            varMap[":status"] = "completed"
                            taskBuffer.querySQLS(
                                "UPDATE ATLAS_PANDA.Datasets SET status=:status,modificationdate=CURRENT_DATE WHERE vuid=:vuid",
                                varMap,
                            )
                            self.proxyLock.release()
                            if name.startswith("panda.um.") or not dsExists:
                                continue
                            # flag associated dis datasets for deletion
                            setTobeDeletedToDis(name)
                            # erase the dataset when it turned out to be empty
                            status, out = rucioAPI.get_number_of_files(name)
                            if status is not True:
                                if status is False:
                                    _logger.error(out)
                            else:
                                _logger.debug(out)
                                try:
                                    nFile = int(out)
                                    _logger.debug(nFile)
                                    if nFile == 0:
                                        _logger.debug(f"erase {name}")
                                        status, out = rucioAPI.erase_dataset(name)
                                        _logger.debug(f"OK with {name}")
                                except Exception:
                                    # best-effort: out may not be an integer
                                    pass
                    else:
                        # some files are still active: keep the dataset alive by touching it
                        _logger.debug(f"wait {name} ")
                        self.proxyLock.acquire()
                        taskBuffer.querySQLS(
                            "UPDATE ATLAS_PANDA.Datasets SET modificationdate=CURRENT_DATE WHERE vuid=:vuid",
                            {":vuid": vuid},
                        )
                        self.proxyLock.release()
                _logger.debug(f"end {name} ")
        except Exception:
            errStr = traceback.format_exc()
            _logger.error(errStr)
        self.pool.remove(self)
        self.lock.release()
0458
0459
_logger.debug("==== freeze datasets ====")
timeLimitRU = naive_utcnow() - datetime.timedelta(hours=3)
timeLimitRL = naive_utcnow() - datetime.timedelta(hours=12)
timeLimitU = naive_utcnow() - datetime.timedelta(hours=6)
timeLimitL = naive_utcnow() - datetime.timedelta(days=14)
# reset stuck "doing" sub datasets back to "running" so they get retried
sql = "SELECT name FROM ATLAS_PANDA.Datasets "
sql += "WHERE type=:type AND (modificationdate BETWEEN :modificationdateRL AND :modificationdateRU) AND subType=:subType AND status=:oldStatus "
varMap = {}
varMap[":modificationdateRU"] = timeLimitRU
varMap[":modificationdateRL"] = timeLimitRL
varMap[":type"] = "output"
varMap[":subType"] = "sub"
varMap[":oldStatus"] = "doing"
retReset, resReset = taskBuffer.querySQLS(sql, varMap)
sql = "UPDATE ATLAS_PANDA.Datasets SET status=:newStatus,modificationdate=:modificationdateU WHERE name=:name AND status=:oldStatus "
if resReset is not None:
    for (name,) in resReset:
        varMap = {}
        varMap[":name"] = name
        varMap[":oldStatus"] = "doing"
        varMap[":newStatus"] = "running"
        varMap[":modificationdateU"] = timeLimitU
        _logger.debug(f"reset {name} to freeze")
        taskBuffer.querySQLS(sql, varMap)

freezeLock = threading.Semaphore(5)
freezeProxyLock = threading.Lock()
freezeThreadPool = ThreadPool()
maxRows = 100000
while True:
    # throttle before touching the DB
    freezeLock.acquire()
    # lock a page of freezable sub datasets
    sqlQuery = (
        "type=:type AND status IN (:status1,:status2,:status3,:status4,:status5) "
        + f"AND (modificationdate BETWEEN :modificationdateL AND :modificationdateU) AND subType=:subType AND rownum <= {maxRows}"
    )
    varMap = {}
    varMap[":modificationdateU"] = timeLimitU
    varMap[":modificationdateL"] = timeLimitL
    varMap[":type"] = "output"
    varMap[":status1"] = "running"
    varMap[":status2"] = "created"
    varMap[":status3"] = "defined"
    varMap[":status4"] = "locked"
    varMap[":status5"] = "doing"
    varMap[":subType"] = "sub"
    freezeProxyLock.acquire()
    res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
    if res is None:
        _logger.debug(f"# of datasets to be frozen: {res}")
    else:
        _logger.debug(f"# of datasets to be frozen: {len(res)}")
    if res is None or len(res) == 0:
        freezeProxyLock.release()
        freezeLock.release()
        break
    freezeProxyLock.release()

    freezeLock.release()
    # fan out in chunks of 500 datasets per Freezer
    iRows = 0
    nRows = 500
    while iRows < len(res):
        freezer = Freezer(
            freezeLock,
            freezeProxyLock,
            res[iRows : iRows + nRows],
            freezeThreadPool,
        )
        freezer.start()
        iRows += nRows
    freezeThreadPool.join()
    # stop when the last query returned less than a full page
    if len(res) < maxRows:
        break
0536
0537
class EraserThr(threading.Thread):
    """Worker that erases a batch of dispatch (_dis) datasets in Rucio.

    After a successful erase the dataset status in the DB is set to
    "deleted". *operationType* ("deleting" or "shortening") is used only
    for logging.
    """

    def __init__(self, lock, proxyLock, datasets, pool, operationType):
        threading.Thread.__init__(self)
        self.datasets = datasets  # list of (vuid, name, modificationdate) tuples
        self.lock = lock  # semaphore limiting concurrent eraser threads
        self.proxyLock = proxyLock  # serializes taskBuffer (DB) access
        self.pool = pool
        self.pool.add(self)
        self.operationType = operationType

    def run(self):
        self.lock.acquire()
        try:
            # loop over all datasets in the batch
            for vuid, name, modDate in self.datasets:
                # protection against non-dispatch datasets
                # (raw string: "\d" in a plain literal is an invalid escape sequence)
                if re.search(r"_dis\d+$", name) is None:
                    _logger.error(f"Eraser : non disDS {name}")
                    continue
                _logger.debug(f"Eraser {self.operationType} dis {modDate} {name}")
                # erase the dataset in Rucio; keep the DB untouched on failure
                endStatus = "deleted"
                status, out = rucioAPI.erase_dataset(name)
                if not status:
                    _logger.error(out)
                    continue
                _logger.debug(f"OK with {name}")
                # record the final status in the DB
                self.proxyLock.acquire()
                varMap = {}
                varMap[":vuid"] = vuid
                varMap[":status"] = endStatus
                taskBuffer.querySQLS(
                    "UPDATE ATLAS_PANDA.Datasets SET status=:status,modificationdate=CURRENT_DATE WHERE vuid=:vuid",
                    varMap,
                )
                self.proxyLock.release()
        except Exception:
            errStr = traceback.format_exc()
            _logger.error(errStr)
        self.pool.remove(self)
        self.lock.release()
0581
0582
_logger.debug("==== delete dis datasets ====")
timeLimitU = naive_utcnow() - datetime.timedelta(minutes=30)
timeLimitL = naive_utcnow() - datetime.timedelta(days=3)
disEraseLock = threading.Semaphore(5)
disEraseProxyLock = threading.Lock()
disEraseThreadPool = ThreadPool()
# "deleting": empty dis datasets; "shortening": dis datasets with remaining files
maxRows = 5000
for targetStatus in ["deleting", "shortening"]:
    # at most 10 pages per status
    for i in range(10):
        disEraseLock.acquire()
        varMap = {}
        varMap[":modificationdateU"] = timeLimitU
        varMap[":modificationdateL"] = timeLimitL
        varMap[":type"] = "dispatch"
        varMap[":status"] = targetStatus
        sqlQuery = f"type=:type AND status=:status AND (modificationdate BETWEEN :modificationdateL AND :modificationdateU) AND rownum <= {maxRows}"
        disEraseProxyLock.acquire()
        res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
        if res is None:
            _logger.debug(f"# of dis datasets for {targetStatus}: None")
        else:
            _logger.debug(f"# of dis datasets for {targetStatus}: {len(res)}")
        if res is None or len(res) == 0:
            disEraseProxyLock.release()
            disEraseLock.release()
            break
        disEraseProxyLock.release()

        disEraseLock.release()
        # fan out in chunks of 500 datasets per EraserThr
        iRows = 0
        nRows = 500
        while iRows < len(res):
            disEraser = EraserThr(
                disEraseLock,
                disEraseProxyLock,
                res[iRows : iRows + nRows],
                disEraseThreadPool,
                targetStatus,
            )
            disEraser.start()
            iRows += nRows
        disEraseThreadPool.join()
        # stop early when the page was not full
        if len(res) < 100:
            break

_memoryCheck("finisher")
0633
0634
class FinisherThr(threading.Thread):
    """Worker that finalizes a batch of jobs stuck in transferring state:
    finishes jobs whose outputs all arrived, fails jobs past the transfer
    timeout, and leaves the rest waiting."""

    def __init__(self, lock, proxyLock, ids, pool, timeNow):
        threading.Thread.__init__(self)
        self.ids = ids  # PandaIDs to examine
        self.lock = lock  # semaphore limiting concurrent finisher threads
        self.proxyLock = proxyLock  # serializes taskBuffer (DB) access
        self.pool = pool
        self.timeNow = timeNow  # reference time for the transfer timeout
        self.pool.add(self)

    def run(self):
        self.lock.acquire()
        try:
            # get jobs from the DB
            ids = self.ids
            self.proxyLock.acquire()
            jobs = taskBuffer.peekJobs(ids, fromDefined=False, fromArchived=False, fromWaiting=False)
            self.proxyLock.release()
            upJobs = []  # jobs whose state must be written back
            finJobs = []  # jobs whose outputs are all present -> to be finished
            for job in jobs:
                if job is None or job.jobStatus == "unknown":
                    continue
                # use a dummy SE unless a real endpoint can be resolved below
                seList = ["dummy"]
                tmpNucleus = siteMapper.getNucleus(job.nucleus)
                # user jobs with a non-site destinationSE check that SE directly
                if job.prodSourceLabel == "user" and job.destinationSE not in siteMapper.siteSpecList:
                    seList = [job.destinationSE]
                elif tmpNucleus is not None:
                    seList = [tmpNucleus.default_ddm_endpoint_out]

                # collect LFNs of output/log files still expected to arrive
                lfns = []
                guids = []
                scopes = []
                nTokens = 0
                for file in job.Files:
                    # only output/log files count
                    if file.type == "output" or file.type == "log":
                        if file.status == "nooutput":
                            continue
                        # skip files with a distributed destination
                        if DataServiceUtils.getDistributedDestination(file.destinationDBlockToken) is not None:
                            continue
                        lfns.append(file.lfn)
                        guids.append(file.GUID)
                        scopes.append(file.scope)
                        nTokens += len(file.destinationDBlockToken.split(","))
                # look up file replicas at the destination
                _logger.debug(f"{job.PandaID} Cloud:{job.cloud}")
                tmpStat, okFiles = rucioAPI.list_file_replicas(scopes, lfns, seList)
                if not tmpStat:
                    _logger.error(f"{job.PandaID} failed to get file replicas")
                    okFiles = {}
                # count destination tokens that already have a replica
                nOkTokens = 0
                for okLFN in okFiles:
                    okSEs = okFiles[okLFN]
                    nOkTokens += len(okSEs)

                _logger.debug(f"{job.PandaID} nToken:{nTokens} nOkToken:{nOkTokens}")
                if nTokens <= nOkTokens:
                    # all transfers arrived: mark outputs ready and finish the job
                    _logger.debug(f"{job.PandaID} Finisher : Finish")
                    for file in job.Files:
                        if file.type == "output" or file.type == "log":
                            if file.status != "nooutput":
                                file.status = "ready"
                    # append to run the finisher
                    finJobs.append(job)
                else:
                    endTime = job.endTime
                    if endTime == "NULL":
                        endTime = job.startTime
                    # high-priority production jobs time out earlier
                    if job.currentPriority >= 800 and (job.prodSourceLabel not in ["user"]):
                        timeOutValue = TRANSFER_TIMEOUT_HI_PRIORITY
                    else:
                        timeOutValue = TRANSFER_TIMEOUT_LO_PRIORITY
                    # timeout in days relative to the cycle start
                    timeOut = self.timeNow - datetime.timedelta(days=timeOutValue)
                    _logger.debug(f"{job.PandaID} Priority:{job.currentPriority} Limit:{str(timeOut)} End:{str(endTime)}")
                    if endTime < timeOut:
                        # transfer timed out: fail the job
                        _logger.debug(f"{job.PandaID} Finisher : Kill")
                        strMiss = ""
                        for lfn in lfns:
                            if lfn not in okFiles:
                                strMiss += f" {lfn}"
                        job.jobStatus = "failed"
                        job.taskBufferErrorCode = pandaserver.taskbuffer.ErrorCode.EC_Transfer
                        job.taskBufferErrorDiag = f"transfer timeout for {strMiss}"
                        guidMap = {}
                        for file in job.Files:
                            # fail transferring/output/log files
                            if file.status == "transferring" or file.type in [
                                "log",
                                "output",
                            ]:
                                file.status = "failed"
                                # collect GUIDs per destination dataset
                                if file.type == "output" or file.type == "log":
                                    if file.destinationDBlock not in guidMap:
                                        guidMap[file.destinationDBlock] = []
                                    guidMap[file.destinationDBlock].append(file.GUID)
                    else:
                        # still within the timeout: just log what is missing
                        _logger.debug(f"{job.PandaID} Finisher : Wait")
                        for lfn in lfns:
                            if lfn not in okFiles:
                                _logger.debug(f"{job.PandaID} -> {lfn}")
                    upJobs.append(job)
            # write back killed/waiting jobs
            _logger.debug("updating ...")
            self.proxyLock.acquire()
            taskBuffer.updateJobs(upJobs, False)
            self.proxyLock.release()
            # run the finisher for the completed jobs
            for job in finJobs:
                fThr = Finisher(taskBuffer, None, job)
                fThr.run()
            _logger.debug("done")
            time.sleep(1)
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            errStr = f"FinisherThr failed with {errtype} {errvalue}"
            errStr += traceback.format_exc()
            _logger.error(errStr)
        self.pool.remove(self)
        self.lock.release()
0764
0765
_logger.debug("==== finish transferring jobs ====")
finisherLock = threading.Semaphore(3)
finisherProxyLock = threading.Lock()
finisherThreadPool = ThreadPool()
# process low-priority jobs first, then high-priority ones
for loopIdx in ["low", "high"]:
    timeNow = naive_utcnow()
    if loopIdx == "high":
        highPrioFlag = True
    else:
        highPrioFlag = False
    # loop until no more transferring jobs can be locked
    for ii in range(1000):
        finisherLock.acquire()
        finisherProxyLock.acquire()
        ret, res = taskBuffer.lockJobsForFinisher(timeNow, 200, highPrioFlag)
        finisherProxyLock.release()
        finisherLock.release()
        if res is None:
            _logger.debug(f"# of jobs to be finished for {loopIdx} : {res}")
        else:
            _logger.debug(f"# of jobs to be finished for {loopIdx} : {len(res)}")
        if res is None or len(res) == 0:
            break
        # hand the batch to a finisher thread
        finThr = FinisherThr(finisherLock, finisherProxyLock, res, finisherThreadPool, timeNow)
        finThr.start()

    finisherThreadPool.join()
0795
0796
class ActivatorThr(threading.Thread):
    """Worker that activates assigned jobs whose input files all have a
    usable (non-tape, site-associated) replica."""

    def __init__(self, lock, proxyLock, ids, pool):
        threading.Thread.__init__(self)
        self.ids = ids  # PandaIDs to examine
        self.lock = lock  # semaphore limiting concurrent activator threads
        self.proxyLock = proxyLock  # serializes taskBuffer (DB) access
        self.pool = pool
        self.pool.add(self)

    def run(self):
        self.lock.acquire()
        try:
            # get jobs from the DB
            ids = self.ids
            self.proxyLock.acquire()
            jobs = taskBuffer.peekJobs(ids, fromActive=False, fromArchived=False, fromWaiting=False)
            self.proxyLock.release()
            actJobs = []
            for tmpJob in jobs:
                if tmpJob is None or tmpJob.jobStatus == "unknown":
                    continue
                # collect LFNs of input files which are not yet ready
                lfns = []
                guids = []
                scopes = []
                for tmpFile in tmpJob.Files:
                    if tmpFile.type == "input" and tmpFile.status != "ready":
                        lfns.append(tmpFile.lfn)
                        scopes.append(tmpFile.scope)

                _logger.debug(f"{tmpJob.PandaID} check input files at {tmpJob.computingSite}")
                tmpStat, okFiles = rucioAPI.list_file_replicas(scopes, lfns)
                if not tmpStat:
                    # replica lookup failed: leave the job as-is for a later cycle
                    pass
                else:
                    # check each missing input against the site's input endpoints
                    siteSpec = siteMapper.getSite(tmpJob.computingSite)
                    scope_input, scope_output = select_scope(siteSpec, tmpJob.prodSourceLabel, tmpJob.job_label)
                    allOK = True
                    for tmpFile in tmpJob.Files:
                        # only input files which are not yet ready
                        if tmpFile.type == "input" and tmpFile.status != "ready":
                            # ready when a replica exists at a non-tape endpoint of the site
                            if tmpFile.lfn in okFiles:
                                for rse in okFiles[tmpFile.lfn]:
                                    if (
                                        siteSpec.ddm_endpoints_input[scope_input].isAssociated(rse)
                                        and siteSpec.ddm_endpoints_input[scope_input].getEndPoint(rse)["is_tape"] == "N"
                                    ):
                                        tmpFile.status = "ready"
                                        break
                            # missing file: skip the job
                            if tmpFile.status != "ready":
                                allOK = False
                                _logger.debug(f"{tmpJob.PandaID} skip since {tmpFile.scope}:{tmpFile.lfn} is missing")
                                break
                    if not allOK:
                        continue
                    # all inputs present: queue for activation
                    _logger.debug(f"{tmpJob.PandaID} to activate")
                    actJobs.append(tmpJob)
            # update the DB
            _logger.debug("activating ...")
            self.proxyLock.acquire()
            taskBuffer.activateJobs(actJobs)
            self.proxyLock.release()
            _logger.debug("done")
            time.sleep(1)
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            _logger.error(f"ActivatorThr failed with {errtype} {errvalue}")
        self.pool.remove(self)
        self.lock.release()
0871
_memoryCheck("activator")


_logger.debug("==== activate assigned jobs ====")
activatorLock = threading.Semaphore(3)
activatorProxyLock = threading.Lock()
activatorThreadPool = ThreadPool()
timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
# loop until no more high-priority (>=800) assigned jobs can be locked
for ii in range(1000):
    activatorLock.acquire()
    activatorProxyLock.acquire()
    ret, res = taskBuffer.lockJobsForActivator(timeLimit, 100, 800)
    activatorProxyLock.release()
    activatorLock.release()
    if res is None:
        _logger.debug(f"# of jobs to be activated for {res} ")
    else:
        _logger.debug(f"# of jobs to be activated for {len(res)} ")
    if res is None or len(res) == 0:
        break
    # hand the batch to an activator thread
    actThr = ActivatorThr(activatorLock, activatorProxyLock, res, activatorThreadPool)
    actThr.start()

activatorThreadPool.join()
0899
0900
class ActivatorWithRuleThr(threading.Thread):
    """Worker that activates assigned jobs using dataset-replica (rule) info.

    Unlike ActivatorThr, input availability is checked per dispatch dataset
    via complete dataset replicas instead of per-file replicas.
    """

    def __init__(self, lock, proxyLock, ids, pool):
        threading.Thread.__init__(self)
        self.ids = ids  # PandaIDs to examine
        self.lock = lock  # semaphore limiting concurrent activator threads
        self.proxyLock = proxyLock  # serializes taskBuffer (DB) access
        self.pool = pool
        self.pool.add(self)

    def run(self):
        self.lock.acquire()
        try:
            # get jobs from the DB
            ids = self.ids
            self.proxyLock.acquire()
            jobs = taskBuffer.peekJobs(ids, fromActive=False, fromArchived=False, fromWaiting=False)
            self.proxyLock.release()
            actJobs = []
            # cache of dataset-replica info shared across jobs in this batch
            replicaMap = dict()
            for tmpJob in jobs:
                if tmpJob is None or tmpJob.jobStatus == "unknown":
                    continue
                # check each missing input against the site's input endpoints
                siteSpec = siteMapper.getSite(tmpJob.computingSite)
                scope_input, scope_output = select_scope(siteSpec, tmpJob.prodSourceLabel, tmpJob.job_label)
                allOK = True
                for tmpFile in tmpJob.Files:
                    # only input files which are not yet ready
                    if tmpFile.type == "input" and tmpFile.status != "ready":
                        # get replica info for the dispatch dataset (cached)
                        if tmpFile.dispatchDBlock not in replicaMap:
                            tmpStat, repMap = rucioAPI.list_dataset_replicas(tmpFile.dispatchDBlock)
                            if tmpStat != 0:
                                repMap = {}
                            replicaMap[tmpFile.dispatchDBlock] = repMap
                        # ready when a complete, non-empty, non-tape replica exists
                        # at an endpoint associated with the site
                        for rse in replicaMap[tmpFile.dispatchDBlock]:
                            repInfo = replicaMap[tmpFile.dispatchDBlock][rse]
                            if (
                                siteSpec.ddm_endpoints_input[scope_input].isAssociated(rse)
                                and siteSpec.ddm_endpoints_input[scope_input].getEndPoint(rse)["is_tape"] == "N"
                                and repInfo[0]["total"] == repInfo[0]["found"]
                                and repInfo[0]["total"] is not None
                                and repInfo[0]["total"] > 0
                            ):
                                tmpFile.status = "ready"
                                break
                        # missing file: skip the job
                        if tmpFile.status != "ready":
                            allOK = False
                            _logger.debug(f"{tmpJob.PandaID} skip since {tmpFile.scope}:{tmpFile.lfn} is missing with rule")
                            break
                if not allOK:
                    continue
                # all inputs present: queue for activation
                _logger.debug(f"{tmpJob.PandaID} to activate with rule {str(replicaMap)}")
                actJobs.append(tmpJob)
            # update the DB
            _logger.debug("activating ...")
            self.proxyLock.acquire()
            taskBuffer.activateJobs(actJobs)
            self.proxyLock.release()
            _logger.debug("done")
            time.sleep(1)
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            # fixed: the message previously mis-reported this class as "ActivatorThr"
            _logger.error(f"ActivatorWithRuleThr failed with {errtype} {errvalue}")
        self.pool.remove(self)
        self.lock.release()
0970
_memoryCheck("activator")


_logger.debug("==== activate assigned jobs with rule ====")
activatorLock = threading.Semaphore(3)
activatorProxyLock = threading.Lock()
activatorThreadPool = ThreadPool()
timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
# loop until no more assigned jobs (any priority, min 0) can be locked
for ii in range(1000):
    activatorLock.acquire()
    activatorProxyLock.acquire()
    ret, res = taskBuffer.lockJobsForActivator(timeLimit, 100, 0)
    activatorProxyLock.release()
    activatorLock.release()
    if res is None:
        _logger.debug(f"# of jobs to be activated with rule for {res} ")
    else:
        _logger.debug(f"# of jobs to be activated with rule for {len(res)} ")
    if res is None or len(res) == 0:
        break
    # hand the batch to a rule-based activator thread
    actThr = ActivatorWithRuleThr(activatorLock, activatorProxyLock, res, activatorThreadPool)
    actThr.start()

activatorThreadPool.join()
0998
0999
class SubDeleter(threading.Thread):
    """Worker that erases _sub datasets once every contributing job is final.

    For each dataset the associated PandaIDs are looked up; the dataset is
    erased in Rucio (with a grace period) and marked "deleted" in the DB only
    when every job is finished/failed/cancelled/closed. Otherwise it is left
    untouched for a later cycle.
    """

    def __init__(self, lock, proxyLock, datasets, pool):
        threading.Thread.__init__(self)
        self.datasets = datasets  # list of (vuid, name, modificationdate) tuples
        self.lock = lock  # semaphore limiting concurrent deleter threads
        self.proxyLock = proxyLock  # serializes taskBuffer (DB) access
        self.pool = pool
        self.pool.add(self)

    def run(self):
        self.lock.acquire()
        try:
            for vuid, name, modDate in self.datasets:
                # only _subNNNN datasets are handled here
                # (raw string: "\d" in a plain literal is an invalid escape sequence)
                if re.search(r"_sub\d+$", name) is None:
                    _logger.debug(f"skip non sub {name}")
                    continue
                _logger.debug(f"delete sub {name}")
                # NOTE(review): dsExists is assigned but never read; kept as-is
                if name.startswith("user.") or name.startswith("group.") or name.startswith("hc_test."):
                    dsExists = False
                else:
                    dsExists = True
                # get all jobs contributing to this sub dataset
                self.proxyLock.acquire()
                retF, resF = taskBuffer.querySQLS(
                    "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ DISTINCT PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock ",
                    {":destinationDBlock": name},
                )
                self.proxyLock.release()
                if retF is None:
                    _logger.error(f"SQL error for sub {name}")
                    continue
                else:
                    _logger.debug(f"sub {name} has {len(resF)} jobs")
                    self.proxyLock.acquire()
                    # check that every job is in a final state (recent archive included)
                    sqlP = "SELECT jobStatus FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
                    sqlP += "UNION "
                    sqlP += "SELECT jobStatus FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime>CURRENT_DATE-30 "
                    allDone = True
                    for (pandaID,) in resF:
                        retP, resP = taskBuffer.querySQLS(sqlP, {":PandaID": pandaID})
                        if len(resP) == 0:
                            _logger.debug(f"skip delete sub {name} PandaID={pandaID} not found")
                            allDone = False
                            break
                        jobStatus = resP[0][0]
                        if jobStatus not in [
                            "finished",
                            "failed",
                            "cancelled",
                            "closed",
                        ]:
                            _logger.debug(f"skip delete sub {name} PandaID={pandaID} is active {jobStatus}")
                            allDone = False
                            break
                    self.proxyLock.release()
                    if allDone:
                        # erase in Rucio with a 4-day grace period; failures are logged only
                        _logger.debug(f"deleting sub {name}")
                        try:
                            rucioAPI.erase_dataset(name, grace_period=4)
                            status = True
                        except Exception:
                            errtype, errvalue = sys.exc_info()[:2]
                            out = f"{errtype} {errvalue}"
                            _logger.error(f"{name} failed to erase with {out}")
                    else:
                        # some jobs are still active; retry in a later cycle
                        _logger.debug(f"wait sub {name}")
                        continue
                # mark deleted in the DB (only from completed/cleanup states)
                self.proxyLock.acquire()
                varMap = {}
                varMap[":vuid"] = vuid
                varMap[":ost1"] = "completed"
                varMap[":ost2"] = "cleanup"
                varMap[":newStatus"] = "deleted"
                taskBuffer.querySQLS(
                    "UPDATE ATLAS_PANDA.Datasets SET status=:newStatus,modificationdate=CURRENT_DATE WHERE vuid=:vuid AND status IN (:ost1,:ost2) ",
                    varMap,
                )
                self.proxyLock.release()
                _logger.debug(f"end {name} ")
        except Exception:
            errStr = traceback.format_exc()
            _logger.error(errStr)
        self.pool.remove(self)
        self.lock.release()
1087
1088
_logger.debug("==== delete sub datasets ====")
timeLimitU = naive_utcnow() - datetime.timedelta(minutes=30)
timeLimitL = naive_utcnow() - datetime.timedelta(days=14)
subdeleteLock = threading.Semaphore(5)
subdeleteProxyLock = threading.Lock()
subdeleteThreadPool = ThreadPool()
maxRows = 5000
while True:
    # throttle before touching the DB
    subdeleteLock.acquire()
    # lock a page of deletable sub datasets
    varMap = {}
    varMap[":limitU"] = timeLimitU
    varMap[":limitL"] = timeLimitL
    varMap[":type"] = "output"
    varMap[":subtype"] = "sub"
    varMap[":st1"] = "completed"
    varMap[":st2"] = "cleanup"
    sqlQuery = (
        "type=:type AND subType=:subtype AND status IN (:st1,:st2) AND (creationdate BETWEEN :limitL AND :limitU) AND (modificationdate BETWEEN :limitL AND :limitU) AND rownum <= %s"
        % maxRows
    )
    subdeleteProxyLock.acquire()
    res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
    if res is None:
        _logger.debug(f"# of sub datasets to be deleted {res}")
    else:
        _logger.debug(f"# of sub datasets to be deleted {len(res)}")
    if res is None or len(res) == 0:
        subdeleteProxyLock.release()
        subdeleteLock.release()
        break
    subdeleteProxyLock.release()

    subdeleteLock.release()
    # fan out in chunks of 500 datasets per SubDeleter
    iRows = 0
    nRows = 500
    while iRows < len(res):
        subdeleter = SubDeleter(
            subdeleteLock,
            subdeleteProxyLock,
            res[iRows : iRows + nRows],
            subdeleteThreadPool,
        )
        subdeleter.start()
        iRows += nRows
    subdeleteThreadPool.join()
    # stop early when the page was not full
    if len(res) < 100:
        break
1139
1140
# drop cycle-local references before reporting final memory usage
del siteMapper
del deletedDisList

_memoryCheck("end")


# return the DB connections acquired for this requester
taskBuffer.cleanup(requester=requester_id)

_logger.debug("===================== end =====================")


# direct invocation runs a single daemon cycle
if __name__ == "__main__":
    main()