File indexing completed on 2026-04-10 08:39:02
0001 """
0002 dispatch jobs
0003
0004 """
0005
0006 import datetime
0007 import json
0008 import os
0009 import re
0010 import socket
0011 import sys
0012 import threading
0013 import time
0014 import traceback
0015 from threading import Lock
0016
0017 from pandacommon.pandalogger.LogWrapper import LogWrapper
0018 from pandacommon.pandalogger.PandaLogger import PandaLogger
0019 from pandacommon.pandautils.PandaUtils import naive_utcnow
0020
0021 from pandaserver.brokerage.SiteMapper import SiteMapper
0022 from pandaserver.config import panda_config
0023 from pandaserver.dataservice.adder_gen import AdderGen
0024 from pandaserver.jobdispatcher import Protocol
0025 from pandaserver.proxycache import panda_proxy_cache, token_cache
0026 from pandaserver.srvcore import CoreUtils
0027
0028
0029 _logger = PandaLogger().getLogger("JobDispatcher")
0030 _pilotReqLogger = PandaLogger().getLogger("PilotRequests")
0031
0032
0033
class _TimedMethod:
    """
    Wrapper that runs a task-buffer method in a worker thread and captures its
    return value. ``result`` stays ``Protocol.TimeOutToken`` until the wrapped
    call has completed, so callers can distinguish "timed out / not finished"
    from a real return value.
    """

    def __init__(self, method, timeout):
        # callable to execute (typically a taskBuffer method)
        self.method = method
        # intended timeout in seconds
        # NOTE(review): currently unused — run() joins without a timeout, so the
        # call blocks until completion; confirm whether this is intentional
        self.timeout = timeout
        # sentinel meaning "not finished / timed out"
        self.result = Protocol.TimeOutToken

    def __call__(self, *var):
        # execute the wrapped method and keep its return value
        self.result = self.method(*var)

    def run(self, *var):
        # run in a worker thread and wait for it to finish
        thr = threading.Thread(target=self, args=var)
        thr.start()
        thr.join()
0049
0050
0051
0052 class JobDispatcher:
0053
    def __init__(self):
        """Set up caches, lock and proxy/token interfaces; DB binding happens in init()."""
        # task buffer (DB interface); bound later via init()
        self.taskBuffer = None
        # timestamp of the last update
        self.lastUpdated = naive_utcnow()
        # refresh interval for cached information
        self.timeInterval = datetime.timedelta(seconds=180)
        # cached special dispatch parameters (created lazily in init())
        self.specialDispatchParams = None
        # cached site mapper (created lazily in init())
        self.siteMapperCache = None
        # guards the lazy initialization in init()
        self.lock = Lock()
        # MyProxy interface for user proxies
        self.proxy_cacher = panda_proxy_cache.MyProxyInterface()
        # cache for OAuth access tokens
        self.token_cacher = token_cache.TokenCache()
        # token cache configuration; falls back to an empty dict when the
        # config file is missing or unreadable
        try:
            with open(panda_config.token_cache_config) as f:
                self.token_cache_config = json.load(f)
        except Exception:
            self.token_cache_config = {}
0077
0078
0079 def init(self, taskBuffer):
0080
0081 self.lock.acquire()
0082
0083 if self.taskBuffer is None:
0084 self.taskBuffer = taskBuffer
0085
0086 if self.specialDispatchParams is None:
0087 self.specialDispatchParams = CoreUtils.CachedObject("dispatcher_params", 60 * 10, self.get_special_dispatch_params, _logger)
0088
0089 if self.siteMapperCache is None:
0090 self.siteMapperCache = CoreUtils.CachedObject("site_mapper", 60 * 10, self.getSiteMapper, _logger)
0091
0092 self.lock.release()
0093
0094
0095 def get_special_dispatch_params(self):
0096 """
0097 Wrapper function around taskBuffer.get_special_dispatch_params to convert list to set since task buffer cannot return set
0098 """
0099 param = self.taskBuffer.get_special_dispatch_params()
0100 for client_name in param["tokenKeys"]:
0101 param["tokenKeys"][client_name]["fullList"] = set(param["tokenKeys"][client_name]["fullList"])
0102 return True, param
0103
0104
    def set_user_proxy(self, response, distinguished_name=None, role=None, tokenized=False) -> tuple[bool, str]:
        """
        Set user proxy to the response

        :param response: response object
        :param distinguished_name: the distinguished name of the user
        :param role: the role of the user
        :param tokenized: whether the response should contain a token instead of a proxy

        :return: a tuple containing a boolean indicating success and a message
        """
        try:
            # fall back to the job's production user when no DN was given
            if distinguished_name is None:
                distinguished_name = response.data["prodUserID"]

            # normalize the DN (drop trailing digits etc.)
            distinguished_name = CoreUtils.get_bare_dn(distinguished_name, keep_digits=False)
            if not tokenized:
                # X.509 proxy from the MyProxy cache
                output = self.proxy_cacher.retrieve(distinguished_name, role=role)
            else:
                # OAuth access token from the token cache
                output = self.token_cacher.get_access_token(distinguished_name)

            # nothing cached for this user
            if output is None:
                tmp_msg = f"""{"token" if tokenized else "proxy"} not found for {distinguished_name}"""
                response.appendNode("errorDialog", tmp_msg)
                return False, tmp_msg

            # NOTE: the node is called "userProxy" even when it carries a token
            response.appendNode("userProxy", output)
            return True, ""
        except Exception as e:
            tmp_msg = f"""{"token" if tokenized else "proxy"} retrieval failed with {str(e)}"""
            response.appendNode("errorDialog", tmp_msg)
            return False, tmp_msg
