Skip to content

Commit

Permalink
Disallow writing to $ topics where appropriate.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Mar 3, 2019
1 parent 1d4bf55 commit 6a1ac70
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 30 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Broker features:
- Add explicit support for TLS v1.3.
- Drop support for TLS v1.0.
- Add support for Automotive DLT logging.
- Disallow writing to $ topics where appropriate.

Client library features:
- Add mosquitto_subscribe_multiple() for sending subscriptions to multiple
Expand Down
4 changes: 2 additions & 2 deletions lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ int handle__publish(struct mosquitto *mosq)
mosquitto_property_free_all(&properties);
return MOSQ_ERR_SUCCESS;
case 1:
rc = send__puback(mosq, message->msg.mid);
rc = send__puback(mosq, message->msg.mid, 0);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){
mosq->in_callback = true;
Expand All @@ -145,7 +145,7 @@ int handle__publish(struct mosquitto *mosq)
mosquitto_property_free_all(&properties);
return rc;
case 2:
rc = send__pubrec(mosq, message->msg.mid);
rc = send__pubrec(mosq, message->msg.mid, 0);
pthread_mutex_lock(&mosq->in_message_mutex);
message->state = mosq_ms_wait_for_pubrel;
message__queue(mosq, message, mosq_md_in);
Expand Down
2 changes: 1 addition & 1 deletion lib/messages_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag
case mosq_ms_wait_for_pubrel:
messages->timestamp = now;
messages->dup = true;
send__pubrec(mosq, messages->msg.mid);
send__pubrec(mosq, messages->msg.mid, 0);
break;
case mosq_ms_resend_pubrel:
case mosq_ms_wait_for_pubcomp:
Expand Down
16 changes: 8 additions & 8 deletions lib/send_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ int send__pingresp(struct mosquitto *mosq)
return send__simple_command(mosq, CMD_PINGRESP);
}

int send__puback(struct mosquitto *mosq, uint16_t mid)
int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (Mid: %d)", mosq->id, mid);
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (Mid: %d, RC:%d)", mosq->id, mid, reason_code);
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (Mid: %d)", mosq->id, mid);
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (Mid: %d, RC:%d)", mosq->id, mid, reason_code);
#endif
util__increment_receive_quota(mosq);
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBACK, mid, false, 0, NULL);
return send__command_with_mid(mosq, CMD_PUBACK, mid, false, reason_code, NULL);
}

int send__pubcomp(struct mosquitto *mosq, uint16_t mid)
Expand All @@ -90,20 +90,20 @@ int send__pubcomp(struct mosquitto *mosq, uint16_t mid)
}


int send__pubrec(struct mosquitto *mosq, uint16_t mid)
int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
{
#ifdef WITH_BROKER
if(mosq) log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (Mid: %d)", mosq->id, mid);
if(mosq) log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (Mid: %d, RC:%d)", mosq->id, mid, reason_code);
#else
if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (Mid: %d)", mosq->id, mid);
if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (Mid: %d, RC:%d)", mosq->id, mid, reason_code);
#endif
/* FIXME - if rc >= 0x80 quota needs incrementing
if(rc >= 0x80){
util__increment_receive_quota(mosq);
}
*/
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBREC, mid, false, 0, NULL);
return send__command_with_mid(mosq, CMD_PUBREC, mid, false, reason_code, NULL);
}

