Skip to content

Commit

Permalink
Subscription identifier support.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Dec 20, 2018
1 parent ef724e6 commit 7c3666d
Show file tree
Hide file tree
Showing 25 changed files with 109 additions and 74 deletions.
2 changes: 1 addition & 1 deletion client/client_props.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ int cfg_parse_property(struct mosq_config *cfg, int argc, char *argv[], int *idx
break;

case CMD_SUBSCRIBE:
if(identifier != MQTT_PROP_USER_PROPERTY){
if(identifier != MQTT_PROP_SUBSCRIPTION_IDENTIFIER && identifier != MQTT_PROP_USER_PROPERTY){
fprintf(stderr, "Error: %s property not supported for %s in --property argument.\n\n", propname, cmdname);
return MOSQ_ERR_NOT_SUPPORTED;
}
Expand Down
4 changes: 2 additions & 2 deletions lib/actions.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
}

if(qos == 0){
return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties);
return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties, NULL);
}else{
message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all));
if(!message) return MOSQ_ERR_NOMEM;
Expand Down Expand Up @@ -133,7 +133,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
message->state = mosq_ms_wait_for_pubrec;
}
pthread_mutex_unlock(&mosq->out_message_mutex);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL);
}else{
message->state = mosq_ms_invalid;
pthread_mutex_unlock(&mosq->out_message_mutex);
Expand Down
2 changes: 1 addition & 1 deletion lib/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
local_property.next = NULL;
outgoing_properties = &local_property;
}
rc = mosquitto_property_check_all(CMD_DISCONNECT, outgoing_properties);
rc = mosquitto_property_check_all(CMD_CONNECT, outgoing_properties);
if(rc) return rc;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/messages_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
}else if(cur->msg.qos == 2){
cur->state = mosq_ms_wait_for_pubrec;
}
rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL);
rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL);
if(rc){
pthread_mutex_unlock(&mosq->out_message_mutex);
return rc;
Expand Down Expand Up @@ -334,7 +334,7 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag
case mosq_ms_publish_qos2:
messages->timestamp = now;
messages->dup = true;
send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL);
send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL, NULL);
break;
case mosq_ms_wait_for_pubrel:
messages->timestamp = now;
Expand Down
8 changes: 5 additions & 3 deletions lib/property_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,15 @@ int property__write(struct mosquitto__packet *packet, const mosquitto_property *
}


int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *properties)
int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *properties, bool write_len)
{
int rc;
const mosquitto_property *p;

rc = packet__write_varint(packet, property__get_length_all(properties));
if(rc) return rc;
if(write_len){
rc = packet__write_varint(packet, property__get_length_all(properties));
if(rc) return rc;
}

p = properties;
while(p){
Expand Down
2 changes: 1 addition & 1 deletion lib/property_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct mqtt5__property {


int property__read_all(int command, struct mosquitto__packet *packet, mosquitto_property **property);
int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *property);
int property__write_all(struct mosquitto__packet *packet, const mosquitto_property *property, bool write_len);
void property__free(mosquitto_property **property);

int property__get_length(const mosquitto_property *property);
Expand Down
4 changes: 2 additions & 2 deletions lib/send_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session

if(mosq->protocol == mosq_p_mqtt5){
/* Write properties */
property__write_all(packet, properties);
property__write_all(packet, properties, true);
}

/* Payload */
Expand All @@ -153,7 +153,7 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session
if(will){
if(mosq->protocol == mosq_p_mqtt5){
/* Write will properties */
property__write_all(packet, mosq->will->properties);
property__write_all(packet, mosq->will->properties, true);
}
packet__write_string(packet, mosq->will->msg.topic, strlen(mosq->will->msg.topic));
packet__write_string(packet, (const char *)mosq->will->msg.payload, mosq->will->msg.payloadlen);
Expand Down
2 changes: 1 addition & 1 deletion lib/send_disconnect.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitt
}
if(mosq->protocol == mosq_p_mqtt5){
packet__write_byte(packet, reason_code);
property__write_all(packet, properties);
property__write_all(packet, properties, true);
}

return packet__queue(mosq, packet);
Expand Down
2 changes: 1 addition & 1 deletion lib/send_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ int send__command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid

if(mosq->protocol == mosq_p_mqtt5){
packet__write_byte(packet, reason_code);
property__write_all(packet, properties);
property__write_all(packet, properties, true);
}

return packet__queue(mosq, packet);
Expand Down
4 changes: 2 additions & 2 deletions lib/send_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ and the Eclipse Distribution License is available at

int send__simple_command(struct mosquitto *mosq, uint8_t command);
int send__command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup, uint8_t reason_code, const mosquitto_property *properties);
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties);
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props);

int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session, const mosquitto_property *properties);
int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitto_property *properties);
int send__pingreq(struct mosquitto *mosq);
int send__pingresp(struct mosquitto *mosq);
int send__puback(struct mosquitto *mosq, uint16_t mid);
int send__pubcomp(struct mosquitto *mosq, uint16_t mid);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props);
int send__pubrec(struct mosquitto *mosq, uint16_t mid);
int send__pubrel(struct mosquitto *mosq, uint16_t mid);
int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, int topic_qos, const mosquitto_property *properties);
Expand Down
21 changes: 13 additions & 8 deletions lib/send_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ and the Eclipse Distribution License is available at
#include "send_mosq.h"


int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties)
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props)
{
#ifdef WITH_BROKER
size_t len;
Expand Down Expand Up @@ -110,7 +110,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
}
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
G_PUB_BYTES_SENT_INC(payloadlen);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, properties);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props);
mosquitto__free(mapped_topic);
return rc;
}
Expand All @@ -124,15 +124,15 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
#endif

return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, properties);
return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props);
}


