Skip to content

Commit

Permalink
lib: monkey: thread-local-storage fixes (fluent#4101)
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper authored Sep 18, 2021
1 parent 48d305c commit 4e0c4de
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 112 deletions.
20 changes: 11 additions & 9 deletions lib/monkey/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,17 @@ if(MK_LINUX_TRACE)
endif()

# Use old Pthread TLS
check_c_source_compiles("
__thread int a;
int main() {
__tls_get_addr(0);
return 0;
}" HAVE_C_TLS)

if(HAVE_C_TLS)
MK_DEFINITION(MK_HAVE_C_TLS)
if(NOT MK_PTHREAD_TLS)
check_c_source_compiles("
__thread int a;
int main() {
__tls_get_addr(0);
return 0;
}" HAVE_C_TLS)

if(HAVE_C_TLS)
MK_DEFINITION(MK_HAVE_C_TLS)
endif()
endif()

# Valgrind support
Expand Down
13 changes: 13 additions & 0 deletions lib/monkey/include/monkey/mk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,19 @@ struct mk_server
/* FIXME: temporal map of Network Layer plugin */
struct mk_plugin_network *network;

/* Thread initializator helpers (sched_launch_thread) */
int pth_init;
pthread_cond_t pth_cond;
pthread_mutex_t pth_mutex;

/* Used for vhost initialization synchronization */
pthread_mutex_t vhost_fdt_mutex;

/* worker_id as used by mk_sched_register_thread, it was moved here
* because it has to be local to each mk_server instance.
*/
int worker_id;

/* Direct map to Stage plugins */
struct mk_list stage10_handler;
struct mk_list stage20_handler;
Expand Down
5 changes: 5 additions & 0 deletions lib/monkey/include/monkey/mk_http_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ struct mk_http_thread {
struct mk_list _head; /* Link to worker->threads */
};

extern MK_TLS_DEFINE(struct mk_http_libco_params, mk_http_thread_libco_params);
extern MK_TLS_DEFINE(struct mk_thread, mk_thread);

static MK_INLINE void mk_http_thread_resume(struct mk_http_thread *mth)
{
mk_thread_resume(mth->parent);
}

void mk_http_thread_initialize_tls();

struct mk_http_thread *mk_http_thread_create(int type,
struct mk_vhost_handler *handler,
struct mk_http_session *session,
Expand Down
11 changes: 4 additions & 7 deletions lib/monkey/include/monkey/mk_thread_libco.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <valgrind/valgrind.h>
#endif

#include <monkey/mk_tls.h>

struct mk_thread {

#ifdef MK_HAVE_VALGRIND
Expand All @@ -52,12 +54,7 @@ struct mk_thread {
#define MK_THREAD_STACK_SIZE ((3 * PTHREAD_STACK_MIN) / 2)
#define MK_THREAD_DATA(th) (((char *) th) + sizeof(struct mk_thread))

extern MK_EXPORT pthread_key_t mk_thread_key;

static MK_INLINE void mk_thread_prepare()
{
pthread_key_create(&mk_thread_key, NULL);
}
extern MK_EXPORT MK_TLS_DEFINE(struct mk_thread, mk_thread);

static MK_INLINE void mk_thread_yield(struct mk_thread *th)
{
Expand All @@ -84,7 +81,7 @@ static MK_INLINE void mk_thread_destroy(struct mk_thread *th)

static MK_INLINE void mk_thread_resume(struct mk_thread *th)
{
pthread_setspecific(mk_thread_key, (void *) th);
MK_TLS_SET(mk_thread, th);

/*
* In the past we used to have a flag to mark when a coroutine
Expand Down
12 changes: 4 additions & 8 deletions lib/monkey/include/monkey/mk_thread_ucontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <valgrind/valgrind.h>

#include <monkey/mk_core.h>
#include <monkey/mk_tls.h>

struct mk_thread {
int id;
Expand All @@ -49,16 +50,11 @@ struct mk_thread {
#define MK_THREAD_DATA(th) ((char *) MK_THREAD_STACK_END(th))
#define MK_THREAD_SIZE() (sizeof(struct mk_thread) + MK_THREAD_STACK_SIZE)

MK_EXPORT pthread_key_t mk_thread_key;

static MK_INLINE void mk_thread_prepare()
{
pthread_key_create(&mk_thread_key, NULL);
}
extern MK_EXPORT MK_TLS_DEFINE(struct mk_thread, mk_thread);

static MK_INLINE void *mk_thread_get()
{
return pthread_getspecific(mk_thread_key);
return MK_TLS_GET(mk_thread);
}

static MK_INLINE void mk_thread_yield(struct mk_thread *th, int ended)
Expand All @@ -81,7 +77,7 @@ static MK_INLINE void mk_thread_destroy(struct mk_thread *th)

static MK_INLINE void mk_thread_resume(struct mk_thread *th)
{
pthread_setspecific(mk_thread_key, (void *) th);
MK_TLS_SET(mk_thread, th);

/*
* In the past we used to have a flag to mark when a coroutine
Expand Down
24 changes: 18 additions & 6 deletions lib/monkey/include/monkey/mk_tls.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@

#include <monkey/mk_info.h>

#define MK_INIT_INITIALIZE_TLS_UNIVERSAL() \
/* mk_utils.c */ \
pthread_key_create(&mk_utils_error_key, NULL); \
/* mk_lib.c */ \
pthread_key_create(&mk_server_fifo_key, NULL);

#ifdef MK_HAVE_C_TLS /* Use Compiler Thread Local Storage (TLS) */

/* mk_cache.c */
Expand All @@ -45,9 +51,12 @@ extern __thread struct mk_list *mk_tls_server_listen;
extern __thread struct mk_server_timeout *mk_tls_server_timeout;

/* TLS helper macros */
#define MK_TLS_SET(key, val) key=val
#define MK_TLS_GET(key) key
#define MK_TLS_INIT() do {} while (0)
#define MK_TLS_SET(key, val) key=val
#define MK_TLS_GET(key) key
#define MK_TLS_INIT(key) do {} while (0)
#define MK_TLS_DEFINE(type, name) __thread type *name;

#define MK_INIT_INITIALIZE_TLS() do {} while (0)

#else /* Use Posix Thread Keys */

Expand All @@ -71,9 +80,12 @@ pthread_key_t mk_tls_sched_worker_node;
pthread_key_t mk_tls_server_listen;
pthread_key_t mk_tls_server_timeout;

#define MK_TLS_SET(key, val) pthread_setspecific(key, (void *) val)
#define MK_TLS_GET(key) pthread_getspecific(key)
#define MK_TLS_INIT() \
#define MK_TLS_SET(key, val) pthread_setspecific(key, (void *) val)
#define MK_TLS_GET(key) pthread_getspecific(key)
#define MK_TLS_INIT(key) pthread_key_create(&key, NULL)
#define MK_TLS_DEFINE(type, name) pthread_key_t name;

#define MK_INIT_INITIALIZE_TLS() \
/* mk_cache.c */ \
pthread_key_create(&mk_tls_cache_iov_header, NULL); \
pthread_key_create(&mk_tls_cache_header_cl, NULL); \
Expand Down
4 changes: 2 additions & 2 deletions lib/monkey/mk_core/mk_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
#include <mk_core/mk_utils.h>

pthread_mutex_t mutex_trace;
pthread_key_t mk_utils_error_key;
pthread_key_t mk_utils_error_key; /* Initialized by MK_INIT_INITIALIZE_TLS_UNIVERSAL */

#ifdef MK_HAVE_TRACE
#ifdef _WIN32
Expand Down Expand Up @@ -388,6 +388,6 @@ int mk_core_init()
mk_core_init_time = time(NULL);
env_trace_filter = getenv("MK_TRACE_FILTER");
#endif
pthread_key_create(&mk_utils_error_key, NULL);

return 0;
}
11 changes: 9 additions & 2 deletions lib/monkey/mk_server/mk_fifo.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ struct mk_fifo *mk_fifo_create(pthread_key_t *key, void *data)


/* Pthread specifics */
ctx->key = key;
pthread_key_create(ctx->key, NULL);

/* We need to isolate this because there is a key that's shared between monkey
* instances by design.
*/
if (key != NULL) {
ctx->key = key;
pthread_key_create(ctx->key, NULL);
}

pthread_mutex_init(&ctx->mutex_init, NULL);

return ctx;
Expand Down
81 changes: 64 additions & 17 deletions lib/monkey/mk_server/mk_http_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,63 @@ struct mk_http_libco_params {
struct mk_thread *th;
};

struct mk_http_libco_params libco_param;
pthread_once_t mk_http_thread_initialize_tls_once_flag = PTHREAD_ONCE_INIT;

pthread_key_t mk_thread_key;
MK_TLS_DEFINE(struct mk_http_libco_params, mk_http_thread_libco_params);
MK_TLS_DEFINE(struct mk_thread, mk_thread);

/* This function could return NULL if the process runs out of memory, in that
* case failure is imminent.
*/
static inline struct mk_http_libco_params *thread_get_libco_params()
{
struct mk_http_libco_params *libco_params;

libco_params = MK_TLS_GET(mk_http_thread_libco_params);

if (libco_params == NULL) {
libco_params = mk_mem_alloc_z(sizeof(struct mk_http_libco_params));

if (libco_params == NULL) {
mk_err("libco thread params could not be allocated.");
}

MK_TLS_SET(mk_http_thread_libco_params, libco_params);
}

return libco_params;
}

static void mk_http_thread_initialize_tls_once()
{
MK_TLS_INIT(mk_http_thread_libco_params);
MK_TLS_INIT(mk_thread);
}

void mk_http_thread_initialize_tls()
{
pthread_once(&mk_http_thread_initialize_tls_once_flag,
mk_http_thread_initialize_tls_once);
}

static inline void thread_cb_init_vars()
{
int close;
int type = libco_param.type;
struct mk_vhost_handler *handler = libco_param.handler;
struct mk_http_session *session = libco_param.session;
struct mk_http_request *request = libco_param.request;
struct mk_thread *th = libco_param.th;
struct mk_http_thread *mth;
//struct mk_plugin *plugin;
struct mk_http_libco_params *libco_params;
struct mk_vhost_handler *handler;
struct mk_http_session *session;
struct mk_http_request *request;
int close;
int type;
struct mk_http_thread *mth;
struct mk_thread *th;

libco_params = thread_get_libco_params();

type = libco_params->type;
handler = libco_params->handler;
session = libco_params->session;
request = libco_params->request;
th = libco_params->th;

/*
* Until this point the th->callee already set the variables, so we
Expand Down Expand Up @@ -120,14 +163,18 @@ static inline void thread_params_set(struct mk_thread *th,
int n_params,
struct mk_list *params)
{
struct mk_http_libco_params *libco_params;

libco_params = thread_get_libco_params();

/* Callback parameters in order */
libco_param.type = type;
libco_param.handler = handler;
libco_param.session = session;
libco_param.request = request;
libco_param.n_params = n_params;
libco_param.params = params;
libco_param.th = th;
libco_params->type = type;
libco_params->handler = handler;
libco_params->session = session;
libco_params->request = request;
libco_params->n_params = n_params;
libco_params->params = params;
libco_params->th = th;

co_switch(th->callee);
}
Expand Down
8 changes: 5 additions & 3 deletions lib/monkey/mk_server/mk_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <monkey/mk_scheduler.h>
#include <monkey/mk_fifo.h>
#include <monkey/mk_utils.h>
#include <monkey/mk_tls.h>

#define config_eq(a, b) strcasecmp(a, b)

Expand All @@ -51,7 +52,7 @@ mk_ctx_t *mk_create()
{
mk_ctx_t *ctx;

ctx = mk_mem_alloc(sizeof(mk_ctx_t));
ctx = mk_mem_alloc_z(sizeof(mk_ctx_t));
if (!ctx) {
return NULL;
}
Expand All @@ -66,7 +67,8 @@ mk_ctx_t *mk_create()
* for further communication between the caller (user) and HTTP end-point
* callbacks.
*/
ctx->fifo = mk_fifo_create(&mk_server_fifo_key, ctx->server);
ctx->fifo = mk_fifo_create(NULL, ctx->server);
ctx->fifo->key = &mk_server_fifo_key;

/*
* FIFO: Set workers callback associated to the Monkey scheduler to prepare them
Expand Down Expand Up @@ -96,7 +98,7 @@ static inline int mk_lib_yield(mk_request_t *req)
return -1;
}

th = pthread_getspecific(mk_thread_key);
th = MK_TLS_GET(mk_thread);
channel = req->session->channel;

channel->thread = th;
Expand Down
3 changes: 2 additions & 1 deletion lib/monkey/mk_server/mk_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <monkey/mk_scheduler.h>
#include <monkey/mk_plugin.h>
#include <monkey/mk_thread.h>
#include <monkey/mk_tls.h>

#ifdef _WIN32
#include <winsock2.h>
Expand Down Expand Up @@ -181,7 +182,7 @@ int mk_net_conn_write(struct mk_channel *channel,
size_t total = 0;
size_t send;
socklen_t slen = sizeof(error);
struct mk_thread *th = pthread_getspecific(mk_thread_key);
struct mk_thread *th = MK_TLS_GET(mk_thread);
struct mk_sched_worker *sched;

sched = mk_sched_get_thread_conf();
Expand Down
Loading

0 comments on commit 4e0c4de

Please sign in to comment.