int send__pubrel(struct mosquitto *mosq, uint16_t mid)
Expand Down
4 changes: 2 additions & 2 deletions lib/send_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session
int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitto_property *properties);
int send__pingreq(struct mosquitto *mosq);
int send__pingresp(struct mosquitto *mosq);
int send__puback(struct mosquitto *mosq, uint16_t mid);
int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code);
int send__pubcomp(struct mosquitto *mosq, uint16_t mid);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval);
int send__pubrec(struct mosquitto *mosq, uint16_t mid);
int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code);
int send__pubrel(struct mosquitto *mosq, uint16_t mid);
int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, int topic_qos, const mosquitto_property *properties);
int send__unsubscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, const mosquitto_property *properties);
Expand Down
8 changes: 4 additions & 4 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
db__message_dequeue_first(context);
}else{
if(tail->qos == 2){
send__pubrec(context, tail->mid);
send__pubrec(context, tail->mid, 0);
tail->state = mosq_ms_wait_for_pubrel;
db__message_dequeue_first(context);
}
Expand Down Expand Up @@ -884,7 +884,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
db__message_dequeue_first(context);
}else{
if(tail->qos == 2){
send__pubrec(context, tail->mid);
send__pubrec(context, tail->mid, 0);
tail->state = mosq_ms_wait_for_pubrel;
db__message_dequeue_first(context);
}
Expand Down Expand Up @@ -992,7 +992,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_send_pubrec:
rc = send__pubrec(context, mid);
rc = send__pubrec(context, mid, 0);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
}else{
Expand Down Expand Up @@ -1051,7 +1051,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
if(tail->qos == 2){
tail->state = mosq_ms_send_pubrec;
db__message_dequeue_first(context);
rc = send__pubrec(context, tail->mid);
rc = send__pubrec(context, tail->mid, 0);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
}else{
Expand Down
33 changes: 20 additions & 13 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
mosquitto_property *msg_properties = NULL, *msg_properties_last;
uint32_t message_expiry_interval = 0;
uint16_t topic_alias = 0;
uint8_t reason_code = 0;

#ifdef WITH_BRIDGE
char *topic_temp;
Expand Down Expand Up @@ -252,6 +253,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
if(payloadlen){
if(db->config->message_size_limit && payloadlen > db->config->message_size_limit){
log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
reason_code = MQTT_RC_IMPLEMENTATION_SPECIFIC;
goto process_bad_message;
}
if(UHPA_ALLOC(payload, payloadlen) == 0){
Expand All @@ -272,6 +274,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
rc = mosquitto_acl_check(db, context, topic, payloadlen, UHPA_ACCESS(payload, payloadlen), qos, retain, MOSQ_ACL_WRITE);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
reason_code = MQTT_RC_NOT_AUTHORIZED;
goto process_bad_message;
}else if(rc != MOSQ_ERR_SUCCESS){
mosquitto__free(topic);
Expand Down Expand Up @@ -305,7 +308,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
break;
case 1:
if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1;
if(send__puback(context, mid)) rc = 1;
if(send__puback(context, mid, 0)) rc = 1;
break;
case 2:
if(!dup){
Expand All @@ -316,7 +319,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
/* db__message_insert() returns 2 to indicate dropped message
* due to queue. This isn't an error so don't disconnect them. */
if(!res){
if(send__pubrec(context, mid)) rc = 1;
if(send__pubrec(context, mid, 0)) rc = 1;
}else if(res == 1){
rc = 1;
}
Expand All @@ -331,19 +334,23 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
case 0:
return MOSQ_ERR_SUCCESS;
case 1:
return send__puback(context, mid);
return send__puback(context, mid, reason_code);
case 2:
db__message_store_find(context, mid, &stored);
if(!stored){
if(db__message_store(db, context, mid, NULL, qos, 0, NULL, false, &stored, 0, NULL, 0)){
return 1;
}
res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored, NULL);
if(context->protocol == mosq_p_mqtt5){
return send__pubrec(context, mid, reason_code);
}else{
res = 0;
}
if(!res){
res = send__pubrec(context, mid);
db__message_store_find(context, mid, &stored);
if(!stored){
if(db__message_store(db, context, mid, NULL, qos, 0, NULL, false, &stored, 0, NULL, 0)){
return 1;
}
res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored, NULL);
}else{
res = 0;
}
if(!res){
res = send__pubrec(context, mid, 0);
}
}
return res;
}
Expand Down
37 changes: 37 additions & 0 deletions src/security.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,40 @@ static int acl__check_single(struct mosquitto__auth_plugin_config *auth_plugin,
}


static int acl__check_dollar(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int access)
{
int rc;
bool match = false;

if(topic[0] != '$') return MOSQ_ERR_SUCCESS;

if(!strncmp(topic, "$SYS", 4)){
if(access == MOSQ_ACL_WRITE){
/* Potentially allow write access for bridge status, otherwise explicitly deny. */
rc = mosquitto_topic_matches_sub("$SYS/broker/connection/+/state", topic, &match);
if(rc == MOSQ_ERR_SUCCESS && match == true){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_ACL_DENIED;
}
}else{
return MOSQ_ERR_SUCCESS;
}
}else if(!strncmp(topic, "$share", 6)){
/* Only allow sub/unsub to shared subscriptions */
if(access == MOSQ_ACL_SUBSCRIBE){
//FIXME if(access == MOSQ_ACL_SUBSCRIBE || access == MOSQ_ACL_UNSUBSCRIBE){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_ACL_DENIED;
}
}else{
/* This is an unknown $ topic, for the moment just defer to actual tests. */
return MOSQ_ERR_SUCCESS;
}
}


int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context, const char *topic, long payloadlen, void* payload, int qos, bool retain, int access)
{
int rc;
Expand All @@ -474,6 +508,9 @@ int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context, cons
return MOSQ_ERR_ACL_DENIED;
}

rc = acl__check_dollar(db, context, topic, access);
if(rc) return rc;

rc = mosquitto_acl_check_default(db, context, topic, access);
if(rc != MOSQ_ERR_PLUGIN_DEFER){
return rc;
Expand Down
41 changes: 41 additions & 0 deletions test/broker/02-subscribe-dollar-v5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python

# Test whether a SUBSCRIBE to $SYS or $share succeeds

from mosq_test_helper import *

def do_test(proto_ver):
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("subscribe-test", keepalive=keepalive, proto_ver=proto_ver)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

mid = 1
subscribe1_packet = mosq_test.gen_subscribe(mid, "$SYS/broker/missing", 0, proto_ver=proto_ver)
suback1_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)

mid = 2
subscribe2_packet = mosq_test.gen_subscribe(mid, "$share/share/#", 0, proto_ver=proto_ver)
suback2_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback1")
mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2")

rc = 0

sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

do_test(4)
do_test(5)
46 changes: 46 additions & 0 deletions test/broker/03-publish-dollar-v5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env python

# Test whether a PUBLISH to $ topics QoS 1 results in the expected PUBACK packet.

from mosq_test_helper import *

mid = 1
def helper(topic, reason_code):
global mid

publish_packet = mosq_test.gen_publish(topic, qos=1, mid=mid, payload="message", proto_ver=5)
if reason_code == 0:
puback_packet = mosq_test.gen_puback(mid, proto_ver=5)
else:
puback_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=reason_code)
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback%d"%(mid))


rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("pub-test", keepalive=keepalive, proto_ver=5)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
helper("$SYS/broker/uptime", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED)
helper("$SYS/broker/connection/me", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED)
helper("$SYS/broker/connection/me/state", 0)
helper("$share/share/topic", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED)

rc = 0

sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)

