Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persist plugin expiry #3016

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed deletion of expired session after restore from persistence plugin
Signed-off-by: Norbert Heusser <[email protected]>
  • Loading branch information
NorbertHeusser committed Mar 12, 2024
commit b8564daa20c39569ec990596ae24f034e4b7bf29
18 changes: 18 additions & 0 deletions plugins/persist-sqlite/base_msgs.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,21 @@ int persist_sqlite__base_msg_remove_cb(int event, void *event_data, void *userda

return rc;
}

int persist_sqlite__base_msg_clear(struct mosquitto_sqlite *ms, const char *clientid)
{
int rc = MOSQ_ERR_UNKNOWN;

if(sqlite3_bind_text(ms->base_msg_remove_for_clientid_stmt, 1, clientid, (int)strlen(clientid), SQLITE_STATIC) == SQLITE_OK){
ms->event_count++;
rc = sqlite3_step(ms->base_msg_remove_for_clientid_stmt);
if(rc == SQLITE_DONE){
rc = MOSQ_ERR_SUCCESS;
}else{
rc = MOSQ_ERR_UNKNOWN;
}
}
sqlite3_reset(ms->base_msg_remove_for_clientid_stmt);

return rc;
}
7 changes: 6 additions & 1 deletion plugins/persist-sqlite/clients.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ int persist_sqlite__client_remove_cb(int event, void *event_data, void *userdata
rc = MOSQ_ERR_UNKNOWN;
}
}

/* Delete base msgs before deletion of client_msgs as the query will iterate over the client_msgs table */
persist_sqlite__base_msg_clear(ms, ed->data.clientid);
persist_sqlite__client_msg_clear(ms, ed->data.clientid);

if(sqlite3_bind_text(ms->client_remove_stmt, 1,
ed->data.clientid, (int)strlen(ed->data.clientid), SQLITE_STATIC) == SQLITE_OK){

Expand All @@ -101,7 +106,6 @@ int persist_sqlite__client_remove_cb(int event, void *event_data, void *userdata
rc = MOSQ_ERR_UNKNOWN;
}
}
persist_sqlite__client_msg_clear(ms, ed->data.clientid);

return rc;
}
Expand Down Expand Up @@ -133,3 +137,4 @@ int persist_sqlite__client_update_cb(int event, void *event_data, void *userdata

return rc;
}

24 changes: 23 additions & 1 deletion plugins/persist-sqlite/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,19 @@ static int create_tables(struct mosquitto_sqlite *ms)
"CREATE INDEX IF NOT EXISTS client_msgs_client_id ON client_msgs(client_id);",
NULL, NULL, NULL);
if(rc) goto fail;

rc = sqlite3_exec(ms->db,
"CREATE INDEX IF NOT EXISTS client_msgs_store_id ON client_msgs(store_id);",
"DROP INDEX IF EXISTS client_msgs_store_id;",
NULL, NULL, NULL);
if(rc) goto fail;

rc = sqlite3_exec(ms->db,
"CREATE INDEX IF NOT EXISTS client_msgs_store_id ON client_msgs(store_id,client_id);",
NULL, NULL, NULL);
if(rc) goto fail;

rc = sqlite3_exec(ms->db,
"CREATE INDEX IF NOT EXISTS retains_storeid ON retains(store_id);",
NULL, NULL, NULL);
if(rc) goto fail;

Expand Down Expand Up @@ -294,6 +305,17 @@ static int prepare_statements(struct mosquitto_sqlite *ms)
&ms->base_msg_remove_stmt, NULL);
if(rc) goto fail;

rc = sqlite3_prepare_v3(ms->db,
"DELETE FROM base_msgs AS bm "
"WHERE bm.store_id IN "
"( SELECT cm.store_id FROM client_msgs AS cm"
" LEFT OUTER JOIN client_msgs AS oc ON oc.store_id = cm.store_id AND oc.client_id != cm.client_id"
" LEFT OUTER JOIN retains AS rm ON rm.store_id = cm.store_id"
" WHERE cm.client_id = ? AND oc.store_id IS NULL AND rm.store_id IS NULL)",
-1, SQLITE_PREPARE_PERSISTENT,
&ms->base_msg_remove_for_clientid_stmt, NULL);
if(rc) goto fail;

