Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 # === Imports ===================================================
0002 
0003 import datetime
0004 import functools
0005 import multiprocessing
0006 import random
0007 import re
0008 import subprocess
0009 import tempfile
0010 import threading
0011 import time
0012 import traceback
0013 import xml.etree.ElementTree as ET
0014 from threading import get_ident
0015 
0016 from pandaharvester.harvesterconfig import harvester_config
0017 from pandaharvester.harvestercore import core_utils
0018 from pandaharvester.harvestercore.core_utils import SingletonWithID
0019 from pandaharvester.harvestercore.fifos import SpecialFIFOBase
0020 
0021 # === Condor ====================================================
0022 
0023 CONDOR_API_TYPE = "python"
0024 CONDOR_API_VERSION = 1
0025 
0026 try:
0027     # try to import htcondor version 2 for htcondor version >= 25
0028     import htcondor2 as htcondor
0029 
0030     CONDOR_API_VERSION = 2
0031 except ImportError:
0032     import htcondor
0033 
0034 
0035 # ===============================================================
0036 
0037 # === Definitions ===============================================
0038 
0039 # logger
0040 baseLogger = core_utils.setup_logger("htcondor_utils")
0041 
0042 # module level lock
0043 moduleLock = threading.Lock()
0044 
0045 # List of job ads required
0046 CONDOR_JOB_ADS_LIST = [
0047     "ClusterId",
0048     "ProcId",
0049     "JobStatus",
0050     "LastJobStatus",
0051     "JobStartDate",
0052     "EnteredCurrentStatus",
0053     "ExitCode",
0054     "HoldReason",
0055     "LastHoldReason",
0056     "RemoveReason",
0057     "harvesterWorkerID",
0058 ]
0059 
0060 # harvesterID
0061 harvesterID = harvester_config.master.harvester_id
0062 
0063 # ===============================================================
0064 
0065 # === Functions =================================================
0066 
0067 
0068 def synchronize(func):
0069     """
0070     synchronize decorator
0071     """
0072 
0073     @functools.wraps(func)
0074     def wrapper(*args, **kwargs):
0075         with moduleLock:
0076             return func(*args, **kwargs)
0077 
0078     return wrapper
0079 
0080 
0081 def _runShell(cmd):
0082     """
0083     Run shell function
0084     """
0085     cmd = str(cmd)
0086     p = subprocess.Popen(cmd.split(), shell=False, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0087     stdOut, stdErr = p.communicate()
0088     retCode = p.returncode
0089     return (retCode, stdOut, stdErr)
0090 
0091 
0092 def condor_job_id_from_workspec(workspec):
0093     """
0094     Generate condor job id with schedd host from workspec
0095     """
0096     batchid_str = str(workspec.batchID)
0097     # backward compatibility if workspec.batchID does not contain ProcId
0098     if "." not in batchid_str:
0099         batchid_str += ".0"
0100     return f"{workspec.submissionHost}#{batchid_str}"
0101 
0102 
0103 def get_host_batchid_map(workspec_list):
0104     """
0105     Get a dictionary of submissionHost: list of batchIDs from workspec_list
0106     return {submissionHost_1: {batchID_1_1, ...}, submissionHost_2: {...}, ...}
0107     """
0108     host_batchid_map = {}
0109     for workspec in workspec_list:
0110         host = workspec.submissionHost
0111         batchid = workspec.batchID
0112         if batchid is None:
0113             continue
0114         batchid_str = str(batchid)
0115         # backward compatibility if workspec.batchID does not contain ProcId
0116         if "." not in batchid_str:
0117             batchid_str += ".0"
0118         host_batchid_map.setdefault(host, {})
0119         host_batchid_map[host][batchid_str] = workspec
0120     return host_batchid_map
0121 
0122 
0123 def get_batchid_from_job(job_ads_dict):
0124     """
0125     Get batchID string from condor job dict
0126     """
0127     batchid = f"{job_ads_dict['ClusterId']}.{job_ads_dict['ProcId']}"
0128     return batchid
0129 
0130 
0131 def get_job_id_tuple_from_batchid(batchid):
0132     """
0133     Get tuple (ClusterId, ProcId) from batchID string
0134     """
0135     batchid_str_list = str(batchid).split(".")
0136     clusterid = batchid_str_list[0]
0137     procid = batchid_str_list[1]
0138     if not procid:
0139         procid = 0
0140     return (clusterid, procid)
0141 
0142 
0143 # def jdl_to_map(jdl):
0144 #     """
0145 #     Transform jdl into dictionary
0146 #     The "queue" line (e.g. "queue 1") will be omitted
0147 #     """
0148 #     # FIXME: not containing "+"
0149 #     ret_map = {}
0150 #     for line in jdl.split('\n'):
0151 #         match = re.search('^(.+) = (.+)$', line)
0152 #         if match:
0153 #             ret_map[match(1)] = match(2)
0154 #     return ret_map
0155 
0156 
0157 def condor_submit_process(mp_queue, host, jdl_map_list, tmp_log):
0158     """
0159     Function for new process to submit condor
0160     """
0161     # initialization
0162     errStr = ""
0163     batchIDs_list = []
0164     # parse schedd and pool name
0165     condor_schedd, condor_pool = None, None
0166     if host in ("LOCAL", "None"):
0167         tmp_log.debug(f"submissionHost is {host}, treated as local schedd. Skipped")
0168     else:
0169         try:
0170             condor_schedd, condor_pool = host.split(",")[0:2]
0171         except ValueError:
0172             tmp_log.error(f"Invalid submissionHost: {host} . Skipped")
0173     # get schedd
0174     try:
0175         if condor_pool:
0176             collector = htcondor.Collector(condor_pool)
0177         else:
0178             collector = htcondor.Collector()
0179         if condor_schedd:
0180             scheddAd = collector.locate(htcondor.DaemonTypes.Schedd, condor_schedd)
0181         else:
0182             scheddAd = collector.locate(htcondor.DaemonTypes.Schedd)
0183         schedd = htcondor.Schedd(scheddAd)
0184     except Exception as e:
0185         errStr = f"create condor collector and schedd failed; {e.__class__.__name__}: {e}"
0186     else:
0187         submit_obj = htcondor.Submit()
0188         try:
0189             with schedd.transaction() as txn:
0190                 # TODO: Currently spool is not supported in htcondor.Submit ...
0191                 submit_result = submit_obj.queue_with_itemdata(txn, 1, iter(jdl_map_list))
0192                 clusterid = submit_result.cluster()
0193                 first_proc = submit_result.first_proc()
0194                 num_proc = submit_result.num_procs()
0195                 batchIDs_list.extend([f"{clusterid}.{procid}" for procid in range(first_proc, first_proc + num_proc)])
0196         except RuntimeError as e:
0197             errStr = f"submission failed; {e.__class__.__name__}: {e}"
0198     mp_queue.put((batchIDs_list, errStr))
0199 
0200 
0201 # ===============================================================
0202 
0203 # === Classes ===================================================
0204 
0205 
0206 # Condor queue cache fifo
0207 class CondorQCacheFifo(SpecialFIFOBase, metaclass=SingletonWithID):
0208     global_lock_id = -1
0209 
0210     def __init__(self, target, *args, **kwargs):
0211         name_suffix = target.split(".")[0]
0212         name_suffix = re.sub("-", "_", name_suffix)
0213         self.titleName = f"CondorQCache_{name_suffix}"
0214         SpecialFIFOBase.__init__(self)
0215 
0216     def lock(self, score=None):
0217         lock_key = format(int(random.random() * 2**32), "x")
0218         if score is None:
0219             score = time.time()
0220         retVal = self.putbyid(self.global_lock_id, lock_key, score)
0221         if retVal:
0222             return lock_key
0223         return None
0224 
0225     def unlock(self, key=None, force=False):
0226         peeked_tuple = self.peekbyid(id=self.global_lock_id)
0227         if peeked_tuple.score is None or peeked_tuple.item is None:
0228             return True
0229         elif force or self.decode(peeked_tuple.item) == key:
0230             self.delete([self.global_lock_id])
0231             return True
0232         else:
0233             return False
0234 
0235 
0236 # Condor client
0237 class CondorClient(object):
0238     @classmethod
0239     def renew_session_and_retry(cls, func):
0240         """
0241         If RuntimeError, call renew_session and retry
0242         """
0243         # FIXME: currently hard-coded
0244         to_retry = True
0245         # Wrapper
0246 
0247         def wrapper(self, *args, **kwargs):
0248             # Make logger
0249             tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorClient.renew_session_if_error")
0250             func_name = func.__name__
0251             try:
0252                 self.schedd
0253             except AttributeError as e:
0254                 if self.lock.acquire(False):
0255                     is_renewed = self.renew_session()
0256                     self.lock.release()
0257                     if not is_renewed:
0258                         errStr = f"failed to communicate with {self.submissionHost}"
0259                         tmpLog.error(errStr)
0260                         tmpLog.debug(f"got RuntimeError: {e}")
0261                         raise Exception(errStr)
0262             try:
0263                 ret = func(self, *args, **kwargs)
0264             except RuntimeError as e:
0265                 tmpLog.debug(f"got RuntimeError: {e}")
0266                 if self.lock.acquire(False):
0267                     is_renewed = self.renew_session()
0268                     self.lock.release()
0269                     if is_renewed:
0270                         if to_retry:
0271                             tmpLog.debug(f"condor session renewed. Retrying {func_name}")
0272                             ret = func(self, *args, **kwargs)
0273                         else:
0274                             tmpLog.debug("condor session renewed")
0275                             raise
0276                     else:
0277                         tmpLog.error("failed to renew condor session")
0278                         raise
0279                 else:
0280                     tmpLog.debug("another thread is renewing condor session; skipped...")
0281                     raise
0282                 tmpLog.debug("done")
0283             return ret
0284 
0285         return wrapper
0286 
0287     def __init__(self, submissionHost, *args, **kwargs):
0288         self.submissionHost = submissionHost
0289         # Make logger
0290         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorClient.__init__")
0291         # Initialize
0292         tmpLog.debug("Initializing client")
0293         self.lock = threading.Lock()
0294         self.condor_api_type = CONDOR_API_TYPE
0295         self.condor_schedd = None
0296         self.condor_pool = None
0297         # Parse condor command remote options from workspec
0298         if self.submissionHost in ("LOCAL", "None"):
0299             tmpLog.debug(f"submissionHost is {self.submissionHost}, treated as local schedd. Skipped")
0300         else:
0301             try:
0302                 self.condor_schedd, self.condor_pool = self.submissionHost.split(",")[0:2]
0303                 if self.condor_schedd in ["None"]:
0304                     self.condor_schedd = None
0305                 if self.condor_pool in ["None"]:
0306                     self.condor_pool = None
0307             except ValueError:
0308                 tmpLog.error(f"Invalid submissionHost: {self.submissionHost} . Skipped")
0309         # Use Python API or fall back to command
0310         if self.condor_api_type == "python":
0311             try:
0312                 if CONDOR_API_VERSION == 1:
0313                     self.secman = htcondor.SecMan()
0314                 self.renew_session(init=True)
0315             except Exception as e:
0316                 tmpLog.error(f"Error when using htcondor Python API. Exception {e.__class__.__name__}: {e}")
0317                 raise
0318         tmpLog.debug("Initialized client")
0319 
0320     @synchronize
0321     def renew_session(self, retry=3, init=False):
0322         # Make logger
0323         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorClient.renew_session")
0324         # Clear security session if not initialization
0325         if not init:
0326             tmpLog.info("Renew condor session")
0327             if CONDOR_API_VERSION == 1:
0328                 self.secman.invalidateAllSessions()
0329         # Recreate collector and schedd object
0330         i_try = 1
0331         while i_try <= retry:
0332             try:
0333                 tmpLog.info(f"Try {i_try}")
0334                 if self.condor_pool:
0335                     self.collector = htcondor.Collector(self.condor_pool)
0336                 else:
0337                     self.collector = htcondor.Collector()
0338                 if self.condor_schedd:
0339                     self.scheddAd = self.collector.locate(htcondor.DaemonTypes.Schedd, self.condor_schedd)
0340                 else:
0341                     self.scheddAd = self.collector.locate(htcondor.DaemonTypes.Schedd)
0342                 self.schedd = htcondor.Schedd(self.scheddAd)
0343                 tmpLog.info("Success")
0344                 break
0345             except Exception as e:
0346                 tmpLog.warning(f"Recreate condor collector and schedd failed: {e}")
0347                 if i_try < retry:
0348                     tmpLog.warning("Failed. Retry...")
0349                 else:
0350                     tmpLog.warning(f"Retry {i_try} times. Still failed. Skipped")
0351                     return False
0352                 i_try += 1
0353                 if CONDOR_API_VERSION == 1:
0354                     self.secman.invalidateAllSessions()
0355                 time.sleep(3)
0356         # Sleep
0357         time.sleep(3)
0358         return True
0359 
0360 
0361 # Condor job query
0362 class CondorJobQuery(CondorClient, metaclass=SingletonWithID):
0363     # class lock
0364     classLock = threading.Lock()
0365     # Query commands
0366     orig_comStr_list = [
0367         "condor_q -xml",
0368         "condor_history -xml",
0369     ]
0370     # Bad text of redundant xml roots to eleminate from condor XML
0371     badtext = """
0372 </classads>
0373 
0374 <?xml version="1.0"?>
0375 <!DOCTYPE classads SYSTEM "classads.dtd">
0376 <classads>
0377 """
0378 
0379     def __init__(self, cacheEnable=False, cacheRefreshInterval=None, useCondorHistory=True, useCondorHistoryMaxAge=7200, *args, **kwargs):
0380         self.submissionHost = str(kwargs.get("id"))
0381         # Make logger
0382         tmpLog = core_utils.make_logger(
0383             baseLogger, f"submissionHost={self.submissionHost} thrid={get_ident()} oid={id(self)}", method_name="CondorJobQuery.__init__"
0384         )
0385         # Initialize
0386         with self.classLock:
0387             tmpLog.debug("Start")
0388             CondorClient.__init__(self, self.submissionHost, *args, **kwargs)
0389             # For condor_q cache
0390             self.cacheEnable = cacheEnable
0391             if self.cacheEnable:
0392                 self.cache = ([], 0)
0393                 self.cacheRefreshInterval = cacheRefreshInterval
0394             self.useCondorHistory = useCondorHistory
0395             self.useCondorHistoryMaxAge = useCondorHistoryMaxAge
0396             tmpLog.debug("Initialize done")
0397 
0398     def get_all(self, batchIDs_dict=None, allJobs=False, to_update_cache=False):
0399         # Make logger
0400         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.get_all")
0401         # Get all
0402         tmpLog.debug("Start")
0403         job_ads_all_dict = {}
0404         if self.condor_api_type == "python":
0405             try:
0406                 job_ads_all_dict = self.query_with_python(batchIDs_dict, allJobs, to_update_cache)
0407             except Exception as e:
0408                 tb_str = traceback.format_exc()
0409                 tmpLog.error(f"Exception {e.__class__.__name__}: {e} ; {tb_str}")
0410                 raise
0411         else:
0412             job_ads_all_dict = self.query_with_command(batchIDs_dict)
0413         return job_ads_all_dict
0414 
0415     def query_with_command(self, batchIDs_dict=None):
0416         # Make logger
0417         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.query_with_command")
0418         # Start query
0419         tmpLog.debug("Start query")
0420         if batchIDs_dict is None:
0421             batchIDs_dict = {}
0422         job_ads_all_dict = {}
0423         batchIDs_set = set(batchIDs_dict.keys())
0424         for orig_comStr in self.orig_comStr_list:
0425             # String of batchIDs
0426             batchIDs_str = " ".join(list(batchIDs_set))
0427             # Command
0428             if "condor_q" in orig_comStr or ("condor_history" in orig_comStr and batchIDs_set):
0429                 name_opt = f"-name {self.condor_schedd}" if self.condor_schedd else ""
0430                 pool_opt = f"-pool {self.condor_pool}" if self.condor_pool else ""
0431                 ids = batchIDs_str
0432                 comStr = f"{orig_comStr} {name_opt} {pool_opt} {ids}"
0433             else:
0434                 # tmpLog.debug('No batch job left to query in this cycle by this thread')
0435                 continue
0436             tmpLog.debug(f"check with {comStr}")
0437             (retCode, stdOut, stdErr) = _runShell(comStr)
0438             if retCode == 0:
0439                 # Command succeeded
0440                 job_ads_xml_str = "\n".join(str(stdOut).split(self.badtext))
0441                 if "<c>" in job_ads_xml_str:
0442                     # Found at least one job
0443                     # XML parsing
0444                     xml_root = ET.fromstring(job_ads_xml_str)
0445 
0446                     def _getAttribute_tuple(attribute_xml_element):
0447                         # Attribute name
0448                         _n = str(attribute_xml_element.get("n"))
0449                         # Attribute value text
0450                         _t = " ".join(attribute_xml_element.itertext())
0451                         return (_n, _t)
0452 
0453                     # Every batch job
0454                     for _c in xml_root.findall("c"):
0455                         job_ads_dict = dict()
0456                         # Every attribute
0457                         attribute_iter = map(_getAttribute_tuple, _c.findall("a"))
0458                         job_ads_dict.update(attribute_iter)
0459                         batchid = get_batchid_from_job(job_ads_dict)
0460                         condor_job_id = f"{self.submissionHost}#{batchid}"
0461                         job_ads_all_dict[condor_job_id] = job_ads_dict
0462                         # Remove batch jobs already gotten from the list
0463                         if batchid in batchIDs_set:
0464                             batchIDs_set.discard(batchid)
0465                 else:
0466                     # Job not found
0467                     tmpLog.debug(f"job not found with {comStr}")
0468                     continue
0469             else:
0470                 # Command failed
0471                 errStr = f'command "{comStr}" failed, retCode={retCode}, error: {stdOut} {stdErr}'
0472                 tmpLog.error(errStr)
0473         if len(batchIDs_set) > 0:
0474             # Job unfound via both condor_q or condor_history, marked as unknown worker in harvester
0475             for batchid in batchIDs_set:
0476                 condor_job_id = f"{self.submissionHost}#{batchid}"
0477                 job_ads_all_dict[condor_job_id] = dict()
0478             tmpLog.info(f"Unfound batch jobs of submissionHost={self.submissionHost}: {' '.join(list(batchIDs_set))}")
0479         # Return
0480         return job_ads_all_dict
0481 
0482     @CondorClient.renew_session_and_retry
0483     def query_with_python(self, batchIDs_dict=None, allJobs=False, to_update_cache=False):
0484         # Make logger
0485         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.query_with_python")
0486         # Start query
0487         tmpLog.debug("Start query")
0488         cache_fifo = None
0489         job_ads_all_dict = {}
0490         # make id sets
0491         if batchIDs_dict is None:
0492             batchIDs_dict = {}
0493         batchIDs_set = set(batchIDs_dict.keys())
0494         clusterids_set = set([get_job_id_tuple_from_batchid(batchid)[0] for batchid in batchIDs_dict])
0495         # query from cache
0496 
0497         def cache_query(constraint=None, projection=CONDOR_JOB_ADS_LIST, timeout=60):
0498             # query from condor query and update cache to fifo
0499             def update_cache(lockInterval=90):
0500                 tmpLog.debug("update_cache")
0501                 # acquire lock with score timestamp
0502                 score = time.time() - self.cacheRefreshInterval + lockInterval
0503                 lock_key = cache_fifo.lock(score=score)
0504                 if lock_key is not None:
0505                     # acquired lock, update from condor schedd
0506                     tmpLog.debug("got lock, updating cache")
0507                     jobs_iter_orig = self.schedd.query(constraint=constraint, projection=projection)
0508                     jobs_iter = []
0509                     for job in jobs_iter_orig:
0510                         try:
0511                             jobs_iter.append(dict(job))
0512                         except Exception as e:
0513                             tmpLog.error(f"In updating cache schedd query; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
0514                     timeNow = time.time()
0515                     cache_fifo.put(jobs_iter, timeNow)
0516                     self.cache = (jobs_iter, timeNow)
0517                     # release lock
0518                     retVal = cache_fifo.unlock(key=lock_key)
0519                     if retVal:
0520                         tmpLog.debug("done update cache and unlock")
0521                     else:
0522                         tmpLog.warning("cannot unlock... Maybe something wrong")
0523                     return jobs_iter
0524                 else:
0525                     tmpLog.debug("cache fifo locked by other thread. Skipped")
0526                     return None
0527 
0528             # remove invalid or outdated caches from fifo
0529             def cleanup_cache(timeout=60):
0530                 tmpLog.debug("cleanup_cache")
0531                 id_list = list()
0532                 attempt_timestamp = time.time()
0533                 n_cleanup = 0
0534                 while True:
0535                     if time.time() > attempt_timestamp + timeout:
0536                         tmpLog.debug("time is up when cleanup cache. Skipped")
0537                         break
0538                     peeked_tuple = cache_fifo.peek(skip_item=True)
0539                     if peeked_tuple is None:
0540                         tmpLog.debug("empty cache fifo")
0541                         break
0542                     elif peeked_tuple.score is not None and time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
0543                         tmpLog.debug("nothing expired")
0544                         break
0545                     elif peeked_tuple.id is not None:
0546                         retVal = cache_fifo.delete([peeked_tuple.id])
0547                         if isinstance(retVal, int):
0548                             n_cleanup += retVal
0549                     else:
0550                         # problematic
0551                         tmpLog.warning("got nothing when cleanup cache, maybe problematic. Skipped")
0552                         break
0553                 tmpLog.debug(f"cleaned up {n_cleanup} objects in cache fifo")
0554 
0555             # start
0556             jobs_iter = tuple()
0557             try:
0558                 attempt_timestamp = time.time()
0559                 while True:
0560                     if time.time() > attempt_timestamp + timeout:
0561                         # skip cache_query if too long
0562                         tmpLog.debug(f"cache_query got timeout ({timeout} seconds). Skipped ")
0563                         break
0564                     # get latest cache
0565                     peeked_tuple = cache_fifo.peeklast(skip_item=True)
0566                     if peeked_tuple is not None and peeked_tuple.score is not None:
0567                         # got something
0568                         if peeked_tuple.id == cache_fifo.global_lock_id:
0569                             if time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
0570                                 # lock
0571                                 tmpLog.debug("got fifo locked. Wait and retry...")
0572                                 time.sleep(random.uniform(1, 5))
0573                                 continue
0574                             else:
0575                                 # expired lock
0576                                 tmpLog.debug("got lock expired. Clean up and retry...")
0577                                 cleanup_cache()
0578                                 continue
0579                         elif not to_update_cache and time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
0580                             # got valid cache
0581                             _obj, _last_update = self.cache
0582                             if _last_update >= peeked_tuple.score:
0583                                 # valid local cache
0584                                 tmpLog.debug("valid local cache")
0585                                 jobs_iter = _obj
0586                             else:
0587                                 # valid fifo cache
0588                                 tmpLog.debug("update local cache from fifo")
0589                                 peeked_tuple_with_item = cache_fifo.peeklast()
0590                                 if (
0591                                     peeked_tuple_with_item is not None
0592                                     and peeked_tuple.id != cache_fifo.global_lock_id
0593                                     and peeked_tuple_with_item.item is not None
0594                                 ):
0595                                     jobs_iter = cache_fifo.decode(peeked_tuple_with_item.item)
0596                                     self.cache = (jobs_iter, peeked_tuple_with_item.score)
0597                                 else:
0598                                     tmpLog.debug("peeked invalid cache fifo object. Wait and retry...")
0599                                     time.sleep(random.uniform(1, 5))
0600                                     continue
0601                         else:
0602                             # cache expired or force to_update_cache
0603                             tmpLog.debug("update cache in fifo")
0604                             retVal = update_cache()
0605                             if retVal is not None:
0606                                 jobs_iter = retVal
0607                             cleanup_cache()
0608                         break
0609                     else:
0610                         # no cache in fifo, check with size again
0611                         if cache_fifo.size() == 0:
0612                             if time.time() > attempt_timestamp + random.uniform(10, 30):
0613                                 # have waited for long enough, update cache
0614                                 tmpLog.debug("waited enough, update cache in fifo")
0615                                 retVal = update_cache()
0616                                 if retVal is not None:
0617                                     jobs_iter = retVal
0618                                 break
0619                             else:
0620                                 # still nothing, wait
0621                                 time.sleep(2)
0622                         continue
0623             except Exception as _e:
0624                 tb_str = traceback.format_exc()
0625                 tmpLog.error(f"Error querying from cache fifo; {_e} ; {tb_str}")
0626             return jobs_iter
0627 
0628         # query method options
0629         query_method_list = [self.schedd.query]
0630         if self.cacheEnable:
0631             cache_fifo = CondorQCacheFifo(target=self.submissionHost, id=f"{self.submissionHost},{get_ident()}")
0632             query_method_list.insert(0, cache_query)
0633         if self.useCondorHistory:
0634             query_method_list.append(self.schedd.history)
0635         # Go
0636         for query_method in query_method_list:
0637             # exclude already checked and outdated jobs before querying with condor history
0638             if query_method.__name__ == self.schedd.history.__name__:
0639                 now_time = core_utils.naive_utcnow()
0640                 update_time_threshold = now_time - datetime.timedelta(seconds=self.useCondorHistoryMaxAge)
0641                 clusterids_set = set()
0642                 for batchid, workspec in batchIDs_dict.items():
0643                     if batchid in batchIDs_set and (
0644                         (workspec.submitTime and workspec.submitTime >= update_time_threshold)
0645                         or (workspec.startTime and workspec.startTime >= update_time_threshold)
0646                     ):
0647                         clusterid = get_job_id_tuple_from_batchid(batchid)[0]
0648                         clusterids_set.add(clusterid)
0649                 # skip if no clusterid to check
0650                 if not clusterids_set:
0651                     break
0652             # Make constraint
0653             clusterids_str = ",".join(list(clusterids_set))
0654             if query_method is cache_query or allJobs:
0655                 constraint = f'harvesterID =?= "{harvesterID}"'
0656             else:
0657                 constraint = f"member(ClusterID, {{{clusterids_str}}})"
0658             if allJobs:
0659                 tmpLog.debug(f"Query method: {query_method.__name__} ; allJobs")
0660             else:
0661                 tmpLog.debug(f'Query method: {query_method.__name__} ; clusterids: "{clusterids_str}"')
0662             # Query
0663             jobs_iter = query_method(constraint=constraint, projection=CONDOR_JOB_ADS_LIST)
0664             for job in jobs_iter:
0665                 try:
0666                     job_ads_dict = dict(job)
0667                 except Exception as e:
0668                     tmpLog.error(f"In doing schedd query or history; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
0669                 batchid = get_batchid_from_job(job_ads_dict)
0670                 condor_job_id = f"{self.submissionHost}#{batchid}"
0671                 job_ads_all_dict[condor_job_id] = job_ads_dict
0672                 # Remove batch jobs already gotten from the list
0673                 if not allJobs:
0674                     batchIDs_set.discard(batchid)
0675             if len(batchIDs_set) == 0 or allJobs:
0676                 break
0677         # Remaining
0678         if not allJobs and len(batchIDs_set) > 0:
0679             # Job unfound via both condor_q or condor_history, marked as unknown worker in harvester
0680             for batchid in batchIDs_set:
0681                 condor_job_id = f"{self.submissionHost}#{batchid}"
0682                 job_ads_all_dict[condor_job_id] = dict()
0683             tmpLog.info(f"Unfound batch jobs of submissionHost={self.submissionHost}: {' '.join(list(batchIDs_set))}")
0684         # Return
0685         return job_ads_all_dict
0686 
0687 
0688 # Condor job submit
0689 class CondorJobSubmit(CondorClient, metaclass=SingletonWithID):
0690     # class lock
0691     classLock = threading.Lock()
0692 
0693     def __init__(self, *args, **kwargs):
0694         self.submissionHost = str(kwargs.get("id"))
0695         # Make logger
0696         tmpLog = core_utils.make_logger(
0697             baseLogger, f"submissionHost={self.submissionHost} thrid={get_ident()} oid={id(self)}", method_name="CondorJobSubmit.__init__"
0698         )
0699         # Initialize
0700         tmpLog.debug("Start")
0701         self.lock = threading.Lock()
0702         CondorClient.__init__(self, self.submissionHost, *args, **kwargs)
0703         tmpLog.debug("Initialize done")
0704 
0705     def submit(self, jdl_list, use_spool=False):
0706         # Make logger
0707         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit")
0708         # Get all
0709         tmpLog.debug("Start")
0710         job_ads_all_dict = {}
0711         if self.condor_api_type == "python":
0712             try:
0713                 # TODO: submit_with_python will meet segfault or c++ error after many times of submission; need help from condor team
0714                 # TODO: submit_with_python_process has no such error but spawns some processes that will not terminate after harvester stops
0715                 # TODO: Fall back to submit_with_command for now
0716                 # retVal = self.submit_with_python(jdl_list, use_spool)
0717                 # retVal = self.submit_with_python_process(jdl_list, use_spool)
0718                 retVal = self.submit_with_command(jdl_list, use_spool)
0719             except Exception as e:
0720                 tmpLog.error(f"Exception {e.__class__.__name__}: {e}")
0721                 raise
0722         else:
0723             retVal = self.submit_with_command(jdl_list, use_spool)
0724         return retVal
0725 
0726     def submit_with_command(self, jdl_list, use_spool=False, tmp_str="", keep_temp_sdf=False):
0727         # Make logger
0728         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit_with_command")
0729         # Initialize
0730         errStr = ""
0731         batchIDs_list = []
0732         # make sdf temp file from jdls
0733         tmpFile = tempfile.NamedTemporaryFile(mode="w", delete=(not keep_temp_sdf), suffix=f"_{tmp_str}_cluster_submit.sdf")
0734         sdf_file = tmpFile.name
0735         tmpFile.write("\n\n".join(jdl_list))
0736         tmpFile.flush()
0737         # make condor remote options
0738         name_opt = f"-name {self.condor_schedd}" if self.condor_schedd else ""
0739         pool_opt = f"-pool {self.condor_pool}" if self.condor_pool else ""
0740         spool_opt = "-remote -spool" if use_spool and self.condor_schedd else ""
0741         # command
0742         comStr = f"condor_submit -single-cluster {spool_opt} {name_opt} {pool_opt} {sdf_file}"
0743         # submit
0744         tmpLog.debug(f"submit with command: {comStr}")
0745         try:
0746             p = subprocess.Popen(comStr.split(), shell=False, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0747             # check return code
0748             stdOut, stdErr = p.communicate()
0749             retCode = p.returncode
0750         except Exception as e:
0751             stdOut = ""
0752             stdErr = core_utils.dump_error_message(tmpLog, no_message=True)
0753             retCode = 1
0754             errStr = f"{e.__class__.__name__}: {e}"
0755         finally:
0756             tmpFile.close()
0757         tmpLog.debug(f"retCode={retCode}")
0758         if retCode == 0:
0759             # extract clusterid and n_jobs
0760             job_id_match = None
0761             for tmp_line_str in stdOut.split("\n"):
0762                 job_id_match = re.search("^(\d+) job[(]s[)] submitted to cluster (\d+)\.$", tmp_line_str)
0763                 if job_id_match:
0764                     break
0765             if job_id_match is not None:
0766                 n_jobs = int(job_id_match.group(1))
0767                 clusterid = job_id_match.group(2)
0768                 batchIDs_list = [f"{clusterid}.{procid}" for procid in range(n_jobs)]
0769                 tmpLog.debug(f"submitted {n_jobs} jobs: {' '.join(batchIDs_list)}")
0770             else:
0771                 errStr = f"no job submitted: {errStr}"
0772                 tmpLog.error(errStr)
0773         else:
0774             errStr = f"{stdErr} ; {errStr}"
0775             tmpLog.error(f"submission failed: {errStr}")
0776         # Return
0777         return (batchIDs_list, errStr)
0778 
0779     @CondorClient.renew_session_and_retry
0780     def submit_with_python(self, jdl_list, use_spool=False):
0781         # Make logger
0782         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit_with_python")
0783         # Start
0784         tmpLog.debug("Start")
0785         # Initialize
0786         errStr = ""
0787         batchIDs_list = []
0788         # Make list of jdl map with dummy submit objects
0789         jdl_map_list = [dict(htcondor.Submit(jdl).items()) for jdl in jdl_list]
0790         # Go
0791         submit_obj = htcondor.Submit()
0792         try:
0793             with self.schedd.transaction() as txn:
0794                 # TODO: Currently spool is not supported in htcondor.Submit ...
0795                 submit_result = submit_obj.queue_with_itemdata(txn, 1, iter(jdl_map_list))
0796                 clusterid = submit_result.cluster()
0797                 first_proc = submit_result.first_proc()
0798                 num_proc = submit_result.num_procs()
0799                 batchIDs_list.extend([f"{clusterid}.{procid}" for procid in range(first_proc, first_proc + num_proc)])
0800         except RuntimeError as e:
0801             errStr = f"{e.__class__.__name__}: {e}"
0802             tmpLog.error(f"submission failed: {errStr}")
0803             raise
0804         if batchIDs_list:
0805             n_jobs = len(batchIDs_list)
0806             tmpLog.debug(f"submitted {n_jobs} jobs: {' '.join(batchIDs_list)}")
0807         elif not errStr:
0808             tmpLog.error("submitted nothing")
0809         tmpLog.debug("Done")
0810         # Return
0811         return (batchIDs_list, errStr)
0812 
0813     def submit_with_python_process(self, jdl_list, use_spool=False):
0814         # Make logger
0815         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit_with_python_process")
0816         # Start
0817         tmpLog.debug("Start")
0818         # Make list of jdl map with dummy submit objects
0819         jdl_map_list = [dict(htcondor.Submit(jdl).items()) for jdl in jdl_list]
0820         # Go
0821         mp_queue = multiprocessing.Queue()
0822         mp_process = multiprocessing.Process(target=condor_submit_process, args=(mp_queue, self.submissionHost, jdl_map_list, tmpLog))
0823         mp_process.daemon = True
0824         mp_process.start()
0825         (batchIDs_list, errStr) = mp_queue.get()
0826         mp_queue.close()
0827         mp_process.terminate()
0828         mp_process.join()
0829         if batchIDs_list:
0830             n_jobs = len(batchIDs_list)
0831             tmpLog.debug(f"submitted {n_jobs} jobs: {' '.join(batchIDs_list)}")
0832         elif not errStr:
0833             tmpLog.error("submitted nothing")
0834         tmpLog.debug("Done")
0835         # Return
0836         return (batchIDs_list, errStr)
0837 
0838 
0839 # Condor job remove
0840 class CondorJobManage(CondorClient, metaclass=SingletonWithID):
0841     # class lock
0842     classLock = threading.Lock()
0843 
0844     def __init__(self, *args, **kwargs):
0845         self.submissionHost = str(kwargs.get("id"))
0846         # Make logger
0847         tmpLog = core_utils.make_logger(
0848             baseLogger, f"submissionHost={self.submissionHost} thrid={get_ident()} oid={id(self)}", method_name="CondorJobManage.__init__"
0849         )
0850         # Initialize
0851         tmpLog.debug("Start")
0852         self.lock = threading.Lock()
0853         CondorClient.__init__(self, self.submissionHost, *args, **kwargs)
0854         tmpLog.debug("Initialize done")
0855 
0856     def remove(self, batchIDs_list=[]):
0857         # Make logger
0858         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobManage.remove")
0859         # Get all
0860         tmpLog.debug("Start")
0861         job_ads_all_dict = {}
0862         if self.condor_api_type == "python":
0863             try:
0864                 retVal = self.remove_with_python(batchIDs_list)
0865             except Exception as e:
0866                 tmpLog.error(f"Exception {e.__class__.__name__}: {e}")
0867                 raise
0868         else:
0869             retVal = self.remove_with_command(batchIDs_list)
0870         return retVal
0871 
0872     def remove_with_command(self, batchIDs_list=[]):
0873         # Make logger
0874         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobManage.remove_with_command")
0875         # if workspec.batchID is None:
0876         #     tmpLog.info('Found workerID={0} has submissionHost={1} batchID={2} . Cannot kill. Skipped '.format(
0877         #                     workspec.workerID, workspec.submissionHost, workspec.batchID))
0878         #     ret_list.append((True, ''))
0879         #
0880         # ## Parse condor remote options
0881         # name_opt, pool_opt = '', ''
0882         # if workspec.submissionHost is None or workspec.submissionHost == 'LOCAL':
0883         #     pass
0884         # else:
0885         #     try:
0886         #         condor_schedd, condor_pool = workspec.submissionHost.split(',')[0:2]
0887         #     except ValueError:
0888         #         errStr = 'Invalid submissionHost: {0} . Skipped'.format(workspec.submissionHost)
0889         #         tmpLog.error(errStr)
0890         #         ret_list.append((False, errStr))
0891         #     name_opt = '-name {0}'.format(condor_schedd) if condor_schedd else ''
0892         #     pool_opt = '-pool {0}'.format(condor_pool) if condor_pool else ''
0893         #
0894         # ## Kill command
0895         # comStr = 'condor_rm {name_opt} {pool_opt} {batchID}'.format(name_opt=name_opt,
0896         #                                                             pool_opt=pool_opt,
0897         #                                                             batchID=workspec.batchID)
0898         # (retCode, stdOut, stdErr) = _runShell(comStr)
0899         # if retCode != 0:
0900         #     comStr = 'condor_q -l {name_opt} {pool_opt} {batchID}'.format(name_opt=name_opt,
0901         #                                                                 pool_opt=pool_opt,
0902         #                                                                 batchID=workspec.batchID)
0903         #     (retCode, stdOut, stdErr) = _runShell(comStr)
0904         #     if ('ClusterId = {0}'.format(workspec.batchID) in str(stdOut) \
0905         #         and 'JobStatus = 3' not in str(stdOut)) or retCode != 0:
0906         #         ## Force to cancel if batch job not terminated first time
0907         #         comStr = 'condor_rm -forcex {name_opt} {pool_opt} {batchID}'.format(name_opt=name_opt,
0908         #                                                                     pool_opt=pool_opt,
0909         #                                                                     batchID=workspec.batchID)
0910         #         (retCode, stdOut, stdErr) = _runShell(comStr)
0911         #         if retCode != 0:
0912         #             ## Command failed to kill
0913         #             errStr = 'command "{0}" failed, retCode={1}, error: {2} {3}'.format(comStr, retCode, stdOut, stdErr)
0914         #             tmpLog.error(errStr)
0915         #             ret_list.append((False, errStr))
0916         #     ## Found already killed
0917         #     tmpLog.info('Found workerID={0} submissionHost={1} batchID={2} already killed'.format(
0918         #                     workspec.workerID, workspec.submissionHost, workspec.batchID))
0919         # else:
0920         #     tmpLog.info('Succeeded to kill workerID={0} submissionHost={1} batchID={2}'.format(
0921         #                     workspec.workerID, workspec.submissionHost, workspec.batchID))
0922         raise NotImplementedError
0923 
0924     @CondorClient.renew_session_and_retry
0925     def remove_with_python(self, batchIDs_list=[]):
0926         # Make logger
0927         tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobManage.remove_with_python")
0928         # Start
0929         tmpLog.debug("Start")
0930         # Acquire class lock
0931         with self.classLock:
0932             tmpLog.debug("Got class lock")
0933             # Initialize
0934             ret_list = []
0935             retMap = {}
0936             # Go
0937             n_jobs = len(batchIDs_list)
0938             act_ret = self.schedd.act(htcondor.JobAction.Remove, batchIDs_list)
0939             # Check if all jobs clear (off from schedd queue)
0940             is_all_clear = n_jobs == act_ret["TotalAlreadyDone"] + act_ret["TotalNotFound"] + act_ret["TotalSuccess"]
0941             if act_ret and is_all_clear:
0942                 tmpLog.debug(f"removed {n_jobs} jobs: {','.join(batchIDs_list)}")
0943                 for batchid in batchIDs_list:
0944                     condor_job_id = f"{self.submissionHost}#{batchid}"
0945                     retMap[condor_job_id] = (True, "")
0946             else:
0947                 tmpLog.error(f"job removal failed; batchIDs_list={batchIDs_list}, got: {act_ret}")
0948                 # need to query queue for unterminated jobs not removed yet
0949                 clusterids_set = set([get_job_id_tuple_from_batchid(batchid)[0] for batchid in batchIDs_list])
0950                 clusterids_str = ",".join(list(clusterids_set))
0951                 constraint = f"member(ClusterID, {{{clusterids_str}}}) && JobStatus =!= 3 && JobStatus =!= 4"
0952                 jobs_iter = self.schedd.query(constraint=constraint, projection=CONDOR_JOB_ADS_LIST)
0953                 all_batchid_map = {}
0954                 ok_batchid_list = []
0955                 ng_batchid_list = []
0956                 for job in jobs_iter:
0957                     job_ads_dict = dict(job)
0958                     batchid = get_batchid_from_job(job_ads_dict)
0959                     all_batchid_map[batchid] = job_ads_dict
0960                 for batchid in batchIDs_list:
0961                     condor_job_id = f"{self.submissionHost}#{batchid}"
0962                     if batchid in all_batchid_map:
0963                         ng_batchid_list.append(batchid)
0964                         retMap[condor_job_id] = (False, f"batchID={batchid} still unterminated in condor queue")
0965                     else:
0966                         ok_batchid_list.append(batchid)
0967                         retMap[condor_job_id] = (True, "")
0968                 tmpLog.debug(
0969                     "removed {0} jobs: {1} ; failed to remove {2} jobs: {3}".format(
0970                         len(ok_batchid_list), ",".join(ok_batchid_list), len(ng_batchid_list), ",".join(ng_batchid_list)
0971                     )
0972                 )
0973         tmpLog.debug("Done")
0974         # Return
0975         return retMap
0976 
0977 
0978 # ===============================================================