0139
0140
    def getJob(
        self,
        siteName,
        prodSourceLabel,
        mem,
        diskSpace,
        node,
        timeout,
        computingElement,
        prodUserID,
        getProxyKey,
        realDN,
        taskID,
        nJobs,
        acceptJson,
        background,
        resourceType,
        harvester_id,
        worker_id,
        schedulerID,
        jobType,
        via_topic,
        remaining_time,
        tmpLog,
    ):
        """
        Pull up to nJobs jobs for a pilot at the given site.

        Queries the task buffer under a timeout wrapper, builds one protocol
        response per job (adding nSent, the proxy key and user/pilot secrets
        where applicable) and returns the encoded response.

        :param siteName: name of the site
        :param prodSourceLabel: job type label; switched to "test" for sites in test status
        :param nJobs: requested number of jobs; defaults to 1 on non-numeric input
        :param acceptJson: whether the pilot accepts a JSON response
        :param tmpLog: log wrapper provided by the caller
        :return: encoded response
        """
        t_getJob_start = time.time()
        jobs = []
        # requested number of jobs; default to 1 on bad input
        try:
            tmpNumJobs = int(nJobs)
        except Exception:
            tmpNumJobs = None
        if tmpNumJobs is None:
            tmpNumJobs = 1

        # refresh the cached site mapper and read site properties
        self.siteMapperCache.update()
        is_gu = self.siteMapperCache.cachedObj.getSite(siteName).is_grandly_unified()
        in_test = self.siteMapperCache.cachedObj.getSite(siteName).status == "test"

        # sites in test status only receive test jobs
        if in_test and prodSourceLabel in ["user", "managed", "unified"]:
            new_label = "test"
            tmpLog.debug(f"prodSourceLabel changed {prodSourceLabel} -> {new_label}")
            prodSourceLabel = new_label

        # query the task buffer under a timeout wrapper
        tmpWrapper = _TimedMethod(self.taskBuffer.getJobs, timeout)
        tmpWrapper.run(
            tmpNumJobs,
            siteName,
            prodSourceLabel,
            mem,
            diskSpace,
            node,
            timeout,
            computingElement,
            prodUserID,
            taskID,
            background,
            resourceType,
            harvester_id,
            worker_id,
            schedulerID,
            jobType,
            is_gu,
            via_topic,
            remaining_time,
        )

        if isinstance(tmpWrapper.result, list):
            jobs = jobs + tmpWrapper.result

        # the result list carries trailing bookkeeping entries:
        # [job, ..., nSent, proxyKey, secrets_map] — peel them off the tail
        secrets_map = {}
        if len(jobs) > 0:
            secrets_map = jobs.pop()
            proxyKey = jobs[-1]
            nSent = jobs[-2]
            jobs = jobs[:-2]
        if len(jobs) != 0:
            # success: build one response per job
            responseList = []

            for tmpJob in jobs:
                try:
                    response = Protocol.Response(Protocol.SC_Success)
                    response.appendJob(tmpJob, self.siteMapperCache)
                except Exception as e:
                    tmpMsg = f"failed to get jobs with {str(e)}"
                    tmpLog.error(tmpMsg + "\n" + traceback.format_exc())
                    raise

                # number of jobs sent in this batch
                response.appendNode("nSent", nSent)
                # proxy key only for callers that requested and may receive it
                if getProxyKey:
                    response.setProxyKey(proxyKey)
                # per-user secrets, when the job uses them
                if tmpJob.use_secrets() and tmpJob.prodUserName in secrets_map and secrets_map[tmpJob.prodUserName]:
                    response.appendNode("secrets", secrets_map[tmpJob.prodUserName])
                # pilot-wide secrets
                if panda_config.pilot_secrets in secrets_map and secrets_map[panda_config.pilot_secrets]:
                    response.appendNode("pilotSecrets", secrets_map[panda_config.pilot_secrets])

                responseList.append(response.data)

            # bulk request: wrap the per-job responses into a single "jobs" node
            if nJobs is not None:
                try:
                    response = Protocol.Response(Protocol.SC_Success)
                    if not acceptJson:
                        response.appendNode("jobs", json.dumps(responseList))
                    else:
                        response.appendNode("jobs", responseList)
                except Exception as e:
                    tmpMsg = f"failed to make response with {str(e)}"
                    tmpLog.error(tmpMsg + "\n" + traceback.format_exc())
                    raise

        else:
            if tmpWrapper.result == Protocol.TimeOutToken:
                # DB timeout
                if acceptJson:
                    response = Protocol.Response(Protocol.SC_TimeOut, "database timeout")
                else:
                    response = Protocol.Response(Protocol.SC_TimeOut)
            else:
                # no jobs available
                if acceptJson:
                    response = Protocol.Response(Protocol.SC_NoJobs, "no jobs in PanDA")
                else:
                    response = Protocol.Response(Protocol.SC_NoJobs)
            _pilotReqLogger.info(f"method=noJob,site={siteName},node={node},type={prodSourceLabel}")

        tmpLog.debug(f"{siteName} {node} ret -> {response.encode(acceptJson)}")

        t_getJob_end = time.time()
        t_getJob_spent = t_getJob_end - t_getJob_start
        tmpLog.info(f"siteName={siteName} took timing={t_getJob_spent}s in_test={in_test}")
        return response.encode(acceptJson)
0277
0278
    def updateJob(
        self,
        jobID,
        jobStatus,
        timeout,
        xml,
        siteName,
        param,
        metadata,
        pilotLog,
        attemptNr=None,
        stdout="",
        acceptJson=False,
    ):
        """
        Update a job from a pilot heartbeat or final state report.

        Stores the pilot log / metadata / stdout when provided, maps final
        pilot states to "holding" and updates the job status in the task
        buffer. For final states the file report is dumped so the Adder can
        finalize the job asynchronously.

        :param jobID: PandaID of the job
        :param jobStatus: job status reported by the pilot
        :param timeout: DB timeout in seconds (not applied while moving to holding)
        :param xml: file report used for output registration
        :param siteName: site name (not used in this method)
        :param param: dict of job attributes to update
        :param metadata: job metadata string; stored when non-empty
        :param pilotLog: pilot log snippet; stored when non-empty
        :param attemptNr: attempt number reported by the pilot
        :param stdout: stdout snippet; stored when non-empty
        :param acceptJson: whether the caller accepts a JSON response
        :return: encoded response, possibly carrying a "command" for the pilot
        """
        tmp_logger = LogWrapper(_logger, f"updateJob {jobID}")

        # store the pilot log, best effort
        if pilotLog != "":
            tmp_logger.debug("saving pilot log")
            try:
                self.taskBuffer.storePilotLog(int(jobID), pilotLog)
                tmp_logger.debug("saving pilot log DONE")
            except Exception:
                tmp_logger.debug("saving pilot log FAILED")

        # add metadata; when the insert fails, still report success to the
        # pilot and skip the status update
        if metadata != "":
            ret = self.taskBuffer.addMetadata([jobID], [metadata], [jobStatus])
            if len(ret) > 0 and not ret[0]:
                tmp_logger.debug(f"failed to add metadata")
                response = Protocol.Response(Protocol.SC_Success)
                return response.encode(acceptJson)

        # store stdout snippet
        if stdout != "":
            self.taskBuffer.addStdOut(jobID, stdout)

        # map final pilot states to holding; the Adder finalizes them later
        tmpStatus = jobStatus
        updateStateChange = False
        if jobStatus == "failed" or jobStatus == "finished":
            tmpStatus = "holding"
            # register the state change for final states
            updateStateChange = True
            param["jobDispatcherErrorDiag"] = None
        elif jobStatus in ["holding", "transferring"]:
            param["jobDispatcherErrorDiag"] = f"set to {jobStatus} by the pilot at {naive_utcnow().strftime('%Y-%m-%d %H:%M:%S')}"
        # no timeout while moving to holding
        if tmpStatus == "holding":
            tmpWrapper = _TimedMethod(self.taskBuffer.updateJobStatus, None)
        else:
            tmpWrapper = _TimedMethod(self.taskBuffer.updateJobStatus, timeout)
        tmpWrapper.run(jobID, tmpStatus, param, updateStateChange, attemptNr)

        # make the response
        if tmpWrapper.result == Protocol.TimeOutToken:
            # DB timeout
            response = Protocol.Response(Protocol.SC_TimeOut)
        else:
            if tmpWrapper.result:
                # update succeeded
                response = Protocol.Response(Protocol.SC_Success)
                result = tmpWrapper.result
                secrets = None
                # a dict result carries the command plus optional pilot secrets
                if isinstance(result, dict):
                    if "secrets" in result:
                        secrets = result["secrets"]
                    result = result["command"]
                # attach the command for the pilot ("NULL" when none)
                if isinstance(result, str):
                    response.appendNode("command", result)
                    if secrets:
                        response.appendNode("pilotSecrets", secrets)
                else:
                    response.appendNode("command", "NULL")

                # for final states, dump the file report for the Adder
                if result not in ["badattemptnr", "alreadydone"] and (jobStatus == "failed" or jobStatus == "finished"):
                    adder_gen = AdderGen(self.taskBuffer, jobID, jobStatus, attemptNr)
                    adder_gen.dump_file_report(xml, attemptNr)
                    del adder_gen
            else:
                response = Protocol.Response(Protocol.SC_Failed)

        tmp_logger.debug(f"ret -> {response.encode(acceptJson)}")
        return response.encode(acceptJson)
