Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import datetime
0002 import os
0003 import re
0004 import sys
0005 import threading
0006 import time
0007 import traceback
0008 
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 from pandacommon.pandautils.thread_utils import GenericThread
0012 
0013 import pandaserver.taskbuffer.ErrorCode
0014 from pandaserver.brokerage.SiteMapper import SiteMapper
0015 from pandaserver.config import panda_config
0016 from pandaserver.dataservice import DataServiceUtils
0017 from pandaserver.dataservice.closer import Closer
0018 from pandaserver.dataservice.DataServiceUtils import select_scope
0019 from pandaserver.dataservice.ddm import rucioAPI
0020 from pandaserver.dataservice.finisher import Finisher
0021 from pandaserver.taskbuffer import EventServiceUtils
0022 
0023 _logger = PandaLogger().getLogger("datasetManager")
0024 
0025 TRANSFER_TIMEOUT_HI_PRIORITY = 2
0026 TRANSFER_TIMEOUT_LO_PRIORITY = 6
0027 
0028 
0029 def main(tbuf=None, **kwargs):
0030     _logger.debug("===================== start =====================")
0031 
0032     # memory checker
0033     def _memoryCheck(str):
0034         try:
0035             proc_status = f"/proc/{os.getpid()}/status"
0036             procfile = open(proc_status)
0037             name = ""
0038             vmSize = ""
0039             vmRSS = ""
0040             # extract Name,VmSize,VmRSS
0041             for line in procfile:
0042                 if line.startswith("Name:"):
0043                     name = line.split()[-1]
0044                     continue
0045                 if line.startswith("VmSize:"):
0046                     vmSize = ""
0047                     for item in line.split()[1:]:
0048                         vmSize += item
0049                     continue
0050                 if line.startswith("VmRSS:"):
0051                     vmRSS = ""
0052                     for item in line.split()[1:]:
0053                         vmRSS += item
0054                     continue
0055             procfile.close()
0056             _logger.debug(f"MemCheck - {os.getpid()} Name={name} VSZ={vmSize} RSS={vmRSS} : {str}")
0057         except Exception:
0058             type, value, traceBack = sys.exc_info()
0059             _logger.error(f"memoryCheck() : {type} {value}")
0060             _logger.debug(f"MemCheck - {os.getpid()} unknown : {str}")
0061         return
0062 
    # record the memory footprint before taskBuffer is initialized
    _memoryCheck("start")

    # taskBuffer is imported here, inside main(), rather than at module level
    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    # presumably an identifier for this process/thread that is handed to the
    # DB layer through the ``requester`` argument -- TODO confirm
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )
    # NOTE(review): the ``tbuf`` argument of main() is never used; the
    # commented-out branch below suggests it was once an alternative to calling
    # init() -- confirm that ignoring it is intentional.
    # else:
    #     taskBuffer = tbuf

    # instantiate sitemapper
    siteMapper = SiteMapper(taskBuffer)
0080 
0081     # list with lock
0082     class ListWithLock:
0083         def __init__(self):
0084             self.lock = threading.Lock()
0085             self.list = []
0086 
0087         def __contains__(self, item):
0088             self.lock.acquire()
0089             ret = self.list.__contains__(item)
0090             self.lock.release()
0091             return ret
0092 
0093         def append(self, item):
0094             appended = False
0095             self.lock.acquire()
0096             if item not in self.list:
0097                 self.list.append(item)
0098                 appended = True
0099             self.lock.release()
0100             return appended
0101 
    # list of dis datasets to be deleted; shared by the worker threads so that
    # setTobeDeletedToDis() processes each _dis dataset name only once
    deletedDisList = ListWithLock()