int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *properties)
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props)
{
struct mosquitto__packet *packet = NULL;
int packetlen;
int proplen, varbytes;
int proplen = 0, varbytes;
int rc;

assert(mosq);
Expand All @@ -144,11 +144,14 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
}
if(qos > 0) packetlen += 2; /* For message id */
if(mosq->protocol == mosq_p_mqtt5){
proplen = property__get_length_all(properties);
proplen = 0;
proplen += property__get_length_all(cmsg_props);
proplen += property__get_length_all(store_props);
varbytes = packet__varint_bytes(proplen);
if(varbytes > 4){
/* FIXME - Properties too big, don't publish any - should remove some first really */
properties = NULL;
cmsg_props = NULL;
store_props = NULL;
}else{
packetlen += proplen + varbytes;
}
Expand All @@ -175,7 +178,9 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
}

if(mosq->protocol == mosq_p_mqtt5){
property__write_all(packet, properties);
packet__write_varint(packet, proplen);
property__write_all(packet, cmsg_props, false);
property__write_all(packet, store_props, false);
}

/* Payload */
Expand Down
2 changes: 1 addition & 1 deletion lib/send_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, const cha
packet__write_uint16(packet, local_mid);

if(mosq->protocol == mosq_p_mqtt5){
property__write_all(packet, properties);
property__write_all(packet, properties, true);
}

/* Payload */
Expand Down
2 changes: 1 addition & 1 deletion lib/send_unsubscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ int send__unsubscribe(struct mosquitto *mosq, int *mid, const char *topic, const

if(mosq->protocol == mosq_p_mqtt5){
/* We don't use User Property yet. */
property__write_all(packet, properties);
property__write_all(packet, properties, true);
}

/* Payload */
Expand Down
4 changes: 3 additions & 1 deletion src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos,
0,
MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED,
&db->subs) > 0){
return 1;
Expand Down Expand Up @@ -322,14 +323,15 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos,
0,
MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED,
&db->subs) > 0){

return 1;
}
sub__retain_queue(db, context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos);
context->bridge->topics[i].qos, 0);
}
}

Expand Down
20 changes: 14 additions & 6 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
return MOSQ_ERR_SUCCESS;
}

int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored)
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties)
{
struct mosquitto_client_msg *msg;
struct mosquitto_client_msg **msgs, **last_msg;
Expand All @@ -362,6 +362,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
for(i=0; i<stored->dest_id_count; i++){
if(!strcmp(stored->dest_ids[i], context->id)){
/* We have already sent this message to this client. */
mosquitto_property_free_all(&properties);
return MOSQ_ERR_SUCCESS;
}
}
Expand All @@ -370,9 +371,11 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
/* Client is not connected only queue messages with QoS>0. */
if(qos == 0 && !db->config->queue_qos0_messages){
if(!context->bridge){
mosquitto_property_free_all(&properties);
return 2;
}else{
if(context->bridge->start_type != bst_lazy){
mosquitto_property_free_all(&properties);
return 2;
}
}
Expand All @@ -397,6 +400,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
if(qos == 2){
state = mosq_ms_wait_for_pubrel;
}else{
mosquitto_property_free_all(&properties);
return 1;
}
}
Expand All @@ -412,6 +416,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
context->id);
}
G_MSGS_DROPPED_INC();
mosquitto_property_free_all(&properties);
return 2;
}
}else{
Expand All @@ -425,6 +430,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
"Outgoing messages are being dropped for client %s.",
context->id);
}
mosquitto_property_free_all(&properties);
return 2;
}
}
Expand All @@ -448,6 +454,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
msg->dup = false;
msg->qos = qos;
msg->retain = retain;
msg->properties = properties;

if (state == mosq_ms_queued){
msgs = &(context->queued_msgs);
Expand Down Expand Up @@ -875,7 +882,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
uint32_t payloadlen;
const void *payload;
int msg_count = 0;
mosquitto_property *properties;
mosquitto_property *cmsg_props = NULL, *store_props = NULL;
time_t now;

if(!context || context->sock == INVALID_SOCKET
Expand Down Expand Up @@ -903,11 +910,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
qos = tail->qos;
payloadlen = tail->store->payloadlen;
payload = UHPA_ACCESS_PAYLOAD(tail->store);
properties = tail->store->properties;
cmsg_props = tail->properties;
store_props = tail->store->properties;

switch(tail->state){
case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props);
if(!rc){
db__message_remove(db, context, &tail, last);
}else{
Expand All @@ -916,7 +924,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_publish_qos1:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props);
if(!rc){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
Expand All @@ -929,7 +937,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_publish_qos2:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, properties);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props);
if(!rc){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
Expand Down
7 changes: 4 additions & 3 deletions src/handle_connack.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
if(context->bridge->notification_topic){
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
context->bridge->notification_topic, 1, &notification_payload, 1, true, 0, NULL)){
context->bridge->notification_topic, 1, &notification_payload, 1, true, 0, NULL, NULL)){

return 1;
}
Expand All @@ -74,7 +74,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
notification_payload = '1';
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
notification_topic, 1, &notification_payload, 1, true, 0, NULL)){
notification_topic, 1, &notification_payload, 1, true, 0, NULL, NULL)){

mosquitto__free(notification_topic);
return 1;
Expand Down Expand Up @@ -107,11 +107,12 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos,
0,
MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED,
&db->subs)) return 1;
sub__retain_queue(db, context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos);
context->bridge->topics[i].qos, 0);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
break;
case 2:
if(!dup){
res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored);
res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored, NULL);
}else{
res = 0;
}
Expand Down Expand Up @@ -308,7 +308,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
if(db__message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, 0, NULL, 0)){
return 1;
}
res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored);
res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored, NULL);
}else{
res = 0;
}
Expand Down

0 comments on commit 7c3666d

Please sign in to comment.