Skip to content

Commit

Permalink
More memory fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Jun 29, 2014
1 parent 15efd2d commit bdb3e74
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 34 deletions.
3 changes: 2 additions & 1 deletion lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ enum mosquitto_client_state {
mosq_cs_disconnecting = 2,
mosq_cs_connect_async = 3,
mosq_cs_connect_pending = 4,
mosq_cs_connect_srv = 5
mosq_cs_connect_srv = 5,
mosq_cs_disconnect_ws = 6
};

enum _mosquitto_protocol {
Expand Down
11 changes: 10 additions & 1 deletion lib/net_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ void _mosquitto_net_init(void)
void _mosquitto_net_cleanup(void)
{
#ifdef WITH_TLS
sk_SSL_COMP_free(SSL_COMP_get_compression_methods());
ERR_remove_state(0);
ENGINE_cleanup();
CONF_modules_unload(1);
Expand Down Expand Up @@ -219,12 +220,20 @@ int _mosquitto_socket_close(struct mosquitto *mosq)
}
#endif

if(mosq->sock != INVALID_SOCKET){
if(mosq->sock >= 0){
#ifdef WITH_BROKER
HASH_DELETE(hh_sock, db->contexts_by_sock, mosq);
#endif
rc = COMPAT_CLOSE(mosq->sock);
mosq->sock = INVALID_SOCKET;
#ifdef WITH_WEBSOCKETS
}else if(mosq->sock == WEBSOCKET_CLIENT){
if(mosq->state != mosq_cs_disconnecting){
mosq->state = mosq_cs_disconnect_ws;
}
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
mosq->sock = INVALID_SOCKET;
#endif
}

#ifdef WITH_BROKER
Expand Down
2 changes: 1 addition & 1 deletion src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge)
if(!new_context){
return MOSQ_ERR_NOMEM;
}
new_context->id = local_id;
new_context->id = _mosquitto_strdup(local_id);
HASH_ADD_KEYPTR(hh_id, db->contexts_by_id, new_context->id, strlen(new_context->id), new_context);
}
new_context->bridge = bridge;
Expand Down
12 changes: 10 additions & 2 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct mosquitto *mqtt3_context_init(struct mosquitto_db *db, int sock)
context->current_out_packet = NULL;

