Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 """
0002 provide web interface to users
0003 
0004 """
0005 
0006 import datetime
0007 import json
0008 import re
0009 import sys
0010 import time
0011 import traceback
0012 
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 
0017 import pandaserver.jobdispatcher.Protocol as Protocol
0018 from pandaserver.brokerage.SiteMapper import SiteMapper
0019 from pandaserver.config import panda_config
0020 from pandaserver.dataservice.ddm import rucioAPI
0021 from pandaserver.srvcore import CoreUtils
0022 from pandaserver.srvcore.CoreUtils import clean_user_id, resolve_bool
0023 from pandaserver.taskbuffer import JobUtils, PrioUtil
0024 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0025 from pandaserver.taskbuffer.WrappedPickle import WrappedPickle
0026 
0027 try:
0028     import idds.common.constants
0029     import idds.common.utils
0030     from idds.client.client import Client as iDDS_Client
0031     from idds.client.clientmanager import ClientManager as iDDS_ClientManager
0032 except ImportError:
0033     pass
0034 
0035 MESSAGE_SSL = "SSL secure connection is required"
0036 MESSAGE_PROD_ROLE = "production or pilot role required"
0037 MESSAGE_TASK_ID = "jediTaskID must be an integer"
0038 MESSAGE_DATABASE = "database error in the PanDA server"
0039 MESSAGE_JSON = "failed to load JSON"
0040 
0041 CODE_SSL = 100
0042 CODE_LOGIC = 101
0043 CODE_OTHER_PARAMS = 102
0044 
0045 
0046 _logger = PandaLogger().getLogger("UserIF")
0047 
0048 
def resolve_true(variable):
    """Interpret a request parameter as a boolean that defaults to False.

    Only the literal string "True" enables the flag; anything else
    (None, "true", "1", ...) is treated as disabled.
    """
    is_enabled = variable == "True"
    return is_enabled
0051 
0052 
def resolve_false(variable):
    """Interpret a request parameter as a boolean that defaults to True.

    Only the literal string "False" disables the flag; anything else
    (None, "false", "0", ...) is treated as enabled.
    """
    is_disabled = variable == "False"
    return not is_disabled
