Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 """
0002 dispatch jobs
0003 
0004 """
0005 
0006 import datetime
0007 import json
0008 import os
0009 import re
0010 import socket
0011 import sys
0012 import threading
0013 import time
0014 import traceback
0015 from threading import Lock
0016 
0017 from pandacommon.pandalogger.LogWrapper import LogWrapper
0018 from pandacommon.pandalogger.PandaLogger import PandaLogger
0019 from pandacommon.pandautils.PandaUtils import naive_utcnow
0020 
0021 from pandaserver.brokerage.SiteMapper import SiteMapper
0022 from pandaserver.config import panda_config
0023 from pandaserver.dataservice.adder_gen import AdderGen
0024 from pandaserver.jobdispatcher import Protocol
0025 from pandaserver.proxycache import panda_proxy_cache, token_cache
0026 from pandaserver.srvcore import CoreUtils
0027 
0028 # logger
0029 _logger = PandaLogger().getLogger("JobDispatcher")
0030 _pilotReqLogger = PandaLogger().getLogger("PilotRequests")
0031 
0032 
0033 # a wrapper to install timeout into a method
0034 class _TimedMethod:
0035     def __init__(self, method, timeout):
0036         self.method = method
0037         self.timeout = timeout
0038         self.result = Protocol.TimeOutToken
0039 
0040     # method emulation
0041     def __call__(self, *var):
0042         self.result = self.method(*var)
0043 
0044     # run
0045     def run(self, *var):
0046         thr = threading.Thread(target=self, args=var)
0047         thr.start()
0048         thr.join()
0049 
0050 
0051 # job dispatcher
0052 class JobDispatcher:
0053     # constructor
0054     def __init__(self):
0055         # taskbuffer
0056         self.taskBuffer = None
0057         # datetime of last updated
0058         self.lastUpdated = naive_utcnow()
0059         # how frequently update DN/token map
0060         self.timeInterval = datetime.timedelta(seconds=180)
0061         # special dispatcher parameters
0062         self.specialDispatchParams = None
0063         # site mapper cache
0064         self.siteMapperCache = None
0065         # lock
0066         self.lock = Lock()
0067         # proxy cacher
0068         self.proxy_cacher = panda_proxy_cache.MyProxyInterface()
0069         # token cacher
0070         self.token_cacher = token_cache.TokenCache()
0071         # config of token cacher
0072         try:
0073             with open(panda_config.token_cache_config) as f:
0074                 self.token_cache_config = json.load(f)
0075         except Exception:
0076             self.token_cache_config = {}
0077 
0078     # set task buffer
0079     def init(self, taskBuffer):
0080         # lock
0081         self.lock.acquire()
0082         # set TB
0083         if self.taskBuffer is None:
0084             self.taskBuffer = taskBuffer
0085         # special dispatcher parameters
0086         if self.specialDispatchParams is None:
0087             self.specialDispatchParams = CoreUtils.CachedObject("dispatcher_params", 60 * 10, self.get_special_dispatch_params, _logger)
0088         # site mapper cache
0089         if self.siteMapperCache is None:
0090             self.siteMapperCache = CoreUtils.CachedObject("site_mapper", 60 * 10, self.getSiteMapper, _logger)
0091         # release
0092         self.lock.release()
0093 
0094     # get special parameters for dispatcher
0095     def get_special_dispatch_params(self):
0096         """
0097         Wrapper function around taskBuffer.get_special_dispatch_params to convert list to set since task buffer cannot return set
0098         """
0099         param = self.taskBuffer.get_special_dispatch_params()
0100         for client_name in param["tokenKeys"]:
0101             param["tokenKeys"][client_name]["fullList"] = set(param["tokenKeys"][client_name]["fullList"])
0102         return True, param
0103 
0104     # set user proxy
0105     def set_user_proxy(self, response, distinguished_name=None, role=None, tokenized=False) -> tuple[bool, str]:
0106         """
0107         Set user proxy to the response
0108 
0109         :param response: response object
0110         :param distinguished_name: the distinguished name of the user
0111         :param role: the role of the user
0112         :param tokenized: whether the response should contain a token instead of a proxy
0113 
0114         :return: a tuple containing a boolean indicating success and a message
0115         """
0116         try:
0117             if distinguished_name is None:
0118                 distinguished_name = response.data["prodUserID"]
0119             # remove redundant extensions
0120             distinguished_name = CoreUtils.get_bare_dn(distinguished_name, keep_digits=False)
0121             if not tokenized:
0122                 # get proxy
0123                 output = self.proxy_cacher.retrieve(distinguished_name, role=role)
0124             else:
0125                 # get token
0126                 output = self.token_cacher.get_access_token(distinguished_name)
0127             # not found
0128             if output is None:
0129                 tmp_msg = f"""{"token" if tokenized else "proxy"} not found for {distinguished_name}"""
0130                 response.appendNode("errorDialog", tmp_msg)
0131                 return False, tmp_msg
0132             # set
0133             response.appendNode("userProxy", output)
0134             return True, ""
0135         except Exception as e:
0136             tmp_msg = f"""{"token" if tokenized else "proxy"} retrieval failed with {str(e)}"""
0137             response.appendNode("errorDialog", tmp_msg)
0138             return False, tmp_msg
0139 
0140     # get job
0141     def getJob(
0142         self,
0143         siteName,
0144         prodSourceLabel,
0145         mem,
0146         diskSpace,
0147         node,
0148         timeout,
0149         computingElement,
0150         prodUserID,
0151         getProxyKey,
0152         realDN,
0153         taskID,
0154         nJobs,
0155         acceptJson,
0156         background,
0157         resourceType,
0158         harvester_id,
0159         worker_id,
0160         schedulerID,
0161         jobType,
0162         via_topic,
0163         remaining_time,
0164         tmpLog,
0165     ):
0166         t_getJob_start = time.time()
0167         jobs = []
0168         try:
0169             tmpNumJobs = int(nJobs)
0170         except Exception:
0171             tmpNumJobs = None
0172         if tmpNumJobs is None:
0173             tmpNumJobs = 1
0174 
0175         self.siteMapperCache.update()
0176         is_gu = self.siteMapperCache.cachedObj.getSite(siteName).is_grandly_unified()
0177         in_test = self.siteMapperCache.cachedObj.getSite(siteName).status == "test"
0178 
0179         # change label
0180         if in_test and prodSourceLabel in ["user", "managed", "unified"]:
0181             new_label = "test"
0182             tmpLog.debug(f"prodSourceLabel changed {prodSourceLabel} -> {new_label}")
0183             prodSourceLabel = new_label
0184 
0185         # wrapper function for timeout
0186         tmpWrapper = _TimedMethod(self.taskBuffer.getJobs, timeout)
0187         tmpWrapper.run(
0188             tmpNumJobs,
0189             siteName,
0190             prodSourceLabel,
0191             mem,
0192             diskSpace,
0193             node,
0194             timeout,
0195             computingElement,
0196             prodUserID,
0197             taskID,
0198             background,
0199             resourceType,
0200             harvester_id,
0201             worker_id,
0202             schedulerID,
0203             jobType,
0204             is_gu,
0205             via_topic,
0206             remaining_time,
0207         )
0208 
0209         if isinstance(tmpWrapper.result, list):
0210             jobs = jobs + tmpWrapper.result
0211         # make response
0212         secrets_map = {}
0213         if len(jobs) > 0:
0214             secrets_map = jobs.pop()
0215             proxyKey = jobs[-1]
0216             nSent = jobs[-2]
0217             jobs = jobs[:-2]
0218         if len(jobs) != 0:
0219             # succeed
0220             responseList = []
0221             # append Jobs
0222             for tmpJob in jobs:
0223                 try:
0224                     response = Protocol.Response(Protocol.SC_Success)
0225                     response.appendJob(tmpJob, self.siteMapperCache)
0226                 except Exception as e:
0227                     tmpMsg = f"failed to get jobs with {str(e)}"
0228                     tmpLog.error(tmpMsg + "\n" + traceback.format_exc())
0229                     raise
0230 
0231                 # append nSent
0232                 response.appendNode("nSent", nSent)
0233                 # set proxy key
0234                 if getProxyKey:
0235                     response.setProxyKey(proxyKey)
0236                 # set secrets
0237                 if tmpJob.use_secrets() and tmpJob.prodUserName in secrets_map and secrets_map[tmpJob.prodUserName]:
0238                     response.appendNode("secrets", secrets_map[tmpJob.prodUserName])
0239                 if panda_config.pilot_secrets in secrets_map and secrets_map[panda_config.pilot_secrets]:
0240                     response.appendNode("pilotSecrets", secrets_map[panda_config.pilot_secrets])
0241                 # add
0242                 responseList.append(response.data)
0243             # make response for bulk
0244             if nJobs is not None:
0245                 try:
0246                     response = Protocol.Response(Protocol.SC_Success)
0247                     if not acceptJson:
0248                         response.appendNode("jobs", json.dumps(responseList))
0249                     else:
0250                         response.appendNode("jobs", responseList)
0251                 except Exception as e:
0252                     tmpMsg = f"failed to make response with {str(e)}"
0253                     tmpLog.error(tmpMsg + "\n" + traceback.format_exc())
0254                     raise
0255 
0256         else:
0257             if tmpWrapper.result == Protocol.TimeOutToken:
0258                 # timeout
0259                 if acceptJson:
0260                     response = Protocol.Response(Protocol.SC_TimeOut, "database timeout")
0261                 else:
0262                     response = Protocol.Response(Protocol.SC_TimeOut)
0263             else:
0264                 # no available jobs
0265                 if acceptJson:
0266                     response = Protocol.Response(Protocol.SC_NoJobs, "no jobs in PanDA")
0267                 else:
0268                     response = Protocol.Response(Protocol.SC_NoJobs)
0269                 _pilotReqLogger.info(f"method=noJob,site={siteName},node={node},type={prodSourceLabel}")
0270         # return
0271         tmpLog.debug(f"{siteName} {node} ret -> {response.encode(acceptJson)}")
0272 
0273         t_getJob_end = time.time()
0274         t_getJob_spent = t_getJob_end - t_getJob_start
0275         tmpLog.info(f"siteName={siteName} took timing={t_getJob_spent}s in_test={in_test}")
0276         return response.encode(acceptJson)
0277 
0278     # update job status
0279     def updateJob(
0280         self,
0281         jobID,
0282         jobStatus,
0283         timeout,
0284         xml,
0285         siteName,
0286         param,
0287         metadata,
0288         pilotLog,
0289         attemptNr=None,
0290         stdout="",
0291         acceptJson=False,
0292     ):
0293         tmp_logger = LogWrapper(_logger, f"updateJob {jobID}")
0294 
0295         # store the pilot log
0296         if pilotLog != "":
0297             tmp_logger.debug("saving pilot log")
0298             try:
0299                 self.taskBuffer.storePilotLog(int(jobID), pilotLog)
0300                 tmp_logger.debug("saving pilot log DONE")
0301             except Exception:
0302                 tmp_logger.debug("saving pilot log FAILED")
0303 
0304         # add metadata
0305         if metadata != "":
0306             ret = self.taskBuffer.addMetadata([jobID], [metadata], [jobStatus])
0307             if len(ret) > 0 and not ret[0]:
0308                 tmp_logger.debug(f"failed to add metadata")
0309                 # return succeed
0310                 response = Protocol.Response(Protocol.SC_Success)
0311                 return response.encode(acceptJson)
0312 
0313         # add stdout
0314         if stdout != "":
0315             self.taskBuffer.addStdOut(jobID, stdout)
0316 
0317         # update the job
0318         tmpStatus = jobStatus
0319         updateStateChange = False
0320         if jobStatus == "failed" or jobStatus == "finished":
0321             tmpStatus = "holding"
0322             # update stateChangeTime to prevent Watcher from finding this job
0323             updateStateChange = True
0324             param["jobDispatcherErrorDiag"] = None
0325         elif jobStatus in ["holding", "transferring"]:
0326             param["jobDispatcherErrorDiag"] = f"set to {jobStatus} by the pilot at {naive_utcnow().strftime('%Y-%m-%d %H:%M:%S')}"
0327         if tmpStatus == "holding":
0328             tmpWrapper = _TimedMethod(self.taskBuffer.updateJobStatus, None)
0329         else:
0330             tmpWrapper = _TimedMethod(self.taskBuffer.updateJobStatus, timeout)
0331         tmpWrapper.run(jobID, tmpStatus, param, updateStateChange, attemptNr)
0332 
0333         # make response
0334         if tmpWrapper.result == Protocol.TimeOutToken:
0335             # timeout
0336             response = Protocol.Response(Protocol.SC_TimeOut)
0337         else:
0338             if tmpWrapper.result:
0339                 # succeed
0340                 response = Protocol.Response(Protocol.SC_Success)
0341                 result = tmpWrapper.result
0342                 secrets = None
0343                 if isinstance(result, dict):
0344                     if "secrets" in result:
0345                         secrets = result["secrets"]
0346                     result = result["command"]
0347                 # set command
0348                 if isinstance(result, str):
0349                     response.appendNode("command", result)
0350                     if secrets:
0351                         response.appendNode("pilotSecrets", secrets)
0352                 else:
0353                     response.appendNode("command", "NULL")
0354 
0355                 # add output to dataset
0356                 if result not in ["badattemptnr", "alreadydone"] and (jobStatus == "failed" or jobStatus == "finished"):
0357                     adder_gen = AdderGen(self.taskBuffer, jobID, jobStatus, attemptNr)
0358                     adder_gen.dump_file_report(xml, attemptNr)
0359                     del adder_gen
0360             else:
0361                 response = Protocol.Response(Protocol.SC_Failed)
0362 
0363         tmp_logger.debug(f"ret -> {response.encode(acceptJson)}")
0364         return response.encode(acceptJson)
0365 
0366     # get job status
0367     def getStatus(self, strIDs, timeout):
0368         # convert str to list
0369         ids = strIDs.split()
0370         # peek jobs
0371         tmpWrapper = _TimedMethod(self.taskBuffer.peekJobs, timeout)
0372         tmpWrapper.run(ids, False, True, True, False)
0373         # make response
0374         if tmpWrapper.result == Protocol.TimeOutToken:
0375             # timeout
0376             response = Protocol.Response(Protocol.SC_TimeOut)
0377         else:
0378             if isinstance(tmpWrapper.result, list):
0379                 # succeed
0380                 response = Protocol.Response(Protocol.SC_Success)
0381                 # make return
0382                 retStr = ""
0383                 attStr = ""
0384                 for job in tmpWrapper.result:
0385                     if job is None:
0386                         retStr += f"notfound+"
0387                         attStr += "0+"
0388                     else:
0389                         retStr += f"{job.jobStatus}+"
0390                         attStr += f"{job.attemptNr}+"
0391                 response.appendNode("status", retStr[:-1])
0392                 response.appendNode("attemptNr", attStr[:-1])
0393             else:
0394                 # failed
0395                 response = Protocol.Response(Protocol.SC_Failed)
0396         _logger.debug(f"getStatus : {strIDs} ret -> {response.encode()}")
0397         return response.encode()
0398 
0399     # check job status
0400     def checkJobStatus(self, pandaIDs, timeout):
0401         tmpWrapper = _TimedMethod(self.taskBuffer.checkJobStatus, timeout)
0402         tmpWrapper.run(pandaIDs)
0403         # make response
0404         if tmpWrapper.result == Protocol.TimeOutToken:
0405             # timeout
0406             response = Protocol.Response(Protocol.SC_TimeOut)
0407         else:
0408             if isinstance(tmpWrapper.result, list):
0409                 # succeed
0410                 response = Protocol.Response(Protocol.SC_Success)
0411                 response.appendNode("data", tmpWrapper.result)
0412             else:
0413                 # failed
0414                 response = Protocol.Response(Protocol.SC_Failed)
0415         _logger.debug(f"checkJobStatus : {pandaIDs} ret -> {response.encode(True)}")
0416         return response.encode(True)
0417 
0418     # get a list of event ranges for a PandaID
0419     def getEventRanges(
0420         self,
0421         pandaID,
0422         jobsetID,
0423         jediTaskID,
0424         nRanges,
0425         timeout,
0426         acceptJson,
0427         scattered,
0428         segment_id,
0429     ):
0430         tmpWrapper = _TimedMethod(self.taskBuffer.getEventRanges, timeout)
0431         tmpWrapper.run(pandaID, jobsetID, jediTaskID, nRanges, acceptJson, scattered, segment_id)
0432         # make response
0433         if tmpWrapper.result == Protocol.TimeOutToken:
0434             # timeout
0435             response = Protocol.Response(Protocol.SC_TimeOut)
0436         else:
0437             if tmpWrapper.result is not None:
0438                 # succeed
0439                 response = Protocol.Response(Protocol.SC_Success)
0440                 # make return
0441                 response.appendNode("eventRanges", tmpWrapper.result)
0442             else:
0443                 # failed
0444                 response = Protocol.Response(Protocol.SC_Failed)
0445         _logger.debug(f"getEventRanges : {pandaID} ret -> {response.encode(acceptJson)}")
0446         return response.encode(acceptJson)
0447 
0448     # update an event range
0449     def updateEventRange(
0450         self,
0451         eventRangeID,
0452         eventStatus,
0453         coreCount,
0454         cpuConsumptionTime,
0455         objstoreID,
0456         timeout,
0457     ):
0458         tmpWrapper = _TimedMethod(self.taskBuffer.updateEventRange, timeout)
0459         tmpWrapper.run(eventRangeID, eventStatus, coreCount, cpuConsumptionTime, objstoreID)
0460         # make response
0461         _logger.debug(str(tmpWrapper.result))
0462         if tmpWrapper.result == Protocol.TimeOutToken:
0463             # timeout
0464             response = Protocol.Response(Protocol.SC_TimeOut)
0465         else:
0466             if tmpWrapper.result[0] is True:
0467                 # succeed
0468                 response = Protocol.Response(Protocol.SC_Success)
0469                 response.appendNode("Command", tmpWrapper.result[1])
0470             else:
0471                 # failed
0472                 response = Protocol.Response(Protocol.SC_Failed)
0473         _logger.debug(f"updateEventRange : {eventRangeID} ret -> {response.encode()}")
0474         return response.encode()
0475 
0476     # update event ranges
0477     def updateEventRanges(self, eventRanges, timeout, acceptJson, version):
0478         tmpWrapper = _TimedMethod(self.taskBuffer.updateEventRanges, timeout)
0479         tmpWrapper.run(eventRanges, version)
0480         # make response
0481         if tmpWrapper.result == Protocol.TimeOutToken:
0482             # timeout
0483             response = Protocol.Response(Protocol.SC_TimeOut)
0484         else:
0485             # succeed
0486             response = Protocol.Response(Protocol.SC_Success)
0487             # make return
0488             response.appendNode("Returns", tmpWrapper.result[0])
0489             response.appendNode("Command", tmpWrapper.result[1])
0490         _logger.debug(f"updateEventRanges : ret -> {response.encode(acceptJson)}")
0491         return response.encode(acceptJson)
0492 
0493     # check event availability
0494     def checkEventsAvailability(self, pandaID, jobsetID, jediTaskID, timeout):
0495         tmpWrapper = _TimedMethod(self.taskBuffer.checkEventsAvailability, timeout)
0496         tmpWrapper.run(pandaID, jobsetID, jediTaskID)
0497         # make response
0498         if tmpWrapper.result == Protocol.TimeOutToken:
0499             # timeout
0500             response = Protocol.Response(Protocol.SC_TimeOut)
0501         else:
0502             if tmpWrapper.result is not None:
0503                 # succeed
0504                 response = Protocol.Response(Protocol.SC_Success)
0505                 # make return
0506                 response.appendNode("nEventRanges", tmpWrapper.result)
0507             else:
0508                 # failed
0509                 response = Protocol.Response(Protocol.SC_Failed)
0510         _logger.debug(f"checkEventsAvailability : {pandaID} ret -> {response.encode(True)}")
0511         return response.encode(True)
0512 
0513     # get key pair
0514     def getKeyPair(self, realDN, publicKeyName, privateKeyName, acceptJson):
0515         tmpMsg = f"getKeyPair {publicKeyName}/{privateKeyName} json={acceptJson}: "
0516         if realDN is None:
0517             # cannot extract DN
0518             tmpMsg += "failed since DN cannot be extracted"
0519             _logger.debug(tmpMsg)
0520             response = Protocol.Response(Protocol.SC_Perms, "Cannot extract DN from proxy. not HTTPS?")
0521         else:
0522             # get compact DN
0523             compactDN = CoreUtils.clean_user_id(realDN)
0524             # check permission
0525             self.specialDispatchParams.update()
0526             allowKey = self.specialDispatchParams.get("allowKeyPair", [])
0527             if compactDN not in allowKey:
0528                 # permission denied
0529                 tmpMsg += f"failed since '{compactDN}' not authorized with 'k' in {panda_config.schemaMETA}.USERS.GRIDPREF"
0530                 _logger.debug(tmpMsg)
0531                 response = Protocol.Response(Protocol.SC_Perms, tmpMsg)
0532             else:
0533                 # look for key pair
0534                 if "keyPair" not in self.specialDispatchParams:
0535                     keyPair = {}
0536                 else:
0537                     keyPair = self.specialDispatchParams["keyPair"]
0538                 notFound = False
0539                 if publicKeyName not in keyPair:
0540                     # public key is missing
0541                     notFound = True
0542                     tmpMsg += f"failed for '{compactDN}' since {publicKeyName} is missing on {socket.getfqdn()}"
0543                 elif privateKeyName not in keyPair:
0544                     # private key is missing
0545                     notFound = True
0546                     tmpMsg += f"failed for '{compactDN}' since {privateKeyName} is missing on {socket.getfqdn()}"
0547                 if notFound:
0548                     # private or public key is missing
0549                     _logger.debug(tmpMsg)
0550                     response = Protocol.Response(Protocol.SC_MissKey, tmpMsg)
0551                 else:
0552                     # key pair is available
0553                     response = Protocol.Response(Protocol.SC_Success)
0554                     response.appendNode("publicKey", keyPair[publicKeyName])
0555                     response.appendNode("privateKey", keyPair[privateKeyName])
0556                     tmpMsg += f"sent key-pair to '{compactDN}'"
0557                     _logger.debug(tmpMsg)
0558         # return
0559         return response.encode(acceptJson)
0560 
0561     # get a token key
0562     def get_token_key(self, distinguished_name, client_name, accept_json):
0563         tmp_log = LogWrapper(_logger, f"get_token_key client={client_name} PID={os.getpid()}")
0564         if distinguished_name is None:
0565             # cannot extract DN
0566             tmp_msg = "failed since DN cannot be extracted. non-HTTPS?"
0567             tmp_log.debug(tmp_msg)
0568             response = Protocol.Response(Protocol.SC_Perms, tmp_msg)
0569         else:
0570             # get compact DN
0571             compact_name = CoreUtils.clean_user_id(distinguished_name)
0572             # check permission
0573             self.specialDispatchParams.update()
0574             allowed_users = self.specialDispatchParams.get("allowTokenKey", [])
0575             if compact_name not in allowed_users:
0576                 # permission denied
0577                 tmp_msg = f"denied since '{compact_name}' not authorized with 't' in {panda_config.schemaMETA}.USERS.GRIDPREF"
0578                 tmp_log.debug(tmp_msg)
0579                 response = Protocol.Response(Protocol.SC_Perms, tmp_msg)
0580             else:
0581                 # get a token key
0582                 if client_name not in self.specialDispatchParams["tokenKeys"]:
0583                     # token key is missing
0584                     tmp_msg = f"token key is missing for '{client_name}'"
0585                     tmp_log.debug(tmp_msg)
0586                     response = Protocol.Response(Protocol.SC_MissKey, tmp_msg)
0587                 else:
0588                     # token key is available
0589                     response = Protocol.Response(Protocol.SC_Success)
0590                     response.appendNode("tokenKey", self.specialDispatchParams["tokenKeys"][client_name]["latest"])
0591                     tmp_msg = f"sent token key to '{compact_name}'"
0592                     tmp_log.debug(tmp_msg)
0593         # return
0594         return response.encode(accept_json)
0595 
0596     # get site mapper
0597     def getSiteMapper(self):
0598         return True, SiteMapper(self.taskBuffer)
0599 
0600     def getCommands(self, harvester_id, n_commands, timeout, accept_json):
0601         """
0602         Get commands for a particular harvester instance
0603         """
0604         tmp_wrapper = _TimedMethod(self.taskBuffer.getCommands, timeout)
0605         tmp_wrapper.run(harvester_id, n_commands)
0606 
0607         # Make response
0608         if tmp_wrapper.result == Protocol.TimeOutToken:
0609             # timeout
0610             response = Protocol.Response(Protocol.SC_TimeOut)
0611         else:
0612             # success
0613             response = Protocol.Response(Protocol.SC_Success)
0614             response.appendNode("Returns", tmp_wrapper.result[0])
0615             response.appendNode("Commands", tmp_wrapper.result[1])
0616 
0617         _logger.debug(f"getCommands : ret -> {response.encode(accept_json)}")
0618         return response.encode(accept_json)
0619 
0620     def ackCommands(self, command_ids, timeout, accept_json):
0621         """
0622         Acknowledge the commands from a list of IDs
0623         """
0624         _logger.debug(f"command_ids : {command_ids}")
0625         tmp_wrapper = _TimedMethod(self.taskBuffer.ackCommands, timeout)
0626         tmp_wrapper.run(command_ids)
0627 
0628         # Make response
0629         if tmp_wrapper.result == Protocol.TimeOutToken:
0630             # timeout
0631             response = Protocol.Response(Protocol.SC_TimeOut)
0632         else:
0633             # success
0634             response = Protocol.Response(Protocol.SC_Success)
0635             response.appendNode("Returns", tmp_wrapper.result)
0636 
0637         _logger.debug(f"ackCommands : ret -> {response.encode(accept_json)}")
0638         return response.encode(accept_json)
0639 
0640     def getResourceTypes(self, timeout, accept_json):
0641         """
0642         Get resource types (SCORE, MCORE, SCORE_HIMEM, MCORE_HIMEM) and their definitions
0643         """
0644         tmp_wrapper = _TimedMethod(self.taskBuffer.getResourceTypes, timeout)
0645         tmp_wrapper.run()
0646 
0647         # Make response
0648         if tmp_wrapper.result == Protocol.TimeOutToken:
0649             # timeout
0650             response = Protocol.Response(Protocol.SC_TimeOut)
0651         else:
0652             # success
0653             response = Protocol.Response(Protocol.SC_Success)
0654             response.appendNode("Returns", 0)
0655             response.appendNode("ResourceTypes", tmp_wrapper.result)
0656 
0657         _logger.debug(f"getResourceTypes : ret -> {response.encode(accept_json)}")
0658         return response.encode(accept_json)
0659 
0660     # get proxy
0661     def get_proxy(self, real_distinguished_name: str, role: str | None, target_distinguished_name: str | None, tokenized: bool, token_key: str | None) -> dict:
0662         """
0663         Get proxy for a user with a role
0664 
0665         :param real_distinguished_name: actual distinguished name of the user
0666         :param role: role of the user
0667         :param target_distinguished_name: target distinguished name if the user wants to get proxy for someone else.
0668                                           This is one of client_name defined in token_cache_config when getting a token
0669         :param tokenized: whether the response should contain a token instead of a proxy
0670         :param token_key: key to get the token from the token cache
0671 
0672         :return: response in dictionary
0673         """
0674         if target_distinguished_name is None:
0675             target_distinguished_name = real_distinguished_name
0676         tmp_log = LogWrapper(_logger, f"get_proxy PID={os.getpid()}")
0677         tmp_msg = f"""start DN="{real_distinguished_name}" role={role} target="{target_distinguished_name}" tokenized={tokenized} token_key={token_key}"""
0678         tmp_log.debug(tmp_msg)
0679         if real_distinguished_name is None:
0680             # cannot extract DN
0681             tmp_msg = "failed since DN cannot be extracted"
0682             tmp_log.debug(tmp_msg)
0683             response = Protocol.Response(Protocol.SC_Perms, "Cannot extract DN from proxy. not HTTPS?")
0684         else:
0685             # get compact DN
0686             compact_name = CoreUtils.clean_user_id(real_distinguished_name)
0687             # check permission
0688             self.specialDispatchParams.update()
0689             if "allowProxy" not in self.specialDispatchParams:
0690                 allowed_names = []
0691             else:
0692                 allowed_names = self.specialDispatchParams["allowProxy"]
0693             if compact_name not in allowed_names:
0694                 # permission denied
0695                 tmp_msg = f"failed since '{compact_name}' not in the authorized user list who have 'p' in {panda_config.schemaMETA}.USERS.GRIDPREF "
0696                 if not tokenized:
0697                     tmp_msg += "to get proxy"
0698                 else:
0699                     tmp_msg += "to get access token"
0700                 tmp_log.debug(tmp_msg)
0701                 response = Protocol.Response(Protocol.SC_Perms, tmp_msg)
0702             elif (
0703                 tokenized
0704                 and target_distinguished_name in self.token_cache_config
0705                 and self.token_cache_config[target_distinguished_name].get("use_token_key") is True
0706                 and (
0707                     target_distinguished_name not in self.specialDispatchParams["tokenKeys"]
0708                     or token_key not in self.specialDispatchParams["tokenKeys"][target_distinguished_name]["fullList"]
0709                 )
0710             ):
0711                 # invalid token key
0712                 tmp_msg = f"failed since token key is invalid for {target_distinguished_name}"
0713                 tmp_log.debug(tmp_msg)
0714                 response = Protocol.Response(Protocol.SC_Invalid, tmp_msg)
0715             else:
0716                 # get proxy
0717                 response = Protocol.Response(Protocol.SC_Success, "")
0718                 tmp_status, tmp_msg = self.set_user_proxy(response, target_distinguished_name, role, tokenized)
0719                 if not tmp_status:
0720                     tmp_log.debug(tmp_msg)
0721                     response.appendNode("StatusCode", Protocol.SC_ProxyError)
0722                 else:
0723                     tmp_msg = "successful sent proxy"
0724                     tmp_log.debug(tmp_msg)
0725         # return
0726         return response.encode(True)
0727 
0728     # get active job attribute
0729     def getActiveJobAttributes(self, pandaID, attrs):
0730         return self.taskBuffer.getActiveJobAttributes(pandaID, attrs)
0731 
0732     # update job status
0733     def updateWorkerPilotStatus(self, workerID, harvesterID, status, timeout, accept_json, node_id):
0734         tmp_wrapper = _TimedMethod(self.taskBuffer.updateWorkerPilotStatus, timeout)
0735         tmp_wrapper.run(workerID, harvesterID, status, node_id)
0736 
0737         # make response
0738         if tmp_wrapper.result == Protocol.TimeOutToken:
0739             response = Protocol.Response(Protocol.SC_TimeOut)
0740         else:
0741             if tmp_wrapper.result:  # success
0742                 response = Protocol.Response(Protocol.SC_Success)
0743             else:  # error
0744                 response = Protocol.Response(Protocol.SC_Failed)
0745         _logger.debug(f"updateWorkerPilotStatus : {workerID} {harvesterID} {status} ret -> {response.encode(accept_json)}")
0746         return response.encode(accept_json)
0747 
0748     # get max workerID
0749     def get_max_worker_id(self, harvester_id):
0750         id = self.taskBuffer.get_max_worker_id(harvester_id)
0751         return json.dumps(id)
0752 
0753     def get_events_status(self, ids):
0754         ret = self.taskBuffer.get_events_status(ids)
0755         return json.dumps(ret)
0756 
0757 
0758 # Singleton
0759 jobDispatcher = JobDispatcher()
0760 del JobDispatcher
0761 
0762 
0763 # get FQANs
0764 def _getFQAN(req):
0765     fqans = []
0766     for tmp_key in req.subprocess_env:
0767         tmp_value = req.subprocess_env[tmp_key]
0768         # Scan VOMS attributes
0769         # compact style
0770         if tmp_key.startswith("GRST_CRED_") and tmp_value.startswith("VOMS"):
0771             fqan = tmp_value.split()[-1]
0772             fqans.append(fqan)
0773 
0774         # old style
0775         elif tmp_key.startswith("GRST_CONN_"):
0776             tmp_items = tmp_value.split(":")
0777             if len(tmp_items) == 2 and tmp_items[0] == "fqan":
0778                 fqans.append(tmp_items[-1])
0779 
0780     return fqans
0781 
0782 
0783 # check role
0784 def _checkRole(fqans, dn, withVomsPatch=True):
0785     production_manager = False
0786     try:
0787         # VOMS attributes of production and pilot roles
0788         production_attributes = [
0789             "/atlas/usatlas/Role=production",
0790             "/atlas/usatlas/Role=pilot",
0791             "/atlas/Role=production",
0792             "/atlas/Role=pilot",
0793             "/osg/Role=pilot",
0794             "^/[^/]+/Role=production$",
0795             "/ams/Role=pilot",
0796             "/Engage/LBNE/Role=pilot",
0797         ]
0798         if withVomsPatch:
0799             # FIXME once http://savannah.cern.ch/bugs/?47136 is solved
0800             production_attributes += ["/atlas/", "/osg/", "/cms/", "/ams/", "/Engage/LBNE/"]
0801 
0802         for fqan in fqans:
0803             # check atlas/usatlas production role
0804             if any(fqan.startswith(role_pattern) or re.search(role_pattern, fqan) for role_pattern in production_attributes):
0805                 production_manager = True
0806                 break
0807 
0808             # escape
0809             if production_manager:
0810                 break
0811 
0812         # check DN with pilot owners
0813         if not production_manager and dn not in [None]:
0814             for owner in set(panda_config.production_dns).union(panda_config.pilot_owners):
0815                 if owner and re.search(owner, dn) is not None:
0816                     production_manager = True
0817                     break
0818 
0819     except Exception:
0820         pass
0821 
0822     return production_manager
0823 
0824 
0825 # get DN
0826 def _getDN(req):
0827     realDN = None
0828     if "SSL_CLIENT_S_DN" in req.subprocess_env:
0829         realDN = req.subprocess_env["SSL_CLIENT_S_DN"]
0830         # remove redundant CN
0831         realDN = CoreUtils.get_bare_dn(realDN, keep_proxy=True)
0832     # return
0833     return realDN
0834 
0835 
0836 """
0837 web service interface
0838 
0839 """
0840 
0841 
0842 # get job
0843 def getJob(
0844     req,
0845     siteName,
0846     token=None,
0847     timeout=60,
0848     cpu=None,
0849     mem=None,
0850     diskSpace=None,
0851     prodSourceLabel=None,
0852     node=None,
0853     computingElement=None,
0854     AtlasRelease=None,
0855     prodUserID=None,
0856     getProxyKey=None,
0857     countryGroup=None,
0858     workingGroup=None,
0859     allowOtherCountry=None,
0860     taskID=None,
0861     nJobs=None,
0862     background=None,
0863     resourceType=None,
0864     harvester_id=None,
0865     worker_id=None,
0866     schedulerID=None,
0867     jobType=None,
0868     viaTopic=None,
0869     remaining_time=None,
0870 ):
0871     tmpLog = LogWrapper(_logger, f"getJob {naive_utcnow().isoformat('/')}")
0872     tmpLog.debug(siteName)
0873     # get DN
0874     realDN = _getDN(req)
0875     # get FQANs
0876     fqans = _getFQAN(req)
0877     # check production role
0878     if getProxyKey == "True":
0879         # don't use /atlas to prevent normal proxy getting credname
0880         prodManager = _checkRole(fqans, realDN, False)
0881     else:
0882         prodManager = _checkRole(fqans, realDN)
0883     # set DN for non-production user
0884     if not prodManager:
0885         prodUserID = realDN
0886     # allow getProxyKey for production role
0887     if getProxyKey == "True" and prodManager:
0888         getProxyKey = True
0889     else:
0890         getProxyKey = False
0891     # convert mem, diskSpace, remaining_time
0892     try:
0893         mem = int(float(mem))
0894         if mem < 0:
0895             mem = 0
0896     except Exception:
0897         mem = 0
0898     try:
0899         diskSpace = int(float(diskSpace))
0900         if diskSpace < 0:
0901             diskSpace = 0
0902     except Exception:
0903         diskSpace = 0
0904     try:
0905         remaining_time = max(0, int(remaining_time))
0906     except Exception:
0907         remaining_time = 0
0908     if background == "True":
0909         background = True
0910     else:
0911         background = False
0912     if viaTopic == "True":
0913         viaTopic = True
0914     else:
0915         viaTopic = False
0916     tmpLog.debug(
0917         f"{siteName},nJobs={nJobs},cpu={cpu},mem={mem},disk={diskSpace},source_label={prodSourceLabel},"
0918         f"node={node},ce={computingElement},rel={AtlasRelease},user={prodUserID},proxy={getProxyKey},"
0919         f"c_group={countryGroup},w_group={workingGroup},{allowOtherCountry},taskID={taskID},DN={realDN},"
0920         f"role={prodManager},FQAN={str(fqans)},json={req.acceptJson()},"
0921         f"bg={background},rt={resourceType},harvester_id={harvester_id},worker_id={worker_id},"
0922         f"schedulerID={schedulerID},jobType={jobType},viaTopic={viaTopic} remaining_time={remaining_time}"
0923     )
0924     try:
0925         dummyNumSlots = int(nJobs)
0926     except Exception:
0927         dummyNumSlots = 1
0928     if dummyNumSlots > 1:
0929         for iSlots in range(dummyNumSlots):
0930             _pilotReqLogger.info(f"method=getJob,site={siteName},node={node}_{iSlots},type={prodSourceLabel}")
0931     else:
0932         _pilotReqLogger.info(f"method=getJob,site={siteName},node={node},type={prodSourceLabel}")
0933     # invalid role
0934     if (not prodManager) and (prodSourceLabel not in ["user"]):
0935         tmpLog.warning("invalid role")
0936         if req.acceptJson():
0937             tmpMsg = "no production/pilot role in VOMS FQANs or non pilot owner"
0938         else:
0939             tmpMsg = None
0940         return Protocol.Response(Protocol.SC_Role, tmpMsg).encode(req.acceptJson())
0941     # invoke job dispatcher
0942     return jobDispatcher.getJob(
0943         siteName,
0944         prodSourceLabel,
0945         mem,
0946         diskSpace,
0947         node,
0948         int(timeout),
0949         computingElement,
0950         prodUserID,
0951         getProxyKey,
0952         realDN,
0953         taskID,
0954         nJobs,
0955         req.acceptJson(),
0956         background,
0957         resourceType,
0958         harvester_id,
0959         worker_id,
0960         schedulerID,
0961         jobType,
0962         viaTopic,
0963         remaining_time,
0964         tmpLog,
0965     )
0966 
0967 
0968 # update job status
0969 def updateJob(
0970     req,
0971     jobId,
0972     state,
0973     token=None,
0974     transExitCode=None,
0975     pilotErrorCode=None,
0976     pilotErrorDiag=None,
0977     timestamp=None,
0978     timeout=60,
0979     xml="",
0980     node=None,
0981     workdir=None,
0982     cpuConsumptionTime=None,
0983     cpuConsumptionUnit=None,
0984     remainingSpace=None,
0985     schedulerID=None,
0986     pilotID=None,
0987     siteName=None,
0988     messageLevel=None,
0989     pilotLog="",
0990     metaData="",
0991     cpuConversionFactor=None,
0992     exeErrorCode=None,
0993     exeErrorDiag=None,
0994     pilotTiming=None,
0995     computingElement=None,
0996     startTime=None,
0997     endTime=None,
0998     nEvents=None,
0999     nInputFiles=None,
1000     batchID=None,
1001     attemptNr=None,
1002     jobMetrics=None,
1003     stdout="",
1004     jobSubStatus=None,
1005     coreCount=None,
1006     maxRSS=None,
1007     maxVMEM=None,
1008     maxSWAP=None,
1009     maxPSS=None,
1010     avgRSS=None,
1011     avgVMEM=None,
1012     avgSWAP=None,
1013     avgPSS=None,
1014     totRCHAR=None,
1015     totWCHAR=None,
1016     totRBYTES=None,
1017     totWBYTES=None,
1018     rateRCHAR=None,
1019     rateWCHAR=None,
1020     rateRBYTES=None,
1021     rateWBYTES=None,
1022     corruptedFiles=None,
1023     meanCoreCount=None,
1024     cpu_architecture_level=None,
1025     grid=None,
1026     source_site=None,
1027     destination_site=None,
1028 ):
1029     tmp_log = LogWrapper(_logger, f"updateJob PandaID={jobId} PID={os.getpid()}")
1030     tmp_log.debug("start")
1031 
1032     # get DN, FQANs and roles
1033     realDN = _getDN(req)
1034     fqans = _getFQAN(req)
1035     prodManager = _checkRole(fqans, realDN)
1036     acceptJson = req.acceptJson()
1037 
1038     _logger.debug(
1039         f"updateJob({jobId},status={state},transExitCode={transExitCode},pilotErrorCode={pilotErrorCode},pilotErrorDiag={pilotErrorDiag},node={node},workdir={workdir},"
1040         f"cpuConsumptionTime={cpuConsumptionTime},cpuConsumptionUnit={cpuConsumptionUnit},cpu_architecture_level={cpu_architecture_level},remainingSpace={remainingSpace},"
1041         f"schedulerID={schedulerID},pilotID={pilotID},siteName={siteName},messageLevel={messageLevel},nEvents={nEvents},nInputFiles={nInputFiles},cpuConversionFactor={cpuConversionFactor},"
1042         f"exeErrorCode={exeErrorCode},exeErrorDiag={exeErrorDiag},pilotTiming={pilotTiming},computingElement={computingElement},startTime={startTime},endTime={endTime},batchID={batchID},"
1043         f"attemptNr:{attemptNr},jobSubStatus:{jobSubStatus},core:{coreCount},DN:{realDN},role:{prodManager},"
1044         f"FQAN:{fqans},maxRSS={maxRSS},maxVMEM={maxVMEM},maxSWAP={maxSWAP},"
1045         f"maxPSS={maxPSS},avgRSS={avgRSS},avgVMEM={avgVMEM},avgSWAP={avgSWAP},avgPSS={avgPSS},"
1046         f"totRCHAR={totRCHAR},totWCHAR={totWCHAR},totRBYTES={totRBYTES},totWBYTES={totWBYTES},rateRCHAR={rateRCHAR},"
1047         f"rateWCHAR={rateWCHAR},rateRBYTES={rateRBYTES},rateWBYTES={rateWBYTES},meanCoreCount={meanCoreCount},"
1048         f"grid={grid},source_site={source_site},destination_site={destination_site},"
1049         f"corruptedFiles={corruptedFiles}\n==XML==\n{xml}\n==LOG==\n{pilotLog[:1024]}\n==Meta==\n{metaData[:1024]}\n"
1050         f"==Metrics==\n{jobMetrics}\n==stdout==\n{stdout})"
1051     )
1052 
1053     _pilotReqLogger.debug(f"method=updateJob,site={siteName},node={node},type=None")
1054 
1055     # invalid role
1056     if not prodManager:
1057         tmp_log.warning(f"invalid role")
1058         tmpMsg = None
1059         if acceptJson:
1060             tmpMsg = "no production/pilot role in VOMS FQANs or non pilot owner"
1061         return Protocol.Response(Protocol.SC_Role, tmpMsg).encode(acceptJson)
1062 
1063     # aborting message
1064     if jobId == "NULL":
1065         return Protocol.Response(Protocol.SC_Success).encode(acceptJson)
1066 
1067     # check status
1068     if state not in [
1069         "running",
1070         "failed",
1071         "finished",
1072         "holding",
1073         "starting",
1074         "transferring",
1075     ]:
1076         tmp_log.warning(f"invalid state={state} for updateJob")
1077         return Protocol.Response(Protocol.SC_Success).encode(acceptJson)
1078 
1079     # create parameter map
1080     param = {}
1081     if cpuConsumptionTime not in [None, ""]:
1082         param["cpuConsumptionTime"] = cpuConsumptionTime
1083     if cpuConsumptionUnit is not None:
1084         param["cpuConsumptionUnit"] = cpuConsumptionUnit
1085     if cpu_architecture_level:
1086         try:
1087             param["cpu_architecture_level"] = cpu_architecture_level[:20]
1088         except Exception:
1089             tmp_log.error(f"invalid cpu_architecture_level={cpu_architecture_level} for updateJob")
1090             pass
1091     if node is not None:
1092         param["modificationHost"] = node[:128]
1093     if transExitCode is not None:
1094         try:
1095             int(transExitCode)
1096             param["transExitCode"] = transExitCode
1097         except Exception:
1098             pass
1099     if pilotErrorCode is not None:
1100         try:
1101             int(pilotErrorCode)
1102             param["pilotErrorCode"] = pilotErrorCode
1103         except Exception:
1104             pass
1105     if pilotErrorDiag is not None:
1106         param["pilotErrorDiag"] = pilotErrorDiag[:500]
1107     if jobMetrics is not None:
1108         param["jobMetrics"] = jobMetrics[:500]
1109     if schedulerID is not None:
1110         param["schedulerID"] = schedulerID
1111     if pilotID is not None:
1112         param["pilotID"] = pilotID[:200]
1113     if batchID is not None:
1114         param["batchID"] = batchID[:80]
1115     if exeErrorCode is not None:
1116         param["exeErrorCode"] = exeErrorCode
1117     if exeErrorDiag is not None:
1118         param["exeErrorDiag"] = exeErrorDiag[:500]
1119     if cpuConversionFactor is not None:
1120         param["cpuConversion"] = cpuConversionFactor
1121     if pilotTiming is not None:
1122         param["pilotTiming"] = pilotTiming
1123     if nEvents is not None:
1124         param["nEvents"] = nEvents
1125     if nInputFiles is not None:
1126         param["nInputFiles"] = nInputFiles
1127     if jobSubStatus not in [None, ""]:
1128         param["jobSubStatus"] = jobSubStatus
1129     if coreCount not in [None, ""]:
1130         param["actualCoreCount"] = coreCount
1131     if meanCoreCount:
1132         try:
1133             param["meanCoreCount"] = float(meanCoreCount)
1134         except Exception:
1135             pass
1136     if grid is not None:
1137         param["grid"] = grid
1138     if source_site is not None:
1139         param["sourceSite"] = source_site
1140     if destination_site is not None:
1141         param["destinationSite"] = destination_site
1142     if maxRSS is not None:
1143         param["maxRSS"] = maxRSS
1144     if maxVMEM is not None:
1145         param["maxVMEM"] = maxVMEM
1146     if maxSWAP is not None:
1147         param["maxSWAP"] = maxSWAP
1148     if maxPSS is not None:
1149         param["maxPSS"] = maxPSS
1150     if avgRSS is not None:
1151         param["avgRSS"] = int(float(avgRSS))
1152     if avgVMEM is not None:
1153         param["avgVMEM"] = int(float(avgVMEM))
1154     if avgSWAP is not None:
1155         param["avgSWAP"] = int(float(avgSWAP))
1156     if avgPSS is not None:
1157         param["avgPSS"] = int(float(avgPSS))
1158     if totRCHAR is not None:
1159         totRCHAR = int(totRCHAR) / 1024  # convert to kByte
1160         totRCHAR = min(10**10 - 1, totRCHAR)  # limit to 10 digit
1161         param["totRCHAR"] = totRCHAR
1162     if totWCHAR is not None:
1163         totWCHAR = int(totWCHAR) / 1024  # convert to kByte
1164         totWCHAR = min(10**10 - 1, totWCHAR)  # limit to 10 digit
1165         param["totWCHAR"] = totWCHAR
1166     if totRBYTES is not None:
1167         totRBYTES = int(totRBYTES) / 1024  # convert to kByte
1168         totRBYTES = min(10**10 - 1, totRBYTES)  # limit to 10 digit
1169         param["totRBYTES"] = totRBYTES
1170     if totWBYTES is not None:
1171         totWBYTES = int(totWBYTES) / 1024  # convert to kByte
1172         totWBYTES = min(10**10 - 1, totWBYTES)  # limit to 10 digit
1173         param["totWBYTES"] = totWBYTES
1174     if rateRCHAR is not None:
1175         rateRCHAR = min(10**10 - 1, int(float(rateRCHAR)))  # limit to 10 digit
1176         param["rateRCHAR"] = rateRCHAR
1177     if rateWCHAR is not None:
1178         rateWCHAR = min(10**10 - 1, int(float(rateWCHAR)))  # limit to 10 digit
1179         param["rateWCHAR"] = rateWCHAR
1180     if rateRBYTES is not None:
1181         rateRBYTES = min(10**10 - 1, int(float(rateRBYTES)))  # limit to 10 digit
1182         param["rateRBYTES"] = rateRBYTES
1183     if rateWBYTES is not None:
1184         rateWBYTES = min(10**10 - 1, int(float(rateWBYTES)))  # limit to 10 digit
1185         param["rateWBYTES"] = rateWBYTES
1186     if startTime is not None:
1187         try:
1188             param["startTime"] = datetime.datetime(*time.strptime(startTime, "%Y-%m-%d %H:%M:%S")[:6])
1189         except Exception:
1190             pass
1191     if endTime is not None:
1192         try:
1193             param["endTime"] = datetime.datetime(*time.strptime(endTime, "%Y-%m-%d %H:%M:%S")[:6])
1194         except Exception:
1195             pass
1196     if attemptNr is not None:
1197         try:
1198             attemptNr = int(attemptNr)
1199         except Exception:
1200             attemptNr = None
1201     if stdout != "":
1202         stdout = stdout[:2048]
1203     if corruptedFiles is not None:
1204         param["corruptedFiles"] = corruptedFiles
1205     # invoke JD
1206     tmp_log.debug("executing")
1207     return jobDispatcher.updateJob(
1208         int(jobId),
1209         state,
1210         int(timeout),
1211         xml,
1212         siteName,
1213         param,
1214         metaData,
1215         pilotLog,
1216         attemptNr,
1217         stdout,
1218         acceptJson,
1219     )
1220 
1221 
1222 # bulk update jobs
1223 def updateJobsInBulk(req, jobList, harvester_id=None):
1224     ret_list = []
1225     ret_val = False
1226     prefix = f"updateJobsInBulk {harvester_id}"
1227     tmp_logger = LogWrapper(_logger, prefix)
1228     tmp_logger.debug("start")
1229     t_start = naive_utcnow()
1230 
1231     try:
1232         job_list = json.loads(jobList)
1233         for job_dict in job_list:
1234             job_id = job_dict["jobId"]
1235             del job_dict["jobId"]
1236             state = job_dict["state"]
1237             del job_dict["state"]
1238             if "metaData" in job_dict:
1239                 job_dict["metaData"] = str(job_dict["metaData"])
1240             tmp_ret = updateJob(req, job_id, state, **job_dict)
1241             ret_list.append(tmp_ret)
1242         ret_val = True
1243     except Exception:
1244         err_type, err_value = sys.exc_info()[:2]
1245         tmp_msg = f"failed with {err_type.__name__} {err_value}"
1246         ret_list = f"{prefix} {tmp_msg}"
1247         tmp_logger.error(f"{tmp_msg}\n{traceback.format_exc()}")
1248 
1249     t_delta = naive_utcnow() - t_start
1250     tmp_logger.debug(f"took {t_delta.seconds}.{t_delta.microseconds // 1000:03d} sec")
1251     return json.dumps((ret_val, ret_list))
1252 
1253 
1254 # get job status
1255 def getStatus(req, ids, timeout=60):
1256     _logger.debug(f"getStatus({ids})")
1257     return jobDispatcher.getStatus(ids, int(timeout))
1258 
1259 
1260 # check job status
1261 def checkJobStatus(req, ids, timeout=60):
1262     return jobDispatcher.checkJobStatus(ids, int(timeout))
1263 
1264 
1265 # get a list of even ranges for a PandaID
1266 def getEventRanges(
1267     req,
1268     pandaID,
1269     jobsetID,
1270     taskID=None,
1271     nRanges=10,
1272     timeout=60,
1273     scattered=None,
1274     segment_id=None,
1275 ):
1276     """
1277     Check the permissions and retrieve a list of event ranges for a given PandaID.
1278 
1279     Args:
1280         req: The request object containing the environment variables.
1281         pandaID (str): The ID of the Panda job.
1282         jobsetID (str): The ID of the job set.
1283         taskID (str, optional): The ID of the task. Defaults to None.
1284         nRanges (int, optional): The number of event ranges to retrieve. Defaults to 10.
1285         timeout (int, optional): The timeout value. Defaults to 60.
1286         scattered (str, optional): Whether the event ranges are scattered. Defaults to None.
1287         segment_id (int, optional): The segment ID. Defaults to None.
1288     Returns:
1289         dict: The response from the job dispatcher.
1290     """
1291     tmp_log = LogWrapper(_logger, f"getEventRanges(PandaID={pandaID} jobsetID={jobsetID} taskID={taskID},nRanges={nRanges},segment={segment_id})")
1292     tmp_log.debug("start")
1293 
1294     tmp_stat, tmp_out = checkPilotPermission(req)
1295     if not tmp_stat:
1296         tmp_log.error(f"failed with {tmp_out}")
1297         return tmp_out
1298 
1299     if scattered == "True":
1300         scattered = True
1301     else:
1302         scattered = False
1303 
1304     if segment_id is not None:
1305         segment_id = int(segment_id)
1306 
1307     return jobDispatcher.getEventRanges(
1308         pandaID,
1309         jobsetID,
1310         taskID,
1311         nRanges,
1312         int(timeout),
1313         req.acceptJson(),
1314         scattered,
1315         segment_id,
1316     )
1317 
1318 
1319 def updateEventRange(
1320     req,
1321     eventRangeID,
1322     eventStatus,
1323     coreCount=None,
1324     cpuConsumptionTime=None,
1325     objstoreID=None,
1326     timeout=60,
1327     pandaID=None,
1328 ):
1329     """
1330     Check the permissions and update the status of a specific event range.
1331 
1332     Args:
1333         req: The request object containing the environment variables.
1334         eventRangeID (str): The ID of the event range to update.
1335         eventStatus (str): The new status of the event range.
1336         coreCount (int, optional): The number of cores used. Defaults to None.
1337         cpuConsumptionTime (float, optional): The CPU consumption time. Defaults to None.
1338         objstoreID (int, optional): The object store ID. Defaults to None.
1339         timeout (int, optional): The timeout value. Defaults to 60.
1340         pandaID (str, optional): The PandaID. Defaults to None.
1341 
1342     Returns:
1343         dict: The response from the job dispatcher.
1344     """
1345     tmp_log = LogWrapper(
1346         _logger, f"updateEventRange({eventRangeID} status={eventStatus} coreCount={coreCount} cpuConsumptionTime={cpuConsumptionTime} osID={objstoreID})"
1347     )
1348     tmp_log.debug("start")
1349 
1350     tmp_stat, tmp_out = checkPilotPermission(req)
1351     if not tmp_stat:
1352         tmp_log.error(f"failed with {tmp_out}")
1353         return tmp_out
1354 
1355     return jobDispatcher.updateEventRange(
1356         eventRangeID,
1357         eventStatus,
1358         coreCount,
1359         cpuConsumptionTime,
1360         objstoreID,
1361         int(timeout),
1362     )
1363 
1364 
1365 def updateEventRanges(req, eventRanges, timeout=120, version=0, pandaID=None):
1366     """
1367     This function checks the permissions, converts the version to an integer, and updates the event ranges.
1368 
1369     Args:
1370         req: The request object containing the environment variables.
1371         eventRanges (str): A JSON string containing the list of event ranges to update.
1372         timeout (int, optional): The timeout value. Defaults to 120.
1373         version (int, optional): The version of the event ranges. Defaults to 0.
1374         pandaID (str, optional): The PandaID. Defaults to None.
1375     Returns:
1376         dict: The response from the job dispatcher.
1377     """
1378     tmp_log = LogWrapper(_logger, f"updateEventRanges({eventRanges})")
1379     tmp_log.debug("start")
1380 
1381     tmp_stat, tmp_out = checkPilotPermission(req)
1382     if not tmp_stat:
1383         tmp_log.error(f"failed with {tmp_out}")
1384         return tmp_out
1385 
1386     try:
1387         version = int(version)
1388     except Exception:
1389         version = 0
1390 
1391     return jobDispatcher.updateEventRanges(eventRanges, int(timeout), req.acceptJson(), version)
1392 
1393 
1394 def checkEventsAvailability(req, pandaID, jobsetID, taskID, timeout=60):
1395     """
1396     This function checks the availability of events for a given PandaID, jobsetID, and taskID.
1397 
1398     Args:
1399         req: The request object containing the environment variables.
1400         pandaID (str): The PandaID.
1401         jobset_id (str): The jobsetID.
1402         task_id (str): The taskID.
1403         timeout (int, optional): The timeout value. Defaults to 60.
1404     Returns:
1405         bool: The availability status of the events.
1406     """
1407     tmp_log = LogWrapper(_logger, f"check_events_availability(panda_id={pandaID} jobset_id={jobsetID} task_id={taskID})")
1408     tmp_log.debug("start")
1409     tmp_stat, tmp_out = checkPilotPermission(req)
1410     if not tmp_stat:
1411         tmp_log.error(f"failed with {tmp_out}")
1412 
1413     return jobDispatcher.checkEventsAvailability(pandaID, jobsetID, taskID, timeout)
1414 
1415 
1416 def getKeyPair(req, publicKeyName, privateKeyName):
1417     """
1418     This function retrieves the distinguished name (DN) from the request and uses it to get a key pair.
1419 
1420     Args:
1421         req: The request object containing the environment variables.
1422         publicKeyName (str): The name of the public key.
1423         privateKeyName (str): The name of the private key.
1424 
1425     Returns:
1426         dict: The key pair for the user.
1427     """
1428     real_dn = _getDN(req)
1429     return jobDispatcher.getKeyPair(real_dn, publicKeyName, privateKeyName, req.acceptJson())
1430 
1431 
1432 def getProxy(req, role=None, dn=None):
1433     """
1434     Get proxy for a user with a role.
1435 
1436     Args:
1437         req: The request object containing the environment variables.
1438         role (str, optional): The role of the user. Defaults to None.
1439         dn (str, optional): The distinguished name of the user. Defaults to None.
1440     Returns:
1441         dict: The proxy for the user.
1442     """
1443     real_dn = _getDN(req)
1444     if role == "":
1445         role = None
1446     return jobDispatcher.get_proxy(real_dn, role, dn, False, None)
1447 
1448 
1449 def get_access_token(req, client_name, token_key=None):
1450     """
1451     This function retrieves the distinguished name (DN) from the request and uses it to get an access token for the specified client.
1452 
1453     Args:
1454         req: The request object containing the environment variables.
1455         client_name (str): The name of the client requesting the access token.
1456         token_key (str, optional): The key to get the token from the token cache. Defaults to None.
1457     Returns:
1458         dict: The access token for the client.
1459     """
1460     # get DN
1461     real_dn = _getDN(req)
1462     return jobDispatcher.get_proxy(real_dn, None, client_name, True, token_key)
1463 
1464 
1465 def get_token_key(req, client_name):
1466     """
1467     This function retrieves the distinguished name (DN) from the request and uses it to get a token key for the specified client.
1468 
1469     Args:
1470         req: The request object containing the environment variables.
1471         client_name (str): The name of the client requesting the token key.
1472     Returns:
1473         str: The token key for the client.
1474     """
1475     # get DN
1476     real_dn = _getDN(req)
1477     return jobDispatcher.get_token_key(real_dn, client_name, req.acceptJson())
1478 
1479 
1480 def checkPilotPermission(req):
1481     """
1482     This function retrieves the distinguished name (DN) and Fully Qualified Attribute Names (FQANs) from the request,
1483     checks if the user has a production role, and verifies the DN.
1484 
1485     Args:
1486         req: The request object containing the environment variables.
1487     Returns:
1488         tuple: A tuple containing a boolean indicating success and a message.
1489     """
1490     # get DN
1491     real_dn = _getDN(req)
1492     if real_dn is None:
1493         return False, "failed to retrieve DN"
1494 
1495     # get FQANs and check production role
1496     fqans = _getFQAN(req)
1497     prod_manager = _checkRole(fqans, real_dn, True)
1498     if not prod_manager:
1499         return False, "production or pilot role is required"
1500 
1501     return True, None
1502 
1503 
1504 def getCommands(req, harvester_id, n_commands, timeout=30):
1505     """
1506     This function checks the permissions and retrieves the commands for a specified harvester instance.
1507 
1508     Args:
1509         req: The request object containing the environment variables.
1510         harvester_id (str): The ID of the harvester instance.
1511         n_commands (int): The number of commands to retrieve.
1512         timeout (int, optional): The timeout value. Defaults to 30.
1513     Returns:
1514         dict: The response from the job dispatcher.
1515     """
1516     tmp_str = "getCommands"
1517 
1518     # check permissions
1519     tmp_stat, tmp_out = checkPilotPermission(req)
1520     if not tmp_stat:
1521         _logger.error(f"{tmp_str} failed with {tmp_out}")
1522 
1523     accept_json = req.acceptJson()
1524     # retrieve the commands
1525     return jobDispatcher.getCommands(harvester_id, n_commands, timeout, accept_json)
1526 
1527 
1528 def ackCommands(req, command_ids, timeout=30):
1529     """
1530     This function checks the permissions, parses the command IDs from JSON, and acknowledges the list of commands.
1531 
1532     Args:
1533         req: The request object containing the environment variables.
1534         command_ids (str): A JSON string containing the list of command IDs to acknowledge.
1535         timeout (int, optional): The timeout value. Defaults to 30.
1536     Returns:
1537         dict: The response from the job dispatcher.
1538     """
1539     tmp_str = "ackCommands"
1540 
1541     # check permissions
1542     tmp_stat, tmp_out = checkPilotPermission(req)
1543     if not tmp_stat:
1544         _logger.error(f"{tmp_str} failed with {tmp_out}")
1545 
1546     command_ids = json.loads(command_ids)
1547     accept_json = req.acceptJson()
1548 
1549     # retrieve the commands
1550     return jobDispatcher.ackCommands(command_ids, timeout, accept_json)
1551 
1552 
1553 def getResourceTypes(req, timeout=30):
1554     """
1555     This function retrieves the resource types (MCORE, SCORE, etc.) and their definitions.
1556 
1557     Args:
1558         req: The request object containing the environment variables.
1559         timeout (int, optional): The timeout value. Defaults to 30.
1560     Returns:
1561         dict: The resource types and their definitions.
1562     """
1563     tmp_str = "getResourceTypes"
1564 
1565     # check permissions
1566     tmp_stat, tmp_out = checkPilotPermission(req)
1567     if not tmp_stat:
1568         _logger.error(f"{tmp_str} failed with {tmp_out}")
1569 
1570     accept_json = req.acceptJson()
1571 
1572     # retrieve the resource types
1573     return jobDispatcher.getResourceTypes(timeout, accept_json)
1574 
1575 
1576 def updateWorkerPilotStatus(req, site, workerID, harvesterID, status, timeout=60, node_id=None):
1577     """
1578     Update the status of a worker according to the pilot.
1579 
1580     This function validates the pilot permissions and the state passed by the pilot, then updates the worker status.
1581 
1582     Args:
1583         req: The request object containing the environment variables.
1584         site (str): The site name.
1585         workerID (str): The worker ID.
1586         harvesterID (str): The harvester ID.
1587         status (str): The status of the worker. Must be either 'started' or 'finished'.
1588         timeout (int, optional): The timeout value. Defaults to 60.
1589         node_id (str, optional): The node ID. Defaults to None.
1590 
1591     Returns:
1592         str: The result of the status update or an error message.
1593     """
1594     tmp_log = LogWrapper(
1595         _logger,
1596         f"updateWorkerPilotStatus workerID={workerID} harvesterID={harvesterID} status={status} nodeID={node_id} PID={os.getpid()}",
1597     )
1598     tmp_log.debug("start")
1599 
1600     # validate the pilot permissions
1601     tmp_stat, tmp_out = checkPilotPermission(req)
1602     if not tmp_stat:
1603         tmp_log.error(f"failed with {tmp_out}")
1604         return tmp_out
1605 
1606     # validate the state passed by the pilot
1607     valid_states = ("started", "finished")
1608     if status not in valid_states:
1609         message = f"Invalid worker state. The worker state has to be in {valid_states}"
1610         tmp_log.debug(message)
1611         return message
1612 
1613     accept_json = req.acceptJson()
1614 
1615     return jobDispatcher.updateWorkerPilotStatus(workerID, harvesterID, status, timeout, accept_json, node_id)
1616 
1617 
1618 # get max workerID
1619 def get_max_worker_id(req, harvester_id):
1620     return jobDispatcher.get_max_worker_id(harvester_id)
1621 
1622 
1623 # get events status
1624 def get_events_status(req, ids):
1625     return jobDispatcher.get_events_status(ids)