Skip to content

Commit

Permalink
Use hash for message store to speed up loading.
Browse files Browse the repository at this point in the history
Comes at the expense of increased memory usage. This could be countered
by using a hash just for loading (increased memory usage during loading,
reduced afterwards) but this approach does allow the immediate removal
of messages from the store.
  • Loading branch information
ralight committed Nov 17, 2014
1 parent 94cd34c commit 4374170
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 48 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ lib/libmosquitto.a

test/ssl/*.csr

test/lib/c/*.test
test/lib/cpp/*.test

build/
dist/
40 changes: 15 additions & 25 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t
temp = _mosquitto_malloc(sizeof(struct mosquitto_msg_store));
if(!temp) return MOSQ_ERR_NOMEM;

temp->next = db->msg_store;
temp->ref_count = 0;
if(source){
temp->source_id = _mosquitto_strdup(source);
Expand Down Expand Up @@ -490,7 +489,6 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t
temp->dest_ids = NULL;
temp->dest_id_count = 0;
db->msg_store_count++;
db->msg_store = temp;
(*stored) = temp;

if(!store_id){
Expand All @@ -499,6 +497,8 @@ int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t
temp->db_id = store_id;
}

HASH_ADD(hh, db->msg_store, db_id, sizeof(dbid_t), temp);

return MOSQ_ERR_SUCCESS;
}

Expand Down Expand Up @@ -841,35 +841,25 @@ int mqtt3_db_message_write(struct mosquitto *context)
void mqtt3_db_store_clean(struct mosquitto_db *db)
{
/* FIXME - this may not be necessary if checks are made when messages are removed. */
struct mosquitto_msg_store *tail, *last = NULL;
struct mosquitto_msg_store *msg_store, *msg_tmp;
int i;
assert(db);

tail = db->msg_store;
while(tail){
if(tail->ref_count == 0){
if(tail->source_id) _mosquitto_free(tail->source_id);
if(tail->dest_ids){
for(i=0; i<tail->dest_id_count; i++){
if(tail->dest_ids[i]) _mosquitto_free(tail->dest_ids[i]);
HASH_ITER(hh, db->msg_store, msg_store, msg_tmp){
if(msg_store->ref_count == 0){
HASH_DELETE(hh, db->msg_store, msg_store);

if(msg_store->source_id) _mosquitto_free(msg_store->source_id);
if(msg_store->dest_ids){
for(i=0; i<msg_store->dest_id_count; i++){
if(msg_store->dest_ids[i]) _mosquitto_free(msg_store->dest_ids[i]);
}
_mosquitto_free(tail->dest_ids);
}
if(tail->msg.topic) _mosquitto_free(tail->msg.topic);
if(tail->msg.payload) _mosquitto_free(tail->msg.payload);
if(last){
last->next = tail->next;
_mosquitto_free(tail);
tail = last->next;
}else{
db->msg_store = tail->next;
_mosquitto_free(tail);
tail = db->msg_store;
_mosquitto_free(msg_store->dest_ids);
}
if(msg_store->msg.topic) _mosquitto_free(msg_store->msg.topic);
if(msg_store->msg.payload) _mosquitto_free(msg_store->msg.payload);
_mosquitto_free(msg_store);
db->msg_store_count--;
}else{
last = tail;
tail = tail->next;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/mosquitto_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ struct _mosquitto_subhier {
};

struct mosquitto_msg_store{
struct mosquitto_msg_store *next;
UT_hash_handle hh;
dbid_t db_id;
int ref_count;
char *source_id;
Expand Down
34 changes: 12 additions & 22 deletions src/persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,13 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr)
uint32_t i32temp;
uint16_t i16temp, slen;
uint8_t i8temp;
struct mosquitto_msg_store *stored;
struct mosquitto_msg_store *stored, *stored_tmp;
bool force_no_retain;

assert(db);
assert(db_fptr);

stored = db->msg_store;
while(stored){
HASH_ITER(hh, db->msg_store, stored, stored_tmp){
if(!strncmp(stored->msg.topic, "$SYS", 4)){
/* Don't save $SYS messages as retained otherwise they can give
* misleading information when reloaded. They should still be saved
Expand Down Expand Up @@ -193,8 +192,6 @@ static int mqtt3_db_message_store_write(struct mosquitto_db *db, FILE *db_fptr)
if(stored->msg.payloadlen){
write_e(db_fptr, stored->msg.payload, (unsigned int)stored->msg.payloadlen);
}

stored = stored->next;
}

return MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -428,20 +425,14 @@ static int _db_client_msg_restore(struct mosquitto_db *db, const char *client_id
cmsg->state = state;
cmsg->dup = dup;

store = db->msg_store;
while(store){
if(store->db_id == store_id){
cmsg->store = store;
cmsg->store->ref_count++;
break;
}
store = store->next;
}
if(!cmsg->store){
HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store);
if(!store){
_mosquitto_free(cmsg);
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error restoring persistent database, message store corrupt.");
return 1;
}
cmsg->store = store;

context = _db_find_or_add_context(db, client_id, 0);
if(!context){
_mosquitto_free(cmsg);
Expand Down Expand Up @@ -655,13 +646,12 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
return 1;
}
store_id = i64temp;
store = db->msg_store;
while(store){
if(store->db_id == store_id){
mqtt3_db_messages_queue(db, NULL, store->msg.topic, store->msg.qos, store->msg.retain, store);
break;
}
store = store->next;
HASH_FIND(hh, db->msg_store, &store_id, sizeof(dbid_t), store);
if(store){
mqtt3_db_messages_queue(db, NULL, store->msg.topic, store->msg.qos, store->msg.retain, store);
}else{
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message.");
return MOSQ_ERR_INVAL;
}
return MOSQ_ERR_SUCCESS;
}
Expand Down

0 comments on commit 4374170

Please sign in to comment.