File indexing completed on 2026-04-20 07:58:59
0001
0002
0003 import datetime
0004 import functools
0005 import multiprocessing
0006 import random
0007 import re
0008 import subprocess
0009 import tempfile
0010 import threading
0011 import time
0012 import traceback
0013 import xml.etree.ElementTree as ET
0014 from threading import get_ident
0015
0016 from pandaharvester.harvesterconfig import harvester_config
0017 from pandaharvester.harvestercore import core_utils
0018 from pandaharvester.harvestercore.core_utils import SingletonWithID
0019 from pandaharvester.harvestercore.fifos import SpecialFIFOBase
0020
0021
0022
# Type of condor python API in use ("python" = htcondor bindings) and its major version.
CONDOR_API_TYPE = "python"
CONDOR_API_VERSION = 1

try:
    # Prefer the version-2 python bindings when available; keep the name "htcondor"
    # so the rest of the module is agnostic of the bindings version.
    import htcondor2 as htcondor

    CONDOR_API_VERSION = 2
except ImportError:
    # Fall back to the classic version-1 bindings.
    import htcondor


# logger
baseLogger = core_utils.setup_logger("htcondor_utils")


# module-level lock; serializes session renewal via the @synchronize decorator
moduleLock = threading.Lock()


# List of job ClassAd attributes fetched from condor queries
CONDOR_JOB_ADS_LIST = [
    "ClusterId",
    "ProcId",
    "JobStatus",
    "LastJobStatus",
    "JobStartDate",
    "EnteredCurrentStatus",
    "ExitCode",
    "HoldReason",
    "LastHoldReason",
    "RemoveReason",
    "harvesterWorkerID",
]


