Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persist plugin expiry #3016

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions plugins/persist-sqlite/base_msgs.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,21 @@

return rc;
}

int persist_sqlite__base_msg_clear(struct mosquitto_sqlite *ms, const char *clientid)
{
int rc = MOSQ_ERR_UNKNOWN;

if(sqlite3_bind_text(ms->base_msg_remove_for_clientid_stmt, 1, clientid, (int)strlen(clientid), SQLITE_STATIC) == SQLITE_OK){
ms->event_count++;
rc = sqlite3_step(ms->base_msg_remove_for_clientid_stmt);
if(rc == SQLITE_DONE){
rc = MOSQ_ERR_SUCCESS;
}else{
rc = MOSQ_ERR_UNKNOWN;

Check warning on line 229 in plugins/persist-sqlite/base_msgs.c

View check run for this annotation

Codecov / codecov/patch

plugins/persist-sqlite/base_msgs.c#L229

Added line #L229 was not covered by tests
}
}
sqlite3_reset(ms->base_msg_remove_for_clientid_stmt);

return rc;
}
7 changes: 6 additions & 1 deletion plugins/persist-sqlite/clients.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ int persist_sqlite__client_remove_cb(int event, void *event_data, void *userdata
rc = MOSQ_ERR_UNKNOWN;
}
}

/* Delete base msgs before deletion of client_msgs as the query will iterate over the client_msgs table */
persist_sqlite__base_msg_clear(ms, ed->data.clientid);
persist_sqlite__client_msg_clear(ms, ed->data.clientid);

if(sqlite3_bind_text(ms->client_remove_stmt, 1,
ed->data.clientid, (int)strlen(ed->data.clientid), SQLITE_STATIC) == SQLITE_OK){

Expand All @@ -101,7 +106,6 @@ int persist_sqlite__client_remove_cb(int event, void *event_data, void *userdata
rc = MOSQ_ERR_UNKNOWN;
}
}
persist_sqlite__client_msg_clear(ms, ed->data.clientid);

return rc;
}
Expand Down Expand Up @@ -133,3 +137,4 @@ int persist_sqlite__client_update_cb(int event, void *event_data, void *userdata

return rc;
}

24 changes: 23 additions & 1 deletion plugins/persist-sqlite/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,19 @@ static int create_tables(struct mosquitto_sqlite *ms)
"CREATE INDEX IF NOT EXISTS client_msgs_client_id ON client_msgs(client_id);",
NULL, NULL, NULL);
if(rc) goto fail;

rc = sqlite3_exec(ms->db,
"CREATE INDEX IF NOT EXISTS client_msgs_store_id ON client_msgs(store_id);",
"DROP INDEX IF EXISTS client_msgs_store_id;",
NULL, NULL, NULL);
if(rc) goto fail;

rc = sqlite3_exec(ms->db,
"CREATE INDEX IF NOT EXISTS client_msgs_store_id ON client_msgs(store_id,client_id);",
NULL, NULL, NULL);
if(rc) goto fail;

rc = sqlite3_exec(ms->db,
"CREATE INDEX IF NOT EXISTS retains_storeid ON retains(store_id);",
NULL, NULL, NULL);
if(rc) goto fail;

Expand Down Expand Up @@ -294,6 +305,17 @@ static int prepare_statements(struct mosquitto_sqlite *ms)
&ms->base_msg_remove_stmt, NULL);
if(rc) goto fail;

rc = sqlite3_prepare_v3(ms->db,
"DELETE FROM base_msgs AS bm "
"WHERE bm.store_id IN "
"( SELECT cm.store_id FROM client_msgs AS cm"
" LEFT OUTER JOIN client_msgs AS oc ON oc.store_id = cm.store_id AND oc.client_id != cm.client_id"
" LEFT OUTER JOIN retains AS rm ON rm.store_id = cm.store_id"
" WHERE cm.client_id = ? AND oc.store_id IS NULL AND rm.store_id IS NULL)",
-1, SQLITE_PREPARE_PERSISTENT,
&ms->base_msg_remove_for_clientid_stmt, NULL);
if(rc) goto fail;

