Skip to content

Commit

Permalink
librdmacm/cmtime: Rework work queue abstraction
Browse files Browse the repository at this point in the history
Isolate the work queue abstraction from cmtime.  Define
a work_item structure to allow individual work items to
invoke separate callbacks.  Add support for multiple threads.
Move the code from the cmtime source into the common source
for future reusability.

This change will enable cmtime to use a single work queue
to process different steps in the connection process.  Having
a single work queue will limit the number of threads that
the test will need to spawn.

Signed-off-by: Sean Hefty <[email protected]>
  • Loading branch information
Sean Hefty committed Apr 18, 2024
1 parent 48816f1 commit a7277c0
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 119 deletions.
3 changes: 2 additions & 1 deletion librdmacm/examples/CMakeLists.txt
Expand Up @@ -2,9 +2,10 @@
add_library(rdmacm_tools STATIC
common.c
)
target_link_libraries(rdmacm_tools LINK_PRIVATE ${CMAKE_THREAD_LIBS_INIT})

rdma_executable(cmtime cmtime.c)
target_link_libraries(cmtime LINK_PRIVATE rdmacm ${CMAKE_THREAD_LIBS_INIT} rdmacm_tools)
target_link_libraries(cmtime LINK_PRIVATE rdmacm rdmacm_tools)

rdma_executable(mckey mckey.c)
target_link_libraries(mckey LINK_PRIVATE rdmacm ${CMAKE_THREAD_LIBS_INIT} rdmacm_tools)
Expand Down
137 changes: 19 additions & 118 deletions librdmacm/examples/cmtime.c
Expand Up @@ -43,6 +43,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <ccan/container_of.h>

#include <rdma/rdma_cma.h>
#include "common.h"
Expand Down Expand Up @@ -103,7 +104,7 @@ static const char *step_str[] = {
};

struct node {
struct node *next;
struct work_item work;
struct rdma_cm_id *id;
struct ibv_qp *qp;

Expand All @@ -112,24 +113,13 @@ struct node {
int retries;
};

/*
 * FIFO of nodes serviced by one dedicated worker thread.  Nodes are
 * chained through their 'next' pointer and passed, one at a time, to
 * work_handler from the worker thread.
 */
struct work_queue {
/* Held while head/tail are read or modified. */
pthread_mutex_t lock;
/* Signalled by wq_insert() when a node is added to an empty queue. */
pthread_cond_t cond;
/* Worker thread created by wq_init() and joined by wq_cleanup(). */
pthread_t thread;

/* Callback the worker invokes for every node it dequeues. */
void (*work_handler)(struct node *node);
/* Oldest queued node, or NULL when the queue is empty. */
struct node *head;
/* Most recently queued node; only meaningful while head is non-NULL. */
struct node *tail;
};

/* Worker thread entry point; handed to pthread_create() by wq_init(). */
static void *wq_handler(void *arg);
/* Queue whose worker runs req_work_handler on each queued node. */
static struct work_queue req_wq;
/* Queue whose worker runs disc_work_handler on each queued node. */
static struct work_queue disc_wq;
/* Shared queue; initialized in main() with num_threads workers. */
static struct work_queue wq;

/* Array of per-connection state; released with free() in main(). */
static struct node *nodes;
/* Index of the next unused entry in nodes (see cma_handler). */
static int node_index;
/* NOTE(review): presumably per-step start/end timestamps - confirm. */
static uint64_t times[STEP_CNT][2];
/* Number of work items each wq_handler thread processes before exiting. */
static int connections;
/* Worker thread count passed to wq_init() for the shared queue. */
static int num_threads = 1;
/* Incremented by cma_handler just before it queues disconnect work. */
static volatile int disc_events;

/* Per-step completion counters; bumped by handlers, polled by the test. */
static volatile int completed[STEP_CNT];
Expand All @@ -148,74 +138,6 @@ static inline bool is_client(void)
return dst_addr != NULL;
}

/*
 * Initialize a work queue and start its worker thread.
 *
 * @wq:           queue to initialize; head/tail are reset to empty.
 * @work_handler: callback the worker runs for every node it dequeues.
 *
 * Returns 0 on success or the non-zero error code reported by the failing
 * pthread call.  On failure everything initialized so far is destroyed
 * again, so the caller must not call wq_cleanup() on the queue.
 *
 * NOTE(review): pthread_* functions return the error number instead of
 * setting errno, so the text perror() appends may not match the failure.
 */
