File indexing completed on 2026-04-20 07:59:01
0001
0002 import datetime
0003 import logging
0004 import optparse
0005 import os
0006 import sqlite3
0007 import sys
0008
# Module-level logger, named after this module so its records can be told apart.
logger = logging.getLogger(name=__name__)
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
def _count_worker_jobs(cursor, worker_id):
    """Tally the jobs linked to one worker by status and by subStatus.

    Args:
        cursor: Open sqlite3 cursor on the Harvester database.
        worker_id: workerID whose ``jw_table`` rows are looked up.

    Returns:
        Tuple ``(by_status, by_substatus)`` of dicts mapping each value to the
        number of linked jobs carrying it, in first-seen order.
    """
    by_status = {}
    by_substatus = {}

    cursor.execute("SELECT * FROM jw_table WHERE workerID=?", (worker_id,))
    # Materialise the ids first: the same cursor is reused for the lookups below.
    # The first column is taken as the PandaID, matching the positional
    # unpacking this code has always relied on.
    panda_ids = [row[0] for row in cursor.fetchall()]

    for panda_id in panda_ids:
        cursor.execute("SELECT status,subStatus FROM job_table WHERE PandaID=?", (panda_id,))
        job_row = cursor.fetchone()
        if job_row is None:
            # A relation row without a matching job must not abort the whole report.
            logger.warning("PandaID %s is linked to workerID %s but is missing from job_table", panda_id, worker_id)
            continue
        job_status, job_substatus = job_row
        by_status[job_status] = by_status.get(job_status, 0) + 1
        by_substatus[job_substatus] = by_substatus.get(job_substatus, 0) + 1

    return by_status, by_substatus


def main():
    """Report per-worker job state counts from a Harvester sqlite database.

    Workers are selected from ``work_table`` either by modification time
    (``-t/--time-in-hours``) or as the N highest workerIDs
    (``-n/--last-n-workers``); exactly one of the two must be given. For each
    selected worker the linked PandaIDs are read from ``jw_table`` and the
    number of jobs per ``status`` and per ``subStatus`` (from ``job_table``)
    is logged. Finally the total number of jobs whose ``subStatus`` is
    ``fetched``, ``preparing`` or ``prepared`` is logged.

    Options are read from ``sys.argv``. The process exits with status -1 when
    the database option is missing, the file does not exist, or the -t/-n
    combination is invalid.
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s:%(message)s")

    parser = optparse.OptionParser(description="")
    parser.add_option("-d", "--database-filename", dest="database_filename", help="The Harvester data base file name.")
    parser.add_option(
        "-t",
        "--time-in-hours",
        dest="hours",
        help="this prints the workers last modified in the last N hours. last-n-workers and time-in-hours are mutually exclusive.",
        type="int",
    )
    parser.add_option(
        "-n",
        "--last-n-workers",
        dest="workers",
        help="this prints the last N workers created. last-n-workers and time-in-hours are mutually exclusive.",
        type="int",
    )
    options, _args = parser.parse_args()

    mandatory_args = [
        "database_filename",
    ]

    for name in mandatory_args:
        if getattr(options, name) is None:
            logger.error("Must specify option: %s", name)
            parser.print_help()
            sys.exit(-1)

    if options.hours and options.workers:
        logger.error("can only specify time-in-hours or last-n-workers, not both")
        parser.print_help()
        sys.exit(-1)
    elif not options.hours and not options.workers:
        logger.error("must specify time-in-hours or last-n-workers")
        parser.print_help()
        sys.exit(-1)

    # sqlite3.connect() silently creates a missing file, which would leave an
    # empty database behind and fail later with "no such table".
    if not os.path.isfile(options.database_filename):
        logger.error("database file does not exist: %s", options.database_filename)
        sys.exit(-1)

    conn = sqlite3.connect(options.database_filename)
    try:
        cursor = conn.cursor()

        if options.hours:
            # NOTE(review): assumes modificationTime is stored as naive UTC text
            # in "YYYY-MM-DD HH:MM:SS[.ffffff]" form, so a string comparison
            # orders chronologically -- confirm against the writer of work_table.
            cutoff = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(hours=options.hours)
            cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S")
            cursor.execute(
                "SELECT workerID,batchID,status FROM work_table WHERE modificationTime > ?",
                (cutoff_str,),
            )
        else:
            cursor.execute(
                "SELECT workerID,batchID,status FROM work_table ORDER BY workerID DESC LIMIT ?",
                (options.workers,),
            )

        # Fetch everything now: the cursor is reused for the per-worker queries.
        work_entries = cursor.fetchall()

        for worker_id, batch_id, worker_status in work_entries:
            by_status, by_substatus = _count_worker_jobs(cursor, worker_id)

            status_part = "".join(f" {status}({count})" for status, count in by_status.items())
            # Space-separated so that several subStatus entries stay readable.
            substatus_part = " ".join(f"{substatus}({count})" for substatus, count in by_substatus.items())
            summary = "job status = [" + status_part + "] subStatus = {" + substatus_part + "}"
            logger.info("workerID: %s; batchID: %s; worker status: %s; %s", worker_id, batch_id, worker_status, summary)

        for sub_status in ("fetched", "preparing", "prepared"):
            cursor.execute("SELECT COUNT(*) FROM job_table WHERE subStatus=?", (sub_status,))
            logger.info("panda jobs in %s: %s", sub_status, cursor.fetchone()[0])
    finally:
        conn.close()
0155
# Run the report only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()