Loading sender.py +18 −15 Original line number Diff line number Diff line Loading @@ -4,12 +4,14 @@ import time import json import yaml import os import sys from datetime import datetime configFile = os.path.expanduser('./topaz_config.yaml') with open(configFile, 'r') as stream: config = yaml.safe_load(stream) aryinit = np.load(config['data_path']) print('Array contains {} elements'.format(aryinit.size)) Loading @@ -17,32 +19,33 @@ md = dict(dtype=str(aryinit.dtype), shape=aryinit.shape) context = zmq.Context() pubsocket = context.socket(zmq.PUB) pubsocket.bind("tcp://*:5555") sigsocket = context.socket(zmq.REP) sigsocket.bind("tcp://*:5556") pubsocket.connect("tcp://localhost:5559") time.sleep(1) # Allow time for socket setup msg = sigsocket.recv() sigsocket.send(b'') print('Synchronized; ready to send data') metadata_str = json.dumps(md) pubsocket.send_string("metadata", flags=zmq.SNDMORE) pubsocket.send_string(metadata_str) chunk_size = 10 chunk_size = 1000 total_chunks = len(aryinit) // chunk_size + (1 if len(aryinit) % chunk_size else 0) for i in range(total_chunks): start = i * chunk_size end = start + chunk_size print(f"Array Length: {len(aryinit)} - Total Chunks: {total_chunks}") start_time = datetime.now() pubsocket.send_string(f"metadata", flags=zmq.SNDMORE) pubsocket.send_string(metadata_str) pubsocket.send_string("data", flags=zmq.SNDMORE) pubsocket.send(aryinit[start:end], flags=0) print(f"Sent chunk from index {start} to {end}") pubsocket.send(aryinit) pubsocket.send_string("END", flags=zmq.SNDMORE) pubsocket.send(b"") print("Sent all elements") end_time = datetime.now() print(f"Total time sending data: {(end_time - start_time).total_seconds()}") time.sleep(1) Loading
sender.py +18 −15 Original line number Diff line number Diff line Loading @@ -4,12 +4,14 @@ import time import json import yaml import os import sys from datetime import datetime configFile = os.path.expanduser('./topaz_config.yaml') with open(configFile, 'r') as stream: config = yaml.safe_load(stream) aryinit = np.load(config['data_path']) print('Array contains {} elements'.format(aryinit.size)) Loading @@ -17,32 +19,33 @@ md = dict(dtype=str(aryinit.dtype), shape=aryinit.shape) context = zmq.Context() pubsocket = context.socket(zmq.PUB) pubsocket.bind("tcp://*:5555") sigsocket = context.socket(zmq.REP) sigsocket.bind("tcp://*:5556") pubsocket.connect("tcp://localhost:5559") time.sleep(1) # Allow time for socket setup msg = sigsocket.recv() sigsocket.send(b'') print('Synchronized; ready to send data') metadata_str = json.dumps(md) pubsocket.send_string("metadata", flags=zmq.SNDMORE) pubsocket.send_string(metadata_str) chunk_size = 10 chunk_size = 1000 total_chunks = len(aryinit) // chunk_size + (1 if len(aryinit) % chunk_size else 0) for i in range(total_chunks): start = i * chunk_size end = start + chunk_size print(f"Array Length: {len(aryinit)} - Total Chunks: {total_chunks}") start_time = datetime.now() pubsocket.send_string(f"metadata", flags=zmq.SNDMORE) pubsocket.send_string(metadata_str) pubsocket.send_string("data", flags=zmq.SNDMORE) pubsocket.send(aryinit[start:end], flags=0) print(f"Sent chunk from index {start} to {end}") pubsocket.send(aryinit) pubsocket.send_string("END", flags=zmq.SNDMORE) pubsocket.send(b"") print("Sent all elements") end_time = datetime.now() print(f"Total time sending data: {(end_time - start_time).total_seconds()}") time.sleep(1)