Skip to content

Commit

Permalink
User properties are copied from PUBLISH to PUBLISH.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Oct 25, 2018
1 parent 0baf358 commit 267178b
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 68 deletions.
4 changes: 2 additions & 2 deletions lib/actions.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p
}

if(qos == 0){
return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false);
return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, NULL);
}else{
message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all));
if(!message) return MOSQ_ERR_NOMEM;
Expand Down Expand Up @@ -87,7 +87,7 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p
message->state = mosq_ms_wait_for_pubrec;
}
pthread_mutex_unlock(&mosq->out_message_mutex);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, NULL);
}else{
message->state = mosq_ms_invalid;
pthread_mutex_unlock(&mosq->out_message_mutex);
Expand Down
4 changes: 2 additions & 2 deletions lib/messages_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
}else if(cur->msg.qos == 2){
cur->state = mosq_ms_wait_for_pubrec;
}
rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup);
rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL);
if(rc){
pthread_mutex_unlock(&mosq->out_message_mutex);
return rc;
Expand Down Expand Up @@ -334,7 +334,7 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag
case mosq_ms_publish_qos2:
messages->timestamp = now;
messages->dup = true;
send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup);
send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL);
break;
case mosq_ms_wait_for_pubrel:
messages->timestamp = now;
Expand Down
16 changes: 16 additions & 0 deletions lib/packet_datatypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,19 @@ int packet__write_varint(struct mosquitto__packet *packet, int32_t word)
}
return MOSQ_ERR_SUCCESS;
}


int packet__varint_bytes(int32_t word)
{
if(word < 128){
return 1;
}else if(word < 16384){
return 2;
}else if(word < 2097152){
return 3;
}else if(word < 268435456){
return 4;
}else{
return 5;
}
}
2 changes: 2 additions & 0 deletions lib/packet_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ void packet__write_uint16(struct mosquitto__packet *packet, uint16_t word);
void packet__write_uint32(struct mosquitto__packet *packet, uint32_t word);
int packet__write_varint(struct mosquitto__packet *packet, int32_t word);

int packet__varint_bytes(int32_t word);

int packet__write(struct mosquitto *mosq);
#ifdef WITH_BROKER
int packet__read(struct mosquitto_db *db, struct mosquitto *mosq);
Expand Down
4 changes: 2 additions & 2 deletions lib/send_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ and the Eclipse Distribution License is available at

int send__simple_command(struct mosquitto *mosq, uint8_t command);
int send__command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup, struct mqtt5__property *properties);
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup);
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, struct mqtt5__property *properties);

int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session);
int send__disconnect(struct mosquitto *mosq);
int send__pingreq(struct mosquitto *mosq);
int send__pingresp(struct mosquitto *mosq);
int send__puback(struct mosquitto *mosq, uint16_t mid);
int send__pubcomp(struct mosquitto *mosq, uint16_t mid);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, struct mqtt5__property *properties);
int send__pubrec(struct mosquitto *mosq, uint16_t mid);
int send__pubrel(struct mosquitto *mosq, uint16_t mid);
int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, int topic_qos);
Expand Down
21 changes: 15 additions & 6 deletions lib/send_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ and the Eclipse Distribution License is available at
#include "send_mosq.h"


int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, struct mqtt5__property *properties)
{
#ifdef WITH_BROKER
size_t len;
Expand Down Expand Up @@ -111,7 +111,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
}
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
G_PUB_BYTES_SENT_INC(payloadlen);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, properties);
mosquitto__free(mapped_topic);
return rc;
}
Expand All @@ -125,14 +125,16 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
#endif

return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup);
return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, properties);
}


int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, struct mqtt5__property *properties)
{
struct mosquitto__packet *packet = NULL;
int packetlen;
int proplen;
int varlen;
int rc;

assert(mosq);
Expand All @@ -141,7 +143,14 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
packetlen = 2+strlen(topic) + payloadlen;
if(qos > 0) packetlen += 2; /* For message id */
if(mosq->protocol == mosq_p_mqtt5){
packetlen += 1;
proplen = property__get_length_all(properties);
varlen = packet__varint_bytes(proplen);
if(varlen > 4){
/* FIXME - Properties too big, don't publish any - should remove some first really */
properties = NULL;
}else{
packetlen += proplen + varlen;
}
}
packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
Expand All @@ -161,7 +170,7 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
}

if(mosq->protocol == mosq_p_mqtt5){
property__write_all(packet, NULL);
property__write_all(packet, properties);
}

/* Payload */
Expand Down
8 changes: 4 additions & 4 deletions src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
if(context->bridge->notification_topic){
if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, &notification_payload, 1);
db__messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, &notification_payload, 1, NULL);
context->bridge->initial_notification_done = true;
}
notification_payload = '0';
Expand All @@ -172,7 +172,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)

if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(db, context, notification_topic, 1, 1, &notification_payload, 1);
db__messages_easy_queue(db, context, notification_topic, 1, 1, &notification_payload, 1, NULL);
context->bridge->initial_notification_done = true;
}

Expand Down Expand Up @@ -321,7 +321,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
if(context->bridge->notification_topic){
if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, &notification_payload, 1);
db__messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, &notification_payload, 1, NULL);
context->bridge->initial_notification_done = true;
}

Expand All @@ -341,7 +341,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)

