File indexing completed on 2026-04-10 08:39:07
0001 """
0002 provide web interface to users
0003
0004 """
0005
0006 import datetime
0007 import json
0008 import re
0009 import sys
0010 import time
0011 import traceback
0012
0013 from pandacommon.pandalogger.LogWrapper import LogWrapper
0014 from pandacommon.pandalogger.PandaLogger import PandaLogger
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016
0017 import pandaserver.jobdispatcher.Protocol as Protocol
0018 from pandaserver.brokerage.SiteMapper import SiteMapper
0019 from pandaserver.config import panda_config
0020 from pandaserver.dataservice.ddm import rucioAPI
0021 from pandaserver.srvcore import CoreUtils
0022 from pandaserver.srvcore.CoreUtils import clean_user_id, resolve_bool
0023 from pandaserver.taskbuffer import JobUtils, PrioUtil
0024 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0025 from pandaserver.taskbuffer.WrappedPickle import WrappedPickle
0026
0027 try:
0028 import idds.common.constants
0029 import idds.common.utils
0030 from idds.client.client import Client as iDDS_Client
0031 from idds.client.clientmanager import ClientManager as iDDS_ClientManager
0032 except ImportError:
0033 pass
0034
# Numeric error codes returned to clients that ask for proper error codes.
CODE_SSL = 100
CODE_LOGIC = 101
CODE_OTHER_PARAMS = 102

# Canned error messages shared by the web-service entry points.
MESSAGE_SSL = "SSL secure connection is required"
MESSAGE_PROD_ROLE = "production or pilot role required"
MESSAGE_TASK_ID = "jediTaskID must be an integer"
MESSAGE_DATABASE = "database error in the PanDA server"
MESSAGE_JSON = "failed to load JSON"

# Logger shared by the UserIF class and the module-level entry points.
_logger = PandaLogger().getLogger("UserIF")
0047
0048
def resolve_true(variable):
    """Interpret a request parameter as a flag that defaults to False.

    Only the exact string ``"True"`` yields True. Anything else, including
    ``None``, ``"true"`` or the bool ``True``, yields False.
    """
    matches_true_string = variable == "True"
    return matches_true_string
0051
0052
def resolve_false(variable):
    """Interpret a request parameter as a flag that defaults to True.

    Despite the name, this returns True for every value except the exact
    string ``"False"``. A missing parameter (``None``) therefore resolves to True.
    """
    matches_false_string = variable == "False"
    return not matches_false_string