0365
0366
0367 def getStatus(self, strIDs, timeout):
0368
0369 ids = strIDs.split()
0370
0371 tmpWrapper = _TimedMethod(self.taskBuffer.peekJobs, timeout)
0372 tmpWrapper.run(ids, False, True, True, False)
0373
0374 if tmpWrapper.result == Protocol.TimeOutToken:
0375
0376 response = Protocol.Response(Protocol.SC_TimeOut)
0377 else:
0378 if isinstance(tmpWrapper.result, list):
0379
0380 response = Protocol.Response(Protocol.SC_Success)
0381
0382 retStr = ""
0383 attStr = ""
0384 for job in tmpWrapper.result:
0385 if job is None:
0386 retStr += f"notfound+"
0387 attStr += "0+"
0388 else:
0389 retStr += f"{job.jobStatus}+"
0390 attStr += f"{job.attemptNr}+"
0391 response.appendNode("status", retStr[:-1])
0392 response.appendNode("attemptNr", attStr[:-1])
0393 else:
0394
0395 response = Protocol.Response(Protocol.SC_Failed)
0396 _logger.debug(f"getStatus : {strIDs} ret -> {response.encode()}")
0397 return response.encode()
0398
0399
0400 def checkJobStatus(self, pandaIDs, timeout):
0401 tmpWrapper = _TimedMethod(self.taskBuffer.checkJobStatus, timeout)
0402 tmpWrapper.run(pandaIDs)
0403
0404 if tmpWrapper.result == Protocol.TimeOutToken:
0405
0406 response = Protocol.Response(Protocol.SC_TimeOut)
0407 else:
0408 if isinstance(tmpWrapper.result, list):
0409
0410 response = Protocol.Response(Protocol.SC_Success)
0411 response.appendNode("data", tmpWrapper.result)
0412 else:
0413
0414 response = Protocol.Response(Protocol.SC_Failed)
0415 _logger.debug(f"checkJobStatus : {pandaIDs} ret -> {response.encode(True)}")
0416 return response.encode(True)
0417
0418
0419 def getEventRanges(
0420 self,
0421 pandaID,
0422 jobsetID,
0423 jediTaskID,
0424 nRanges,
0425 timeout,
0426 acceptJson,
0427 scattered,
0428 segment_id,
0429 ):
0430 tmpWrapper = _TimedMethod(self.taskBuffer.getEventRanges, timeout)
0431 tmpWrapper.run(pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id)
0432
0433 if tmpWrapper.result == Protocol.TimeOutToken:
0434
0435 response = Protocol.Response(Protocol.SC_TimeOut)
0436 else:
0437 if tmpWrapper.result is not None:
0438
0439 response = Protocol.Response(Protocol.SC_Success)
0440
0441 response.appendNode("eventRanges", tmpWrapper.result)
0442 else:
0443
0444 response = Protocol.Response(Protocol.SC_Failed)
0445 _logger.debug(f"getEventRanges : {pandaID} ret -> {response.encode(acceptJson)}")
0446 return response.encode(acceptJson)
0447
0448
0449 def updateEventRange(
0450 self,
0451 eventRangeID,
0452 eventStatus,
0453 coreCount,
0454 cpuConsumptionTime,
0455 objstoreID,
0456 timeout,
0457 ):
0458 tmpWrapper = _TimedMethod(self.taskBuffer.updateEventRange, timeout)
0459 tmpWrapper.run(eventRangeID, eventStatus, coreCount, cpuConsumptionTime, objstoreID)
0460
0461 _logger.debug(str(tmpWrapper.result))
0462 if tmpWrapper.result == Protocol.TimeOutToken:
0463
0464 response = Protocol.Response(Protocol.SC_TimeOut)
0465 else:
0466 if tmpWrapper.result[0] is True:
0467
0468 response = Protocol.Response(Protocol.SC_Success)
0469 response.appendNode("Command", tmpWrapper.result[1])
0470 else:
0471
0472 response = Protocol.Response(Protocol.SC_Failed)
0473 _logger.debug(f"updateEventRange : {eventRangeID} ret -> {response.encode()}")
0474 return response.encode()
0475
0476
0477 def updateEventRanges(self, eventRanges, timeout, acceptJson, version):
0478 tmpWrapper = _TimedMethod(self.taskBuffer.updateEventRanges, timeout)
0479 tmpWrapper.run(eventRanges, version)
0480
0481 if tmpWrapper.result == Protocol.TimeOutToken:
0482
0483 response = Protocol.Response(Protocol.SC_TimeOut)
0484 else:
0485
0486 response = Protocol.Response(Protocol.SC_Success)
0487
0488 response.appendNode("Returns", tmpWrapper.result[0])
0489 response.appendNode("Command", tmpWrapper.result[1])
0490 _logger.debug(f"updateEventRanges : ret -> {response.encode(acceptJson)}")
0491 return response.encode(acceptJson)
0492
0493
0494 def checkEventsAvailability(self, pandaID, jobsetID, jediTaskID, timeout):
0495 tmpWrapper = _TimedMethod(self.taskBuffer.checkEventsAvailability, timeout)
0496 tmpWrapper.run(pandaID, jobsetID, jediTaskID)
0497
0498 if tmpWrapper.result == Protocol.TimeOutToken:
0499
0500 response = Protocol.Response(Protocol.SC_TimeOut)
0501 else:
0502 if tmpWrapper.result is not None:
0503
0504 response = Protocol.Response(Protocol.SC_Success)
0505
0506 response.appendNode("nEventRanges", tmpWrapper.result)
0507 else:
0508
0509 response = Protocol.Response(Protocol.SC_Failed)
0510 _logger.debug(f"checkEventsAvailability : {pandaID} ret -> {response.encode(True)}")
0511 return response.encode(True)
0512
0513
    def getKeyPair(self, realDN, publicKeyName, privateKeyName, acceptJson):
        """
        Return a named key pair to an authorized caller.

        :param realDN: distinguished name extracted from the client certificate
        :param publicKeyName: file name of the public key
        :param privateKeyName: file name of the private key
        :param acceptJson: whether the caller accepts a JSON response
        :return: encoded response with "publicKey"/"privateKey" nodes on success
        """
        tmpMsg = f"getKeyPair {publicKeyName}/{privateKeyName} json={acceptJson}: "
        if realDN is None:
            # no DN: the request did not come with a usable client certificate
            tmpMsg += "failed since DN cannot be extracted"
            _logger.debug(tmpMsg)
            response = Protocol.Response(Protocol.SC_Perms, "Cannot extract DN from proxy. not HTTPS?")
        else:
            # normalize the DN for the authorization lookup
            compactDN = CoreUtils.clean_user_id(realDN)

            # check against the cached list of users allowed to fetch key pairs
            self.specialDispatchParams.update()
            allowKey = self.specialDispatchParams.get("allowKeyPair", [])
            if compactDN not in allowKey:
                # not authorized
                tmpMsg += f"failed since '{compactDN}' not authorized with 'k' in {panda_config.schemaMETA}.USERS.GRIDPREF"
                _logger.debug(tmpMsg)
                response = Protocol.Response(Protocol.SC_Perms, tmpMsg)
            else:
                # look up the requested keys among those available on this host
                if "keyPair" not in self.specialDispatchParams:
                    keyPair = {}
                else:
                    keyPair = self.specialDispatchParams["keyPair"]
                notFound = False
                if publicKeyName not in keyPair:
                    # public key missing on this server
                    notFound = True
                    tmpMsg += f"failed for '{compactDN}' since {publicKeyName} is missing on {socket.getfqdn()}"
                elif privateKeyName not in keyPair:
                    # private key missing on this server
                    notFound = True
                    tmpMsg += f"failed for '{compactDN}' since {privateKeyName} is missing on {socket.getfqdn()}"
                if notFound:
                    # either key file is unavailable
                    _logger.debug(tmpMsg)
                    response = Protocol.Response(Protocol.SC_MissKey, tmpMsg)
                else:
                    # success: attach both keys
                    response = Protocol.Response(Protocol.SC_Success)
                    response.appendNode("publicKey", keyPair[publicKeyName])
                    response.appendNode("privateKey", keyPair[privateKeyName])
                    tmpMsg += f"sent key-pair to '{compactDN}'"
                    _logger.debug(tmpMsg)

        return response.encode(acceptJson)
0560
0561
    def get_token_key(self, distinguished_name, client_name, accept_json):
        """
        Return the latest token key for a client to an authorized user.

        :param distinguished_name: DN of the requester (None when not over HTTPS)
        :param client_name: client for which the token key is requested
        :param accept_json: whether the caller accepts a JSON response
        :return: encoded response with a "tokenKey" node on success
        """
        tmp_log = LogWrapper(_logger, f"get_token_key client={client_name} PID={os.getpid()}")
        if distinguished_name is None:
            # no DN: the request did not come with a usable client certificate
            tmp_msg = "failed since DN cannot be extracted. non-HTTPS?"
            tmp_log.debug(tmp_msg)
            response = Protocol.Response(Protocol.SC_Perms, tmp_msg)
        else:
            # normalize the DN for the authorization lookup
            compact_name = CoreUtils.clean_user_id(distinguished_name)

            # check against the cached list of users allowed to fetch token keys
            self.specialDispatchParams.update()
            allowed_users = self.specialDispatchParams.get("allowTokenKey", [])
            if compact_name not in allowed_users:
                # not authorized
                tmp_msg = f"denied since '{compact_name}' not authorized with 't' in {panda_config.schemaMETA}.USERS.GRIDPREF"
                tmp_log.debug(tmp_msg)
                response = Protocol.Response(Protocol.SC_Perms, tmp_msg)
            else:
                if client_name not in self.specialDispatchParams["tokenKeys"]:
                    # no key registered for this client
                    tmp_msg = f"token key is missing for '{client_name}'"
                    tmp_log.debug(tmp_msg)
                    response = Protocol.Response(Protocol.SC_MissKey, tmp_msg)
                else:
                    # success: send the latest token key
                    response = Protocol.Response(Protocol.SC_Success)
                    response.appendNode("tokenKey", self.specialDispatchParams["tokenKeys"][client_name]["latest"])
                    tmp_msg = f"sent token key to '{compact_name}'"
                    tmp_log.debug(tmp_msg)

        return response.encode(accept_json)
