File indexing completed on 2026-04-10 08:39:07
0001 import argparse
0002 import datetime
0003 import time
0004
0005 from pandacommon.pandautils.PandaUtils import naive_utcnow
0006
0007
0008 from pandaserver.config import panda_config
0009 from pandaserver.taskbuffer.OraDBProxy import DBProxy
0010 from pandaserver.userinterface import Client
0011
# Direct database connection built from the credentials held in panda_config;
# it is used for the SELECT queries that pick the jobs to act on.
# NOTE(review): the return value of connect() is not checked here — confirm that a
# failed connection surfaces as an error elsewhere.
proxyS = DBProxy()
proxyS.connect(
    panda_config.dbhost,
    panda_config.dbpasswd,
    panda_config.dbuser,
    panda_config.dbname,
)
0014
0015
# Command-line interface: the task whose jobs should be reassigned, plus options
# controlling which jobs are selected and how JEDI-managed jobs are killed.
option_parser = argparse.ArgumentParser(conflict_handler="resolve", description="Reassign jobs in a task")
option_parser.add_argument("taskid", action="store", metavar="TASKID", help="taskID of the task")
# Only jobs whose modificationTime is older than this many minutes are selected.
option_parser.add_argument(
    "-m",
    dest="limit",
    type=int,
    action="store",
    default=60,
    metavar="MINUTES",
    help="time limit in minutes",
)
# When given, JEDI jobs are killed with code 9 instead of the default 51.
option_parser.add_argument(
    "-9",
    action="store_const",
    const=True,
    dest="forceKill",
    default=False,
    help="kill jobs before next heartbeat is coming",
)
# Forwarded to Client.kill_jobs as keep_unmerged when killing JEDI jobs.
option_parser.add_argument(
    "--keepUnmerged",
    action="store_const",
    const=True,
    dest="keepUnmerged",
    default=False,
    help="generate new jobs after killing old jobs, to keep unmerged events",
)
options = option_parser.parse_args()
0044
taskid = options.taskid

# Kill code later handed to Client.kill_jobs for JEDI jobs:
# 9 when -9 was given on the command line, 51 otherwise.
codeV = 9 if options.forceKill else 51

print()
print(f"trying to reassign jobs with modificationTime < CURRENT-{options.limit}min. Change the limit using -m if necessary")
0053
jobs = []  # PandaIDs not locked by JEDI; reassigned via Client.reassign_jobs
jediJobs = []  # PandaIDs locked by "jedi"; killed via Client.kill_jobs

# Only jobs not modified within the last options.limit minutes are considered.
timeLimit = naive_utcnow() - datetime.timedelta(minutes=options.limit)

# PandaIDs already queued. The queries below can return the same job more than once
# (the "waiting" query is a subset of the first jobsDefined4 query), so each job is
# queued only once and is not reassigned/killed twice.
_seenPandaIDs = set()


def _collect_jobs(sql, var_map):
    """Run ``sql`` and queue every returned (PandaID, lockedby) row.

    Rows whose ``lockedby`` is "jedi" are appended to ``jediJobs``; all other rows are
    appended to ``jobs``. A PandaID already queued by an earlier query is skipped.

    :param sql: SELECT statement returning (PandaID, lockedby) rows.
    :param var_map: bind-variable mapping for ``sql``.
    """
    # NOTE(review): the status returned by querySQLS is ignored (as before); presumably a
    # failed query yields rows=None — confirm.
    _status, rows = proxyS.querySQLS(sql, var_map)
    if rows is None:
        return
    for panda_id, locked_by in rows:
        if panda_id in _seenPandaIDs:
            continue
        _seenPandaIDs.add(panda_id)
        if locked_by == "jedi":
            jediJobs.append(panda_id)
        else:
            jobs.append(panda_id)


# Jobs of the task in jobsDefined4, whatever their status.
_collect_jobs(
    "SELECT PandaID,lockedby FROM ATLAS_PANDA.jobsDefined4 WHERE taskID=:taskID AND modificationTime<:modificationTime ",
    {":modificationTime": timeLimit, ":taskID": taskid},
)

# Activated or starting jobs of the task in jobsActive4.
_collect_jobs(
    "SELECT PandaID,lockedby FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:js1,:js2) AND taskID=:taskID AND modificationTime<:modificationTime ",
    {":js1": "activated", ":js2": "starting", ":modificationTime": timeLimit, ":taskID": taskid},
)

# Waiting jobs of the task in jobsDefined4.
# NOTE(review): already covered by the first query; any overlap is skipped by _collect_jobs.
_collect_jobs(
    "SELECT PandaID,lockedby FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND taskID=:taskID AND modificationTime<:modificationTime ",
    {":jobStatus": "waiting", ":modificationTime": timeLimit, ":taskID": taskid},
)
0097
0098
def _batches(ids, size=100):
    """Yield consecutive slices of ``ids`` holding at most ``size`` PandaIDs each."""
    for start in range(0, len(ids), size):
        yield ids[start : start + size]


# Non-JEDI jobs: reassign in ascending PandaID order, in batches of 100.
jobs.sort()
for batch_number, batch in enumerate(_batches(jobs)):
    if batch_number:
        # Pause 10 s between consecutive batches (presumably to pace the load on the
        # server); there is no pause after the final batch.
        time.sleep(10)
    print(f"reassign {batch}")
    Client.reassign_jobs(batch)

# JEDI-locked jobs: kill in batches of 100 with codeV; --keepUnmerged is passed through.
for batch in _batches(jediJobs):
    print(f"kill JEDI jobs {batch}")
    Client.kill_jobs(batch, codeV, keep_unmerged=options.keepUnmerged)

# NOTE(review): the return values of Client.reassign_jobs / Client.kill_jobs are not
# checked, so this is the number of jobs submitted, not confirmed successes.
print(f"\nreassigned {len(jobs) + len(jediJobs)} jobs")