Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persist plugin expiry #3016

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Prev Previous commit
Next Next commit
Add second subscriber to persistent plugin expiry test
Signed-off-by: Norbert Heusser <[email protected]>
  • Loading branch information
NorbertHeusser committed Mar 12, 2024
commit 6ca0fe1a147abff43067131561da533bf2b3932b
234 changes: 108 additions & 126 deletions test/broker/15-persist-client-drop-expired-messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,34 @@
# different client, restore, reconnect, check it is received.

from mosq_test_helper import *
from persist_module_helper import *

persist_help = persist_module()

port = mosq_test.get_port()

num_messages = 100

proto_ver = 5
qos = 1
topic = "test-expired-msgs"
username = "test-message-expiry"

subscriber_id = "test-subscriber"
second_subscriber_id = "second-subscriber"
publisher_id = "test-publisher"


def do_test(
test_case_name: str,
additional_config_entries: dict,
resubscribe: bool,
num_messages_two_subscribers: int = 0,
):
print(
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}"
)

def do_test(test_case_name: str, additional_config_entries: dict):
conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
persist_help.write_config(
conf_file,
Expand All @@ -21,159 +40,109 @@ def do_test(test_case_name: str, additional_config_entries: dict):
)
persist_help.init(port)

client_id = "test-expired-messages-subscriber"
username = "test-message-expiry"

qos = 1
topic = "client-msg/test"
source_id = "test-expired-messages-publisher"
proto_ver = 5

connect_packet = mosq_test.gen_connect(
client_id,
username=username,
proto_ver=proto_ver,
clean_session=False,
session_expiry=60,
)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, qos=qos, proto_ver=proto_ver)

connect2_packet = mosq_test.gen_connect(
source_id, username=username, proto_ver=proto_ver
publisher_id, username=username, proto_ver=proto_ver
)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

rc = 1

broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

con = None
try:
sock = mosq_test.do_client_connect(
connect_packet, connack_packet1, timeout=5, port=port
)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
sock.close()
msg_counts = {subscriber_id: num_messages}

sock = mosq_test.do_client_connect(
connect2_packet, connack2_packet, timeout=5, port=port
connect_client(
port,
subscriber_id,
username,
proto_ver,
session_expiry=60,
subscribe_topic=topic,
).close()

publisher_sock = connect_client(
port, publisher_id, username, proto_ver, session_expiry=0
)
props = mqtt5_props.gen_uint32_prop(
mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 60
publish_messages(
publisher_sock,
proto_ver,
topic,
0,
num_messages - num_messages_two_subscribers,
message_expiry=60,
)
for i in range(num_messages):
payload = f"queued message {i:3}"
mid = 10 + i
publish_packet = mosq_test.gen_publish(

if num_messages_two_subscribers > 0:
msg_counts[second_subscriber_id] = num_messages_two_subscribers
connect_client(
port,
second_subscriber_id,
username,
proto_ver,
session_expiry=60,
subscribe_topic=topic,
).close()
publish_messages(
publisher_sock,
proto_ver,
topic,
mid=mid,
qos=qos,
payload=payload.encode("UTF-8"),
proto_ver=proto_ver,
properties=props,
num_messages - num_messages_two_subscribers,
num_messages,
message_expiry=60,
)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")
sock.close()
publisher_sock.close()

# Terminate the broker
(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
check_db(
persist_help,
port,
clients=1,
client_msgs_out=num_messages,
base_msgs=num_messages,
subscriptions=1,
username,
subscription_topic=topic,
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
message_expiry=60,
)

# Check client
# persist_help.check_client(
# port,
# client_id,
# username=username,
# will_delay_time=0,
# session_expiry_time=60,
# listener_port=port,
# max_packet_size=0,
# max_qos=2,
# retain_available=1,
# session_expiry_interval=60,
# will_delay_interval=0,
# )

# Check subscription
persist_help.check_subscription(port, client_id, topic, qos, 0)

# Check stored message
for i in range(num_messages):
payload = f"queued message {i:3}"
payload_b = payload.encode("UTF-8")
mid = 10 + i
store_id = persist_help.check_base_msg(
port,
60,
topic,
payload_b,
source_id,
username,
len(payload_b),
mid,
port,
qos,
retain=0,
idx=i,
)

# Check client msg
subscriber_mid = 1 + i
cmsg_id = 1 + i
persist_help.check_client_msg(
port,
client_id,
cmsg_id,
store_id,
0,
persist_help.dir_out,
subscriber_mid,
qos,
0,
persist_help.ms_queued,
)

# Modify message expiry timestamp in the database
# Put session expiry_time into the past
assert persist_help.modify_base_msgs(port, sub_expiry_time=120) == num_messages

# Restart broker
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

# Connect client again, it should have a session, but all queued messages should be dropped
sock = mosq_test.do_client_connect(
connect_packet,
connack_packet2,
timeout=5,
port=port,
)

# Reconnect client(s), it should have a session, but all queued messages should be dropped
for client_id in msg_counts.keys():
subscriber_sock = connect_client(
port,
client_id,
username,
proto_ver,
session_expiry=60,
session_present=True,
subscribe_topic=topic if resubscribe else None,
)
# Send ping and wait for the PINGRESP to make sure the broker will not send a queued message instead
mosq_test.do_ping(sock)
sock.close()
mosq_test.do_ping(subscriber_sock)
subscriber_sock.close()

(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
for client_id in msg_counts.keys():
# None for subscriber with subscriber_id means no subscription
msg_counts[client_id] = 0
check_db(
persist_help,
port,
clients=1,
client_msgs_out=0,
base_msgs=0,
subscriptions=1,
username,
subscription_topic=topic,
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
)

rc = broker_terminate_rc
Expand All @@ -187,16 +156,29 @@ def do_test(test_case_name: str, additional_config_entries: dict):
os.remove(conf_file)
rc += persist_help.cleanup(port)

print(f"{test_case_name}")
if rc:
print(stde.decode("utf-8"))
assert rc == 0, f"rc: {rc}"


memory_queue_config = {
"log_type": "all",
"max_queued_messages": num_messages,
}

do_test(
"memory queue",
additional_config_entries=memory_queue_config,
resubscribe=False,
)
do_test(
"memory queue",
additional_config_entries=memory_queue_config,
resubscribe=True,
)
do_test(
"memory queue",
additional_config_entries={
"log_type": "all",
"max_queued_messages": num_messages,
},
additional_config_entries=memory_queue_config,
resubscribe=False,
num_messages_two_subscribers=30,
)