# harvester instance id from configuration; used as a query constraint on own jobs
harvesterID = harvester_config.master.harvester_id
0062
0063
0064
0065
0066
0067
def synchronize(func):
    """
    Decorator that serializes every call to *func* under the module-level lock.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # explicit acquire/release with a finally guard so the lock is
        # always released, even when func raises
        moduleLock.acquire()
        try:
            return func(*args, **kwargs)
        finally:
            moduleLock.release()

    return wrapper
0079
0080
0081 def _runShell(cmd):
0082 """
0083 Run shell function
0084 """
0085 cmd = str(cmd)
0086 p = subprocess.Popen(cmd.split(), shell=False, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0087 stdOut, stdErr = p.communicate()
0088 retCode = p.returncode
0089 return (retCode, stdOut, stdErr)
0090
0091
def condor_job_id_from_workspec(workspec):
    """
    Build the global condor job identifier "<submissionHost>#<ClusterId>.<ProcId>"
    from a worker spec.
    """
    raw_id = str(workspec.batchID)
    # a bare ClusterId (no dot) is normalized to ProcId 0
    full_id = raw_id if "." in raw_id else raw_id + ".0"
    return f"{workspec.submissionHost}#{full_id}"
0101
0102
def get_host_batchid_map(workspec_list):
    """
    Get a dictionary of submissionHost: list of batchIDs from workspec_list
    return {submissionHost_1: {batchID_1_1, ...}, submissionHost_2: {...}, ...}
    """
    mapping = {}
    for ws in workspec_list:
        # workers never submitted have no batch job to track
        if ws.batchID is None:
            continue
        bid = str(ws.batchID)
        # normalize bare ClusterId to ClusterId.0
        if "." not in bid:
            bid += ".0"
        mapping.setdefault(ws.submissionHost, {})[bid] = ws
    return mapping
0121
0122
def get_batchid_from_job(job_ads_dict):
    """
    Get batchID string ("ClusterId.ProcId") from a condor job ads dict.
    """
    return "{0}.{1}".format(job_ads_dict["ClusterId"], job_ads_dict["ProcId"])
0129
0130
def get_job_id_tuple_from_batchid(batchid):
    """
    Get tuple (ClusterId, ProcId) from a batchID string.

    ProcId is returned as the string after the first dot when present,
    or the integer 0 when absent or empty.

    Fix: the previous implementation raised IndexError for a batchid
    without a "." separator (e.g. "123"); the existing "if not procid"
    fallback shows a missing ProcId was meant to map to 0.
    """
    parts = str(batchid).split(".")
    clusterid = parts[0]
    procid = parts[1] if len(parts) > 1 else ""
    if not procid:
        procid = 0
    return (clusterid, procid)
0141
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156
def condor_submit_process(mp_queue, host, jdl_map_list, tmp_log):
    """
    Function for new process to submit condor

    Runs in a child process (see CondorJobSubmit.submit_with_python_process):
    locates the schedd for *host*, submits one job per item of jdl_map_list,
    and puts (batchIDs_list, errStr) on mp_queue.
    """
    # Initialize
    errStr = ""
    batchIDs_list = []
    # Parse schedd and pool name from submission host string "schedd,pool"
    condor_schedd, condor_pool = None, None
    if host in ("LOCAL", "None"):
        tmp_log.debug(f"submissionHost is {host}, treated as local schedd. Skipped")
    else:
        try:
            condor_schedd, condor_pool = host.split(",")[0:2]
        except ValueError:
            tmp_log.error(f"Invalid submissionHost: {host} . Skipped")
    # Get schedd; fall back to the local collector/schedd when names are unset
    try:
        if condor_pool:
            collector = htcondor.Collector(condor_pool)
        else:
            collector = htcondor.Collector()
        if condor_schedd:
            scheddAd = collector.locate(htcondor.DaemonTypes.Schedd, condor_schedd)
        else:
            scheddAd = collector.locate(htcondor.DaemonTypes.Schedd)
        schedd = htcondor.Schedd(scheddAd)
    except Exception as e:
        errStr = f"create condor collector and schedd failed; {e.__class__.__name__}: {e}"
    else:
        submit_obj = htcondor.Submit()
        try:
            # NOTE(review): Schedd.transaction() belongs to the v1 python bindings;
            # confirm it is still available when htcondor2 is imported (CONDOR_API_VERSION == 2)
            with schedd.transaction() as txn:
                # one job (count=1) per itemdata entry
                submit_result = submit_obj.queue_with_itemdata(txn, 1, iter(jdl_map_list))
                clusterid = submit_result.cluster()
                first_proc = submit_result.first_proc()
                num_proc = submit_result.num_procs()
                batchIDs_list.extend([f"{clusterid}.{procid}" for procid in range(first_proc, first_proc + num_proc)])
        except RuntimeError as e:
            errStr = f"submission failed; {e.__class__.__name__}: {e}"
    # hand results back to the parent process
    mp_queue.put((batchIDs_list, errStr))
0199
0200
0201
0202
0203
0204
0205
0206
class CondorQCacheFifo(SpecialFIFOBase, metaclass=SingletonWithID):
    """
    FIFO used as a shared cache of condor_q results, one singleton per target
    submission host. A reserved object id (global_lock_id) is used as a
    cooperative global lock slot.
    """

    # reserved fifo object id acting as the global lock slot
    global_lock_id = -1

    def __init__(self, target, *args, **kwargs):
        # derive the fifo title from the first dotted component of the target
        # host, with dashes mapped to underscores
        name_suffix = target.split(".")[0]
        name_suffix = re.sub("-", "_", name_suffix)
        self.titleName = f"CondorQCache_{name_suffix}"
        SpecialFIFOBase.__init__(self)

    def lock(self, score=None):
        """
        Try to acquire the global lock slot; return the lock key on success,
        None if the slot is already taken.
        """
        # random hex token identifying this lock holder
        lock_key = format(int(random.random() * 2**32), "x")
        if score is None:
            score = time.time()
        # putbyid only succeeds when the slot is free — TODO confirm against SpecialFIFOBase
        retVal = self.putbyid(self.global_lock_id, lock_key, score)
        if retVal:
            return lock_key
        return None

    def unlock(self, key=None, force=False):
        """
        Release the global lock slot. Succeeds when no live lock exists, when
        *force* is set, or when *key* matches the stored lock key.
        """
        peeked_tuple = self.peekbyid(id=self.global_lock_id)
        if peeked_tuple.score is None or peeked_tuple.item is None:
            # no live lock object; treat as already unlocked
            return True
        elif force or self.decode(peeked_tuple.item) == key:
            self.delete([self.global_lock_id])
            return True
        else:
            # held by another owner
            return False
0234
0235
0236
class CondorClient(object):
    """
    Base class of clients communicating with one HTCondor schedd.

    Parses the submission host string ("schedd,pool"), holds the
    collector/schedd session objects, and provides session renewal with a
    retry decorator for methods hitting the schedd.
    """

    @classmethod
    def renew_session_and_retry(cls, func):
        """
        Decorator: on RuntimeError from the wrapped method, renew the condor
        session and retry the call once.

        Fix: the inner wrapper now carries functools.wraps(func) so decorated
        methods keep their __name__ (this module compares method __name__
        elsewhere, e.g. against self.schedd.history.__name__).
        """
        # whether to retry the call after a successful session renewal
        to_retry = True

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # Make logger
            tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorClient.renew_session_if_error")
            func_name = func.__name__
            try:
                self.schedd
            except AttributeError as e:
                # schedd was never initialized (first contact failed); try to create it.
                # Non-blocking acquire: only one thread renews at a time.
                if self.lock.acquire(False):
                    is_renewed = self.renew_session()
                    self.lock.release()
                    if not is_renewed:
                        errStr = f"failed to communicate with {self.submissionHost}"
                        tmpLog.error(errStr)
                        tmpLog.debug(f"got RuntimeError: {e}")
                        raise Exception(errStr)
            try:
                ret = func(self, *args, **kwargs)
            except RuntimeError as e:
                tmpLog.debug(f"got RuntimeError: {e}")
                if self.lock.acquire(False):
                    is_renewed = self.renew_session()
                    self.lock.release()
                    if is_renewed:
                        if to_retry:
                            tmpLog.debug(f"condor session renewed. Retrying {func_name}")
                            ret = func(self, *args, **kwargs)
                        else:
                            tmpLog.debug("condor session renewed")
                            raise
                    else:
                        tmpLog.error("failed to renew condor session")
                        raise
                else:
                    # another thread is already renewing; propagate the error
                    tmpLog.debug("another thread is renewing condor session; skipped...")
                    raise
            tmpLog.debug("done")
            return ret

        return wrapper

    def __init__(self, submissionHost, *args, **kwargs):
        """
        :param submissionHost: "schedd,pool" string, or "LOCAL"/"None" for the
                               local schedd
        """
        self.submissionHost = submissionHost
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorClient.__init__")
        # Initialize
        tmpLog.debug("Initializing client")
        self.lock = threading.Lock()
        self.condor_api_type = CONDOR_API_TYPE
        self.condor_schedd = None
        self.condor_pool = None
        # Parse schedd and pool names; "None" components mean "use default"
        if self.submissionHost in ("LOCAL", "None"):
            tmpLog.debug(f"submissionHost is {self.submissionHost}, treated as local schedd. Skipped")
        else:
            try:
                self.condor_schedd, self.condor_pool = self.submissionHost.split(",")[0:2]
                if self.condor_schedd in ["None"]:
                    self.condor_schedd = None
                if self.condor_pool in ["None"]:
                    self.condor_pool = None
            except ValueError:
                tmpLog.error(f"Invalid submissionHost: {self.submissionHost} . Skipped")
        # Use Python API
        if self.condor_api_type == "python":
            try:
                if CONDOR_API_VERSION == 1:
                    # security manager only exists in the v1 bindings
                    self.secman = htcondor.SecMan()
                self.renew_session(init=True)
            except Exception as e:
                tmpLog.error(f"Error when using htcondor Python API. Exception {e.__class__.__name__}: {e}")
                raise
        tmpLog.debug("Initialized client")

    @synchronize
    def renew_session(self, retry=3, init=False):
        """
        (Re)create the collector/schedd session objects, retrying up to
        *retry* times. Returns True on success, False when all tries failed.
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorClient.renew_session")
        # Clear security sessions on a real renewal (not on first init)
        if not init:
            tmpLog.info("Renew condor session")
            if CONDOR_API_VERSION == 1:
                self.secman.invalidateAllSessions()
        # Recreate collector and schedd object
        i_try = 1
        while i_try <= retry:
            try:
                tmpLog.info(f"Try {i_try}")
                if self.condor_pool:
                    self.collector = htcondor.Collector(self.condor_pool)
                else:
                    self.collector = htcondor.Collector()
                if self.condor_schedd:
                    self.scheddAd = self.collector.locate(htcondor.DaemonTypes.Schedd, self.condor_schedd)
                else:
                    self.scheddAd = self.collector.locate(htcondor.DaemonTypes.Schedd)
                self.schedd = htcondor.Schedd(self.scheddAd)
                tmpLog.info("Success")
                break
            except Exception as e:
                tmpLog.warning(f"Recreate condor collector and schedd failed: {e}")
                if i_try < retry:
                    tmpLog.warning("Failed. Retry...")
                else:
                    tmpLog.warning(f"Retry {i_try} times. Still failed. Skipped")
                    return False
                i_try += 1
                if CONDOR_API_VERSION == 1:
                    self.secman.invalidateAllSessions()
                time.sleep(3)
        # Sleep to give the new session time to settle
        time.sleep(3)
        return True
