File indexing completed on 2026-04-20 07:59:01
0001 import os
0002 import sys
0003
0004 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0005 from pandaharvester.harvestercore.job_spec import JobSpec
0006 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0007 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0008 from pandaharvester.harvestercore.resource_type_constants import (
0009 BASIC_RESOURCE_TYPE_SINGLE_CORE,
0010 )
0011 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0012 from pandaharvester.harvestercore.work_spec import WorkSpec
0013 from pandaharvester.harvestermisc import signal_utils
0014
0015 fork_child_pid = os.fork()
0016 if fork_child_pid != 0:
0017 signal_utils.set_suicide_handler(None)
0018 os.wait()
0019 else:
0020 if len(sys.argv) not in (2, 4):
0021 print("Wrong number of parameters. You can either:")
0022 print(" - specify the queue name")
0023 print(" - specify the queue name, jobType (managed, user) and resourceType (e.g. SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)")
0024 sys.exit(0)
0025
0026 queueName = sys.argv[1]
0027 queueConfigMapper = QueueConfigMapper()
0028 queueConfig = queueConfigMapper.get_queue(queueName)
0029
0030 resource_type_mapper = ResourceTypeMapper()
0031
0032 if queueConfig.prodSourceLabel in ("user", "managed"):
0033 jobType = queueConfig.prodSourceLabel
0034 else:
0035 jobType = "managed"
0036
0037 resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE
0038
0039 if len(sys.argv) == 4:
0040
0041 if sys.argv[2] in ("user", "managed"):
0042 jobType = sys.argv[2]
0043 else:
0044 print(f"value for jobType not valid, defaulted to {jobType}")
0045
0046
0047 if resource_type_mapper.is_valid_resource_type(sys.argv[3]):
0048 resourceType = sys.argv[3]
0049 else:
0050 print(f"value for resourceType not valid, defaulted to {resourceType}")
0051
0052 print(f"Running with queueName:{queueName}, jobType:{jobType}, resourceType:{resourceType}")
0053
0054 pluginFactory = PluginFactory()
0055
0056 com = CommunicatorPool()
0057
0058
0059 jobSpecList = []
0060 if queueConfig.mapType != WorkSpec.MT_NoJob:
0061 jobs, errStr = com.get_jobs(queueConfig.queueName, "nodeName", queueConfig.prodSourceLabel, "computingElement", 1, None)
0062 if len(jobs) == 0:
0063 print(f"Failed to get jobs at {queueConfig.queueName} due to {errStr}")
0064 sys.exit(0)
0065
0066 jobSpec = JobSpec()
0067 jobSpec.convert_job_json(jobs[0])
0068
0069
0070 inFiles = jobSpec.get_input_file_attributes()
0071 for inLFN, inFile in inFiles.items():
0072 inFile["path"] = f"{os.getcwd()}/{inLFN}"
0073 jobSpec.set_input_file_paths(inFiles)
0074 jobSpecList.append(jobSpec)
0075
0076 maker = pluginFactory.get_plugin(queueConfig.workerMaker)
0077 workSpec = maker.make_worker(jobSpecList, queueConfig, jobType, resourceType)
0078
0079 workSpec.accessPoint = queueConfig.messenger["accessPoint"]
0080 workSpec.mapType = queueConfig.mapType
0081 workSpec.computingSite = queueConfig.queueName
0082
0083
0084 if not queueConfig.useJobLateBinding:
0085 workSpec.hasJob = 1
0086 workSpec.set_jobspec_list(jobSpecList)
0087
0088 messenger = pluginFactory.get_plugin(queueConfig.messenger)
0089 messenger.setup_access_points([workSpec])
0090
0091
0092 if queueConfig.mapType != WorkSpec.MT_NoJob:
0093 messenger = pluginFactory.get_plugin(queueConfig.messenger)
0094 messenger.feed_jobs(workSpec, jobSpecList)
0095
0096 jobSpec = jobSpecList[0]
0097 if "eventService" in jobSpec.jobParams:
0098 workSpec.eventsRequest = WorkSpec.EV_useEvents
0099
0100 if workSpec.hasJob == 1 and workSpec.eventsRequest == WorkSpec.EV_useEvents:
0101 workSpec.eventsRequest = WorkSpec.EV_requestEvents
0102 eventsRequestParams = dict()
0103 eventsRequestParams[jobSpec.PandaID] = {
0104 "pandaID": jobSpec.PandaID,
0105 "taskID": jobSpec.taskID,
0106 "jobsetID": jobSpec.jobParams["jobsetID"],
0107 "nRanges": jobSpec.jobParams["coreCount"],
0108 }
0109 workSpec.eventsRequestParams = eventsRequestParams
0110
0111 tmpStat, events = com.get_event_ranges(workSpec.eventsRequestParams, False, os.getcwd())
0112
0113 if tmpStat is False:
0114 print(f"failed to get events with {events}")
0115 sys.exit(0)
0116 tmpStat = messenger.feed_events(workSpec, events)
0117 if tmpStat is False:
0118 print(f"failed to feed events with {events}")
0119 sys.exit(0)
0120
0121
0122 submitterCore = pluginFactory.get_plugin(queueConfig.submitter)
0123 print(f"testing submission with plugin={submitterCore.__class__.__name__}")
0124 tmpRetList = submitterCore.submit_workers([workSpec])
0125 tmpStat, tmpOut = tmpRetList[0]
0126 if tmpStat:
0127 print(f" OK batchID={workSpec.batchID}")
0128 else:
0129 print(f" NG {tmpOut}")
0130 sys.exit(1)
0131
0132 print("")
0133
0134
0135 monCore = pluginFactory.get_plugin(queueConfig.monitor)
0136 print(f"testing monitoring for batchID={workSpec.batchID} with plugin={monCore.__class__.__name__}")
0137 tmpStat, tmpOut = monCore.check_workers([workSpec])
0138 tmpOut = tmpOut[0]
0139 if tmpStat:
0140 print(f" OK workerStatus={tmpOut[0]}")
0141 else:
0142 print(f" NG {tmpOut[1]}")
0143 sys.exit(1)