Skip to content

Commit

Permalink
lib: monkey: sync scheduler and co-routines changes
Browse files Browse the repository at this point in the history
 ba7fb673 fuzz: cleanup unused vars
 465b8e96 server: scheduler: register mk_sched_threads_destroy_all()
 3e12eaf1 core: event: new MK_EVENT_UNMODIFIED type for persistency
 5fec9a83 server: scheduler: new helpers to cleanup threads
 0a920b72 server: lib: remove connnection if request has ended
 5a283ad9 server: http: remove pending threads before drop connection
 fffbe2fa server: http_thread: improve handling of context
 08df97d0 server: stream: new function to return remaining data size
 6ee8715b api: update test programs
 da5e772f fuzz: new fuzz test tools (honggfuzz)

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Apr 10, 2018
1 parent d95a9b9 commit aa43f91
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 25 deletions.
6 changes: 5 additions & 1 deletion lib/monkey/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ option(MK_PTHREAD_TLS "Use old Pthread TLS mode" No)
option(MK_SYSTEM_MALLOC "Use system memory allocator" No)
option(MK_MBEDTLS_SHARED "Use mbedtls shared lib" No)
option(MK_VALGRIND "Enable Valgrind support" No)
option(MK_FUZZ_MODE "Enable HonggFuzz mode" No)
option(MK_HTTP2 "Enable HTTP Support (dev)" No)

# Plugins: what should be build ?, these options
Expand Down Expand Up @@ -114,7 +115,6 @@ if (MK_HTTP2)
MK_DEFINITION(MK_HAVE_HTTP2)
endif()


# Check for accept(2) v/s accept(4)
list(APPEND CMAKE_REQUIRED_DEFINITIONS -D_GNU_SOURCE)
check_symbol_exists(accept4 "sys/socket.h" HAVE_ACCEPT4)
Expand Down Expand Up @@ -366,3 +366,7 @@ if(NOT SKIP_EMPTY_DIRS)
endif()

add_subdirectory(api)

if(MK_FUZZ_MODE)
add_subdirectory(fuzz)
endif()
7 changes: 7 additions & 0 deletions lib/monkey/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@ set(src

add_executable(api_test ${src})
target_link_libraries(api_test monkey-core-static)

set(src
errors.c
)

add_executable(api_error ${src})
target_link_libraries(api_error monkey-core-static)
20 changes: 16 additions & 4 deletions lib/monkey/api/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <unistd.h>

#define API_ADDR "127.0.0.1"
#define API_PORT "2020"
#define API_PORT "8080"

/* Main context set as global so the signal handler can use it */
mk_ctx_t *ctx;
Expand All @@ -18,6 +18,16 @@ void cb_worker(void *data)
mk_info("[api test] test worker callback; data=%p", data);
}

void cb_main(mk_request_t *request, void *data)
{
(void) data;

mk_http_status(request, 200);
mk_http_header(request, "X-Monkey", 8, "OK", 2);
mk_http_send(request, ":)\n", 3, NULL);
mk_http_done(request);
}

void cb_test_chunks(mk_request_t *request, void *data)
{
int i = 0;
Expand Down Expand Up @@ -90,6 +100,7 @@ static void cb_queue_message(mk_mq_t *queue, void *data, size_t size, void *ctx)
printf("\n\n");
}


int main()
{
int i = 0;
Expand All @@ -110,6 +121,7 @@ int main()

mk_config_set(ctx,
"Listen", API_PORT,
//"Timeout", "1",
NULL);

vid = mk_vhost_create(ctx, NULL);
Expand All @@ -118,16 +130,16 @@ int main()
NULL);
mk_vhost_handler(ctx, vid, "/test_chunks", cb_test_chunks, NULL);
mk_vhost_handler(ctx, vid, "/test_big_chunk", cb_test_big_chunk, NULL);
mk_vhost_handler(ctx, vid, "/", cb_main, NULL);

mk_worker_callback(ctx,
cb_worker,
ctx);

mk_info("Service: http:https://%s:%s/test", API_ADDR, API_PORT);
mk_info("Service: http:https://%s:%s/test_chunks", API_ADDR, API_PORT);
mk_start(ctx);