0055 
0056 
0057 # main class
class UserIF:
    """Application-side entry points of the PanDA user web interface.

    Each public method validates/normalizes its inputs, delegates the real
    work to the task buffer (the database layer, injected via init()), and
    serializes the result for the HTTP layer.
    """

    # constructor
    def __init__(self):
        # the task buffer is injected later via init()
        self.taskBuffer = None

    # initialize
    def init(self, taskBuffer):
        """Inject the task buffer that performs the actual database operations."""
        self.taskBuffer = taskBuffer

    # submit jobs
    def submitJobs(self, jobsStr, user, host, userFQANs, prodRole=False, toPending=False):
        """Deserialize, validate and store submitted jobs; return the pickled result."""
        try:
            # deserialize jobspecs: try pickle first, fall back to JSON
            try:
                jobs = WrappedPickle.loads(jobsStr)
            except Exception:
                jobs = JobUtils.load_jobs_json(jobsStr)
            _logger.debug(f"submitJobs {user} len:{len(jobs)} prodRole={prodRole} FQAN:{str(userFQANs)}")
            maxJobs = 5000
            if len(jobs) > maxJobs:
                _logger.error(f"submitJobs: too many jobs more than {maxJobs}")
                jobs = jobs[:maxJobs]
        except Exception as ex:
            _logger.error(f"submitJobs : {str(ex)} {traceback.format_exc()}")
            jobs = []
        # check prodSourceLabel
        good_labels = True
        # fallback message: previously this variable could be unbound (NameError)
        # when the validation loop itself raised
        good_labels_message = f"submitJobs {user} failed to validate job labels"
        try:
            for tmpJob in jobs:
                # check production role
                if tmpJob.prodSourceLabel in ["managed"] and not prodRole:
                    good_labels = False
                    good_labels_message = f"submitJobs {user} missing prod-role for prodSourceLabel={tmpJob.prodSourceLabel}"
                    break

                # check the job_label is valid
                if tmpJob.job_label not in [None, "", "NULL"] and tmpJob.job_label not in JobUtils.job_labels:
                    good_labels = False
                    good_labels_message = f"submitJobs {user} wrong job_label={tmpJob.job_label}"
                    break
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            _logger.error(f"submitJobs : checking good_labels {err_type} {err_value}")
            good_labels = False

        # reject injection for error with the labels
        if not good_labels:
            _logger.error(good_labels_message)
            return f"ERROR: {good_labels_message}"

        job0 = None

        # get user VO
        userVO = "atlas"
        try:
            job0 = jobs[0]
            if job0.VO not in [None, "", "NULL"]:
                userVO = job0.VO
        except (IndexError, AttributeError) as e:
            _logger.error(f"submitJobs : checking userVO. userVO not found, defaulting to {userVO}. (Exception {e})")

        # atlas jobs require FQANs
        if userVO == "atlas" and userFQANs == []:
            _logger.error(f"submitJobs : VOMS FQANs are missing in your proxy. They are required for {userVO}")

        # get LSST pipeline username
        if userVO.lower() == "lsst":
            try:
                if job0.prodUserName and job0.prodUserName.lower() != "none":
                    user = job0.prodUserName
            except AttributeError:
                _logger.error(f"submitJobs : checking username for userVO {userVO}: username not found, defaulting to {user}.")

        # store jobs
        ret = self.taskBuffer.storeJobs(
            jobs,
            user,
            fqans=userFQANs,
            hostname=host,
            toPending=toPending,
            userVO=userVO,
        )
        _logger.debug(f"submitJobs {user} ->:{len(ret)}")

        # serialize
        return WrappedPickle.dumps(ret)

    # set debug mode
    def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup):
        """Turn debug mode for a job on or off."""
        return self.taskBuffer.setDebugMode(dn, pandaID, prodManager, modeOn, workingGroup)

    # insert sandbox file info
    def insertSandboxFileInfo(self, userName, hostName, fileName, fileSize, checkSum):
        """Register metadata of an uploaded sandbox file."""
        return self.taskBuffer.insertSandboxFileInfo(userName, hostName, fileName, fileSize, checkSum)

    # check duplicated sandbox file
    def checkSandboxFile(self, userName, fileSize, checkSum):
        """Check whether an identical sandbox file already exists for the user."""
        return self.taskBuffer.checkSandboxFile(userName, fileSize, checkSum)

    # get job status
    def getJobStatus(self, idsStr, use_json, no_pickle=False):
        """Return the status of jobs, serialized as JSON or pickle per the flags."""
        try:
            # deserialize IDs
            if use_json or no_pickle:
                ids = json.loads(idsStr)
            else:
                ids = WrappedPickle.loads(idsStr)
            _logger.debug(f"getJobStatus len   : {len(ids)}")
            maxIDs = 5500
            if len(ids) > maxIDs:
                _logger.error(f"too long ID list more than {maxIDs}")
                ids = ids[:maxIDs]
        except Exception:
            # renamed locals: the originals shadowed the builtins type/traceback
            err_type, err_value = sys.exc_info()[:2]
            _logger.error(f"getJobStatus : {err_type} {err_value}")
            ids = []
        _logger.debug(f"getJobStatus start : {str(ids)} json={use_json}")
        # peek jobs
        ret = self.taskBuffer.peekJobs(ids, use_json=use_json)
        _logger.debug("getJobStatus end")
        # serialize
        if use_json:
            return json.dumps(ret)
        if no_pickle:
            return JobUtils.dump_jobs_json(ret)
        return WrappedPickle.dumps(ret)

    # get PandaIDs with TaskID
    def getPandaIDsWithTaskID(self, jediTaskID):
        """Return the pickled list of PandaIDs belonging to a JEDI task."""
        ret = self.taskBuffer.getPandaIDsWithTaskID(jediTaskID)
        return WrappedPickle.dumps(ret)

    # get job statistics per site
    def getJobStatisticsPerSite(self):
        """Return pickled job statistics aggregated per site."""
        ret = self.taskBuffer.getJobStatistics()
        return WrappedPickle.dumps(ret, convert_to_safe=True)

    # get job statistics per site, source label, and resource type
    def get_job_statistics_per_site_label_resource(self, time_window):
        """Return JSON job statistics per site, label and resource type."""
        ret = self.taskBuffer.get_job_statistics_per_site_label_resource(time_window)
        return json.dumps(ret)

    # kill jobs
    def killJobs(self, idsStr, user, host, code, prodManager, useMailAsID, fqans, killOpts=None):
        """Kill the given jobs; optionally resolve the user's email via Rucio.

        killOpts defaults to an empty list (previously a mutable default argument).
        """
        if killOpts is None:
            killOpts = []
        # deserialize IDs
        ids = WrappedPickle.loads(idsStr)
        if not isinstance(ids, list):
            ids = [ids]
        _logger.info(f"killJob : {user} {code} {prodManager} {fqans} {ids}")
        try:
            if useMailAsID:
                _logger.debug(f"killJob : getting mail address for {user}")
                nTry = 3
                for iDDMTry in range(nTry):
                    status, userInfo = rucioAPI.finger(user)
                    if status:
                        _logger.debug(f"killJob : {user} is converted to {userInfo['email']}")
                        user = userInfo["email"]
                        break
                    time.sleep(1)
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            _logger.error(f"killJob : failed to convert email address {user} : {err_type} {err_value}")
        # get working groups with prod role
        wgProdRole = []
        for fqan in fqans:
            tmpMatch = re.search("/atlas/([^/]+)/Role=production", fqan)
            if tmpMatch is not None:
                # ignore usatlas since it is used as atlas prod role
                tmpWG = tmpMatch.group(1)
                if tmpWG not in ["", "usatlas"] + wgProdRole:
                    wgProdRole.append(tmpWG)
                    # group production
                    wgProdRole.append(f"gr_{tmpWG}")
        # kill jobs
        ret = self.taskBuffer.killJobs(ids, user, code, prodManager, wgProdRole, killOpts)
        return WrappedPickle.dumps(ret)

    # reassign jobs
    def reassignJobs(self, idsStr, user, host, forPending, firstSubmission):
        """Reassign the given jobs and return the pickled result."""
        # deserialize IDs
        ids = WrappedPickle.loads(idsStr)
        # reassign jobs
        ret = self.taskBuffer.reassignJobs(
            ids,
            forPending=forPending,
            firstSubmission=firstSubmission,
        )
        return WrappedPickle.dumps(ret)

    # get script for offline running
    def getScriptOfflineRunning(self, pandaID, days=None):
        """Return a script to re-run the given job offline."""
        return self.taskBuffer.getScriptOfflineRunning(pandaID, days)

    # get ban users
    def get_ban_users(self):
        """Return the list of banned users as JSON."""
        ret = self.taskBuffer.get_ban_users()
        return json.dumps(ret)

    # get active JediTasks in a time range
    def getJediTasksInTimeRange(self, dn, timeRange, fullFlag, minTaskID, task_type):
        """Return pickled JEDI tasks of a user active within the time range."""
        ret = self.taskBuffer.getJediTasksInTimeRange(dn, timeRange, fullFlag, minTaskID, task_type)
        return WrappedPickle.dumps(ret)

    # get details of JediTask
    def getJediTaskDetails(self, jediTaskID, fullFlag, withTaskInfo):
        """Return pickled details of a JEDI task."""
        ret = self.taskBuffer.getJediTaskDetails(jediTaskID, fullFlag, withTaskInfo)
        return WrappedPickle.dumps(ret)

    # get full job status
    def getFullJobStatus(self, idsStr, dn):
        """Return the pickled full status of jobs for serialized PandaIDs."""
        try:
            # deserialize jobspecs
            ids = WrappedPickle.loads(idsStr)
            # truncate
            maxIDs = 5500
            if len(ids) > maxIDs:
                _logger.error(f"getFullJobStatus: too long ID list more than {maxIDs}")
                ids = ids[:maxIDs]
        except Exception:
            # renamed locals: the originals shadowed the builtins type/traceback
            err_type, err_value = sys.exc_info()[:2]
            _logger.error(f"getFullJobStatus : {err_type} {err_value}")
            ids = []
        _logger.debug(f"getFullJobStatus start : {dn} {str(ids)}")
        # peek jobs
        ret = self.taskBuffer.getFullJobStatus(ids)
        _logger.debug("getFullJobStatus end")
        return WrappedPickle.dumps(ret)

    # insert task params
    def insertTaskParams(self, taskParams, user, prodRole, fqans, properErrorCode, parent_tid):
        """Insert task parameters to create a new task."""
        return self.taskBuffer.insertTaskParamsPanda(
            taskParams,
            user,
            prodRole,
            fqans,
            properErrorCode=properErrorCode,
            parent_tid=parent_tid,
        )

    # kill task
    def killTask(self, jediTaskID, user, prodRole, properErrorCode, broadcast):
        """Send the kill command to a task."""
        return self.taskBuffer.sendCommandTaskPanda(
            jediTaskID,
            user,
            prodRole,
            "kill",
            properErrorCode=properErrorCode,
            broadcast=broadcast,
        )

    # finish task
    def finishTask(self, jediTaskID, user, prodRole, properErrorCode, qualifier, broadcast):
        """Send the finish command to a task."""
        return self.taskBuffer.sendCommandTaskPanda(
            jediTaskID,
            user,
            prodRole,
            "finish",
            properErrorCode=properErrorCode,
            comQualifier=qualifier,
            broadcast=broadcast,
        )

    # reload input
    def reloadInput(self, jediTaskID, user, prodRole, ignore_hard_exhausted):
        """Send the incexec command to reload the input of a task."""
        # ignoring the hard-exhausted limit is restricted to the production role
        if not prodRole and ignore_hard_exhausted:
            ignore_hard_exhausted = False
        com_qualifier = JediTaskSpec.get_retry_command_qualifiers(ignore_hard_exhausted=ignore_hard_exhausted)
        com_comment = json.dumps([{}, com_qualifier])
        return self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "incexec", comComment=com_comment, properErrorCode=True)

    # retry task
    def retryTask(
        self,
        jediTaskID,
        user,
        prodRole,
        properErrorCode,
        newParams,
        noChildRetry,
        discardEvents,
        disable_staging_mode,
        keep_gshare_priority,
        ignore_hard_exhausted,
    ):
        """Retry a task, optionally with new parameters merged over the originals."""
        # retry with new params
        if newParams is not None:
            try:
                # convert to dict
                newParams = PrioUtil.decodeJSON(newParams)
                # get original params
                taskParams = self.taskBuffer.getTaskParamsPanda(jediTaskID)
                taskParamsJson = PrioUtil.decodeJSON(taskParams)
                # replace with new values
                for newKey in newParams:
                    newVal = newParams[newKey]
                    taskParamsJson[newKey] = newVal
                taskParams = json.dumps(taskParamsJson)
                # retry with new params
                ret = self.taskBuffer.insertTaskParamsPanda(
                    taskParams,
                    user,
                    prodRole,
                    [],
                    properErrorCode=properErrorCode,
                    allowActiveTask=True,
                )
            except Exception:
                err_type, err_value = sys.exc_info()[:2]
                ret = 1, f"server error with {err_type}:{err_value}"
        else:
            # ignoring the hard-exhausted limit is restricted to the production role
            if not prodRole and ignore_hard_exhausted:
                ignore_hard_exhausted = False
            com_qualifier = JediTaskSpec.get_retry_command_qualifiers(
                noChildRetry, discardEvents, disable_staging_mode, keep_gshare_priority, ignore_hard_exhausted
            )
            com_qualifier = " ".join(com_qualifier)
            # normal retry
            ret = self.taskBuffer.sendCommandTaskPanda(
                jediTaskID,
                user,
                prodRole,
                "retry",
                properErrorCode=properErrorCode,
                comQualifier=com_qualifier,
            )
        # error code 5 means the task is in a final state: retry failed
        # analysis jobs directly instead
        if properErrorCode is True and ret[0] == 5:
            # retry failed analysis jobs
            jobdefList = self.taskBuffer.getJobdefIDsForFailedJob(jediTaskID)
            cUID = CoreUtils.clean_user_id(user)
            for jobID in jobdefList:
                self.taskBuffer.finalizePendingJobs(cUID, jobID)
            self.taskBuffer.increaseAttemptNrPanda(jediTaskID, 5)
            return_str = f"retry has been triggered for failed jobs while the task is still {ret[1]}"
            if newParams is None:
                ret = 0, return_str
            else:
                ret = 3, return_str
        return ret

    # reassign task
    def reassignTask(self, jediTaskID, user, prodRole, comComment):
        """Send the reassign command to a task."""
        return self.taskBuffer.sendCommandTaskPanda(
            jediTaskID,
            user,
            prodRole,
            "reassign",
            comComment=comComment,
            properErrorCode=True,
        )

    # pause task
    def pauseTask(self, jediTaskID, user, prodRole):
        """Send the pause command to a task."""
        return self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "pause", properErrorCode=True)

    # resume task
    def resumeTask(self, jediTaskID, user, prodRole):
        """Send the resume command to a task."""
        return self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "resume", properErrorCode=True)

    # force avalanche for task
    def avalancheTask(self, jediTaskID, user, prodRole):
        """Send the avalanche command to a task."""
        return self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "avalanche", properErrorCode=True)

    # send command to task
    def send_command_to_task(self, jedi_task_id, user, prod_role, command_string):
        """Send an arbitrary command string to a task."""
        return self.taskBuffer.sendCommandTaskPanda(jedi_task_id, user, prod_role, command_string, properErrorCode=True)

    # change task priority
    def changeTaskPriority(self, jediTaskID, newPriority):
        """Change the priority of a task."""
        return self.taskBuffer.changeTaskPriorityPanda(jediTaskID, newPriority)

    # increase attempt number for unprocessed files
    def increaseAttemptNrPanda(self, jediTaskID, increasedNr):
        """Increase the attempt number for unprocessed files of a task."""
        return self.taskBuffer.increaseAttemptNrPanda(jediTaskID, increasedNr)

    # change task attribute
    def changeTaskAttributePanda(self, jediTaskID, attrName, attrValue):
        """Change a task attribute."""
        return self.taskBuffer.changeTaskAttributePanda(jediTaskID, attrName, attrValue)

    # change split rule for task
    def changeTaskSplitRulePanda(self, jediTaskID, attrName, attrValue):
        """Change a split rule of a task."""
        return self.taskBuffer.changeTaskSplitRulePanda(jediTaskID, attrName, attrValue)

    # reactivate task
    def reactivateTask(self, jediTaskID, keep_attempt_nr, trigger_job_generation):
        """Reactivate a finished/failed task."""
        return self.taskBuffer.reactivateTask(jediTaskID, keep_attempt_nr, trigger_job_generation)

    # get task status
    def getTaskStatus(self, jediTaskID):
        """Return the status string of a task."""
        ret = self.taskBuffer.getTaskStatus(jediTaskID)
        return ret[0]

    # reassign share
    def reassignShare(self, jedi_task_ids, share_dest, reassign_running):
        """Move tasks to another global share."""
        return self.taskBuffer.reassignShare(jedi_task_ids, share_dest, reassign_running)

    # get taskParamsMap
    def getTaskParamsMap(self, jediTaskID):
        """Return the task parameter map of a task."""
        return self.taskBuffer.getTaskParamsMap(jediTaskID)

    # update workers
    def updateWorkers(self, user, host, harvesterID, data):
        """Update harvester workers; return a JSON (success, payload) tuple."""
        ret = self.taskBuffer.updateWorkers(harvesterID, data)
        if ret is None:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, ret)
        return json.dumps(return_value)

    # update service metrics
    def updateServiceMetrics(self, user, host, harvesterID, data):
        """Update harvester service metrics; return a JSON (success, payload) tuple."""
        ret = self.taskBuffer.updateServiceMetrics(harvesterID, data)
        if ret is None:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, ret)
        return json.dumps(return_value)

    # add harvester dialog messages
    def addHarvesterDialogs(self, user, harvesterID, dialogs):
        """Store harvester dialog messages; return a JSON (success, message) tuple."""
        ret = self.taskBuffer.addHarvesterDialogs(harvesterID, dialogs)
        if not ret:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, "")
        return json.dumps(return_value)

    # heartbeat for harvester
    def harvesterIsAlive(self, user, host, harvesterID, data):
        """Record a harvester heartbeat; return a JSON (success, payload) tuple."""
        ret = self.taskBuffer.harvesterIsAlive(user, host, harvesterID, data)
        if ret is None:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, ret)
        return json.dumps(return_value)

    # get stats of workers
    def getWorkerStats(self):
        """Return statistics of harvester workers."""
        return self.taskBuffer.getWorkerStats()

    # report stat of workers
    def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
        """Report worker statistics per job type for a harvester instance."""
        return self.taskBuffer.reportWorkerStats_jobtype(harvesterID, siteName, paramsList)

    # set num slots for workload provisioning
    def setNumSlotsForWP(self, pandaQueueName, numSlots, gshare, resourceType, validPeriod):
        """Set the number of slots for workload provisioning; return JSON."""
        return_value = self.taskBuffer.setNumSlotsForWP(pandaQueueName, numSlots, gshare, resourceType, validPeriod)
        return json.dumps(return_value)

    # enable jumbo jobs
    def enableJumboJobs(self, jediTaskID, totalJumboJobs, nJumboPerSite):
        """Enable jumbo jobs for a task; force avalanche when enabling succeeds."""
        return_value = self.taskBuffer.enableJumboJobs(jediTaskID, totalJumboJobs, nJumboPerSite)
        if totalJumboJobs > 0 and return_value[0] == 0:
            self.avalancheTask(jediTaskID, "panda", True)
        return json.dumps(return_value)

    # get user job metadata
    def getUserJobMetadata(self, jediTaskID):
        """Return user job metadata of a task as JSON."""
        return_value = self.taskBuffer.getUserJobMetadata(jediTaskID)
        return json.dumps(return_value)

    # get jumbo job datasets
    def getJumboJobDatasets(self, n_days, grace_period):
        """Return datasets of jumbo jobs as JSON."""
        return_value = self.taskBuffer.getJumboJobDatasets(n_days, grace_period)
        # serialize
        return json.dumps(return_value)

    # sweep panda queue
    def sweepPQ(self, panda_queue, status_list, ce_list, submission_host_list):
        """Sweep a PanDA queue; all arguments arrive JSON-encoded."""
        try:
            panda_queue_des = json.loads(panda_queue)
            status_list_des = json.loads(status_list)
            ce_list_des = json.loads(ce_list)
            submission_host_list_des = json.loads(submission_host_list)
        except Exception:
            # previously execution fell through with unbound locals (NameError);
            # report the deserialization problem to the caller instead
            _logger.error("Problem deserializing variables")
            return WrappedPickle.dumps("Problem deserializing variables")

        ret = self.taskBuffer.sweepPQ(panda_queue_des, status_list_des, ce_list_des, submission_host_list_des)
        return WrappedPickle.dumps(ret)

    # send command to a job
    def send_command_to_job(self, panda_id, com):
        """Send a command to a single job."""
        return self.taskBuffer.send_command_to_job(panda_id, com)

    # set user secret
    def set_user_secret(self, owner, key, value):
        """Store a user secret."""
        return self.taskBuffer.set_user_secret(owner, key, value)

    # get user secrets
    def get_user_secrets(self, owner, keys, get_json):
        """Return user secrets."""
        return self.taskBuffer.get_user_secrets(owner, keys, get_json)

    # get files in datasets
    def get_files_in_datasets(self, task_id, dataset_types):
        """Return files in the datasets of a task."""
        return self.taskBuffer.get_files_in_datasets(task_id, dataset_types)