rc = sqlite3_prepare_v3(ms->db,
"SELECT store_id, expiry_time, topic, payload, source_id, source_username, "
"payloadlen, source_mid, source_port, qos, retain, properties "
Expand Down
2 changes: 2 additions & 0 deletions plugins/persist-sqlite/persist_sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct mosquitto_sqlite {
sqlite3_stmt *client_msg_clear_all_stmt;
sqlite3_stmt *base_msg_add_stmt;
sqlite3_stmt *base_msg_remove_stmt;
sqlite3_stmt *base_msg_remove_for_clientid_stmt;
sqlite3_stmt *base_msg_load_stmt;
sqlite3_stmt *retain_msg_set_stmt;
sqlite3_stmt *retain_msg_remove_stmt;
Expand Down Expand Up @@ -70,6 +71,7 @@ int persist_sqlite__client_msg_update_cb(int event, void *event_data, void *user
int persist_sqlite__base_msg_add_cb(int event, void *event_data, void *userdata);
int persist_sqlite__base_msg_load_cb(int event, void *event_data, void *userdata);
int persist_sqlite__base_msg_remove_cb(int event, void *event_data, void *userdata);
int persist_sqlite__base_msg_clear(struct mosquitto_sqlite *ms, const char *clientid);
int persist_sqlite__retain_msg_set_cb(int event, void *event_data, void *userdata);
int persist_sqlite__retain_msg_remove_cb(int event, void *event_data, void *userdata);
int persist_sqlite__subscription_add_cb(int event, void *event_data, void *userdata);
Expand Down
20 changes: 17 additions & 3 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@
client_msg->data.state = mosq_ms_publish_qos2;
break;
}
if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
db__message_remove_queued(context, &context->msgs_out, client_msg);
continue;
}
plugin_persist__handle_client_msg_update(context, client_msg);
db__message_dequeue_first(context, &context->msgs_out);
}
Expand Down Expand Up @@ -942,9 +946,11 @@
}
base_msg->origin = origin;
if(message_expiry_interval){
base_msg->data.expiry_time = db.now_real_s + (*message_expiry_interval);
}else{
base_msg->data.expiry_time = 0;
if(*message_expiry_interval > 0){
base_msg->data.expiry_time = db.now_real_s + *message_expiry_interval;
}else{
base_msg->data.expiry_time = 0;
}
}

base_msg->dest_ids = NULL;
Expand Down Expand Up @@ -1195,6 +1201,12 @@
}


void db__retain_expiry_check()
{
retain__expiry_check(&db.retains);
}


void db__expire_all_messages(struct mosquitto *context)
{
struct mosquitto__client_msg *client_msg, *tmp;
Expand All @@ -1207,6 +1219,7 @@
db__message_remove_inflight(context, &context->msgs_out, client_msg);
}
}
db__fill_inflight_out_from_queue(context);
DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
db__message_remove_queued(context, &context->msgs_out, client_msg);
Expand Down Expand Up @@ -1289,6 +1302,7 @@
util__increment_send_quota(context);
}
db__message_remove_inflight(context, &context->msgs_out, client_msg);
db__fill_inflight_out_from_queue(context);

Check warning on line 1305 in src/database.c

View check run for this annotation

Codecov / codecov/patch

src/database.c#L1305

Added line #L1305 was not covered by tests
return MOSQ_ERR_SUCCESS;
}else{
expiry_interval = (uint32_t)(base_msg->data.expiry_time - db.now_real_s);
Expand Down
2 changes: 2 additions & 0 deletions src/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ int main(int argc, char *argv[])
if(rc) return rc;

plugin_persist__handle_restore();
session_expiry__check();
db__retain_expiry_check();
db__msg_store_compact();

/* After loading persisted clients and ACLs, try to associate them,
Expand Down
3 changes: 2 additions & 1 deletion src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ int db__message_write_queued_in(struct mosquitto *context);
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg);
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg);
uint64_t db__new_msg_id(void);
void db__retain_expiry_check(void);
void db__expire_all_messages(struct mosquitto *context);
void db__check_acl_of_all_messages(struct mosquitto *context);

Expand Down Expand Up @@ -889,7 +890,7 @@ int retain__init(void);
void retain__clean(struct mosquitto__retainhier **retainhier);
int retain__queue(struct mosquitto *context, const struct mosquitto_subscription *sub);
int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char **split_topics, bool persist);

void retain__expiry_check(struct mosquitto__retainhier **retainhier);
/* ============================================================
* Security related functions
* ============================================================ */
Expand Down
21 changes: 3 additions & 18 deletions src/plugin_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ BROKER_EXPORT int mosquitto_persist_client_add(struct mosquitto_client *client)
}