0055
0056
0057
class UserIF:
    """Back-end of the user-facing web methods.

    Methods deserialize request payloads, delegate to the task buffer attached
    with :meth:`init`, and serialize results as WrappedPickle or JSON.
    """

    def __init__(self):
        # attached later through init()
        self.taskBuffer = None

    def init(self, taskBuffer):
        """Attach the task buffer that every other method delegates to."""
        self.taskBuffer = taskBuffer

    def submitJobs(self, jobsStr, user, host, userFQANs, prodRole=False, toPending=False):
        """Deserialize a batch of job specs, validate their labels and store them.

        :param jobsStr: WrappedPickle- or JSON-serialized list of job specs.
            At most 5000 jobs are kept; an undecodable payload yields no jobs.
        :param user: submitter DN. For the LSST VO it is replaced by the first
            job's prodUserName when that is set.
        :param host: remote host, passed to storeJobs as ``hostname``
        :param userFQANs: VOMS FQANs of the submitter
        :param prodRole: whether the submitter has a production role
        :param toPending: passed through to storeJobs
        :return: ``"ERROR: ..."`` if a job label is rejected, otherwise the
            WrappedPickle-serialized result of ``taskBuffer.storeJobs``
        """
        try:
            try:
                jobs = WrappedPickle.loads(jobsStr)
            except Exception:
                # fall back to the JSON encoding
                jobs = JobUtils.load_jobs_json(jobsStr)
            _logger.debug(f"submitJobs {user} len:{len(jobs)} prodRole={prodRole} FQAN:{str(userFQANs)}")
            maxJobs = 5000
            if len(jobs) > maxJobs:
                _logger.error(f"submitJobs: too many jobs more than {maxJobs}")
                jobs = jobs[:maxJobs]
        except Exception as ex:
            _logger.error(f"submitJobs : {str(ex)} {traceback.format_exc()}")
            jobs = []

        # check prodSourceLabel and job_label of every job
        good_labels = True
        good_labels_message = ""
        try:
            for tmpJob in jobs:
                # managed jobs require a production role
                if tmpJob.prodSourceLabel in ["managed"] and not prodRole:
                    good_labels = False
                    good_labels_message = f"submitJobs {user} missing prod-role for prodSourceLabel={tmpJob.prodSourceLabel}"
                    break
                # job_label must be unset or one of JobUtils.job_labels
                if tmpJob.job_label not in [None, "", "NULL"] and tmpJob.job_label not in JobUtils.job_labels:
                    good_labels = False
                    good_labels_message = f"submitJobs {user} wrong job_label={tmpJob.job_label}"
                    break
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            _logger.error(f"submitJobs : checking good_labels {err_type} {err_value}")
            good_labels = False
            # The message must be set on this path too; otherwise the rejection
            # below fails with an unbound local variable instead of an error reply.
            good_labels_message = f"submitJobs {user} failed to check job labels : {err_type} {err_value}"

        if not good_labels:
            _logger.error(good_labels_message)
            return f"ERROR: {good_labels_message}"

        # take the VO from the first job, defaulting to atlas
        job0 = None
        userVO = "atlas"
        try:
            job0 = jobs[0]
            if job0.VO not in [None, "", "NULL"]:
                userVO = job0.VO
        except (IndexError, AttributeError) as e:
            _logger.error(f"submitJobs : checking userVO. userVO not found, defaulting to {userVO}. (Exception {e})")

        # missing FQANs are only logged; the submission is not rejected
        if userVO == "atlas" and userFQANs == []:
            _logger.error(f"submitJobs : VOMS FQANs are missing in your proxy. They are required for {userVO}")

        # LSST: use the first job's prodUserName as the user name when it is set
        if userVO.lower() == "lsst":
            try:
                if job0.prodUserName and job0.prodUserName.lower() != "none":
                    user = job0.prodUserName
            except AttributeError:
                _logger.error(f"submitJobs : checking username for userVO {userVO}: username not found, defaulting to {user}.")

        ret = self.taskBuffer.storeJobs(
            jobs,
            user,
            fqans=userFQANs,
            hostname=host,
            toPending=toPending,
            userVO=userVO,
        )
        _logger.debug(f"submitJobs {user} ->:{len(ret)}")
        return WrappedPickle.dumps(ret)

    def setDebugMode(self, dn, pandaID, prodManager, modeOn, workingGroup):
        """Switch the debug mode of a job; delegates to the task buffer."""
        ret = self.taskBuffer.setDebugMode(dn, pandaID, prodManager, modeOn, workingGroup)
        return ret

    def insertSandboxFileInfo(self, userName, hostName, fileName, fileSize, checkSum):
        """Register a sandbox file; delegates to the task buffer."""
        ret = self.taskBuffer.insertSandboxFileInfo(userName, hostName, fileName, fileSize, checkSum)
        return ret

    def checkSandboxFile(self, userName, fileSize, checkSum):
        """Look up a sandbox file by size and checksum; delegates to the task buffer."""
        ret = self.taskBuffer.checkSandboxFile(userName, fileSize, checkSum)
        return ret

    def getJobStatus(self, idsStr, use_json, no_pickle=False):
        """Return the status of up to 5500 jobs.

        :param idsStr: JSON-encoded IDs when ``use_json`` or ``no_pickle`` is
            truthy, otherwise WrappedPickle-encoded. An undecodable payload is
            treated as an empty list.
        :param use_json: return plain JSON and ask peekJobs for JSON output
        :param no_pickle: return job JSON via JobUtils.dump_jobs_json
        :return: the serialized result of ``taskBuffer.peekJobs``
        """
        try:
            if use_json or no_pickle:
                ids = json.loads(idsStr)
            else:
                ids = WrappedPickle.loads(idsStr)
            _logger.debug(f"getJobStatus len : {len(ids)}")
            maxIDs = 5500
            if len(ids) > maxIDs:
                _logger.error(f"too long ID list more than {maxIDs}")
                ids = ids[:maxIDs]
        except Exception as exc:
            _logger.error(f"getJobStatus : {type(exc)} {exc}")
            ids = []
        _logger.debug(f"getJobStatus start : {str(ids)} json={use_json}")

        ret = self.taskBuffer.peekJobs(ids, use_json=use_json)
        _logger.debug("getJobStatus end")

        if use_json:
            return json.dumps(ret)
        if no_pickle:
            return JobUtils.dump_jobs_json(ret)
        return WrappedPickle.dumps(ret)

    def getPandaIDsWithTaskID(self, jediTaskID):
        """Return the PandaIDs of a task, WrappedPickle-serialized."""
        ret = self.taskBuffer.getPandaIDsWithTaskID(jediTaskID)
        return WrappedPickle.dumps(ret)

    def getJobStatisticsPerSite(self):
        """Return job statistics, WrappedPickle-serialized with ``convert_to_safe=True``."""
        ret = self.taskBuffer.getJobStatistics()
        return WrappedPickle.dumps(ret, convert_to_safe=True)

    def get_job_statistics_per_site_label_resource(self, time_window):
        """Return job statistics per site, label and resource as JSON."""
        ret = self.taskBuffer.get_job_statistics_per_site_label_resource(time_window)
        return json.dumps(ret)

    def killJobs(self, idsStr, user, host, code, prodManager, useMailAsID, fqans, killOpts=None):
        """Kill jobs on behalf of a user.

        :param idsStr: WrappedPickle-serialized PandaID or list of PandaIDs
        :param user: requester DN. When ``useMailAsID`` is set and
            ``rucioAPI.finger`` succeeds (up to 3 tries, 1 s apart), it is
            replaced by the e-mail address.
        :param host: remote host (unused in this method)
        :param code: kill code passed to the task buffer
        :param prodManager: whether the requester has a production role
        :param fqans: VOMS FQANs. For each ``/atlas/<wg>/Role=production``
            entry (except usatlas), ``<wg>`` and ``gr_<wg>`` are passed on.
        :param killOpts: kill options; ``None`` (the default) means no options
        :return: WrappedPickle-serialized result of ``taskBuffer.killJobs``
        """
        # avoid a shared mutable default list
        if killOpts is None:
            killOpts = []
        ids = WrappedPickle.loads(idsStr)
        if not isinstance(ids, list):
            ids = [ids]
        _logger.info(f"killJob : {user} {code} {prodManager} {fqans} {ids}")
        try:
            if useMailAsID:
                _logger.debug(f"killJob : getting mail address for {user}")
                nTry = 3
                for _ in range(nTry):
                    status, userInfo = rucioAPI.finger(user)
                    if status:
                        _logger.debug(f"killJob : {user} is converted to {userInfo['email']}")
                        user = userInfo["email"]
                        break
                    time.sleep(1)
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            _logger.error(f"killJob : failed to convert email address {user} : {err_type} {err_value}")

        # working groups in which the requester holds a production role
        wgProdRole = []
        for fqan in fqans:
            tmpMatch = re.search("/atlas/([^/]+)/Role=production", fqan)
            if tmpMatch is not None:
                tmpWG = tmpMatch.group(1)
                # usatlas is excluded; duplicates are skipped
                if tmpWG not in ["", "usatlas"] + wgProdRole:
                    wgProdRole.append(tmpWG)
                    wgProdRole.append(f"gr_{tmpWG}")

        ret = self.taskBuffer.killJobs(ids, user, code, prodManager, wgProdRole, killOpts)
        return WrappedPickle.dumps(ret)

    def reassignJobs(self, idsStr, user, host, forPending, firstSubmission):
        """Reassign the WrappedPickle-serialized job IDs; ``user`` and ``host`` are unused here."""
        ids = WrappedPickle.loads(idsStr)
        ret = self.taskBuffer.reassignJobs(
            ids,
            forPending=forPending,
            firstSubmission=firstSubmission,
        )
        return WrappedPickle.dumps(ret)

    def getScriptOfflineRunning(self, pandaID, days=None):
        """Return the offline-running script of a job; delegates to the task buffer."""
        ret = self.taskBuffer.getScriptOfflineRunning(pandaID, days)
        return ret

    def get_ban_users(self):
        """Return the banned users as JSON."""
        ret = self.taskBuffer.get_ban_users()
        return json.dumps(ret)

    def getJediTasksInTimeRange(self, dn, timeRange, fullFlag, minTaskID, task_type):
        """Return tasks of ``dn`` in a time range, WrappedPickle-serialized."""
        ret = self.taskBuffer.getJediTasksInTimeRange(dn, timeRange, fullFlag, minTaskID, task_type)
        return WrappedPickle.dumps(ret)

    def getJediTaskDetails(self, jediTaskID, fullFlag, withTaskInfo):
        """Return task details, WrappedPickle-serialized."""
        ret = self.taskBuffer.getJediTaskDetails(jediTaskID, fullFlag, withTaskInfo)
        return WrappedPickle.dumps(ret)

    def getFullJobStatus(self, idsStr, dn):
        """Return the full status of up to 5500 WrappedPickle-encoded job IDs.

        An undecodable payload is treated as an empty list; ``dn`` is only logged.
        """
        try:
            ids = WrappedPickle.loads(idsStr)
            maxIDs = 5500
            if len(ids) > maxIDs:
                _logger.error(f"getFullJobStatus: too long ID list more than {maxIDs}")
                ids = ids[:maxIDs]
        except Exception as exc:
            _logger.error(f"getFullJobStatus : {type(exc)} {exc}")
            ids = []
        _logger.debug(f"getFullJobStatus start : {dn} {str(ids)}")

        ret = self.taskBuffer.getFullJobStatus(ids)
        _logger.debug("getFullJobStatus end")
        return WrappedPickle.dumps(ret)

    def insertTaskParams(self, taskParams, user, prodRole, fqans, properErrorCode, parent_tid):
        """Insert task parameters; delegates to ``taskBuffer.insertTaskParamsPanda``."""
        ret = self.taskBuffer.insertTaskParamsPanda(
            taskParams,
            user,
            prodRole,
            fqans,
            properErrorCode=properErrorCode,
            parent_tid=parent_tid,
        )
        return ret

    def killTask(self, jediTaskID, user, prodRole, properErrorCode, broadcast):
        """Send the ``kill`` command to a task."""
        ret = self.taskBuffer.sendCommandTaskPanda(
            jediTaskID,
            user,
            prodRole,
            "kill",
            properErrorCode=properErrorCode,
            broadcast=broadcast,
        )
        return ret

    def finishTask(self, jediTaskID, user, prodRole, properErrorCode, qualifier, broadcast):
        """Send the ``finish`` command to a task, with an optional qualifier."""
        ret = self.taskBuffer.sendCommandTaskPanda(
            jediTaskID,
            user,
            prodRole,
            "finish",
            properErrorCode=properErrorCode,
            comQualifier=qualifier,
            broadcast=broadcast,
        )
        return ret

    def reloadInput(self, jediTaskID, user, prodRole, ignore_hard_exhausted):
        """Send the ``incexec`` command to reload a task's input.

        ``ignore_hard_exhausted`` is honored only with a production role.
        """
        if not prodRole and ignore_hard_exhausted:
            ignore_hard_exhausted = False
        com_qualifier = JediTaskSpec.get_retry_command_qualifiers(ignore_hard_exhausted=ignore_hard_exhausted)
        com_comment = json.dumps([{}, com_qualifier])
        ret = self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "incexec", comComment=com_comment, properErrorCode=True)
        return ret

    def retryTask(
        self,
        jediTaskID,
        user,
        prodRole,
        properErrorCode,
        newParams,
        noChildRetry,
        discardEvents,
        disable_staging_mode,
        keep_gshare_priority,
        ignore_hard_exhausted,
    ):
        """Retry a task, optionally with new task parameters.

        With ``newParams`` (a JSON string), the new keys overwrite the stored task
        parameters and the result is re-inserted. Any error yields
        ``(1, "server error with ...")``. Otherwise a ``retry`` command is sent;
        ``ignore_hard_exhausted`` is honored only with a production role.

        If ``properErrorCode`` is True and the task buffer answers with code 5,
        the failed jobs' jobdefs are finalized and the attemptNr is increased by
        5. The result then becomes code 0 (plain retry) or 3 (with new params).
        NOTE(review): code 5 presumably means "task not in a retriable state";
        confirm against sendCommandTaskPanda.
        """
        if newParams is not None:
            try:
                new_params = PrioUtil.decodeJSON(newParams)
                taskParams = self.taskBuffer.getTaskParamsPanda(jediTaskID)
                taskParamsJson = PrioUtil.decodeJSON(taskParams)
                for newKey in new_params:
                    taskParamsJson[newKey] = new_params[newKey]
                taskParams = json.dumps(taskParamsJson)
                ret = self.taskBuffer.insertTaskParamsPanda(
                    taskParams,
                    user,
                    prodRole,
                    [],
                    properErrorCode=properErrorCode,
                    allowActiveTask=True,
                )
            except Exception:
                err_type, err_value = sys.exc_info()[:2]
                ret = 1, f"server error with {err_type}:{err_value}"
        else:
            if not prodRole and ignore_hard_exhausted:
                ignore_hard_exhausted = False
            com_qualifier = JediTaskSpec.get_retry_command_qualifiers(
                noChildRetry, discardEvents, disable_staging_mode, keep_gshare_priority, ignore_hard_exhausted
            )
            com_qualifier = " ".join(com_qualifier)
            ret = self.taskBuffer.sendCommandTaskPanda(
                jediTaskID,
                user,
                prodRole,
                "retry",
                properErrorCode=properErrorCode,
                comQualifier=com_qualifier,
            )
        if properErrorCode is True and ret[0] == 5:
            # retry the failed jobs directly
            jobdefList = self.taskBuffer.getJobdefIDsForFailedJob(jediTaskID)
            cUID = CoreUtils.clean_user_id(user)
            for jobID in jobdefList:
                self.taskBuffer.finalizePendingJobs(cUID, jobID)
            self.taskBuffer.increaseAttemptNrPanda(jediTaskID, 5)
            return_str = f"retry has been triggered for failed jobs while the task is still {ret[1]}"
            if newParams is None:
                ret = 0, return_str
            else:
                ret = 3, return_str
        return ret

    def reassignTask(self, jediTaskID, user, prodRole, comComment):
        """Send the ``reassign`` command with the destination in ``comComment``."""
        ret = self.taskBuffer.sendCommandTaskPanda(
            jediTaskID,
            user,
            prodRole,
            "reassign",
            comComment=comComment,
            properErrorCode=True,
        )
        return ret

    def pauseTask(self, jediTaskID, user, prodRole):
        """Send the ``pause`` command to a task."""
        ret = self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "pause", properErrorCode=True)
        return ret

    def resumeTask(self, jediTaskID, user, prodRole):
        """Send the ``resume`` command to a task."""
        ret = self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "resume", properErrorCode=True)
        return ret

    def avalancheTask(self, jediTaskID, user, prodRole):
        """Send the ``avalanche`` command to a task."""
        ret = self.taskBuffer.sendCommandTaskPanda(jediTaskID, user, prodRole, "avalanche", properErrorCode=True)
        return ret

    def send_command_to_task(self, jedi_task_id, user, prod_role, command_string):
        """Send an arbitrary command string to a task."""
        ret = self.taskBuffer.sendCommandTaskPanda(jedi_task_id, user, prod_role, command_string, properErrorCode=True)
        return ret

    def changeTaskPriority(self, jediTaskID, newPriority):
        """Change a task's priority; delegates to the task buffer."""
        ret = self.taskBuffer.changeTaskPriorityPanda(jediTaskID, newPriority)
        return ret

    def increaseAttemptNrPanda(self, jediTaskID, increasedNr):
        """Increase a task's attempt number; delegates to the task buffer."""
        ret = self.taskBuffer.increaseAttemptNrPanda(jediTaskID, increasedNr)
        return ret

    def changeTaskAttributePanda(self, jediTaskID, attrName, attrValue):
        """Change a task attribute; delegates to the task buffer."""
        ret = self.taskBuffer.changeTaskAttributePanda(jediTaskID, attrName, attrValue)
        return ret

    def changeTaskSplitRulePanda(self, jediTaskID, attrName, attrValue):
        """Change a task split rule; delegates to the task buffer."""
        ret = self.taskBuffer.changeTaskSplitRulePanda(jediTaskID, attrName, attrValue)
        return ret

    def reactivateTask(self, jediTaskID, keep_attempt_nr, trigger_job_generation):
        """Reactivate a task; delegates to the task buffer."""
        ret = self.taskBuffer.reactivateTask(jediTaskID, keep_attempt_nr, trigger_job_generation)
        return ret

    def getTaskStatus(self, jediTaskID):
        """Return the first element of ``taskBuffer.getTaskStatus``."""
        ret = self.taskBuffer.getTaskStatus(jediTaskID)
        return ret[0]

    def reassignShare(self, jedi_task_ids, share_dest, reassign_running):
        """Reassign tasks to another global share; delegates to the task buffer."""
        return self.taskBuffer.reassignShare(jedi_task_ids, share_dest, reassign_running)

    def getTaskParamsMap(self, jediTaskID):
        """Return a task's parameter map; delegates to the task buffer."""
        ret = self.taskBuffer.getTaskParamsMap(jediTaskID)
        return ret

    def updateWorkers(self, user, host, harvesterID, data):
        """Update harvester workers; returns JSON ``[ok, payload]``.

        A None result from the task buffer is reported as a database error.
        """
        ret = self.taskBuffer.updateWorkers(harvesterID, data)
        if ret is None:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, ret)
        return json.dumps(return_value)

    def updateServiceMetrics(self, user, host, harvesterID, data):
        """Update harvester service metrics; returns JSON ``[ok, payload]``."""
        ret = self.taskBuffer.updateServiceMetrics(harvesterID, data)
        if ret is None:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, ret)
        return json.dumps(return_value)

    def addHarvesterDialogs(self, user, harvesterID, dialogs):
        """Store harvester dialogs; returns JSON ``[ok, message]``.

        A falsy result from the task buffer is reported as a database error.
        """
        ret = self.taskBuffer.addHarvesterDialogs(harvesterID, dialogs)
        if not ret:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, "")
        return json.dumps(return_value)

    def harvesterIsAlive(self, user, host, harvesterID, data):
        """Record a harvester heartbeat; returns JSON ``[ok, payload]``."""
        ret = self.taskBuffer.harvesterIsAlive(user, host, harvesterID, data)
        if ret is None:
            return_value = (False, MESSAGE_DATABASE)
        else:
            return_value = (True, ret)
        return json.dumps(return_value)

    def getWorkerStats(self):
        """Return worker statistics; delegates to the task buffer."""
        return self.taskBuffer.getWorkerStats()

    def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
        """Report worker statistics per job type; delegates to the task buffer."""
        return self.taskBuffer.reportWorkerStats_jobtype(harvesterID, siteName, paramsList)

    def setNumSlotsForWP(self, pandaQueueName, numSlots, gshare, resourceType, validPeriod):
        """Set the number of slots for a work-queue partition; returns JSON."""
        return_value = self.taskBuffer.setNumSlotsForWP(pandaQueueName, numSlots, gshare, resourceType, validPeriod)
        return json.dumps(return_value)

    def enableJumboJobs(self, jediTaskID, totalJumboJobs, nJumboPerSite):
        """Enable jumbo jobs for a task; returns JSON.

        On success (code 0) with ``totalJumboJobs > 0``, an ``avalanche`` command
        is also sent as user "panda" with the production role.
        """
        return_value = self.taskBuffer.enableJumboJobs(jediTaskID, totalJumboJobs, nJumboPerSite)
        if totalJumboJobs > 0 and return_value[0] == 0:
            self.avalancheTask(jediTaskID, "panda", True)
        return json.dumps(return_value)

    def getUserJobMetadata(self, jediTaskID):
        """Return user job metadata of a task as JSON."""
        return_value = self.taskBuffer.getUserJobMetadata(jediTaskID)
        return json.dumps(return_value)

    def getJumboJobDatasets(self, n_days, grace_period):
        """Return jumbo-job datasets as JSON."""
        return_value = self.taskBuffer.getJumboJobDatasets(n_days, grace_period)
        return json.dumps(return_value)

    def sweepPQ(self, panda_queue, status_list, ce_list, submission_host_list):
        """Sweep a PanDA queue; every argument is a JSON string.

        :return: WrappedPickle-serialized result of ``taskBuffer.sweepPQ``
        :raises Exception: the deserialization error, after logging it, if any
            argument is not valid JSON
        """
        try:
            panda_queue_des = json.loads(panda_queue)
            status_list_des = json.loads(status_list)
            ce_list_des = json.loads(ce_list)
            submission_host_list_des = json.loads(submission_host_list)
        except Exception:
            _logger.error("Problem deserializing variables")
            # The *_des names would be unbound below; surface the real error
            # instead of a misleading UnboundLocalError.
            raise

        ret = self.taskBuffer.sweepPQ(panda_queue_des, status_list_des, ce_list_des, submission_host_list_des)
        return WrappedPickle.dumps(ret)

    def send_command_to_job(self, panda_id, com):
        """Send a command to a job; delegates to the task buffer."""
        ret = self.taskBuffer.send_command_to_job(panda_id, com)
        return ret

    def set_user_secret(self, owner, key, value):
        """Set a user secret; delegates to the task buffer."""
        ret = self.taskBuffer.set_user_secret(owner, key, value)
        return ret

    def get_user_secrets(self, owner, keys, get_json):
        """Get user secrets; delegates to the task buffer."""
        ret = self.taskBuffer.get_user_secrets(owner, keys, get_json)
        return ret

    def get_files_in_datasets(self, task_id, dataset_types):
        """Return files in a task's datasets; delegates to the task buffer."""
        ret = self.taskBuffer.get_files_in_datasets(task_id, dataset_types)
        return ret
