Skip to content

Commit

Permalink
Fix broker originated messages not being sent.
Browse files Browse the repository at this point in the history
This occurred when `check_retain_source` was set to true.

Closes #1245. Thanks to Christoph Krey.
  • Loading branch information
ralight committed Apr 30, 2019
1 parent ab77b5c commit f64d3b1
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 7 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Broker:
`response-topic`. Closes #1244.
- Fix build for WITH_TLS=no. Closes #1250.
- Fix Will message not allowing user-property properties.
- Fix broker originated messages (e.g. $SYS/broker/version) not being
published when `check_retain_source` set to true. Closes #1245.

Library:
- Fix crash after client has been unable to connect to a broker. This occurs
Expand Down
12 changes: 10 additions & 2 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
char *topic_heap;
mosquitto__payload_uhpa payload_uhpa;
mosquitto_property *local_properties = NULL;
enum mosquitto_msg_origin origin;

assert(db);

Expand Down Expand Up @@ -608,13 +609,19 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
local_properties = *properties;
*properties = NULL;
}
if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0)) return 1;

if(context){
origin = mosq_mo_client;
}else{
origin = mosq_mo_broker;
}
if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1;

return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
}

/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id)
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
{
struct mosquitto_msg_store *temp = NULL;
int rc = MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -662,6 +669,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u
topic = NULL;
temp->payloadlen = payloadlen;
temp->properties = properties;
temp->origin = origin;
if(payloadlen){
UHPA_MOVE(temp->payload, *payload, payloadlen);
}else{
Expand Down
2 changes: 1 addition & 1 deletion src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
if(!stored){
dup = 0;
if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, message_expiry_interval, msg_properties, 0)){
if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, message_expiry_interval, msg_properties, 0, mosq_mo_client)){
mosquitto_property_free_all(&msg_properties);
return 1;
}
Expand Down
9 changes: 8 additions & 1 deletion src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ typedef int (*FUNC_auth_plugin_acl_check_v2)(void *, const char *, const char *,
typedef int (*FUNC_auth_plugin_unpwd_check_v2)(void *, const char *, const char *);
typedef int (*FUNC_auth_plugin_psk_key_get_v2)(void *, const char *, const char *, char *, int);


enum mosquitto_msg_origin{
mosq_mo_client = 0,
mosq_mo_broker = 1
};

struct mosquitto__auth_plugin{
void *lib;
void *user_data;
Expand Down Expand Up @@ -367,6 +373,7 @@ struct mosquitto_msg_store{
uint16_t mid;
uint8_t qos;
bool retain;
uint8_t origin;
};

struct mosquitto_client_msg{
Expand Down Expand Up @@ -608,7 +615,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties);
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id);
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin);
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);
void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store);
Expand Down
3 changes: 2 additions & 1 deletion src/persist_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp

rc = db__message_store(db, &chunk.source, chunk.F.source_mid,
chunk.topic, chunk.F.qos, chunk.F.payloadlen,
&chunk.payload, chunk.F.retain, &stored, message_expiry_interval, chunk.properties, chunk.F.store_id);
&chunk.payload, chunk.F.retain, &stored, message_expiry_interval,
chunk.properties, chunk.F.store_id, mosq_mo_client);

mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
Expand Down
2 changes: 1 addition & 1 deletion src/subs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__subhier *b
}

/* Check for original source access */
if(db->config->check_retain_source && retained->source_id){
if(db->config->check_retain_source && retained->origin != mosq_mo_broker && retained->source_id){
struct mosquitto retain_ctxt;
memset(&retain_ctxt, 0, sizeof(struct mosquitto));

Expand Down
2 changes: 1 addition & 1 deletion test/unit/persist_read_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
return m;
}

int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id)
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
{
struct mosquitto_msg_store *temp = NULL;
int rc = MOSQ_ERR_SUCCESS;
Expand Down

0 comments on commit f64d3b1

Please sign in to comment.