Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 import argparse
0002 import datetime
0003 import time
0004 
0005 from pandacommon.pandautils.PandaUtils import naive_utcnow
0006 
0007 # password
0008 from pandaserver.config import panda_config
0009 from pandaserver.taskbuffer.OraDBProxy import DBProxy
0010 from pandaserver.userinterface import Client
0011 
0012 proxyS = DBProxy()
0013 proxyS.connect(panda_config.dbhost, panda_config.dbpasswd, panda_config.dbuser, panda_config.dbname)
0014 
0015 
0016 option_parser = argparse.ArgumentParser(conflict_handler="resolve", description="Reassign jobs in a task")
0017 option_parser.add_argument("taskid", action="store", metavar="TASKID", help="taskID of the task")
0018 option_parser.add_argument(
0019     "-m",
0020     dest="limit",
0021     type=int,
0022     action="store",
0023     default=60,
0024     metavar="MIMUTES",
0025     help="time limit in minute",
0026 )
0027 option_parser.add_argument(
0028     "-9",
0029     action="store_const",
0030     const=True,
0031     dest="forceKill",
0032     default=False,
0033     help="kill jobs before next heartbeat is coming",
0034 )
0035 option_parser.add_argument(
0036     "--keepUnmerged",
0037     action="store_const",
0038     const=True,
0039     dest="keepUnmerged",
0040     default=False,
0041     help="generate new jobs after kiliing old jobs, to keep unmerged events",
0042 )
0043 options = option_parser.parse_args()
0044 
0045 taskid = options.taskid
0046 
0047 print("")
0048 print(f"trying to reassign jobs with modificationTime < CURRENT-{options.limit}min. Change the limit using -m if necessary")
0049 
0050 codeV = 51
0051 if options.forceKill:
0052     codeV = 9
0053 
0054 jobs = []
0055 jediJobs = []
0056 
0057 timeLimit = naive_utcnow() - datetime.timedelta(minutes=options.limit)
0058 varMap = {}
0059 varMap[":modificationTime"] = timeLimit
0060 varMap[":taskID"] = taskid
0061 sql = "SELECT PandaID,lockedby FROM ATLAS_PANDA.jobsDefined4 WHERE taskID=:taskID AND modificationTime<:modificationTime "
0062 status, res = proxyS.querySQLS(sql, varMap)
0063 if res is not None:
0064     for id, lockedby in res:
0065         if lockedby == "jedi":
0066             jediJobs.append(id)
0067         else:
0068             jobs.append(id)
0069 
0070 
0071 varMap = {}
0072 varMap[":js1"] = "activated"
0073 varMap[":js2"] = "starting"
0074 varMap[":modificationTime"] = timeLimit
0075 varMap[":taskID"] = taskid
0076 sql = "SELECT PandaID,lockedby FROM ATLAS_PANDA.jobsActive4 WHERE jobStatus IN (:js1,:js2) AND taskID=:taskID AND modificationTime<:modificationTime "
0077 status, res = proxyS.querySQLS(sql, varMap)
0078 if res is not None:
0079     for id, lockedby in res:
0080         if lockedby == "jedi":
0081             jediJobs.append(id)
0082         else:
0083             jobs.append(id)
0084 
0085 varMap = {}
0086 varMap[":jobStatus"] = "waiting"
0087 varMap[":modificationTime"] = timeLimit
0088 varMap[":taskID"] = taskid
0089 sql = "SELECT PandaID,lockedby FROM ATLAS_PANDA.jobsDefined4 WHERE jobStatus=:jobStatus AND taskID=:taskID AND modificationTime<:modificationTime "
0090 status, res = proxyS.querySQLS(sql, varMap)
0091 if res is not None:
0092     for id, lockedby in res:
0093         if lockedby == "jedi":
0094             jediJobs.append(id)
0095         else:
0096             jobs.append(id)
0097 
0098 # reassign
0099 jobs.sort()
0100 if len(jobs):
0101     nJob = 100
0102     iJob = 0
0103     while iJob < len(jobs):
0104         print(f"reassign  {str(jobs[iJob:iJob + nJob])}")
0105         Client.reassign_jobs(jobs[iJob : iJob + nJob])
0106         iJob += nJob
0107         time.sleep(10)
0108 
0109 if len(jediJobs) != 0:
0110     nJob = 100
0111     iJob = 0
0112     while iJob < len(jediJobs):
0113         print(f"kill JEDI jobs {str(jediJobs[iJob:iJob + nJob])}")
0114         Client.kill_jobs(jediJobs[iJob : iJob + nJob], codeV, keep_unmerged=options.keepUnmerged)
0115         iJob += nJob
0116 
0117 print(f"\nreassigned {len(jobs + jediJobs)} jobs")