Skip to content

Commit

Permalink
librdmacm/cmtime: Redo work_list/queue abstraction
Browse files Browse the repository at this point in the history
Simplify and generalize the work queue abstraction.  Add helper to
initialize a work_queue.  Add thread tracking to the work queue, with
a common work item callback handler.  These changes merge most of the
CM request and disconnect event handling into a common work queue
abstraction.

Further simplify the work queue by replacing the double-linked list
with a single-linked list implementation to reduce overhead.

Signed-off-by: Sean Hefty <[email protected]>
  • Loading branch information
Sean Hefty committed Apr 9, 2024
1 parent 2a46ec8 commit a73f7df
Showing 1 changed file with 101 additions and 109 deletions.
210 changes: 101 additions & 109 deletions librdmacm/examples/cmtime.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,25 @@ struct node {
int retries;
};

struct list_head {
struct list_head *prev;
struct list_head *next;
struct rdma_cm_id *id;
struct work_item {
struct work_item *next;
struct rdma_cm_id *id;
};

struct work_list {
pthread_mutex_t lock;
pthread_cond_t cond;
struct list_head list;
struct work_queue {
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_t thread;

void (*work_handler)(struct work_item *item);
struct work_item *head;
struct work_item *tail;
};

#define INIT_LIST(x) ((x)->prev = (x)->next = (x))
static void *wq_handler(void *arg);
static struct work_queue req_wq;
static struct work_queue disc_wq;

static struct work_list req_work;
static struct work_list disc_work;
static struct node *nodes;
static struct timeval times[STEP_CNT][2];
static int connections = 100;
Expand All @@ -120,41 +123,70 @@ static inline bool is_client(void)
return dst_addr != NULL;
}

static inline void __list_delete(struct list_head *list)
static int
wq_init(struct work_queue *wq, void (*work_handler)(struct work_item *))
{
struct list_head *prev, *next;
prev = list->prev;
next = list->next;
prev->next = next;
next->prev = prev;
INIT_LIST(list);
int ret;

wq->head = NULL;
wq->tail = NULL;
wq->work_handler = work_handler;

ret = pthread_mutex_init(&wq->lock, NULL);
if (ret) {
perror("pthread_mutex_init");
return ret;
}

ret = pthread_cond_init(&wq->cond, NULL);
if (ret) {
perror("pthread_cond_init");
return ret;
}

ret = pthread_create(&wq->thread, NULL, wq_handler, wq);
if (ret) {
perror("pthread_create");
return ret;
}

return 0;
}

static inline int __list_empty(struct work_list *list)
static void wq_cleanup(struct work_queue *wq)
{
return list->list.next == &list->list;
pthread_join(wq->thread, NULL);
pthread_cond_destroy(&wq->cond);
pthread_mutex_destroy(&wq->lock);
}

static inline struct list_head *__list_remove_head(struct work_list *work_list)
static void wq_insert(struct work_queue *wq, struct work_item *item)
{
struct list_head *list_item;
bool empty;

item->next = NULL;
pthread_mutex_lock(&wq->lock);
if (wq->head) {
wq->tail->next = item;
empty = false;
} else {
wq->head = item;
empty = true;
}
wq->tail = item;
pthread_mutex_unlock(&wq->lock);

list_item = work_list->list.next;
__list_delete(list_item);
return list_item;
if (empty)
pthread_cond_signal(&wq->cond);
}

static inline void list_add_tail(struct work_list *work_list, struct list_head *req)
static struct work_item *wq_remove(struct work_queue *wq)
{
int empty;
pthread_mutex_lock(&work_list->lock);
empty = __list_empty(work_list);
req->prev = work_list->list.prev;
req->next = &work_list->list;
req->prev->next = work_list->list.prev = req;
pthread_mutex_unlock(&work_list->lock);
if (empty)
pthread_cond_signal(&work_list->cond);
struct work_item *item;

item = wq->head;
wq->head = wq->head->next;
return item;
}

static int zero_time(struct timeval *t)
Expand Down Expand Up @@ -222,10 +254,12 @@ static void disc_handler(struct node *n)
completed[STEP_DISCONNECT]++;
}