0595
0596
0597 def getSiteMapper(self):
0598 return True, SiteMapper(self.taskBuffer)
0599
0600 def getCommands(self, harvester_id, n_commands, timeout, accept_json):
0601 """
0602 Get commands for a particular harvester instance
0603 """
0604 tmp_wrapper = _TimedMethod(self.taskBuffer.getCommands, timeout)
0605 tmp_wrapper.run(harvester_id, n_commands)
0606
0607
0608 if tmp_wrapper.result == Protocol.TimeOutToken:
0609
0610 response = Protocol.Response(Protocol.SC_TimeOut)
0611 else:
0612
0613 response = Protocol.Response(Protocol.SC_Success)
0614 response.appendNode("Returns", tmp_wrapper.result[0])
0615 response.appendNode("Commands", tmp_wrapper.result[1])
0616
0617 _logger.debug(f"getCommands : ret -> {response.encode(accept_json)}")
0618 return response.encode(accept_json)
0619
0620 def ackCommands(self, command_ids, timeout, accept_json):
0621 """
0622 Acknowledge the commands from a list of IDs
0623 """
0624 _logger.debug(f"command_ids : {command_ids}")
0625 tmp_wrapper = _TimedMethod(self.taskBuffer.ackCommands, timeout)
0626 tmp_wrapper.run(command_ids)
0627
0628
0629 if tmp_wrapper.result == Protocol.TimeOutToken:
0630
0631 response = Protocol.Response(Protocol.SC_TimeOut)
0632 else:
0633
0634 response = Protocol.Response(Protocol.SC_Success)
0635 response.appendNode("Returns", tmp_wrapper.result)
0636
0637 _logger.debug(f"ackCommands : ret -> {response.encode(accept_json)}")
0638 return response.encode(accept_json)
0639
0640 def getResourceTypes(self, timeout, accept_json):
0641 """
0642 Get resource types (SCORE, MCORE, SCORE_HIMEM, MCORE_HIMEM) and their definitions
0643 """
0644 tmp_wrapper = _TimedMethod(self.taskBuffer.getResourceTypes, timeout)
0645 tmp_wrapper.run()
0646
0647
0648 if tmp_wrapper.result == Protocol.TimeOutToken:
0649
0650 response = Protocol.Response(Protocol.SC_TimeOut)
0651 else:
0652
0653 response = Protocol.Response(Protocol.SC_Success)
0654 response.appendNode("Returns", 0)
0655 response.appendNode("ResourceTypes", tmp_wrapper.result)
0656
0657 _logger.debug(f"getResourceTypes : ret -> {response.encode(accept_json)}")
0658 return response.encode(accept_json)
0659
0660
    def get_proxy(self, real_distinguished_name: str, role: str | None, target_distinguished_name: str | None, tokenized: bool, token_key: str | None) -> dict:
        """
        Get proxy for a user with a role

        :param real_distinguished_name: actual distinguished name of the user
        :param role: role of the user
        :param target_distinguished_name: target distinguished name if the user wants to get proxy for someone else.
        This is one of client_name defined in token_cache_config when getting a token
        :param tokenized: whether the response should contain a token instead of a proxy
        :param token_key: key to get the token from the token cache

        :return: response in dictionary
        """
        # default to fetching one's own proxy/token
        if target_distinguished_name is None:
            target_distinguished_name = real_distinguished_name
        tmp_log = LogWrapper(_logger, f"get_proxy PID={os.getpid()}")
        tmp_msg = f"""start DN="{real_distinguished_name}" role={role} target="{target_distinguished_name}" tokenized={tokenized} token_key={token_key}"""
        tmp_log.debug(tmp_msg)
        if real_distinguished_name is None:
            # no DN: the request did not come with a usable client certificate
            tmp_msg = "failed since DN cannot be extracted"
            tmp_log.debug(tmp_msg)
            response = Protocol.Response(Protocol.SC_Perms, "Cannot extract DN from proxy. not HTTPS?")
        else:
            # normalize the DN for the authorization lookup
            compact_name = CoreUtils.clean_user_id(real_distinguished_name)

            # check against the cached list of users allowed to fetch proxies/tokens
            self.specialDispatchParams.update()
            if "allowProxy" not in self.specialDispatchParams:
                allowed_names = []
            else:
                allowed_names = self.specialDispatchParams["allowProxy"]
            if compact_name not in allowed_names:
                # not authorized
                tmp_msg = f"failed since '{compact_name}' not in the authorized user list who have 'p' in {panda_config.schemaMETA}.USERS.GRIDPREF "
                if not tokenized:
                    tmp_msg += "to get proxy"
                else:
                    tmp_msg += "to get access token"
                tmp_log.debug(tmp_msg)
                response = Protocol.Response(Protocol.SC_Perms, tmp_msg)
            elif (
                # targets configured with use_token_key must present a valid token key
                tokenized
                and target_distinguished_name in self.token_cache_config
                and self.token_cache_config[target_distinguished_name].get("use_token_key") is True
                and (
                    target_distinguished_name not in self.specialDispatchParams["tokenKeys"]
                    or token_key not in self.specialDispatchParams["tokenKeys"][target_distinguished_name]["fullList"]
                )
            ):
                tmp_msg = f"failed since token key is invalid for {target_distinguished_name}"
                tmp_log.debug(tmp_msg)
                response = Protocol.Response(Protocol.SC_Invalid, tmp_msg)
            else:
                # authorized: attach the proxy or token to the response
                response = Protocol.Response(Protocol.SC_Success, "")
                tmp_status, tmp_msg = self.set_user_proxy(response, target_distinguished_name, role, tokenized)
                if not tmp_status:
                    # retrieval failed: downgrade the status code in place
                    tmp_log.debug(tmp_msg)
                    response.appendNode("StatusCode", Protocol.SC_ProxyError)
                else:
                    tmp_msg = "successful sent proxy"
                    tmp_log.debug(tmp_msg)

        return response.encode(True)
