Skip to content

Commit

Permalink
librdmacm/cmtime: Have client use work queue threads
Browse files Browse the repository at this point in the history
Have the client use a work queue to multi-thread the QP
creation, address resolution, route resolution, and QP
modify operations.

Signed-off-by: Sean Hefty <[email protected]>
  • Loading branch information
Sean Hefty committed Apr 23, 2024
1 parent 405554d commit ca92ba0
Showing 1 changed file with 74 additions and 45 deletions.
119 changes: 74 additions & 45 deletions librdmacm/examples/cmtime.c
Expand Up @@ -106,7 +106,10 @@ static const char *step_str[] = {
struct node {
struct work_item work;
struct rdma_cm_id *id;

struct ibv_qp *qp;
enum ibv_qp_state next_qps;
enum step next_step;

uint64_t times[STEP_CNT][2];
int error;
Expand Down Expand Up @@ -198,15 +201,14 @@ static int open_verbs(struct rdma_cm_id *id)
return 0;
}

static int create_qp(struct node *n)
static void create_qp(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);
struct ibv_qp_init_attr attr;
int ret;

if (need_verbs()) {
ret = open_verbs(n->id);
if (ret)
return ret;
if (open_verbs(n->id))
return;
}

attr.qp_context = n;
Expand All @@ -226,16 +228,14 @@ static int create_qp(struct node *n)
if (!use_qpn) {
n->qp = ibv_create_qp(pd, &attr);
if (!n->qp) {
ret = -errno;
perror("ibv_create_qp");
n->error = 1;
}
} else {
sleep_us(mimic_qp_delay);
}
end_perf(n, STEP_CREATE_QP);

return ret;
completed[STEP_CREATE_QP]++;
}

static int
Expand All @@ -252,9 +252,9 @@ modify_qp(struct node *n, enum ibv_qp_state state, enum step attr_step)
n->error = 1;
return ret;
}
end_perf(n, attr_step);
end_perf(n, attr_step++);

start_perf(n, attr_step + 1);
start_perf(n, attr_step);
if (n->qp) {
ret = ibv_modify_qp(n->qp, &attr, mask);
if (ret) {
Expand All @@ -265,9 +265,17 @@ modify_qp(struct node *n, enum ibv_qp_state state, enum step attr_step)
} else {
sleep_us(mimic_qp_delay);
}
end_perf(n, attr_step + 1);
end_perf(n, attr_step);
completed[attr_step]++;

return 0;
return ret;
}

static void modify_qp_work(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);

modify_qp(n, n->next_qps, n->next_step);
}

static void init_conn_param(struct node *n, struct rdma_conn_param *param)
Expand Down Expand Up @@ -299,8 +307,38 @@ static void connect_qp(struct node *n)
}
}

static void conn_handler(struct node *n)
static void resolve_addr(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);
int ret;

n->retries = retries;
start_perf(n, STEP_RESOLVE_ADDR);
ret = rdma_resolve_addr(n->id, rai->ai_src_addr,
rai->ai_dst_addr, timeout);
if (ret) {
perror("rdma_resolve_addr");
n->error = 1;
}
}

static void resolve_route(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);
int ret;

n->retries = retries;
start_perf(n, STEP_RESOLVE_ROUTE);
ret = rdma_resolve_route(n->id, timeout);
if (ret) {
perror("rdma_resolve_route");
n->error = 1;
}
}

static void connect_response(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);
int ret;

if (n->error)
Expand Down Expand Up @@ -330,13 +368,10 @@ static void conn_handler(struct node *n)
static void req_handler(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);

struct rdma_conn_param conn_param;
int ret;

ret = create_qp(n);
if (ret)
goto err1;
create_qp(&n->work);

ret = modify_qp(n, IBV_QPS_INIT, STEP_INIT_QP_ATTR);
if (ret)
Expand All @@ -362,13 +397,20 @@ static void req_handler(struct work_item *item)
err2:
if (n->qp)
ibv_destroy_qp(n->qp);
err1:
printf("failing connection request\n");
rdma_reject(n->id, NULL, 0);
rdma_destroy_id(n->id);
return;
}