0583 
0584 
# Singleton
# instantiate the single interface object and drop the class so the module
# only exposes this instance
userIF = UserIF()
del UserIF
0588 
0589 
0590 # get FQANs
0591 def _getFQAN(req):
0592     fqans = []
0593     for tmp_key in req.subprocess_env:
0594         tmp_value = req.subprocess_env[tmp_key]
0595         # Scan VOMS attributes
0596         # compact style
0597         if tmp_key.startswith("GRST_CRED_") and tmp_value.startswith("VOMS"):
0598             fqan = tmp_value.split()[-1]
0599             fqans.append(fqan)
0600 
0601         # old style
0602         elif tmp_key.startswith("GRST_CONN_"):
0603             tmp_items = tmp_value.split(":")
0604             if len(tmp_items) == 2 and tmp_items[0] == "fqan":
0605                 fqans.append(tmp_items[-1])
0606 
0607     return fqans
0608 
0609 
0610 # get DN
def _getDN(req):
    """Return the client's bare DN from the SSL environment ("" when absent)."""
    env = req.subprocess_env
    if "SSL_CLIENT_S_DN" not in env:
        return ""
    # strip redundant CN components while keeping the proxy part
    return CoreUtils.get_bare_dn(env["SSL_CLIENT_S_DN"], keep_proxy=True)
