Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 #!/usr/bin/env python
0002 import datetime
0003 import logging
0004 import optparse
0005 import os
0006 import sqlite3
0007 import sys
0008 
# Module-level logger named after this module; main() sets up the logging
# output format and level via logging.basicConfig before using it.
logger = logging.getLogger(__name__)
0010 
0011 # jw_table columns:
0012 # 0|PandaID|integer|0||0
0013 # 1|workerID|integer|0||0
0014 # 2|relationType|text|0||0
0015 
0016 # work_table columns:
0017 # 0|workerID|integer|0||0
0018 # 1|batchID|text|0||0
0019 # 2|mapType|text|0||0
0020 # 3|queueName|text|0||0
0021 # 4|status|text|0||0
0022 # 5|hasJob|integer|0||0
0023 # 6|workParams|blob|0||0
0024 # 7|workAttributes|blob|0||0
0025 # 8|eventsRequestParams|blob|0||0
0026 # 9|eventsRequest|integer|0||0
0027 # 10|computingSite|text|0||0
0028 # 11|creationTime|timestamp|0||0
0029 # 12|submitTime|timestamp|0||0
0030 # 13|startTime|timestamp|0||0
0031 # 14|endTime|timestamp|0||0
0032 # 15|nCore|integer|0||0
0033 # 16|walltime|timestamp|0||0
0034 # 17|accessPoint|text|0||0
0035 # 18|modificationTime|timestamp|0||0
0036 # 19|lastUpdate|timestamp|0||0
0037 # 20|eventFeedTime|timestamp|0||0
0038 # 21|lockedBy|text|0||0
0039 # 22|postProcessed|integer|0||0
0040 # 23|nodeID|text|0||0
0041 # 24|minRamCount|integer|0||0
0042 # 25|maxDiskCount|integer|0||0
0043 # 26|maxWalltime|integer|0||0
0044 # 27|killTime|timestamp|0||0
0045 # 28|computingElement|text|0||0
0046 
0047 
def _parse_options():
    """Parse and validate the command line.

    Returns:
        The optparse options object with ``database_filename`` set and exactly
        one of ``hours`` / ``workers`` set to a non-zero integer.

    Exits the process with status -1 (after printing the help text) when the
    database file name is missing, or when neither or both of the selection
    options are given.
    """
    parser = optparse.OptionParser(description="")
    parser.add_option("-d", "--database-filename", dest="database_filename", help="The Harvester data base file name.")
    parser.add_option(
        "-t",
        "--time-in-hours",
        dest="hours",
        help="this prints the workers last modified in the last N hours. last-n-workers and time-in-hours are mutually exclusive.",
        type="int",
    )
    parser.add_option(
        "-n",
        "--last-n-workers",
        dest="workers",
        help="this prints the last N workers created. last-n-workers and time-in-hours are mutually exclusive.",
        type="int",
    )
    options, _args = parser.parse_args()

    mandatory_args = [
        "database_filename",
    ]
    for name in mandatory_args:
        if getattr(options, name) is None:
            logger.error("Must specify option: %s", name)
            parser.print_help()
            sys.exit(-1)

    if options.hours and options.workers:
        logger.error("can only specify time-in-hours or last-n-workers, not both")
        parser.print_help()
        sys.exit(-1)
    if not options.hours and not options.workers:
        logger.error("must specify time-in-hours or last-n-workers")
        parser.print_help()
        sys.exit(-1)

    return options


def _select_workers(cursor, options):
    """Return (workerID, batchID, status) rows for the requested workers.

    With ``options.hours`` set, selects workers whose modificationTime is later
    than now (UTC) minus that many hours; otherwise selects the
    ``options.workers`` rows with the highest workerID.
    """
    if options.hours:
        cutoff = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(hours=options.hours)
        # Year-month-day order is required: the comparison in SQL is a plain
        # text comparison, so the cutoff must sort like an ISO timestamp.
        # NOTE(review): assumes modificationTime is stored as naive UTC
        # "YYYY-MM-DD HH:MM:SS[.ffffff]" text - confirm against the writer.
        cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S")
        cursor.execute(
            "SELECT workerID,batchID,status FROM work_table WHERE modificationTime > ?",
            (cutoff_str,),
        )
    else:
        cursor.execute(
            "SELECT workerID,batchID,status FROM work_table ORDER BY workerID DESC LIMIT ?",
            (options.workers,),
        )
    return cursor.fetchall()


def _count_job_states(cursor, worker_id):
    """Count the jobs linked to ``worker_id`` by status and by subStatus.

    Returns:
        A ``(by_status, by_substatus)`` pair of dicts mapping each value to the
        number of linked jobs that have it, in first-seen order.
    """
    by_status = {}
    by_substatus = {}

    # Only PandaID is needed; naming the column keeps this working if
    # jw_table has more columns than expected.
    cursor.execute("SELECT PandaID FROM jw_table WHERE workerID=?", (worker_id,))
    panda_ids = [row[0] for row in cursor.fetchall()]

    for panda_id in panda_ids:
        cursor.execute("SELECT status,subStatus FROM job_table WHERE PandaID=?", (panda_id,))
        job_row = cursor.fetchone()
        if job_row is None:
            # A dangling jw_table reference should not abort the whole report.
            logger.warning("PandaID %s is linked to workerID %s but is missing from job_table", panda_id, worker_id)
            continue
        job_status, job_substatus = job_row
        by_status[job_status] = by_status.get(job_status, 0) + 1
        by_substatus[job_substatus] = by_substatus.get(job_substatus, 0) + 1

    return by_status, by_substatus


def _format_counts(counts):
    """Render a counts dict as ' name(count)' entries, one per key."""
    return "".join(f" {name}({count})" for name, count in counts.items())


def main():
    """Report job states for recently modified or recently created workers.

    Selects workers from work_table (either those modified in the last N hours
    or the last N by workerID), looks up each worker's PandaIDs in jw_table,
    and logs how many of those jobs are in each status and subStatus. Finally
    logs how many jobs in job_table have subStatus fetched, preparing and
    prepared.

    Command-line options are read from sys.argv; the process exits with
    status -1 if they are missing or contradictory.
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s:%(message)s")

    options = _parse_options()

    conn = sqlite3.connect(options.database_filename)
    try:
        cursor = conn.cursor()

        for worker_id, batch_id, worker_status in _select_workers(cursor, options):
            by_status, by_substatus = _count_job_states(cursor, worker_id)
            # Both lists use the same ' name(count)' entry format so the
            # subStatus entries no longer run together.
            summary = "job status = [" + _format_counts(by_status) + "] subStatus = {" + _format_counts(by_substatus) + "}"
            logger.info("workerID: %s; batchID: %s; worker status: %s; %s", worker_id, batch_id, worker_status, summary)

        for sub_status in ("fetched", "preparing", "prepared"):
            cursor.execute("SELECT COUNT(*) FROM job_table WHERE subStatus=?", (sub_status,))
            logger.info("panda jobs in %s: %s", sub_status, cursor.fetchone()[0])
    finally:
        # Release the database file handle even if a query fails.
        conn.close()
0154 
0155 
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()