0104 
0105     # set tobedeleted to dis dataset
0106     def setTobeDeletedToDis(subDsName):
0107         try:
0108             # only production sub datasets
0109             if subDsName.startswith("user") or subDsName.startswith("group") or re.search("_sub\d+$", subDsName) is None:
0110                 return
0111             # get _dis names with _sub
0112             disNameList = taskBuffer.getAssociatedDisDatasets(subDsName)
0113             _logger.debug(f"setTobeDeletedToDis : sub:{subDsName} has dis:{str(disNameList)}")
0114             # loop over all _dis datasets
0115             for tmpDisName in disNameList:
0116                 # try to append to locked list
0117                 if not deletedDisList.append(tmpDisName):
0118                     # another thread already took care of the _dis
0119                     continue
0120                 # skip non _dis
0121                 if re.search("_dis\d+$", tmpDisName) is None:
0122                     continue
0123                 # get dataset
0124                 _logger.debug(f"setTobeDeletedToDis : try to get {tmpDisName} in DB")
0125                 tmpDS = taskBuffer.queryDatasetWithMap({"name": tmpDisName})
0126                 if tmpDS is None:
0127                     _logger.error(f"setTobeDeletedToDis : cannot get {tmpDisName} in DB")
0128                     continue
0129                 # check status
0130                 if tmpDS.status in ["tobedeleted", "deleted"]:
0131                     _logger.debug(f"setTobeDeletedToDis : skip {tmpDisName} since status={tmpDS.status}")
0132                     continue
0133                 # check the number of failed jobs associated to the _dis
0134                 if tmpDS.currentfiles == 0:
0135                     # all succeeded
0136                     tmpDS.status = "deleting"
0137                     excStatus = "deleted"
0138                 else:
0139                     # some failed, to reduce the lifetime
0140                     tmpDS.status = "shortening"
0141                     excStatus = "shortened"
0142                 # update dataset
0143                 retU = taskBuffer.updateDatasets(
0144                     [tmpDS],
0145                     withLock=True,
0146                     withCriteria="status<>:crStatus",
0147                     criteriaMap={":crStatus": excStatus},
0148                 )
0149                 _logger.debug(f"setTobeDeletedToDis : set {tmpDS.status} to {tmpDisName} with {str(retU)}")
0150         except Exception:
0151             errType, errValue = sys.exc_info()[:2]
0152             _logger.error(f"setTobeDeletedToDis : {subDsName} {errType} {errValue}")
0153 
0154     # thread pool
0155     class ThreadPool:
0156         def __init__(self):
0157             self.lock = threading.Lock()
0158             self.list = []
0159 
0160         def add(self, obj):
0161             self.lock.acquire()
0162             self.list.append(obj)
0163             self.lock.release()
0164 
0165         def remove(self, obj):
0166             self.lock.acquire()
0167             self.list.remove(obj)
0168             self.lock.release()
0169 
0170         def join(self):
0171             self.lock.acquire()
0172             thrlist = tuple(self.list)
0173             self.lock.release()
0174             for thr in thrlist:
0175                 thr.join()
0176 
0177     # thread to close dataset
0178     class CloserThr(threading.Thread):
0179         def __init__(self, lock, proxyLock, datasets, pool):
0180             threading.Thread.__init__(self)
0181             self.datasets = datasets
0182             self.lock = lock
0183             self.proxyLock = proxyLock
0184             self.pool = pool
0185             self.pool.add(self)
0186 
0187         def run(self):
0188             self.lock.acquire()
0189             try:
0190                 # loop over all datasets
0191                 for vuid, name, modDate in self.datasets:
0192                     _logger.debug(f"Close {modDate} {name}")
0193                     dsExists = True
0194                     if name.startswith("user.") or name.startswith("group.") or name.startswith("hc_test.") or name.startswith("panda.um."):
0195                         dsExists = False
0196                     if dsExists:
0197                         # check if dataset exists
0198                         status, out = rucioAPI.get_metadata(name)
0199                         if status is True:
0200                             if out is not None:
0201                                 try:
0202                                     rucioAPI.close_dataset(name)
0203                                     status = True
0204                                 except Exception:
0205                                     errtype, errvalue = sys.exc_info()[:2]
0206                                     out = f"failed to freeze : {errtype} {errvalue}"
0207                                     status = False
0208                             else:
0209                                 # dataset not exist
0210                                 status, out = True, ""
0211                                 dsExists = False
0212                     else:
0213                         status, out = True, ""
0214                     if not status:
0215                         _logger.error(f"{name} failed to close with {out}")
0216                     else:
0217                         self.proxyLock.acquire()
0218                         varMap = {}
0219                         varMap[":vuid"] = vuid
0220                         varMap[":newstatus"] = "completed"
0221                         varMap[":oldstatus"] = "tobeclosed"
0222                         taskBuffer.querySQLS(
0223                             "UPDATE ATLAS_PANDA.Datasets SET status=:newstatus,modificationdate=CURRENT_DATE WHERE vuid=:vuid AND status=:oldstatus",
0224                             varMap,
0225                         )
0226                         self.proxyLock.release()
0227                         # set tobedeleted to dis
0228                         setTobeDeletedToDis(name)
0229                         # skip if dataset is not real
0230                         if not dsExists:
0231                             continue
0232                         # count # of files
0233                         status, out = rucioAPI.get_number_of_files(name)
0234                         if status is not True:
0235                             if status is False:
0236                                 _logger.error(out)
0237                         else:
0238                             _logger.debug(out)
0239                             try:
0240                                 nFile = int(out)
0241                                 if nFile == 0:
0242                                     # erase dataset
0243                                     _logger.debug(f"erase {name}")
0244                                     status, out = rucioAPI.erase_dataset(name)
0245                                     _logger.debug(f"OK with {name}")
0246                             except Exception:
0247                                 pass
0248             except Exception:
0249                 pass
0250             self.pool.remove(self)
0251             self.lock.release()
0252 
0253     # close datasets
0254     _logger.debug("==== close datasets ====")
0255     timeLimitU = naive_utcnow() - datetime.timedelta(minutes=1)
0256     timeLimitL = naive_utcnow() - datetime.timedelta(days=3)
0257     closeLock = threading.Semaphore(5)
0258     closeProxyLock = threading.Lock()
0259     closeThreadPool = ThreadPool()
0260     maxRows = 100000
0261     while True:
0262         # lock
0263         closeLock.acquire()
0264         # get datasets
0265         closeProxyLock.acquire()
0266         varMap = {}
0267         varMap[":modificationdateU"] = timeLimitU
0268         varMap[":modificationdateL"] = timeLimitL
0269         varMap[":type"] = "output"
0270         varMap[":status"] = "tobeclosed"
0271         sqlQuery = f"type=:type AND status=:status AND (modificationdate BETWEEN :modificationdateL AND :modificationdateU) AND rownum <= {maxRows}"
0272         res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
0273         if res is None:
0274             _logger.debug(f"# of datasets to be closed: {res}")
0275         else:
0276             _logger.debug(f"# of datasets to be closed: {len(res)}")
0277         if res is None or len(res) == 0:
0278             closeProxyLock.release()
0279             closeLock.release()
0280             break
0281         # release
0282         closeProxyLock.release()
0283         closeLock.release()
0284         # run thread
0285         iRows = 0
0286         nRows = 500
0287         while iRows < len(res):
0288             closerThr = CloserThr(closeLock, closeProxyLock, res[iRows : iRows + nRows], closeThreadPool)
0289             closerThr.start()
0290             iRows += nRows
0291         closeThreadPool.join()
0292         if len(res) < maxRows:
0293             break
0294 
0295     # thread to freeze dataset
0296     class Freezer(threading.Thread):
0297         def __init__(self, lock, proxyLock, datasets, pool):
0298             threading.Thread.__init__(self)
0299             self.datasets = datasets
0300             self.lock = lock
0301             self.proxyLock = proxyLock
0302             self.pool = pool
0303             self.pool.add(self)
0304 
0305         def run(self):
0306             self.lock.acquire()
0307             try:
0308                 for vuid, name, modDate in self.datasets:
0309                     _logger.debug(f"Freezer start {modDate} {name}")
0310                     self.proxyLock.acquire()
0311                     retF, resF = taskBuffer.querySQLS(
0312                         "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID,status FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock ",
0313                         {":destinationDBlock": name},
0314                     )
0315                     self.proxyLock.release()
0316                     if isinstance(retF, int) and retF < 0:
0317                         _logger.error("SQL error")
0318                     else:
0319                         allFinished = True
0320                         onePandaID = None
0321                         for tmpPandaID, tmpFileStatus in resF:
0322                             onePandaID = tmpPandaID
0323                             if tmpFileStatus not in [
0324                                 "ready",
0325                                 "failed",
0326                                 "skipped",
0327                                 "merging",
0328                                 "finished",
0329                             ]:
0330                                 allFinished = False
0331                                 break
0332                         # check sub datasets in the jobset for event service job
0333                         if allFinished:
0334                             self.proxyLock.acquire()
0335                             tmpJobs = taskBuffer.getFullJobStatus([onePandaID])
0336                             self.proxyLock.release()
0337                             if len(tmpJobs) > 0 and tmpJobs[0] is not None:
0338                                 if EventServiceUtils.isEventServiceMerge(tmpJobs[0]):
0339                                     self.proxyLock.acquire()
0340                                     cThr = Closer(taskBuffer, [], tmpJobs[0])
0341                                     allFinished = cThr.checkSubDatasetsInJobset()
0342                                     self.proxyLock.release()
0343                                     _logger.debug(f"closer checked sub datasets in the jobset for {name} : {allFinished}")
0344                         # no files in filesTable
0345                         if allFinished:
0346                             _logger.debug(f"freeze {name} ")
0347                             dsExists = True
0348                             if name.startswith("user.") or name.startswith("group.") or name.startswith("hc_test.") or name.startswith("panda.um."):
0349                                 dsExists = False
0350                             if name.startswith("panda.um."):
0351                                 self.proxyLock.acquire()
0352                                 retMer, resMer = taskBuffer.querySQLS(
0353                                     "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock AND status IN (:statusM,:statusF) ",
0354                                     {
0355                                         ":destinationDBlock": name,
0356                                         ":statusM": "merging",
0357                                         ":statusF": "failed",
0358                                     },
0359                                 )
0360                                 self.proxyLock.release()
0361                                 if resMer is not None and len(resMer) > 0:
0362                                     mergeID = resMer[0][0]
0363                                     # get merging jobs
0364                                     self.proxyLock.acquire()
0365                                     mergingJobs = taskBuffer.peekJobs(
0366                                         [mergeID],
0367                                         fromDefined=False,
0368                                         fromArchived=False,
0369                                         fromWaiting=False,
0370                                     )
0371                                     self.proxyLock.release()
0372                                     mergeJob = mergingJobs[0]
0373                                     if mergeJob is not None:
0374                                         tmpDestDBlocks = []
0375                                         # get destDBlock
0376                                         for tmpFile in mergeJob.Files:
0377                                             if tmpFile.type in ["output", "log"]:
0378                                                 if tmpFile.destinationDBlock not in tmpDestDBlocks:
0379                                                     tmpDestDBlocks.append(tmpFile.destinationDBlock)
0380                                         # run
0381                                         _logger.debug(f"start JEDI closer for {name} ")
0382                                         self.proxyLock.acquire()
0383                                         cThr = Closer(taskBuffer, tmpDestDBlocks, mergeJob)
0384                                         cThr.run()
0385                                         self.proxyLock.release()
0386                                         _logger.debug(f"end JEDI closer for {name} ")
0387                                         continue
0388                                     else:
0389                                         _logger.debug(f"failed to get merging job for {name} ")
0390                                 else:
0391                                     _logger.debug(f"failed to get merging file for {name} ")
0392                                 status, out = True, ""
0393                             elif dsExists:
0394                                 # check if dataset exists
0395                                 status, out = rucioAPI.get_metadata(name)
0396                                 if status is True:
0397                                     if out is not None:
0398                                         try:
0399                                             rucioAPI.close_dataset(name)
0400                                             status = True
0401                                         except Exception:
0402                                             errtype, errvalue = sys.exc_info()[:2]
0403                                             out = f"failed to freeze : {errtype} {errvalue}"
0404                                             status = False
0405                                     else:
0406                                         # dataset not exist
0407                                         status, out = True, ""
0408                                         dsExists = False
0409                             else:
0410                                 status, out = True, ""
0411                             if not status:
0412                                 _logger.error(f"{name} failed to freeze with {out}")
0413                             else:
0414                                 self.proxyLock.acquire()
0415                                 varMap = {}
0416                                 varMap[":vuid"] = vuid
0417                                 varMap[":status"] = "completed"
0418                                 taskBuffer.querySQLS(
0419                                     "UPDATE ATLAS_PANDA.Datasets SET status=:status,modificationdate=CURRENT_DATE WHERE vuid=:vuid",
0420                                     varMap,
0421                                 )
0422                                 self.proxyLock.release()
0423                                 if name.startswith("panda.um.") or not dsExists:
0424                                     continue
0425                                 # set tobedeleted to dis
0426                                 setTobeDeletedToDis(name)
0427                                 # count # of files
0428                                 status, out = rucioAPI.get_number_of_files(name)
0429                                 if status is not True:
0430                                     if status is False:
0431                                         _logger.error(out)
0432                                 else:
0433                                     _logger.debug(out)
0434                                     try:
0435                                         nFile = int(out)
0436                                         _logger.debug(nFile)
0437                                         if nFile == 0:
0438                                             # erase dataset
0439                                             _logger.debug(f"erase {name}")
0440                                             status, out = rucioAPI.erase_dataset(name)
0441                                             _logger.debug(f"OK with {name}")
0442                                     except Exception:
0443                                         pass
0444                         else:
0445                             _logger.debug(f"wait {name} ")
0446                             self.proxyLock.acquire()
0447                             taskBuffer.querySQLS(
0448                                 "UPDATE ATLAS_PANDA.Datasets SET modificationdate=CURRENT_DATE WHERE vuid=:vuid",
0449                                 {":vuid": vuid},
0450                             )
0451                             self.proxyLock.release()
0452                     _logger.debug(f"end {name} ")
0453             except Exception:
0454                 errStr = traceback.format_exc()
0455                 _logger.error(errStr)
0456             self.pool.remove(self)
0457             self.lock.release()
0458 
0459     # freeze dataset
0460     _logger.debug("==== freeze datasets ====")
0461     timeLimitRU = naive_utcnow() - datetime.timedelta(hours=3)
0462     timeLimitRL = naive_utcnow() - datetime.timedelta(hours=12)
0463     timeLimitU = naive_utcnow() - datetime.timedelta(hours=6)
0464     timeLimitL = naive_utcnow() - datetime.timedelta(days=14)
0465     # reset doing so that Closer can update unmerged datasets
0466     sql = "SELECT name FROM ATLAS_PANDA.Datasets "
0467     sql += "WHERE type=:type AND (modificationdate BETWEEN :modificationdateRL AND :modificationdateRU) AND subType=:subType AND status=:oldStatus "
0468     varMap = {}
0469     varMap[":modificationdateRU"] = timeLimitRU
0470     varMap[":modificationdateRL"] = timeLimitRL
0471     varMap[":type"] = "output"
0472     varMap[":subType"] = "sub"
0473     varMap[":oldStatus"] = "doing"
0474     retReset, resReset = taskBuffer.querySQLS(sql, varMap)
0475     sql = "UPDATE ATLAS_PANDA.Datasets SET status=:newStatus,modificationdate=:modificationdateU WHERE name=:name AND status=:oldStatus "
0476     if resReset is not None:
0477         for (name,) in resReset:
0478             varMap = {}
0479             varMap[":name"] = name
0480             varMap[":oldStatus"] = "doing"
0481             varMap[":newStatus"] = "running"
0482             varMap[":modificationdateU"] = timeLimitU
0483             _logger.debug(f"reset {name} to freeze")
0484             taskBuffer.querySQLS(sql, varMap)
0485     # loop for freezer
0486     freezeLock = threading.Semaphore(5)
0487     freezeProxyLock = threading.Lock()
0488     freezeThreadPool = ThreadPool()
0489     maxRows = 100000
0490     while True:
0491         # lock
0492         freezeLock.acquire()
0493         # get datasets
0494         sqlQuery = (
0495             "type=:type AND status IN (:status1,:status2,:status3,:status4,:status5) "
0496             + f"AND (modificationdate BETWEEN :modificationdateL AND :modificationdateU) AND subType=:subType AND rownum <= {maxRows}"
0497         )
0498         varMap = {}
0499         varMap[":modificationdateU"] = timeLimitU
0500         varMap[":modificationdateL"] = timeLimitL
0501         varMap[":type"] = "output"
0502         varMap[":status1"] = "running"
0503         varMap[":status2"] = "created"
0504         varMap[":status3"] = "defined"
0505         varMap[":status4"] = "locked"
0506         varMap[":status5"] = "doing"
0507         varMap[":subType"] = "sub"
0508         freezeProxyLock.acquire()
0509         res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
0510         if res is None:
0511             _logger.debug(f"# of datasets to be frozen: {res}")
0512         else:
0513             _logger.debug(f"# of datasets to be frozen: {len(res)}")
0514         if res is None or len(res) == 0:
0515             freezeProxyLock.release()
0516             freezeLock.release()
0517             break
0518         freezeProxyLock.release()
0519         # release
0520         freezeLock.release()
0521         # run freezer
0522         iRows = 0
0523         nRows = 500
0524         while iRows < len(res):
0525             freezer = Freezer(
0526                 freezeLock,
0527                 freezeProxyLock,
0528                 res[iRows : iRows + nRows],
0529                 freezeThreadPool,
0530             )
0531             freezer.start()
0532             iRows += nRows
0533         freezeThreadPool.join()
0534         if len(res) < maxRows:
0535             break
0536 
0537     # delete dis datasets
0538     class EraserThr(threading.Thread):
0539         def __init__(self, lock, proxyLock, datasets, pool, operationType):
0540             threading.Thread.__init__(self)
0541             self.datasets = datasets
0542             self.lock = lock
0543             self.proxyLock = proxyLock
0544             self.pool = pool
0545             self.pool.add(self)
0546             self.operationType = operationType
0547 
0548         def run(self):
0549             self.lock.acquire()
0550             try:
0551                 # loop over all datasets
0552                 for vuid, name, modDate in self.datasets:
0553                     # only dis datasets
0554                     if re.search("_dis\d+$", name) is None:
0555                         _logger.error(f"Eraser : non disDS {name}")
0556                         continue
0557                     # delete
0558                     _logger.debug(f"Eraser {self.operationType} dis {modDate} {name}")
0559                     # delete or shorten
0560                     endStatus = "deleted"
0561                     status, out = rucioAPI.erase_dataset(name)
0562                     if not status:
0563                         _logger.error(out)
0564                         continue
0565                     _logger.debug(f"OK with {name}")
0566                     # update
0567                     self.proxyLock.acquire()
0568                     varMap = {}
0569                     varMap[":vuid"] = vuid
0570                     varMap[":status"] = endStatus
0571                     taskBuffer.querySQLS(
0572                         "UPDATE ATLAS_PANDA.Datasets SET status=:status,modificationdate=CURRENT_DATE WHERE vuid=:vuid",
0573                         varMap,
0574                     )
0575                     self.proxyLock.release()
0576             except Exception:
0577                 errStr = traceback.format_exc()
0578                 _logger.error(errStr)
0579             self.pool.remove(self)
0580             self.lock.release()
0581 
0582     # delete dis datasets
0583     _logger.debug("==== delete dis datasets ====")
0584     timeLimitU = naive_utcnow() - datetime.timedelta(minutes=30)
0585     timeLimitL = naive_utcnow() - datetime.timedelta(days=3)
0586     disEraseLock = threading.Semaphore(5)
0587     disEraseProxyLock = threading.Lock()
0588     disEraseThreadPool = ThreadPool()
0589     # maxRows = 100000
0590     maxRows = 5000
0591     for targetStatus in ["deleting", "shortening"]:
0592         for i in range(10):
0593             # lock
0594             disEraseLock.acquire()
0595             # get datasets
0596             varMap = {}
0597             varMap[":modificationdateU"] = timeLimitU
0598             varMap[":modificationdateL"] = timeLimitL
0599             varMap[":type"] = "dispatch"
0600             varMap[":status"] = targetStatus
0601             sqlQuery = f"type=:type AND status=:status AND (modificationdate BETWEEN :modificationdateL AND :modificationdateU) AND rownum <= {maxRows}"
0602             disEraseProxyLock.acquire()
0603             res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
0604             if res is None:
0605                 _logger.debug(f"# of dis datasets for {targetStatus}: None")
0606             else:
0607                 _logger.debug(f"# of dis datasets for {targetStatus}: {len(res)}")
0608             if res is None or len(res) == 0:
0609                 disEraseProxyLock.release()
0610                 disEraseLock.release()
0611                 break
0612             disEraseProxyLock.release()
0613             # release
0614             disEraseLock.release()
0615             # run disEraser
0616             iRows = 0
0617             nRows = 500
0618             while iRows < len(res):
0619                 disEraser = EraserThr(
0620                     disEraseLock,
0621                     disEraseProxyLock,
0622                     res[iRows : iRows + nRows],
0623                     disEraseThreadPool,
0624                     targetStatus,
0625                 )
0626                 disEraser.start()
0627                 iRows += nRows
0628             disEraseThreadPool.join()
0629             if len(res) < 100:
0630                 break
0631 
0632     _memoryCheck("finisher")
0633 
0634     # finisher thread
0635     class FinisherThr(threading.Thread):
0636         def __init__(self, lock, proxyLock, ids, pool, timeNow):
0637             threading.Thread.__init__(self)
0638             self.ids = ids
0639             self.lock = lock
0640             self.proxyLock = proxyLock
0641             self.pool = pool
0642             self.timeNow = timeNow
0643             self.pool.add(self)
0644 
0645         def run(self):
0646             self.lock.acquire()
0647             try:
0648                 # get jobs from DB
0649                 ids = self.ids
0650                 self.proxyLock.acquire()
0651                 jobs = taskBuffer.peekJobs(ids, fromDefined=False, fromArchived=False, fromWaiting=False)
0652                 self.proxyLock.release()
0653                 upJobs = []
0654                 finJobs = []
0655                 for job in jobs:
0656                     if job is None or job.jobStatus == "unknown":
0657                         continue
0658                     seList = ["dummy"]
0659                     tmpNucleus = siteMapper.getNucleus(job.nucleus)
0660                     # get SEs
0661                     if job.prodSourceLabel == "user" and job.destinationSE not in siteMapper.siteSpecList:
0662                         # using --destSE for analysis job to transfer output
0663                         seList = [job.destinationSE]
0664                     elif tmpNucleus is not None:
0665                         seList = [tmpNucleus.default_ddm_endpoint_out]
0666 
0667                     # get LFN list
0668                     lfns = []
0669                     guids = []
0670                     scopes = []
0671                     nTokens = 0
0672                     for file in job.Files:
0673                         # only output files are checked
0674                         if file.type == "output" or file.type == "log":
0675                             if file.status == "nooutput":
0676                                 continue
0677                             if DataServiceUtils.getDistributedDestination(file.destinationDBlockToken) is not None:
0678                                 continue
0679                             lfns.append(file.lfn)
0680                             guids.append(file.GUID)
0681                             scopes.append(file.scope)
0682                             nTokens += len(file.destinationDBlockToken.split(","))
0683                     # get files
0684                     _logger.debug(f"{job.PandaID} Cloud:{job.cloud}")
0685                     tmpStat, okFiles = rucioAPI.list_file_replicas(scopes, lfns, seList)
0686                     if not tmpStat:
0687                         _logger.error(f"{job.PandaID} failed to get file replicas")
0688                         okFiles = {}
0689                     # count files
0690                     nOkTokens = 0
0691                     for okLFN in okFiles:
0692                         okSEs = okFiles[okLFN]
0693                         nOkTokens += len(okSEs)
0694                     # check all files are ready
0695                     _logger.debug(f"{job.PandaID} nToken:{nTokens} nOkToken:{nOkTokens}")
0696                     if nTokens <= nOkTokens:
0697                         _logger.debug(f"{job.PandaID} Finisher : Finish")
0698                         for file in job.Files:
0699                             if file.type == "output" or file.type == "log":
0700                                 if file.status != "nooutput":
0701                                     file.status = "ready"
0702                         # append to run Finisher
0703                         finJobs.append(job)
0704                     else:
0705                         endTime = job.endTime
0706                         if endTime == "NULL":
0707                             endTime = job.startTime
0708                         # priority-dependent timeout
0709                         if job.currentPriority >= 800 and (job.prodSourceLabel not in ["user"]):
0710                             timeOutValue = TRANSFER_TIMEOUT_HI_PRIORITY
0711                         else:
0712                             timeOutValue = TRANSFER_TIMEOUT_LO_PRIORITY
0713 
0714                         timeOut = self.timeNow - datetime.timedelta(days=timeOutValue)
0715                         _logger.debug(f"{job.PandaID}  Priority:{job.currentPriority} Limit:{str(timeOut)} End:{str(endTime)}")
0716                         if endTime < timeOut:
0717                             # timeout
0718                             _logger.debug(f"{job.PandaID} Finisher : Kill")
0719                             strMiss = ""
0720                             for lfn in lfns:
0721                                 if lfn not in okFiles:
0722                                     strMiss += f" {lfn}"
0723                             job.jobStatus = "failed"
0724                             job.taskBufferErrorCode = pandaserver.taskbuffer.ErrorCode.EC_Transfer
0725                             job.taskBufferErrorDiag = f"transfer timeout for {strMiss}"
0726                             guidMap = {}
0727                             for file in job.Files:
0728                                 # set file status
0729                                 if file.status == "transferring" or file.type in [
0730                                     "log",
0731                                     "output",
0732                                 ]:
0733                                     file.status = "failed"
0734                                 # collect GUIDs to delete files from _tid datasets
0735                                 if file.type == "output" or file.type == "log":
0736                                     if file.destinationDBlock not in guidMap:
0737                                         guidMap[file.destinationDBlock] = []
0738                                     guidMap[file.destinationDBlock].append(file.GUID)
0739                         else:
0740                             # wait
0741                             _logger.debug(f"{job.PandaID} Finisher : Wait")
0742                             for lfn in lfns:
0743                                 if lfn not in okFiles:
0744                                     _logger.debug(f"{job.PandaID}    -> {lfn}")
0745                     upJobs.append(job)
0746                 # update
0747                 _logger.debug("updating ...")
0748                 self.proxyLock.acquire()
0749                 taskBuffer.updateJobs(upJobs, False)
0750                 self.proxyLock.release()
0751                 # run Finisher
0752                 for job in finJobs:
0753                     fThr = Finisher(taskBuffer, None, job)
0754                     fThr.run()
0755                 _logger.debug("done")
0756                 time.sleep(1)
0757             except Exception:
0758                 errtype, errvalue = sys.exc_info()[:2]
0759                 errStr = f"FinisherThr failed with {errtype} {errvalue}"
0760                 errStr += traceback.format_exc()
0761                 _logger.error(errStr)
0762             self.pool.remove(self)
0763             self.lock.release()
0764 
0765     # finish transferring jobs
0766     _logger.debug("==== finish transferring jobs ====")
0767     finisherLock = threading.Semaphore(3)
0768     finisherProxyLock = threading.Lock()
0769     finisherThreadPool = ThreadPool()
0770     for loopIdx in ["low", "high"]:
0771         timeNow = naive_utcnow()
0772         if loopIdx == "high":
0773             highPrioFlag = True
0774         else:
0775             highPrioFlag = False
0776         # get jobs
0777         for ii in range(1000):
0778             # lock
0779             finisherLock.acquire()
0780             finisherProxyLock.acquire()
0781             ret, res = taskBuffer.lockJobsForFinisher(timeNow, 200, highPrioFlag)
0782             finisherProxyLock.release()
0783             finisherLock.release()
0784             if res is None:
0785                 _logger.debug(f"# of jobs to be finished for {loopIdx} : {res}")
0786             else:
0787                 _logger.debug(f"# of jobs to be finished for {loopIdx} : {len(res)}")
0788             if res is None or len(res) == 0:
0789                 break
0790             # run thread
0791             finThr = FinisherThr(finisherLock, finisherProxyLock, res, finisherThreadPool, timeNow)
0792             finThr.start()
0793         # wait
0794         finisherThreadPool.join()
0795 
0796     # activator thread
0797     class ActivatorThr(threading.Thread):
0798         def __init__(self, lock, proxyLock, ids, pool):
0799             threading.Thread.__init__(self)
0800             self.ids = ids
0801             self.lock = lock
0802             self.proxyLock = proxyLock
0803             self.pool = pool
0804             self.pool.add(self)
0805 
0806         def run(self):
0807             self.lock.acquire()
0808             try:
0809                 # get jobs from DB
0810                 ids = self.ids
0811                 self.proxyLock.acquire()
0812                 jobs = taskBuffer.peekJobs(ids, fromActive=False, fromArchived=False, fromWaiting=False)
0813                 self.proxyLock.release()
0814                 actJobs = []
0815                 for tmpJob in jobs:
0816                     if tmpJob is None or tmpJob.jobStatus == "unknown":
0817                         continue
0818                     # get LFN list
0819                     lfns = []
0820                     guids = []
0821                     scopes = []
0822                     for tmpFile in tmpJob.Files:
0823                         # only input files are checked
0824                         if tmpFile.type == "input" and tmpFile.status != "ready":
0825                             lfns.append(tmpFile.lfn)
0826                             scopes.append(tmpFile.scope)
0827                     # get file replicas
0828                     _logger.debug(f"{tmpJob.PandaID} check input files at {tmpJob.computingSite}")
0829                     tmpStat, okFiles = rucioAPI.list_file_replicas(scopes, lfns)
0830                     if not tmpStat:
0831                         pass
0832                     else:
0833                         # check if locally available
0834                         siteSpec = siteMapper.getSite(tmpJob.computingSite)
0835                         scope_input, scope_output = select_scope(siteSpec, tmpJob.prodSourceLabel, tmpJob.job_label)
0836                         allOK = True
0837                         for tmpFile in tmpJob.Files:
0838                             # only input
0839                             if tmpFile.type == "input" and tmpFile.status != "ready":
0840                                 # check RSEs
0841                                 if tmpFile.lfn in okFiles:
0842                                     for rse in okFiles[tmpFile.lfn]:
0843                                         if (
0844                                             siteSpec.ddm_endpoints_input[scope_input].isAssociated(rse)
0845                                             and siteSpec.ddm_endpoints_input[scope_input].getEndPoint(rse)["is_tape"] == "N"
0846                                         ):
0847                                             tmpFile.status = "ready"
0848                                             break
0849                                 # missing
0850                                 if tmpFile.status != "ready":
0851                                     allOK = False
0852                                     _logger.debug(f"{tmpJob.PandaID} skip since {tmpFile.scope}:{tmpFile.lfn} is missing")
0853                                     break
0854                         if not allOK:
0855                             continue
0856                         # append to run activator
0857                         _logger.debug(f"{tmpJob.PandaID} to activate")
0858                         actJobs.append(tmpJob)
0859                 # update
0860                 _logger.debug("activating ...")
0861                 self.proxyLock.acquire()
0862                 taskBuffer.activateJobs(actJobs)
0863                 self.proxyLock.release()
0864                 _logger.debug("done")
0865                 time.sleep(1)
0866             except Exception:
0867                 errtype, errvalue = sys.exc_info()[:2]
0868                 _logger.error(f"ActivatorThr failed with {errtype} {errvalue}")
0869             self.pool.remove(self)
0870             self.lock.release()
0871 
0872     _memoryCheck("activator")
0873 
0874     # activate assigned jobs
0875     _logger.debug("==== activate assigned jobs ====")
0876     activatorLock = threading.Semaphore(3)
0877     activatorProxyLock = threading.Lock()
0878     activatorThreadPool = ThreadPool()
0879     timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
0880     # get jobs
0881     for ii in range(1000):
0882         # lock
0883         activatorLock.acquire()
0884         activatorProxyLock.acquire()
0885         ret, res = taskBuffer.lockJobsForActivator(timeLimit, 100, 800)
0886         activatorProxyLock.release()
0887         activatorLock.release()
0888         if res is None:
0889             _logger.debug(f"# of jobs to be activated for {res} ")
0890         else:
0891             _logger.debug(f"# of jobs to be activated for {len(res)} ")
0892         if res is None or len(res) == 0:
0893             break
0894         # run thread
0895         actThr = ActivatorThr(activatorLock, activatorProxyLock, res, activatorThreadPool)
0896         actThr.start()
0897     # wait
0898     activatorThreadPool.join()
0899 
0900     # activator thread with rule
0901     class ActivatorWithRuleThr(threading.Thread):
0902         def __init__(self, lock, proxyLock, ids, pool):
0903             threading.Thread.__init__(self)
0904             self.ids = ids
0905             self.lock = lock
0906             self.proxyLock = proxyLock
0907             self.pool = pool
0908             self.pool.add(self)
0909 
0910         def run(self):
0911             self.lock.acquire()
0912             try:
0913                 # get jobs from DB
0914                 ids = self.ids
0915                 self.proxyLock.acquire()
0916                 jobs = taskBuffer.peekJobs(ids, fromActive=False, fromArchived=False, fromWaiting=False)
0917                 self.proxyLock.release()
0918                 actJobs = []
0919                 replicaMap = dict()
0920                 for tmpJob in jobs:
0921                     if tmpJob is None or tmpJob.jobStatus == "unknown":
0922                         continue
0923                     # check if locally available
0924                     siteSpec = siteMapper.getSite(tmpJob.computingSite)
0925                     scope_input, scope_output = select_scope(siteSpec, tmpJob.prodSourceLabel, tmpJob.job_label)
0926                     allOK = True
0927                     for tmpFile in tmpJob.Files:
0928                         # only input files are checked
0929                         if tmpFile.type == "input" and tmpFile.status != "ready":
0930                             # get replicas
0931                             if tmpFile.dispatchDBlock not in replicaMap:
0932                                 tmpStat, repMap = rucioAPI.list_dataset_replicas(tmpFile.dispatchDBlock)
0933                                 if tmpStat != 0:
0934                                     repMap = {}
0935                                 replicaMap[tmpFile.dispatchDBlock] = repMap
0936                             # check RSEs
0937                             for rse in replicaMap[tmpFile.dispatchDBlock]:
0938                                 repInfo = replicaMap[tmpFile.dispatchDBlock][rse]
0939                                 if (
0940                                     siteSpec.ddm_endpoints_input[scope_input].isAssociated(rse)
0941                                     and siteSpec.ddm_endpoints_input[scope_input].getEndPoint(rse)["is_tape"] == "N"
0942                                     and repInfo[0]["total"] == repInfo[0]["found"]
0943                                     and repInfo[0]["total"] is not None
0944                                     and repInfo[0]["total"] > 0
0945                                 ):
0946                                     tmpFile.status = "ready"
0947                                     break
0948                             # missing
0949                             if tmpFile.status != "ready":
0950                                 allOK = False
0951                                 _logger.debug(f"{tmpJob.PandaID} skip since {tmpFile.scope}:{tmpFile.lfn} is missing with rule")
0952                                 break
0953                     if not allOK:
0954                         continue
0955                     # append to run activator
0956                     _logger.debug(f"{tmpJob.PandaID} to activate with rule {str(replicaMap)}")
0957                     actJobs.append(tmpJob)
0958                 # update
0959                 _logger.debug("activating ...")
0960                 self.proxyLock.acquire()
0961                 taskBuffer.activateJobs(actJobs)
0962                 self.proxyLock.release()
0963                 _logger.debug("done")
0964                 time.sleep(1)
0965             except Exception:
0966                 errtype, errvalue = sys.exc_info()[:2]
0967                 _logger.error(f"ActivatorThr failed with {errtype} {errvalue}")
0968             self.pool.remove(self)
0969             self.lock.release()
0970 
0971     _memoryCheck("activator")
0972 
0973     # activate assigned jobs
0974     _logger.debug("==== activate assigned jobs with rule ====")
0975     activatorLock = threading.Semaphore(3)
0976     activatorProxyLock = threading.Lock()
0977     activatorThreadPool = ThreadPool()
0978     timeLimit = naive_utcnow() - datetime.timedelta(hours=1)
0979     # get jobs
0980     for ii in range(1000):
0981         # lock
0982         activatorLock.acquire()
0983         activatorProxyLock.acquire()
0984         ret, res = taskBuffer.lockJobsForActivator(timeLimit, 100, 0)
0985         activatorProxyLock.release()
0986         activatorLock.release()
0987         if res is None:
0988             _logger.debug(f"# of jobs to be activated with rule for {res} ")
0989         else:
0990             _logger.debug(f"# of jobs to be activated with rule for {len(res)} ")
0991         if res is None or len(res) == 0:
0992             break
0993         # run thread
0994         actThr = ActivatorWithRuleThr(activatorLock, activatorProxyLock, res, activatorThreadPool)
0995         actThr.start()
0996     # wait
0997     activatorThreadPool.join()
0998 
0999     # thread to delete sub datasets
1000     class SubDeleter(threading.Thread):
1001         def __init__(self, lock, proxyLock, datasets, pool):
1002             threading.Thread.__init__(self)
1003             self.datasets = datasets
1004             self.lock = lock
1005             self.proxyLock = proxyLock
1006             self.pool = pool
1007             self.pool.add(self)
1008 
1009         def run(self):
1010             self.lock.acquire()
1011             try:
1012                 for vuid, name, modDate in self.datasets:
1013                     # check just in case
1014                     if re.search("_sub\d+$", name) is None:
1015                         _logger.debug(f"skip non sub {name}")
1016                         continue
1017                     _logger.debug(f"delete sub {name}")
1018                     if name.startswith("user.") or name.startswith("group.") or name.startswith("hc_test."):
1019                         dsExists = False
1020                     else:
1021                         dsExists = True
1022                         # get PandaIDs
1023                         self.proxyLock.acquire()
1024                         retF, resF = taskBuffer.querySQLS(
1025                             "SELECT /*+ index(tab FILESTABLE4_DESTDBLOCK_IDX) */ DISTINCT PandaID FROM ATLAS_PANDA.filesTable4 tab WHERE destinationDBlock=:destinationDBlock ",
1026                             {":destinationDBlock": name},
1027                         )
1028                         self.proxyLock.release()
1029                         if retF is None:
1030                             _logger.error(f"SQL error for sub {name}")
1031                             continue
1032                         else:
1033                             _logger.debug(f"sub {name} has {len(resF)} jobs")
1034                             self.proxyLock.acquire()
1035                             # check jobs
1036                             sqlP = "SELECT jobStatus FROM ATLAS_PANDA.jobsArchived4 WHERE PandaID=:PandaID "
1037                             sqlP += "UNION "
1038                             sqlP += "SELECT jobStatus FROM ATLAS_PANDAARCH.jobsArchived WHERE PandaID=:PandaID AND modificationTime>CURRENT_DATE-30 "
1039                             allDone = True
1040                             for (pandaID,) in resF:
1041                                 retP, resP = taskBuffer.querySQLS(sqlP, {":PandaID": pandaID})
1042                                 if len(resP) == 0:
1043                                     _logger.debug(f"skip delete sub {name} PandaID={pandaID} not found")
1044                                     allDone = False
1045                                     break
1046                                 jobStatus = resP[0][0]
1047                                 if jobStatus not in [
1048                                     "finished",
1049                                     "failed",
1050                                     "cancelled",
1051                                     "closed",
1052                                 ]:
1053                                     _logger.debug(f"skip delete sub {name} PandaID={pandaID} is active {jobStatus}")
1054                                     allDone = False
1055                                     break
1056                             self.proxyLock.release()
1057                             if allDone:
1058                                 _logger.debug(f"deleting sub {name}")
1059                                 try:
1060                                     rucioAPI.erase_dataset(name, grace_period=4)
1061                                     status = True
1062                                 except Exception:
1063                                     errtype, errvalue = sys.exc_info()[:2]
1064                                     out = f"{errtype} {errvalue}"
1065                                     _logger.error(f"{name} failed to erase with {out}")
1066                             else:
1067                                 _logger.debug(f"wait sub {name}")
1068                                 continue
1069                     # update dataset
1070                     self.proxyLock.acquire()
1071                     varMap = {}
1072                     varMap[":vuid"] = vuid
1073                     varMap[":ost1"] = "completed"
1074                     varMap[":ost2"] = "cleanup"
1075                     varMap[":newStatus"] = "deleted"
1076                     taskBuffer.querySQLS(
1077                         "UPDATE ATLAS_PANDA.Datasets SET status=:newStatus,modificationdate=CURRENT_DATE WHERE vuid=:vuid AND status IN (:ost1,:ost2) ",
1078                         varMap,
1079                     )
1080                     self.proxyLock.release()
1081                     _logger.debug(f"end {name} ")
1082             except Exception:
1083                 errStr = traceback.format_exc()
1084                 _logger.error(errStr)
1085             self.pool.remove(self)
1086             self.lock.release()
1087 
1088     # delete sub datasets
1089     _logger.debug("==== delete sub datasets ====")
1090     timeLimitU = naive_utcnow() - datetime.timedelta(minutes=30)
1091     timeLimitL = naive_utcnow() - datetime.timedelta(days=14)
1092     subdeleteLock = threading.Semaphore(5)
1093     subdeleteProxyLock = threading.Lock()
1094     subdeleteThreadPool = ThreadPool()
1095     maxRows = 5000
1096     while True:
1097         # lock
1098         subdeleteLock.acquire()
1099         # get datasets
1100         varMap = {}
1101         varMap[":limitU"] = timeLimitU
1102         varMap[":limitL"] = timeLimitL
1103         varMap[":type"] = "output"
1104         varMap[":subtype"] = "sub"
1105         varMap[":st1"] = "completed"
1106         varMap[":st2"] = "cleanup"
1107         sqlQuery = (
1108             "type=:type AND subType=:subtype AND status IN (:st1,:st2) AND (creationdate BETWEEN :limitL AND :limitU) AND (modificationdate BETWEEN :limitL AND :limitU) AND rownum <= %s"
1109             % maxRows
1110         )
1111         subdeleteProxyLock.acquire()
1112         res = taskBuffer.getLockDatasets(sqlQuery, varMap, modTimeOffset="90/24/60")
1113         if res is None:
1114             _logger.debug(f"# of sub datasets to be deleted {res}")
1115         else:
1116             _logger.debug(f"# of sub datasets to be deleted {len(res)}")
1117         if res is None or len(res) == 0:
1118             subdeleteProxyLock.release()
1119             subdeleteLock.release()
1120             break
1121         subdeleteProxyLock.release()
1122         # release
1123         subdeleteLock.release()
1124         # run subdeleter
1125         iRows = 0
1126         nRows = 500
1127         while iRows < len(res):
1128             subdeleter = SubDeleter(
1129                 subdeleteLock,
1130                 subdeleteProxyLock,
1131                 res[iRows : iRows + nRows],
1132                 subdeleteThreadPool,
1133             )
1134             subdeleter.start()
1135             iRows += nRows
1136         subdeleteThreadPool.join()
1137         if len(res) < 100:
1138             break
1139 
1140     # release memory
1141     del siteMapper
1142     del deletedDisList
1143 
1144     _memoryCheck("end")
1145 
1146     # stop taskBuffer if created inside this script
1147     taskBuffer.cleanup(requester=requester_id)
1148 
1149     _logger.debug("===================== end =====================")
1150 
1151 
1152 # run
1153 if __name__ == "__main__":
1154     main()