Skip to content

Commit

Permalink
Merge branch 'rfc-bridge-local-clean' of git:https://github.com/etactica/mo…
Browse files Browse the repository at this point in the history
…squitto into etactica-rfc-bridge-local-clean
  • Loading branch information
ralight committed May 5, 2020
2 parents d3247a8 + 07d73c7 commit ee7d198
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 25 deletions.
11 changes: 11 additions & 0 deletions man/mosquitto.conf.5.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,17 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
normal.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>local_cleansession</option> [ true | false]</term>
<listitem>
<para>The regular <option>cleansession</option> covers both the local subscriptions
and the remote subscriptions. local_cleansession allows splitting this.
Setting <replaceable>false</replaceable> will mean that the local connection
will preserve subscription, independent of the remote connection.
</para>
<para>Defaults to the value of bridge.cleansession unless explicitly specified.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>connection</option> <replaceable>name</replaceable></term>
<listitem>
Expand Down
12 changes: 6 additions & 6 deletions src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ int bridge__new(struct mosquitto_db *db, struct mosquitto__bridge *bridge)
#endif

bridge->try_private_accepted = true;
if(bridge->clean_start_local == -1){
/* default to "regular" clean start setting */
bridge->clean_start_local = bridge->clean_start;
}
new_context->retain_available = bridge->outgoing_retain;
new_context->protocol = bridge->protocol_version;

Expand Down Expand Up @@ -161,9 +165,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
bridge__packet_cleanup(context);
db__message_reconnect_reset(db, context);

if(context->clean_start){
db__messages_delete(db, context);
}
db__messages_delete(db, context);

/* Delete all local subscriptions even for clean_start==false. We don't
* remove any messages and the next loop carries out the resubscription
Expand Down Expand Up @@ -340,9 +342,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
bridge__packet_cleanup(context);
db__message_reconnect_reset(db, context);

if(context->clean_start){
db__messages_delete(db, context);
}
db__messages_delete(db, context);

/* Delete all local subscriptions even for clean_start==false. We don't
* remove any messages and the next loop carries out the resubscription
Expand Down
12 changes: 12 additions & 0 deletions src/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,17 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_bool(&token, "cleansession", &cur_bridge->clean_start, saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "local_cleansession")){
#ifdef WITH_BRIDGE
if(reload) continue; // FIXME
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
if(conf__parse_bool(&token, "local_cleansession", (bool *) &cur_bridge->clean_start_local, saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "clientid_prefixes")){
if(reload){
Expand Down Expand Up @@ -1261,6 +1272,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
cur_bridge->protocol_version = mosq_p_mqtt311;
cur_bridge->primary_retry_sock = INVALID_SOCKET;
cur_bridge->outgoing_retain = true;
cur_bridge->clean_start_local = -1;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
Expand Down
5 changes: 1 addition & 4 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d
net__socket_close(db, context);
if(do_free || context->clean_start){
sub__clean_session(db, context);
db__messages_delete(db, context);
}
db__messages_delete(db, context);

mosquitto__free(context->address);
context->address = NULL;
Expand All @@ -148,9 +148,6 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d
context->out_packet = context->out_packet->next;
mosquitto__free(packet);
}
if(do_free || context->clean_start){
db__messages_delete(db, context);
}
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){
gai_cancel(context->adns);
Expand Down
29 changes: 16 additions & 13 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -564,20 +564,23 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
{
if(!context) return MOSQ_ERR_INVAL;

db__messages_delete_list(db, &context->msgs_in.inflight);
db__messages_delete_list(db, &context->msgs_in.queued);
db__messages_delete_list(db, &context->msgs_out.inflight);
db__messages_delete_list(db, &context->msgs_out.queued);

context->msgs_in.msg_bytes = 0;
context->msgs_in.msg_bytes12 = 0;
context->msgs_in.msg_count = 0;
context->msgs_in.msg_count12 = 0;
if(context->clean_start || (context->bridge && context->bridge->clean_start)){
db__messages_delete_list(db, &context->msgs_in.inflight);
db__messages_delete_list(db, &context->msgs_in.queued);
context->msgs_in.msg_bytes = 0;
context->msgs_in.msg_bytes12 = 0;
context->msgs_in.msg_count = 0;
context->msgs_in.msg_count12 = 0;
}

context->msgs_out.msg_bytes = 0;
context->msgs_out.msg_bytes12 = 0;
context->msgs_out.msg_count = 0;
context->msgs_out.msg_count12 = 0;
if(context->bridge && context->bridge->clean_start_local){
db__messages_delete_list(db, &context->msgs_out.inflight);
db__messages_delete_list(db, &context->msgs_out.queued);
context->msgs_out.msg_bytes = 0;
context->msgs_out.msg_bytes12 = 0;
context->msgs_out.msg_count = 0;
context->msgs_out.msg_count12 = 0;
}

return MOSQ_ERR_SUCCESS;
}
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ struct mosquitto__bridge{
bool try_private;
bool try_private_accepted;
bool clean_start;
int8_t clean_start_local;
int keepalive;
struct mosquitto__bridge_topic *topics;
int topic_count;
Expand Down
5 changes: 3 additions & 2 deletions test/broker/06-bridge-b2br-disconnect-qos1.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ def write_config(filename, port1, port2, protocol_version):
f.write("topic bridge/# both 1\n")
f.write("notifications false\n")
f.write("restart_timeout 5\n")
f.write("bridge_protocol_version %s\n" %(protocol_version))
f.write("bridge_protocol_version %s\n" % (protocol_version))


def do_test(proto_ver):
if proto_ver == 4:
Expand All @@ -33,12 +34,12 @@ def do_test(proto_ver):
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=False, proto_ver=proto_ver_connect)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

mid = 1
if proto_ver == 5:
opts = mqtt5_opts.MQTT_SUB_OPT_NO_LOCAL | mqtt5_opts.MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
else:
opts = 0

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "bridge/#", 1 | opts, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=proto_ver)

Expand Down
Loading

0 comments on commit ee7d198

Please sign in to comment.