Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import os
0002 import random
0003 import sys
0004 import time
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.fifos import MonitorFIFO
0008 from pandaharvester.harvestercore.job_spec import JobSpec
0009 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0010 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0011 from pandaharvester.harvestercore.work_spec import WorkSpec
0012 
0013 # start test
0014 
# One MonitorFIFO instance is created here and shared by everything below.
mq = MonitorFIFO()

# Print the sleepTime value read from the FIFO's config object.
print("sleepTime", mq.config.sleepTime)
0018 
0019 
def single_thread_test(nObjects=3, protective=False):
    """Time clear, put, peek and get on the module-level MonitorFIFO ``mq``.

    The FIFO is cleared first; then ``nObjects`` WorkSpec objects are put one
    by one, the head is peeked, and ``nObjects`` get calls are issued. The
    elapsed time of the clear, put and get phases is printed.

    Args:
        nObjects: number of WorkSpec objects to put and then get.
        protective: forwarded as the ``protective`` argument of ``mq.get``.
    """

    def rate(elapsed):
        # A zero interval (coarse clock, or nObjects=0) would otherwise raise
        # ZeroDivisionError; report an infinite rate instead of crashing.
        if elapsed <= 0:
            return float("inf")
        return nObjects / elapsed

    # perf_counter() is monotonic and high-resolution, so intervals cannot
    # come out negative when the wall clock is adjusted.
    started = time.perf_counter()
    print("clear")
    mq.fifo.clear()
    print("size", mq.size())
    elapsed = time.perf_counter() - started
    print("Time consumed: ", elapsed)

    # Put phase: each WorkSpec carries its index as workerID and a small
    # random payload as workAttributes.
    started = time.perf_counter()
    for worker_id in range(nObjects):
        workspec = WorkSpec()
        workspec.workerID = worker_id
        workspec.workAttributes = {"random": [random.random(), random.random()]}
        mq.put(workspec)
    elapsed = time.perf_counter() - started
    print(f"Time consumed: {elapsed} sec ; Avg: {rate(elapsed)} obj/sec ")

    print("size", mq.size())

    print("peek")
    print(mq.peek())

    # Get phase: the fetched objects are discarded, only the timing matters.
    started = time.perf_counter()
    for _ in range(nObjects):
        mq.get(timeout=3, protective=protective)
    elapsed = time.perf_counter() - started
    print(f"Time consumed: {elapsed} sec ; Avg: {rate(elapsed)} obj/sec ")
0053 
0054 
# Run the put/get benchmark twice: once with protective=False, once with True.
for banner, use_protective in (("Normal test", False), ("Protective test", True)):
    print(banner)
    single_thread_test(nObjects=1000, protective=use_protective)

mq.fifo.clear()

# Time a single populate() call issued right after the FIFO was cleared.
time_point = time.time()
print("MonitorFIFO.populate")
mq.populate(seconds_ago=0, clear_fifo=True)
time_consumed = time.time() - time_point
print("Time consumed: ", time_consumed)
0067 
0068 # workspec1 = WorkSpec()
0069 # workspec1.workerID = 777
0070 # workspec1.computingSite = 'TEST-SITE1'
0071 #
0072 # workspec2 = WorkSpec()
0073 # workspec2.workerID = 888
0074 # workspec2.computingSite = 'TEST-SITE2'
0075 #
0076 # workspec_bulk = [workspec1, workspec2]