rc = sqlite3_prepare_v3(ms->db,
"SELECT store_id, expiry_time, topic, payload, source_id, source_username, "
"payloadlen, source_mid, source_port, qos, retain, properties "
Expand Down
2 changes: 2 additions & 0 deletions plugins/persist-sqlite/persist_sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct mosquitto_sqlite {
sqlite3_stmt *client_msg_clear_all_stmt;
sqlite3_stmt *base_msg_add_stmt;
sqlite3_stmt *base_msg_remove_stmt;
sqlite3_stmt *base_msg_remove_for_clientid_stmt;
sqlite3_stmt *base_msg_load_stmt;
sqlite3_stmt *retain_msg_set_stmt;
sqlite3_stmt *retain_msg_remove_stmt;
Expand Down Expand Up @@ -70,6 +71,7 @@ int persist_sqlite__client_msg_update_cb(int event, void *event_data, void *user
int persist_sqlite__base_msg_add_cb(int event, void *event_data, void *userdata);
int persist_sqlite__base_msg_load_cb(int event, void *event_data, void *userdata);
int persist_sqlite__base_msg_remove_cb(int event, void *event_data, void *userdata);
int persist_sqlite__base_msg_clear(struct mosquitto_sqlite *ms, const char *clientid);
int persist_sqlite__retain_msg_set_cb(int event, void *event_data, void *userdata);
int persist_sqlite__retain_msg_remove_cb(int event, void *event_data, void *userdata);
int persist_sqlite__subscription_add_cb(int event, void *event_data, void *userdata);
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ int main(int argc, char *argv[])
if(rc) return rc;

plugin_persist__handle_restore();
session_expiry__check();
db__msg_store_compact();

/* After loading persisted clients and ACLs, try to associate them,
Expand Down
1 change: 1 addition & 0 deletions src/plugin_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ BROKER_EXPORT int mosquitto_persist_client_add(struct mosquitto_client *client)
}

context__add_to_by_id(context);
session_expiry__add_from_persistence(context,context->session_expiry_time);

return MOSQ_ERR_SUCCESS;
error:
Expand Down
4 changes: 2 additions & 2 deletions src/session_expiry.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ void session_expiry__check(void)
struct mosquitto *context;
time_t timeout;

if(db.now_real_s <= last_check){
if(last_check != 0 && db.now_real_s <= last_check){
if(expiry_list){
/* Next event is the first item of the list, we must set the timeout even if we aren't
/* Next event is the first item of the list, we must set the timeout even if we aren't
* checking the full list */
timeout = (expiry_list->context->session_expiry_time - db.now_real_s) * 1000;
if(timeout <= 0){
Expand Down
216 changes: 216 additions & 0 deletions test/broker/15-persist-client-expired-session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#!/usr/bin/env python3

# Connect a client, add a subscription, disconnect, send a message with a
# different client, restore, reconnect, check it is received.

from mosq_test_helper import *

persist_help = persist_module()

port = mosq_test.get_port()

num_messages = 100


def do_test(
test_case_name: str, additional_config_entries: dict, message_expiry_interval: int
):
conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
persist_help.write_config(
conf_file,
port,
additional_config_entries=additional_config_entries,
)
persist_help.init(port)

client_id = "test-expired-session-subscriber"
username = "test-session-expiry"

qos = 1
topic = "client-msg/test"
source_id = "test-expired-session-publisher"
proto_ver = 5

connect_packet = mosq_test.gen_connect(
client_id,
username=username,
proto_ver=proto_ver,
clean_session=False,
session_expiry=60,
)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, qos=qos, proto_ver=proto_ver)

connect2_packet = mosq_test.gen_connect(
source_id, username=username, proto_ver=proto_ver
)

rc = 1

broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

con = None
try:
sock = mosq_test.do_client_connect(
connect_packet, connack_packet, timeout=5, port=port
)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
sock.close()

sock = mosq_test.do_client_connect(
connect2_packet, connack_packet, timeout=5, port=port
)
props = (
mqtt5_props.gen_uint32_prop(
mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, message_expiry_interval
)
if message_expiry_interval > 0
else b""
)
for i in range(num_messages):
payload = f"queued message {i:3}"
mid = 10 + i
publish_packet = mosq_test.gen_publish(
topic,
mid=mid,
qos=qos,
payload=payload.encode("UTF-8"),
proto_ver=proto_ver,
properties=props,
)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")
sock.close()

# Terminate the broker
(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
port,
clients=1,
client_msgs_out=num_messages,
base_msgs=num_messages,
subscriptions=1,
)

# Check client
persist_help.check_client(
port,
client_id,
username=username,
will_delay_time=0,
session_expiry_time=60,
listener_port=port,
max_packet_size=0,
max_qos=2,
retain_available=1,
session_expiry_interval=60,
will_delay_interval=0,
)

# Check subscription
persist_help.check_subscription(port, client_id, topic, qos, 0)

# Check stored message
for i in range(num_messages):
payload = f"queued message {i:3}"
payload_b = payload.encode("UTF-8")
mid = 10 + i
store_id = persist_help.check_base_msg(
port,
message_expiry_interval,
topic,
payload_b,
source_id,
username,
len(payload_b),
mid,
port,
qos,
retain=0,
idx=i,
)

# Check client msg
subscriber_mid = 1 + i
cmsg_id = 1 + i
persist_help.check_client_msg(
port,
client_id,
cmsg_id,
store_id,
0,
persist_help.dir_out,
subscriber_mid,
qos,
0,
persist_help.ms_queued,
idx=i,
)

# Put session expiry_time into the past
assert persist_help.modify_client(port, client_id, sub_expiry_time=120) == 1

# Restart broker
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

# Connect client again, it should have a session, but all queued messages should be dropped
sock = mosq_test.do_client_connect(
connect_packet,
connack_packet,
timeout=5,
port=port,
)

# Send ping and wait for the PINGRESP to make sure the broker will not send a queued message instead
mosq_test.do_ping(sock)
sock.close()

(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
port,
clients=1,
client_msgs_out=0,
base_msgs=0,
subscriptions=0,
)

rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
if rc == 0:
rc = 1
(_, stde) = broker.communicate()
os.remove(conf_file)
rc += persist_help.cleanup(port)

print(f"{test_case_name}")
if rc:
print(stde.decode("utf-8"))
assert rc == 0, f"rc: {rc}"


memory_queue_config = {
"log_type": "all",
"max_queued_messages": num_messages,
}


do_test(
"memory queue, message expiry interval: 0",
additional_config_entries=memory_queue_config,
message_expiry_interval=0,
)
do_test(
"memory queue, message expiry interval: 120",
additional_config_entries=memory_queue_config,
message_expiry_interval=120,
)
1 change: 1 addition & 0 deletions test/broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ endif
PERSIST_TESTS = \
./15-persist-bridge-queue.py \
./15-persist-client-drop-expired-messages.py \
./15-persist-client-expired-session.py \
./15-persist-client-msg-in-v3-1-1.py \
./15-persist-client-msg-in-v5-0.py \
./15-persist-client-msg-modify-acl.py \
Expand Down