Commit 6fe5452c authored by EVPath Upstream's avatar EVPath Upstream Committed by Eisenhauer, Greg
Browse files

EVPath 2018-12-16 (6ed8e721)

Code extracted from:

    https://github.com/GTkorvo/EVPath.git

at commit 6ed8e7215b26c955aa3fcb56ba9029c9cd81c57b (master).

Upstream Shortlog
-----------------
parent 54cfc23c
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -85,6 +85,7 @@ struct CMtrans_services_s CMstatic_trans_svcs = {INT_CMmalloc, INT_CMrealloc, IN
						 CM_fd_write_select, 
						 CM_fd_remove_select, 
						 CMtransport_trace,
						 CMtransport_verbose,
						 CMConnection_create,
						 INT_CMadd_shutdown_task,
						 INT_CMadd_periodic_task,
+1 −5
Original line number Diff line number Diff line
@@ -315,11 +315,6 @@ extern void IntCMConn_write_unlock(CMConnection cl, char *file,
					 int line);
extern int CMConn_write_locked(CMConnection cl);

typedef enum _CMTraceType {
    CMAlwaysTrace, CMControlVerbose, CMConnectionVerbose, CMLowLevelVerbose, CMDataVerbose, CMTransportVerbose, CMFormatVerbose, CMFreeVerbose, CMAttrVerbose, CMBufferVerbose, EVerbose, EVWarning, CMIBTransportVerbose, EVdfgVerbose, 
    CMLastTraceType /* add before this one */
} CMTraceType;

extern void 
CMtrace_out(CManager cm, CMTraceType trace_type, char *format, ...);

@@ -336,6 +331,7 @@ extern CMincoming_format_list
CMidentify_CMformat(CManager cm, FFSTypeHandle format);

extern void CMtransport_trace(CManager cm, const char *format, ...);
extern void CMtransport_verbose(CManager cm, CMTraceType trace, const char *format, ...);

extern void
CM_fd_add_select(CManager cm, int fd, select_list_func handler_func,
+7 −0
Original line number Diff line number Diff line
@@ -23,6 +23,11 @@ typedef struct _CMbuffer {
    void *return_callback_data;
} *CMbuffer;

typedef enum _CMTraceType {
    CMAlwaysTrace, CMControlVerbose, CMConnectionVerbose, CMLowLevelVerbose, CMDataVerbose, CMTransportVerbose, CMFormatVerbose, CMFreeVerbose, CMAttrVerbose, CMBufferVerbose, EVerbose, EVWarning, CMSelectVerbose, EVdfgVerbose, 
    CMLastTraceType /* add before this one */
} CMTraceType;

typedef void *(*CMTransport_malloc_func)(int);
typedef void *(*CMTransport_realloc_func)(void*, int);
typedef void (*CMTransport_free_func)(void*);
@@ -38,6 +43,7 @@ typedef void (*CMTransport_fd_add_select)(CManager cm, int fd, select_list_func
					  void *param1, void *param2);
typedef void (*CMTransport_fd_remove_select)(CManager cm, int fd);
typedef void (*CMTransport_trace)(CManager cm, const char *format, ...);
typedef void (*CMTransport_verbose)(CManager cm, CMTraceType trace, const char *format, ...);
typedef CMConnection (*CMTransport_conn_create)(transport_entry trans,
						void *transport_data,
						attr_list conn_attrs);
@@ -68,6 +74,7 @@ typedef struct CMtrans_services_s {
    CMTransport_fd_add_select fd_write_select;
    CMTransport_fd_remove_select fd_remove_select;
    CMTransport_trace trace_out;
    CMTransport_verbose verbose;
    CMTransport_conn_create connection_create;
    CMTransport_add_shut_task add_shutdown_task;
    CMTransport_add_period_task add_periodic_task;
+29 −2
Original line number Diff line number Diff line
@@ -52,7 +52,7 @@ extern int CMtrace_init(CManager cm, CMTraceType trace_type)
    CMtrace_val[CMAttrVerbose] = (getenv("CMAttrVerbose") != NULL);
    CMtrace_val[CMBufferVerbose] = (getenv("CMBufferVerbose") != NULL);
    CMtrace_val[EVerbose] = (getenv("EVerbose") != NULL);
    CMtrace_val[CMIBTransportVerbose] = (getenv("CMIBTransportVerbose") != NULL);    
    CMtrace_val[CMSelectVerbose] = (getenv("CMSelectVerbose") != NULL);    
    CMtrace_val[EVdfgVerbose] = (getenv("EVdfgVerbose") != NULL);
    CMtrace_timing = (getenv("CMTraceTiming") != NULL);
    CMtrace_PID = (getenv("CMTracePID") != NULL);
@@ -97,7 +97,7 @@ extern int CMtrace_init(CManager cm, CMTraceType trace_type)
	    if (CMtrace_val[CMBufferVerbose]) fprintf(cm->CMTrace_file, "CMBufferVerbose, ");
	    if (CMtrace_val[EVerbose]) fprintf(cm->CMTrace_file, "EVerbose, ");
	    if (CMtrace_val[EVWarning]) fprintf(cm->CMTrace_file, "EVWarning, ");
	    if (CMtrace_val[CMIBTransportVerbose]) fprintf(cm->CMTrace_file, "CMIBTransportVerbose, ");
	    if (CMtrace_val[CMSelectVerbose]) fprintf(cm->CMTrace_file, "CMSelectVerbose, ");
	    if (CMtrace_val[EVdfgVerbose]) fprintf(cm->CMTrace_file, "EVdfgVerbose, ");
	    fprintf(cm->CMTrace_file, "\n");
	}
@@ -150,6 +150,33 @@ CMtrace_out(CManager cm, CMTraceType trace_type, char *format, ...)
#endif
}
 */
extern void
CMtransport_verbose(CManager cm, CMTraceType trace, const char *format, ...)
{
#ifndef MODULE
    va_list ap;
    if (CMtrace_on(cm, trace)) {
        if (CMtrace_PID) {
            fprintf(cm->CMTrace_file, "P%lxT%lx - ", (long) getpid(), (long)thr_thread_self());
        }
        if (CMtrace_timing) {
            TRACE_TIME_DECL;
            TRACE_TIME_GET;
            fprintf(cm->CMTrace_file, TRACE_TIME_PRINTDETAILS);
        }
#ifdef STDC_HEADERS
	va_start(ap, format);
#else
	va_start(ap);
#endif
	vfprintf(cm->CMTrace_file, format, ap);
	va_end(ap);
	(void)cm;
	fprintf(cm->CMTrace_file, "\n");
    }
#endif
}

extern void
CMtransport_trace(CManager cm, const char *format, ...)
{
+67 −44
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ typedef struct enet_client_data {
    queued_data pending_data;
    int wake_write_fd;
    int wake_read_fd;
    enet_uint32 last_host_service_zero_return;
} *enet_client_data_ptr;

typedef struct enet_connection_data {
@@ -58,7 +59,7 @@ typedef struct enet_connection_data {
    CMbuffer read_buffer;
    int read_buffer_len;
    ENetPacket *packet;
    enet_client_data_ptr sd;
    enet_client_data_ptr ecd;
    CMConnection conn;
} *enet_conn_data_ptr;

@@ -72,6 +73,8 @@ static atom_t CM_ENET_CONN_TIMEOUT = -1;
static atom_t CM_ENET_CONN_REUSE = -1;
static atom_t CM_TRANSPORT = -1;

static enet_uint32 enet_host_service_warn_interval = 0;

extern attr_list
libcmenet_LTX_non_blocking_listen(CManager cm, CMtrans_services svc,
				  transport_entry trans, attr_list listen_info);
@@ -116,7 +119,7 @@ create_enet_conn_data(CMtrans_services svc)
}

static void *
enet_accept_conn(enet_client_data_ptr sd, transport_entry trans, 
enet_accept_conn(enet_client_data_ptr ecd, transport_entry trans, 
		 ENetAddress *address);

static void free_func(void *packet)
@@ -168,6 +171,12 @@ enet_service_network(CManager cm, void *void_trans)
    }

    while (ecd->server && (enet_host_service (ecd->server, & event, 0) > 0)) {
        if (enet_host_service_warn_interval && 
            (enet_time_get() > (ecd->last_host_service_zero_return + enet_host_service_warn_interval))) {
            fprintf(stderr, "WARNING, time between zero return for enet_host_service = %d msecs\n",
                    enet_time_get() - ecd->last_host_service_zero_return);
        }

        switch (event.type) {
	case ENET_EVENT_TYPE_NONE:
	    break;
@@ -211,6 +220,7 @@ enet_service_network(CManager cm, void *void_trans)
        }
	}
    }
    ecd->last_host_service_zero_return = enet_time_get();
}

