Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persist plugin expiry #3016

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Prev Previous commit
Next Next commit
Fixed handling of expired messages after restore from persistent plugin
Signed-off-by: Norbert Heusser <[email protected]>
  • Loading branch information
NorbertHeusser committed Mar 12, 2024
commit 5c4505498e3289a00ee1386ff623da892169bdcc
14 changes: 11 additions & 3 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ static void db__fill_inflight_out_from_queue(struct mosquitto *context)
client_msg->data.state = mosq_ms_publish_qos2;
break;
}
if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
db__message_remove_queued(context, &context->msgs_out, client_msg);
continue;
}
plugin_persist__handle_client_msg_update(context, client_msg);
db__message_dequeue_first(context, &context->msgs_out);
}
Expand Down Expand Up @@ -942,9 +946,11 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg
}
base_msg->origin = origin;
if(message_expiry_interval){
base_msg->data.expiry_time = db.now_real_s + (*message_expiry_interval);
}else{
base_msg->data.expiry_time = 0;
if(*message_expiry_interval > 0){
base_msg->data.expiry_time = db.now_real_s + *message_expiry_interval;
}else{
base_msg->data.expiry_time = 0;
}
}

base_msg->dest_ids = NULL;
Expand Down Expand Up @@ -1207,6 +1213,7 @@ void db__expire_all_messages(struct mosquitto *context)
db__message_remove_inflight(context, &context->msgs_out, client_msg);
}
}
db__fill_inflight_out_from_queue(context);
DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){
if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){
db__message_remove_queued(context, &context->msgs_out, client_msg);
Expand Down Expand Up @@ -1289,6 +1296,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
util__increment_send_quota(context);
}
db__message_remove_inflight(context, &context->msgs_out, client_msg);
db__fill_inflight_out_from_queue(context);
return MOSQ_ERR_SUCCESS;
}else{
expiry_interval = (uint32_t)(base_msg->data.expiry_time - db.now_real_s);
Expand Down
21 changes: 3 additions & 18 deletions src/plugin_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -750,9 +750,7 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_
{
struct mosquitto context;
struct mosquitto__base_msg *base_msg;
uint32_t message_expiry_interval;
uint32_t *p_message_expiry_interval;
time_t message_expiry_interval_tt;
int i;
int rc;

memset(&context, 0, sizeof(context));
Expand All @@ -766,25 +764,12 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_
context.id = (char *)msg_add->source_id;
context.username = (char *)msg_add->source_username;

p_message_expiry_interval = &message_expiry_interval;
if(msg_add->expiry_time == 0){
p_message_expiry_interval = NULL;
}else if(msg_add->expiry_time <= db.now_real_s){
message_expiry_interval = 0;
}else{
message_expiry_interval_tt = msg_add->expiry_time - db.now_real_s;
if(message_expiry_interval_tt > UINT32_MAX){
message_expiry_interval = UINT32_MAX;
}else{
message_expiry_interval = (uint32_t)message_expiry_interval_tt;
}
}

base_msg = mosquitto_calloc(1, sizeof(struct mosquitto__base_msg));
if(base_msg == NULL){
goto error;
}
base_msg->data.store_id = msg_add->store_id;
base_msg->data.expiry_time = msg_add->expiry_time;
base_msg->data.payloadlen = msg_add->payloadlen;
base_msg->data.source_mid = msg_add->source_mid;
base_msg->data.qos = msg_add->qos;
Expand All @@ -807,7 +792,7 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_
}

base_msg->stored = true;
rc = db__message_store(&context, base_msg, p_message_expiry_interval, mosq_mo_broker);
rc = db__message_store(&context, base_msg, NULL, mosq_mo_broker);
return rc;

error:
Expand Down
197 changes: 197 additions & 0 deletions test/broker/15-persist-client-drop-expired-messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
#!/usr/bin/env python3

# Connect a client, add a subscription, disconnect, send a message with a
# different client, restore, reconnect, check it is received.

from mosq_test_helper import *

persist_help = persist_module()

port = mosq_test.get_port()

num_messages = 100

def do_test(test_case_name: str, additional_config_entries: dict):
conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
persist_help.write_config(
conf_file,
port,
additional_config_entries=additional_config_entries,
)
persist_help.init(port)

client_id = "test-expired-messages-subscriber"
username = "test-message-expiry"

qos = 1
topic = "client-msg/test"
source_id = "test-expired-messages-publisher"
proto_ver = 5

connect_packet = mosq_test.gen_connect(
client_id, username=username, proto_ver=proto_ver, clean_session=False, session_expiry=60
)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, qos=qos, proto_ver=proto_ver)

