Commit 9f47ffa5 authored by Ferreira Da Silva, Rafael's avatar Ferreira Da Silva, Rafael
Browse files

improving streamming

parent 78269aed
Loading
Loading
Loading
Loading
+0 −0

File moved.

zmq-c/rcv.py

0 → 100644
+60 −0
Original line number Diff line number Diff line
import zmq
import numpy as np
import time
import json

context = zmq.Context()
# subscription socket where we'll receive data
subsocket = context.socket(zmq.SUB)
subsocket.setsockopt(zmq.SUBSCRIBE,b"")
#subsocket.connect("tcp://login11.frontier.olcf.ornl.gov:5555")
subsocket.connect("tcp://*:5555")

# let the publisher know we are established
sigsocket = context.socket(zmq.REQ)
#sigsocket.connect("tcp://login11.frontier.olcf.ornl.gov:5556")
sigsocket.connect("tcp://*:5556")
sigsocket.send(b'')
sigsocket.recv()

print("synchronized; ready to receive data")

#msg = subsocket.recv()
#md = json.loads(msg[0])
#sz = msg[1]

# Function to receive and reconstruct the byte array
def receive_byte_array(socket):
    chunks = []
    while True:
        print("receiving data")
        chunk = socket.recv()
        chunks.append(chunk)
        print("received")
        if not socket.getsockopt(zmq.RCVMORE):
            break
        else:
            print("waiting")
    return b''.join(chunks)


# Receive the byte array
byte_array = receive_byte_array(subsocket)

# Assuming the sender sends metadata (dtype and shape) in advance, deserialize it
# Adjust these lines as necessary based on how the metadata is sent
#md = json.loads(subsocket.recv())
#ary = np.frombuffer(byte_array, dtype=md['dtype']).reshape(md['shape'])

print(f'received {ary.size} elements')

#msg = subsocket.recv_multipart(flags=0)
#md = json.loads(msg[0])
#sz = msg[1]
#
#ary = np.frombuffer(sz,dtype=md['dtype'])
#ary.reshape(md['shape'])
#
#print(f'received {ary.size} elements')

+17 −5
Original line number Diff line number Diff line
@@ -4,12 +4,12 @@ import time
import json

aryinit = np.load("../data/p_322_data_np_res_16.npy")
ary = aryinit[:1000]
#ary = aryinit[:1000]

print('array contains {} elements'.format(ary.size))
print('array contains {} elements'.format(aryinit.size))

# set up metadata for numpy array reconstruction
md = dict(dtype=str(ary.dtype),shape=ary.shape)
md = dict(dtype=str(aryinit.dtype),shape=aryinit.shape)

context = zmq.Context()
# data push socket
@@ -32,7 +32,19 @@ sigsocket.send(b'')
# send metadata first
print('synchronized; ready to send data')

pubsocket.send_json((md),flags=zmq.SNDMORE)
pubsocket.send(ary,flags=0)
#pubsocket.send_json((md),flags=zmq.SNDMORE)
#pubsocket.send(ary,flags=0)

chunk_size=1000
print(f"array length: {len(aryinit)}")
total_chunks = len(aryinit) // chunk_size + (1 if len(aryinit) % chunk_size else 0)

for i in range(total_chunks):
    start = i * chunk_size
    end = start + chunk_size

    pubsocket.send(aryinit[start:end], zmq.SNDMORE if i < total_chunks - 1 else 0)
    print(f"sent {start}, {end}")

#pubsocket.send_json((md),flags=0)
print("sent all elements")

zmq-c/receiver.py

0 → 100644
+46 −0
Original line number Diff line number Diff line
import zmq
import numpy as np
import json

context = zmq.Context()

subsocket = context.socket(zmq.SUB)
subsocket.setsockopt_string(zmq.SUBSCRIBE, "metadata")
subsocket.setsockopt_string(zmq.SUBSCRIBE, "data")
subsocket.setsockopt_string(zmq.SUBSCRIBE, "END")
subsocket.connect("tcp://localhost:5555")

sigsocket = context.socket(zmq.REQ)
sigsocket.connect("tcp://localhost:5556")
sigsocket.send(b'')
sigsocket.recv()

print("Synchronized; ready to receive data")

def receive_byte_array(socket):
    chunks = []
    while True:
        try:
            topic, chunk = socket.recv_multipart()
            print(f"Received message with topic: {topic.decode()}")  # Debugging print
            if topic.decode() == "END":
                break
            elif topic.decode() == "data":
                chunks.append(chunk)
        except Exception as e:
            print(f"Error receiving chunk: {e}")
            break
    return b''.join(chunks)

# Receive metadata
topic, metadata_str = subsocket.recv_multipart()
md = json.loads(metadata_str)
print(f"Metadata received: {md}")

byte_array = receive_byte_array(subsocket)

ary = np.frombuffer(byte_array, dtype=np.dtype(md['dtype']))
ary = ary.reshape(md['shape'])

print(f'Received {ary.size} elements of type {ary.dtype}')

zmq-c/sender.py

0 → 100644
+41 −0
Original line number Diff line number Diff line
import numpy as np
import zmq
import time
import json

aryinit = np.load("../data/p_322_data_np_res_32.npy")
print('Array contains {} elements'.format(aryinit.size))

md = dict(dtype=str(aryinit.dtype), shape=aryinit.shape)

context = zmq.Context()
pubsocket = context.socket(zmq.PUB)
pubsocket.bind("tcp://*:5555")

sigsocket = context.socket(zmq.REP)
sigsocket.bind("tcp://*:5556")

time.sleep(1)  # Allow time for socket setup
msg = sigsocket.recv()
sigsocket.send(b'')

print('Synchronized; ready to send data')

metadata_str = json.dumps(md)
pubsocket.send_string("metadata", flags=zmq.SNDMORE)
pubsocket.send_string(metadata_str)

chunk_size = 1000
total_chunks = len(aryinit) // chunk_size + (1 if len(aryinit) % chunk_size else 0)

for i in range(total_chunks):
    start = i * chunk_size
    end = start + chunk_size
    pubsocket.send_string("data", flags=zmq.SNDMORE)
    pubsocket.send(aryinit[start:end], flags=0)
    print(f"Sent chunk from index {start} to {end}")

pubsocket.send_string("END", flags=zmq.SNDMORE)
pubsocket.send(b"")
print("Sent all elements")
time.sleep(1)
Loading