Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform ACK completion after the callback to avoid message loss #2906

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Addressing PR comments.
  • Loading branch information
CIPop committed Sep 28, 2023
commit bf9dcdcb3fcc3dbf2f68628568772b71d112d7b1
8 changes: 4 additions & 4 deletions lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ int handle__publish(struct mosquitto *mosq)
return MOSQ_ERR_SUCCESS;
case 1:
util__decrement_receive_quota(mosq);
// Application callbacks are called before PUBACK is sent (and ownership given to the
// receiver). An application crash within the callback will result in retransmission.
/* Application callbacks should be called before PUBACK is sent (and ownership given to the
receiver). An application crash within the callback will result in message retransmission. */
callback__on_message(mosq, &message->msg, properties);
rc = send__puback(mosq, mid, 0, NULL);
message__cleanup(&message);
Expand All @@ -166,8 +166,8 @@ int handle__publish(struct mosquitto *mosq)
case 2:
message->properties = properties;
util__decrement_receive_quota(mosq);
// Application callbacks are called before PUBREC is sent (and ownership given to the
// receiver). An application crash within the callback will result in retransmission.
/* Application callbacks should be called before PUBREC is sent (and ownership given to the
receiver). An application crash within the callback will result in message retransmission. */
callback__on_message(mosq, &message->msg, message->properties);
rc = send__pubrec(mosq, mid, 0, NULL);
pthread_mutex_lock(&mosq->msgs_in.mutex);
Expand Down
2 changes: 1 addition & 1 deletion lib/handle_pubrel.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ int handle__pubrel(struct mosquitto *mosq)

rc = message__remove(mosq, mid, mosq_md_in, &message, 2);
if(rc == MOSQ_ERR_SUCCESS){
// Message was already passed to the application prior to PUBREC.
/* Message was already passed to the application prior to PUBREC. */
message__cleanup(&message);
}else if(rc == MOSQ_ERR_NOT_FOUND){
return MOSQ_ERR_SUCCESS;
Expand Down
2 changes: 1 addition & 1 deletion test/lib/c/03-publish-b2c-qos2-len.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ int main(int argc, char *argv[])
mosquitto_loop(mosq, 100, 1);
}

// Drain the PUBREL and PUBCOMP messages.
/* Drain the PUBREL and PUBCOMP messages. */
for(int i = 0; i < 2; i++){
mosquitto_loop(mosq, 300, 1);
}
Expand Down
2 changes: 1 addition & 1 deletion test/lib/c/03-publish-b2c-qos2.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ int main(int argc, char *argv[])
mosquitto_loop(mosq, 300, 1);
}

// Drain the PUBREL and PUBCOMP messages.
/* Drain the PUBREL and PUBCOMP messages. */
for(int i = 0; i < 2; i++){
mosquitto_loop(mosq, 300, 1);
}
Expand Down
2 changes: 1 addition & 1 deletion test/lib/c/11-prop-recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ int main(int argc, char *argv[])
}

if (qos > 1){
// Drain the PUBREL and PUBCOMP messages.
/* Drain the PUBREL and PUBCOMP messages. */
for(int i = 0; i < 2; i++){
mosquitto_loop(mosq, 300, 1);
}
Expand Down
4 changes: 2 additions & 2 deletions test/lib/cpp/03-publish-b2c-qos2-len.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ int main(int argc, char *argv[])
mosq->loop(100, 1);
}

// Drain the PUBREL and PUBCOMP messages.
/* Drain the PUBREL and PUBCOMP messages. */
for(int i = 0; i < 2; i++){
mosq->loop();
}

run = -1;
mosq->disconnect();
// Wait for disconnect to complete.
/* Wait for disconnect to complete. */
while(run == -1){
mosq->loop(100, 1);
}
Expand Down
2 changes: 1 addition & 1 deletion test/lib/cpp/03-publish-b2c-qos2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int main(int argc, char *argv[])
mosq->loop();
}

// Drain the PUBREL and PUBCOMP messages.
/* Drain the PUBREL and PUBCOMP messages. */
for(int i = 0; i < 2; i++){
mosq->loop();
}
Expand Down
2 changes: 1 addition & 1 deletion test/lib/cpp/11-prop-recv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ int main(int argc, char *argv[])
}

if (qos > 1){
// Drain the PUBREL and PUBCOMP messages.
/* Drain the PUBREL and PUBCOMP messages. */
for(int i = 0; i < 2; i++){
mosq->loop();
}
Expand Down