Skip to content

Commit

Permalink
Retain-as-published support.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Dec 14, 2018
1 parent 4933f88 commit db79018
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 28 deletions.
5 changes: 5 additions & 0 deletions client/client_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,11 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
}else{
cfg->pub_mode = MSGMODE_NULL;
}
}else if(!strcmp(argv[i], "--retain-as-published")){
if(pub_or_sub == CLIENT_PUB){
goto unknown_option;
}
cfg->sub_opts |= MQTT_SUB_OPT_RETAIN_AS_PUBLISHED;
}else if(!strcmp(argv[i], "-V") || !strcmp(argv[i], "--protocol-version")){
if(i==argc-1){
fprintf(stderr, "Error: --protocol-version argument given but no version specified.\n\n");
Expand Down
1 change: 1 addition & 0 deletions client/client_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ struct mosq_config {
int msg_count; /* sub */
char *format; /* sub */
int timeout; /* sub */
int sub_opts; /* sub */
#ifdef WITH_SOCKS
char *socks5_host;
int socks5_port;
Expand Down
2 changes: 1 addition & 1 deletion client/sub_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flag
int i;

if(!result){
mosquitto_subscribe_multiple(mosq, NULL, cfg.topic_count, cfg.topics, cfg.qos, cfg.subscribe_props);
mosquitto_subscribe_multiple(mosq, NULL, cfg.topic_count, cfg.topics, cfg.qos, cfg.sub_opts, cfg.subscribe_props);

for(i=0; i<cfg.unsub_topic_count; i++){
mosquitto_unsubscribe_v5(mosq, NULL, cfg.unsub_topics[i], cfg.unsubscribe_props);
Expand Down
13 changes: 8 additions & 5 deletions lib/actions.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,19 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in

int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
{
return mosquitto_subscribe_v5(mosq, mid, sub, qos, NULL);
return mosquitto_subscribe_v5(mosq, mid, sub, qos, 0, NULL);
}

int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, const mosquitto_property *properties)
int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, int options, const mosquitto_property *properties)
{
const mosquitto_property *outgoing_properties = NULL;
mosquitto_property local_property;

int rc;

if(!mosq) return MOSQ_ERR_INVAL;
if(qos < 0 || qos > 2) return MOSQ_ERR_INVAL;
if((options & 0x30) == 0x30 || (options & 0xC0) != 0) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;

Expand All @@ -175,11 +177,11 @@ int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, in
if(rc) return rc;
}

return send__subscribe(mosq, mid, 1, (char *const *const)&sub, qos, outgoing_properties);
return send__subscribe(mosq, mid, 1, (char *const *const)&sub, qos|options, outgoing_properties);
}


int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, const mosquitto_property *properties)
int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, int options, const mosquitto_property *properties)
{
const mosquitto_property *outgoing_properties = NULL;
mosquitto_property local_property;
Expand All @@ -189,6 +191,7 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count
if(!mosq || !sub_count || !sub) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
if(qos < 0 || qos > 2) return MOSQ_ERR_INVAL;
if((options & 0x30) == 0x30 || (options & 0xC0) != 0) return MOSQ_ERR_INVAL;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;

if(properties){
Expand All @@ -209,7 +212,7 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count
if(mosquitto_validate_utf8(sub[i], strlen(sub[i]))) return MOSQ_ERR_MALFORMED_UTF8;
}

return send__subscribe(mosq, mid, sub_count, sub, qos, properties);
return send__subscribe(mosq, mid, sub_count, sub, qos|options, properties);
}


Expand Down
52 changes: 50 additions & 2 deletions lib/mosquitto.h
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,30 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c
* sent.
* sub - the subscription pattern.
* qos - the requested Quality of Service for this subscription.
* options - options to apply to this subscription, OR'd together. Set to 0 to
* use the default options, otherwise choose from the list:
* MQTT_SUB_OPT_NO_LOCAL - with this option set, if this client
* publishes to a topic to which it is subscribed, the
* broker will not publish the message back to the
* client.
* MQTT_SUB_OPT_RETAIN_AS_PUBLISHED - with this option set, messages
* published for this subscription will keep the
* retain flag as was set by the publishing client.
* The default behaviour without this option set has
* the retain flag indicating whether a message is
* fresh/stale.
* MQTT_SUB_OPT_SEND_RETAIN_ALWAYS - with this option set,
* pre-existing retained messages are sent as soon as
* the subscription is made, even if the subscription
* already exists. This is the default behaviour, so
* it is not necessary to set this option.
* MQTT_SUB_OPT_SEND_RETAIN_NEW - with this option set, pre-existing
* retained messages for this subscription will be
* sent when the subscription is made, but only if the
* subscription does not already exist.
* MQTT_SUB_OPT_SEND_RETAIN_NEVER - with this option set,
* pre-existing retained messages will never be sent
* for this subscription.
* properties - a valid mosquitto_property list, or NULL.
*
* Returns:
Expand All @@ -851,7 +875,7 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c
* MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden.
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with SUBSCRIBE.
*/
libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, const mosquitto_property *properties);
libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, int options, const mosquitto_property *properties);