0617 
0618 
0619 # check role
def _has_production_role(req):
    """Check whether the requester carries a production role.

    True when the client DN contains one of the configured production DNs,
    or when any VOMS FQAN matches a production-role pattern.
    """
    # check DN
    user = _getDN(req)
    for sdn in panda_config.production_dns:
        if sdn in user:
            return True
    # get FQANs
    fqans = _getFQAN(req)
    # production-role patterns; the first two are literal, the last is a regex
    role_patterns = [
        "/atlas/usatlas/Role=production",
        "/atlas/Role=production",
        "^/[^/]+/Role=production",
    ]
    # loop over all FQANs
    for fqan in fqans:
        for role_pattern in role_patterns:
            # re.search subsumes the former startswith() test: for the literal
            # patterns a prefix match is also a search match, and the regex
            # pattern could never literally prefix-match anyway
            if re.search(role_pattern, fqan):
                return True
    return False
0641 
0642 
0643 # get primary working group with prod role
def _getWGwithPR(req):
    """Return the primary working group carrying a production role, or None."""
    try:
        for fqan in _getFQAN(req):
            match = re.search("/[^/]+/([^/]+)/Role=production", fqan)
            if match is None:
                continue
            wg = match.group(1)
            # ignore usatlas since it is used as atlas prod role
            if wg not in ("", "usatlas"):
                return wg.split("-")[-1].lower()
    except Exception:
        # best-effort lookup: any failure just means no working group
        pass
    return None
0657 
0658 
0659 # security check
def isSecure(req):
    """Return True for a secure connection that is not a limited proxy."""
    if not Protocol.isSecure(req):
        return False
    client_dn = req.subprocess_env["SSL_CLIENT_S_DN"]
    # limited proxies are not allowed to use these interfaces
    if "/CN=limited proxy" in client_dn:
        _logger.warning(f"access via limited proxy : {client_dn}")
        return False
    return True
0669 
0670 
0671 """
0672 web service interface
0673 
0674 """
0675 
0676 
0677 # submit jobs
def submitJobs(req, jobs, toPending=None):
    """Web entry point: submit serialized jobs on behalf of the caller."""
    # check security
    if not isSecure(req):
        return False
    # DN of the authenticated client, if any
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # FQANs, remote host and role of the caller
    fqans = _getFQAN(req)
    host = req.get_remote_host()
    is_production_role = _has_production_role(req)
    # whether jobs go to the pending state first
    to_pending = resolve_true(toPending)
    return userIF.submitJobs(jobs, user, host, fqans, is_production_role, to_pending)
0696 
0697 
0698 # get job status
def getJobStatus(req, ids, no_pickle=None):
    """Web entry point: return job status; output format follows the Accept header."""
    accept_json = req.acceptJson()
    return userIF.getJobStatus(ids, accept_json, no_pickle)
0701 
0702 
0703 # set debug mode
def setDebugMode(req, pandaID, modeOn):
    """Web entry point: switch debug mode of a job on or off."""
    tmp_log = LogWrapper(_logger, f"setDebugMode {pandaID} {modeOn}")
    # an SSL client certificate is mandatory
    if "SSL_CLIENT_S_DN" not in req.subprocess_env:
        error_str = MESSAGE_SSL
        tmp_log.error(error_str)
        return f"ERROR: {error_str}"
    user = _getDN(req)
    # role and FQANs of the caller
    is_production_manager = _has_production_role(req)
    fqans = _getFQAN(req)
    # requested mode
    mode_on = resolve_true(modeOn)
    # primary working group with production role
    working_group = _getWGwithPR(req)
    tmp_log.debug(f"user={user} mgr={is_production_manager} wg={working_group} fqans={str(fqans)}")
    return userIF.setDebugMode(user, pandaID, is_production_manager, mode_on, working_group)
0723 
0724 
0725 # insert sandbox file info
def insertSandboxFileInfo(req, userName, fileName, fileSize, checkSum):
    """Web entry point: register metadata of an uploaded sandbox file."""
    tmp_log = LogWrapper(_logger, f"insertSandboxFileInfo {userName} {fileName}")
    # an SSL client certificate is mandatory
    if "SSL_CLIENT_S_DN" not in req.subprocess_env:
        tmp_log.error(MESSAGE_SSL)
        return f"ERROR: {MESSAGE_SSL}"
    # only production managers may register sandbox files
    if not _has_production_role(req):
        tmp_log.error(MESSAGE_PROD_ROLE)
        return f"ERROR: {MESSAGE_PROD_ROLE}"
    # prefer the configured sandbox hostname over the client-reported host
    host_name = getattr(panda_config, "sandboxHostname", None)
    if not host_name:
        host_name = req.get_remote_host()
    return userIF.insertSandboxFileInfo(userName, host_name, fileName, fileSize, checkSum)
0747 
0748 
0749 # check duplicated sandbox file
def checkSandboxFile(req, fileSize, checkSum):
    """Web entry point: check whether an identical sandbox file already exists."""
    # an SSL client certificate is mandatory
    if "SSL_CLIENT_S_DN" not in req.subprocess_env:
        return f"ERROR: {MESSAGE_SSL}"
    requester = _getDN(req)
    return userIF.checkSandboxFile(requester, fileSize, checkSum)
0757 
0758 
0759 # get job statistics per site and resource
def get_job_statistics_per_site_label_resource(req, time_window=None):
    """Web entry point: job statistics per site, label and resource type."""
    statistics = userIF.get_job_statistics_per_site_label_resource(time_window)
    return statistics
0762 
0763 
0764 # get job statistics per site
def getJobStatisticsPerSite(req):
    """Web entry point: job statistics aggregated per site."""
    statistics = userIF.getJobStatisticsPerSite()
    return statistics
0767 
0768 
0769 # kill jobs
def killJobs(req, ids, code=None, useMailAsID=None, killOpts=None):
    """Web entry point: kill the given jobs."""
    # check security
    if not isSecure(req):
        return False
    # DN of the authenticated client, if any
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # role, FQANs and remote host of the caller
    is_production_manager = _has_production_role(req)
    fqans = _getFQAN(req)
    host = req.get_remote_host()
    # whether to use the email address as user ID
    use_mail_as_id = resolve_true(useMailAsID)
    # comma-separated kill options
    kill_options = [] if killOpts is None else killOpts.split(",")
    return userIF.killJobs(ids, user, host, code, is_production_manager, use_mail_as_id, fqans, kill_options)
