Skip to content

Commit

Permalink
Always zero terminate payloads (removes uhpa support)
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Nov 24, 2020
1 parent 5db971a commit adfa9f1
Show file tree
Hide file tree
Showing 17 changed files with 49 additions and 274 deletions.
8 changes: 4 additions & 4 deletions apps/db_dump/db_dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static void free__msg_store(struct P_msg_store *chunk)
{
//free(chunk->source_id);
free(chunk->topic);
UHPA_FREE(chunk->payload, chunk->F.payloadlen);
free(chunk->payload);
mosquitto_property_free_all(&chunk->properties);
}

Expand Down Expand Up @@ -252,7 +252,7 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length)
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
mosquitto__free(chunk.payload);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
Expand All @@ -275,15 +275,15 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length)
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
mosquitto__free(chunk.payload);
return MOSQ_ERR_NOMEM;
}
stored->source_mid = chunk.F.source_mid;
stored->topic = chunk.topic;
stored->qos = chunk.F.qos;
stored->retain = chunk.F.retain;
stored->payloadlen = chunk.F.payloadlen;
UHPA_MOVE(stored->payload, chunk.payload, chunk.F.payloadlen);
stored->payload = chunk.payload;
stored->properties = chunk.properties;

rc = db__message_store(&chunk.source, stored, message_expiry_interval,
Expand Down
2 changes: 1 addition & 1 deletion apps/db_dump/print.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void print__msg_store(struct P_msg_store *chunk, int length)

uint8_t *payload;

payload = UHPA_ACCESS(chunk->payload, chunk->F.payloadlen);
payload = chunk->payload;
if(chunk->F.payloadlen < 256){
/* Print payloads with UTF-8 data below an arbitrary limit of 256 bytes */
if(mosquitto_validate_utf8((char *)payload, (uint16_t)chunk->F.payloadlen) == MOSQ_ERR_SUCCESS){
Expand Down
2 changes: 1 addition & 1 deletion src/control.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int control__process(struct mosquitto *context, struct mosquitto_msg_store *stor
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = stored->topic;
event_data.payload = UHPA_ACCESS(stored->payload, stored->payloadlen);
event_data.payload = stored->payload;
event_data.payloadlen = stored->payloadlen;
event_data.qos = stored->qos;
event_data.retain = stored->retain;
Expand Down
11 changes: 7 additions & 4 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void db__msg_store_free(struct mosquitto_msg_store *store)
}
mosquitto__free(store->topic);
mosquitto_property_free_all(&store->properties);
UHPA_FREE_PAYLOAD(store);
mosquitto__free(store->payload);
mosquitto__free(store);
}

Expand Down Expand Up @@ -623,11 +623,14 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_
}

stored->payloadlen = payloadlen;
if(UHPA_ALLOC(stored->payload, stored->payloadlen) == 0){
stored->payload = mosquitto__malloc(stored->payloadlen+1);
if(stored->payload == NULL){
db__msg_store_free(stored);
return MOSQ_ERR_NOMEM;
}
memcpy(UHPA_ACCESS(stored->payload, stored->payloadlen), payload, stored->payloadlen);
/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
((uint8_t *)stored->payload)[stored->payloadlen] = 0;
memcpy(stored->payload, payload, stored->payloadlen);

if(context && context->id){
source_id = context->id;
Expand Down Expand Up @@ -1018,7 +1021,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
topic = msg->store->topic;
qos = (uint8_t)msg->qos;
payloadlen = msg->store->payloadlen;
payload = UHPA_ACCESS_PAYLOAD(msg->store);
payload = msg->store->payload;
cmsg_props = msg->properties;
store_props = msg->store->properties;

Expand Down
2 changes: 1 addition & 1 deletion src/handle_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void connection_check_acl(struct mosquitto *context, struct mosquitto_client_msg
DL_FOREACH_SAFE((*head), msg_tail, tmp){
if(msg_tail->direction == mosq_md_out){
if(mosquitto_acl_check(context, msg_tail->store->topic,
msg_tail->store->payloadlen, UHPA_ACCESS(msg_tail->store->payload, msg_tail->store->payloadlen),
msg_tail->store->payloadlen, msg_tail->store->payload,
msg_tail->store->qos, msg_tail->store->retain, MOSQ_ACL_READ) != MOSQ_ERR_SUCCESS){

DL_DELETE((*head), msg_tail);
Expand Down
9 changes: 6 additions & 3 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,19 +225,22 @@ int handle__publish(struct mosquitto *context)
reason_code = MQTT_RC_IMPLEMENTATION_SPECIFIC;
goto process_bad_message;
}
if(UHPA_ALLOC(msg->payload, msg->payloadlen) == 0){
msg->payload = mosquitto__malloc(msg->payloadlen+1);
if(msg->payload == NULL){
db__msg_store_free(msg);
return MOSQ_ERR_NOMEM;
}
/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
((uint8_t *)msg->payload)[msg->payloadlen] = 0;

if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(msg->payload, msg->payloadlen), msg->payloadlen)){
if(packet__read_bytes(&context->in_packet, msg->payload, msg->payloadlen)){
db__msg_store_free(msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
}

/* Check for topic access */
rc = mosquitto_acl_check(context, msg->topic, msg->payloadlen, UHPA_ACCESS(msg->payload, msg->payloadlen), msg->qos, msg->retain, MOSQ_ACL_WRITE);
rc = mosquitto_acl_check(context, msg->topic, msg->payloadlen, msg->payload, msg->qos, msg->retain, MOSQ_ACL_WRITE);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
reason_code = MQTT_RC_NOT_AUTHORIZED;
Expand Down
7 changes: 5 additions & 2 deletions src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5
msg->topic = NULL;
stored->retain = 0;
stored->payloadlen = (uint32_t)msg->payloadlen;
if(UHPA_ALLOC(stored->payload, stored->payloadlen) == 0){
stored->payload = mosquitto__malloc(stored->payloadlen+1);
if(stored->payload == NULL){
db__msg_store_free(stored);
return MOSQ_ERR_NOMEM;
}
memcpy(UHPA_ACCESS(stored->payload, stored->payloadlen), msg->payload, stored->payloadlen);
/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
((uint8_t *)stored->payload)[stored->payloadlen] = 0;
memcpy(stored->payload, msg->payload, stored->payloadlen);

if(msg->properties){
stored->properties = msg->properties;
Expand Down
65 changes: 1 addition & 64 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ and the Eclipse Distribution License is available at
#include "tls_mosq.h"
#include "uthash.h"

#define uhpa_malloc(size) mosquitto__malloc(size)
#define uhpa_free(ptr) mosquitto__free(ptr)
#include "uhpa.h"

#ifndef __GNUC__
#define __attribute__(attrib)
#endif
Expand All @@ -75,65 +71,6 @@ and the Eclipse Distribution License is available at
#define CMD_PORT_LIMIT 10
#define TOPIC_HIERARCHY_LIMIT 200

/* ========================================
* UHPA data types
* ======================================== */

/* See uhpa.h
*
* The idea here is that there is potentially a lot of wasted space (and time)
* in malloc calls for frequent, small heap allocations. This can happen if
* small payloads are used by clients or if individual topic elements are
* small.
*
* In both cases, a struct is used that includes a void* or char* pointer to
* point to the dynamically allocated memory used. To allocate and store a
* single byte needs the size of the pointer (8 bytes on a 64 bit
* architecture), the malloc overhead and the memory allocated itself (which
* will often be larger than the memory requested, on 64 bit Linux this can be
* a minimum of 24 bytes). To allocate and store 1 byte of heap memory we need
* in this example 32 bytes.
*
* UHPA uses a union to either store data in an array, or to allocate memory on
* the heap, depending on the size of the data being stored (this does mean
* that the size of the data must always be known). Setting the size of the
* array changes the point at which heap allocation starts. Using the example
* above, this means that an array size of 32 bytes should not result in any
* wasted space, and should be quicker as well. Certainly in the case of topic
* elements (e.g. "bar" out of "foo/bar/baz") it is likely that an array size
* of 32 bytes will mean that the majority of heap allocations are removed.
*
* You can change the size of MOSQ_PAYLOAD_UNION_SIZE and
* MOSQ_TOPIC_ELEMENT_UNION_SIZE to change the size of the uhpa array used for
* the payload (i.e. the published part of a message) and for topic elements
* (e.g. "foo", "bar" or "baz" in the topic "foo/bar/baz"), and so control the
* heap allocation threshold for these data types. You should look at your
* application to decide what values to set, but don't set them too high
* otherwise your overall memory usage will increase.
*
* You could use something like heaptrack
* http:https://milianw.de/blog/heaptrack-a-heap-memory-profiler-for-linux to
* profile heap allocations.
*
* I would suggest that values for MOSQ_PAYLOAD_UNION_SIZE and
* MOSQ_TOPIC_UNION_SIZE that are equivalent to
* sizeof(void*)+malloc_usable_size(malloc(1)) are a safe value that should
* reduce calls to malloc without increasing memory usage at all.
*/
#define MOSQ_PAYLOAD_UNION_SIZE 8
typedef union {
void *ptr;
char array[MOSQ_PAYLOAD_UNION_SIZE];
} mosquitto__payload_uhpa;
#define UHPA_ALLOC_PAYLOAD(A) UHPA_ALLOC((A)->payload, (A)->payloadlen)
#define UHPA_ACCESS_PAYLOAD(A) UHPA_ACCESS((A)->payload, (A)->payloadlen)
#define UHPA_FREE_PAYLOAD(A) UHPA_FREE((A)->payload, (A)->payloadlen)
#define UHPA_MOVE_PAYLOAD(DEST, SRC) UHPA_MOVE((DEST)->payload, (SRC)->payload, (SRC)->payloadlen)

/* ========================================
* End UHPA data types
* ======================================== */

typedef uint64_t dbid_t;

typedef int (*FUNC_plugin_init_v5)(mosquitto_plugin_id_t *, void **, struct mosquitto_opt *, int);
Expand Down Expand Up @@ -445,7 +382,7 @@ struct mosquitto_msg_store{
int ref_count;
char* topic;
mosquitto_property *properties;
mosquitto__payload_uhpa payload;
void *payload;
time_t message_expiry_time;
uint32_t payloadlen;
uint16_t source_mid;
Expand Down
2 changes: 1 addition & 1 deletion src/persist.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct PF_msg_store{
};
struct P_msg_store{
struct PF_msg_store F;
mosquitto__payload_uhpa payload;
void *payload;
struct mosquitto source;
char *topic;
mosquitto_property *properties;
Expand Down
8 changes: 4 additions & 4 deletions src/persist_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
mosquitto__free(chunk.payload);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
Expand All @@ -292,7 +292,7 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
mosquitto__free(chunk.payload);
mosquitto__free(load);
return MOSQ_ERR_SUCCESS;
}else{
Expand All @@ -308,7 +308,7 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
mosquitto__free(chunk.payload);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
Expand All @@ -319,7 +319,7 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
stored->payloadlen = chunk.F.payloadlen;
stored->retain = chunk.F.retain;
stored->properties = chunk.properties;
UHPA_MOVE(stored->payload, chunk.payload, stored->payloadlen);
stored->payload = chunk.payload;

rc = db__message_store(&chunk.source, stored, message_expiry_interval,
chunk.F.store_id, mosq_mo_client);
Expand Down
7 changes: 5 additions & 2 deletions src/persist_read_v234.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,17 @@ int persist__chunk_msg_store_read_v234(FILE *db_fptr, struct P_msg_store *chunk,
chunk->F.payloadlen = ntohl(i32temp);

if(chunk->F.payloadlen){
if(UHPA_ALLOC(chunk->payload, chunk->F.payloadlen) == 0){
chunk->payload = mosquitto_malloc(chunk->F.payloadlen+1);
if(chunk->payload == NULL){
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
mosquitto__free(chunk->topic);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, UHPA_ACCESS(chunk->payload, chunk->F.payloadlen), chunk->F.payloadlen);
/* Ensure zero terminated regardless of contents */
((uint8_t *)chunk->payload)[chunk->F.payloadlen] = 0;
read_e(db_fptr, chunk->payload, chunk->F.payloadlen);
}

return MOSQ_ERR_SUCCESS;
Expand Down
7 changes: 5 additions & 2 deletions src/persist_read_v5.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ int persist__chunk_msg_store_read_v56(FILE *db_fptr, struct P_msg_store *chunk,
}

if(chunk->F.payloadlen > 0){
if(UHPA_ALLOC(chunk->payload, chunk->F.payloadlen) == 0){
chunk->payload = mosquitto__malloc(chunk->F.payloadlen+1);
if(chunk->payload == NULL){
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
mosquitto__free(chunk->topic);
Expand All @@ -200,7 +201,9 @@ int persist__chunk_msg_store_read_v56(FILE *db_fptr, struct P_msg_store *chunk,
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
read_e(db_fptr, UHPA_ACCESS(chunk->payload, chunk->F.payloadlen), chunk->F.payloadlen);
/* Ensure zero terminated regardless of contents */
((uint8_t *)chunk->payload)[chunk->F.payloadlen] = 0;
read_e(db_fptr, chunk->payload, chunk->F.payloadlen);
}

if(length > 0){
Expand Down
2 changes: 1 addition & 1 deletion src/persist_write_v5.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ int persist__chunk_message_store_write_v6(FILE *db_fptr, struct P_msg_store *chu
}
write_e(db_fptr, chunk->topic, topic_len);
if(payloadlen){
write_e(db_fptr, UHPA_ACCESS(chunk->payload, payloadlen), (unsigned int)payloadlen);
write_e(db_fptr, chunk->payload, (unsigned int)payloadlen);
}
if(chunk->properties){
if(proplen > 0){
Expand Down
14 changes: 4 additions & 10 deletions src/plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ int plugin__handle_message(struct mosquitto *context, struct mosquitto_msg_store
event_data.client = context;
event_data.topic = stored->topic;
event_data.payloadlen = stored->payloadlen;
event_data.payload = UHPA_ACCESS(stored->payload, stored->payloadlen);
event_data.payload = stored->payload;
event_data.qos = stored->qos;
event_data.retain = stored->retain;
event_data.properties = stored->properties;
Expand All @@ -161,17 +161,11 @@ int plugin__handle_message(struct mosquitto *context, struct mosquitto_msg_store
}

stored->topic = event_data.topic;
if(UHPA_ACCESS(stored->payload, stored->payloadlen) != event_data.payload){
UHPA_FREE(stored->payload, stored->payloadlen);
if(event_data.payloadlen > sizeof(stored->payload.array)){
stored->payload.ptr = event_data.payload;
}else{
memcpy(stored->payload.array, event_data.payload, event_data.payloadlen);
mosquitto_free(event_data.payload);
}
if(stored->payload != event_data.payload){
mosquitto__free(stored->payload);
stored->payload = event_data.payload;
stored->payloadlen = event_data.payloadlen;
}
memcpy(UHPA_ACCESS(stored->payload, stored->payloadlen), event_data.payload, stored->payloadlen);
stored->retain = event_data.retain;
stored->properties = event_data.properties;

Expand Down
4 changes: 2 additions & 2 deletions src/retain.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt

retained = branch->retained;

rc = mosquitto_acl_check(context, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen),
rc = mosquitto_acl_check(context, retained->topic, retained->payloadlen, retained->payload,
retained->qos, retained->retain, MOSQ_ACL_READ);
if(rc == MOSQ_ERR_ACL_DENIED){
return MOSQ_ERR_SUCCESS;
Expand All @@ -161,7 +161,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
rc = acl__find_acls(&retain_ctxt);
if(rc) return rc;

rc = mosquitto_acl_check(&retain_ctxt, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen),
rc = mosquitto_acl_check(&retain_ctxt, retained->topic, retained->payloadlen, retained->payload,
retained->qos, retained->retain, MOSQ_ACL_WRITE);
if(rc == MOSQ_ERR_ACL_DENIED){
return MOSQ_ERR_SUCCESS;
Expand Down
2 changes: 1 addition & 1 deletion src/subs.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_
int rc2;

/* Check for ACL topic access. */
rc2 = mosquitto_acl_check(leaf->context, topic, stored->payloadlen, UHPA_ACCESS(stored->payload, stored->payloadlen), stored->qos, stored->retain, MOSQ_ACL_READ);
rc2 = mosquitto_acl_check(leaf->context, topic, stored->payloadlen, stored->payload, stored->qos, stored->retain, MOSQ_ACL_READ);
if(rc2 == MOSQ_ERR_ACL_DENIED){
return MOSQ_ERR_SUCCESS;
}else if(rc2 == MOSQ_ERR_SUCCESS){
Expand Down
Loading

0 comments on commit adfa9f1

Please sign in to comment.