0583
0584
0585
# Module-level singleton used by the web-service entry points below. The class
# name is deleted afterwards, so only this instance remains reachable through
# the module.
userIF = UserIF()
del UserIF
0588
0589
0590
def _getFQAN(req):
    """Collect VOMS FQANs from the request environment, in environment order.

    Two kinds of entries are used:

    * ``GRST_CRED_*`` whose value starts with ``VOMS``: the FQAN is the last
      whitespace-separated token of the value.
    * ``GRST_CONN_*`` whose value is exactly ``fqan:<FQAN>`` (one colon only).
    """
    env = req.subprocess_env
    collected = []
    for env_key in env:
        env_value = env[env_key]
        if env_key.startswith("GRST_CRED_") and env_value.startswith("VOMS"):
            collected.append(env_value.split()[-1])
            continue
        if not env_key.startswith("GRST_CONN_"):
            continue
        prefix, separator, remainder = env_value.partition(":")
        # equivalent to split(":") yielding exactly ["fqan", remainder]
        if separator and prefix == "fqan" and ":" not in remainder:
            collected.append(remainder)
    return collected
0608
0609
0610
def _getDN(req):
    """Return the client DN from SSL_CLIENT_S_DN, or "" when it is absent.

    The raw DN is passed through ``CoreUtils.get_bare_dn(..., keep_proxy=True)``.
    """
    env = req.subprocess_env
    if "SSL_CLIENT_S_DN" not in env:
        return ""
    return CoreUtils.get_bare_dn(env["SSL_CLIENT_S_DN"], keep_proxy=True)