0793 
0794 
0795 # reassign jobs
def reassignJobs(req, ids, forPending=None, firstSubmission=None):
    """Web entry point: reassign the given jobs."""
    # check security
    if not isSecure(req):
        return False
    # DN of the authenticated client, if any
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    host = req.get_remote_host()
    # pending flag defaults to False, first-submission flag defaults to True
    for_pending = resolve_true(forPending)
    first_submission = resolve_false(firstSubmission)
    return userIF.reassignJobs(ids, user, host, for_pending, first_submission)
0813 
0814 
# get ban users
def get_ban_users(req):
    """Return the list of banned users."""
    return userIF.get_ban_users()
0818 
0819 
# get script for offline running
def getScriptOfflineRunning(req, pandaID, days=None):
    """Return a script to re-run the given job offline."""
    # normalize days to int; fall back to None on bad input
    if days is not None:
        try:
            days = int(days)
        except Exception:
            days = None
    return userIF.getScriptOfflineRunning(pandaID, days)
0828 
0829 
# get active JediTasks in a time range
def getJediTasksInTimeRange(req, timeRange, dn=None, fullFlag=None, minTaskID=None, task_type="user"):
    """Return active JEDI tasks in a time range for a user."""
    # SSL plus a client DN are both required
    if not isSecure(req) or "SSL_CLIENT_S_DN" not in req.subprocess_env:
        return False
    # default to the requester's own DN
    if dn is None:
        dn = _getDN(req)
    # 'True' requests the full task records
    fullFlag = resolve_true(fullFlag)
    # minTaskID must parse to an integer, otherwise it is ignored
    try:
        minTaskID = int(minTaskID)
    except Exception:
        minTaskID = None
    _logger.debug(f"getJediTasksInTimeRange {dn} {timeRange}")
    return userIF.getJediTasksInTimeRange(dn, timeRange, fullFlag, minTaskID, task_type)
0849 
0850 
# get details of JediTask
def getJediTaskDetails(req, jediTaskID, fullFlag, withTaskInfo):
    """Return details of a JEDI task."""
    # SSL plus a client DN are both required
    if not isSecure(req) or "SSL_CLIENT_S_DN" not in req.subprocess_env:
        return False
    # normalize string flags ('True' -> True)
    fullFlag = resolve_true(fullFlag)
    withTaskInfo = resolve_true(withTaskInfo)
    _logger.debug(f"getJediTaskDetails {jediTaskID} {fullFlag} {withTaskInfo}")
    return userIF.getJediTaskDetails(jediTaskID, fullFlag, withTaskInfo)
0866 
0867 
# get full job status
def getFullJobStatus(req, ids):
    """Return the full status of the given jobs for the requesting user."""
    # SSL plus a client DN are both required
    if not isSecure(req) or "SSL_CLIENT_S_DN" not in req.subprocess_env:
        return False
    return userIF.getFullJobStatus(ids, _getDN(req))
0878 
0879 
# insert task params
def insertTaskParams(req, taskParams=None, properErrorCode=None, parent_tid=None):
    """Register a new task from a JSON task-parameter string.

    Returns a pickled (status, message/ID) tuple.
    """
    tmp_log = LogWrapper(_logger, f"insertTaskParams-{naive_utcnow().isoformat('/')}")
    tmp_log.debug("start")
    properErrorCode = resolve_true(properErrorCode)

    # SSL is mandatory
    if not isSecure(req):
        tmp_log.debug(MESSAGE_SSL)
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None

    # the payload must be valid JSON
    try:
        json.loads(taskParams)
    except Exception:
        tmp_log.debug(MESSAGE_JSON)
        return WrappedPickle.dumps((False, MESSAGE_JSON))

    # role and FQANs of the requester
    is_production_role = _has_production_role(req)
    fqans = _getFQAN(req)

    tmp_log.debug(f"user={user} prodRole={is_production_role} FQAN:{str(fqans)} parent_tid={parent_tid}")
    ret = userIF.insertTaskParams(taskParams, user, is_production_role, fqans, properErrorCode, parent_tid)
    # best-effort logging of the message part of the result
    try:
        tmp_log.debug(ret[1])
    except Exception:
        pass
    return WrappedPickle.dumps(ret)
0913 
0914 
# kill task
def killTask(req, jediTaskID=None, properErrorCode=None, broadcast=None):
    """Kill a JEDI task on behalf of the requester."""
    # normalize string flags ('True' -> True)
    properErrorCode = resolve_true(properErrorCode)
    broadcast = resolve_true(broadcast)

    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if properErrorCode else False, MESSAGE_SSL))

    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # production/pilot role check
    is_production_role = _has_production_role(req)
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if properErrorCode else False, MESSAGE_TASK_ID))

    ret = userIF.killTask(jediTaskID, user, is_production_role, properErrorCode, broadcast)
    return WrappedPickle.dumps(ret)
0940 
0941 
# retry task
def retryTask(
    req,
    jediTaskID,
    properErrorCode=None,
    newParams=None,
    noChildRetry=None,
    discardEvents=None,
    disable_staging_mode=None,
    keep_gshare_priority=None,
    ignore_hard_exhausted=None,
):
    """Retry a JEDI task, optionally with new task parameters."""
    # normalize string flags ('True' -> True)
    properErrorCode = resolve_true(properErrorCode)
    noChildRetry = resolve_true(noChildRetry)
    discardEvents = resolve_true(discardEvents)
    disable_staging_mode = resolve_true(disable_staging_mode)
    keep_gshare_priority = resolve_true(keep_gshare_priority)
    ignore_hard_exhausted = resolve_true(ignore_hard_exhausted)

    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if properErrorCode else False, MESSAGE_SSL))

    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # production/pilot role check
    is_production_role = _has_production_role(req)
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if properErrorCode else False, MESSAGE_TASK_ID))

    ret = userIF.retryTask(
        jediTaskID,
        user,
        is_production_role,
        properErrorCode,
        newParams,
        noChildRetry,
        discardEvents,
        disable_staging_mode,
        keep_gshare_priority,
        ignore_hard_exhausted,
    )
    return WrappedPickle.dumps(ret)
0992 
0993 
# reassign task to site/cloud
def reassignTask(req, jediTaskID, site=None, cloud=None, nucleus=None, soft=None, mode=None):
    """Reassign a JEDI task to another site, nucleus, or cloud."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL, MESSAGE_SSL))
    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # production/pilot role check
    is_production_role = _has_production_role(req)
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC, MESSAGE_TASK_ID))
    # build the command comment; a trailing 'y' means go back to the old
    # status immediately (used for site reassignment only)
    if site is not None:
        comComment = f"site:{site}:y"
    elif nucleus is not None:
        comComment = f"nucleus:{nucleus}:n"
    else:
        comComment = f"cloud:{cloud}:n"
    # optional kill-behavior qualifier
    if mode == "nokill":
        comComment += ":nokill reassign"
    elif mode == "soft" or soft == "True":
        comComment += ":soft reassign"
    ret = userIF.reassignTask(jediTaskID, user, is_production_role, comComment)
    return WrappedPickle.dumps(ret)
1024 
1025 
# finish task
def finishTask(req, jediTaskID=None, properErrorCode=None, soft=None, broadcast=None):
    """Finish a JEDI task, optionally with the 'soft' qualifier."""
    # normalize string flags ('True' -> True)
    properErrorCode = resolve_true(properErrorCode)
    broadcast = resolve_true(broadcast)
    # map soft='True' to the 'soft' qualifier string
    qualifier = "soft" if soft == "True" else None

    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if properErrorCode else False, MESSAGE_SSL))

    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # production/pilot role check
    is_production_role = _has_production_role(req)
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if properErrorCode else False, MESSAGE_TASK_ID))

    ret = userIF.finishTask(jediTaskID, user, is_production_role, properErrorCode, qualifier, broadcast)
    return WrappedPickle.dumps(ret)
1054 
1055 
# reload input
def reloadInput(req, jediTaskID, properErrorCode=None, ignore_hard_exhausted=None):
    """Reload the input for a JEDI task."""
    properErrorCode = resolve_true(properErrorCode)
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if properErrorCode else False, MESSAGE_SSL))

    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # production/pilot role check
    is_production_role = _has_production_role(req)
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if properErrorCode else False, MESSAGE_TASK_ID))
    ignore_hard_exhausted = resolve_true(ignore_hard_exhausted)
    ret = userIF.reloadInput(jediTaskID, user, is_production_role, ignore_hard_exhausted)
    return WrappedPickle.dumps(ret)
1079 
1080 
# change task priority
def changeTaskPriority(req, jediTaskID=None, newPriority=None):
    """Change the priority of a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # only production managers may change priorities
    if not _has_production_role(req):
        # NOTE(review): this path returns a plain string while the others
        # return a pickled tuple -- confirm clients handle it before unifying
        return "Failed : production or pilot role required"
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    # the new priority must be an integer
    try:
        newPriority = int(newPriority)
    except Exception:
        return WrappedPickle.dumps((False, "newPriority must be an integer"))
    return WrappedPickle.dumps(userIF.changeTaskPriority(jediTaskID, newPriority))