/*
 * Work queue handler: begin disconnecting one client connection.
 *
 * Starts the STEP_DISCONNECT timer for the node that owns the work item and
 * then calls rdma_disconnect() on its CM id.  The return value of
 * rdma_disconnect() is not checked, and neither the timer stop nor the
 * completion count for this step is handled in this function.
 */
static void client_disconnect(struct work_item *item)
{
	struct node *conn;

	conn = container_of(item, struct node, work);
	start_perf(conn, STEP_DISCONNECT);
	rdma_disconnect(conn->id);
}

static void server_disconnect(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);
Expand Down Expand Up @@ -406,7 +448,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
wq_insert(&wq, &n->work, req_handler);
break;
case RDMA_CM_EVENT_CONNECT_RESPONSE:
conn_handler(n);
wq_insert(&wq, &n->work, connect_response);
break;
case RDMA_CM_EVENT_ESTABLISHED:
if (++completed[STEP_CONNECT] >= connections)
Expand Down Expand Up @@ -439,7 +481,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
printf("event: %s, error: %d\n",
rdma_event_str(event->event), event->status);
n->error = 1;
conn_handler(n);
connect_response(&n->work);
break;
case RDMA_CM_EVENT_DISCONNECTED:
if (is_client()) {
Expand Down Expand Up @@ -629,15 +671,7 @@ static int client_connect(int iter)
for (i = 0; i < iter; i++) {
if (nodes[i].error)
continue;
nodes[i].retries = retries;
start_perf(&nodes[i], STEP_RESOLVE_ADDR);
ret = rdma_resolve_addr(nodes[i].id, rai->ai_src_addr,
rai->ai_dst_addr, timeout);
if (ret) {
perror("failure getting addr");
nodes[i].error = 1;
continue;
}
wq_insert(&wq, &nodes[i].work, resolve_addr);
}
while (completed[STEP_RESOLVE_ADDR] != iter)
sched_yield();
Expand All @@ -648,14 +682,7 @@ static int client_connect(int iter)
for (i = 0; i < iter; i++) {
if (nodes[i].error)
continue;
nodes[i].retries = retries;
start_perf(&nodes[i], STEP_RESOLVE_ROUTE);
ret = rdma_resolve_route(nodes[i].id, timeout);
if (ret) {
perror("failure resolving route");
nodes[i].error = 1;
continue;
}
wq_insert(&wq, &nodes[i].work, resolve_route);
}
while (completed[STEP_RESOLVE_ROUTE] != iter)
sched_yield();
Expand All @@ -666,21 +693,23 @@ static int client_connect(int iter)
for (i = 0; i < iter; i++) {
if (nodes[i].error)
continue;
ret = create_qp(&nodes[i]);
if (ret)
continue;
wq_insert(&wq, &nodes[i].work, create_qp);
}
while (completed[STEP_CREATE_QP] != iter)
sched_yield();
end_time(STEP_CREATE_QP);

printf("\tModify QPs to INIT\n");
start_time(STEP_INIT_QP);
for (i = 0; i < iter; i++) {
if (nodes[i].error)
continue;
ret = modify_qp(&nodes[i], IBV_QPS_INIT, STEP_INIT_QP_ATTR);
if (ret)
continue;
nodes[i].next_qps = IBV_QPS_INIT;
nodes[i].next_step = STEP_INIT_QP_ATTR;
wq_insert(&wq, &nodes[i].work, modify_qp_work);
}
while (completed[STEP_INIT_QP] != iter)
sched_yield();
end_time(STEP_INIT_QP);

printf("\tConnecting\n");
Expand All @@ -702,15 +731,15 @@ static int client_connect(int iter)
for (i = 0; i < iter; i++) {
if (nodes[i].error)
continue;
start_perf(&nodes[i], STEP_DISCONNECT);
rdma_disconnect(nodes[i].id);
wq_insert(&wq, &nodes[i].work, client_disconnect);
}
while (completed[STEP_DISCONNECT] != iter)
sched_yield();
end_time(STEP_DISCONNECT);

oob_sendrecv(oob_sock, STEP_DISCONNECT);

/* Wait for event threads to exit before destroying resources */
printf("\tDestroying QPs\n");
destroy_qps(iter);
printf("\tDestroying IDs\n");
Expand Down

0 comments on commit ca92ba0

Please sign in to comment.