static void __req_handler(struct rdma_cm_id *id)
static void req_work_handler(struct work_item *item)
{
struct rdma_cm_id *id;
int ret;

id = item->id;
ret = rdma_create_qp(id, NULL, &init_qp_attr);
if (ret) {
perror("failure creating qp");
Expand All @@ -248,48 +282,37 @@ static void __req_handler(struct rdma_cm_id *id)
return;
}

static void *req_handler_thread(void *arg)
static void disc_work_handler(struct work_item *item)
{
struct list_head *work;
int i;

for (i = 0; i < connections; i++) {
pthread_mutex_lock(&req_work.lock);
if (__list_empty(&req_work))
pthread_cond_wait(&req_work.cond, &req_work.lock);
work = __list_remove_head(&req_work);
pthread_mutex_unlock(&req_work.lock);
__req_handler(work->id);
free(work);
}

return NULL;
rdma_disconnect(item->id);
rdma_destroy_qp(item->id);
rdma_destroy_id(item->id);
}

static void *disc_handler_thread(void *arg)
static void *wq_handler(void *arg)
{
struct list_head *work;
struct work_queue *wq = arg;
struct work_item *item;
int i;

for (i = 0; i < connections; i++) {
pthread_mutex_lock(&disc_work.lock);
if (__list_empty(&disc_work))
pthread_cond_wait(&disc_work.cond, &disc_work.lock);
work = __list_remove_head(&disc_work);
pthread_mutex_unlock(&disc_work.lock);
rdma_disconnect(work->id);
rdma_destroy_qp(work->id);
rdma_destroy_id(work->id);
free(work);
};
pthread_mutex_lock(&wq->lock);
if (!wq->head)
pthread_cond_wait(&wq->cond, &wq->lock);
item = wq_remove(wq);
pthread_mutex_unlock(&wq->lock);

wq->work_handler(item);
free(item);
}

return NULL;
}

static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
struct node *n = id->context;
struct list_head *request;
struct work_item *item;

switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
Expand All @@ -299,15 +322,14 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
route_handler(n);
break;
case RDMA_CM_EVENT_CONNECT_REQUEST:
request = malloc(sizeof *request);
if (!request) {
item = malloc(sizeof *item);
if (!item) {
perror("out of memory accepting connect request");
rdma_reject(id, NULL, 0);
rdma_destroy_id(id);
} else {
INIT_LIST(request);
request->id = id;
list_add_tail(&req_work, request);
item->id = id;
wq_insert(&req_wq, item);
}
break;
case RDMA_CM_EVENT_ESTABLISHED:
Expand Down Expand Up @@ -344,16 +366,15 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_DISCONNECTED:
disc_events++;
if (!n) {
request = malloc(sizeof *request);
if (!request) {
item = malloc(sizeof *item);
if (!item) {
perror("out of memory queueing disconnect request, handling synchronously");
rdma_disconnect(id);
rdma_destroy_qp(id);
rdma_destroy_id(id);
} else {
INIT_LIST(request);
request->id = id;
list_add_tail(&disc_work, request);
item->id = id;
wq_insert(&disc_wq, item);
}
} else {
disc_handler(n);
Expand Down Expand Up @@ -426,47 +447,16 @@ static void *process_events(void *arg)

static int run_server(void)
{
pthread_t req_thread, disc_thread;
struct rdma_cm_id *listen_id;
int ret;

INIT_LIST(&req_work.list);
INIT_LIST(&disc_work.list);
ret = pthread_mutex_init(&req_work.lock, NULL);
if (ret) {
perror("initializing mutex for req work");
return ret;
}

ret = pthread_mutex_init(&disc_work.lock, NULL);
if (ret) {
perror("initializing mutex for disc work");
return ret;
}

ret = pthread_cond_init(&req_work.cond, NULL);
if (ret) {
perror("initializing cond for req work");
return ret;
}

ret = pthread_cond_init(&disc_work.cond, NULL);
if (ret) {
perror("initializing cond for disc work");
return ret;
}

ret = pthread_create(&req_thread, NULL, req_handler_thread, NULL);
if (ret) {
perror("failed to create req handler thread");
ret = wq_init(&req_wq, req_work_handler);
if (ret)
return ret;
}

ret = pthread_create(&disc_thread, NULL, disc_handler_thread, NULL);
if (ret) {
perror("failed to create disconnect handler thread");
ret = wq_init(&disc_wq, disc_work_handler);
if (ret)
return ret;
}

ret = rdma_create_id(channel, &listen_id, NULL, hints.ai_port_space);
if (ret) {
Expand All @@ -489,6 +479,8 @@ static int run_server(void)
process_events(NULL);
out:
rdma_destroy_id(listen_id);
wq_cleanup(&req_wq);
wq_cleanup(&disc_wq);
return ret;
}

Expand Down

0 comments on commit a73f7df

Please sign in to comment.