File indexing completed on 2026-04-10 08:39:02
0001 import datetime
0002 import json
0003
0004 from pandacommon.pandautils.PandaUtils import naive_utcnow
0005
0006
0007
0008 def make_message(msg_type, **kwargs):
0009 msg_dict = {"msg_type": msg_type}
0010 msg_dict.update(kwargs)
0011 msg_dict["timestamp"] = int(naive_utcnow().timestamp())
0012 return json.dumps(msg_dict)
0013
0014
0015
0016 def send_job_message(msg_queue, msg_topic, task_id, job_id):
0017
0018 msg = make_message("get_job", taskid=task_id, jobid=job_id)
0019
0020 headers = {"type": job_id}
0021
0022 msg_topic.send(msg)
0023
0024 msg_queue.send(msg, headers=headers)
0025
0026
0027
0028 def delete_job_message(msg_queue, job_id, time_out=10):
0029
0030 headers = {"selector": "type='{0}' OR JMSType='{0}'".format(job_id)}
0031
0032 msg_queue.add_remover(headers, time_out)
0033
0034 msg_queue.purge_removers()
0035
0036
0037
0038 def send_task_message(msg_topic, command_str, task_id):
0039
0040 msg = make_message(f"{command_str}_task", taskid=task_id)
0041
0042 msg_topic.send(msg)