connect2_packet = mosq_test.gen_connect(
source_id, username=username, proto_ver=proto_ver
)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

rc = 1

broker = mosq_test.start_broker(
filename=conf_file, use_conf=True, port=port
)

con = None
try:
sock = mosq_test.do_client_connect(
connect_packet, connack_packet1, timeout=5, port=port
)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
sock.close()

sock = mosq_test.do_client_connect(
connect2_packet, connack2_packet, timeout=5, port=port
)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 60)
for i in range(num_messages):
payload = f"queued message {i:3}"
mid = 10 + i
publish_packet = mosq_test.gen_publish(
topic,
mid=mid,
qos=qos,
payload=payload.encode("UTF-8"),
proto_ver=proto_ver,
properties=props,
)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")
sock.close()

# Terminate the broker
(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
port,
clients=1,
client_msgs_out=num_messages,
base_msgs=num_messages,
subscriptions=1,
)

# Check client
# persist_help.check_client(
# port,
# client_id,
# username=username,
# will_delay_time=0,
# session_expiry_time=60,
# listener_port=port,
# max_packet_size=0,
# max_qos=2,
# retain_available=1,
# session_expiry_interval=60,
# will_delay_interval=0,
# )

# Check subscription
persist_help.check_subscription(port, client_id, topic, qos, 0)

# Check stored message
for i in range(num_messages):
payload = f"queued message {i:3}"
payload_b = payload.encode("UTF-8")
mid = 10 + i
store_id = persist_help.check_base_msg(
port,
60,
topic,
payload_b,
source_id,
username,
len(payload_b),
mid,
port,
qos,
retain=0,
idx=i,
)

# Check client msg
subscriber_mid = 1 + i
cmsg_id = 1 + i
persist_help.check_client_msg(
port,
client_id,
cmsg_id,
store_id,
0,
persist_help.dir_out,
subscriber_mid,
qos,
0,
persist_help.ms_queued,
idx=i,
)

# Modify message expiry timestamp in the database
assert persist_help.modify_base_msgs(port, sub_expiry_time=120) == num_messages

# Restart broker
broker = mosq_test.start_broker(
filename=conf_file, use_conf=True, port=port
)

# Connect client again, it should have a session, but all queued messages should be dropped
sock = mosq_test.do_client_connect(
connect_packet, connack_packet2, timeout=5, port=port,
)

# Send ping and wait for the PINGRESP to make sure the broker will not send a queued message instead
mosq_test.do_ping(sock)
sock.close()

(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
port,
clients=1,
client_msgs_out=0,
base_msgs=0,
subscriptions=1,
)

rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
if rc == 0:
rc = 1
(_, stde) = broker.communicate()
os.remove(conf_file)
rc += persist_help.cleanup(port)

print(f"{test_case_name}")
if rc:
print(stde.decode("utf-8"))
assert rc == 0, f"rc: {rc}"


do_test(
"memory queue",
additional_config_entries={
"log_type": "all",
"max_queued_messages": num_messages,
},
)
1 change: 1 addition & 0 deletions test/broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ endif

PERSIST_TESTS = \
./15-persist-bridge-queue.py \
./15-persist-client-drop-expired-messages.py \
./15-persist-client-msg-in-v3-1-1.py \
./15-persist-client-msg-in-v5-0.py \
./15-persist-client-msg-modify-acl.py \
Expand Down
17 changes: 17 additions & 0 deletions test/broker/persist_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,23 @@ def check_base_msg(

return row[0]

def modify_base_msgs(
port : int,
sub_expiry_time : int
):
num_modified_rows = 0
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
try:
cur = con.cursor()
cur.execute(
"UPDATE base_msgs"
+f" SET expiry_time = expiry_time - {sub_expiry_time}"
)
num_modified_rows = cur.rowcount
con.commit()
finally:
con.close()
return num_modified_rows

def check_retain(port, topic, store_id):
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
Expand Down