static
@@ -292,17 +302,17 @@ static int conn_reuse = 1;
 * Accept enet connection
 */
static void *
enet_accept_conn(enet_client_data_ptr sd, transport_entry trans, 
enet_accept_conn(enet_client_data_ptr ecd, transport_entry trans, 
		 ENetAddress *address)
{
    CMtrans_services svc = sd->svc;
    CMtrans_services svc = ecd->svc;
    enet_conn_data_ptr enet_conn_data;

    CMConnection conn;
    attr_list conn_attr_list = NULL;;

    enet_conn_data = create_enet_conn_data(svc);
    enet_conn_data->sd = sd;
    enet_conn_data->ecd = ecd;
    conn_attr_list = create_attr_list();
    conn = svc->connection_create(trans, enet_conn_data, conn_attr_list);
    enet_conn_data->conn = conn;
@@ -334,7 +344,7 @@ enet_accept_conn(enet_client_data_ptr sd, transport_entry trans,
     * try flushing connection verify message here to make 
     * sure it's established 
     */
    enet_host_flush(sd->server);
    enet_host_flush(ecd->server);

    return enet_conn_data;
}
@@ -354,7 +364,7 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
	      attr_list conn_attr_list)
{
    int int_port_num;
    enet_client_data_ptr sd = (enet_client_data_ptr) trans->trans_data;
    enet_client_data_ptr ecd = (enet_client_data_ptr) trans->trans_data;
    char *host_name;
    int host_ip = 0;
    struct in_addr sin_addr;
@@ -424,13 +434,13 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
    }
    address.port = (unsigned short) int_port_num;

    if (sd->server == NULL) {
    if (ecd->server == NULL) {
	attr_list l = libcmenet_LTX_non_blocking_listen(cm, svc, trans, NULL);
	if (l) free_attr_list(l);
    }

    /* Initiate the connection, allocating the two channels 0 and 1. */
    peer = enet_host_connect (sd->server, & address, 1, 0);    
    peer = enet_host_connect (ecd->server, & address, 1, 0);    
    peer->data = enet_conn_data;
    svc->trace_out(cm, "ENET ========   On init Assigning peer %p has data %p\n", peer, enet_conn_data);
    if (peer == NULL)
@@ -445,12 +455,20 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
    int got_connection = 0;
    enet_uint32 end = enet_time_get() + timeout;
    while (!finished) {
        int ret = enet_host_service (sd->server, & event, 100); 
        int ret = enet_host_service (ecd->server, & event, 100); 
        enet_uint32 now = enet_time_get();
        if (enet_host_service_warn_interval && 
            (enet_time_get() > (ecd->last_host_service_zero_return + enet_host_service_warn_interval))) {
            fprintf(stderr, "WARNING, time between zero return for enet_host_service = %d msecs\n",
                    enet_time_get() - ecd->last_host_service_zero_return);
        }
        if (now > end) {
            finished = 1;
        }
        if (ret <= 0) continue;
        if (ret <= 0) {
            ecd->last_host_service_zero_return = enet_time_get();
            continue;
        }
        switch(event.type) {
        case ENET_EVENT_TYPE_CONNECT: {
            if (event.peer != peer) {
@@ -461,15 +479,15 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
                               inet_ntoa(addr),
                               event.peer->address.port);
                
                enet_connection_data = enet_accept_conn(sd, trans, &event.peer->address);
                enet_connection_data = enet_accept_conn(ecd, trans, &event.peer->address);
                
                /* Store any relevant client information here. */
                svc->trace_out(cm, "ENET ========   Assigning peer %p has data %p\n", event.peer, enet_connection_data);
                event.peer->data = enet_connection_data;
                ((enet_conn_data_ptr)enet_connection_data)->peer = event.peer;
                enet_host_flush (sd->server);
                enet_host_flush (ecd->server);
            } else {
                enet_host_flush (sd->server);
                enet_host_flush (ecd->server);
                svc->trace_out(cm, "Connection to %s:%d succeeded.\n", inet_ntoa(sin_addr), address.port);
                finished = 1;
                got_connection = 1;
@@ -501,10 +519,10 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
            entry->econn_d = econn_d;
            entry->packet = event.packet;
            /* add at the end */
            if (econn_d->sd->pending_data == NULL) {
                econn_d->sd->pending_data = entry;
            if (econn_d->ecd->pending_data == NULL) {
                econn_d->ecd->pending_data = entry;
            } else {
                queued_data last = econn_d->sd->pending_data;
                queued_data last = econn_d->ecd->pending_data;
                while (last->next != NULL) {
                    last = last->next;
                }
@@ -523,7 +541,7 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans,
    enet_conn_data->remote_host = host_name == NULL ? NULL : strdup(host_name);
    enet_conn_data->remote_IP = htonl(host_ip);
    enet_conn_data->remote_contact_port = int_port_num;
    enet_conn_data->sd = sd;
    enet_conn_data->ecd = ecd;
    enet_conn_data->peer = peer;
    peer->data = enet_conn_data;
    return 1;
@@ -566,7 +584,7 @@ libcmenet_LTX_self_check(CManager cm, CMtrans_services svc,
			 transport_entry trans, attr_list attrs)
{

    enet_client_data_ptr sd = trans->trans_data;
    enet_client_data_ptr ecd = trans->trans_data;
    int host_addr;
    int int_port_num;
    char *host_name;
@@ -605,8 +623,8 @@ libcmenet_LTX_self_check(CManager cm, CMtrans_services svc,
	svc->trace_out(cm, "CMself check - Host IP addrs don't match, %lx, %lx", IP, host_addr);
	return 0;
    }
    if (int_port_num != sd->listen_port) {
	svc->trace_out(cm, "CMself check - Ports don't match, %d, %d", int_port_num, sd->listen_port);
    if (int_port_num != ecd->listen_port) {
	svc->trace_out(cm, "CMself check - Ports don't match, %d, %d", int_port_num, ecd->listen_port);
	return 0;
    }
    svc->trace_out(cm, "CMself check returning TRUE");
@@ -666,7 +684,7 @@ libcmenet_LTX_connection_eq(CManager cm, CMtrans_services svc,
}

static attr_list
build_listen_attrs(CManager cm, CMtrans_services svc, enet_client_data_ptr sd,
build_listen_attrs(CManager cm, CMtrans_services svc, enet_client_data_ptr ecd,
		   attr_list listen_info, int int_port_num)
{
    char host_name[256];
@@ -681,9 +699,9 @@ build_listen_attrs(CManager cm, CMtrans_services svc, enet_client_data_ptr sd,

    ret_list = create_attr_list();

    if (sd) {
	sd->hostname = strdup(host_name);
	sd->listen_port = int_port_num;
    if (ecd) {
	ecd->hostname = strdup(host_name);
	ecd->listen_port = int_port_num;
    }
    if ((IP != 0) && !use_hostname) {
	add_attr(ret_list, CM_ENET_ADDR, Attr_Int4,
@@ -821,7 +839,7 @@ libcmenet_LTX_non_blocking_listen(CManager cm, CMtrans_services svc,
    svc->fd_add_select(cm, enet_host_get_sock_fd (server), 
		       (select_list_func) enet_service_network, (void*)cm, (void*)trans);

    svc->add_periodic_task(cm, 1, 0, (CMPollFunc) enet_service_network_lock, (void*)trans);
    svc->add_periodic_task(cm, 0, 100, (CMPollFunc) enet_service_network_lock, (void*)trans);

    svc->trace_out(enet_data->cm, "CMENET Adding read_wake_fd as action on fd %d",
		   enet_data->wake_read_fd);
@@ -907,7 +925,7 @@ libcmenet_LTX_writev_func(CMtrans_services svc, enet_conn_data_ptr ecd,
	length += iov[i].iov_len;
    }

    svc->trace_out(ecd->sd->cm, "CMENET vector write of %d bytes on peer %p",
    svc->trace_out(ecd->ecd->cm, "CMENET vector write of %d bytes on peer %p",
		   length, ecd->peer);

   /* Create a reliable packet of the right size */
@@ -924,20 +942,20 @@ libcmenet_LTX_writev_func(CMtrans_services svc, enet_conn_data_ptr ecd,
    /* Send the packet to the peer over channel id 0. */
    if (enet_peer_send (ecd->peer, 0, packet) == -1) {
        enet_packet_destroy(packet);
        svc->trace_out(ecd->sd->cm, "ENET  ======  failed to send a packet to peer %p, state %d\n", ecd->peer, ecd->peer->state);
        svc->trace_out(ecd->ecd->cm, "ENET  ======  failed to send a packet to peer %p, state %d\n", ecd->peer, ecd->peer->state);
	return -1;
    }

    wake_enet_server_thread(ecd->sd);
    wake_enet_server_thread(ecd->ecd);

    if (last_flush_call == 0) {
	enet_host_flush(ecd->sd->server);
	enet_host_flush(ecd->ecd->server);
	last_flush_call = time(NULL);
    } else {
	time_t now = time(NULL);
	if (now > last_flush_call) {
	    last_flush_call = now;
	    enet_host_flush(ecd->sd->server);
	    enet_host_flush(ecd->ecd->server);
	}
    }
    return iovcnt;
@@ -947,28 +965,28 @@ libcmenet_LTX_writev_func(CMtrans_services svc, enet_conn_data_ptr ecd,
static int enet_global_init = 0;

static void
free_enet_data(CManager cm, void *sdv)
free_enet_data(CManager cm, void *ecdv)
{
    enet_client_data_ptr sd = (enet_client_data_ptr) sdv;
    CMtrans_services svc = sd->svc;
    enet_client_data_ptr ecd = (enet_client_data_ptr) ecdv;
    CMtrans_services svc = ecd->svc;
    (void)cm;
    if (sd->hostname != NULL)
	svc->free_func(sd->hostname);
    svc->free_func(sd);
    if (ecd->hostname != NULL)
	svc->free_func(ecd->hostname);
    svc->free_func(ecd);
}

static void
shutdown_enet_thread
(CManager cm, void *sdv)
(CManager cm, void *ecdv)
{
    enet_client_data_ptr sd = (enet_client_data_ptr) sdv;
    CMtrans_services svc = sd->svc;
    enet_client_data_ptr ecd = (enet_client_data_ptr) ecdv;
    CMtrans_services svc = ecd->svc;
    (void)cm;
    if (sd->server != NULL) {
	ENetHost * server = sd->server;
	enet_host_flush(sd->server);
    if (ecd->server != NULL) {
	ENetHost * server = ecd->server;
	enet_host_flush(ecd->server);
	svc->fd_remove_select(cm, enet_host_get_sock_fd (server));
	sd->server = NULL;
	ecd->server = NULL;
	enet_host_destroy(server);
    }
}
@@ -979,6 +997,7 @@ libcmenet_LTX_initialize(CManager cm, CMtrans_services svc,
{
    static int atom_init = 0;
    int filedes[2];
    char *env = getenv("ENET_HOST_SERVICE_WARN_INTERVAL");

    enet_client_data_ptr enet_data;
    (void)attrs;
@@ -1003,6 +1022,10 @@ libcmenet_LTX_initialize(CManager cm, CMtrans_services svc,
	CM_ENET_CONN_REUSE = attr_atom_from_string("CM_ENET_CONN_REUSE");
	atom_init++;
    }
    if (env) {
        sscanf(env, "%d", &enet_host_service_warn_interval);
        fprintf(stderr, "DEBUG: Setting enet_host_service_warn_interval to %d\n", enet_host_service_warn_interval);
    }
    enet_data = svc->malloc_func(sizeof(struct enet_client_data));
    enet_data->cm = cm;
    enet_data->hostname = NULL;
Loading