context__add_to_by_id(context);
session_expiry__add_from_persistence(context,context->session_expiry_time);

return MOSQ_ERR_SUCCESS;
error:
Expand Down Expand Up @@ -750,9 +751,6 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_
{
struct mosquitto context;
struct mosquitto__base_msg *base_msg;
uint32_t message_expiry_interval;
uint32_t *p_message_expiry_interval;
time_t message_expiry_interval_tt;
int rc;

memset(&context, 0, sizeof(context));
Expand All @@ -766,25 +764,12 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_
context.id = (char *)msg_add->source_id;
context.username = (char *)msg_add->source_username;

p_message_expiry_interval = &message_expiry_interval;
if(msg_add->expiry_time == 0){
p_message_expiry_interval = NULL;
}else if(msg_add->expiry_time <= db.now_real_s){
message_expiry_interval = 0;
}else{
message_expiry_interval_tt = msg_add->expiry_time - db.now_real_s;
if(message_expiry_interval_tt > UINT32_MAX){
message_expiry_interval = UINT32_MAX;
}else{
message_expiry_interval = (uint32_t)message_expiry_interval_tt;
}
}

base_msg = mosquitto_calloc(1, sizeof(struct mosquitto__base_msg));
if(base_msg == NULL){
goto error;
}
base_msg->data.store_id = msg_add->store_id;
base_msg->data.expiry_time = msg_add->expiry_time;
base_msg->data.payloadlen = msg_add->payloadlen;
base_msg->data.source_mid = msg_add->source_mid;
base_msg->data.qos = msg_add->qos;
Expand All @@ -807,7 +792,7 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_
}

base_msg->stored = true;
rc = db__message_store(&context, base_msg, p_message_expiry_interval, mosq_mo_broker);
rc = db__message_store(&context, base_msg, NULL, mosq_mo_broker);
return rc;

error:
Expand Down
33 changes: 26 additions & 7 deletions src/retain.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char
return MOSQ_ERR_SUCCESS;
}

static bool retain__delete_expired_msg(struct mosquitto__retainhier *branch)
{
if(branch->retained && branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){
plugin_persist__handle_retain_msg_delete(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
return true;
}
return false;
}

static int retain__process(struct mosquitto__retainhier *branch, struct mosquitto *context, const struct mosquitto_subscription *sub)
{
Expand All @@ -200,13 +213,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
uint16_t mid;
struct mosquitto__base_msg *retained;

if(branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){
plugin_persist__handle_retain_msg_delete(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
if(retain__delete_expired_msg(branch)){
return MOSQ_ERR_SUCCESS;
}

Expand Down Expand Up @@ -344,6 +351,17 @@ int retain__queue(struct mosquitto *context, const struct mosquitto_subscription
return MOSQ_ERR_SUCCESS;
}

void retain__expiry_check(struct mosquitto__retainhier **retainhier)
{
struct mosquitto__retainhier *peer, *retainhier_tmp;

HASH_ITER(hh, *retainhier, peer, retainhier_tmp){
retain__expiry_check(&peer->children);
if (retain__delete_expired_msg(peer)){
retain__clean_empty_hierarchy(peer);
}
}
}

void retain__clean(struct mosquitto__retainhier **retainhier)
{
Expand All @@ -360,3 +378,4 @@ void retain__clean(struct mosquitto__retainhier **retainhier)
}
}


4 changes: 2 additions & 2 deletions src/session_expiry.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ void session_expiry__check(void)
struct mosquitto *context;
time_t timeout;

if(db.now_real_s <= last_check){
if(last_check != 0 && db.now_real_s <= last_check){
if(expiry_list){
/* Next event is the first item of the list, we must set the timeout even if we aren't
/* Next event is the first item of the list, we must set the timeout even if we aren't
* checking the full list */
timeout = (expiry_list->context->session_expiry_time - db.now_real_s) * 1000;
if(timeout <= 0){
Expand Down