0617
0618
0619
def _has_production_role(req):
    """Return True if the client is treated as having a production role.

    This holds when the client DN contains any entry of
    ``panda_config.production_dns`` (substring match). It also holds when any
    FQAN starts with, or regex-matches, one of the production-role patterns.
    """
    client_dn = _getDN(req)
    if any(prod_dn in client_dn for prod_dn in panda_config.production_dns):
        return True

    role_patterns = (
        "/atlas/usatlas/Role=production",
        "/atlas/Role=production",
        "^/[^/]+/Role=production",
    )
    for fqan in _getFQAN(req):
        for pattern in role_patterns:
            if fqan.startswith(pattern) or re.search(pattern, fqan):
                return True
    return False
0641
0642
0643
def _getWGwithPR(req):
    """Return the working group of the first group-level production-role FQAN.

    Only the part after the last "-" is kept, lower-cased. Groups "" and
    "usatlas" are skipped. Returns None when nothing matches or on any error.
    """
    try:
        for fqan in _getFQAN(req):
            match = re.search("/[^/]+/([^/]+)/Role=production", fqan)
            if match is None:
                continue
            group = match.group(1)
            if group in ["", "usatlas"]:
                continue
            return group.split("-")[-1].lower()
    except Exception:
        pass
    return None
0657
0658
0659
def isSecure(req):
    """Return True for a secure request that does not use a limited proxy."""
    if not Protocol.isSecure(req):
        return False
    client_dn = req.subprocess_env["SSL_CLIENT_S_DN"]
    if "/CN=limited proxy" not in client_dn:
        return True
    _logger.warning(f"access via limited proxy : {client_dn}")
    return False
0669
0670
0671 """
0672 web service interface
0673
0674 """
0675
0676
0677
def submitJobs(req, jobs, toPending=None):
    """Web entry point: submit serialized jobs.

    Returns False for insecure requests; otherwise the result of
    ``userIF.submitJobs``. ``toPending`` is "True" to enable.
    """
    if not isSecure(req):
        return False

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    fqans = _getFQAN(req)
    host = req.get_remote_host()
    is_production_role = _has_production_role(req)
    to_pending = resolve_true(toPending)
    return userIF.submitJobs(jobs, user, host, fqans, is_production_role, to_pending)
0696
0697
0698
def getJobStatus(req, ids, no_pickle=None):
    """Web entry point: return job statuses, as JSON if the client accepts it.

    ``no_pickle`` is forwarded exactly as received (it is not normalized with
    resolve_true).
    """
    wants_json = req.acceptJson()
    return userIF.getJobStatus(ids, wants_json, no_pickle)
0701
0702
0703
def setDebugMode(req, pandaID, modeOn):
    """Web entry point: switch the debug mode of a job on ("True") or off.

    Requires an SSL client DN. The caller's production role and working group
    are forwarded for the permission check.
    """
    tmp_log = LogWrapper(_logger, f"setDebugMode {pandaID} {modeOn}")
    if "SSL_CLIENT_S_DN" not in req.subprocess_env:
        tmp_log.error(MESSAGE_SSL)
        return f"ERROR: {MESSAGE_SSL}"

    user = _getDN(req)
    is_production_manager = _has_production_role(req)
    fqans = _getFQAN(req)
    mode_on = resolve_true(modeOn)
    working_group = _getWGwithPR(req)
    tmp_log.debug(f"user={user} mgr={is_production_manager} wg={working_group} fqans={str(fqans)}")
    return userIF.setDebugMode(user, pandaID, is_production_manager, mode_on, working_group)
0723
0724
0725
def insertSandboxFileInfo(req, userName, fileName, fileSize, checkSum):
    """Web entry point: register a sandbox file (production role required).

    The host is ``panda_config.sandboxHostname`` when that is set and
    non-empty, otherwise the caller's remote host.
    """
    tmp_log = LogWrapper(_logger, f"insertSandboxFileInfo {userName} {fileName}")
    if "SSL_CLIENT_S_DN" not in req.subprocess_env:
        tmp_log.error(MESSAGE_SSL)
        return f"ERROR: {MESSAGE_SSL}"
    if not _has_production_role(req):
        tmp_log.error(MESSAGE_PROD_ROLE)
        return f"ERROR: {MESSAGE_PROD_ROLE}"

    configured_host = getattr(panda_config, "sandboxHostname", None)
    hostName = configured_host if configured_host else req.get_remote_host()
    return userIF.insertSandboxFileInfo(userName, hostName, fileName, fileSize, checkSum)
0747
0748
0749
def checkSandboxFile(req, fileSize, checkSum):
    """Web entry point: look up a sandbox file of the calling user."""
    if "SSL_CLIENT_S_DN" in req.subprocess_env:
        caller_dn = _getDN(req)
        return userIF.checkSandboxFile(caller_dn, fileSize, checkSum)
    return f"ERROR: {MESSAGE_SSL}"
0757
0758
0759
def get_job_statistics_per_site_label_resource(req, time_window=None):
    """Web entry point: job statistics per site, label and resource (JSON)."""
    stats_json = userIF.get_job_statistics_per_site_label_resource(time_window)
    return stats_json
0762
0763
0764
def getJobStatisticsPerSite(req):
    """Web entry point: job statistics per site (WrappedPickle)."""
    serialized_stats = userIF.getJobStatisticsPerSite()
    return serialized_stats
0767
0768
0769
def killJobs(req, ids, code=None, useMailAsID=None, killOpts=None):
    """Web entry point: kill jobs.

    ``killOpts`` is a comma-separated string of options; ``useMailAsID`` is
    "True" to look up the requester's e-mail address. Returns False for
    insecure requests.
    """
    if not isSecure(req):
        return False

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    is_production_manager = _has_production_role(req)
    fqans = _getFQAN(req)
    use_mail = resolve_true(useMailAsID)
    host = req.get_remote_host()
    kill_options = [] if killOpts is None else killOpts.split(",")
    return userIF.killJobs(ids, user, host, code, is_production_manager, use_mail, fqans, kill_options)
0793
0794
0795
def reassignJobs(req, ids, forPending=None, firstSubmission=None):
    """Web entry point: reassign jobs.

    ``forPending`` defaults to False (only "True" enables it).
    ``firstSubmission`` defaults to True (only "False" disables it).
    Returns False for insecure requests.
    """
    if not isSecure(req):
        return False

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    host = req.get_remote_host()
    for_pending = resolve_true(forPending)
    first_submission = resolve_false(firstSubmission)
    return userIF.reassignJobs(ids, user, host, for_pending, first_submission)
0813
0814
0815
def get_ban_users(req):
    """Web entry point: return the banned users as JSON."""
    banned_json = userIF.get_ban_users()
    return banned_json
0818
0819
0820
def getScriptOfflineRunning(req, pandaID, days=None):
    """Web entry point: script to rerun a job offline.

    ``days`` is converted to int; an unparsable value becomes None.
    """
    if days is not None:
        try:
            days = int(days)
        except Exception:
            days = None
    return userIF.getScriptOfflineRunning(pandaID, days)
0828
0829
0830
def getJediTasksInTimeRange(req, timeRange, dn=None, fullFlag=None, minTaskID=None, task_type="user"):
    """Web entry point: tasks of a user in a time range.

    ``dn`` defaults to the caller's DN. An unparsable ``minTaskID`` becomes
    None. Returns False for insecure requests or when no SSL client DN is present.
    """
    if not isSecure(req) or "SSL_CLIENT_S_DN" not in req.subprocess_env:
        return False
    if dn is None:
        dn = _getDN(req)
    full = resolve_true(fullFlag)
    try:
        min_task_id = int(minTaskID)
    except Exception:
        min_task_id = None
    _logger.debug(f"getJediTasksInTimeRange {dn} {timeRange}")
    return userIF.getJediTasksInTimeRange(dn, timeRange, full, min_task_id, task_type)