1103 
1104 
# increase attempt number for unprocessed files
def increaseAttemptNrPanda(req, jediTaskID, increasedNr):
    """Increase the attempt number for unprocessed files of a task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        return WrappedPickle.dumps((3, MESSAGE_PROD_ROLE))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((4, MESSAGE_TASK_ID))
    # the increase must parse to a non-negative integer; -1 marks failure
    try:
        increasedNr = int(increasedNr)
    except Exception:
        increasedNr = -1
    if increasedNr < 0:
        return WrappedPickle.dumps((4, "increase must be a positive integer"))
    return WrappedPickle.dumps(userIF.increaseAttemptNrPanda(jediTaskID, increasedNr))
1132 
1133 
# change task attribute
def changeTaskAttributePanda(req, jediTaskID, attrName, attrValue):
    """Update one whitelisted attribute of a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    # only a fixed set of attributes may be updated through this entry point
    if attrName not in ("ramCount", "wallTime", "cpuTime", "coreCount"):
        return WrappedPickle.dumps((2, f"disallowed to update {attrName}"))
    ret = userIF.changeTaskAttributePanda(jediTaskID, attrName, attrValue)
    return WrappedPickle.dumps((ret, None))
1155 
1156 
# change split rule for task
def changeTaskSplitRulePanda(req, jediTaskID, attrName, attrValue):
    """Update one whitelisted split-rule code of a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    # only these two-letter split-rule codes may be changed here
    allowed_codes = ("AI", "TW", "EC", "ES", "MF", "NG", "NI", "NF", "NJ", "AV", "IL", "LI", "LC", "CC", "OT", "UZ")
    if attrName not in allowed_codes:
        return WrappedPickle.dumps((2, f"disallowed to update {attrName}"))
    ret = userIF.changeTaskSplitRulePanda(jediTaskID, attrName, attrValue)
    return WrappedPickle.dumps((ret, None))
1195 
1196 
# pause task
def pauseTask(req, jediTaskID):
    """Pause a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # only production managers may use this method
    is_production_role = _has_production_role(req)
    if not is_production_role:
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return WrappedPickle.dumps(userIF.pauseTask(jediTaskID, user, is_production_role))
1218 
1219 
# resume task
def resumeTask(req, jediTaskID):
    """Resume a paused JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # only production managers may use this method
    is_production_role = _has_production_role(req)
    if not is_production_role:
        return WrappedPickle.dumps((False, "production role required"))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return WrappedPickle.dumps(userIF.resumeTask(jediTaskID, user, is_production_role))
1241 
1242 
# force avalanche for task
def avalancheTask(req, jediTaskID):
    """Force avalanche for a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # only production managers may use this method
    is_production_role = _has_production_role(req)
    if not is_production_role:
        return WrappedPickle.dumps((False, "production role required"))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return WrappedPickle.dumps(userIF.avalancheTask(jediTaskID, user, is_production_role))
1264 
1265 
# release task
def release_task(req, jedi_task_id):
    """Send the 'release' command to a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # only production managers may use this method
    prod_role = _has_production_role(req)
    if not prod_role:
        return json.dumps((False, "production role required"))
    # the task ID must be an integer
    try:
        jedi_task_id = int(jedi_task_id)
    except Exception:
        return json.dumps((False, MESSAGE_TASK_ID))
    return json.dumps(userIF.send_command_to_task(jedi_task_id, user, prod_role, "release"))
1287 
1288 
# kill unfinished jobs
def killUnfinishedJobs(req, jediTaskID, code=None, useMailAsID=None):
    """Kill all unfinished jobs belonging to a JEDI task."""
    # SSL is mandatory
    if not isSecure(req):
        return False
    # requester DN, when a client certificate is present
    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    # FQANs of the requester
    fqans = _getFQAN(req)
    # production manager when any FQAN starts with one of the production roles
    role_prefixes = ("/atlas/usatlas/Role=production", "/atlas/Role=production")
    is_production_manager = any(fqan.startswith(role_prefixes) for fqan in fqans)
    # the string 'True' enables using email addresses as IDs
    useMailAsID = resolve_true(useMailAsID)
    # remote hostname for auditing
    host = req.get_remote_host()
    # collect the PandaIDs of the task, then kill them
    ids = userIF.getPandaIDsWithTaskID(jediTaskID)
    return userIF.killJobs(ids, user, host, code, is_production_manager, useMailAsID, fqans)
1320 
1321 
# change modificationTime for task
def changeTaskModTimePanda(req, jediTaskID, diffValue):
    """Shift the modificationTime of a JEDI task by diffValue hours (production role only).

    Returns a pickled (status, message) tuple.
    """
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    try:
        diffValue = int(diffValue)
        # fix: base the shift on naive UTC (naive_utcnow), as the rest of this
        # module does, instead of datetime.now() which is server-local time
        # and skews the resulting timestamp on non-UTC hosts
        attrValue = naive_utcnow() + datetime.timedelta(hours=diffValue)
    except Exception:
        return WrappedPickle.dumps((False, f"failed to convert {diffValue} to time diff"))
    ret = userIF.changeTaskAttributePanda(jediTaskID, "modificationTime", attrValue)
    return WrappedPickle.dumps((ret, None))
1345 
1346 
# get PandaIDs with TaskID
def getPandaIDsWithTaskID(req, jediTaskID):
    """Return the PandaIDs belonging to a JEDI task."""
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    # the task buffer returns a serialized list; round-trip it so the
    # response is produced by this module's pickler
    serialized_ids = userIF.getPandaIDsWithTaskID(jediTaskID)
    return WrappedPickle.dumps(WrappedPickle.loads(serialized_ids))
1358 
1359 
# reactivate Task
def reactivateTask(req, jediTaskID, keep_attempt_nr=None, trigger_job_generation=None):
    """Reactivate a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        msg = "production role is required"
        _logger.error(f"reactivateTask: {msg}")
        return WrappedPickle.dumps((False, msg))
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    # normalize string flags ('True' -> True)
    keep_attempt_nr = resolve_true(keep_attempt_nr)
    trigger_job_generation = resolve_true(trigger_job_generation)
    return WrappedPickle.dumps(userIF.reactivateTask(jediTaskID, keep_attempt_nr, trigger_job_generation))