0359
0360
0361
class CondorJobQuery(CondorClient, metaclass=SingletonWithID):
    """
    Client to query condor jobs (condor_q / condor_history), one singleton per
    submission host. Optionally serves results from a shared fifo cache.
    """

    # class lock serializing instance initialization
    classLock = threading.Lock()

    # condor commands used by the command-line query fallback
    orig_comStr_list = [
        "condor_q -xml",
        "condor_history -xml",
    ]

    # redundant XML root text emitted between concatenated condor XML documents;
    # split on this to merge them into one parsable document
    badtext = """
</classads>

<?xml version="1.0"?>
<!DOCTYPE classads SYSTEM "classads.dtd">
<classads>
"""

    def __init__(self, cacheEnable=False, cacheRefreshInterval=None, useCondorHistory=True, useCondorHistoryMaxAge=7200, *args, **kwargs):
        """
        :param cacheEnable: enable the fifo-backed condor_q cache
        :param cacheRefreshInterval: seconds a cached result stays valid
        :param useCondorHistory: also query condor_history for finished jobs
        :param useCondorHistoryMaxAge: only check history for workers whose
               submit/start time is within this many seconds
        """
        self.submissionHost = str(kwargs.get("id"))
        # Make logger
        tmpLog = core_utils.make_logger(
            baseLogger, f"submissionHost={self.submissionHost} thrid={get_ident()} oid={id(self)}", method_name="CondorJobQuery.__init__"
        )
        # Initialize; lock so concurrent singleton construction stays consistent
        with self.classLock:
            tmpLog.debug("Start")
            CondorClient.__init__(self, self.submissionHost, *args, **kwargs)
            # For condor_q cache: (cached object, last update timestamp)
            self.cacheEnable = cacheEnable
            if self.cacheEnable:
                self.cache = ([], 0)
                self.cacheRefreshInterval = cacheRefreshInterval
            self.useCondorHistory = useCondorHistory
            self.useCondorHistoryMaxAge = useCondorHistoryMaxAge
            tmpLog.debug("Initialize done")

    def get_all(self, batchIDs_dict=None, allJobs=False, to_update_cache=False):
        """
        Query condor for the jobs in batchIDs_dict (or all own jobs when
        allJobs is True) and return {condor_job_id: job_ads_dict}.
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.get_all")
        # Get all jobs
        tmpLog.debug("Start")
        job_ads_all_dict = {}
        if self.condor_api_type == "python":
            try:
                job_ads_all_dict = self.query_with_python(batchIDs_dict, allJobs, to_update_cache)
            except Exception as e:
                tb_str = traceback.format_exc()
                tmpLog.error(f"Exception {e.__class__.__name__}: {e} ; {tb_str}")
                raise
        else:
            # command-line fallback
            job_ads_all_dict = self.query_with_command(batchIDs_dict)
        return job_ads_all_dict

    def query_with_command(self, batchIDs_dict=None):
        """
        Query via condor_q / condor_history shell commands and parse the XML
        output. Returns {condor_job_id: job_ads_dict}; unfound jobs map to {}.
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.query_with_command")
        # Start query
        tmpLog.debug("Start query")
        if batchIDs_dict is None:
            batchIDs_dict = {}
        job_ads_all_dict = {}
        batchIDs_set = set(batchIDs_dict.keys())
        for orig_comStr in self.orig_comStr_list:
            # String of batchIDs still to find
            batchIDs_str = " ".join(list(batchIDs_set))
            # Build command; skip condor_history when everything is already found
            if "condor_q" in orig_comStr or ("condor_history" in orig_comStr and batchIDs_set):
                name_opt = f"-name {self.condor_schedd}" if self.condor_schedd else ""
                pool_opt = f"-pool {self.condor_pool}" if self.condor_pool else ""
                ids = batchIDs_str
                comStr = f"{orig_comStr} {name_opt} {pool_opt} {ids}"
            else:
                # nothing left to query with this command
                continue
            tmpLog.debug(f"check with {comStr}")
            (retCode, stdOut, stdErr) = _runShell(comStr)
            if retCode == 0:
                # Command succeeded; merge concatenated XML documents into one
                job_ads_xml_str = "\n".join(str(stdOut).split(self.badtext))
                if "<c>" in job_ads_xml_str:
                    # Found at least one job
                    # XML parsing
                    xml_root = ET.fromstring(job_ads_xml_str)

                    def _getAttribute_tuple(attribute_xml_element):
                        # classad attribute name
                        _n = str(attribute_xml_element.get("n"))
                        # attribute value: concatenated element text
                        _t = " ".join(attribute_xml_element.itertext())
                        return (_n, _t)

                    # Every batch job element
                    for _c in xml_root.findall("c"):
                        job_ads_dict = dict()
                        # Every attribute
                        attribute_iter = map(_getAttribute_tuple, _c.findall("a"))
                        job_ads_dict.update(attribute_iter)
                        batchid = get_batchid_from_job(job_ads_dict)
                        condor_job_id = f"{self.submissionHost}#{batchid}"
                        job_ads_all_dict[condor_job_id] = job_ads_dict
                        # Remove found jobs from the to-find set
                        if batchid in batchIDs_set:
                            batchIDs_set.discard(batchid)
                else:
                    # Job not found by this command
                    tmpLog.debug(f"job not found with {comStr}")
                    continue
            else:
                # Command failed
                errStr = f'command "{comStr}" failed, retCode={retCode}, error: {stdOut} {stdErr}'
                tmpLog.error(errStr)
        if len(batchIDs_set) > 0:
            # Jobs unfound by both condor_q and condor_history: return empty ads
            # so the caller can mark them as missing workers
            for batchid in batchIDs_set:
                condor_job_id = f"{self.submissionHost}#{batchid}"
                job_ads_all_dict[condor_job_id] = dict()
            tmpLog.info(f"Unfound batch jobs of submissionHost={self.submissionHost}: {' '.join(list(batchIDs_set))}")
        # Return
        return job_ads_all_dict

    @CondorClient.renew_session_and_retry
    def query_with_python(self, batchIDs_dict=None, allJobs=False, to_update_cache=False):
        """
        Query via the htcondor python bindings, optionally through the fifo
        cache and condor_history. Returns {condor_job_id: job_ads_dict};
        unfound jobs map to {}.
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobQuery.query_with_python")
        # Start query
        tmpLog.debug("Start query")
        cache_fifo = None
        job_ads_all_dict = {}
        # make id sets
        if batchIDs_dict is None:
            batchIDs_dict = {}
        batchIDs_set = set(batchIDs_dict.keys())
        clusterids_set = set([get_job_id_tuple_from_batchid(batchid)[0] for batchid in batchIDs_dict])

        # query from cache
        def cache_query(constraint=None, projection=CONDOR_JOB_ADS_LIST, timeout=60):
            # query from condor schedd and update cache in fifo
            def update_cache(lockInterval=90):
                tmpLog.debug("update_cache")
                # acquire lock with score timestamp backdated so the lock
                # auto-expires after lockInterval seconds
                score = time.time() - self.cacheRefreshInterval + lockInterval
                lock_key = cache_fifo.lock(score=score)
                if lock_key is not None:
                    # acquired lock, update cache from schedd
                    tmpLog.debug("got lock, updating cache")
                    jobs_iter_orig = self.schedd.query(constraint=constraint, projection=projection)
                    jobs_iter = []
                    for job in jobs_iter_orig:
                        try:
                            jobs_iter.append(dict(job))
                        except Exception as e:
                            tmpLog.error(f"In updating cache schedd query; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
                    timeNow = time.time()
                    cache_fifo.put(jobs_iter, timeNow)
                    self.cache = (jobs_iter, timeNow)
                    # release lock
                    retVal = cache_fifo.unlock(key=lock_key)
                    if retVal:
                        tmpLog.debug("done update cache and unlock")
                    else:
                        tmpLog.warning("cannot unlock... Maybe something wrong")
                    return jobs_iter
                else:
                    # another thread is already updating
                    tmpLog.debug("cache fifo locked by other thread. Skipped")
                    return None

            # remove invalid or outdated caches from fifo
            def cleanup_cache(timeout=60):
                tmpLog.debug("cleanup_cache")
                id_list = list()
                attempt_timestamp = time.time()
                n_cleanup = 0
                while True:
                    if time.time() > attempt_timestamp + timeout:
                        tmpLog.debug("time is up when cleanup cache. Skipped")
                        break
                    peeked_tuple = cache_fifo.peek(skip_item=True)
                    if peeked_tuple is None:
                        tmpLog.debug("empty cache fifo")
                        break
                    elif peeked_tuple.score is not None and time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
                        # oldest object still within refresh interval
                        tmpLog.debug("nothing expired")
                        break
                    elif peeked_tuple.id is not None:
                        retVal = cache_fifo.delete([peeked_tuple.id])
                        if isinstance(retVal, int):
                            n_cleanup += retVal
                    else:
                        # invalid tuple; bail out rather than loop forever
                        tmpLog.warning("got nothing when cleanup cache, maybe problematic. Skipped")
                        break
                tmpLog.debug(f"cleaned up {n_cleanup} objects in cache fifo")

            # start querying from cache
            jobs_iter = tuple()
            try:
                attempt_timestamp = time.time()
                while True:
                    if time.time() > attempt_timestamp + timeout:
                        # skip cache_query if taking too long
                        tmpLog.debug(f"cache_query got timeout ({timeout} seconds). Skipped ")
                        break
                    # get the latest object in the cache fifo
                    peeked_tuple = cache_fifo.peeklast(skip_item=True)
                    if peeked_tuple is not None and peeked_tuple.score is not None:
                        # got something
                        if peeked_tuple.id == cache_fifo.global_lock_id:
                            if time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
                                # fifo locked by an ongoing update; wait and retry
                                tmpLog.debug("got fifo locked. Wait and retry...")
                                time.sleep(random.uniform(1, 5))
                                continue
                            else:
                                # stale lock; clean up and retry
                                tmpLog.debug("got lock expired. Clean up and retry...")
                                cleanup_cache()
                                continue
                        elif not to_update_cache and time.time() <= peeked_tuple.score + self.cacheRefreshInterval:
                            # valid cache object in fifo
                            _obj, _last_update = self.cache
                            if _last_update >= peeked_tuple.score:
                                # local in-memory cache is at least as new
                                tmpLog.debug("valid local cache")
                                jobs_iter = _obj
                            else:
                                # refresh local cache from the fifo object
                                tmpLog.debug("update local cache from fifo")
                                peeked_tuple_with_item = cache_fifo.peeklast()
                                if (
                                    peeked_tuple_with_item is not None
                                    and peeked_tuple.id != cache_fifo.global_lock_id
                                    and peeked_tuple_with_item.item is not None
                                ):
                                    jobs_iter = cache_fifo.decode(peeked_tuple_with_item.item)
                                    self.cache = (jobs_iter, peeked_tuple_with_item.score)
                                else:
                                    tmpLog.debug("peeked invalid cache fifo object. Wait and retry...")
                                    time.sleep(random.uniform(1, 5))
                                    continue
                        else:
                            # cache expired (or forced update); refresh from schedd
                            tmpLog.debug("update cache in fifo")
                            retVal = update_cache()
                            if retVal is not None:
                                jobs_iter = retVal
                            cleanup_cache()
                        break
                    else:
                        # nothing in fifo yet
                        if cache_fifo.size() == 0:
                            if time.time() > attempt_timestamp + random.uniform(10, 30):
                                # waited long enough; update cache ourselves
                                tmpLog.debug("waited enough, update cache in fifo")
                                retVal = update_cache()
                                if retVal is not None:
                                    jobs_iter = retVal
                                break
                            else:
                                # wait for another thread to fill the fifo
                                time.sleep(2)
                        continue
            except Exception as _e:
                tb_str = traceback.format_exc()
                tmpLog.error(f"Error querying from cache fifo; {_e} ; {tb_str}")
            return jobs_iter

        # query method options, in order of preference: cache, schedd queue, history
        query_method_list = [self.schedd.query]
        if self.cacheEnable:
            cache_fifo = CondorQCacheFifo(target=self.submissionHost, id=f"{self.submissionHost},{get_ident()}")
            query_method_list.insert(0, cache_query)
        if self.useCondorHistory:
            query_method_list.append(self.schedd.history)
        # Go through query methods
        for query_method in query_method_list:
            # only check history for recent workers, to limit the expensive history query
            if query_method.__name__ == self.schedd.history.__name__:
                now_time = core_utils.naive_utcnow()
                update_time_threshold = now_time - datetime.timedelta(seconds=self.useCondorHistoryMaxAge)
                clusterids_set = set()
                for batchid, workspec in batchIDs_dict.items():
                    if batchid in batchIDs_set and (
                        (workspec.submitTime and workspec.submitTime >= update_time_threshold)
                        or (workspec.startTime and workspec.startTime >= update_time_threshold)
                    ):
                        clusterid = get_job_id_tuple_from_batchid(batchid)[0]
                        clusterids_set.add(clusterid)
                # nothing recent left to look up in history
                if not clusterids_set:
                    break
            # build constraint: all own jobs, or only the wanted clusters
            clusterids_str = ",".join(list(clusterids_set))
            if query_method is cache_query or allJobs:
                constraint = f'harvesterID =?= "{harvesterID}"'
            else:
                constraint = f"member(ClusterID, {{{clusterids_str}}})"
            if allJobs:
                tmpLog.debug(f"Query method: {query_method.__name__} ; allJobs")
            else:
                tmpLog.debug(f'Query method: {query_method.__name__} ; clusterids: "{clusterids_str}"')
            # query and collect job ads
            jobs_iter = query_method(constraint=constraint, projection=CONDOR_JOB_ADS_LIST)
            for job in jobs_iter:
                try:
                    job_ads_dict = dict(job)
                except Exception as e:
                    tmpLog.error(f"In doing schedd query or history; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
                batchid = get_batchid_from_job(job_ads_dict)
                condor_job_id = f"{self.submissionHost}#{batchid}"
                job_ads_all_dict[condor_job_id] = job_ads_dict
                # Remove found jobs from the to-find set
                if not allJobs:
                    batchIDs_set.discard(batchid)
            # stop once everything is found (or when querying all jobs)
            if len(batchIDs_set) == 0 or allJobs:
                break

        # Jobs still unfound: return empty ads so callers can flag missing workers
        if not allJobs and len(batchIDs_set) > 0:
            # Job unfound
            for batchid in batchIDs_set:
                condor_job_id = f"{self.submissionHost}#{batchid}"
                job_ads_all_dict[condor_job_id] = dict()
            tmpLog.info(f"Unfound batch jobs of submissionHost={self.submissionHost}: {' '.join(list(batchIDs_set))}")
        # Return
        return job_ads_all_dict
0686
0687
0688
class CondorJobSubmit(CondorClient, metaclass=SingletonWithID):
    """
    Client to submit condor jobs, one singleton per submission host.
    """

    # class lock
    classLock = threading.Lock()

    def __init__(self, *args, **kwargs):
        """
        :param id: submission host string, passed via kwargs by SingletonWithID
        """
        self.submissionHost = str(kwargs.get("id"))
        # Make logger
        tmpLog = core_utils.make_logger(
            baseLogger, f"submissionHost={self.submissionHost} thrid={get_ident()} oid={id(self)}", method_name="CondorJobSubmit.__init__"
        )
        # Initialize
        tmpLog.debug("Start")
        self.lock = threading.Lock()
        CondorClient.__init__(self, self.submissionHost, *args, **kwargs)
        tmpLog.debug("Initialize done")

    def submit(self, jdl_list, use_spool=False):
        """
        Submit the JDLs in jdl_list; returns (batchIDs_list, errStr).
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit")
        tmpLog.debug("Start")
        if self.condor_api_type == "python":
            try:
                # NOTE(review): python-API submission (submit_with_python*) is
                # deliberately not called here; both branches currently use the
                # command-line path — confirm before re-enabling the python path
                retVal = self.submit_with_command(jdl_list, use_spool)
            except Exception as e:
                tmpLog.error(f"Exception {e.__class__.__name__}: {e}")
                raise
        else:
            retVal = self.submit_with_command(jdl_list, use_spool)
        return retVal

    def submit_with_command(self, jdl_list, use_spool=False, tmp_str="", keep_temp_sdf=False):
        """
        Submit via the condor_submit command with a temporary SDF file holding
        all JDLs as one cluster. Returns (batchIDs_list, errStr).

        :param keep_temp_sdf: keep the temporary SDF file for debugging
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit_with_command")
        # Initialize
        errStr = ""
        batchIDs_list = []
        # write all JDLs into one temporary submit description file
        tmpFile = tempfile.NamedTemporaryFile(mode="w", delete=(not keep_temp_sdf), suffix=f"_{tmp_str}_cluster_submit.sdf")
        sdf_file = tmpFile.name
        tmpFile.write("\n\n".join(jdl_list))
        tmpFile.flush()
        # condor remote options
        name_opt = f"-name {self.condor_schedd}" if self.condor_schedd else ""
        pool_opt = f"-pool {self.condor_pool}" if self.condor_pool else ""
        spool_opt = "-remote -spool" if use_spool and self.condor_schedd else ""
        # command; -single-cluster makes all jobs share one ClusterId
        comStr = f"condor_submit -single-cluster {spool_opt} {name_opt} {pool_opt} {sdf_file}"
        # submit
        tmpLog.debug(f"submit with command: {comStr}")
        try:
            p = subprocess.Popen(comStr.split(), shell=False, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # check return code
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
        except Exception as e:
            stdOut = ""
            stdErr = core_utils.dump_error_message(tmpLog, no_message=True)
            retCode = 1
            errStr = f"{e.__class__.__name__}: {e}"
        finally:
            # closing deletes the temp file unless keep_temp_sdf
            tmpFile.close()
        tmpLog.debug(f"retCode={retCode}")
        if retCode == 0:
            # extract ClusterId and number of jobs from the condor_submit output
            # Fix: regex literal made raw; the previous non-raw string contained
            # invalid escape sequences (\d, \.) that warn on modern Python
            job_id_match = None
            for tmp_line_str in stdOut.split("\n"):
                job_id_match = re.search(r"^(\d+) job[(]s[)] submitted to cluster (\d+)\.$", tmp_line_str)
                if job_id_match:
                    break
            if job_id_match is not None:
                n_jobs = int(job_id_match.group(1))
                clusterid = job_id_match.group(2)
                batchIDs_list = [f"{clusterid}.{procid}" for procid in range(n_jobs)]
                tmpLog.debug(f"submitted {n_jobs} jobs: {' '.join(batchIDs_list)}")
            else:
                errStr = f"no job submitted: {errStr}"
                tmpLog.error(errStr)
        else:
            errStr = f"{stdErr} ; {errStr}"
            tmpLog.error(f"submission failed: {errStr}")
        # Return
        return (batchIDs_list, errStr)

    @CondorClient.renew_session_and_retry
    def submit_with_python(self, jdl_list, use_spool=False):
        """
        Submit via the htcondor python bindings in one transaction.
        Returns (batchIDs_list, errStr).
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit_with_python")
        # Start
        tmpLog.debug("Start")
        # Initialize
        errStr = ""
        batchIDs_list = []
        # parse one itemdata dict per JDL
        jdl_map_list = [dict(htcondor.Submit(jdl).items()) for jdl in jdl_list]
        # Go
        submit_obj = htcondor.Submit()
        try:
            # NOTE(review): Schedd.transaction() belongs to the v1 bindings;
            # confirm availability when htcondor2 is in use
            with self.schedd.transaction() as txn:
                # one job (count=1) per itemdata entry
                submit_result = submit_obj.queue_with_itemdata(txn, 1, iter(jdl_map_list))
                clusterid = submit_result.cluster()
                first_proc = submit_result.first_proc()
                num_proc = submit_result.num_procs()
                batchIDs_list.extend([f"{clusterid}.{procid}" for procid in range(first_proc, first_proc + num_proc)])
        except RuntimeError as e:
            errStr = f"{e.__class__.__name__}: {e}"
            tmpLog.error(f"submission failed: {errStr}")
            raise
        if batchIDs_list:
            n_jobs = len(batchIDs_list)
            tmpLog.debug(f"submitted {n_jobs} jobs: {' '.join(batchIDs_list)}")
        elif not errStr:
            tmpLog.error("submitted nothing")
        tmpLog.debug("Done")
        # Return
        return (batchIDs_list, errStr)

    def submit_with_python_process(self, jdl_list, use_spool=False):
        """
        Submit via the python bindings inside a separate short-lived process
        (isolates the schedd session). Returns (batchIDs_list, errStr).
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobSubmit.submit_with_python_process")
        # Start
        tmpLog.debug("Start")
        # parse one itemdata dict per JDL
        jdl_map_list = [dict(htcondor.Submit(jdl).items()) for jdl in jdl_list]
        # Go with child process; results come back through the queue
        mp_queue = multiprocessing.Queue()
        mp_process = multiprocessing.Process(target=condor_submit_process, args=(mp_queue, self.submissionHost, jdl_map_list, tmpLog))
        mp_process.daemon = True
        mp_process.start()
        (batchIDs_list, errStr) = mp_queue.get()
        mp_queue.close()
        mp_process.terminate()
        mp_process.join()
        if batchIDs_list:
            n_jobs = len(batchIDs_list)
            tmpLog.debug(f"submitted {n_jobs} jobs: {' '.join(batchIDs_list)}")
        elif not errStr:
            tmpLog.error("submitted nothing")
        tmpLog.debug("Done")
        # Return
        return (batchIDs_list, errStr)
0837
0838
0839
class CondorJobManage(CondorClient, metaclass=SingletonWithID):
    """
    Client to manage (remove) condor jobs, one singleton per submission host.
    """

    # class lock serializing removal operations
    classLock = threading.Lock()

    def __init__(self, *args, **kwargs):
        """
        :param id: submission host string, passed via kwargs by SingletonWithID
        """
        self.submissionHost = str(kwargs.get("id"))
        # Make logger
        tmpLog = core_utils.make_logger(
            baseLogger, f"submissionHost={self.submissionHost} thrid={get_ident()} oid={id(self)}", method_name="CondorJobManage.__init__"
        )
        # Initialize
        tmpLog.debug("Start")
        self.lock = threading.Lock()
        CondorClient.__init__(self, self.submissionHost, *args, **kwargs)
        tmpLog.debug("Initialize done")

    def remove(self, batchIDs_list=None):
        """
        Remove the given batch jobs; returns {condor_job_id: (bool, msg)}.

        Fix: default argument changed from the mutable [] to None (same
        behavior; None is treated as an empty list).
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobManage.remove")
        tmpLog.debug("Start")
        batchIDs_list = [] if batchIDs_list is None else batchIDs_list
        if self.condor_api_type == "python":
            try:
                retVal = self.remove_with_python(batchIDs_list)
            except Exception as e:
                tmpLog.error(f"Exception {e.__class__.__name__}: {e}")
                raise
        else:
            retVal = self.remove_with_command(batchIDs_list)
        return retVal

    def remove_with_command(self, batchIDs_list=None):
        """
        Remove jobs via the condor_rm command. Not implemented.

        Fix: default argument changed from the mutable [] to None.
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobManage.remove_with_command")
        # command-line removal was never implemented (original body was commented out)
        raise NotImplementedError

    @CondorClient.renew_session_and_retry
    def remove_with_python(self, batchIDs_list=None):
        """
        Remove jobs via the htcondor python bindings.

        Returns {condor_job_id: (ok, msg)} where ok is False for jobs still
        unterminated in the condor queue after the removal attempt.

        Fix: default argument changed from the mutable [] to None; unused
        local removed.
        """
        # Make logger
        tmpLog = core_utils.make_logger(baseLogger, f"submissionHost={self.submissionHost}", method_name="CondorJobManage.remove_with_python")
        # Start
        tmpLog.debug("Start")
        batchIDs_list = [] if batchIDs_list is None else batchIDs_list
        # Acquire class lock
        with self.classLock:
            tmpLog.debug("Got class lock")
            retMap = {}
            # Remove
            n_jobs = len(batchIDs_list)
            act_ret = self.schedd.act(htcondor.JobAction.Remove, batchIDs_list)
            # Check if all jobs are removed (already done / not found / success all count as clear)
            is_all_clear = n_jobs == act_ret["TotalAlreadyDone"] + act_ret["TotalNotFound"] + act_ret["TotalSuccess"]
            if act_ret and is_all_clear:
                tmpLog.debug(f"removed {n_jobs} jobs: {','.join(batchIDs_list)}")
                for batchid in batchIDs_list:
                    condor_job_id = f"{self.submissionHost}#{batchid}"
                    retMap[condor_job_id] = (True, "")
            else:
                tmpLog.error(f"job removal failed; batchIDs_list={batchIDs_list}, got: {act_ret}")
                # Query the queue for jobs still unterminated (JobStatus not 3=removed, 4=completed)
                clusterids_set = set([get_job_id_tuple_from_batchid(batchid)[0] for batchid in batchIDs_list])
                clusterids_str = ",".join(list(clusterids_set))
                constraint = f"member(ClusterID, {{{clusterids_str}}}) && JobStatus =!= 3 && JobStatus =!= 4"
                jobs_iter = self.schedd.query(constraint=constraint, projection=CONDOR_JOB_ADS_LIST)
                all_batchid_map = {}
                ok_batchid_list = []
                ng_batchid_list = []
                for job in jobs_iter:
                    job_ads_dict = dict(job)
                    batchid = get_batchid_from_job(job_ads_dict)
                    all_batchid_map[batchid] = job_ads_dict
                # jobs still in the queue failed to be removed
                for batchid in batchIDs_list:
                    condor_job_id = f"{self.submissionHost}#{batchid}"
                    if batchid in all_batchid_map:
                        ng_batchid_list.append(batchid)
                        retMap[condor_job_id] = (False, f"batchID={batchid} still unterminated in condor queue")
                    else:
                        ok_batchid_list.append(batchid)
                        retMap[condor_job_id] = (True, "")
                tmpLog.debug(
                    "removed {0} jobs: {1} ; failed to remove {2} jobs: {3}".format(
                        len(ok_batchid_list), ",".join(ok_batchid_list), len(ng_batchid_list), ",".join(ng_batchid_list)
                    )
                )
        tmpLog.debug("Done")
        # Return
        return retMap
0976
0977
0978