File indexing completed on 2026-04-09 07:58:22
0001 import json
0002 import socket
0003 import asyncio
0004 from nats.aio.client import Client as NATS
0005
0006
# Connection settings shared by every helper in this script: the NATS server
# URL and the auth token passed to ``NATS.connect``.
# NOTE(review): the token looks like a placeholder default; confirm it is not a
# real credential before committing, or load it from the environment.
nats_server = {
    "nats_url": "nats://idds-dev-rest-0.idds-dev-rest.panda.svc.cluster.local:4222",
    "nats_token": "my_default_token",
}
0009
0010
async def send():
    """Publish one sample event to ``event.UpdateProcessing`` through JetStream.

    Connects using the URL and token from the module-level ``nats_server``
    mapping, publishes a small JSON-encoded test event, flushes the
    connection and prints ``published``.

    The connection is closed in a ``finally`` clause so it is released even
    when publishing or flushing raises.
    """
    nats = NATS()
    await nats.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
    try:
        event = {"type": 1, "a": 123}
        js = nats.jetstream()
        await js.publish("event.UpdateProcessing", json.dumps(event).encode("utf-8"), timeout=5)
        await nats.flush()
        print("published")
    finally:
        # Previously the connection was never closed; always release it.
        await nats.close()
0020
0021
async def get():
    """Fetch one stored message from ``event_stream``, print it, then delete it.

    Requests sequence 1 of the stream via a direct get (with the subject
    ``event.UpdateProcessing2`` passed alongside), prints the message object,
    its decoded payload and its stream/sequence/subject, and finally deletes
    that message from the stream.

    The connection is closed in a ``finally`` clause so it is released even
    when the lookup or the delete raises.
    """
    nats = NATS()
    await nats.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
    try:
        js = nats.jetstream()
        msg = await js.get_msg(
            stream_name="event_stream",
            subject="event.UpdateProcessing2",
            direct=True,
            seq=1,
            next=False,
        )
        print(msg)
        print(type(msg))
        data = msg.data.decode()
        print(data)
        print(type(data))

        print(f"stream: {msg.stream}, seq: {msg.seq}, subject: {msg.subject}")
        # Pass stream and sequence positionally: the original used the keyword
        # ``stream=``, which does not appear to match the nats-py parameter
        # name (``stream_name``) -- TODO confirm against the installed version.
        await js.delete_msg(msg.stream, msg.seq)
    finally:
        # Previously the connection was never closed; always release it.
        await nats.close()
0035
0036
async def get_and_ack():
    """Pull one message from ``event.UpdateProcessing`` and acknowledge it.

    Creates a JetStream pull subscription whose durable name is derived from
    the short local hostname, fetches a batch of one message, prints its
    fields and acks it.

    The connection is closed in a ``finally`` clause so it is released even
    when subscribing, fetching or acking raises.
    """
    nc = NATS()
    await nc.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
    try:
        js = nc.jetstream()

        # Durable name is built from the host part of the hostname only
        # (anything after the first dot is dropped).
        short_hostname = socket.gethostname().split(".")[0]
        durable = f"event_UpdateProcessing_{short_hostname}"

        sub = await js.pull_subscribe("event.UpdateProcessing", durable=durable)

        # NOTE(review): fetch presumably raises a timeout error when no
        # message is available; that still propagates to the caller.
        msgs = await sub.fetch(1)
        for msg in msgs:
            print(type(msg))
            print("Got message:", msg.data.decode())
            print('----------------------')
            print('Subject:', msg.subject)
            print('Reply :', msg.reply)
            print('Data :', msg.data)
            print('Headers:', msg.header)
            await msg.ack()
    finally:
        # Originally close() was skipped whenever anything above raised.
        await nc.close()
0063
0064
async def nc_get_and_ack():
    """Receive one message on ``event1.UpdateProcessing`` via a plain subscription.

    Subscribes on the core NATS connection (no JetStream) with ``max_msgs=1``,
    waits up to 5 seconds for the next message and prints its fields. Despite
    the function name, no acknowledgement is sent.

    The connection is closed in a ``finally`` clause so it is released even
    when subscribing or waiting for the message raises.
    """
    nc = NATS()
    await nc.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
    try:
        sub = await nc.subscribe("event1.UpdateProcessing", max_msgs=1)
        # NOTE(review): next_msg presumably raises on timeout rather than
        # returning None, so the check below may never be false -- confirm.
        msg = await sub.next_msg(timeout=5)
        if msg:
            print(type(msg))
            print("Got message:", msg.data.decode())
            print('----------------------')
            print('Subject:', msg.subject)
            print('Reply :', msg.reply)
            print('Data :', msg.data)
            print('Headers:', msg.header)
    finally:
        # Originally close() was skipped whenever anything above raised.
        await nc.close()
0082
0083
if __name__ == "__main__":
    # Script entry point: run the pull-subscribe-and-ack demo to completion
    # on a fresh event loop.
    entry_coroutine = get_and_ack()
    asyncio.run(entry_coroutine)
0088