1381 
1382 
# get task status
def getTaskStatus(req, jediTaskID):
    """Return the status of a JEDI task."""
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return WrappedPickle.dumps(userIF.getTaskStatus(jediTaskID))
1391 
1392 
# reassign share
def reassignShare(req, jedi_task_ids_pickle, share, reassign_running):
    """Move the given JEDI tasks to another global share (production role only).

    jedi_task_ids_pickle is a pickled list/tuple of task IDs; share is the
    target share name; reassign_running ('true'/'false' string) also moves
    running jobs. Returns a pickled (status, message) tuple.
    """
    # SSL is mandatory
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    # only production managers may use this method
    prod_role = _has_production_role(req)
    if not prod_role:
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))

    jedi_task_ids = WrappedPickle.loads(jedi_task_ids_pickle)
    _logger.debug(f"reassignShare: jedi_task_ids: {jedi_task_ids}, share: {share}, reassign_running: {reassign_running} (type {type(reassign_running)})")

    # only the string 'true' (case-insensitive, substring) counts as a positive flag
    reassign_running = isinstance(reassign_running, str) and "true" in reassign_running.lower()

    # bug fix: the original condition was parenthesized as
    # "A or (B and C)" instead of "(A or B) and C", so a list input
    # bypassed the isinstance(share, str) validation entirely
    if not (isinstance(jedi_task_ids, (list, tuple)) and isinstance(share, str)):
        return WrappedPickle.dumps((False, "jedi_task_ids must be tuple/list and share must be string"))

    ret = userIF.reassignShare(jedi_task_ids, share, reassign_running)
    return WrappedPickle.dumps(ret)
1417 
1418 
# get taskParamsMap with TaskID
def getTaskParamsMap(req, jediTaskID):
    """Return the task-parameter map of a JEDI task."""
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return WrappedPickle.dumps(userIF.getTaskParamsMap(jediTaskID))
1427 
1428 
# update workers
def updateWorkers(req, harvesterID, workers):
    """Update worker records reported by a Harvester instance."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    # requester DN and hostname for auditing
    user = _getDN(req)
    host = req.get_remote_host()
    time_start = naive_utcnow()
    # decode the payload; update the records only when decoding succeeded
    try:
        data = json.loads(workers)
    except Exception:
        return_value = json.dumps((False, MESSAGE_JSON))
    else:
        return_value = userIF.updateWorkers(user, host, harvesterID, data)
    elapsed = naive_utcnow() - time_start
    _logger.debug(f"updateWorkers {harvesterID} took {elapsed.seconds}.{elapsed.microseconds // 1000:03d} sec")

    return return_value
1452 
1453 
# update service metrics
def updateServiceMetrics(req, harvesterID, metrics):
    """Update service metrics reported by a Harvester instance."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))

    # requester DN and hostname for auditing
    user = _getDN(req)
    host = req.get_remote_host()
    time_start = naive_utcnow()

    # decode the payload; update the metrics only when decoding succeeded
    try:
        data = json.loads(metrics)
    except Exception:
        return_value = json.dumps((False, MESSAGE_JSON))
    else:
        return_value = userIF.updateServiceMetrics(user, host, harvesterID, data)

    elapsed = naive_utcnow() - time_start
    _logger.debug(f"updateServiceMetrics {harvesterID} took {elapsed.seconds}.{elapsed.microseconds // 1000:03d} sec")

    return return_value
1480 
1481 
# add harvester dialog messages
def addHarvesterDialogs(req, harvesterID, dialogs):
    """Store dialog messages reported by a Harvester instance."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    # requester DN
    user = _getDN(req)
    # the payload must be valid JSON
    try:
        data = json.loads(dialogs)
    except Exception:
        return json.dumps((False, MESSAGE_JSON))
    return userIF.addHarvesterDialogs(user, harvesterID, data)
1496 
1497 
# heartbeat for harvester
def harvesterIsAlive(req, harvesterID, data=None):
    """Record a heartbeat from a Harvester instance."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    # requester DN and hostname for auditing
    user = _getDN(req)
    host = req.get_remote_host()
    # decode the optional payload; default to an empty dict
    try:
        data = {} if data is None else json.loads(data)
    except Exception:
        return json.dumps((False, MESSAGE_JSON))
    return userIF.harvesterIsAlive(user, host, harvesterID, data)
1517 
1518 
# get stats of workers
def getWorkerStats(req):
    """Return aggregated worker statistics as JSON."""
    return json.dumps(userIF.getWorkerStats())
