From e401def06dd68ce70c101eac5c3e051a3ea72392 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 8 Mar 2021 23:23:45 +0000 Subject: [PATCH] Fix QoS 0 messages not being delivered when max_queued_bytes was configured. Closes #2123. Thanks to quackgizmo. --- ChangeLog.txt | 2 + lib/mosquitto_internal.h | 4 +- src/database.c | 6 +-- test/broker/02-subpub-qos0-queued-bytes.py | 61 ++++++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + 6 files changed, 70 insertions(+), 5 deletions(-) create mode 100755 test/broker/02-subpub-qos0-queued-bytes.py diff --git a/ChangeLog.txt b/ChangeLog.txt index ffc5f40e58..ee2fd96b85 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -5,6 +5,8 @@ Broker: - Fix `tls_version` behaviour not matching documentation. It was setting the exact TLS version to use, not the minimium TLS version to use. Closes #2110. - Fix messages to `$` prefixed topics being rejected. Closes #2111. +- Fix QoS 0 messages not being delivered when max_queued_bytes was configured. + Closes #2123. 2.0.8 - 2021-02-25 diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 04bdf92a04..02008af120 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -190,8 +190,8 @@ struct mosquitto_msg_data{ #ifdef WITH_BROKER struct mosquitto_client_msg *inflight; struct mosquitto_client_msg *queued; - unsigned long msg_bytes; - unsigned long msg_bytes12; + long msg_bytes; + long msg_bytes12; int msg_count; int msg_count12; #else diff --git a/src/database.c b/src/database.c index db18b9f1c3..77eb45206e 100644 --- a/src/database.c +++ b/src/database.c @@ -53,7 +53,7 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos) if(db.config->max_queued_messages == 0 && db.config->max_inflight_bytes == 0){ return true; } - valid_bytes = msgs->msg_bytes - db.config->max_inflight_bytes < db.config->max_queued_bytes; + valid_bytes = ((msgs->msg_bytes - (ssize_t)db.config->max_inflight_bytes) < (ssize_t)db.config->max_queued_bytes); valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages; if(db.config->max_queued_messages == 0){ @@ -90,8 +90,8 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms { int source_count; int adjust_count; - size_t source_bytes; - size_t adjust_bytes = db.config->max_inflight_bytes; + long source_bytes; + ssize_t adjust_bytes = (ssize_t)db.config->max_inflight_bytes; bool valid_bytes; bool valid_count; diff --git a/test/broker/02-subpub-qos0-queued-bytes.py b/test/broker/02-subpub-qos0-queued-bytes.py new file mode 100755 index 0000000000..7ca08e8076 --- /dev/null +++ b/test/broker/02-subpub-qos0-queued-bytes.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 + +from mosq_test_helper import * + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("listener %d\n" % (port)) + f.write("allow_anonymous true\n") + f.write("max_inflight_messages 20\n") + f.write("max_inflight_bytes 1000000\n") + f.write("max_queued_messages 20\n") + f.write("max_queued_bytes 1000000\n") + +def do_test(proto_ver): + rc = 1 + keepalive = 60 + connect_packet = mosq_test.gen_connect("subpub-qos0-bytes", keepalive=keepalive, proto_ver=proto_ver) + connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) + + connect_packet_helper = mosq_test.gen_connect("qos0-bytes-helper", keepalive=keepalive, proto_ver=proto_ver) + + mid = 1 + subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos0/queued/bytes", 1, proto_ver=proto_ver) + suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=proto_ver) + + publish_packet0 = mosq_test.gen_publish("subpub/qos0/queued/bytes", qos=0, payload="message", proto_ver=proto_ver) + + + port = mosq_test.get_port() + conf_file = os.path.basename(__file__).replace('.py', '.conf') + write_config(conf_file, port) + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=4, port=port, connack_error="connack 1") + + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + + helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet, timeout=4, port=port, connack_error="connack helper") + + helper.send(publish_packet0) + mosq_test.expect_packet(sock, "publish0", publish_packet0) + rc = 0 + + sock.close() + except mosq_test.TestError: + pass + finally: + os.remove(conf_file) + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + print("proto_ver=%d" % (proto_ver)) + exit(rc) + + +do_test(proto_ver=4) +do_test(proto_ver=5) +exit(0) diff --git a/test/broker/Makefile b/test/broker/Makefile index fcb6839dca..018160635d 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -45,6 +45,7 @@ test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 14 ./02-subhier-crash.py ./02-subpub-qos0-long-topic.py ./02-subpub-qos0-oversize-payload.py + ./02-subpub-qos0-queued-bytes.py ./02-subpub-qos0-retain-as-publish.py ./02-subpub-qos0-send-retain.py ./02-subpub-qos0-subscription-id.py diff --git a/test/broker/test.py b/test/broker/test.py index 0514359aae..6221620316 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -28,6 +28,7 @@ (1, './02-subhier-crash.py'), (1, './02-subpub-qos0-long-topic.py'), (1, './02-subpub-qos0-oversize-payload.py'), + (1, './02-subpub-qos0-queued-bytes.py'), (1, './02-subpub-qos0-retain-as-publish.py'), (1, './02-subpub-qos0-send-retain.py'), (1, './02-subpub-qos0-subscription-id.py'),