Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform ACK completion after the callback to avoid message loss #2906

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,19 @@ int handle__publish(struct mosquitto *mosq)
return MOSQ_ERR_SUCCESS;
case 1:
util__decrement_receive_quota(mosq);
rc = send__puback(mosq, mid, 0, NULL);
// Application callbacks are called before PUBACK is sent (and ownership given to the
CIPop marked this conversation as resolved.
Show resolved Hide resolved
CIPop marked this conversation as resolved.
Show resolved Hide resolved
// receiver). An application crash within the callback will result in retransmission.
CIPop marked this conversation as resolved.
Show resolved Hide resolved
callback__on_message(mosq, &message->msg, properties);
rc = send__puback(mosq, mid, 0, NULL);
message__cleanup(&message);
mosquitto_property_free_all(&properties);
return rc;
case 2:
message->properties = properties;
util__decrement_receive_quota(mosq);
// Application callbacks are called before PUBREC is sent (and ownership given to the
CIPop marked this conversation as resolved.
Show resolved Hide resolved
// receiver). An application crash within the callback will result in retransmission.
CIPop marked this conversation as resolved.
Show resolved Hide resolved
callback__on_message(mosq, &message->msg, message->properties);
rc = send__pubrec(mosq, mid, 0, NULL);
pthread_mutex_lock(&mosq->msgs_in.mutex);
message->state = mosq_ms_wait_for_pubrel;
Expand Down
4 changes: 1 addition & 3 deletions lib/handle_pubrel.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ int handle__pubrel(struct mosquitto *mosq)

rc = message__remove(mosq, mid, mosq_md_in, &message, 2);
if(rc == MOSQ_ERR_SUCCESS){
/* Only pass the message on if we have removed it from the queue - this
* prevents multiple callbacks for the same message. */
callback__on_message(mosq, &message->msg, message->properties);
// Message was already passed to the application prior to PUBREC.
message__cleanup(&message);
}else if(rc == MOSQ_ERR_NOT_FOUND){
return MOSQ_ERR_SUCCESS;
Expand Down
4 changes: 3 additions & 1 deletion test/lib/11-prop-recv-qos1.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ def do_test(conn, data):
mosq_test.do_receive_send(conn, connect_packet, connack_packet, "connect")

conn.send(publish_packet)
mosq_test.expect_packet(conn, "puback", puback_packet)
# Application sends the "ok" publish within the callback.
mosq_test.expect_packet(conn, "ok", ok_packet)
# After the callback exits, the puback is sent.
mosq_test.expect_packet(conn, "puback", puback_packet)

conn.close()

Expand Down
6 changes: 3 additions & 3 deletions test/lib/11-prop-recv-qos2.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ def do_test(conn, data):
disconnect_packet = mosq_test.gen_disconnect(proto_ver=5)

mosq_test.do_receive_send(conn, connect_packet, connack_packet, "connect")
mosq_test.do_send_receive(conn, publish_packet, pubrec_packet, "pubrec")
mosq_test.do_send_receive(conn, pubrel_packet, pubcomp_packet, "pubcomp")
mosq_test.expect_packet(conn, "ok", ok_packet)
mosq_test.do_send_receive(conn, publish_packet, ok_packet, "ok")
mosq_test.do_receive_send(conn, pubrec_packet, pubrel_packet, "pubrel")
mosq_test.expect_packet(conn, "pubcomp", pubcomp_packet)

conn.close()

Expand Down
9 changes: 6 additions & 3 deletions test/lib/c/03-publish-b2c-qos1.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <string.h>
#include <mosquitto.h>

volatile bool done = false;

static void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
(void)mosq;
Expand Down Expand Up @@ -44,7 +46,7 @@ static void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto
exit(1);
}

exit(0);
done = true;
}

int main(int argc, char *argv[])
Expand All @@ -70,11 +72,12 @@ int main(int argc, char *argv[])
rc = mosquitto_connect(mosq, "localhost", port, 60);
if(rc != MOSQ_ERR_SUCCESS) return rc;

while(1){
while(!done){
mosquitto_loop(mosq, 300, 1);
}
mosquitto_destroy(mosq);

mosquitto_lib_cleanup();
return 1;

return 0;
}
16 changes: 14 additions & 2 deletions test/lib/c/03-publish-b2c-qos2-len.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ static void on_connect(struct mosquitto *mosq, void *obj, int rc)