0849
0850
0851
def getJediTaskDetails(req, jediTaskID, fullFlag, withTaskInfo):
    """Web entry point: details of a task.

    Returns False for insecure requests or when no SSL client DN is present.
    """
    if not isSecure(req) or "SSL_CLIENT_S_DN" not in req.subprocess_env:
        return False
    full = resolve_true(fullFlag)
    with_task_info = resolve_true(withTaskInfo)
    _logger.debug(f"getJediTaskDetails {jediTaskID} {full} {with_task_info}")
    return userIF.getJediTaskDetails(jediTaskID, full, with_task_info)
0866
0867
0868
def getFullJobStatus(req, ids):
    """Web entry point: full status of jobs.

    Returns False for insecure requests or when no SSL client DN is present.
    """
    if not isSecure(req) or "SSL_CLIENT_S_DN" not in req.subprocess_env:
        return False
    caller_dn = _getDN(req)
    return userIF.getFullJobStatus(ids, caller_dn)
0878
0879
0880
def insertTaskParams(req, taskParams=None, properErrorCode=None, parent_tid=None):
    """Web entry point: insert task parameters (a JSON string).

    Returns a WrappedPickle-serialized ``(False, message)`` for insecure
    requests or invalid JSON; otherwise the serialized result of
    ``userIF.insertTaskParams``.
    """
    tmp_log = LogWrapper(_logger, f"insertTaskParams-{naive_utcnow().isoformat('/')}")
    tmp_log.debug("start")
    proper_error_code = resolve_true(properErrorCode)

    if not isSecure(req):
        tmp_log.debug(MESSAGE_SSL)
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None

    # reject payloads that are not valid JSON
    try:
        json.loads(taskParams)
    except Exception:
        tmp_log.debug(MESSAGE_JSON)
        return WrappedPickle.dumps((False, MESSAGE_JSON))

    is_production_role = _has_production_role(req)
    fqans = _getFQAN(req)
    tmp_log.debug(f"user={user} prodRole={is_production_role} FQAN:{str(fqans)} parent_tid={parent_tid}")
    result = userIF.insertTaskParams(taskParams, user, is_production_role, fqans, proper_error_code, parent_tid)
    # best-effort logging of the message part of the result
    try:
        tmp_log.debug(result[1])
    except Exception:
        pass
    return WrappedPickle.dumps(result)
0913
0914
0915
def killTask(req, jediTaskID=None, properErrorCode=None, broadcast=None):
    """Web entry point: kill a task.

    Error replies use CODE_SSL/CODE_LOGIC when ``properErrorCode`` is "True",
    and False otherwise.
    """
    proper_error_code = resolve_true(properErrorCode)
    do_broadcast = resolve_true(broadcast)

    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if proper_error_code else False, MESSAGE_SSL))

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    is_production_role = _has_production_role(req)

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if proper_error_code else False, MESSAGE_TASK_ID))

    result = userIF.killTask(task_id, user, is_production_role, proper_error_code, do_broadcast)
    return WrappedPickle.dumps(result)
0940
0941
0942
def retryTask(
    req,
    jediTaskID,
    properErrorCode=None,
    newParams=None,
    noChildRetry=None,
    discardEvents=None,
    disable_staging_mode=None,
    keep_gshare_priority=None,
    ignore_hard_exhausted=None,
):
    """Web entry point: retry a task, optionally with new parameters (JSON).

    All flag parameters are "True" to enable. Error replies use
    CODE_SSL/CODE_LOGIC when ``properErrorCode`` is "True", and False otherwise.
    """
    (
        properErrorCode,
        noChildRetry,
        discardEvents,
        disable_staging_mode,
        keep_gshare_priority,
        ignore_hard_exhausted,
    ) = map(
        resolve_true,
        (properErrorCode, noChildRetry, discardEvents, disable_staging_mode, keep_gshare_priority, ignore_hard_exhausted),
    )

    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if properErrorCode else False, MESSAGE_SSL))

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    is_production_role = _has_production_role(req)

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if properErrorCode else False, MESSAGE_TASK_ID))

    result = userIF.retryTask(
        task_id,
        user,
        is_production_role,
        properErrorCode,
        newParams,
        noChildRetry,
        discardEvents,
        disable_staging_mode,
        keep_gshare_priority,
        ignore_hard_exhausted,
    )
    return WrappedPickle.dumps(result)
0992
0993
0994
def reassignTask(req, jediTaskID, site=None, cloud=None, nucleus=None, soft=None, mode=None):
    """Web entry point: reassign a task to a site, nucleus or cloud.

    The destination is ``site`` if given, else ``nucleus``, else ``cloud``.
    ``mode="nokill"`` adds ":nokill reassign". ``mode="soft"`` or
    ``soft="True"`` adds ":soft reassign".
    """
    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL, MESSAGE_SSL))

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    is_production_role = _has_production_role(req)

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC, MESSAGE_TASK_ID))

    if site is not None:
        destination = f"site:{site}:y"
    elif nucleus is not None:
        destination = f"nucleus:{nucleus}:n"
    else:
        destination = f"cloud:{cloud}:n"

    if mode == "nokill":
        suffix = ":nokill reassign"
    elif mode == "soft" or soft == "True":
        suffix = ":soft reassign"
    else:
        suffix = ""

    result = userIF.reassignTask(task_id, user, is_production_role, destination + suffix)
    return WrappedPickle.dumps(result)
1024
1025
1026
def finishTask(req, jediTaskID=None, properErrorCode=None, soft=None, broadcast=None):
    """Web entry point: finish a task; ``soft="True"`` requests a soft finish.

    Error replies use CODE_SSL/CODE_LOGIC when ``properErrorCode`` is "True",
    and False otherwise.
    """
    proper_error_code = resolve_true(properErrorCode)
    do_broadcast = resolve_true(broadcast)
    qualifier = "soft" if soft == "True" else None

    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if proper_error_code else False, MESSAGE_SSL))

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    is_production_role = _has_production_role(req)

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if proper_error_code else False, MESSAGE_TASK_ID))

    result = userIF.finishTask(task_id, user, is_production_role, proper_error_code, qualifier, do_broadcast)
    return WrappedPickle.dumps(result)
1054
1055
1056
def reloadInput(req, jediTaskID, properErrorCode=None, ignore_hard_exhausted=None):
    """Web entry point: reload the input of a task.

    Error replies use CODE_SSL/CODE_LOGIC when ``properErrorCode`` is "True",
    and False otherwise.
    """
    proper_error_code = resolve_true(properErrorCode)

    if not isSecure(req):
        return WrappedPickle.dumps((CODE_SSL if proper_error_code else False, MESSAGE_SSL))

    user = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    is_production_role = _has_production_role(req)

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((CODE_LOGIC if proper_error_code else False, MESSAGE_TASK_ID))

    ignore_exhausted = resolve_true(ignore_hard_exhausted)
    result = userIF.reloadInput(task_id, user, is_production_role, ignore_exhausted)
    return WrappedPickle.dumps(result)
1079
1080
1081
def changeTaskPriority(req, jediTaskID=None, newPriority=None):
    """Web entry point: change a task's priority (production role required).

    NOTE: unlike its siblings, the missing-role reply is a plain string rather
    than a pickled tuple; kept as-is for client compatibility.
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    if not _has_production_role(req):
        return "Failed : production or pilot role required"

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    try:
        priority = int(newPriority)
    except Exception:
        return WrappedPickle.dumps((False, "newPriority must be an integer"))

    result = userIF.changeTaskPriority(task_id, priority)
    return WrappedPickle.dumps(result)
1103
1104
1105
def increaseAttemptNrPanda(req, jediTaskID, increasedNr):
    """
    Increase the attempt number of a JEDI task's inputs by a given amount. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :param increasedNr: increment; must be convertible to a non-negative int
    :return: pickled result of userIF.increaseAttemptNrPanda, or a pickled (code, message) tuple on error
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    if not _has_production_role(req):
        return WrappedPickle.dumps((3, MESSAGE_PROD_ROLE))

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((4, MESSAGE_TASK_ID))

    # A non-numeric increment is mapped to -1 so that the sign check below rejects it.
    # Note that zero passes this check.
    try:
        increment = int(increasedNr)
    except Exception:
        increment = -1
    if increment < 0:
        return WrappedPickle.dumps((4, "increase must be a positive integer"))

    return WrappedPickle.dumps(userIF.increaseAttemptNrPanda(task_id, increment))
1132
1133
1134
def changeTaskAttributePanda(req, jediTaskID, attrName, attrValue):
    """
    Update one whitelisted attribute of a JEDI task. Requires a production role.

    Only ramCount, wallTime, cpuTime and coreCount may be changed.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :param attrName: attribute to change
    :param attrValue: new value, forwarded unchanged
    :return: pickled (result, None) on success, or a pickled (False | 2, message) tuple on error
    """
    updatable_attributes = ("ramCount", "wallTime", "cpuTime", "coreCount")

    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    if not _has_production_role(req):
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    if attrName in updatable_attributes:
        outcome = userIF.changeTaskAttributePanda(task_id, attrName, attrValue)
        return WrappedPickle.dumps((outcome, None))
    return WrappedPickle.dumps((2, f"disallowed to update {attrName}"))