/*
* Function: mosquitto_subscribe_multiple
Expand All @@ -871,6 +895,30 @@ libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, cons
* familiar with this, just think of it as a safer "char **",
* equivalent to "const char *" for a simple string pointer.
* qos - the requested Quality of Service for each subscription.
* options - options to apply to this subscription, OR'd together. Set to 0 to
* use the default options, otherwise choose from the list:
* MQTT_SUB_OPT_NO_LOCAL - with this option set, if this client
* publishes to a topic to which it is subscribed, the
* broker will not publish the message back to the
* client.
* MQTT_SUB_OPT_RETAIN_AS_PUBLISHED - with this option set, messages
* published for this subscription will keep the
* retain flag as was set by the publishing client.
* The default behaviour without this option set has
* the retain flag indicating whether a message is
* fresh/stale.
* MQTT_SUB_OPT_SEND_RETAIN_ALWAYS - with this option set,
* pre-existing retained messages are sent as soon as
* the subscription is made, even if the subscription
* already exists. This is the default behaviour, so
* it is not necessary to set this option.
* MQTT_SUB_OPT_SEND_RETAIN_NEW - with this option set, pre-existing
* retained messages for this subscription will be
* sent when the subscription is made, but only if the
* subscription does not already exist.
* MQTT_SUB_OPT_SEND_RETAIN_NEVER - with this option set,
* pre-existing retained messages will never be sent
* for this subscription.
* properties - a valid mosquitto_property list, or NULL. Only used with MQTT
* v5 clients.
*
Expand All @@ -881,7 +929,7 @@ libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, cons
* MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
* MOSQ_ERR_MALFORMED_UTF8 - if a topic is not valid UTF-8
*/
int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, const mosquitto_property *properties);
int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, int options, const mosquitto_property *properties);

/*
* Function: mosquitto_unsubscribe
Expand Down
8 changes: 8 additions & 0 deletions lib/mqtt_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ enum mqtt5_property_type {
MQTT_PROP_TYPE_STRING_PAIR = 7
};

enum mqtt5_sub_options {
MQTT_SUB_OPT_NO_LOCAL = 0x04,
MQTT_SUB_OPT_RETAIN_AS_PUBLISHED = 0x08,
MQTT_SUB_OPT_SEND_RETAIN_ALWAYS = 0x00,
MQTT_SUB_OPT_SEND_RETAIN_NEW = 0x10,
MQTT_SUB_OPT_SEND_RETAIN_NEVER = 0x20,
};

#define MQTT_MAX_PAYLOAD 268435455

#endif
4 changes: 2 additions & 2 deletions lib/send_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, const cha

#ifdef WITH_BROKER
# ifdef WITH_BRIDGE
log__printf(mosq, MOSQ_LOG_DEBUG, "Bridge %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d)", mosq->id, local_mid, topic[0], topic_qos);
log__printf(mosq, MOSQ_LOG_DEBUG, "Bridge %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d, Options: 0x%02x)", mosq->id, local_mid, topic[0], topic_qos&0x03, topic_qos&0xFC);
# endif
#else
for(i=0; i<topic_count; i++){
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d)", mosq->id, local_mid, topic[i], topic_qos);
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d, Options: 0x%02x)", mosq->id, local_mid, topic[i], topic_qos&0x03, topic_qos&0xFC);
}
#endif

Expand Down
16 changes: 15 additions & 1 deletion man/mosquitto_sub.1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
<arg choice='plain'><option>-R</option></arg>
<arg choice='plain'><option>--retained-only</option></arg>
</group>
<arg><option>--retain-as-published</option></arg>
<arg><option>-S</option></arg>
<arg choice='opt' rep='repeat'><option>-T</option> <replaceable>filter-out</replaceable></arg>
<arg choice='opt' rep='repeat'><option>-U</option> <replaceable>unsub-topic</replaceable></arg>
Expand Down Expand Up @@ -459,7 +460,20 @@
Messages with retain set are "stale", in that it is not
known when they were originally published. With this
argument in use, the receipt of the first non-stale
message will cause the client to exit.</para>
message will cause the client to exit. See also the
<option>--retain-as-published</option> option.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--retain-as-published</option></term>
<listitem>
<para>If this argument is given, the subscriptions will
have the "retain as published" option set. This means
that the retain flag on an incoming message will be
exactly as set by the publishing client, rather than
indicating whether the message is fresh/stale.</para>
<para>This option is not valid for MQTT v3.1/v3.1.1
clients.</para>
</listitem>
</varlistentry>
<varlistentry>
Expand Down
2 changes: 1 addition & 1 deletion src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic);
if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs) > 0){
if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, true, &db->subs) > 0){
return 1;
}
sub__retain_queue(db, context,
Expand Down
7 changes: 5 additions & 2 deletions src/handle_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,13 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
}
if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){
qos = subscription_options;
if(context->is_bridge){
retain_as_published = 1;
}
}else{
qos = subscription_options & 0x03;

retain_as_published = subscription_options & 0x04;
retain_as_published = subscription_options & 0x08;

retain_handling = (subscription_options & 0x30) >> 4;
if(retain_handling == 3){
Expand Down Expand Up @@ -148,7 +151,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
}

if(qos != 0x80){
rc2 = sub__add(db, context, sub, qos, &db->subs);
rc2 = sub__add(db, context, sub, qos, retain_as_published, &db->subs);
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){
if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){
if(sub__retain_queue(db, context, sub, qos)) rc = 1;
Expand Down
3 changes: 2 additions & 1 deletion src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ struct mosquitto__subleaf {
struct mosquitto__subleaf *next;
struct mosquitto *context;
int qos;
bool retain_as_published;
};

struct mosquitto__subhier {
Expand Down Expand Up @@ -572,7 +573,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time);
/* ============================================================
* Subscription functions
* ============================================================ */
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier **root);
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root);
struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, size_t len);
int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root);
void sub__tree_print(struct mosquitto__subhier *root, int level);
Expand Down
3 changes: 2 additions & 1 deletion src/persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,8 @@ static int persist__restore_sub(struct mosquitto_db *db, const char *client_id,

context = persist__find_or_add_context(db, client_id, 0);
if(!context) return 1;
return sub__add(db, context, sub, qos, &db->subs);
/* FIXME - retain_as_published needs saving */
return sub__add(db, context, sub, qos, false, &db->subs);
}