static int
wq_init(struct work_queue *wq, void (*work_handler)(struct node *))
{
	int ret;

	wq->head = NULL;
	wq->tail = NULL;
	wq->work_handler = work_handler;

	ret = pthread_mutex_init(&wq->lock, NULL);
	if (ret) {
		perror("pthread_mutex_init");
		return ret;
	}

	ret = pthread_cond_init(&wq->cond, NULL);
	if (ret) {
		perror("pthread_cond_init");
		goto err_mutex;
	}

	ret = pthread_create(&wq->thread, NULL, wq_handler, wq);
	if (ret) {
		perror("pthread_create");
		goto err_cond;
	}

	return 0;

	/* Unwind in reverse order so a failed init leaks nothing. */
err_cond:
	pthread_cond_destroy(&wq->cond);
err_mutex:
	pthread_mutex_destroy(&wq->lock);
	return ret;
}

/*
 * Wait for the queue's worker thread to exit, then destroy the queue's
 * condition variable and mutex.
 *
 * The thread is joined exactly once, and before the synchronization
 * objects it uses are destroyed: joining a thread that has already been
 * joined is undefined behaviour.
 */
static void wq_cleanup(struct work_queue *wq)
{
	pthread_join(wq->thread, NULL);
	pthread_cond_destroy(&wq->cond);
	pthread_mutex_destroy(&wq->lock);
}

/*
 * Append node @n to the tail of @wq.  The worker is signalled only when
 * the queue was empty before the insert; the signal is sent after the
 * lock has been released.
 */
static void wq_insert(struct work_queue *wq, struct node *n)
{
	bool was_empty;

	n->next = NULL;

	pthread_mutex_lock(&wq->lock);
	was_empty = !wq->head;
	if (was_empty)
		wq->head = n;
	else
		wq->tail->next = n;
	wq->tail = n;
	pthread_mutex_unlock(&wq->lock);

	if (was_empty)
		pthread_cond_signal(&wq->cond);
}

/*
 * Detach and return the node at the head of @wq.
 *
 * No locking is done here and the head is dereferenced unconditionally,
 * so the caller must hold wq->lock and have checked that the queue is
 * not empty.
 */
static struct node *wq_remove(struct work_queue *wq)
{
	struct node *first = wq->head;

	wq->head = first->next;
	first->next = NULL;
	return first;
}

static void show_perf(int iter)
{
uint32_t diff, max[STEP_CNT], min[STEP_CNT], sum[STEP_CNT];
Expand Down Expand Up @@ -423,8 +345,10 @@ static void disc_handler(struct node *n)
completed[STEP_DISCONNECT]++;
}

static void req_work_handler(struct node *n)
static void req_handler(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);

struct rdma_conn_param conn_param;
int ret;

Expand Down Expand Up @@ -463,8 +387,10 @@ static void req_work_handler(struct node *n)
return;
}

static void disc_work_handler(struct node *n)
static void server_disconnect(struct work_item *item)
{
struct node *n = container_of(item, struct node, work);

start_perf(n, STEP_DISCONNECT);
rdma_disconnect(n->id);
end_perf(n, STEP_DISCONNECT);
Expand All @@ -474,25 +400,6 @@ static void disc_work_handler(struct node *n)
completed[STEP_DISCONNECT]++;
}

/*
 * Worker thread entry point.
 *
 * @arg: the struct work_queue this thread services.
 *
 * Dequeues and processes exactly 'connections' nodes, sleeping on the
 * queue's condition variable whenever the queue is empty, then exits.
 * Always returns NULL.
 */
static void *wq_handler(void *arg)
{
	struct work_queue *wq = arg;
	struct node *n;
	int i;

	for (i = 0; i < connections; i++) {
		pthread_mutex_lock(&wq->lock);
		/*
		 * Re-test the predicate after every wakeup:
		 * pthread_cond_wait() may return spuriously, and removing
		 * from an empty queue would dereference a NULL head.
		 */
		while (!wq->head)
			pthread_cond_wait(&wq->cond, &wq->lock);
		n = wq_remove(wq);
		pthread_mutex_unlock(&wq->lock);

		/* Run the callback without the lock so inserts can proceed. */
		wq->work_handler(n);
	}

	return NULL;
}