1155
1156
1157
def changeTaskSplitRulePanda(req, jediTaskID, attrName, attrValue):
    """
    Update one whitelisted split-rule entry of a JEDI task. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :param attrName: split-rule key; must be one of the codes listed below
    :param attrValue: new value, forwarded unchanged
    :return: pickled (result, None) on success, or a pickled (False | 2, message) tuple on error
    """
    updatable_rules = (
        "AI", "TW", "EC", "ES", "MF", "NG", "NI", "NF",
        "NJ", "AV", "IL", "LI", "LC", "CC", "OT", "UZ",
    )

    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    if not _has_production_role(req):
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    if attrName in updatable_rules:
        outcome = userIF.changeTaskSplitRulePanda(task_id, attrName, attrValue)
        return WrappedPickle.dumps((outcome, None))
    return WrappedPickle.dumps((2, f"disallowed to update {attrName}"))
1195
1196
1197
def pauseTask(req, jediTaskID):
    """
    Pause a JEDI task via userIF.pauseTask. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :return: pickled result of userIF.pauseTask, or a pickled (False, message) tuple on error
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    has_prod_role = _has_production_role(req)
    if not has_prod_role:
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    return WrappedPickle.dumps(userIF.pauseTask(task_id, requester_dn, has_prod_role))
1218
1219
1220
def resumeTask(req, jediTaskID):
    """
    Resume a JEDI task via userIF.resumeTask. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :return: pickled result of userIF.resumeTask, or a pickled (False, message) tuple on error
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    has_prod_role = _has_production_role(req)
    if not has_prod_role:
        return WrappedPickle.dumps((False, "production role required"))

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    return WrappedPickle.dumps(userIF.resumeTask(task_id, requester_dn, has_prod_role))
1241
1242
1243
def avalancheTask(req, jediTaskID):
    """
    Forward an avalanche request for a JEDI task to userIF.avalancheTask. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :return: pickled result of userIF.avalancheTask, or a pickled (False, message) tuple on error
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    has_prod_role = _has_production_role(req)
    if not has_prod_role:
        return WrappedPickle.dumps((False, "production role required"))

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    return WrappedPickle.dumps(userIF.avalancheTask(task_id, requester_dn, has_prod_role))
1264
1265
1266
def release_task(req, jedi_task_id):
    """
    Send the "release" command to a JEDI task. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jedi_task_id: task ID; must be convertible to int
    :return: JSON-encoded result of userIF.send_command_to_task, or JSON (False, message) on error
    """
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None
    has_prod_role = _has_production_role(req)
    if not has_prod_role:
        return json.dumps((False, "production role required"))

    try:
        task_id = int(jedi_task_id)
    except Exception:
        return json.dumps((False, MESSAGE_TASK_ID))

    outcome = userIF.send_command_to_task(task_id, requester_dn, has_prod_role, "release")
    return json.dumps(outcome)
1287
1288
1289
def killUnfinishedJobs(req, jediTaskID, code=None, useMailAsID=None):
    """
    Kill the jobs that userIF.getPandaIDsWithTaskID returns for a JEDI task.

    The caller is treated as a production manager when any of its FQANs starts with
    "/atlas/usatlas/Role=production" or "/atlas/Role=production".

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID, forwarded as received (not converted to int)
    :param code: kill code forwarded to userIF.killJobs
    :param useMailAsID: "True" to forward useMailAsID=True to userIF.killJobs
    :return: the result of userIF.killJobs, or False if the connection is not secure
    """
    if not isSecure(req):
        return False

    requester_dn = _getDN(req) if "SSL_CLIENT_S_DN" in req.subprocess_env else None

    fqans = _getFQAN(req)
    production_prefixes = ("/atlas/usatlas/Role=production", "/atlas/Role=production")
    is_production_manager = any(fqan.startswith(production_prefixes) for fqan in fqans)

    mail_as_id = resolve_true(useMailAsID)
    remote_host = req.get_remote_host()

    panda_ids = userIF.getPandaIDsWithTaskID(jediTaskID)
    return userIF.killJobs(panda_ids, requester_dn, remote_host, code, is_production_manager, mail_as_id, fqans)
1320
1321
1322
def changeTaskModTimePanda(req, jediTaskID, diffValue):
    """
    Set the modificationTime of a JEDI task to "now + diffValue hours". Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :param diffValue: offset in hours (may be negative); must be convertible to int
    :return: pickled (result, None) on success, or a pickled (False, message) tuple on error
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    is_production_role = _has_production_role(req)

    if not is_production_role:
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))

    try:
        jediTaskID = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    try:
        diffValue = int(diffValue)
        # Use naive UTC like the rest of this module (naive_utcnow). Local wall-clock time
        # would shift the stored timestamp by the server's UTC offset.
        # NOTE(review): assumes task timestamps are stored as naive UTC - confirm.
        attrValue = naive_utcnow() + datetime.timedelta(hours=diffValue)
    except Exception:
        return WrappedPickle.dumps((False, f"failed to convert {diffValue} to time diff"))
    ret = userIF.changeTaskAttributePanda(jediTaskID, "modificationTime", attrValue)
    return WrappedPickle.dumps((ret, None))
1345
1346
1347
def getPandaIDsWithTaskID(req, jediTaskID):
    """
    Return the PandaIDs associated with a JEDI task, pickled.

    :param req: incoming HTTP request (not inspected)
    :param jediTaskID: task ID; must be convertible to int
    :return: pickled ID collection, or a pickled (False, message) tuple if the ID is invalid
    """
    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    # userIF hands back an already-pickled payload; decode it and re-encode it for the reply.
    pickled_ids = userIF.getPandaIDsWithTaskID(task_id)
    return WrappedPickle.dumps(WrappedPickle.loads(pickled_ids))
