Skip to content

Commit

Permalink
Merge branch 'improve-client-store' of git:https://github.com/dbeinder/mosq…
Browse files Browse the repository at this point in the history
…uitto into dbeinder-improve-client-store
  • Loading branch information
ralight committed Aug 7, 2020
2 parents 94d0413 + c11d20f commit 43df213
Show file tree
Hide file tree
Showing 28 changed files with 258 additions and 131 deletions.
24 changes: 12 additions & 12 deletions src/db_dump/db_dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ static int dump__cfg_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_

memset(&chunk, 0, sizeof(struct PF_cfg));

if(db_version == 5){
rc = persist__chunk_cfg_read_v5(db_fd, &chunk);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_cfg_read_v56(db_fd, &chunk);
}else{
rc = persist__chunk_cfg_read_v234(db_fd, &chunk);
}
Expand Down Expand Up @@ -147,8 +147,8 @@ static int dump__client_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint

memset(&chunk, 0, sizeof(struct P_client));

if(db_version == 5){
rc = persist__chunk_client_read_v5(db_fd, &chunk);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_client_read_v56(db_fd, &chunk, db_version);
}else{
rc = persist__chunk_client_read_v234(db_fd, &chunk, db_version);
}
Expand Down Expand Up @@ -189,8 +189,8 @@ static int dump__client_msg_chunk_process(struct mosquitto_db *db, FILE *db_fd,
client_msg_count++;

memset(&chunk, 0, sizeof(struct P_client_msg));
if(db_version == 5){
rc = persist__chunk_client_msg_read_v5(db_fd, &chunk, length);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_client_msg_read_v56(db_fd, &chunk, length);
}else{
rc = persist__chunk_client_msg_read_v234(db_fd, &chunk);
}
Expand Down Expand Up @@ -234,8 +234,8 @@ static int dump__msg_store_chunk_process(struct mosquitto_db *db, FILE *db_fptr,
msg_store_count++;

memset(&chunk, 0, sizeof(struct P_msg_store));
if(db_version == 5){
rc = persist__chunk_msg_store_read_v5(db_fptr, &chunk, length);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_msg_store_read_v56(db_fptr, &chunk, length);
}else{
rc = persist__chunk_msg_store_read_v234(db_fptr, &chunk, db_version);
}
Expand Down Expand Up @@ -321,8 +321,8 @@ static int dump__retain_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint
if(do_print) printf("DB_CHUNK_RETAIN:\n");
if(do_print) printf("\tLength: %d\n", length);

if(db_version == 5){
rc = persist__chunk_retain_read_v5(db_fd, &chunk);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_retain_read_v56(db_fd, &chunk);
}else{
rc = persist__chunk_retain_read_v234(db_fd, &chunk);
}
Expand All @@ -345,8 +345,8 @@ static int dump__sub_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_
sub_count++;

memset(&chunk, 0, sizeof(struct P_sub));
if(db_version == 5){
rc = persist__chunk_sub_read_v5(db_fd, &chunk);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_sub_read_v56(db_fd, &chunk);
}else{
rc = persist__chunk_sub_read_v234(db_fd, &chunk);
}
Expand Down
6 changes: 6 additions & 0 deletions src/db_dump/print.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ void print__client(struct P_client *chunk, int length)
printf("DB_CHUNK_CLIENT:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", chunk->client_id);
if(chunk->username){
printf("\tUsername: %s\n", chunk->username);
}
if(chunk->F.listener_port > 0){
printf("\tListener port: %u\n", chunk->F.listener_port);
}
printf("\tLast MID: %d\n", chunk->F.last_mid);
printf("\tSession expiry time: %" PRIu64 "\n", chunk->F.session_expiry_time);
printf("\tSession expiry interval: %u\n", chunk->F.session_expiry_interval);
Expand Down
12 changes: 12 additions & 0 deletions src/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,18 @@ int main(int argc, char *argv[])
rc = mosquitto_security_init(&int_db, false);
if(rc) return rc;

/* After loading persisted clients and ACLs, try to associate them,
* so persisted subscriptions can start storing messages */
HASH_ITER(hh_id, int_db.contexts_by_id, ctxt, ctxt_tmp){
if(ctxt && !ctxt->clean_start && ctxt->username){
rc = acl__find_acls(&int_db, ctxt);
if(rc){
log__printf(NULL, MOSQ_LOG_WARNING, "Failed to associate persisted user %s with ACLs, "
"likely due to changed ports while using a per_listener_settings configuration.", ctxt->username);
}
}
}

#ifdef WITH_SYS_TREE
sys_tree__init(&int_db);
#endif
Expand Down
43 changes: 28 additions & 15 deletions src/persist.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ and the Eclipse Distribution License is available at
#ifndef PERSIST_H
#define PERSIST_H

#define MOSQ_DB_VERSION 5
#define MOSQ_DB_VERSION 6

/* DB read/write */
extern const unsigned char magic[15];
Expand Down Expand Up @@ -57,15 +57,28 @@ struct PF_cfg{
uint8_t dbid_size;
};

struct PF_client_v5{
int64_t session_expiry_time;
uint32_t session_expiry_interval;
uint16_t last_mid;
uint16_t id_len;
};
struct PF_client{
/* struct PF_client_v5; */
int64_t session_expiry_time;
uint32_t session_expiry_interval;
uint16_t last_mid;
uint16_t id_len;

uint16_t listener_port;
uint16_t username_len;
/* tail: 4 byte padding, because 64bit member
* forces multiple of 8 for struct size */
};
struct P_client{
struct PF_client F;
char *client_id;
char *username;
};


Expand Down Expand Up @@ -141,19 +154,19 @@ int persist__chunk_msg_store_read_v234(FILE *db_fptr, struct P_msg_store *chunk,
int persist__chunk_retain_read_v234(FILE *db_fptr, struct P_retain *chunk);
int persist__chunk_sub_read_v234(FILE *db_fptr, struct P_sub *chunk);

int persist__chunk_header_read_v5(FILE *db_fptr, int *chunk, int *length);
int persist__chunk_cfg_read_v5(FILE *db_fptr, struct PF_cfg *chunk);
int persist__chunk_client_read_v5(FILE *db_fptr, struct P_client *chunk);
int persist__chunk_client_msg_read_v5(FILE *db_fptr, struct P_client_msg *chunk, uint32_t length);
int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk, uint32_t length);
int persist__chunk_retain_read_v5(FILE *db_fptr, struct P_retain *chunk);
int persist__chunk_sub_read_v5(FILE *db_fptr, struct P_sub *chunk);

int persist__chunk_cfg_write_v5(FILE *db_fptr, struct PF_cfg *chunk);
int persist__chunk_client_write_v5(FILE *db_fptr, struct P_client *chunk);
int persist__chunk_client_msg_write_v5(FILE *db_fptr, struct P_client_msg *chunk);
int persist__chunk_message_store_write_v5(FILE *db_fptr, struct P_msg_store *chunk);
int persist__chunk_retain_write_v5(FILE *db_fptr, struct P_retain *chunk);
int persist__chunk_sub_write_v5(FILE *db_fptr, struct P_sub *chunk);
int persist__chunk_header_read_v56(FILE *db_fptr, int *chunk, int *length);
int persist__chunk_cfg_read_v56(FILE *db_fptr, struct PF_cfg *chunk);
int persist__chunk_client_read_v56(FILE *db_fptr, struct P_client *chunk, int db_version);
int persist__chunk_client_msg_read_v56(FILE *db_fptr, struct P_client_msg *chunk, uint32_t length);
int persist__chunk_msg_store_read_v56(FILE *db_fptr, struct P_msg_store *chunk, uint32_t length);
int persist__chunk_retain_read_v56(FILE *db_fptr, struct P_retain *chunk);
int persist__chunk_sub_read_v56(FILE *db_fptr, struct P_sub *chunk);

int persist__chunk_cfg_write_v6(FILE *db_fptr, struct PF_cfg *chunk);
int persist__chunk_client_write_v6(FILE *db_fptr, struct P_client *chunk);
int persist__chunk_client_msg_write_v6(FILE *db_fptr, struct P_client_msg *chunk);
int persist__chunk_message_store_write_v6(FILE *db_fptr, struct P_msg_store *chunk);
int persist__chunk_retain_write_v6(FILE *db_fptr, struct P_retain *chunk);
int persist__chunk_sub_write_v6(FILE *db_fptr, struct P_sub *chunk);

#endif
52 changes: 35 additions & 17 deletions src/persist_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ static int persist__client_msg_restore(struct mosquitto_db *db, struct P_client_

static int persist__client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
{
int rc = 0;
int i, rc = 0;
struct mosquitto *context;
struct P_client chunk;

memset(&chunk, 0, sizeof(struct P_client));

if(db_version == 5){
rc = persist__chunk_client_read_v5(db_fptr, &chunk);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_client_read_v56(db_fptr, &chunk, db_version);
}else{
rc = persist__chunk_client_read_v234(db_fptr, &chunk, db_version);
}
Expand All @@ -195,13 +195,29 @@ static int persist__client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
if(context){
context->session_expiry_time = chunk.F.session_expiry_time;
context->session_expiry_interval = chunk.F.session_expiry_interval;
if(chunk.username && !context->username){
/* username is not freed here, it is now owned by context */
context->username = chunk.username;
chunk.username = NULL;
/* in per_listener_settings mode, try to find the listener by persisted port */
if(db->config->per_listener_settings && !context->listener && chunk.F.listener_port > 0){
for(i=0; i < db->config->listener_count; i++){
if(db->config->listeners[i].port == chunk.F.listener_port){
context->listener = &db->config->listeners[i];
break;
}
}
}
}
/* FIXME - we should expire clients here if they have exceeded their time */
}else{
rc = 1;
}

mosquitto__free(chunk.client_id);

if(chunk.username){
mosquitto__free(chunk.username);
}
return rc;
}

Expand All @@ -213,8 +229,8 @@ static int persist__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_f

memset(&chunk, 0, sizeof(struct P_client_msg));

if(db_version == 5){
rc = persist__chunk_client_msg_read_v5(db_fptr, &chunk, length);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_client_msg_read_v56(db_fptr, &chunk, length);
}else{
rc = persist__chunk_client_msg_read_v234(db_fptr, &chunk);
}
Expand Down Expand Up @@ -242,8 +258,8 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp

memset(&chunk, 0, sizeof(struct P_msg_store));

if(db_version == 5){
rc = persist__chunk_msg_store_read_v5(db_fptr, &chunk, length);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_msg_store_read_v56(db_fptr, &chunk, length);
}else{
rc = persist__chunk_msg_store_read_v234(db_fptr, &chunk, db_version);
}
Expand Down Expand Up @@ -320,8 +336,8 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)

memset(&chunk, 0, sizeof(struct P_retain));

if(db_version == 5){
rc = persist__chunk_retain_read_v5(db_fptr, &chunk);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_retain_read_v56(db_fptr, &chunk);
}else{
rc = persist__chunk_retain_read_v234(db_fptr, &chunk);
}
Expand All @@ -346,8 +362,8 @@ static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)

memset(&chunk, 0, sizeof(struct P_sub));

if(db_version == 5){
rc = persist__chunk_sub_read_v5(db_fptr, &chunk);
if(db_version == 6 || db_version == 5){
rc = persist__chunk_sub_read_v56(db_fptr, &chunk);
}else{
rc = persist__chunk_sub_read_v234(db_fptr, &chunk);
}
Expand All @@ -367,8 +383,8 @@ static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)

int persist__chunk_header_read(FILE *db_fptr, int *chunk, int *length)
{
if(db_version == 5){
return persist__chunk_header_read_v5(db_fptr, chunk, length);
if(db_version == 6 || db_version == 5){
return persist__chunk_header_read_v56(db_fptr, chunk, length);
}else{
return persist__chunk_header_read_v234(db_fptr, chunk, length);
}
Expand Down Expand Up @@ -416,7 +432,9 @@ int persist__restore(struct mosquitto_db *db)
* Is your DB change still compatible with previous versions?
*/
if(db_version != MOSQ_DB_VERSION){
if(db_version == 4){
if(db_version == 5){
/* Addition of username and listener_port to client chunk in v6 */
}else if(db_version == 4){
}else if(db_version == 3){
/* Addition of source_username and source_port to msg_store chunk in v4, v1.5.6 */
}else if(db_version == 2){
Expand All @@ -431,8 +449,8 @@ int persist__restore(struct mosquitto_db *db)
while(persist__chunk_header_read(fptr, &chunk, &length) == MOSQ_ERR_SUCCESS){
switch(chunk){
case DB_CHUNK_CFG:
if(db_version == 5){
if(persist__chunk_cfg_read_v5(fptr, &cfg_chunk)){
if(db_version == 6 || db_version == 5){
if(persist__chunk_cfg_read_v56(fptr, &cfg_chunk)){
fclose(fptr);
return 1;
}
Expand Down
38 changes: 28 additions & 10 deletions src/persist_read_v5.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ and the Eclipse Distribution License is available at
#include "util_mosq.h"


int persist__chunk_header_read_v5(FILE *db_fptr, int *chunk, int *length)
int persist__chunk_header_read_v56(FILE *db_fptr, int *chunk, int *length)
{
size_t rlen;
struct PF_header header;
Expand All @@ -53,7 +53,7 @@ int persist__chunk_header_read_v5(FILE *db_fptr, int *chunk, int *length)
}


int persist__chunk_cfg_read_v5(FILE *db_fptr, struct PF_cfg *chunk)
int persist__chunk_cfg_read_v56(FILE *db_fptr, struct PF_cfg *chunk)
{
if(fread(chunk, sizeof(struct PF_cfg), 1, db_fptr) != 1){
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
Expand All @@ -64,28 +64,46 @@ int persist__chunk_cfg_read_v5(FILE *db_fptr, struct PF_cfg *chunk)
}


int persist__chunk_client_read_v5(FILE *db_fptr, struct P_client *chunk)
int persist__chunk_client_read_v56(FILE *db_fptr, struct P_client *chunk, int db_version)
{
int rc;

read_e(db_fptr, &chunk->F, sizeof(struct PF_client));
if(db_version == 6){
read_e(db_fptr, &chunk->F, sizeof(struct PF_client));
chunk->F.username_len = ntohs(chunk->F.username_len);
chunk->F.listener_port = ntohs(chunk->F.listener_port);
}else if(db_version == 5){
read_e(db_fptr, &chunk->F, sizeof(struct PF_client_v5));
}else{
return 1;
}

chunk->F.session_expiry_interval = ntohl(chunk->F.session_expiry_interval);
chunk->F.last_mid = ntohs(chunk->F.last_mid);
chunk->F.id_len = ntohs(chunk->F.id_len);


rc = persist__read_string_len(db_fptr, &chunk->client_id, chunk->F.id_len);
if(rc || !chunk->client_id){
return 1;
}else{
return MOSQ_ERR_SUCCESS;
}

if(chunk->F.username_len > 0){
rc = persist__read_string_len(db_fptr, &chunk->username, chunk->F.username_len);
if(rc || !chunk->username){
mosquitto__free(chunk->client_id);
return 1;
}
}

return MOSQ_ERR_SUCCESS;
error:
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
return 1;
}


int persist__chunk_client_msg_read_v5(FILE *db_fptr, struct P_client_msg *chunk, uint32_t length)
int persist__chunk_client_msg_read_v56(FILE *db_fptr, struct P_client_msg *chunk, uint32_t length)
{
mosquitto_property *properties = NULL;
struct mosquitto__packet prop_packet;
Expand Down Expand Up @@ -125,7 +143,7 @@ int persist__chunk_client_msg_read_v5(FILE *db_fptr, struct P_client_msg *chunk,
}


int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk, uint32_t length)
int persist__chunk_msg_store_read_v56(FILE *db_fptr, struct P_msg_store *chunk, uint32_t length)
{
int rc = 0;
mosquitto_property *properties = NULL;
Expand Down Expand Up @@ -215,7 +233,7 @@ int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk, u
}


int persist__chunk_retain_read_v5(FILE *db_fptr, struct P_retain *chunk)
int persist__chunk_retain_read_v56(FILE *db_fptr, struct P_retain *chunk)
{
if(fread(&chunk->F, sizeof(struct P_retain), 1, db_fptr) != 1){
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
Expand All @@ -225,7 +243,7 @@ int persist__chunk_retain_read_v5(FILE *db_fptr, struct P_retain *chunk)
}


int persist__chunk_sub_read_v5(FILE *db_fptr, struct P_sub *chunk)
int persist__chunk_sub_read_v56(FILE *db_fptr, struct P_sub *chunk)
{
int rc;

Expand Down
Loading

0 comments on commit 43df213

Please sign in to comment.