0727
0728
0729 def getActiveJobAttributes(self, pandaID, attrs):
0730 return self.taskBuffer.getActiveJobAttributes(pandaID, attrs)
0731
0732
0733 def updateWorkerPilotStatus(self, workerID, harvesterID, status, timeout, accept_json, node_id):
0734 tmp_wrapper = _TimedMethod(self.taskBuffer.updateWorkerPilotStatus, timeout)
0735 tmp_wrapper.run(workerID, harvesterID, status, node_id)
0736
0737
0738 if tmp_wrapper.result == Protocol.TimeOutToken:
0739 response = Protocol.Response(Protocol.SC_TimeOut)
0740 else:
0741 if tmp_wrapper.result:
0742 response = Protocol.Response(Protocol.SC_Success)
0743 else:
0744 response = Protocol.Response(Protocol.SC_Failed)
0745 _logger.debug(f"updateWorkerPilotStatus : {workerID} {harvesterID} {status} ret -> {response.encode(accept_json)}")
0746 return response.encode(accept_json)
0747
0748
0749 def get_max_worker_id(self, harvester_id):
0750 id = self.taskBuffer.get_max_worker_id(harvester_id)
0751 return json.dumps(id)
0752
0753 def get_events_status(self, ids):
0754 ret = self.taskBuffer.get_events_status(ids)
0755 return json.dumps(ret)
0756
0757
0758
# module-level singleton; the class itself is deleted afterwards so every
# consumer must go through this shared instance
jobDispatcher = JobDispatcher()
del JobDispatcher
0761
0762
0763
0764 def _getFQAN(req):
0765 fqans = []
0766 for tmp_key in req.subprocess_env:
0767 tmp_value = req.subprocess_env[tmp_key]
0768
0769
0770 if tmp_key.startswith("GRST_CRED_") and tmp_value.startswith("VOMS"):
0771 fqan = tmp_value.split()[-1]
0772 fqans.append(fqan)
0773
0774
0775 elif tmp_key.startswith("GRST_CONN_"):
0776 tmp_items = tmp_value.split(":")
0777 if len(tmp_items) == 2 and tmp_items[0] == "fqan":
0778 fqans.append(tmp_items[-1])
0779
0780 return fqans
0781
0782
0783
0784 def _checkRole(fqans, dn, withVomsPatch=True):
0785 production_manager = False
0786 try:
0787
0788 production_attributes = [
0789 "/atlas/usatlas/Role=production",
0790 "/atlas/usatlas/Role=pilot",
0791 "/atlas/Role=production",
0792 "/atlas/Role=pilot",
0793 "/osg/Role=pilot",
0794 "^/[^/]+/Role=production$",
0795 "/ams/Role=pilot",
0796 "/Engage/LBNE/Role=pilot",
0797 ]
0798 if withVomsPatch:
0799
0800 production_attributes += ["/atlas/", "/osg/", "/cms/", "/ams/", "/Engage/LBNE/"]
0801
0802 for fqan in fqans:
0803
0804 if any(fqan.startswith(role_pattern) or re.search(role_pattern, fqan) for role_pattern in production_attributes):
0805 production_manager = True
0806 break
0807
0808
0809 if production_manager:
0810 break
0811
0812
0813 if not production_manager and dn not in [None]:
0814 for owner in set(panda_config.production_dns).union(panda_config.pilot_owners):
0815 if owner and re.search(owner, dn) is not None:
0816 production_manager = True
0817 break
0818
0819 except Exception:
0820 pass
0821
0822 return production_manager
0823
0824
0825
0826 def _getDN(req):
0827 realDN = None
0828 if "SSL_CLIENT_S_DN" in req.subprocess_env:
0829 realDN = req.subprocess_env["SSL_CLIENT_S_DN"]
0830
0831 realDN = CoreUtils.get_bare_dn(realDN, keep_proxy=True)
0832
0833 return realDN
0834
0835
0836 """
0837 web service interface
0838
0839 """
0840
0841
0842
def getJob(
    req,
    siteName,
    token=None,
    timeout=60,
    cpu=None,
    mem=None,
    diskSpace=None,
    prodSourceLabel=None,
    node=None,
    computingElement=None,
    AtlasRelease=None,
    prodUserID=None,
    getProxyKey=None,
    countryGroup=None,
    workingGroup=None,
    allowOtherCountry=None,
    taskID=None,
    nJobs=None,
    background=None,
    resourceType=None,
    harvester_id=None,
    worker_id=None,
    schedulerID=None,
    jobType=None,
    viaTopic=None,
    remaining_time=None,
):
    """
    Web entry point for pilots to fetch jobs.

    Performs role checks and input sanitization, then delegates to
    jobDispatcher.getJob. Parameters mirror what the pilot sends; several
    (token, cpu, AtlasRelease, countryGroup, workingGroup, allowOtherCountry)
    are only logged here and not forwarded to the dispatcher.

    :param req: HTTP request object (provides DN, FQANs and JSON preference)
    :param siteName: name of the site the pilot runs at
    :return: encoded response with jobs or an error status
    """
    tmpLog = LogWrapper(_logger, f"getJob {naive_utcnow().isoformat('/')}")
    tmpLog.debug(siteName)

    # DN and VOMS FQANs of the caller
    realDN = _getDN(req)
    fqans = _getFQAN(req)

    # proxy-key requests use the stricter role check (no VOMS patch)
    if getProxyKey == "True":
        prodManager = _checkRole(fqans, realDN, False)
    else:
        prodManager = _checkRole(fqans, realDN)

    # non-production callers may only get their own jobs
    if not prodManager:
        prodUserID = realDN

    # normalize getProxyKey to a bool; only production roles may request it
    if getProxyKey == "True" and prodManager:
        getProxyKey = True
    else:
        getProxyKey = False

    # sanitize numeric inputs; fall back to 0 on bad values
    try:
        mem = int(float(mem))
        if mem < 0:
            mem = 0
    except Exception:
        mem = 0
    try:
        diskSpace = int(float(diskSpace))
        if diskSpace < 0:
            diskSpace = 0
    except Exception:
        diskSpace = 0
    try:
        remaining_time = max(0, int(remaining_time))
    except Exception:
        remaining_time = 0
    # string flags -> bools
    if background == "True":
        background = True
    else:
        background = False
    if viaTopic == "True":
        viaTopic = True
    else:
        viaTopic = False
    tmpLog.debug(
        f"{siteName},nJobs={nJobs},cpu={cpu},mem={mem},disk={diskSpace},source_label={prodSourceLabel},"
        f"node={node},ce={computingElement},rel={AtlasRelease},user={prodUserID},proxy={getProxyKey},"
        f"c_group={countryGroup},w_group={workingGroup},{allowOtherCountry},taskID={taskID},DN={realDN},"
        f"role={prodManager},FQAN={str(fqans)},json={req.acceptJson()},"
        f"bg={background},rt={resourceType},harvester_id={harvester_id},worker_id={worker_id},"
        f"schedulerID={schedulerID},jobType={jobType},viaTopic={viaTopic} remaining_time={remaining_time}"
    )
    # log one pilot request per requested job slot
    try:
        dummyNumSlots = int(nJobs)
    except Exception:
        dummyNumSlots = 1
    if dummyNumSlots > 1:
        for iSlots in range(dummyNumSlots):
            _pilotReqLogger.info(f"method=getJob,site={siteName},node={node}_{iSlots},type={prodSourceLabel}")
    else:
        _pilotReqLogger.info(f"method=getJob,site={siteName},node={node},type={prodSourceLabel}")

    # only production/pilot roles (or user-label requests) may pull jobs
    if (not prodManager) and (prodSourceLabel not in ["user"]):
        tmpLog.warning("invalid role")
        if req.acceptJson():
            tmpMsg = "no production/pilot role in VOMS FQANs or non pilot owner"
        else:
            tmpMsg = None
        return Protocol.Response(Protocol.SC_Role, tmpMsg).encode(req.acceptJson())

    # delegate to the dispatcher singleton
    return jobDispatcher.getJob(
        siteName,
        prodSourceLabel,
        mem,
        diskSpace,
        node,
        int(timeout),
        computingElement,
        prodUserID,
        getProxyKey,
        realDN,
        taskID,
        nJobs,
        req.acceptJson(),
        background,
        resourceType,
        harvester_id,
        worker_id,
        schedulerID,
        jobType,
        viaTopic,
        remaining_time,
        tmpLog,
    )
