From ca92ba0b750a0764baaeab1229128cbc3c9f7893 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Wed, 17 Apr 2024 10:41:25 -0700 Subject: [PATCH] librdmacm/cmtime: Have client use work queue threads Have the client use a work queue to multi-thread the QP creation, address resolution, route resolution, and QP modify operations. Signed-off-by: Sean Hefty --- librdmacm/examples/cmtime.c | 119 ++++++++++++++++++++++-------------- 1 file changed, 74 insertions(+), 45 deletions(-) diff --git a/librdmacm/examples/cmtime.c b/librdmacm/examples/cmtime.c index 1468d733c..45c1dd622 100644 --- a/librdmacm/examples/cmtime.c +++ b/librdmacm/examples/cmtime.c @@ -106,7 +106,10 @@ static const char *step_str[] = { struct node { struct work_item work; struct rdma_cm_id *id; + struct ibv_qp *qp; + enum ibv_qp_state next_qps; + enum step next_step; uint64_t times[STEP_CNT][2]; int error; @@ -198,15 +201,14 @@ static int open_verbs(struct rdma_cm_id *id) return 0; } -static int create_qp(struct node *n) +static void create_qp(struct work_item *item) { + struct node *n = container_of(item, struct node, work); struct ibv_qp_init_attr attr; - int ret; if (need_verbs()) { - ret = open_verbs(n->id); - if (ret) - return ret; + if (open_verbs(n->id)) + return; } attr.qp_context = n; @@ -226,7 +228,6 @@ static int create_qp(struct node *n) if (!use_qpn) { n->qp = ibv_create_qp(pd, &attr); if (!n->qp) { - ret = -errno; perror("ibv_create_qp"); n->error = 1; } @@ -234,8 +235,7 @@ static int create_qp(struct node *n) sleep_us(mimic_qp_delay); } end_perf(n, STEP_CREATE_QP); - - return ret; + completed[STEP_CREATE_QP]++; } static int @@ -252,9 +252,9 @@ modify_qp(struct node *n, enum ibv_qp_state state, enum step attr_step) n->error = 1; return ret; } - end_perf(n, attr_step); + end_perf(n, attr_step++); - start_perf(n, attr_step + 1); + start_perf(n, attr_step); if (n->qp) { ret = ibv_modify_qp(n->qp, &attr, mask); if (ret) { @@ -265,9 +265,17 @@ modify_qp(struct node *n, enum ibv_qp_state state, enum step attr_step) } else { sleep_us(mimic_qp_delay); } - end_perf(n, attr_step + 1); + end_perf(n, attr_step); + completed[attr_step]++; - return 0; + return ret; +} + +static void modify_qp_work(struct work_item *item) +{ + struct node *n = container_of(item, struct node, work); + + modify_qp(n, n->next_qps, n->next_step); } static void init_conn_param(struct node *n, struct rdma_conn_param *param) @@ -299,8 +307,38 @@ static void connect_qp(struct node *n) } } -static void conn_handler(struct node *n) +static void resolve_addr(struct work_item *item) +{ + struct node *n = container_of(item, struct node, work); + int ret; + + n->retries = retries; + start_perf(n, STEP_RESOLVE_ADDR); + ret = rdma_resolve_addr(n->id, rai->ai_src_addr, + rai->ai_dst_addr, timeout); + if (ret) { + perror("rdma_resolve_addr"); + n->error = 1; + } +} + +static void resolve_route(struct work_item *item) { + struct node *n = container_of(item, struct node, work); + int ret; + + n->retries = retries; + start_perf(n, STEP_RESOLVE_ROUTE); + ret = rdma_resolve_route(n->id, timeout); + if (ret) { + perror("rdma_resolve_route"); + n->error = 1; + } +} + +static void connect_response(struct work_item *item) +{ + struct node *n = container_of(item, struct node, work); int ret; if (n->error) @@ -330,13 +368,10 @@ static void conn_handler(struct node *n) static void req_handler(struct work_item *item) { struct node *n = container_of(item, struct node, work); - struct rdma_conn_param conn_param; int ret; - ret = create_qp(n); - if (ret) - goto err1; + create_qp(&n->work); ret = modify_qp(n, IBV_QPS_INIT, STEP_INIT_QP_ATTR); if (ret) @@ -362,13 +397,20 @@ static void req_handler(struct work_item *item) err2: if (n->qp) ibv_destroy_qp(n->qp); -err1: printf("failing connection request\n"); rdma_reject(n->id, NULL, 0); rdma_destroy_id(n->id); return; } +static void client_disconnect(struct work_item *item) +{ + struct node *n = container_of(item, struct node, work); + + start_perf(n, STEP_DISCONNECT); + rdma_disconnect(n->id); +} + static void server_disconnect(struct work_item *item) { struct node *n = container_of(item, struct node, work); @@ -406,7 +448,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) wq_insert(&wq, &n->work, req_handler); break; case RDMA_CM_EVENT_CONNECT_RESPONSE: - conn_handler(n); + wq_insert(&wq, &n->work, connect_response); break; case RDMA_CM_EVENT_ESTABLISHED: if (++completed[STEP_CONNECT] >= connections) @@ -439,7 +481,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) printf("event: %s, error: %d\n", rdma_event_str(event->event), event->status); n->error = 1; - conn_handler(n); + connect_response(&n->work); break; case RDMA_CM_EVENT_DISCONNECTED: if (is_client()) { @@ -629,15 +671,7 @@ static int client_connect(int iter) for (i = 0; i < iter; i++) { if (nodes[i].error) continue; - nodes[i].retries = retries; - start_perf(&nodes[i], STEP_RESOLVE_ADDR); - ret = rdma_resolve_addr(nodes[i].id, rai->ai_src_addr, - rai->ai_dst_addr, timeout); - if (ret) { - perror("failure getting addr"); - nodes[i].error = 1; - continue; - } + wq_insert(&wq, &nodes[i].work, resolve_addr); } while (completed[STEP_RESOLVE_ADDR] != iter) sched_yield(); @@ -648,14 +682,7 @@ static int client_connect(int iter) for (i = 0; i < iter; i++) { if (nodes[i].error) continue; - nodes[i].retries = retries; - start_perf(&nodes[i], STEP_RESOLVE_ROUTE); - ret = rdma_resolve_route(nodes[i].id, timeout); - if (ret) { - perror("failure resolving route"); - nodes[i].error = 1; - continue; - } + wq_insert(&wq, &nodes[i].work, resolve_route); } while (completed[STEP_RESOLVE_ROUTE] != iter) sched_yield(); @@ -666,10 +693,10 @@ static int client_connect(int iter) for (i = 0; i < iter; i++) { if (nodes[i].error) continue; - ret = create_qp(&nodes[i]); - if (ret) - continue; + wq_insert(&wq, &nodes[i].work, create_qp); } + while (completed[STEP_CREATE_QP] != iter) + sched_yield(); end_time(STEP_CREATE_QP); printf("\tModify QPs to INIT\n"); @@ -677,10 +704,12 @@ static int client_connect(int iter) for (i = 0; i < iter; i++) { if (nodes[i].error) continue; - ret = modify_qp(&nodes[i], IBV_QPS_INIT, STEP_INIT_QP_ATTR); - if (ret) - continue; + nodes[i].next_qps = IBV_QPS_INIT; + nodes[i].next_step = STEP_INIT_QP_ATTR; + wq_insert(&wq, &nodes[i].work, modify_qp_work); } + while (completed[STEP_INIT_QP] != iter) + sched_yield(); end_time(STEP_INIT_QP); printf("\tConnecting\n"); @@ -702,8 +731,7 @@ static int client_connect(int iter) for (i = 0; i < iter; i++) { if (nodes[i].error) continue; - start_perf(&nodes[i], STEP_DISCONNECT); - rdma_disconnect(nodes[i].id); + wq_insert(&wq, &nodes[i].work, client_disconnect); } while (completed[STEP_DISCONNECT] != iter) sched_yield(); @@ -711,6 +739,7 @@ static int client_connect(int iter) oob_sendrecv(oob_sock, STEP_DISCONNECT); + /* Wait for event threads to exit before destroying resources */ printf("\tDestroying QPs\n"); destroy_qps(iter); printf("\tDestroying IDs\n");