1358
1359
1360
def reactivateTask(req, jediTaskID, keep_attempt_nr=None, trigger_job_generation=None):
    """
    Reactivate a JEDI task via userIF.reactivateTask. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID; must be convertible to int
    :param keep_attempt_nr: "True" to forward keep_attempt_nr=True
    :param trigger_job_generation: "True" to forward trigger_job_generation=True
    :return: pickled result of userIF.reactivateTask, or a pickled (False, message) tuple on error
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    if not _has_production_role(req):
        msg = "production role is required"
        _logger.error(f"reactivateTask: {msg}")
        return WrappedPickle.dumps((False, msg))

    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))

    outcome = userIF.reactivateTask(
        task_id,
        resolve_true(keep_attempt_nr),
        resolve_true(trigger_job_generation),
    )
    return WrappedPickle.dumps(outcome)
1381
1382
1383
def getTaskStatus(req, jediTaskID):
    """
    Return the status of a JEDI task, pickled.

    :param req: incoming HTTP request (not inspected)
    :param jediTaskID: task ID; must be convertible to int
    :return: pickled result of userIF.getTaskStatus, or a pickled (False, message) tuple on error
    """
    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return WrappedPickle.dumps(userIF.getTaskStatus(task_id))
1391
1392
1393
def reassignShare(req, jedi_task_ids_pickle, share, reassign_running):
    """
    Reassign a set of JEDI tasks to a share. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jedi_task_ids_pickle: pickled list or tuple of task IDs
    :param share: name of the target share; must be a str
    :param reassign_running: string flag; treated as True when it contains "true" (case-insensitive)
    :return: pickled result of userIF.reassignShare, or a pickled (False, message) tuple on error
    """
    if not isSecure(req):
        return WrappedPickle.dumps((False, MESSAGE_SSL))

    prod_role = _has_production_role(req)
    if not prod_role:
        return WrappedPickle.dumps((False, MESSAGE_PROD_ROLE))

    # NOTE(review): this unpickles client-supplied data; presumably WrappedPickle restricts
    # which classes can be loaded - confirm.
    jedi_task_ids = WrappedPickle.loads(jedi_task_ids_pickle)
    _logger.debug(f"reassignShare: jedi_task_ids: {jedi_task_ids}, share: {share}, reassign_running: {reassign_running} (type {type(reassign_running)})")

    reassign_running = isinstance(reassign_running, str) and "true" in reassign_running.lower()

    # Both conditions must hold. `and` binds tighter than `or`, so the container check must
    # be grouped as a single isinstance() or the share type check gets skipped for lists.
    if not (isinstance(jedi_task_ids, (list, tuple)) and isinstance(share, str)):
        return WrappedPickle.dumps((False, "jedi_task_ids must be tuple/list and share must be string"))

    ret = userIF.reassignShare(jedi_task_ids, share, reassign_running)
    return WrappedPickle.dumps(ret)
1417
1418
1419
def getTaskParamsMap(req, jediTaskID):
    """
    Return the task parameter map of a JEDI task, pickled.

    :param req: incoming HTTP request (not inspected)
    :param jediTaskID: task ID; must be convertible to int
    :return: pickled result of userIF.getTaskParamsMap, or a pickled (False, message) tuple on error
    """
    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return WrappedPickle.dumps(userIF.getTaskParamsMap(task_id))
1427
1428
1429
def updateWorkers(req, harvesterID, workers):
    """
    Forward a JSON-encoded list of worker updates from a Harvester instance to userIF.

    The elapsed time is logged at debug level whether or not the payload parsed.

    :param req: incoming HTTP request; must be secure
    :param harvesterID: ID of the reporting Harvester instance
    :param workers: JSON string with the worker data
    :return: result of userIF.updateWorkers, or JSON (False, message) on error
    """
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req)
    remote_host = req.get_remote_host()
    started_at = naive_utcnow()

    try:
        worker_data = json.loads(workers)
    except Exception:
        response = json.dumps((False, MESSAGE_JSON))
    else:
        response = userIF.updateWorkers(requester_dn, remote_host, harvesterID, worker_data)

    elapsed = naive_utcnow() - started_at
    _logger.debug(f"updateWorkers {harvesterID} took {elapsed.seconds}.{elapsed.microseconds // 1000:03d} sec")
    return response
1452
1453
1454
def updateServiceMetrics(req, harvesterID, metrics):
    """
    Forward JSON-encoded service metrics from a Harvester instance to userIF.

    The elapsed time is logged at debug level whether or not the payload parsed.

    :param req: incoming HTTP request; must be secure
    :param harvesterID: ID of the reporting Harvester instance
    :param metrics: JSON string with the metrics
    :return: result of userIF.updateServiceMetrics, or JSON (False, message) on error
    """
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req)
    remote_host = req.get_remote_host()
    started_at = naive_utcnow()

    try:
        metric_data = json.loads(metrics)
    except Exception:
        response = json.dumps((False, MESSAGE_JSON))
    else:
        response = userIF.updateServiceMetrics(requester_dn, remote_host, harvesterID, metric_data)

    elapsed = naive_utcnow() - started_at
    _logger.debug(f"updateServiceMetrics {harvesterID} took {elapsed.seconds}.{elapsed.microseconds // 1000:03d} sec")
    return response
1480
1481
1482
def addHarvesterDialogs(req, harvesterID, dialogs):
    """
    Forward JSON-encoded dialog messages from a Harvester instance to userIF.

    :param req: incoming HTTP request; must be secure
    :param harvesterID: ID of the reporting Harvester instance
    :param dialogs: JSON string with the dialog messages
    :return: result of userIF.addHarvesterDialogs, or JSON (False, message) on error
    """
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req)
    try:
        parsed_dialogs = json.loads(dialogs)
    except Exception:
        return json.dumps((False, MESSAGE_JSON))
    return userIF.addHarvesterDialogs(requester_dn, harvesterID, parsed_dialogs)
1496
1497
1498
def harvesterIsAlive(req, harvesterID, data=None):
    """
    Report a heartbeat from a Harvester instance to userIF.

    :param req: incoming HTTP request; must be secure
    :param harvesterID: ID of the reporting Harvester instance
    :param data: optional JSON string with extra information; an empty dict is used when omitted
    :return: result of userIF.harvesterIsAlive, or JSON (False, message) on error
    """
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))

    requester_dn = _getDN(req)
    remote_host = req.get_remote_host()

    if data is None:
        payload = {}
    else:
        try:
            payload = json.loads(data)
        except Exception:
            return json.dumps((False, MESSAGE_JSON))

    return userIF.harvesterIsAlive(requester_dn, remote_host, harvesterID, payload)
1517
1518
1519
def getWorkerStats(req):
    """
    Return the output of userIF.getWorkerStats, JSON-encoded.

    :param req: incoming HTTP request (not inspected; no security check is done here)
    """
    stats = userIF.getWorkerStats()
    return json.dumps(stats)
1524
1525
1526
def reportWorkerStats_jobtype(req, harvesterID, siteName, paramsList):
    """
    Forward per-job-type worker statistics from a Harvester instance to userIF.

    :param req: incoming HTTP request; must be secure
    :param harvesterID: ID of the reporting Harvester instance
    :param siteName: site the statistics refer to
    :param paramsList: statistics payload, forwarded unchanged
    :return: JSON-encoded result of userIF.reportWorkerStats_jobtype, or JSON (False, message)
    """
    if isSecure(req):
        return json.dumps(userIF.reportWorkerStats_jobtype(harvesterID, siteName, paramsList))
    return json.dumps((False, MESSAGE_SSL))
1534
1535
1536
def setNumSlotsForWP(req, pandaQueueName, numSlots, gshare=None, resourceType=None, validPeriod=None):
    """
    Set the number of slots for a PanDA queue via userIF. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param pandaQueueName: target PanDA queue
    :param numSlots: number of slots; must be convertible to int
    :param gshare: optional global share, forwarded unchanged
    :param resourceType: optional resource type, forwarded unchanged
    :param validPeriod: optional validity period, forwarded unchanged
    :return: result of userIF.setNumSlotsForWP, or JSON (code, message) on error
    """
    if not isSecure(req):
        return json.dumps((CODE_SSL, MESSAGE_SSL))

    if not _has_production_role(req):
        return json.dumps((CODE_LOGIC, "production role is required in the certificate"))

    try:
        slot_count = int(numSlots)
    except Exception:
        return json.dumps((CODE_OTHER_PARAMS, "numSlots must be int"))

    return userIF.setNumSlotsForWP(pandaQueueName, slot_count, gshare, resourceType, validPeriod)
1551
1552
1553
def enableJumboJobs(req, jediTaskID, nJumboJobs, nJumboPerSite=None):
    """
    Enable jumbo jobs for a JEDI task via userIF. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param jediTaskID: task ID, forwarded as received
    :param nJumboJobs: number of jumbo jobs; must be convertible to int
    :param nJumboPerSite: optional per-site value; falls back to nJumboJobs when missing or not an int
    :return: result of userIF.enableJumboJobs, or JSON (code, message) on error
    """
    if not isSecure(req):
        return json.dumps((CODE_SSL, MESSAGE_SSL))

    if not _has_production_role(req):
        return json.dumps((CODE_LOGIC, "production role is required in the certificate"))

    try:
        total_jumbo = int(nJumboJobs)
    except Exception:
        return json.dumps((CODE_OTHER_PARAMS, "nJumboJobs must be int"))

    # None or a non-numeric value falls back to the total number of jumbo jobs
    try:
        per_site = int(nJumboPerSite)
    except Exception:
        per_site = total_jumbo

    return userIF.enableJumboJobs(jediTaskID, total_jumbo, per_site)
1572
1573
1574
def getUserJobMetadata(req, jediTaskID):
    """
    Return the user job metadata of a JEDI task as produced by userIF.

    :param req: incoming HTTP request (not inspected)
    :param jediTaskID: task ID; must be convertible to int
    :return: result of userIF.getUserJobMetadata, or a pickled (False, message) tuple on error
    """
    try:
        task_id = int(jediTaskID)
    except Exception:
        return WrappedPickle.dumps((False, MESSAGE_TASK_ID))
    return userIF.getUserJobMetadata(task_id)
1581
1582
1583
def getJumboJobDatasets(req, n_days, grace_period=0):
    """
    Return jumbo-job datasets from userIF.getJumboJobDatasets.

    :param req: incoming HTTP request (not inspected)
    :param n_days: must be convertible to int
    :param grace_period: must be convertible to int; defaults to 0
    :return: result of userIF.getJumboJobDatasets, or a pickled (False, message) tuple on error
    """
    try:
        days = int(n_days)
    except Exception:
        return WrappedPickle.dumps((False, "wrong n_days"))

    try:
        grace = int(grace_period)
    except Exception:
        return WrappedPickle.dumps((False, "wrong grace_period"))

    return userIF.getJumboJobDatasets(days, grace)
1594
1595
1596
def sweepPQ(req, panda_queue, status_list, ce_list, submission_host_list):
    """
    Forward a sweep request for a PanDA queue to userIF. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :return: JSON (True, result of userIF.sweepPQ), or JSON (False, message) on error
    """
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))

    if not _has_production_role(req):
        return json.dumps((False, MESSAGE_PROD_ROLE))

    sweep_result = userIF.sweepPQ(panda_queue, status_list, ce_list, submission_host_list)
    return json.dumps((True, sweep_result))
1607
1608
1609
def decode_idds_enum(d):
    """
    json.loads object_hook that maps {"__idds_const__": "A.B"} to idds.common.constants.A.B.

    Any dict without the "__idds_const__" key is returned unchanged.

    NOTE(review): the dotted path comes from client JSON and is resolved with getattr;
    confirm that arbitrary attribute access inside idds.common.constants is acceptable.
    """
    if "__idds_const__" not in d:
        return d
    resolved = idds.common.constants
    for attr_name in d["__idds_const__"].split("."):
        resolved = getattr(resolved, attr_name)
    return resolved
1619
1620
1621
def relay_idds_command(req, command_name, args=None, kwargs=None, manager=None, json_outputs=None):
    """
    Relay a command to iDDS through iDDS_Client or iDDS_ClientManager and return its result.

    :param req: incoming HTTP request; must be secure
    :param command_name: name of a method of the iDDS client; an optional "+<host>" suffix selects
                         the iDDS host, otherwise idds.common.utils.get_rest_host() is used
    :param args: JSON-encoded positional arguments (optional)
    :param kwargs: JSON-encoded keyword arguments (optional)
    :param manager: flag (normalised with resolve_bool) selecting iDDS_ClientManager over iDDS_Client
    :param json_outputs: flag (normalised with resolve_bool) to call setup_json_outputs();
                         only honoured together with manager
    :return: JSON-encoded (True, result) on success or (False, message) on failure
    """
    tmp_log = LogWrapper(
        _logger,
        f"relay_idds_command-{naive_utcnow().isoformat('/')}",
    )

    if not isSecure(req):
        tmp_log.error(MESSAGE_SSL)
        return json.dumps((False, MESSAGE_SSL))
    try:
        manager = resolve_bool(manager)
        if not manager:
            manager = False
        # Normalise json_outputs the same way as manager. The raw request value may be a string
        # flag, and any non-empty string (including "False") is truthy in Python.
        # NOTE(review): assumes resolve_bool maps string flags to booleans, as relied on for manager.
        json_outputs = resolve_bool(json_outputs)
        if "+" in command_name:
            command_name, idds_host = command_name.split("+")
        else:
            idds_host = idds.common.utils.get_rest_host()
        if manager:
            c = iDDS_ClientManager(idds_host)
        else:
            c = iDDS_Client(idds_host)
        if not hasattr(c, command_name):
            tmp_str = f"{command_name} is not a command of iDDS {c.__class__.__name__}"
            tmp_log.error(tmp_str)
            return json.dumps((False, tmp_str))
        # Try the iDDS JSON decoder first; fall back to plain json with decode_idds_enum
        # mapping {"__idds_const__": ...} markers back to iDDS constants.
        if args:
            try:
                args = idds.common.utils.json_loads(args)
            except Exception as e:
                tmp_log.warning(f"failed to load args json with {str(e)}")
                args = json.loads(args, object_hook=decode_idds_enum)
        else:
            args = []
        if kwargs:
            try:
                kwargs = idds.common.utils.json_loads(kwargs)
            except Exception as e:
                tmp_log.warning(f"failed to load kwargs json with {str(e)}")
                kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
        else:
            kwargs = {}

        if json_outputs and manager:
            c.setup_json_outputs()

        # propagate the caller identity to iDDS when a client DN is available
        dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
        if dn:
            c.set_original_user(user_name=clean_user_id(dn))
        tmp_log.debug(f"execute: class={c.__class__.__name__} com={command_name} host={idds_host} args={str(args)[:200]} kwargs={str(kwargs)[:200]}")
        ret = getattr(c, command_name)(*args, **kwargs)
        tmp_log.debug(f"ret: {str(ret)[:200]}")
        # plain json first; the iDDS encoder handles iDDS-specific types json cannot serialise
        try:
            return json.dumps((True, ret))
        except Exception:
            return idds.common.utils.json_dumps((True, ret))
    except Exception as e:
        tmp_str = f"failed to execute command with {str(e)}"
        tmp_log.error(f"{tmp_str} {traceback.format_exc()}")
        return json.dumps((False, tmp_str))
1681
1682
1683
def execute_idds_workflow_command(req, command_name, kwargs=None, json_outputs=None):
    """
    Execute a workflow-level iDDS command through iDDS_ClientManager.

    Supported commands are get_status (no ownership check) and abort, suspend, resume,
    retry and finish. For the latter, the caller's cleaned DN must match the iDDS
    request's username when one is set.

    :param req: incoming HTTP request; SSL_CLIENT_S_DN is required for owner-checked commands
    :param command_name: command name, optionally suffixed with "+<idds_host>"
    :param kwargs: JSON-encoded keyword arguments; owner-checked commands need "request_id"
    :param json_outputs: flag (normalised with resolve_bool) to call setup_json_outputs()
    :return: JSON-encoded (True, result) on success or (False, message) on failure
    """
    # Created outside the try block so the exception handler below can always log.
    tmp_log = LogWrapper(
        _logger,
        f"execute_idds_workflow_command-{naive_utcnow().isoformat('/')}",
    )
    try:
        # A raw string flag such as "False" would be truthy; normalise it first.
        json_outputs = resolve_bool(json_outputs)
        if kwargs:
            try:
                kwargs = idds.common.utils.json_loads(kwargs)
            except Exception:
                kwargs = json.loads(kwargs, object_hook=decode_idds_enum)
        else:
            kwargs = {}
        if "+" in command_name:
            command_name, idds_host = command_name.split("+")
        else:
            idds_host = idds.common.utils.get_rest_host()

        if command_name in ["get_status"]:
            check_owner = False
        elif command_name in ["abort", "suspend", "resume", "retry", "finish"]:
            check_owner = True
        else:
            tmp_message = f"{command_name} is unsupported"
            tmp_log.error(tmp_message)
            return json.dumps((False, tmp_message))

        c = iDDS_ClientManager(idds_host)
        if json_outputs:
            c.setup_json_outputs()
        dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
        if check_owner:
            if not dn:
                tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))
            requester = clean_user_id(dn)

            request_id = kwargs.get("request_id")
            if request_id is None:
                tmp_message = "request_id is missing"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))

            # Use a dedicated name so the HTTP request object `req` is not shadowed.
            idds_requests = c.get_requests(request_id=request_id)
            if not idds_requests:
                tmp_message = f"request {request_id} is not found"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))
            user_name = idds_requests[0].get("username")
            # NOTE(review): a request without a username passes this ownership check - confirm intended.
            if user_name and user_name != requester:
                tmp_message = f"request {request_id} is not owned by {requester}"
                tmp_log.error(tmp_message)
                return json.dumps((False, tmp_message))

        if dn:
            c.set_original_user(user_name=clean_user_id(dn))

        tmp_log.debug(f"com={command_name} host={idds_host} kwargs={str(kwargs)}")
        ret = getattr(c, command_name)(**kwargs)
        tmp_log.debug(str(ret))
        if isinstance(ret, dict) and "message" in ret:
            return json.dumps((True, [ret["status"], ret["message"]]))
        return json.dumps((True, ret))
    except Exception as e:
        tmp_log.error(f"failed with {str(e)} {traceback.format_exc()}")
        return json.dumps((False, f"server failed with {str(e)}"))
1752
1753
1754
def send_command_to_job(req, panda_id, com):
    """
    Forward a command for a PanDA job to userIF. Requires a production role.

    :param req: incoming HTTP request; must be secure
    :param panda_id: target job ID, forwarded as received
    :param com: command, forwarded as received
    :return: JSON-encoded result of userIF.send_command_to_job, or JSON (False, message) on error
    """
    if not isSecure(req):
        return json.dumps((False, MESSAGE_SSL))
    if _has_production_role(req):
        return json.dumps(userIF.send_command_to_job(panda_id, com))
    return json.dumps((False, MESSAGE_PROD_ROLE))
1764
1765
1766
def set_user_secret(req, key=None, value=None):
    """
    Forward a key/value secret for the calling user to userIF.set_user_secret.

    The owner is the cleaned SSL client DN of the request.

    :return: JSON-encoded result of userIF.set_user_secret, or JSON (False, message) if the DN is missing
    """
    tmp_log = LogWrapper(_logger, f"set_user_secret-{naive_utcnow().isoformat('/')}")

    client_dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
    if client_dn:
        return json.dumps(userIF.set_user_secret(clean_user_id(client_dn), key, value))

    tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
    tmp_log.error(tmp_message)
    return json.dumps((False, tmp_message))
1777
1778
1779
def get_user_secrets(req, keys=None, get_json=None):
    """
    Fetch secrets of the calling user from userIF.get_user_secrets.

    The owner is the cleaned SSL client DN of the request.

    :param keys: keys to fetch, forwarded unchanged
    :param get_json: "True" to forward get_json=True
    :return: JSON-encoded result of userIF.get_user_secrets, or JSON (False, message) if the DN is missing
    """
    tmp_log = LogWrapper(_logger, f"get_user_secrets-{naive_utcnow().isoformat('/')}")

    client_dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
    want_json = resolve_true(get_json)

    if client_dn:
        return json.dumps(userIF.get_user_secrets(clean_user_id(client_dn), keys, want_json))

    tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
    tmp_log.error(tmp_message)
    return json.dumps((False, tmp_message))
1792
1793
1794
def get_files_in_datasets(req, task_id, dataset_types="input,pseudo_input"):
    """
    Return the files in the datasets of a task, as produced by userIF.get_files_in_datasets.

    :param req: incoming HTTP request; must be secure
    :param task_id: task ID, forwarded as received
    :param dataset_types: comma-separated dataset types; defaults to "input,pseudo_input"
    :return: JSON-encoded result, or JSON (False, message) if the connection is not secure
    """
    if isSecure(req):
        return json.dumps(userIF.get_files_in_datasets(task_id, dataset_types))
    return json.dumps((False, MESSAGE_SSL))
1799 return json.dumps(userIF.get_files_in_datasets(task_id, dataset_types))