Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import os
0002 import sys
0003 
0004 from pandaharvester.harvestercore.communicator_pool import CommunicatorPool
0005 from pandaharvester.harvestercore.job_spec import JobSpec
0006 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0007 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0008 from pandaharvester.harvestercore.resource_type_constants import (
0009     BASIC_RESOURCE_TYPE_SINGLE_CORE,
0010 )
0011 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0012 from pandaharvester.harvestercore.work_spec import WorkSpec
0013 from pandaharvester.harvestermisc import signal_utils
0014 
# Fork: the parent installs harvester's suicide signal handler and reaps the
# child; the child performs the end-to-end worker submission + monitoring test.
fork_child_pid = os.fork()
if fork_child_pid != 0:
    # Parent process: arm the suicide handler (None -> default argument) and
    # block until the test child terminates.
    signal_utils.set_suicide_handler(None)
    os.wait()
else:
    # Child process. CLI: either <queueName> alone, or
    # <queueName> <jobType> <resourceType>.
    if len(sys.argv) not in (2, 4):
        print("Wrong number of parameters. You can either:")
        print("  - specify the queue name")
        print("  - specify the queue name, jobType (managed, user) and resourceType (e.g. SCORE, SCORE_HIMEM, MCORE, MCORE_HIMEM)")
        sys.exit(0)

    # Resolve the harvester queue configuration for the requested queue.
    queueName = sys.argv[1]
    queueConfigMapper = QueueConfigMapper()
    queueConfig = queueConfigMapper.get_queue(queueName)

    resource_type_mapper = ResourceTypeMapper()

    # Default jobType comes from the queue's prodSourceLabel when it is one of
    # the two recognized values; otherwise fall back to 'managed'.
    if queueConfig.prodSourceLabel in ("user", "managed"):
        jobType = queueConfig.prodSourceLabel
    else:
        jobType = "managed"  # default, can be overwritten by parameters

    resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE  # default, can be overwritten by parameters

    # Optional command-line overrides (3-argument form). Invalid values are
    # reported but do not abort: the defaults above are kept.
    if len(sys.argv) == 4:
        # jobType should be 'managed' or 'user'. If not specified will default to a production job
        if sys.argv[2] in ("user", "managed"):
            jobType = sys.argv[2]
        else:
            print(f"value for jobType not valid, defaulted to {jobType}")

        # resourceType should be a valid resource type, e.g. 'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'. If not specified defaults to single core
        if resource_type_mapper.is_valid_resource_type(sys.argv[3]):
            resourceType = sys.argv[3]
        else:
            print(f"value for resourceType not valid, defaulted to {resourceType}")

    print(f"Running with queueName:{queueName}, jobType:{jobType}, resourceType:{resourceType}")

    pluginFactory = PluginFactory()

    # Communicator pool used to talk to the PanDA server (job and event fetch).
    com = CommunicatorPool()

    # get job
    # Unless the queue uses the no-job map type, pull one real job from the
    # server so the worker can be built around it; abort if none is returned.
    jobSpecList = []
    if queueConfig.mapType != WorkSpec.MT_NoJob:
        jobs, errStr = com.get_jobs(queueConfig.queueName, "nodeName", queueConfig.prodSourceLabel, "computingElement", 1, None)
        if len(jobs) == 0:
            print(f"Failed to get jobs at {queueConfig.queueName} due to {errStr}")
            sys.exit(0)

        # Convert the raw job JSON from the server into a JobSpec object.
        jobSpec = JobSpec()
        jobSpec.convert_job_json(jobs[0])

        # set input file paths
        # Point every input LFN at a path under the current working directory.
        inFiles = jobSpec.get_input_file_attributes()
        for inLFN, inFile in inFiles.items():
            inFile["path"] = f"{os.getcwd()}/{inLFN}"
        jobSpec.set_input_file_paths(inFiles)
        jobSpecList.append(jobSpec)

    # Build a worker around the fetched job(s) via the queue's worker-maker
    # plugin, then fill in the attributes the submitter relies on.
    maker = pluginFactory.get_plugin(queueConfig.workerMaker)
    workSpec = maker.make_worker(jobSpecList, queueConfig, jobType, resourceType)

    workSpec.accessPoint = queueConfig.messenger["accessPoint"]
    workSpec.mapType = queueConfig.mapType
    workSpec.computingSite = queueConfig.queueName

    # set job to worker if not job-level late binding
    if not queueConfig.useJobLateBinding:
        workSpec.hasJob = 1
        workSpec.set_jobspec_list(jobSpecList)

    # Let the messenger plugin create/prepare the worker's access point.
    messenger = pluginFactory.get_plugin(queueConfig.messenger)
    messenger.setup_access_points([workSpec])

    # get plugin for messenger
    if queueConfig.mapType != WorkSpec.MT_NoJob:
        # NOTE(review): the messenger plugin is fetched again here even though
        # the instance obtained above could presumably be reused — confirm
        # whether get_plugin caches instances before deduplicating.
        messenger = pluginFactory.get_plugin(queueConfig.messenger)
        messenger.feed_jobs(workSpec, jobSpecList)

        # For event-service jobs, request event ranges from the server and
        # feed them to the worker through the messenger.
        jobSpec = jobSpecList[0]
        if "eventService" in jobSpec.jobParams:
            workSpec.eventsRequest = WorkSpec.EV_useEvents

        if workSpec.hasJob == 1 and workSpec.eventsRequest == WorkSpec.EV_useEvents:
            # Escalate from "uses events" to an actual event request keyed by
            # the job's PandaID.
            workSpec.eventsRequest = WorkSpec.EV_requestEvents
            eventsRequestParams = dict()
            eventsRequestParams[jobSpec.PandaID] = {
                "pandaID": jobSpec.PandaID,
                "taskID": jobSpec.taskID,
                "jobsetID": jobSpec.jobParams["jobsetID"],
                "nRanges": jobSpec.jobParams["coreCount"],
            }
            workSpec.eventsRequestParams = eventsRequestParams

            tmpStat, events = com.get_event_ranges(workSpec.eventsRequestParams, False, os.getcwd())
            # failed
            if tmpStat is False:
                print(f"failed to get events with {events}")
                sys.exit(0)
            tmpStat = messenger.feed_events(workSpec, events)
            if tmpStat is False:
                print(f"failed to feed events with {events}")
                sys.exit(0)

    # get submitter plugin
    # Submit the single worker and report the resulting batch ID, or exit
    # non-zero with the submitter's error message.
    submitterCore = pluginFactory.get_plugin(queueConfig.submitter)
    print(f"testing submission with plugin={submitterCore.__class__.__name__}")
    tmpRetList = submitterCore.submit_workers([workSpec])
    tmpStat, tmpOut = tmpRetList[0]
    if tmpStat:
        print(f" OK batchID={workSpec.batchID}")
    else:
        print(f" NG {tmpOut}")
        sys.exit(1)

    print("")

    # get monitoring plug-in
    # Immediately poll the just-submitted worker via the monitor plugin; the
    # per-worker result is a (status, error-message) pair.
    monCore = pluginFactory.get_plugin(queueConfig.monitor)
    print(f"testing monitoring for batchID={workSpec.batchID} with plugin={monCore.__class__.__name__}")
    tmpStat, tmpOut = monCore.check_workers([workSpec])
    tmpOut = tmpOut[0]
    if tmpStat:
        print(f" OK workerStatus={tmpOut[0]}")
    else:
        print(f" NG {tmpOut[1]}")
        sys.exit(1)