Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persist plugin expiry #3016

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Prev Previous commit
Next Next commit
Reformatting and improvement of persistent module test helpers
Signed-off-by: Norbert Heusser <[email protected]>
  • Loading branch information
NorbertHeusser committed Mar 12, 2024
commit 9b26357c173b4405f5ce38d159e2a8acea6d9474
125 changes: 78 additions & 47 deletions test/broker/15-persist-bridge-queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
".py", "_bridge_target.conf"
)


def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: dict):
persist_help.write_config(
conf_file, port, additional_config_entries=bridging_add_config
Expand All @@ -46,6 +47,18 @@ def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: d
source_id = "persist-bridge-test-publisher"
proto_ver = 4

def gen_pub_packets(idx: int, mid_offset: int):
payload = f"queued message {idx:3}"
publish_packet = mosq_test.gen_publish(
topic,
mid=mid_offset + idx,
qos=qos,
payload=payload.encode("UTF-8"),
proto_ver=proto_ver,
)
puback_packet = mosq_test.gen_puback(mid=mid_offset + idx, proto_ver=proto_ver)
return publish_packet, puback_packet

connect_packet = mosq_test.gen_connect(
client_id, proto_ver=proto_ver, clean_session=False
)
Expand All @@ -67,44 +80,56 @@ def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: d
bridge_target_broker = mosq_test.start_broker(
filename=conf_file_bridge_target, use_conf=True, port=bridge_target_port
)
# Connect and send a ping to make sure bridge target broker is up
sock = mosq_test.do_client_connect(

# Connect to the bridge target broker and make a qos1 subscription
sock_bridge_target = mosq_test.do_client_connect(
connect_packet, connack_packet1, timeout=5, port=bridge_target_port
)
mosq_test.do_ping(sock)
sock.close()
mosq_test.do_send_receive(
sock_bridge_target,
subscribe_packet,
suback_packet,
"suback from bridge target",
)

# Now start the broker with the bridge
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

# Connect and send a ping to make sure bridging broker is up
# Connect and send a single message forwarded to the bridge target
sock = mosq_test.do_client_connect(
connect2_packet, connack2_packet, timeout=5, port=port
)
publish_packet, puback_packet = gen_pub_packets(0, mid_offset=3)
mosq_test.do_send_receive(
sock, publish_packet, puback_packet, "puback for first message"
)

# Wait until we have received the message from the bridge target
publish_packet, puback_packet = gen_pub_packets(0, mid_offset=1)
mosq_test.do_receive_send(
sock_bridge_target, publish_packet, puback_packet, "first published message"
)

# Wait for a ping response to make sure the target broker has processed the PUBACK
mosq_test.do_ping(sock_bridge_target)
sock_bridge_target.close()

# Make sure the bridging broker processes a ping, which means the PUBACK from the bridge target for the
# first message was processed as well
mosq_test.do_ping(sock)

# Stop the bridge target broker
(broker_terminate_rc, stde) = mosq_test.terminate_broker(bridge_target_broker)
(broker_terminate_rc, stde2) = mosq_test.terminate_broker(bridge_target_broker)
bridge_target_broker = None

# Publish messages
for i in range(num_messages):
payload = f"queued message {i:3}"
mid = 10 + i
publish_packet = mosq_test.gen_publish(
topic,
mid=mid,
qos=qos,
payload=payload.encode("UTF-8"),
proto_ver=proto_ver,
)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
publish_packet, puback_packet = gen_pub_packets(idx=i, mid_offset=10)
mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")
sock.close()

# Terminate the bridging broker
(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
(broker_terminate_rc, stde3) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
Expand Down Expand Up @@ -136,8 +161,8 @@ def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: d
)

# Check client msg
subscriber_mid = 3 + i
cmsg_id = 1 + i
subscriber_mid = 4 + i
cmsg_id = 2 + i
persist_help.check_client_msg(
port,
"upstream-bridge",
Expand All @@ -149,35 +174,24 @@ def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: d
qos,
0,
persist_help.ms_queued,
idx=i
)

# Start the bridge target broker
bridge_target_broker = mosq_test.start_broker(
filename=conf_file_bridge_target, use_conf=True, port=bridge_target_port
)

# Connect to the bridge target broker and make a qos1 subscription
# Reconnect to the bridge target broker
sock = mosq_test.do_client_connect(
connect_packet, connack_packet2, timeout=5, port=bridge_target_port
)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

# Restart bridging broker
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

# Check, if all message got forwarded through the bridge
for i in range(num_messages):
payload = f"queued message {i:3}"
mid = 1 + i
publish_packet = mosq_test.gen_publish(
topic,
mid=mid,
qos=qos,
payload=payload.encode("UTF-8"),
proto_ver=proto_ver,
)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
publish_packet, puback_packet = gen_pub_packets(idx=i, mid_offset=1)
mosq_test.do_receive_send(
sock,
publish_packet,
Expand All @@ -189,11 +203,19 @@ def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: d
mosq_test.do_ping(sock)
sock.close()

# Reconnect to the bridging broker and send ping to make sure the broker has process
# PUBACK from bridge target before getting shut down
sock = mosq_test.do_client_connect(
connect2_packet, connack2_packet, timeout=5, port=port
)
mosq_test.do_ping(sock)
sock.close()

# Stop both brokers
(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None
(broker_terminate_rc, stde2) = mosq_test.terminate_broker(bridge_target_broker)
bridge_target_broker = None
(broker_terminate_rc, stde) = mosq_test.terminate_broker(broker)
broker = None

persist_help.check_counts(
port,
Expand Down Expand Up @@ -224,23 +246,32 @@ def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: d

print(f"{test_case_name}")
if rc:
print(stde.decode("utf-8"))
if stde2 is not None:
print("Bridge target brocker log:")
print(stde2.decode("utf-8"))
if stde3 is not None:
print("Bridging brocker log (first run):")
print(stde3.decode("utf-8"))
if stde is not None:
print("Bridging brocker log:")
print(stde.decode("utf-8"))
assert rc == 0, f"rc: {rc}"


in_bridge_config = {
"connection": "in-bridge",
"address": f"localhost:{port}",
"local_clientid": "bridge-test",
"remote_clientid": "upstream-bridge",
"topic": f"{topic} in 2",
"connection": "in-bridge",
"address": f"localhost:{port}",
"local_clientid": "bridge-test",
"remote_clientid": "upstream-bridge",
"topic": f"{topic} in 2",
}

out_bridge_config = {
"connection": "out-bridge",
"address": f"localhost:{bridge_target_port}",
"local_clientid": "upstream-bridge",
"remote_clientid": "bridge-test",
"topic": f"{topic} out 2",
"connection": "out-bridge",
"address": f"localhost:{bridge_target_port}",
"local_clientid": "upstream-bridge",
"remote_clientid": "bridge-test",
"topic": f"{topic} out 2",
}

memory_queue_config = {
Expand All @@ -250,7 +281,7 @@ def do_test(test_case_name: str, bridging_add_config: dict, target_add_config: d

do_test(
"memory queue out bridge",
bridging_add_config= memory_queue_config | out_bridge_config,
bridging_add_config=memory_queue_config | out_bridge_config,
target_add_config={},
)

Expand Down
27 changes: 16 additions & 11 deletions test/broker/15-persist-client-drop-expired-messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

num_messages = 100


def do_test(test_case_name: str, additional_config_entries: dict):
conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
persist_help.write_config(
Expand All @@ -29,7 +30,11 @@ def do_test(test_case_name: str, additional_config_entries: dict):
proto_ver = 5

connect_packet = mosq_test.gen_connect(
client_id, username=username, proto_ver=proto_ver, clean_session=False, session_expiry=60
client_id,
username=username,
proto_ver=proto_ver,
clean_session=False,
session_expiry=60,
)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
Expand All @@ -39,15 +44,13 @@ def do_test(test_case_name: str, additional_config_entries: dict):
suback_packet = mosq_test.gen_suback(mid, qos=qos, proto_ver=proto_ver)

connect2_packet = mosq_test.gen_connect(
source_id, username=username, proto_ver=proto_ver
source_id, username=username, proto_ver=proto_ver
)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

rc = 1

broker = mosq_test.start_broker(
filename=conf_file, use_conf=True, port=port
)
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

con = None
try:
Expand All @@ -60,7 +63,9 @@ def do_test(test_case_name: str, additional_config_entries: dict):
sock = mosq_test.do_client_connect(
connect2_packet, connack2_packet, timeout=5, port=port
)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 60)
props = mqtt5_props.gen_uint32_prop(
mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 60
)
for i in range(num_messages):
payload = f"queued message {i:3}"
mid = 10 + i
Expand Down Expand Up @@ -140,20 +145,20 @@ def do_test(test_case_name: str, additional_config_entries: dict):
qos,
0,
persist_help.ms_queued,
idx=i,
)

# Modify message expiry timestamp in the database
assert persist_help.modify_base_msgs(port, sub_expiry_time=120) == num_messages

# Restart broker
broker = mosq_test.start_broker(
filename=conf_file, use_conf=True, port=port
)
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

# Connect client again, it should have a session, but all queued messages should be dropped
sock = mosq_test.do_client_connect(
connect_packet, connack_packet2, timeout=5, port=port,
connect_packet,
connack_packet2,
timeout=5,
port=port,
)

# Send ping and wait for the PINGRESP to make sure the broker will not send a queued message instead
Expand Down
1 change: 0 additions & 1 deletion test/broker/15-persist-client-expired-session.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ def do_test(
qos,
0,
persist_help.ms_queued,
idx=i,
)

# Put session expiry_time into the past
Expand Down
9 changes: 2 additions & 7 deletions test/broker/15-persist-client-msg-modify-acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ def do_test(test_case_name: str, additional_config_entries: dict):

rc = 1

broker = mosq_test.start_broker(
filename=os.path.basename(__file__), use_conf=True, port=port
)
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

con = None
try:
Expand Down Expand Up @@ -146,7 +144,6 @@ def do_test(test_case_name: str, additional_config_entries: dict):
qos,
0,
persist_help.ms_queued,
idx=i,
)

# Remove any permission for the test topic and the test user
Expand All @@ -156,9 +153,7 @@ def do_test(test_case_name: str, additional_config_entries: dict):
os.chmod(f"{acl_file}", 0o644)

# Restart broker
broker = mosq_test.start_broker(
filename=os.path.basename(__file__), use_conf=True, port=port
)
broker = mosq_test.start_broker(filename=conf_file, use_conf=True, port=port)

# Connect client again, it should have a session, but all queued messages should be dropped
sock = mosq_test.do_client_connect(
Expand Down