Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persist plugin expiry #3016

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Prev Previous commit
Fixed deletion of expired retain msg after restore from persistent pl…
…ugin

Signed-off-by: Norbert Heusser <[email protected]>
  • Loading branch information
NorbertHeusser committed Mar 12, 2024
commit fa6bb8b4586f7d88fe9bedec200aec6252f93161
6 changes: 6 additions & 0 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,12 @@
}


void db__retain_expiry_check()
{
retain__expiry_check(&db.retains);
}


void db__expire_all_messages(struct mosquitto *context)
{
struct mosquitto__client_msg *client_msg, *tmp;
Expand Down Expand Up @@ -1296,7 +1302,7 @@
util__increment_send_quota(context);
}
db__message_remove_inflight(context, &context->msgs_out, client_msg);
db__fill_inflight_out_from_queue(context);

Check warning on line 1305 in src/database.c

View check run for this annotation

Codecov / codecov/patch

src/database.c#L1305

Added line #L1305 was not covered by tests
return MOSQ_ERR_SUCCESS;
}else{
expiry_interval = (uint32_t)(base_msg->data.expiry_time - db.now_real_s);
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ int main(int argc, char *argv[])

plugin_persist__handle_restore();
session_expiry__check();
db__retain_expiry_check();
db__msg_store_compact();

/* After loading persisted clients and ACLs, try to associate them,
Expand Down
3 changes: 2 additions & 1 deletion src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ int db__message_write_queued_in(struct mosquitto *context);
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg);
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg);
uint64_t db__new_msg_id(void);
void db__retain_expiry_check(void);
void db__expire_all_messages(struct mosquitto *context);
void db__check_acl_of_all_messages(struct mosquitto *context);

Expand Down Expand Up @@ -889,7 +890,7 @@ int retain__init(void);
void retain__clean(struct mosquitto__retainhier **retainhier);
int retain__queue(struct mosquitto *context, const struct mosquitto_subscription *sub);
int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char **split_topics, bool persist);

void retain__expiry_check(struct mosquitto__retainhier **retainhier);
/* ============================================================
* Security related functions
* ============================================================ */
Expand Down
1 change: 0 additions & 1 deletion src/plugin_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,6 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_
{
struct mosquitto context;
struct mosquitto__base_msg *base_msg;
int i;
int rc;

memset(&context, 0, sizeof(context));
Expand Down
33 changes: 26 additions & 7 deletions src/retain.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char
return MOSQ_ERR_SUCCESS;
}

static bool retain__delete_expired_msg(struct mosquitto__retainhier *branch)
{
if(branch->retained && branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){
plugin_persist__handle_retain_msg_delete(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
return true;
}
return false;
}

static int retain__process(struct mosquitto__retainhier *branch, struct mosquitto *context, const struct mosquitto_subscription *sub)
{
Expand All @@ -200,13 +213,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
uint16_t mid;
struct mosquitto__base_msg *retained;

if(branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){
plugin_persist__handle_retain_msg_delete(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
if(retain__delete_expired_msg(branch)){
return MOSQ_ERR_SUCCESS;
}

Expand Down Expand Up @@ -344,6 +351,17 @@ int retain__queue(struct mosquitto *context, const struct mosquitto_subscription
return MOSQ_ERR_SUCCESS;
}

void retain__expiry_check(struct mosquitto__retainhier **retainhier)
{
struct mosquitto__retainhier *peer, *retainhier_tmp;

HASH_ITER(hh, *retainhier, peer, retainhier_tmp){
retain__expiry_check(&peer->children);
if (retain__delete_expired_msg(peer)){
retain__clean_empty_hierarchy(peer);
}
}
}

void retain__clean(struct mosquitto__retainhier **retainhier)
{
Expand All @@ -360,3 +378,4 @@ void retain__clean(struct mosquitto__retainhier **retainhier)
}
}


13 changes: 12 additions & 1 deletion test/broker/15-persist-client-drop-expired-messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ def do_test(
additional_config_entries: dict,
resubscribe: bool,
num_messages_two_subscribers: int = 0,
num_retain_messages : int = 0,
):
print(
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}"
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}, num_retain_messages = {num_retain_messages} "
)

conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
Expand Down Expand Up @@ -71,6 +72,7 @@ def do_test(
0,
num_messages - num_messages_two_subscribers,
message_expiry=60,
retain_end=num_retain_messages,
)

if num_messages_two_subscribers > 0:
Expand All @@ -90,6 +92,7 @@ def do_test(
num_messages - num_messages_two_subscribers,
num_messages,
message_expiry=60,
retain_end=num_retain_messages,
)
publisher_sock.close()

Expand All @@ -105,6 +108,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end = num_retain_messages,
message_expiry=60,
)

Expand Down Expand Up @@ -143,6 +147,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end = 0,
)