exit(rc)

2 changes: 2 additions & 0 deletions test/broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ endif
./02-subscribe-qos0.py
./02-subscribe-qos1.py
./02-subscribe-qos2.py
./02-subscribe-dollar-v5.py
./02-subpub-qos0.py
./02-subpub-qos1.py
./02-subpub-qos2.py
Expand Down Expand Up @@ -94,6 +95,7 @@ endif
#./03-publish-qos1-queued-bytes.py
./03-publish-invalid-utf8.py
./03-publish-dollar.py
./03-publish-dollar-v5.py
./03-publish-qos1-retain-disabled.py

04 :
Expand Down
1 change: 1 addition & 0 deletions test/broker/mosq_test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import mosq_test
import mqtt5_opts
import mqtt5_props
import mqtt5_rc

import socket
import ssl
Expand Down
2 changes: 2 additions & 0 deletions test/broker/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
(1, './02-subscribe-qos0.py'),
(1, './02-subscribe-qos1.py'),
(1, './02-subscribe-qos2.py'),
(1, './02-subscribe-dollar-v5.py'),
(1, './02-subpub-qos0.py'),
(1, './02-subpub-qos1.py'),
(1, './02-subpub-qos2.py'),
Expand Down Expand Up @@ -74,6 +75,7 @@
#(1, './03-publish-qos1-queued-bytes.py'),
(1, './03-publish-invalid-utf8.py'),
(1, './03-publish-dollar.py'),
(1, './03-publish-dollar-v5.py'),
(1, './03-publish-qos1-retain-disabled.py'),

(1, './04-retain-qos0.py'),
Expand Down

0 comments on commit 6a1ac70

Please sign in to comment.