Skip to content

Commit

Permalink
Fix websockets listeners not causing the main loop not to wake up.
Browse files Browse the repository at this point in the history
Closes #1936. Thanks to sectokia.
  • Loading branch information
ralight committed Dec 10, 2020
1 parent f54360d commit 37ce517
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 38 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Expand Up @@ -6,6 +6,8 @@ Broker:
Closes #1934.
- Fix DH group not being set for TLS connections, which meant ciphers using
DHE couldn't be used. Closes #1925. Closes #1476.
- Fix websockets listeners not causing the main loop not to wake up.
Closes #1936.

Client library:
- Fix DH group not being set for TLS connections, which meant ciphers using
Expand Down
94 changes: 65 additions & 29 deletions src/mosquitto.c
Expand Up @@ -58,6 +58,10 @@ SPDX-License-Identifier: EPL-2.0 OR EDL-1.0

struct mosquitto_db db;

static struct mosquitto__listener_sock *listensock = NULL;
static int listensock_count = 0;
static int listensock_index = 0;

bool flag_reload = false;
#ifdef WITH_PERSISTENCE
bool flag_db_backup = false;
Expand Down Expand Up @@ -202,37 +206,74 @@ void listeners__reload_all_certificates(void)
}


int listeners__start_single_mqtt(struct mosquitto__listener_sock **listensock, int *listensock_count, int *listensock_index, struct mosquitto__listener *listener)
int listeners__start_single_mqtt(struct mosquitto__listener *listener)
{
int i;
struct mosquitto__listener_sock *listensock_new;

if(net__socket_listen(listener)){
return 1;
}
(*listensock_count) += listener->sock_count;
listensock_new = mosquitto__realloc(*listensock, sizeof(struct mosquitto__listener_sock)*(size_t)(*listensock_count));
listensock_count += listener->sock_count;
listensock_new = mosquitto__realloc(listensock, sizeof(struct mosquitto__listener_sock)*(size_t)listensock_count);
if(!listensock_new){
return 1;
}
*listensock = listensock_new;
listensock = listensock_new;

for(i=0; i<listener->sock_count; i++){
if(listener->socks[i] == INVALID_SOCKET){
return 1;
}
(*listensock)[*listensock_index].sock = listener->socks[i];
(*listensock)[*listensock_index].listener = listener;
listensock[listensock_index].sock = listener->socks[i];
listensock[listensock_index].listener = listener;
#ifdef WITH_EPOLL
(*listensock)[*listensock_index].ident = id_listener;
listensock[listensock_index].ident = id_listener;
#endif
(*listensock_index)++;
listensock_index++;
}
return MOSQ_ERR_SUCCESS;
}


int listeners__add_local(struct mosquitto__listener_sock **listensock, int *listensock_count, int *listensock_index, const char *host, uint16_t port)
#ifdef WITH_WEBSOCKETS
void listeners__add_websockets(struct lws_context *ws_context, int fd)
{
int i;
struct mosquitto__listener *listener = NULL;
struct mosquitto__listener_sock *listensock_new;

/* Don't add more listeners after we've started the main loop */
if(run || ws_context == NULL) return;

/* Find context */
for(i=0; i<db.config->listener_count; i++){
if(db.config->listeners[i].ws_in_init){
listener = &db.config->listeners[i];
break;
}
}
if(listener == NULL){
return;
}

listensock_count++;
listensock_new = mosquitto__realloc(listensock, sizeof(struct mosquitto__listener_sock)*(size_t)listensock_count);
if(!listensock_new){
return;
}
listensock = listensock_new;

listensock[listensock_index].sock = fd;
listensock[listensock_index].listener = listener;
#ifdef WITH_EPOLL
listensock[listensock_index].ident = id_listener_ws;
#endif
listensock_index++;
}
#endif