static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
struct node *n = id->context;
Expand All @@ -512,7 +419,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
n = &nodes[node_index++];
n->id = id;
id->context = n;
wq_insert(&req_wq, n);
wq_insert(&wq, &n->work, req_handler);
break;
case RDMA_CM_EVENT_CONNECT_RESPONSE:
conn_handler(n);
Expand Down Expand Up @@ -557,7 +464,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
start_time(STEP_DISCONNECT);
}
disc_events++;
wq_insert(&disc_wq, n);
wq_insert(&wq, &n->work, server_disconnect);
}
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
Expand Down Expand Up @@ -687,16 +594,7 @@ static void reset_test(int iter)

static int server_connect(int iter)
{
int ret;

reset_test(iter);
ret = wq_init(&req_wq, req_work_handler);
if (ret)
return ret;

ret = wq_init(&disc_wq, disc_work_handler);
if (ret)
return ret;

while (completed[STEP_CONNECT] != iter)
sched_yield();
Expand All @@ -708,12 +606,9 @@ static int server_connect(int iter)

oob_recvsend(oob_sock, STEP_DISCONNECT);

/* Wait for event threads to exit before destroying resources */
wq_cleanup(&req_wq);
wq_cleanup(&disc_wq);
destroy_qps(iter);
destroy_ids(iter);
return ret;
return 0;
}

static int client_connect(int iter)
Expand Down Expand Up @@ -995,12 +890,18 @@ int main(int argc, char **argv)
goto destchan;
}

ret = wq_init(&wq, num_threads);
if (ret)
goto free;

if (is_client()) {
ret = run_client(iter);
} else {
ret = run_server(iter);
}

wq_cleanup(&wq);
free:
free(nodes);
destchan:
rdma_destroy_event_channel(channel);
Expand Down
113 changes: 113 additions & 0 deletions librdmacm/examples/common.c
Expand Up @@ -36,6 +36,7 @@
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
Expand Down Expand Up @@ -294,3 +295,115 @@ int oob_recvsend(int sock, char val)

return 0;
}

static void *wq_handler(void *arg);

/*
 * Initialize a work queue and start its pool of worker threads.
 *
 * @wq:         queue to initialize.
 * @thread_cnt: number of worker threads to start; must be at least 1.
 *
 * Returns 0 on success, -EINVAL for a non-positive thread count, -ENOMEM
 * if the thread array cannot be allocated, or the positive error number
 * returned by the failing pthread call.
 *
 * On failure every resource acquired so far is released again (threads
 * already started are stopped and joined), so the caller must not call
 * wq_cleanup() on a queue whose initialization failed.
 */
int wq_init(struct work_queue *wq, int thread_cnt)
{
	int ret, i;

	if (thread_cnt < 1)
		return -EINVAL;

	wq->head = NULL;
	wq->tail = NULL;
	wq->thread = NULL;
	wq->thread_cnt = 0;
	wq->running = false;

	/*
	 * pthread calls report failure through their return value and do
	 * not set errno; copy the code into errno so perror() prints the
	 * matching message.
	 */
	ret = pthread_mutex_init(&wq->lock, NULL);
	if (ret) {
		errno = ret;
		perror("pthread_mutex_init");
		return ret;
	}

	ret = pthread_cond_init(&wq->cond, NULL);
	if (ret) {
		errno = ret;
		perror("pthread_cond_init");
		goto err_mutex;
	}

	wq->thread = calloc(thread_cnt, sizeof(*wq->thread));
	if (!wq->thread) {
		ret = -ENOMEM;
		goto err_cond;
	}

	wq->thread_cnt = thread_cnt;
	wq->running = true;
	for (i = 0; i < thread_cnt; i++) {
		ret = pthread_create(&wq->thread[i], NULL, wq_handler, wq);
		if (ret) {
			errno = ret;
			perror("pthread_create");
			goto err_threads;
		}
	}

	return 0;

err_threads:
	/* Stop and reap the i workers that were started successfully. */
	pthread_mutex_lock(&wq->lock);
	wq->running = false;
	pthread_cond_broadcast(&wq->cond);
	pthread_mutex_unlock(&wq->lock);
	while (i-- > 0)
		pthread_join(wq->thread[i], NULL);

	free(wq->thread);
	wq->thread = NULL;
	wq->thread_cnt = 0;
err_cond:
	pthread_cond_destroy(&wq->cond);
err_mutex:
	pthread_mutex_destroy(&wq->lock);
	return ret;
}