0966
0967
0968
0969 def updateJob(
0970 req,
0971 jobId,
0972 state,
0973 token=None,
0974 transExitCode=None,
0975 pilotErrorCode=None,
0976 pilotErrorDiag=None,
0977 timestamp=None,
0978 timeout=60,
0979 xml="",
0980 node=None,
0981 workdir=None,
0982 cpuConsumptionTime=None,
0983 cpuConsumptionUnit=None,
0984 remainingSpace=None,
0985 schedulerID=None,
0986 pilotID=None,
0987 siteName=None,
0988 messageLevel=None,
0989 pilotLog="",
0990 metaData="",
0991 cpuConversionFactor=None,
0992 exeErrorCode=None,
0993 exeErrorDiag=None,
0994 pilotTiming=None,
0995 computingElement=None,
0996 startTime=None,
0997 endTime=None,
0998 nEvents=None,
0999 nInputFiles=None,
1000 batchID=None,
1001 attemptNr=None,
1002 jobMetrics=None,
1003 stdout="",
1004 jobSubStatus=None,
1005 coreCount=None,
1006 maxRSS=None,
1007 maxVMEM=None,
1008 maxSWAP=None,
1009 maxPSS=None,
1010 avgRSS=None,
1011 avgVMEM=None,
1012 avgSWAP=None,
1013 avgPSS=None,
1014 totRCHAR=None,
1015 totWCHAR=None,
1016 totRBYTES=None,
1017 totWBYTES=None,
1018 rateRCHAR=None,
1019 rateWCHAR=None,
1020 rateRBYTES=None,
1021 rateWBYTES=None,
1022 corruptedFiles=None,
1023 meanCoreCount=None,
1024 cpu_architecture_level=None,
1025 grid=None,
1026 source_site=None,
1027 destination_site=None,
1028 ):
1029 tmp_log = LogWrapper(_logger, f"updateJob PandaID={jobId} PID={os.getpid()}")
1030 tmp_log.debug("start")
1031
1032
1033 realDN = _getDN(req)
1034 fqans = _getFQAN(req)
1035 prodManager = _checkRole(fqans, realDN)
1036 acceptJson = req.acceptJson()
1037
1038 _logger.debug(
1039 f"updateJob({jobId},status={state},transExitCode={transExitCode},pilotErrorCode={pilotErrorCode},pilotErrorDiag={pilotErrorDiag},node={node},workdir={workdir},"
1040 f"cpuConsumptionTime={cpuConsumptionTime},cpuConsumptionUnit={cpuConsumptionUnit},cpu_architecture_level={cpu_architecture_level},remainingSpace={remainingSpace},"
1041 f"schedulerID={schedulerID},pilotID={pilotID},siteName={siteName},messageLevel={messageLevel},nEvents={nEvents},nInputFiles={nInputFiles},cpuConversionFactor={cpuConversionFactor},"
1042 f"exeErrorCode={exeErrorCode},exeErrorDiag={exeErrorDiag},pilotTiming={pilotTiming},computingElement={computingElement},startTime={startTime},endTime={endTime},batchID={batchID},"
1043 f"attemptNr:{attemptNr},jobSubStatus:{jobSubStatus},core:{coreCount},DN:{realDN},role:{prodManager},"
1044 f"FQAN:{fqans},maxRSS={maxRSS},maxVMEM={maxVMEM},maxSWAP={maxSWAP},"
1045 f"maxPSS={maxPSS},avgRSS={avgRSS},avgVMEM={avgVMEM},avgSWAP={avgSWAP},avgPSS={avgPSS},"
1046 f"totRCHAR={totRCHAR},totWCHAR={totWCHAR},totRBYTES={totRBYTES},totWBYTES={totWBYTES},rateRCHAR={rateRCHAR},"
1047 f"rateWCHAR={rateWCHAR},rateRBYTES={rateRBYTES},rateWBYTES={rateWBYTES},meanCoreCount={meanCoreCount},"
1048 f"grid={grid},source_site={source_site},destination_site={destination_site},"
1049 f"corruptedFiles={corruptedFiles}\n==XML==\n{xml}\n==LOG==\n{pilotLog[:1024]}\n==Meta==\n{metaData[:1024]}\n"
1050 f"==Metrics==\n{jobMetrics}\n==stdout==\n{stdout})"
1051 )
1052
1053 _pilotReqLogger.debug(f"method=updateJob,site={siteName},node={node},type=None")
1054
1055
1056 if not prodManager:
1057 tmp_log.warning(f"invalid role")
1058 tmpMsg = None
1059 if acceptJson:
1060 tmpMsg = "no production/pilot role in VOMS FQANs or non pilot owner"
1061 return Protocol.Response(Protocol.SC_Role, tmpMsg).encode(acceptJson)
1062
1063
1064 if jobId == "NULL":
1065 return Protocol.Response(Protocol.SC_Success).encode(acceptJson)
1066
1067
1068 if state not in [
1069 "running",
1070 "failed",
1071 "finished",
1072 "holding",
1073 "starting",
1074 "transferring",
1075 ]:
1076 tmp_log.warning(f"invalid state={state} for updateJob")
1077 return Protocol.Response(Protocol.SC_Success).encode(acceptJson)
1078
1079
1080 param = {}
1081 if cpuConsumptionTime not in [None, ""]:
1082 param["cpuConsumptionTime"] = cpuConsumptionTime
1083 if cpuConsumptionUnit is not None:
1084 param["cpuConsumptionUnit"] = cpuConsumptionUnit
1085 if cpu_architecture_level:
1086 try:
1087 param["cpu_architecture_level"] = cpu_architecture_level[:20]
1088 except Exception:
1089 tmp_log.error(f"invalid cpu_architecture_level={cpu_architecture_level} for updateJob")
1090 pass
1091 if node is not None:
1092 param["modificationHost"] = node[:128]
1093 if transExitCode is not None:
1094 try:
1095 int(transExitCode)
1096 param["transExitCode"] = transExitCode
1097 except Exception:
1098 pass
1099 if pilotErrorCode is not None:
1100 try:
1101 int(pilotErrorCode)
1102 param["pilotErrorCode"] = pilotErrorCode
1103 except Exception:
1104 pass
1105 if pilotErrorDiag is not None:
1106 param["pilotErrorDiag"] = pilotErrorDiag[:500]
1107 if jobMetrics is not None:
1108 param["jobMetrics"] = jobMetrics[:500]
1109 if schedulerID is not None:
1110 param["schedulerID"] = schedulerID
1111 if pilotID is not None:
1112 param["pilotID"] = pilotID[:200]
1113 if batchID is not None:
1114 param["batchID"] = batchID[:80]
1115 if exeErrorCode is not None:
1116 param["exeErrorCode"] = exeErrorCode
1117 if exeErrorDiag is not None:
1118 param["exeErrorDiag"] = exeErrorDiag[:500]
1119 if cpuConversionFactor is not None:
1120 param["cpuConversion"] = cpuConversionFactor
1121 if pilotTiming is not None:
1122 param["pilotTiming"] = pilotTiming
1123 if nEvents is not None:
1124 param["nEvents"] = nEvents
1125 if nInputFiles is not None:
1126 param["nInputFiles"] = nInputFiles
1127 if jobSubStatus not in [None, ""]:
1128 param["jobSubStatus"] = jobSubStatus
1129 if coreCount not in [None, ""]:
1130 param["actualCoreCount"] = coreCount
1131 if meanCoreCount:
1132 try:
1133 param["meanCoreCount"] = float(meanCoreCount)
1134 except Exception:
1135 pass
1136 if grid is not None:
1137 param["grid"] = grid
1138 if source_site is not None:
1139 param["sourceSite"] = source_site
1140 if destination_site is not None:
1141 param["destinationSite"] = destination_site
1142 if maxRSS is not None:
1143 param["maxRSS"] = maxRSS
1144 if maxVMEM is not None:
1145 param["maxVMEM"] = maxVMEM
1146 if maxSWAP is not None:
1147 param["maxSWAP"] = maxSWAP
1148 if maxPSS is not None:
1149 param["maxPSS"] = maxPSS
1150 if avgRSS is not None:
1151 param["avgRSS"] = int(float(avgRSS))
1152 if avgVMEM is not None:
1153 param["avgVMEM"] = int(float(avgVMEM))
1154 if avgSWAP is not None:
1155 param["avgSWAP"] = int(float(avgSWAP))
1156 if avgPSS is not None:
1157 param["avgPSS"] = int(float(avgPSS))
1158 if totRCHAR is not None:
1159 totRCHAR = int(totRCHAR) / 1024
1160 totRCHAR = min(10**10 - 1, totRCHAR)
1161 param["totRCHAR"] = totRCHAR
1162 if totWCHAR is not None:
1163 totWCHAR = int(totWCHAR) / 1024
1164 totWCHAR = min(10**10 - 1, totWCHAR)
1165 param["totWCHAR"] = totWCHAR
1166 if totRBYTES is not None:
1167 totRBYTES = int(totRBYTES) / 1024
1168 totRBYTES = min(10**10 - 1, totRBYTES)
1169 param["totRBYTES"] = totRBYTES
1170 if totWBYTES is not None:
1171 totWBYTES = int(totWBYTES) / 1024
1172 totWBYTES = min(10**10 - 1, totWBYTES)
1173 param["totWBYTES"] = totWBYTES
1174 if rateRCHAR is not None:
1175 rateRCHAR = min(10**10 - 1, int(float(rateRCHAR)))
1176 param["rateRCHAR"] = rateRCHAR
1177 if rateWCHAR is not None:
1178 rateWCHAR = min(10**10 - 1, int(float(rateWCHAR)))
1179 param["rateWCHAR"] = rateWCHAR
1180 if rateRBYTES is not None:
1181 rateRBYTES = min(10**10 - 1, int(float(rateRBYTES)))
1182 param["rateRBYTES"] = rateRBYTES
1183 if rateWBYTES is not None:
1184 rateWBYTES = min(10**10 - 1, int(float(rateWBYTES)))
1185 param["rateWBYTES"] = rateWBYTES
1186 if startTime is not None:
1187 try:
1188 param["startTime"] = datetime.datetime(*time.strptime(startTime, "%Y-%m-%d %H:%M:%S")[:6])
1189 except Exception:
1190 pass
1191 if endTime is not None:
1192 try:
1193 param["endTime"] = datetime.datetime(*time.strptime(endTime, "%Y-%m-%d %H:%M:%S")[:6])
1194 except Exception:
1195 pass
1196 if attemptNr is not None:
1197 try:
1198 attemptNr = int(attemptNr)
1199 except Exception:
1200 attemptNr = None
1201 if stdout != "":
1202 stdout = stdout[:2048]
1203 if corruptedFiles is not None:
1204 param["corruptedFiles"] = corruptedFiles
1205
1206 tmp_log.debug("executing")
1207 return jobDispatcher.updateJob(
1208 int(jobId),
1209 state,
1210 int(timeout),
1211 xml,
1212 siteName,
1213 param,
1214 metaData,
1215 pilotLog,
1216 attemptNr,
1217 stdout,
1218 acceptJson,
1219 )
1220
1221
1222
def updateJobsInBulk(req, jobList, harvester_id=None):
    """
    Update a batch of jobs in one call on behalf of a harvester instance.

    Args:
        req: The request object containing the environment variables.
        jobList (str): JSON-encoded list of job dictionaries; each entry must
            carry "jobId" and "state", plus optional updateJob keyword fields.
        harvester_id (str, optional): The ID of the harvester instance. Defaults to None.
    Returns:
        str: JSON-encoded tuple of (success flag, per-job results or error string).
    """
    prefix = f"updateJobsInBulk {harvester_id}"
    tmp_logger = LogWrapper(_logger, prefix)
    tmp_logger.debug("start")
    t_start = naive_utcnow()
    ret_list = []
    ret_val = False
    try:
        for job_dict in json.loads(jobList):
            # extract the mandatory fields; everything left over is forwarded
            # to updateJob as keyword arguments
            job_id = job_dict.pop("jobId")
            state = job_dict.pop("state")
            if "metaData" in job_dict:
                job_dict["metaData"] = str(job_dict["metaData"])
            ret_list.append(updateJob(req, job_id, state, **job_dict))
        ret_val = True
    except Exception:
        err_type, err_value = sys.exc_info()[:2]
        tmp_msg = f"failed with {err_type.__name__} {err_value}"
        ret_list = f"{prefix} {tmp_msg}"
        tmp_logger.error(f"{tmp_msg}\n{traceback.format_exc()}")
    t_delta = naive_utcnow() - t_start
    tmp_logger.debug(f"took {t_delta.seconds}.{t_delta.microseconds // 1000:03d} sec")
    return json.dumps((ret_val, ret_list))