static void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{
(void)mosq;
(void)obj;

if(msg->mid != 56){
Expand Down Expand Up @@ -45,7 +46,7 @@ static void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto
exit(1);
}

mosquitto_disconnect(mosq);
run = 0;
}

static void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
Expand All @@ -54,7 +55,7 @@ static void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
(void)obj;
(void)rc;

run = 0;
run = rc;
}

int main(int argc, char *argv[])
Expand Down Expand Up @@ -86,6 +87,17 @@ int main(int argc, char *argv[])
mosquitto_loop(mosq, 100, 1);
}

// Drain the PUBREL and PUBCOMP messages.
for(int i = 0; i < 2; i++){
mosquitto_loop(mosq, 300, 1);
}

run = -1;
mosquitto_disconnect(mosq);
while(run == -1){
mosquitto_loop(mosq, 100, 1);
}

mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return run;
Expand Down
5 changes: 5 additions & 0 deletions test/lib/c/03-publish-b2c-qos2.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ int main(int argc, char *argv[])
mosquitto_loop(mosq, 300, 1);
}

// Drain the PUBREL and PUBCOMP messages.
for(int i = 0; i < 2; i++){
mosquitto_loop(mosq, 300, 1);
}

mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return run;
Expand Down
10 changes: 9 additions & 1 deletion test/lib/c/11-prop-recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int main(int argc, char *argv[])
struct mosquitto *mosq;
int port;

if(argc < 2){
if(argc < 3){
return 1;
}
port = atoi(argv[1]);
Expand All @@ -81,6 +81,14 @@ int main(int argc, char *argv[])
rc = mosquitto_loop(mosq, -1, 1);
if(rc != MOSQ_ERR_SUCCESS) return rc;
}

if (qos > 1){
// Drain the PUBREL and PUBCOMP messages.
for(int i = 0; i < 2; i++){
mosquitto_loop(mosq, 300, 1);
}
}

mosquitto_destroy(mosq);

mosquitto_lib_cleanup();
Expand Down
8 changes: 5 additions & 3 deletions test/lib/cpp/03-publish-b2c-qos1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
class mosquittopp_test : public mosqpp::mosquittopp
{
public:
volatile bool done = false;

mosquittopp_test(const char *id);

void on_connect(int rc);
Expand Down Expand Up @@ -52,7 +54,7 @@ void mosquittopp_test::on_message(const struct mosquitto_message *msg)
exit(1);
}

exit(0);
this->done = true;
}

int main(int argc, char *argv[])
Expand All @@ -68,14 +70,14 @@ int main(int argc, char *argv[])

mosq->connect("localhost", port, 60);

while(1){
while(!mosq->done){
mosq->loop();
}
delete mosq;

delete mosq;
mosqpp::lib_cleanup();

return 1;
return 0;
}

14 changes: 13 additions & 1 deletion test/lib/cpp/03-publish-b2c-qos2-len.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void mosquittopp_test::on_message(const struct mosquitto_message *msg)
exit(1);
}

disconnect();
run = 0;
}

int main(int argc, char *argv[])
Expand All @@ -81,6 +81,18 @@ int main(int argc, char *argv[])
mosq->loop(100, 1);
}

// Drain the PUBREL and PUBCOMP messages.
for(int i = 0; i < 2; i++){
mosq->loop();
}

run = -1;
mosq->disconnect();
// Wait for disconnect to complete.
while(run == -1){
mosq->loop(100, 1);
}

delete mosq;
mosqpp::lib_cleanup();

Expand Down
5 changes: 5 additions & 0 deletions test/lib/cpp/03-publish-b2c-qos2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ int main(int argc, char *argv[])
mosq->loop();
}

// Drain the PUBREL and PUBCOMP messages.
for(int i = 0; i < 2; i++){
mosq->loop();
}

delete mosq;
mosqpp::lib_cleanup();

Expand Down
8 changes: 8 additions & 0 deletions test/lib/cpp/11-prop-recv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ int main(int argc, char *argv[])
rc = mosq->loop();
if(rc != MOSQ_ERR_SUCCESS) return rc;
}

if (qos > 1){
// Drain the PUBREL and PUBCOMP messages.
for(int i = 0; i < 2; i++){
mosq->loop();
}
}

delete mosq;

mosqpp::lib_cleanup();
Expand Down