if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(db, context, notification_topic, 1, 1, &notification_payload, 1);
db__messages_easy_queue(db, context, notification_topic, 1, 1, &notification_payload, 1, NULL);
context->bridge->initial_notification_done = true;
}

Expand Down
5 changes: 4 additions & 1 deletion src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and the Eclipse Distribution License is available at
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "time_mosq.h"

#include "uthash.h"
Expand Down Expand Up @@ -220,10 +221,12 @@ void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt)
ctxt->will->msg.qos,
ctxt->will->msg.payloadlen,
ctxt->will->msg.payload,
ctxt->will->msg.retain);
ctxt->will->msg.retain,
&ctxt->will->properties);
}
}
if(ctxt->will){
property__free_all(&ctxt->will->properties);
mosquitto__free(ctxt->will->msg.topic);
mosquitto__free(ctxt->will->msg.payload);
mosquitto__free(ctxt->will);
Expand Down
24 changes: 17 additions & 7 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *s
mosquitto__free(store->dest_ids);
}
mosquitto__free(store->topic);
property__free_all(&store->properties);
UHPA_FREE_PAYLOAD(store);
mosquitto__free(store);
}
Expand Down Expand Up @@ -558,12 +559,13 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}

int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain)
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mqtt5__property **properties)
{
struct mosquitto_msg_store *stored;
char *source_id;
char *topic_heap;
mosquitto__payload_uhpa payload_uhpa;
struct mqtt5__property *local_properties = NULL;

assert(db);

Expand All @@ -584,13 +586,17 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
}else{
source_id = "";
}
if(db__message_store(db, source_id, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, 0)) return 1;
if(properties){
local_properties = *properties;
*properties = NULL;
}
if(db__message_store(db, source_id, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, local_properties, 0)) return 1;

return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
}

/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload. */
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, struct mqtt5__property *properties, dbid_t store_id)
{
struct mosquitto_msg_store *temp = NULL;
int rc = MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -626,6 +632,7 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
temp->topic = topic;
topic = NULL;
temp->payloadlen = payloadlen;
temp->properties = properties;
if(payloadlen){
UHPA_MOVE(temp->payload, *payload, payloadlen);
}else{
Expand Down Expand Up @@ -654,6 +661,7 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
mosquitto__free(temp->topic);
mosquitto__free(temp);
}
property__free_all(&properties);
return rc;
}

Expand Down Expand Up @@ -858,6 +866,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
uint32_t payloadlen;
const void *payload;
int msg_count = 0;
struct mqtt5__property *properties;

if(!context || context->sock == INVALID_SOCKET
|| (context->state == mosq_cs_connected && !context->id)){
Expand All @@ -878,10 +887,11 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
qos = tail->qos;
payloadlen = tail->store->payloadlen;
payload = UHPA_ACCESS_PAYLOAD(tail->store);
properties = tail->store->properties;

switch(tail->state){
case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties);
if(!rc){
db__message_remove(db, context, &tail, last);
}else{
Expand All @@ -890,7 +900,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_publish_qos1:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties);
if(!rc){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
Expand All @@ -903,7 +913,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_publish_qos2:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties);
if(!rc){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
Expand Down
8 changes: 4 additions & 4 deletions src/handle_connack.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
if(context->bridge->notification_topic){
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
context->bridge->notification_topic, 1, &notification_payload, 1, true, 0)){
context->bridge->notification_topic, 1, &notification_payload, 1, true, 0, NULL)){

return 1;
}
}
db__messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, &notification_payload, 1);
db__messages_easy_queue(db, context, context->bridge->notification_topic, 1, 1, &notification_payload, 1, NULL);
}else{
notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
notification_topic = mosquitto__malloc(sizeof(char)*(notification_topic_len+1));
Expand All @@ -64,13 +64,13 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
notification_payload = '1';
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
notification_topic, 1, &notification_payload, 1, true, 0)){
notification_topic, 1, &notification_payload, 1, true, 0, NULL)){

mosquitto__free(notification_topic);
return 1;
}
}
db__messages_easy_queue(db, context, notification_topic, 1, 1, &notification_payload, 1);
db__messages_easy_queue(db, context, notification_topic, 1, 1, &notification_payload, 1, NULL);
mosquitto__free(notification_topic);
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/handle_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,11 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
rc = MOSQ_ERR_NOMEM;
goto handle_connect_error;
}
/* FIXME - this needs to be "will" specific */
if(protocol_version == PROTOCOL_VERSION_v5){
rc = property__read_all(&context->in_packet, &properties);
rc = property__read_all(&context->in_packet, &will_struct->properties);
if(rc) return rc;
property__free_all(&properties);
property__free_all(&will_struct->properties);
}
property__free_all(&properties); /* FIXME - TEMPORARY UNTIL PROPERTIES PROCESSED */
if(packet__read_string(&context->in_packet, &will_topic, &slen)){
rc = 1;
goto handle_connect_error;
Expand Down Expand Up @@ -710,6 +708,9 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
mosquitto__free(password);
mosquitto__free(will_payload);
mosquitto__free(will_topic);
if(will_struct){
property__free_all(&will_struct->properties);
}
mosquitto__free(will_struct);
#ifdef WITH_TLS
if(client_cert) X509_free(client_cert);
Expand Down
Loading

0 comments on commit 267178b

Please sign in to comment.