1252
1253
1254
def getStatus(req, ids, timeout=60):
    """
    Retrieve the status of the jobs with the given IDs.

    Args:
        req: The request object containing the environment variables.
        ids: Serialized list of PandaIDs.
        timeout (int, optional): The timeout value. Defaults to 60.
    Returns:
        The response from the job dispatcher.
    """
    _logger.debug(f"getStatus({ids})")
    wait_limit = int(timeout)
    return jobDispatcher.getStatus(ids, wait_limit)
1258
1259
1260
def checkJobStatus(req, ids, timeout=60):
    """
    Check the status of the jobs with the given IDs.

    Args:
        req: The request object containing the environment variables.
        ids: Serialized list of PandaIDs.
        timeout (int, optional): The timeout value. Defaults to 60.
    Returns:
        The response from the job dispatcher.
    """
    wait_limit = int(timeout)
    return jobDispatcher.checkJobStatus(ids, wait_limit)
1263
1264
1265
def getEventRanges(
    req,
    pandaID,
    jobsetID,
    taskID=None,
    nRanges=10,
    timeout=60,
    scattered=None,
    segment_id=None,
):
    """
    Verify pilot permissions and retrieve a list of event ranges for a given PandaID.

    Args:
        req: The request object containing the environment variables.
        pandaID (str): The ID of the Panda job.
        jobsetID (str): The ID of the job set.
        taskID (str, optional): The ID of the task. Defaults to None.
        nRanges (int, optional): The number of event ranges to retrieve. Defaults to 10.
        timeout (int, optional): The timeout value. Defaults to 60.
        scattered (str, optional): Whether the event ranges are scattered. Defaults to None.
        segment_id (int, optional): The segment ID. Defaults to None.
    Returns:
        dict: The response from the job dispatcher.
    """
    tmp_log = LogWrapper(_logger, f"getEventRanges(PandaID={pandaID} jobsetID={jobsetID} taskID={taskID},nRanges={nRanges},segment={segment_id})")
    tmp_log.debug("start")

    perm_ok, perm_err = checkPilotPermission(req)
    if not perm_ok:
        tmp_log.error(f"failed with {perm_err}")
        return perm_err

    # the pilot transmits booleans as strings; anything other than "True" is False
    scattered = scattered == "True"

    if segment_id is not None:
        segment_id = int(segment_id)

    return jobDispatcher.getEventRanges(
        pandaID,
        jobsetID,
        taskID,
        nRanges,
        int(timeout),
        req.acceptJson(),
        scattered,
        segment_id,
    )
1317
1318
def updateEventRange(
    req,
    eventRangeID,
    eventStatus,
    coreCount=None,
    cpuConsumptionTime=None,
    objstoreID=None,
    timeout=60,
    pandaID=None,
):
    """
    Verify pilot permissions and update the status of a single event range.

    Args:
        req: The request object containing the environment variables.
        eventRangeID (str): The ID of the event range to update.
        eventStatus (str): The new status of the event range.
        coreCount (int, optional): The number of cores used. Defaults to None.
        cpuConsumptionTime (float, optional): The CPU consumption time. Defaults to None.
        objstoreID (int, optional): The object store ID. Defaults to None.
        timeout (int, optional): The timeout value. Defaults to 60.
        pandaID (str, optional): The PandaID. Defaults to None.

    Returns:
        dict: The response from the job dispatcher.
    """
    tmp_log = LogWrapper(
        _logger, f"updateEventRange({eventRangeID} status={eventStatus} coreCount={coreCount} cpuConsumptionTime={cpuConsumptionTime} osID={objstoreID})"
    )
    tmp_log.debug("start")

    perm_ok, perm_err = checkPilotPermission(req)
    if not perm_ok:
        tmp_log.error(f"failed with {perm_err}")
        return perm_err

    return jobDispatcher.updateEventRange(
        eventRangeID,
        eventStatus,
        coreCount,
        cpuConsumptionTime,
        objstoreID,
        int(timeout),
    )
1363
1364
def updateEventRanges(req, eventRanges, timeout=120, version=0, pandaID=None):
    """
    Verify pilot permissions and update a list of event ranges.

    Args:
        req: The request object containing the environment variables.
        eventRanges (str): A JSON string containing the list of event ranges to update.
        timeout (int, optional): The timeout value. Defaults to 120.
        version (int, optional): The version of the event ranges. Defaults to 0.
        pandaID (str, optional): The PandaID. Defaults to None.
    Returns:
        dict: The response from the job dispatcher.
    """
    tmp_log = LogWrapper(_logger, f"updateEventRanges({eventRanges})")
    tmp_log.debug("start")

    perm_ok, perm_err = checkPilotPermission(req)
    if not perm_ok:
        tmp_log.error(f"failed with {perm_err}")
        return perm_err

    # the version may arrive as a string; fall back to 0 when it cannot be parsed
    try:
        proto_version = int(version)
    except Exception:
        proto_version = 0

    return jobDispatcher.updateEventRanges(eventRanges, int(timeout), req.acceptJson(), proto_version)
