Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 import json
0002 import socket
0003 import asyncio
0004 from nats.aio.client import Client as NATS
0005 
0006 
0007 nats_server = {"nats_url": "nats://idds-dev-rest-0.idds-dev-rest.panda.svc.cluster.local:4222", "nats_token": "my_default_token"}
0008 nats_server = {'nats_url': 'nats://idds-dev-rest-0.idds-dev-rest.panda.svc.cluster.local:4222', 'nats_token': 'my_default_token'}
0009 
0010 
0011 async def send():
0012     nats = NATS()
0013     await nats.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
0014 
0015     event = {"type": 1, "a": 123}
0016     js = nats.jetstream()
0017     await js.publish("event.UpdateProcessing", json.dumps(event).encode("utf-8"), timeout=5)
0018     await nats.flush()
0019     print("published")
0020 
0021 
0022 async def get():
0023     nats = NATS()
0024     await nats.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
0025     js = nats.jetstream()
0026     msg = await js.get_msg(stream_name="event_stream", subject="event.UpdateProcessing2", direct=True, seq=1, next=False)
0027     print(msg)
0028     print(type(msg))
0029     data = msg.data.decode()
0030     print(data)
0031     print(type(data))
0032     # msg.ack()
0033     print(f"stream: {msg.stream}, seq: {msg.seq}, subject: {msg.subject}")
0034     await js.delete_msg(stream=msg.stream, seq=msg.seq)
0035 
0036 
0037 async def get_and_ack():
0038     nc = NATS()
0039     await nc.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
0040 
0041     js = nc.jetstream()
0042 
0043     # Attach to a durable consumer (FIFO delivery)
0044     short_hostname = socket.gethostname().split(".")[0]
0045     durable = f"event_UpdateProcessing_{short_hostname}"   # must without "."
0046 
0047     # sub = await js.pull_subscribe("event.UpdateProcessing", durable="worker1")
0048     sub = await js.pull_subscribe("event.UpdateProcessing", durable=durable)
0049 
0050     # Fetch one message
0051     msgs = await sub.fetch(1)
0052     for msg in msgs:
0053         print(type(msg))
0054         print("Got message:", msg.data.decode())
0055         print('----------------------')
0056         print('Subject:', msg.subject)
0057         print('Reply  :', msg.reply)
0058         print('Data   :', msg.data)
0059         print('Headers:', msg.header)
0060         await msg.ack()  # This works, because msg is a `Msg`, not a RawStreamMsg
0061 
0062     await nc.close()
0063 
0064 
0065 async def nc_get_and_ack():
0066     nc = NATS()
0067     await nc.connect(servers=[nats_server["nats_url"]], token=nats_server["nats_token"])
0068 
0069     sub = await nc.subscribe("event1.UpdateProcessing", max_msgs=1)
0070     msg = await sub.next_msg(timeout=5)
0071     if msg:
0072         print(type(msg))
0073         print("Got message:", msg.data.decode())
0074         print('----------------------')
0075         print('Subject:', msg.subject)
0076         print('Reply  :', msg.reply)
0077         print('Data   :', msg.data)
0078         print('Headers:', msg.header)
0079         # await msg.ack()  # This works, because msg is a `Msg`, not a RawStreamMsg
0080 
0081     await nc.close()
0082 
0083 
0084 if __name__ == "__main__":
0085     # asyncio.run(send())
0086     # asyncio.run(get())
0087     asyncio.run(get_and_ack())
0088     # asyncio.run(nc_get_and_ack())