/*
 * Stop all worker threads of @wq and release the queue's resources.
 *
 * Clears the running flag under the lock, wakes every waiting worker,
 * joins all threads, frees the thread array allocated by wq_init() and
 * destroys the condition variable and mutex.  Workers exit once they see
 * the flag cleared, so items still queued at that point may be left
 * unprocessed.
 */
void wq_cleanup(struct work_queue *wq)
{
	int i;

	pthread_mutex_lock(&wq->lock);
	wq->running = false;
	pthread_cond_broadcast(&wq->cond);
	pthread_mutex_unlock(&wq->lock);

	for (i = 0; i < wq->thread_cnt; i++)
		pthread_join(wq->thread[i], NULL);

	/* The thread array is owned by the queue; release it here. */
	free(wq->thread);
	wq->thread = NULL;
	wq->thread_cnt = 0;

	pthread_cond_destroy(&wq->cond);
	pthread_mutex_destroy(&wq->lock);
}

/*
 * Queue @item on @wq so that a worker thread calls @work_handler on it.
 *
 * @wq:           queue to append to.
 * @item:         work item to queue; its next and work_handler fields
 *                are overwritten.
 * @work_handler: callback a worker thread invokes with @item.
 *
 * A waiter is signalled on every insert, not only when the queue was
 * empty.  With several workers an item can be queued behind one that a
 * woken worker has not dequeued yet; signalling only on the empty to
 * non-empty transition would leave the other workers asleep while that
 * item waits.  Signalling with no waiter present has no effect.
 */
void wq_insert(struct work_queue *wq, struct work_item *item,
	       void (*work_handler)(struct work_item *item))
{
	item->next = NULL;
	item->work_handler = work_handler;

	pthread_mutex_lock(&wq->lock);
	if (wq->head)
		wq->tail->next = item;
	else
		wq->head = item;
	wq->tail = item;
	pthread_mutex_unlock(&wq->lock);

	pthread_cond_signal(&wq->cond);
}

/*
 * Detach and return the work item at the head of @wq.
 *
 * No locking is done here and the head is dereferenced unconditionally,
 * so the caller must hold wq->lock and have checked that the queue is
 * not empty.
 */
struct work_item *wq_remove(struct work_queue *wq)
{
	struct work_item *first = wq->head;

	wq->head = first->next;
	first->next = NULL;
	return first;
}

/*
 * Worker thread entry point for a work queue.
 *
 * @arg: the struct work_queue this thread services.
 *
 * Repeatedly dequeues an item and runs its work_handler with the queue
 * lock released, sleeping on the condition variable while the queue is
 * empty.  The thread leaves the loop once the running flag is cleared.
 * On exit it passes a wakeup on to another waiter if items are still
 * queued.  Always returns NULL.
 */
static void *wq_handler(void *arg)
{
	struct work_queue *queue = arg;
	struct work_item *work;

	pthread_mutex_lock(&queue->lock);
	for (;;) {
		if (!queue->running)
			break;

		if (!queue->head) {
			/* Nothing to do: sleep, then re-check both conditions. */
			pthread_cond_wait(&queue->cond, &queue->lock);
			continue;
		}

		work = wq_remove(queue);
		pthread_mutex_unlock(&queue->lock);

		/* Callbacks run unlocked so other threads can use the queue. */
		work->work_handler(work);
		pthread_mutex_lock(&queue->lock);
	}

	if (queue->head)
		pthread_cond_signal(&queue->cond);
	pthread_mutex_unlock(&queue->lock);
	return NULL;
}

0 comments on commit a7277c0

Please sign in to comment.