#endif
20 changes: 8 additions & 12 deletions src/subs.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,9 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie
}else{
mid = 0;
}
if(leaf->context->is_bridge){
/* If we know the client is a bridge then we should set retain
* even if the message is fresh. If we don't do this, retained
* messages won't be propagated. */
if(leaf->retain_as_published){
client_retain = retain;
}else{
/* Client is not a bridge and this isn't a stale message so
* retain should be false. */
client_retain = false;
}
if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1;
Expand Down Expand Up @@ -245,7 +240,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens)
}
}

static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, struct mosquitto__subhier *subhier, struct sub__token *tokens)
static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, bool retain_as_published, struct mosquitto__subhier *subhier, struct sub__token *tokens)
/* FIXME - this function has the potential to leak subhier, audit calling functions. */
{
struct mosquitto__subhier *branch;
Expand Down Expand Up @@ -279,6 +274,7 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
leaf->next = NULL;
leaf->context = context;
leaf->qos = qos;
leaf->retain_as_published = retain_as_published;
for(i=0; i<context->sub_count; i++){
if(!context->subs[i]){
context->subs[i] = subhier;
Expand Down Expand Up @@ -311,13 +307,13 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,

HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch);
if(branch){
return sub__add_recurse(db, context, qos, branch, tokens->next);
return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next);
}else{
/* Not found */
branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len);
if(!branch) return MOSQ_ERR_NOMEM;

return sub__add_recurse(db, context, qos, branch, tokens->next);
return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next);
}
}

Expand Down Expand Up @@ -446,7 +442,7 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent
}


int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct mosquitto__subhier **root)
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root)
{
int rc = 0;
struct mosquitto__subhier *subhier;
Expand All @@ -468,7 +464,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub
}

}
rc = sub__add_recurse(db, context, qos, subhier, tokens);
rc = sub__add_recurse(db, context, qos, retain_as_published, subhier, tokens);

sub__topic_tokens_free(tokens);

Expand Down Expand Up @@ -519,7 +515,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
/* We have a message that needs to be retained, so ensure that the subscription
* tree for its topic exists.
*/
sub__add_recurse(db, NULL, 0, subhier, tokens);
sub__add_recurse(db, NULL, 0, false, subhier, tokens);
}
sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
}
Expand Down

0 comments on commit db79018

Please sign in to comment.