From a7277c02e11a5169806cf8a9620621c90b281a4c Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Tue, 16 Apr 2024 21:50:52 -0700 Subject: [PATCH] librdmacm/cmtime: Rework work queue abstraction Isolate the work queue abstraction from the cmtime. Define a work_item structure to allow individual work items to invoke separate callbacks. Add support for multiple threads. Move the code from the cmtime source into the common source for future reusability. This change will enable cmtime to use a single work queue to process different steps in the connection process. Having a single work queue will limit the number of threads that the test will need to spawn. Signed-off-by: Sean Hefty --- librdmacm/examples/CMakeLists.txt | 3 +- librdmacm/examples/cmtime.c | 137 +++++------------------------- librdmacm/examples/common.c | 113 ++++++++++++++++++++++++ librdmacm/examples/common.h | 26 ++++++ 4 files changed, 160 insertions(+), 119 deletions(-) diff --git a/librdmacm/examples/CMakeLists.txt b/librdmacm/examples/CMakeLists.txt index 46347b6ea..ed119466f 100644 --- a/librdmacm/examples/CMakeLists.txt +++ b/librdmacm/examples/CMakeLists.txt @@ -2,9 +2,10 @@ add_library(rdmacm_tools STATIC common.c ) +target_link_libraries(rdmacm_tools LINK_PRIVATE ${CMAKE_THREAD_LIBS_INIT}) rdma_executable(cmtime cmtime.c) -target_link_libraries(cmtime LINK_PRIVATE rdmacm ${CMAKE_THREAD_LIBS_INIT} rdmacm_tools) +target_link_libraries(cmtime LINK_PRIVATE rdmacm rdmacm_tools) rdma_executable(mckey mckey.c) target_link_libraries(mckey LINK_PRIVATE rdmacm ${CMAKE_THREAD_LIBS_INIT} rdmacm_tools) diff --git a/librdmacm/examples/cmtime.c b/librdmacm/examples/cmtime.c index 0ccb6820c..a24e3db24 100644 --- a/librdmacm/examples/cmtime.c +++ b/librdmacm/examples/cmtime.c @@ -43,6 +43,7 @@ #include #include #include +#include #include #include "common.h" @@ -103,7 +104,7 @@ static const char *step_str[] = { }; struct node { - struct node *next; + struct work_item work; struct rdma_cm_id *id; struct ibv_qp *qp; @@ -112,24 +113,13 @@ struct node { int retries; }; -struct work_queue { - pthread_mutex_t lock; - pthread_cond_t cond; - pthread_t thread; - - void (*work_handler)(struct node *node); - struct node *head; - struct node *tail; -}; - -static void *wq_handler(void *arg); -static struct work_queue req_wq; -static struct work_queue disc_wq; +static struct work_queue wq; static struct node *nodes; static int node_index; static uint64_t times[STEP_CNT][2]; static int connections; +static int num_threads = 1; static volatile int disc_events; static volatile int completed[STEP_CNT]; @@ -148,74 +138,6 @@ static inline bool is_client(void) return dst_addr != NULL; } -static int -wq_init(struct work_queue *wq, void (*work_handler)(struct node *)) -{ - int ret; - - wq->head = NULL; - wq->tail = NULL; - wq->work_handler = work_handler; - - ret = pthread_mutex_init(&wq->lock, NULL); - if (ret) { - perror("pthread_mutex_init"); - return ret; - } - - ret = pthread_cond_init(&wq->cond, NULL); - if (ret) { - perror("pthread_cond_init"); - return ret; - } - - ret = pthread_create(&wq->thread, NULL, wq_handler, wq); - if (ret) { - perror("pthread_create"); - return ret; - } - - return 0; -} - -static void wq_cleanup(struct work_queue *wq) -{ - pthread_join(wq->thread, NULL); - pthread_cond_destroy(&wq->cond); - pthread_mutex_destroy(&wq->lock); - pthread_join(wq->thread, NULL); -} - -static void wq_insert(struct work_queue *wq, struct node *n) -{ - bool empty; - - n->next = NULL; - pthread_mutex_lock(&wq->lock); - if (wq->head) { - wq->tail->next = n; - empty = false; - } else { - wq->head = n; - empty = true; - } - wq->tail = n; - pthread_mutex_unlock(&wq->lock); - - if (empty) - pthread_cond_signal(&wq->cond); -} - -static struct node *wq_remove(struct work_queue *wq) -{ - struct node *n; - - n = wq->head; - wq->head = wq->head->next; - n->next = NULL; - return n; -} - static void show_perf(int iter) { uint32_t diff, max[STEP_CNT], min[STEP_CNT], sum[STEP_CNT]; @@ -423,8 +345,10 @@ static void disc_handler(struct node *n) completed[STEP_DISCONNECT]++; } -static void req_work_handler(struct node *n) +static void req_handler(struct work_item *item) { + struct node *n = container_of(item, struct node, work); + struct rdma_conn_param conn_param; int ret; @@ -463,8 +387,10 @@ static void req_work_handler(struct node *n) return; } -static void disc_work_handler(struct node *n) +static void server_disconnect(struct work_item *item) { + struct node *n = container_of(item, struct node, work); + start_perf(n, STEP_DISCONNECT); rdma_disconnect(n->id); end_perf(n, STEP_DISCONNECT); @@ -474,25 +400,6 @@ static void disc_work_handler(struct node *n) completed[STEP_DISCONNECT]++; } -static void *wq_handler(void *arg) -{ - struct work_queue *wq = arg; - struct node *n; - int i; - - for (i = 0; i < connections; i++) { - pthread_mutex_lock(&wq->lock); - if (!wq->head) - pthread_cond_wait(&wq->cond, &wq->lock); - n = wq_remove(wq); - pthread_mutex_unlock(&wq->lock); - - wq->work_handler(n); - } - - return NULL; -} - static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) { struct node *n = id->context; @@ -512,7 +419,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) n = &nodes[node_index++]; n->id = id; id->context = n; - wq_insert(&req_wq, n); + wq_insert(&wq, &n->work, req_handler); break; case RDMA_CM_EVENT_CONNECT_RESPONSE: conn_handler(n); @@ -557,7 +464,7 @@ static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) start_time(STEP_DISCONNECT); } disc_events++; - wq_insert(&disc_wq, n); + wq_insert(&wq, &n->work, server_disconnect); } break; case RDMA_CM_EVENT_DEVICE_REMOVAL: @@ -687,16 +594,7 @@ static void reset_test(int iter) static int server_connect(int iter) { - int ret; - reset_test(iter); - ret = wq_init(&req_wq, req_work_handler); - if (ret) - return ret; - - ret = wq_init(&disc_wq, disc_work_handler); - if (ret) - return ret; while (completed[STEP_CONNECT] != iter) sched_yield(); @@ -708,12 +606,9 @@ static int server_connect(int iter) oob_recvsend(oob_sock, STEP_DISCONNECT); - /* Wait for event threads to exit before destroying resources */ - wq_cleanup(&req_wq); - wq_cleanup(&disc_wq); destroy_qps(iter); destroy_ids(iter); - return ret; + return 0; } static int client_connect(int iter) @@ -995,12 +890,18 @@ int main(int argc, char **argv) goto destchan; } + ret = wq_init(&wq, num_threads); + if (ret) + goto free; + if (is_client()) { ret = run_client(iter); } else { ret = run_server(iter); } + wq_cleanup(&wq); +free: free(nodes); destchan: rdma_destroy_event_channel(channel); diff --git a/librdmacm/examples/common.c b/librdmacm/examples/common.c index 765a58027..c81728f2d 100644 --- a/librdmacm/examples/common.c +++ b/librdmacm/examples/common.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -294,3 +295,115 @@ int oob_recvsend(int sock, char val) return 0; } + +static void *wq_handler(void *arg); + +int wq_init(struct work_queue *wq, int thread_cnt) +{ + int ret, i; + + wq->head = NULL; + wq->tail = NULL; + + ret = pthread_mutex_init(&wq->lock, NULL); + if (ret) { + perror("pthread_mutex_init"); + return ret; + } + + ret = pthread_cond_init(&wq->cond, NULL); + if (ret) { + perror("pthread_cond_init"); + return ret; + } + + wq->thread_cnt = thread_cnt; + wq->thread = calloc(thread_cnt, sizeof(*wq->thread)); + if (!wq->thread) + return -ENOMEM; + + wq->running = true; + for (i = 0; i < thread_cnt; i++) { + ret = pthread_create(&wq->thread[i], NULL, wq_handler, wq); + if (ret) { + perror("pthread_create"); + return ret; + } + } + + return 0; +} + +void wq_cleanup(struct work_queue *wq) +{ + int i; + + pthread_mutex_lock(&wq->lock); + wq->running = false; + pthread_cond_broadcast(&wq->cond); + pthread_mutex_unlock(&wq->lock); + + for (i = 0; i < wq->thread_cnt; i++) + pthread_join(wq->thread[i], NULL); + pthread_cond_destroy(&wq->cond); + pthread_mutex_destroy(&wq->lock); +} + +void wq_insert(struct work_queue *wq, struct work_item *item, + void (*work_handler)(struct work_item *item)) +{ + bool empty; + + item->next = NULL; + item->work_handler = work_handler; + pthread_mutex_lock(&wq->lock); + if (wq->head) { + wq->tail->next = item; + empty = false; + } else { + wq->head = item; + empty = true; + } + wq->tail = item; + pthread_mutex_unlock(&wq->lock); + + if (empty) + pthread_cond_signal(&wq->cond); +} + +struct work_item *wq_remove(struct work_queue *wq) +{ + struct work_item *item; + + item = wq->head; + wq->head = wq->head->next; + item->next = NULL; + return item; +} + +static void *wq_handler(void *arg) +{ + struct work_queue *wq = arg; + struct work_item *item; + + pthread_mutex_lock(&wq->lock); + while (wq->running) { + while (!wq->head) { + pthread_cond_wait(&wq->cond, &wq->lock); + if (!wq->running) + goto out; + } + + item = wq_remove(wq); + pthread_mutex_unlock(&wq->lock); + + item->work_handler(item); + pthread_mutex_lock(&wq->lock); + } + +out: + if (wq->head) + pthread_cond_signal(&wq->cond); + pthread_mutex_unlock(&wq->lock); + return NULL; +} diff --git a/librdmacm/examples/common.h b/librdmacm/examples/common.h index 5726f52f2..0fa9b735b 100644 --- a/librdmacm/examples/common.h +++ b/librdmacm/examples/common.h @@ -37,6 +37,8 @@ #include #include #include +#include +#include #include #include @@ -136,3 +138,27 @@ static inline int sleep_us(unsigned int time_us) spec.tv_nsec = time_us * 1000; return nanosleep(&spec, NULL); } + + +struct work_item { + struct work_item *next; + void (*work_handler)(struct work_item *item); +}; + +struct work_queue { + pthread_mutex_t lock; + pthread_cond_t cond; + + pthread_t *thread; + int thread_cnt; + bool running; + + struct work_item *head; + struct work_item *tail; +}; + +int wq_init(struct work_queue *wq, int thread_cnt); +void wq_cleanup(struct work_queue *wq); +void wq_insert(struct work_queue *wq, struct work_item *item, + void (*work_handler)(struct work_item *item)); +struct work_item *wq_remove(struct work_queue *wq);