Loading zmq/rcv.py +19 −11 Original line number Diff line number Diff line import zmq import numpy as np import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") # subscription socket where we'll receive data subsocket = context.socket(zmq.SUB) subsocket.setsockopt(zmq.SUBSCRIBE,b"") subsocket.connect("tcp://localhost:5555") print("ready to receive") # let the publisher know we are established sigsocket = context.socket(zmq.REQ) sigsocket.connect("tcp://localhost:5556") sigsocket.send(b'') sigsocket.recv() # get metadata first ms = socket.recv_json() print("got the json") sz = socket.recv() print("got the array") ary = numpy.frombuffer(memoryview(sz),dtype=md["dtype"]) ary.reshape(md["shape"]) print("synchronized; ready to receive data") msg = subsocket.recv_multipart(flags=0,copy=True) md = msg[0] sz = msg[1] print(f'received {ary.len()} elements') ary = np.frombuffer(bytes(memoryview(sz)),dtype=md['dtype']) ary.reshape(md['shape']) print(f'received {ary.size} elements') zmq/reader.py +24 −4 Original line number Diff line number Diff line import numpy as np import zmq import time ary = np.load("../data/p_322_data_np_res_8.npy") Loading @@ -9,11 +10,30 @@ print('array contains {} elements'.format(ary.size)) md = dict(dtype=str(ary.dtype),shape=ary.shape) context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") # data push socket pubsocket = context.socket(zmq.PUB) pubsocket.bind("tcp://*:5555") # subscriber signal socket sigsocket = context.socket(zmq.REP) sigsocket.bind("tcp://*:5556") # wait for a subscriber to announce their readiness at the signal socket. This means # they've already subscribed to the datasocket and so are ready to receive data. time.sleep(1) # wait for request... msg = sigsocket.recv() # trivial reply once request received sigsocket.send(b'') # now we can send data # send metadata first socket.send_json((md),zmq.SNDMORE) socket.send(ary) print('synchronized; ready to send data') pubsocket.send_json((md),flags=zmq.SNDMORE) pubsocket.send(ary,flags=0) print("sent all elements") Loading
zmq/rcv.py +19 −11 Original line number Diff line number Diff line import zmq import numpy as np import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") # subscription socket where we'll receive data subsocket = context.socket(zmq.SUB) subsocket.setsockopt(zmq.SUBSCRIBE,b"") subsocket.connect("tcp://localhost:5555") print("ready to receive") # let the publisher know we are established sigsocket = context.socket(zmq.REQ) sigsocket.connect("tcp://localhost:5556") sigsocket.send(b'') sigsocket.recv() # get metadata first ms = socket.recv_json() print("got the json") sz = socket.recv() print("got the array") ary = numpy.frombuffer(memoryview(sz),dtype=md["dtype"]) ary.reshape(md["shape"]) print("synchronized; ready to receive data") msg = subsocket.recv_multipart(flags=0,copy=True) md = msg[0] sz = msg[1] print(f'received {ary.len()} elements') ary = np.frombuffer(bytes(memoryview(sz)),dtype=md['dtype']) ary.reshape(md['shape']) print(f'received {ary.size} elements')
zmq/reader.py +24 −4 Original line number Diff line number Diff line import numpy as np import zmq import time ary = np.load("../data/p_322_data_np_res_8.npy") Loading @@ -9,11 +10,30 @@ print('array contains {} elements'.format(ary.size)) md = dict(dtype=str(ary.dtype),shape=ary.shape) context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") # data push socket pubsocket = context.socket(zmq.PUB) pubsocket.bind("tcp://*:5555") # subscriber signal socket sigsocket = context.socket(zmq.REP) sigsocket.bind("tcp://*:5556") # wait for a subscriber to announce their readiness at the signal socket. This means # they've already subscribed to the datasocket and so are ready to receive data. time.sleep(1) # wait for request... msg = sigsocket.recv() # trivial reply once request received sigsocket.send(b'') # now we can send data # send metadata first socket.send_json((md),zmq.SNDMORE) socket.send(ary) print('synchronized; ready to send data') pubsocket.send_json((md),flags=zmq.SNDMORE) pubsocket.send(ary,flags=0) print("sent all elements")