1524 
1525 
# report stat of workers
def reportWorkerStats_jobtype(req, harvesterID, siteName, paramsList):
    """Store per-job-type worker statistics for a Harvester instance and site."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    return json.dumps(userIF.reportWorkerStats_jobtype(harvesterID, siteName, paramsList))
1534 
1535 
# set num slots for workload provisioning
def setNumSlotsForWP(req, pandaQueueName, numSlots, gshare=None, resourceType=None, validPeriod=None):
    """Set the number of slots for workload provisioning on a queue (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((CODE_SSL, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        return json.dumps((CODE_LOGIC, "production role is required in the certificate"))
    # the slot count must be an integer
    try:
        numSlots = int(numSlots)
    except Exception:
        return json.dumps((CODE_OTHER_PARAMS, "numSlots must be int"))
    return userIF.setNumSlotsForWP(pandaQueueName, numSlots, gshare, resourceType, validPeriod)
1551 
1552 
# enable jumbo jobs
def enableJumboJobs(req, jediTaskID, nJumboJobs, nJumboPerSite=None):
    """Enable jumbo jobs for a JEDI task (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((CODE_SSL, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        return json.dumps((CODE_LOGIC, "production role is required in the certificate"))
    # the number of jumbo jobs must be an integer
    try:
        nJumboJobs = int(nJumboJobs)
    except Exception:
        return json.dumps((CODE_OTHER_PARAMS, "nJumboJobs must be int"))
    # the per-site count falls back to the total when absent or malformed
    try:
        nJumboPerSite = int(nJumboPerSite)
    except Exception:
        nJumboPerSite = nJumboJobs
    return userIF.enableJumboJobs(jediTaskID, nJumboJobs, nJumboPerSite)
1572 
1573 
# get user job metadata
def getUserJobMetadata(req, jediTaskID):
    """Return user job metadata for a JEDI task."""
    # the task ID must be an integer
    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return userIF.getUserJobMetadata(jediTaskID)
1581 
1582 
# get jumbo job datasets
def getJumboJobDatasets(req, n_days, grace_period=0):
    """Return datasets of jumbo jobs within the last n_days (with a grace period)."""
    # both time parameters must be integers
    try:
        n_days = int(n_days)
    except Exception:
        return WrappedPickle.dumps((False, "wrong n_days"))
    try:
        grace_period = int(grace_period)
    except Exception:
        return WrappedPickle.dumps((False, "wrong grace_period"))
    return userIF.getJumboJobDatasets(n_days, grace_period)
1594 
1595 
# send Harvester the command to clean up the workers for a panda queue
def sweepPQ(req, panda_queue, status_list, ce_list, submission_host_list):
    """Ask Harvester to sweep workers of a PanDA queue (production role only)."""
    # SSL is mandatory
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    # only production managers may use this method
    if not _has_production_role(req):
        return json.dumps((False, MESSAGE_PROD_ROLE))
    return json.dumps((True, userIF.sweepPQ(panda_queue, status_list, ce_list, submission_host_list)))
1607 
1608 
# json decoder for idds constants
def decode_idds_enum(d):
    """Object hook for json.loads resolving serialized iDDS constants.

    A dict carrying the "__idds_const__" key is replaced by the attribute
    it names (a dotted path under idds.common.constants); any other dict
    is returned unchanged.
    """
    if "__idds_const__" not in d:
        return d
    obj = idds.common.constants
    for attribute_name in d["__idds_const__"].split("."):
        obj = getattr(obj, attribute_name)
    return obj
1619 
1620 
# relay iDDS command
def relay_idds_command(req, command_name, args=None, kwargs=None, manager=None, json_outputs=None):
    """Relay a command to the iDDS server on behalf of the requester.

    command_name may carry an explicit iDDS host as "<command>+<host>".
    args/kwargs are JSON-encoded strings; manager selects an iDDS
    ClientManager instead of a plain Client. Returns a JSON string of
    (status, result).
    """
    tmp_log = LogWrapper(
        _logger,
        f"relay_idds_command-{naive_utcnow().isoformat('/')}",
    )
    # reject non-SSL requests
    if not isSecure(req):
        tmp_log.error(MESSAGE_SSL)
        return json.dumps((False, MESSAGE_SSL))
    try:
        # normalize the manager flag; resolve_bool may yield a falsy value,
        # which is coerced to an explicit False
        manager = resolve_bool(manager)
        if not manager:
            manager = False
        # "<command>+<host>" overrides the default iDDS host
        if "+" in command_name:
            command_name, idds_host = command_name.split("+")
        else:
            idds_host = idds.common.utils.get_rest_host()
        # pick the client class according to the manager flag
        if manager:
            c = iDDS_ClientManager(idds_host)
        else:
            c = iDDS_Client(idds_host)
        # the command must exist on the chosen client
        if not hasattr(c, command_name):
            tmp_str = f"{command_name} is not a command of iDDS {c.__class__.__name__}"
            tmp_log.error(tmp_str)
            return json.dumps((False, tmp_str))
        # decode args: try the iDDS JSON decoder first, fall back to plain
        # json with the idds-constant object hook
        if args:
            try:
                args = idds.common.utils.json_loads(args)
            except Exception as e:
                tmp_log.warning(f"failed to load args json with {str(e)}")
                args = json.loads(args, object_hook=decode_idds_enum)
        else:
            args = []
        # decode kwargs the same way
        if kwargs:
            try:
                kwargs = idds.common.utils.json_loads(kwargs)
            except Exception as e:
                tmp_log.warning(f"failed to load kwargs json with {str(e)}")
                kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
        else:
            kwargs = {}
        # json outputs are only supported by the ClientManager
        if json_outputs and manager:
            c.setup_json_outputs()
        # propagate the original username from the client certificate DN
        dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
        if dn:
            c.set_original_user(user_name=clean_user_id(dn))
        tmp_log.debug(f"execute: class={c.__class__.__name__} com={command_name} host={idds_host} args={str(args)[:200]} kwargs={str(kwargs)[:200]}")
        ret = getattr(c, command_name)(*args, **kwargs)
        tmp_log.debug(f"ret: {str(ret)[:200]}")
        # fall back to the iDDS serializer when the result is not plain-JSON friendly
        try:
            return json.dumps((True, ret))
        except Exception:
            return idds.common.utils.json_dumps((True, ret))
    except Exception as e:
        tmp_str = f"failed to execute command with {str(e)}"
        tmp_log.error(f"{tmp_str} {traceback.format_exc()}")
        return json.dumps((False, tmp_str))
1681 
1682 
1683 # relay iDDS workflow command with ownership check
def execute_idds_workflow_command(req, command_name, kwargs=None, json_outputs=None):
    """
    Relay a workflow command to the iDDS ClientManager, optionally verifying
    that the requester owns the target iDDS request.

    :param req: HTTP request object; SSL_CLIENT_S_DN identifies the requester
    :param command_name: iDDS command name, optionally suffixed with "+<idds_host>"
    :param kwargs: JSON-encoded keyword arguments for the command
    :param json_outputs: if truthy, enable JSON outputs on the client manager
    :return: JSON string of a (status, result) tuple
    """
    # create the logger outside the try block so the except handler can
    # always reference tmp_log safely
    tmp_log = LogWrapper(
        _logger,
        f"execute_idds_workflow_command-{naive_utcnow().isoformat('/')}",
    )
    try:
        # decode keyword arguments; fall back to plain JSON with enum decoding
        if kwargs:
            try:
                kwargs = idds.common.utils.json_loads(kwargs)
            except Exception:
                kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
        else:
            kwargs = {}
        # an explicit iDDS host may be piggybacked on the command name
        if "+" in command_name:
            command_name, idds_host = command_name.split("+")
        else:
            idds_host = idds.common.utils.get_rest_host()
        # check permission: read-only commands skip the ownership check,
        # state-changing commands require it, anything else is rejected
        if command_name in ["get_status"]:
            check_owner = False
        elif command_name in ["abort", "suspend", "resume", "retry", "finish"]:
            check_owner = True
        else:
            tmp_message = f"{command_name} is unsupported"
            tmp_log.error(tmp_message)
            return json.dumps((False, tmp_message))
        # check owner
        c = iDDS_ClientManager(idds_host)
        if json_outputs:
            c.setup_json_outputs()
        dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
        if check_owner:
            # requester identity is mandatory for ownership-checked commands
            if not dn:
                tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))
            requester = clean_user_id(dn)
            # get request_id
            request_id = kwargs.get("request_id")
            if request_id is None:
                tmp_message = "request_id is missing"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))
            # fetch the iDDS request; use a dedicated name instead of
            # shadowing the HTTP request parameter "req"
            idds_requests = c.get_requests(request_id=request_id)
            if not idds_requests:
                tmp_message = f"request {request_id} is not found"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))
            user_name = idds_requests[0].get("username")
            if user_name and user_name != requester:
                tmp_message = f"request {request_id} is not owned by {requester}"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))
        # set original username
        if dn:
            c.set_original_user(user_name=clean_user_id(dn))
        # execute command
        tmp_log.debug(f"com={command_name} host={idds_host} kwargs={str(kwargs)}")
        ret = getattr(c, command_name)(**kwargs)
        tmp_log.debug(str(ret))
        # normalize dict results that carry a status/message pair
        if isinstance(ret, dict) and "message" in ret:
            return json.dumps((True, [ret["status"], ret["message"]]))
        return json.dumps((True, ret))
    except Exception as e:
        tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
        return json.dumps((False, f"server failed with {str(e)}"))
1752 
1753 
1754 # send command to a job
def send_command_to_job(req, panda_id, com):
    """Relay a command to a single job, enforcing SSL and production role."""
    # reject plain-HTTP callers outright
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    # only production or pilot roles may steer jobs
    if not _has_production_role(req):
        return json.dumps((False, MESSAGE_PROD_ROLE))
    return json.dumps(userIF.send_command_to_job(panda_id, com))
1764 
1765 
1766 # set user secret
def set_user_secret(req, key=None, value=None):
    """Store a secret for the user identified by the client certificate DN."""
    tmp_log = LogWrapper(_logger, f"set_user_secret-{naive_utcnow().isoformat('/')}")
    # the secret owner is derived from the SSL client certificate
    dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
    if dn:
        return json.dumps(userIF.set_user_secret(clean_user_id(dn), key, value))
    tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
    tmp_log.error(tmp_message)
    return json.dumps((False, tmp_message))
1777 
1778 
1779 # get user secrets
def get_user_secrets(req, keys=None, get_json=None):
    """Fetch secrets of the user identified by the client certificate DN."""
    tmp_log = LogWrapper(_logger, f"get_user_secrets-{naive_utcnow().isoformat('/')}")
    # an SSL client certificate is mandatory to identify the owner
    dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
    if not dn:
        tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
        tmp_log.error(tmp_message)
        return json.dumps((False, tmp_message))
    # the flag arrives as a string; only the literal "True" enables JSON mode
    return json.dumps(userIF.get_user_secrets(clean_user_id(dn), keys, resolve_true(get_json)))
1792 
1793 
1794 # get files in datasets
def get_files_in_datasets(req, task_id, dataset_types="input,pseudo_input"):
    """Return the files of the given task's datasets (SSL required)."""
    # only SSL-authenticated callers may query dataset contents
    if isSecure(req):
        return json.dumps(userIF.get_files_in_datasets(task_id, dataset_types))
    return json.dumps((False, MESSAGE_SSL))