rc = broker_terminate_rc
Expand Down Expand Up @@ -182,3 +187,9 @@ def do_test(
resubscribe=False,
num_messages_two_subscribers=30,
)
do_test(
"memory queue",
additional_config_entries=memory_queue_config,
resubscribe=False,
num_retain_messages=40,
)
32 changes: 31 additions & 1 deletion test/broker/15-persist-client-expired-session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ def do_test(
additional_config_entries: dict,
resubscribe: bool,
num_messages_two_subscribers: int = 0,
num_retain_messages : int = 0,
):
print(
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}"
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}, num_retain_messages = {num_retain_messages} "
)

conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
Expand Down Expand Up @@ -69,6 +70,7 @@ def do_test(
topic,
0,
num_messages - num_messages_two_subscribers,
retain_end=num_retain_messages,
)

if num_messages_two_subscribers > 0:
Expand All @@ -87,6 +89,7 @@ def do_test(
topic,
num_messages - num_messages_two_subscribers,
num_messages,
retain_end=num_retain_messages,
)
publisher_sock.close()

Expand All @@ -102,6 +105,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end = num_retain_messages,
)

# Put session expiry_time into the past
Expand Down Expand Up @@ -136,6 +140,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end=num_retain_messages,
)

if num_messages_two_subscribers > 0:
Expand Down Expand Up @@ -175,6 +180,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end=num_retain_messages,
)

rc = broker_terminate_rc
Expand Down Expand Up @@ -221,3 +227,27 @@ def do_test(
resubscribe=True,
num_messages_two_subscribers=20,
)
do_test(
"memory queue",
additional_config_entries=memory_queue_config,
resubscribe=False,
num_retain_messages=30,
)
# The following test case is open for discussion as adapting
# the check routines will be hard and some observations
# are unclear right now
# do_test(
# "memory queue",
# additional_config_entries=memory_queue_config,
# resubscribe=False,
# num_messages_two_subscribers=40,
# num_retain_messages=30,
# )
# do_test(
# "memory queue",
# additional_config_entries=memory_queue_config,
# resubscribe=False,
# num_messages_two_subscribers=50,
# num_retain_messages=60,
# )

10 changes: 7 additions & 3 deletions test/broker/persist_module_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def publish_messages(
topic: str,
start: int,
end: int,
retain_end=0,
message_expiry: int = 0,
qos: int = 1,
):
Expand All @@ -65,6 +66,7 @@ def publish_messages(
mid=mid,
qos=qos,
payload=payload.encode("UTF-8"),
retain = True if i < retain_end else False,
proto_ver=proto_ver,
properties=props,
)
Expand All @@ -80,18 +82,20 @@ def check_db(
client_msg_counts: dict[str, int],
publisher_id: str,
num_published_msgs: int,
retain_end: int,
message_expiry: int = 0,
qos: int = 1,
):
count_list = [v for v in client_msg_counts.values() if v] + [0]
count_list = [v for v in client_msg_counts.values() if v is not None] + [0]
num_base_msgs = max(count_list)
num_subscriptions = sum(1 for c in client_msg_counts.values() if c is not None)
num_client_msgs_out = sum(count_list)
persist_help.check_counts(
port,
clients=len(client_msg_counts),
client_msgs_out=num_client_msgs_out,
base_msgs=num_base_msgs,
base_msgs=num_base_msgs if num_base_msgs > 0 or retain_end == 0 else 1,
retain_msgs=1 if retain_end > 0 else 0,
subscriptions=num_subscriptions,
)

Expand Down Expand Up @@ -131,7 +135,7 @@ def check_db(
mid,
port,
qos,
retain=0,
retain=1 if i < retain_end else 0,
idx=i,
)
# Check client msg
Expand Down
4 changes: 0 additions & 4 deletions test/broker/persist_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,6 @@ def check_client_msg(
raise ValueError(
"Invalid state %d / %d for message %s" % (row[8], state, msg_id)
)
except ValueError as err:
raise ValueError(
str(err) + f" in client message client_id = {client_id} cmsg_id = {idx}"
) from err
finally:
con.close()

Expand Down
5 changes: 5 additions & 0 deletions test/unit/broker/subs_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
return MOSQ_ERR_SUCCESS;
}

void retain__expiry_check(struct mosquitto__retainhier **retainhier)

Check warning on line 118 in test/unit/broker/subs_stubs.c

View check run for this annotation

Codecov / codecov/patch

test/unit/broker/subs_stubs.c#L118

Added line #L118 was not covered by tests
{
UNUSED(retainhier);
}

Check warning on line 121 in test/unit/broker/subs_stubs.c

View check run for this annotation

Codecov / codecov/patch

test/unit/broker/subs_stubs.c#L120-L121

Added lines #L120 - L121 were not covered by tests

void retain__clean(struct mosquitto__retainhier **retainhier)
{
UNUSED(retainhier);
Expand Down