int listeners__add_local(const char *host, uint16_t port)
{
struct mosquitto__listener *listeners;
listeners = db.config->listeners;
Expand All @@ -244,7 +285,7 @@ int listeners__add_local(struct mosquitto__listener_sock **listensock, int *list
if(listeners[db.config->listener_count].host == NULL){
return MOSQ_ERR_NOMEM;
}
if(listeners__start_single_mqtt(listensock, listensock_count, listensock_index, &listeners[db.config->listener_count])){
if(listeners__start_single_mqtt(&listeners[db.config->listener_count])){
mosquitto__free(listeners[db.config->listener_count].host);
listeners[db.config->listener_count].host = NULL;
return MOSQ_ERR_UNKNOWN;
Expand All @@ -253,11 +294,10 @@ int listeners__add_local(struct mosquitto__listener_sock **listensock, int *list
return MOSQ_ERR_SUCCESS;
}

int listeners__start_local_only(struct mosquitto__listener_sock **listensock, int *listensock_count)
int listeners__start_local_only(void)
{
/* Attempt to open listeners bound to 127.0.0.1 and ::1 only */
int i;
int listensock_index = 0;
int rc;
struct mosquitto__listener *listeners;

Expand All @@ -272,15 +312,15 @@ int listeners__start_local_only(struct mosquitto__listener_sock **listensock, in
log__printf(NULL, MOSQ_LOG_WARNING, "Starting in local only mode. Connections will only be possible from clients running on this machine.");
log__printf(NULL, MOSQ_LOG_WARNING, "Create a configuration file which defines a listener to allow remote access.");
if(db.config->cmd_port_count == 0){
rc = listeners__add_local(listensock, listensock_count, &listensock_index, "127.0.0.1", 1883);
rc = listeners__add_local("127.0.0.1", 1883);
if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM;
rc = listeners__add_local(listensock, listensock_count, &listensock_index, "::1", 1883);
rc = listeners__add_local("::1", 1883);
if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM;
}else{
for(i=0; i<db.config->cmd_port_count; i++){
rc = listeners__add_local(listensock, listensock_count, &listensock_index, "127.0.0.1", db.config->cmd_port[i]);
rc = listeners__add_local("127.0.0.1", db.config->cmd_port[i]);
if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM;
rc = listeners__add_local(listensock, listensock_count, &listensock_index, "::1", db.config->cmd_port[i]);
rc = listeners__add_local("::1", db.config->cmd_port[i]);
if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM;
}
}
Expand All @@ -293,16 +333,14 @@ int listeners__start_local_only(struct mosquitto__listener_sock **listensock, in
}


int listeners__start(struct mosquitto__listener_sock **listensock, int *listensock_count)
int listeners__start(void)
{
int i;
int listensock_index = 0;

listensock_index = 0;
(*listensock_count) = 0;
listensock_count = 0;

if(db.config->listener_count == 0){
if(listeners__start_local_only(listensock, listensock_count)){
if(listeners__start_local_only()){
db__close();
if(db.config->pid_file){
(void)remove(db.config->pid_file);
Expand All @@ -314,7 +352,7 @@ int listeners__start(struct mosquitto__listener_sock **listensock, int *listenso

for(i=0; i<db.config->listener_count; i++){
if(db.config->listeners[i].protocol == mp_mqtt){
if(listeners__start_single_mqtt(listensock, listensock_count, &listensock_index, &db.config->listeners[i])){
if(listeners__start_single_mqtt(&db.config->listeners[i])){
db__close();
if(db.config->pid_file){
(void)remove(db.config->pid_file);
Expand All @@ -323,23 +361,23 @@ int listeners__start(struct mosquitto__listener_sock **listensock, int *listenso
}
}else if(db.config->listeners[i].protocol == mp_websockets){
#ifdef WITH_WEBSOCKETS
db.config->listeners[i].ws_context = mosq_websockets_init(&db.config->listeners[i], db.config);
mosq_websockets_init(&db.config->listeners[i], db.config);
if(!db.config->listeners[i].ws_context){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to create websockets listener on port %d.", db.config->listeners[i].port);
return 1;
}
#endif
}
}
if((*listensock) == NULL){
if(listensock == NULL){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to start any listening sockets, exiting.");
return 1;
}
return MOSQ_ERR_SUCCESS;
}


void listeners__stop(struct mosquitto__listener_sock *listensock, int listensock_count)
void listeners__stop(void)
{
int i;

Expand Down Expand Up @@ -404,8 +442,6 @@ int pid__write(void)

int main(int argc, char *argv[])
{
struct mosquitto__listener_sock *listensock = NULL;
int listensock_count = 0;
struct mosquitto__config config;
#ifdef WITH_BRIDGE
int i;
Expand Down Expand Up @@ -510,7 +546,7 @@ int main(int argc, char *argv[])
sys_tree__init();
#endif

if(listeners__start(&listensock, &listensock_count)) return 1;
if(listeners__start()) return 1;

signal__setup();

Expand Down Expand Up @@ -541,7 +577,7 @@ int main(int argc, char *argv[])
#endif
session_expiry__remove_all();

listeners__stop(listensock, listensock_count);
listeners__stop();

HASH_ITER(hh_id, db.contexts_by_id, ctxt, ctxt_tmp){
#ifdef WITH_WEBSOCKETS
Expand Down
8 changes: 7 additions & 1 deletion src/mosquitto_broker_internal.h
Expand Up @@ -186,6 +186,7 @@ enum struct_ident{
id_invalid = 0,
id_listener = 1,
id_client = 2,
id_listener_ws = 3,
};
#endif

Expand Down Expand Up @@ -224,6 +225,7 @@ struct mosquitto__listener {
#endif
#ifdef WITH_WEBSOCKETS
struct lws_context *ws_context;
bool ws_in_init;
char *http_dir;
struct lws_protocols *ws_protocol;
#endif
Expand Down Expand Up @@ -557,6 +559,7 @@ struct mosquitto__bridge{
#ifdef WITH_WEBSOCKETS
struct libws_mqtt_hack {
char *http_dir;
struct mosquitto__listener *listener;
};

struct libws_mqtt_data {
Expand Down Expand Up @@ -743,6 +746,9 @@ int mux__cleanup(void);
* ============================================================ */
void listener__set_defaults(struct mosquitto__listener *listener);
void listeners__reload_all_certificates(void);
#ifdef WITH_WEBSOCKETS
void listeners__add_websockets(struct lws_context *ws_context, int fd);
#endif

/* ============================================================
* Plugin related functions
Expand Down Expand Up @@ -825,7 +831,7 @@ DWORD WINAPI SigThreadProc(void* data);
* Websockets related functions
* ============================================================ */
#ifdef WITH_WEBSOCKETS
struct lws_context *mosq_websockets_init(struct mosquitto__listener *listener, const struct mosquitto__config *conf);
void mosq_websockets_init(struct mosquitto__listener *listener, const struct mosquitto__config *conf);
#endif
void do_disconnect(struct mosquitto *context, int reason);

Expand Down
5 changes: 5 additions & 0 deletions src/mux_epoll.c
Expand Up @@ -217,6 +217,11 @@ int mux_epoll__handle(void)
mux__add_in(context);
}
}
#ifdef WITH_WEBSOCKETS
}else if(context->ident == id_listener_ws){
/* Nothing needs to happen here, because we always call lws_service in the loop.
* The important point is we've been woken up for this listener. */
#endif
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions src/mux_poll.c
Expand Up @@ -211,9 +211,17 @@ int mux_poll__handle(struct mosquitto__listener_sock *listensock, int listensock

for(i=0; i<listensock_count; i++){
if(pollfds[i].revents & (POLLIN | POLLPRI)){
while((context = net__socket_accept(&listensock[i])) != NULL){
context->pollfd_index = -1;
mux__add_in(context);
#ifdef WITH_WEBSOCKETS
if(listensock[i]->listener.ws_context){
/* Nothing needs to happen here, because we always call lws_service in the loop.
* The important point is we've been woken up for this listener. */
}else
#endif
{
while((context = net__socket_accept(&listensock[i])) != NULL){
context->pollfd_index = -1;
mux__add_in(context);
}
}
}
}
Expand Down
18 changes: 13 additions & 5 deletions src/websockets.c
Expand Up @@ -616,6 +616,11 @@ static int callback_http(
}else{
mux__remove_out(mosq);
}
}else{
if(reason == LWS_CALLBACK_ADD_POLL_FD && (pollargs->events & POLLIN)){
/* Assume this is a new listener */
listeners__add_websockets(lws_get_context(wsi), pollargs->fd);
}
}
break;

Expand All @@ -641,7 +646,7 @@ static void log_wrap(int level, const char *line)
log__printf(NULL, MOSQ_LOG_WEBSOCKETS, "%s", l);
}

struct lws_context *mosq_websockets_init(struct mosquitto__listener *listener, const struct mosquitto__config *conf)
void mosq_websockets_init(struct mosquitto__listener *listener, const struct mosquitto__config *conf)
{
struct lws_context_creation_info info;
struct lws_protocols *p;
Expand All @@ -655,7 +660,7 @@ struct lws_context *mosq_websockets_init(struct mosquitto__listener *listener, c
p = mosquitto__calloc(protocol_count+1, sizeof(struct lws_protocols));
if(!p){
log__printf(NULL, MOSQ_LOG_ERR, "Out of memory.");
return NULL;
return;
}
for(i=0; protocols[i].name; i++){
p[i].name = protocols[i].name;
Expand Down Expand Up @@ -694,7 +699,7 @@ struct lws_context *mosq_websockets_init(struct mosquitto__listener *listener, c
if(!user){
mosquitto__free(p);
log__printf(NULL, MOSQ_LOG_ERR, "Out of memory.");
return NULL;
return;
}

if(listener->http_dir){
Expand All @@ -707,9 +712,10 @@ struct lws_context *mosq_websockets_init(struct mosquitto__listener *listener, c
mosquitto__free(user);
mosquitto__free(p);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to open http dir \"%s\".", listener->http_dir);
return NULL;
return;
}
}
user->listener = listener;

info.user = user;
info.pt_serv_buf_size = WS_SERV_BUF_SIZE;
Expand All @@ -718,7 +724,9 @@ struct lws_context *mosq_websockets_init(struct mosquitto__listener *listener, c
lws_set_log_level(conf->websockets_log_level, log_wrap);

log__printf(NULL, MOSQ_LOG_INFO, "Opening websockets listen socket on port %d.", listener->port);
return lws_create_context(&info);
listener->ws_in_init = true;
listener->ws_context = lws_create_context(&info);
listener->ws_in_init = false;
}


Expand Down

0 comments on commit 37ce517

Please sign in to comment.