for (i = 0; i < 1000; i++) {
for (i = 0; i < 5; i++) {
len = snprintf(msg, sizeof(msg) - 1, "[...] message ID: %i\n", i);
mk_mq_send(ctx, qid, &msg, len);
}
Expand Down
1 change: 1 addition & 0 deletions lib/monkey/include/monkey/mk_core/mk_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#define MK_EVENT_H

/* Events type family */
#define MK_EVENT_UNMODIFIED -1 /* keep old event type */
#define MK_EVENT_NOTIFICATION 0 /* notification channel (pipe) */
#define MK_EVENT_LISTENER 1 /* listener socket */
#define MK_EVENT_FIFO 2 /* FIFO - Messaging */
Expand Down
3 changes: 2 additions & 1 deletion lib/monkey/include/monkey/mk_http_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#define MK_HTTP_THREAD_PLUGIN 1

struct mk_http_thread {
int close; /* Close TCP connection ? */
struct mk_http_session *session; /* HTTP session */
struct mk_http_request *request; /* HTTP request */
struct mk_thread *parent; /* Parent thread */
Expand All @@ -50,6 +51,6 @@ int mk_http_thread_destroy(struct mk_http_thread *mth);
int mk_http_thread_event(struct mk_event *event);

int mk_http_thread_start(struct mk_http_thread *mth);
int mk_http_thread_purge(struct mk_http_thread *mth);
int mk_http_thread_purge(struct mk_http_thread *mth, int close);

#endif
1 change: 1 addition & 0 deletions lib/monkey/include/monkey/mk_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,5 +339,6 @@ void mk_sched_worker_cb_free(struct mk_server *server);
int mk_sched_send_signal(struct mk_server *server, uint64_t val);
int mk_sched_workers_join(struct mk_server *server);
int mk_sched_threads_purge(struct mk_sched_worker *sched);
int mk_sched_threads_destroy_all(struct mk_sched_worker *sched);

#endif
7 changes: 5 additions & 2 deletions lib/monkey/mk_core/mk_event_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,16 @@ static inline int _mk_event_add(struct mk_event_ctx *ctx, int fd,
if (event->mask == MK_EVENT_EMPTY) {
op = EPOLL_CTL_ADD;
event->fd = fd;
event->type = type;
event->status = MK_EVENT_REGISTERED;
event->type = type;

}
else {
op = EPOLL_CTL_MOD;
if (type != MK_EVENT_UNMODIFIED) {
event->type = type;
}
}

ep_event.events = EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ep_event.data.ptr = data;

Expand Down
5 changes: 5 additions & 0 deletions lib/monkey/mk_core/mk_event_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ static inline int _mk_event_add(struct mk_event_ctx *ctx, int fd,
event->type = type;
event->status = MK_EVENT_REGISTERED;
}
else {
if (type != MK_EVENT_UNMODIFIED) {
event->type = type;
}
}

/* Read flag */
if ((event->mask ^ MK_EVENT_READ) && (events & MK_EVENT_READ)) {
Expand Down
4 changes: 3 additions & 1 deletion lib/monkey/mk_core/mk_event_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ static inline int _mk_event_add(struct mk_event_ctx *ctx, int fd,

event = (struct mk_event *) data;
event->fd = fd;
event->type = type;
event->mask = events;
event->status = MK_EVENT_REGISTERED;
if (type != MK_EVENT_UNMODIFIED) {
event->type = type;
}

ctx->events[fd] = event;
if (fd > ctx->max_fd) {
Expand Down
23 changes: 15 additions & 8 deletions lib/monkey/mk_server/mk_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ int mk_http_init(struct mk_http_session *cs, struct mk_http_request *sr,
//return mk_http_error(MK_CLIENT_BAD_REQUEST, cs, sr, server);
}


ret_file = mk_file_get_info(sr->real_path.data, &sr->file_info, MK_FILE_READ);

/* Manually set the headers input streams */
Expand Down Expand Up @@ -750,6 +749,7 @@ int mk_http_init(struct mk_http_session *cs, struct mk_http_request *sr,
if (!mth) {
return -1;
}

mk_http_thread_start(mth);
return MK_EXIT_OK;
}
Expand Down Expand Up @@ -1085,7 +1085,8 @@ int mk_http_keepalive_check(struct mk_http_session *cs,
}

if (sr->connection.data) {
if (cs->parser.header_connection == MK_HTTP_PARSER_CONN_KA) {
if (cs->parser.header_connection == MK_HTTP_PARSER_CONN_KA &&
sr->protocol == MK_HTTP_PROTOCOL_11) {
cs->close_now = MK_FALSE;
}
else if (cs->parser.header_connection == MK_HTTP_PARSER_CONN_CLOSE) {
Expand Down Expand Up @@ -1130,7 +1131,6 @@ int mk_http_request_end(struct mk_http_session *cs, struct mk_server *server)
/* Check if we have some enqueued pipeline requests */
ret = mk_http_parser_more(&cs->parser, cs->body_length);
if (ret == MK_TRUE) {

/* Our pipeline request limit is the same that our keepalive limit */
cs->counter_connections++;
len = (cs->body_length - cs->parser.i) -1;
Expand All @@ -1147,7 +1147,11 @@ int mk_http_request_end(struct mk_http_session *cs, struct mk_server *server)
status = mk_http_parser(sr, &cs->parser, cs->body, cs->body_length,
server);
if (status == MK_HTTP_PARSER_OK) {
mk_http_request_prepare(cs, sr, server);
ret = mk_http_request_prepare(cs, sr, server);
if (ret == MK_EXIT_ABORT) {
return -1;
}

/*
* Return 1 means, we still have more data to send in a different
* scheduler round.
Expand Down Expand Up @@ -1376,6 +1380,9 @@ void mk_http_session_remove(struct mk_http_session *cs,

cs->_sched_init = MK_FALSE;

/* Remove any pending thread context */
struct mk_sched_worker *sched = mk_sched_get_thread_conf();
mk_sched_threads_destroy_all(sched);
}

/* FIXME: nobody is using this */
Expand Down Expand Up @@ -1557,7 +1564,7 @@ int mk_http_sched_read(struct mk_sched_conn *conn,
return -1;
}
mk_sched_conn_timeout_del(conn);
mk_http_request_prepare(cs, sr, server);
ret = mk_http_request_prepare(cs, sr, server);
}
else if (status == MK_HTTP_PARSER_ERROR) {
/* The HTTP parser may enqueued some response error */
Expand All @@ -1581,7 +1588,7 @@ int mk_http_sched_close(struct mk_sched_conn *conn,
struct mk_sched_worker *sched,
int type, struct mk_server *server)
{
struct mk_http_session *cs;
struct mk_http_session *session;
(void) sched;

#ifdef TRACE
Expand All @@ -1591,8 +1598,8 @@ int mk_http_sched_close(struct mk_sched_conn *conn,
#endif

/* Release resources of the requests and session */
cs = mk_http_session_get(conn);
mk_http_session_remove(cs, server);
session = mk_http_session_get(conn);
mk_http_session_remove(session, server);
return 0;
}

Expand Down
32 changes: 27 additions & 5 deletions lib/monkey/mk_server/mk_http_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ struct mk_http_libco_params libco_param;

static inline void thread_cb_init_vars()
{
int close;
int type = libco_param.type;
struct mk_vhost_handler *handler = libco_param.handler;
struct mk_http_session *session = libco_param.session;
struct mk_http_request *request = libco_param.request;
struct mk_thread *th = libco_param.th;
struct mk_http_thread *mth;
//struct mk_plugin *plugin;

/*
Expand Down Expand Up @@ -85,17 +87,28 @@ static inline void thread_cb_init_vars()
//return -1;
}

mk_http_request_end(session, session->server);
mk_http_thread_purge(request->thread);
/* Save temporal session */
mth = request->thread;

/*
* Finalize request internally, if ret == -1 means we should
* ask to shutdown the connection.
*/
ret = mk_http_request_end(session, session->server);
if (ret == -1) {
close = MK_TRUE;
}
else {
close = MK_FALSE;
}
mk_http_thread_purge(mth, close);

/* Return control to caller */
mk_thread_yield(th);
}
else if (type == MK_HTTP_THREAD_PLUGIN) {
/* FIXME: call plugin handler callback with params */
}

printf("init vars finishing\n");
}

static inline void thread_params_set(struct mk_thread *th,
Expand Down Expand Up @@ -148,6 +161,7 @@ struct mk_http_thread *mk_http_thread_create(int type,
mth->session = session;
mth->request = request;
mth->parent = th;
mth->close = MK_FALSE;
request->thread = mth;
mk_list_add(&mth->_head, &sched->threads);

Expand All @@ -170,7 +184,7 @@ struct mk_http_thread *mk_http_thread_create(int type,
* Move a http thread context from sched->thread to sched->threads_purge list.
* On this way the scheduler will release or reasign the resource later.
*/
int mk_http_thread_purge(struct mk_http_thread *mth)
int mk_http_thread_purge(struct mk_http_thread *mth, int close)
{
struct mk_sched_worker *sched;

Expand All @@ -179,6 +193,7 @@ int mk_http_thread_purge(struct mk_http_thread *mth)
return -1;
}

mth->close = close;
mk_list_del(&mth->_head);
mk_list_add(&mth->_head, &sched->threads_purge);

Expand All @@ -194,6 +209,7 @@ int mk_http_thread_destroy(struct mk_http_thread *mth)

/* release original memory context */
th = mth->parent;
mth->session->channel->event->type = MK_EVENT_CONNECTION;
mk_thread_destroy(th);

return 0;
Expand All @@ -202,6 +218,12 @@ int mk_http_thread_destroy(struct mk_http_thread *mth)
int mk_http_thread_event(struct mk_event *event)
{
struct mk_sched_conn *conn = (struct mk_sched_conn *) event;

struct mk_thread *th;
struct mk_http_thread *mth;

th = conn->channel.thread;
mth = (struct mk_http_thread *) MK_THREAD_DATA(th);
mk_thread_resume(conn->channel.thread);
return 0;
}
Expand Down
8 changes: 8 additions & 0 deletions lib/monkey/mk_server/mk_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,14 @@ int mk_http_done(mk_request_t *req)
/* Append end-of-chunk bytes */
mk_http_send(req, NULL, 0, NULL);
}
else {
mk_http_send(req, NULL, 0, NULL);
}

if (req->session->close_now == MK_TRUE) {
mk_lib_yield(req);
mk_http_session_remove(req->session, req->session->server);
}

return 0;
}
Expand Down
Loading

0 comments on commit aa43f91

Please sign in to comment.