Skip to content

Commit

Permalink
Handle mismatched handshakes properly.
Browse files Browse the repository at this point in the history
For example, a QoS1 PUBLISH with QoS2 reply.
  • Loading branch information
ralight committed Feb 8, 2019
1 parent 760b2f1 commit 5236295
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 20 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Broker:
- Fixed comment handling for config options that have optional arguments.
- Improved documentation around bridge topic remapping.
- Handle mismatched handshakes (e.g. QoS1 PUBLISH with QoS2 reply) properly.

Library:
- Fix TLS connections not working over SOCKS.
Expand Down
9 changes: 7 additions & 2 deletions lib/handle_pubackcomp.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
{
uint16_t mid;
int rc;
int qos;

assert(mosq);
rc = packet__read_uint16(&mosq->in_packet, &mid);
if(rc) return rc;
qos = type[3] == 'A'?1:2; /* pubAck or pubComp */
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid);

if(mid){
rc = db__message_delete(db, mosq, mid, mosq_md_out);
rc = db__message_delete(db, mosq, mid, mosq_md_out, qos);
if(rc == MOSQ_ERR_NOT_FOUND){
log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received %s from %s for an unknown packet identifier %d.", type, mosq->id, mid);
return MOSQ_ERR_SUCCESS;
Expand All @@ -63,7 +65,10 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid);

if(!message__delete(mosq, mid, mosq_md_out)){
rc = message__delete(mosq, mid, mosq_md_out, qos);
if(rc){
return rc;
}else{
/* Only inform the client the message has been sent once. */
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){
Expand Down
4 changes: 2 additions & 2 deletions lib/handle_pubrec.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ int handle__pubrec(struct mosquitto *mosq)
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid);

rc = db__message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp);
rc = db__message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp, 2);
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid);

rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp);
rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp, 2);
#endif
if(rc == MOSQ_ERR_NOT_FOUND){
log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received PUBREC from %s for an unknown packet identifier %d.", mosq->id, mid);
Expand Down
10 changes: 8 additions & 2 deletions lib/handle_pubrel.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,21 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREL from %s (Mid: %d)", mosq->id, mid);

if(db__message_release(db, mosq, mid, mosq_md_in)){
rc = db__message_release(db, mosq, mid, mosq_md_in);
if(rc == MOSQ_ERR_PROTOCOL){
return rc;
}else if(rc != MOSQ_ERR_SUCCESS){
/* Message not found. Still send a PUBCOMP anyway because this could be
* due to a repeated PUBREL after a client has reconnected. */
log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received PUBREL from %s for an unknown packet identifier %d.", mosq->id, mid);
}
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREL (Mid: %d)", mosq->id, mid);

if(!message__remove(mosq, mid, mosq_md_in, &message)){
rc = message__remove(mosq, mid, mosq_md_in, &message, 2);
if(rc){
return rc;
}else{
/* Only pass the message on if we have removed it from the queue - this
* prevents multiple callbacks for the same message. */
pthread_mutex_lock(&mosq->callback_mutex);
Expand Down
17 changes: 13 additions & 4 deletions lib/messages_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto
return MOSQ_ERR_SUCCESS;
}

int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir)
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos)
{
struct mosquitto_message_all *message;
int rc;
assert(mosq);

rc = message__remove(mosq, mid, dir, &message);
rc = message__remove(mosq, mid, dir, &message, qos);
if(rc == MOSQ_ERR_SUCCESS){
message__cleanup(&message);
}
Expand Down Expand Up @@ -218,7 +218,7 @@ void message__reconnect_reset(struct mosquitto *mosq)
pthread_mutex_unlock(&mosq->out_message_mutex);
}

int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message)
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos)
{
struct mosquitto_message_all *cur, *prev = NULL;
bool found = false;
Expand All @@ -231,6 +231,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
cur = mosq->out_messages;
while(cur){
if(cur->msg.mid == mid){
if(cur->msg.qos != qos){
return MOSQ_ERR_PROTOCOL;
}
if(prev){
prev->next = cur->next;
}else{
Expand Down Expand Up @@ -287,6 +290,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
cur = mosq->in_messages;
while(cur){
if(cur->msg.mid == mid){
if(cur->msg.qos != qos){
return MOSQ_ERR_PROTOCOL;
}
if(prev){
prev->next = cur->next;
}else{
Expand Down Expand Up @@ -370,7 +376,7 @@ void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_re
{
}

int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state, int qos)
{
struct mosquitto_message_all *message;
assert(mosq);
Expand All @@ -379,6 +385,9 @@ int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg
message = mosq->out_messages;
while(message){
if(message->msg.mid == mid){
if(message->msg.qos != qos){
return MOSQ_ERR_PROTOCOL;
}
message->state = state;
message->timestamp = mosquitto_time();
pthread_mutex_unlock(&mosq->out_message_mutex);
Expand Down
6 changes: 3 additions & 3 deletions lib/messages_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ and the Eclipse Distribution License is available at

void message__cleanup_all(struct mosquitto *mosq);
void message__cleanup(struct mosquitto_message_all **message);
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir);
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos);
int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir);
void message__reconnect_reset(struct mosquitto *mosq);
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message);
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos);
void message__retry_check(struct mosquitto *mosq);
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state);
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state, int qos);

#endif
19 changes: 14 additions & 5 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ void db__message_dequeue_first(struct mosquitto *context)
msg->next = NULL;
}

int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos)
{
struct mosquitto_client_msg *tail, *last = NULL;
int msg_index = 0;
Expand All @@ -299,6 +299,11 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
while(tail){
msg_index++;
if(tail->mid == mid && tail->direction == dir){
if(tail->qos != qos){
return MOSQ_ERR_PROTOCOL;
}else if(qos == 2 && tail->state != mosq_ms_wait_for_pubcomp){
return MOSQ_ERR_PROTOCOL;
}
msg_index--;
db__message_remove(db, context, &tail, last);
}else{
Expand Down Expand Up @@ -509,13 +514,16 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
#endif
}

int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state)
int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state, int qos)
{
struct mosquitto_client_msg *tail;

tail = context->inflight_msgs;
while(tail){
if(tail->mid == mid && tail->direction == dir){
if(tail->qos != qos){
return MOSQ_ERR_PROTOCOL;
}
tail->state = state;
tail->timestamp = mosquitto_time();
return MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -780,7 +788,6 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
{
struct mosquitto_client_msg *tail, *last = NULL;
int qos;
int retain;
char *topic;
char *source_id;
Expand All @@ -793,7 +800,9 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
while(tail){
msg_index++;
if(tail->mid == mid && tail->direction == dir){
qos = tail->store->qos;
if(tail->store->qos != 2){
return MOSQ_ERR_PROTOCOL;
}
topic = tail->store->topic;
retain = tail->retain;
source_id = tail->store->source_id;
Expand All @@ -802,7 +811,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
* denied/dropped and is being processed so the client doesn't
* keep resending it. That means we don't send it to other
* clients. */
if(!topic || !sub__messages_queue(db, source_id, topic, qos, retain, &tail->store)){
if(!topic || !sub__messages_queue(db, source_id, topic, 2, retain, &tail->store)){
db__message_remove(db, context, &tail, last);
deleted = true;
}else{
Expand Down
4 changes: 2 additions & 2 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,10 @@ int persist__restore(struct mosquitto_db *db);
void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos);
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored);
int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state);
int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state, int qos);
int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
void db__message_dequeue_first(struct mosquitto *context);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);
Expand Down
69 changes: 69 additions & 0 deletions test/broker/02-subpub-qos1-bad-pubcomp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env python

# Test what the broker does if receiving a PUBCOMP in response to a QoS 1 PUBLISH.

import inspect, os, sys
# From https://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)

import mosq_test
import time

rc = 1
keepalive = 60

connect_packet = mosq_test.gen_connect("subpub-qos1-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1)
suback_packet = mosq_test.gen_suback(mid, 1)

mid = 1
publish_packet2 = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")


helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive)
helper_connack = mosq_test.gen_connack(rc=0)

mid = 1
publish1s_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
puback1s_packet = mosq_test.gen_puback(mid)

mid = 1
publish1r_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
pubcomp1r_packet = mosq_test.gen_pubcomp(mid)

pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1s")
helper.close()

if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet):
sock.send(pubcomp1r_packet)
sock.send(pingreq_packet)
p = sock.recv(len(pingresp_packet))
if len(p) == 0:
rc = 0

sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)

exit(rc)

65 changes: 65 additions & 0 deletions test/broker/02-subpub-qos1-bad-pubrec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python

# Test what the broker does if receiving a PUBREC in response to a QoS 1 PUBLISH.

import inspect, os, sys
# From https://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)

import mosq_test
import time

rc = 1
keepalive = 60

connect_packet = mosq_test.gen_connect("subpub-qos1-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1)
suback_packet = mosq_test.gen_suback(mid, 1)

helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive)
helper_connack = mosq_test.gen_connack(rc=0)

mid = 1
publish1s_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
puback1s_packet = mosq_test.gen_puback(mid)

mid = 1
publish1r_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
pubrec1r_packet = mosq_test.gen_pubrec(mid)

pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1s")
helper.close()

if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet):
sock.send(pubrec1r_packet)
sock.send(pingreq_packet)
p = sock.recv(len(pingresp_packet))
if len(p) == 0:
rc = 0

sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)

exit(rc)

Loading

0 comments on commit 5236295

Please sign in to comment.