1392
1393
def checkEventsAvailability(req, pandaID, jobsetID, taskID, timeout=60):
    """
    Check the availability of events for a given PandaID, jobsetID, and taskID.

    Args:
        req: The request object containing the environment variables.
        pandaID (str): The PandaID.
        jobsetID (str): The jobsetID.
        taskID (str): The taskID.
        timeout (int, optional): The timeout value. Defaults to 60.
    Returns:
        The availability status of the events, or an error message when the
        caller lacks the production/pilot role.
    """
    tmp_log = LogWrapper(_logger, f"check_events_availability(panda_id={pandaID} jobset_id={jobsetID} task_id={taskID})")
    tmp_log.debug("start")
    tmp_stat, tmp_out = checkPilotPermission(req)
    if not tmp_stat:
        tmp_log.error(f"failed with {tmp_out}")
        # reject the request, consistent with getEventRanges/updateEventRange/
        # updateEventRanges; previously the failed check was only logged and
        # the request was served anyway
        return tmp_out

    # timeout may arrive as a string from the HTTP layer; siblings pass int(timeout)
    return jobDispatcher.checkEventsAvailability(pandaID, jobsetID, taskID, int(timeout))
1414
1415
def getKeyPair(req, publicKeyName, privateKeyName):
    """
    Fetch a key pair on behalf of the requester.

    Args:
        req: The request object containing the environment variables.
        publicKeyName (str): The name of the public key.
        privateKeyName (str): The name of the private key.

    Returns:
        dict: The key pair for the user.
    """
    requester_dn = _getDN(req)
    accept_json = req.acceptJson()
    return jobDispatcher.getKeyPair(requester_dn, publicKeyName, privateKeyName, accept_json)
1430
1431
def getProxy(req, role=None, dn=None):
    """
    Get a proxy for a user with a role.

    Args:
        req: The request object containing the environment variables.
        role (str, optional): The role of the user; an empty string is treated as no role. Defaults to None.
        dn (str, optional): The distinguished name of the user. Defaults to None.
    Returns:
        dict: The proxy for the user.
    """
    requester_dn = _getDN(req)
    # normalize an empty-string role to None
    effective_role = None if role == "" else role
    return jobDispatcher.get_proxy(requester_dn, effective_role, dn, False, None)
1447
1448
def get_access_token(req, client_name, token_key=None):
    """
    Get an access token for the specified client on behalf of the requester.

    Args:
        req: The request object containing the environment variables.
        client_name (str): The name of the client requesting the access token.
        token_key (str, optional): The key to look the token up in the token cache. Defaults to None.
    Returns:
        dict: The access token for the client.
    """
    requester_dn = _getDN(req)
    # the True flag requests a token rather than a proxy (cf. getProxy, which passes False)
    return jobDispatcher.get_proxy(requester_dn, None, client_name, True, token_key)
1463
1464
def get_token_key(req, client_name):
    """
    Get a token key for the specified client on behalf of the requester.

    Args:
        req: The request object containing the environment variables.
        client_name (str): The name of the client requesting the token key.
    Returns:
        str: The token key for the client.
    """
    requester_dn = _getDN(req)
    accept_json = req.acceptJson()
    return jobDispatcher.get_token_key(requester_dn, client_name, accept_json)
1478
1479
def checkPilotPermission(req):
    """
    Verify that the caller has a production or pilot role.

    Retrieves the distinguished name (DN) and Fully Qualified Attribute Names
    (FQANs) from the request and checks the role against them.

    Args:
        req: The request object containing the environment variables.
    Returns:
        tuple: (True, None) on success, or (False, error message) on failure.
    """
    real_dn = _getDN(req)
    if real_dn is None:
        return False, "failed to retrieve DN"

    # role check considers both VOMS FQANs and the DN itself
    if not _checkRole(_getFQAN(req), real_dn, True):
        return False, "production or pilot role is required"

    return True, None
1502
1503
def getCommands(req, harvester_id, n_commands, timeout=30):
    """
    Retrieve commands for a specified harvester instance.

    Args:
        req: The request object containing the environment variables.
        harvester_id (str): The ID of the harvester instance.
        n_commands (int): The number of commands to retrieve.
        timeout (int, optional): The timeout value. Defaults to 30.
    Returns:
        dict: The response from the job dispatcher.
    """
    label = "getCommands"

    perm_ok, perm_err = checkPilotPermission(req)
    # NOTE(review): a failed permission check is only logged here and the
    # request is still served — confirm this soft enforcement is intended
    if not perm_ok:
        _logger.error(f"{label} failed with {perm_err}")

    return jobDispatcher.getCommands(harvester_id, n_commands, timeout, req.acceptJson())
1526
1527
def ackCommands(req, command_ids, timeout=30):
    """
    Acknowledge a list of harvester commands.

    Args:
        req: The request object containing the environment variables.
        command_ids (str): A JSON string containing the list of command IDs to acknowledge.
        timeout (int, optional): The timeout value. Defaults to 30.
    Returns:
        dict: The response from the job dispatcher.
    """
    label = "ackCommands"

    perm_ok, perm_err = checkPilotPermission(req)
    # NOTE(review): a failed permission check is only logged here and the
    # request is still served — confirm this soft enforcement is intended
    if not perm_ok:
        _logger.error(f"{label} failed with {perm_err}")

    id_list = json.loads(command_ids)
    return jobDispatcher.ackCommands(id_list, timeout, req.acceptJson())
1551
1552
def getResourceTypes(req, timeout=30):
    """
    Retrieve the resource types (MCORE, SCORE, etc.) and their definitions.

    Args:
        req: The request object containing the environment variables.
        timeout (int, optional): The timeout value. Defaults to 30.
    Returns:
        dict: The resource types and their definitions.
    """
    label = "getResourceTypes"

    perm_ok, perm_err = checkPilotPermission(req)
    # NOTE(review): a failed permission check is only logged here and the
    # request is still served — confirm this soft enforcement is intended
    if not perm_ok:
        _logger.error(f"{label} failed with {perm_err}")

    return jobDispatcher.getResourceTypes(timeout, req.acceptJson())
1574
1575
def updateWorkerPilotStatus(req, site, workerID, harvesterID, status, timeout=60, node_id=None):
    """
    Update the status of a worker according to the pilot.

    Validates the pilot permissions and the state passed by the pilot, then
    forwards the worker status update to the job dispatcher.

    Args:
        req: The request object containing the environment variables.
        site (str): The site name.
        workerID (str): The worker ID.
        harvesterID (str): The harvester ID.
        status (str): The status of the worker. Must be either 'started' or 'finished'.
        timeout (int, optional): The timeout value. Defaults to 60.
        node_id (str, optional): The node ID. Defaults to None.

    Returns:
        str: The result of the status update or an error message.
    """
    tmp_log = LogWrapper(
        _logger,
        f"updateWorkerPilotStatus workerID={workerID} harvesterID={harvesterID} status={status} nodeID={node_id} PID={os.getpid()}",
    )
    tmp_log.debug("start")

    perm_ok, perm_err = checkPilotPermission(req)
    if not perm_ok:
        tmp_log.error(f"failed with {perm_err}")
        return perm_err

    # only these two transitions may be reported by the pilot
    valid_states = ("started", "finished")
    if status not in valid_states:
        message = f"Invalid worker state. The worker state has to be in {valid_states}"
        tmp_log.debug(message)
        return message

    # NOTE(review): timeout is forwarded as received, unlike siblings that pass
    # int(timeout) — confirm downstream accepts string values
    return jobDispatcher.updateWorkerPilotStatus(workerID, harvesterID, status, timeout, req.acceptJson(), node_id)
1616
1617
1618
def get_max_worker_id(req, harvester_id):
    """
    Return the maximum worker ID known for the given harvester instance.

    Args:
        req: The request object containing the environment variables.
        harvester_id (str): The ID of the harvester instance.
    Returns:
        The response from the job dispatcher.
    """
    max_worker_id = jobDispatcher.get_max_worker_id(harvester_id)
    return max_worker_id
1621
1622
1623
def get_events_status(req, ids):
    """
    Return the status of the events with the given IDs.

    Args:
        req: The request object containing the environment variables.
        ids: Serialized list of event identifiers.
    Returns:
        The response from the job dispatcher.
    """
    events_status = jobDispatcher.get_events_status(ids)
    return events_status