Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

0001 import datetime
0002 import json
0003 
0004 from pandacommon.pandautils.PandaUtils import naive_utcnow
0005 
0006 
0007 # make message
def make_message(msg_type, **kwargs):
    """Serialize a message to a JSON string.

    The payload holds ``msg_type``, every extra keyword argument, and a
    ``timestamp`` field with the current time in whole seconds since the
    Unix epoch. A ``timestamp`` keyword passed by the caller is overwritten.

    Args:
        msg_type: value stored under the ``"msg_type"`` key.
        **kwargs: additional fields to include; they must be serializable
            by ``json.dumps``.

    Returns:
        The JSON-encoded message string.
    """
    msg_dict = {"msg_type": msg_type}
    msg_dict.update(kwargs)
    # Take the epoch from a timezone-aware UTC datetime: calling .timestamp()
    # on a naive datetime makes Python interpret it as local time, which
    # shifts the result by the host's UTC offset on non-UTC machines.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    msg_dict["timestamp"] = int(now_utc.timestamp())
    return json.dumps(msg_dict)
0013 
0014 
0015 # send a job message
def send_job_message(msg_queue, msg_topic, task_id, job_id):
    """Send one "get_job" message to the topic and then to the queue.

    The same serialized message goes to both destinations. Only the queue
    copy carries a ``type`` header set to ``job_id``, which is used as the
    selector for that job's messages.
    """
    payload = make_message("get_job", taskid=task_id, jobid=job_id)
    # the topic gets the message first, without headers
    msg_topic.send(payload)
    # the queue gets the same message, tagged with the job ID
    msg_queue.send(payload, headers={"type": job_id})
0025 
0026 
0027 # delete a job message
def delete_job_message(msg_queue, job_id, time_out=10):
    """Register a remover for a job's messages and purge old removers.

    Builds a selector matching messages whose ``type`` or ``JMSType``
    equals ``job_id``, passes it with ``time_out`` to
    ``msg_queue.add_remover``, then calls ``msg_queue.purge_removers``.
    """
    selector_template = "type='{0}' OR JMSType='{0}'"
    selector = selector_template.format(job_id)
    # subscribe a remover for this job's messages
    msg_queue.add_remover({"selector": selector}, time_out)
    # clean up removers that are no longer needed
    msg_queue.purge_removers()
0035 
0036 
0037 # send a task message
def send_task_message(msg_topic, command_str, task_id):
    """Send a task command message to the topic.

    The message type is ``command_str`` with the suffix ``_task``, and the
    message carries ``task_id`` under the ``taskid`` key.
    """
    task_msg_type = f"{command_str}_task"
    payload = make_message(task_msg_type, taskid=task_id)
    # task messages go to the topic only
    msg_topic.send(payload)