context->address = NULL;
if(sock != -1){
if(sock >= 0){
if(!_mosquitto_socket_get_address(sock, address, 1024)){
context->address = _mosquitto_strdup(address);
}
Expand All @@ -76,7 +76,7 @@ struct mosquitto *mqtt3_context_init(struct mosquitto_db *db, int sock)
context->ssl = NULL;
#endif

if(context->sock != INVALID_SOCKET){
if(context->sock >= 0){
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
}
return context;
Expand Down Expand Up @@ -113,6 +113,8 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b
if(ctx_tmp){
HASH_DELETE(hh_bridge, db->contexts_bridge, context);
}
_mosquitto_free(context->bridge->local_clientid);
context->bridge->local_clientid = NULL;
}
if(context->bridge->username){
context->bridge->username = NULL;
Expand Down Expand Up @@ -140,6 +142,12 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b
_mosquitto_free(context->address);
context->address = NULL;
}

HASH_FIND(hh_for_free, db->contexts_for_free, context, sizeof(void *), ctx_tmp);
if(ctx_tmp){
HASH_DELETE(hh_for_free, db->contexts_for_free, context);
}

if(context->id){
assert(db); /* db can only be NULL here if the client hasn't sent a
CONNECT and hence wouldn't have an id. */
Expand Down
15 changes: 1 addition & 14 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,7 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context,
}
}
}
#ifdef WITH_WEBSOCKETS
if(context->sock == INVALID_SOCKET && !context->wsi){
#else
if(context->sock == INVALID_SOCKET){
#endif
/* Client is not connected only queue messages with QoS>0. */
if(qos == 0 && !db->config->queue_qos0_messages){
if(!context->bridge){
Expand All @@ -249,11 +245,7 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context,
}
}

#ifdef WITH_WEBSOCKETS
if(context->sock != INVALID_SOCKET || context->wsi){
#else
if(context->sock != INVALID_SOCKET){
#endif
if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){
if(dir == mosq_md_out){
switch(qos){
Expand Down Expand Up @@ -732,13 +724,8 @@ int mqtt3_db_message_write(struct mosquitto *context)
const void *payload;
int msg_count = 0;

#ifdef WITH_WEBSOCKETS
if(!context || (context->sock == -1 && !context->wsi)
if(!context || context->sock == INVALID_SOCKET
|| (context->state == mosq_cs_connected && !context->id)){
#else
if(!context || context->sock == -1
|| (context->state == mosq_cs_connected && !context->id)){
#endif
return MOSQ_ERR_INVAL;
}

Expand Down
4 changes: 0 additions & 4 deletions src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
}
context->pollfd_index = -1;

#ifdef WITH_WEBSOCKETS
if(context->sock != INVALID_SOCKET || context->wsi){
#else
if(context->sock != INVALID_SOCKET){
#endif
#ifdef WITH_BRIDGE
if(context->bridge){
_mosquitto_check_keepalive(db, context);
Expand Down
2 changes: 2 additions & 0 deletions src/mosquitto_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ and the Eclipse Distribution License is available at
#define MQTT3_LOG_TOPIC 0x10
#define MQTT3_LOG_ALL 0xFF

#define WEBSOCKET_CLIENT -2

enum mosquitto_protocol {
mp_mqtt,
mp_mqttsn,
Expand Down
9 changes: 5 additions & 4 deletions src/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock)
}
}
if(!new_context->listener){
mqtt3_context_cleanup(NULL, new_context, true);
mqtt3_context_cleanup(db, new_context, true);
return -1;
}

if(new_context->listener->max_connections > 0 && new_context->listener->client_count > new_context->listener->max_connections){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client connection from %s denied: max_connections exceeded.", new_context->address);
mqtt3_context_cleanup(NULL, new_context, true);
mqtt3_context_cleanup(db, new_context, true);
return -1;
}

Expand All @@ -137,7 +137,7 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock)
if(db->config->listeners[i].ssl_ctx){
new_context->ssl = SSL_new(db->config->listeners[i].ssl_ctx);
if(!new_context->ssl){
mqtt3_context_cleanup(NULL, new_context, true);
mqtt3_context_cleanup(db, new_context, true);
return -1;
}
SSL_set_ex_data(new_context->ssl, tls_ex_index_context, new_context);
Expand All @@ -160,7 +160,7 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock)
new_context->address, ERR_error_string(e, ebuf));
e = ERR_get_error();
}
mqtt3_context_cleanup(NULL, new_context, true);
mqtt3_context_cleanup(db, new_context, true);
return -1;
}
}
Expand Down Expand Up @@ -229,6 +229,7 @@ static unsigned int psk_server_callback(SSL *ssl, const char *identity, unsigned
}
}

_mosquitto_free(psk_key);
return len;
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static struct mosquitto *_db_find_or_add_context(struct mosquitto_db *db, const
context = mqtt3_context_init(db, -1);
if(!context) return NULL;
context->id = _mosquitto_strdup(client_id);
if(!context){
if(!context->id){
_mosquitto_free(context);
return NULL;
}
Expand Down
8 changes: 4 additions & 4 deletions src/read_handle_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
return 3;
}
if(_mosquitto_read_byte(&context->in_packet, &protocol_version)){
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(context), context);
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context);
rc = 1;
goto handle_connect_error;
return 1;
Expand Down Expand Up @@ -409,7 +409,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
found_context->address = NULL;
}
found_context->disconnect_t = 0;
if(context->sock != INVALID_SOCKET){
if(context->sock >= 0){
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
}
found_context->sock = context->sock;
Expand All @@ -430,7 +430,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
context->ssl = NULL;
#endif
context->state = mosq_cs_disconnecting;
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(context), context);
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context);
context = found_context;
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
if(context->msgs){
Expand Down Expand Up @@ -544,7 +544,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
if(will_struct) _mosquitto_free(will_struct);
if(context){
mqtt3_context_disconnect(db, context);
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(context), context);
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context);
}
return rc;
}
Expand Down
4 changes: 2 additions & 2 deletions src/websockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ static int callback_mqtt(struct libwebsocket_context *context,

switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
mosq = mqtt3_context_init(db, -1);
mosq = mqtt3_context_init(db, WEBSOCKET_CLIENT);
if(mosq){
mosq->ws_context = context;
mosq->wsi = wsi;
Expand All @@ -138,7 +138,7 @@ static int callback_mqtt(struct libwebsocket_context *context,

case LWS_CALLBACK_SERVER_WRITEABLE:
mosq = u->mosq;
if(mosq->state == mosq_cs_disconnecting){
if(mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
return -1;
}

Expand Down

0 comments on commit bdb3e74

Please sign in to comment.