Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.deal with pubrel, ignore the ret of msg remove, 2.delete the same m… #1629

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1.deal with pubrel, ignore the ret of msg remove, 2.delete the same m…
…sg when input/output msg into msg-queue
  • Loading branch information
yangcunbiao committed Mar 15, 2020
commit 03a19122a0ac0246bd1ebc8b46f31bdf6e5d1c0d
3 changes: 2 additions & 1 deletion lib/handle_pubrel.c
Expand Up @@ -102,7 +102,8 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)

rc = message__remove(mosq, mid, mosq_md_in, &message, 2);
if(rc){
return rc;
// return rc;
/* Apparently this is "normal" behaviour, so we don't need to issue a warning */
}else{
/* Only pass the message on if we have removed it from the queue - this
* prevents multiple callbacks for the same message. */
Expand Down
21 changes: 21 additions & 0 deletions lib/messages_mosq.c
Expand Up @@ -120,14 +120,35 @@ void mosquitto_message_free_contents(struct mosquitto_message *message)
int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
{
/* mosq->*_message_mutex should be locked before entering this function */
struct mosquitto_message_all *cur, *tmp;
assert(mosq);
assert(message);
assert(message->msg.qos != 0);

if(dir == mosq_md_out){
////////

DL_FOREACH_SAFE(mosq->msgs_out.inflight, cur, tmp){
if((cur->msg.mid == message->msg.mid) && (cur->msg.qos == message->msg.qos)){
DL_DELETE(mosq->msgs_out.inflight, cur);
mosq->msgs_out.queue_len--;
break;
}
}
////////

DL_APPEND(mosq->msgs_out.inflight, message);
mosq->msgs_out.queue_len++;
}else{
///////////
DL_FOREACH_SAFE(mosq->msgs_in.inflight, cur, tmp){
if((cur->msg.mid == message->msg.mid) &&(cur->msg.qos == message->msg.qos)){
DL_DELETE(mosq->msgs_in.inflight, cur);
mosq->msgs_in.queue_len--;
break;
}
}
///////////
DL_APPEND(mosq->msgs_in.inflight, message);
mosq->msgs_in.queue_len++;
}
Expand Down