Skip to content

Commit

Permalink
Fixes for bug eclipse#1273
Browse files Browse the repository at this point in the history
* Fix Will message for a persistent client incorrectly being sent when the client reconnects after a clean disconnect.
* Fix Will message for a persistent client not being sent on disconnect.
* Fix mosquitto_pub not using the `-c` option.

Thanks to Yannic Schröder.

Closes eclipse#1273.
  • Loading branch information
ralight authored and vankxr committed Aug 9, 2019
1 parent 644900a commit d9395da
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 80 deletions.
9 changes: 7 additions & 2 deletions ChangeLog.txt
Expand Up @@ -5,18 +5,23 @@ Broker:
- Fix detection of incoming v3.1/v3.1.1 bridges. Closes #1263.
- Fix default max_topic_alias listener config not being copied to the in-use
listener when compiled without TLS support.
- Fix random number generation if compiling using WITH_TLS=no and on Linux
- Fix random number generation if compiling using `WITH_TLS=no` and on Linux
with glibc >= 2.25. Without this fix, no random numbers would be generated
for e.g. on broker client id generation, and so clients connecting expecting
this feature would be unable to connect.
- Fix compilation problem related to getrandom() on non-glibc systems.
- Fix compilation problem related to `getrandom()` on non-glibc systems.
- Fix Will message for a persistent client incorrectly being sent when the
client reconnects after a clean disconnect. Closes #1273.
- Fix Will message for a persistent client not being sent on disconnect.
Closes #1273.

Clients:
- Fix -L url parsing when `/topic` part is missing.
- Stop some error messages being printed even when `--quiet` was used.
Closes #1284.
- Fix mosquitto_pub exiting with error code 0 when an error occurred.
Closes #1285.
- Fix mosquitto_pub not using the `-c` option. Closes #1273.


1.6.2 - 20190430
Expand Down
2 changes: 1 addition & 1 deletion client/pub_client.c
Expand Up @@ -476,7 +476,7 @@ int main(int argc, char *argv[])
goto cleanup;
}

mosq = mosquitto_new(cfg.id, true, NULL);
mosq = mosquitto_new(cfg.id, cfg.clean_session, NULL);
if(!mosq){
switch(errno){
case ENOMEM:
Expand Down
7 changes: 7 additions & 0 deletions lib/mosquitto_internal.h
Expand Up @@ -180,6 +180,12 @@ enum mosquitto__keyform {
};
#endif

struct will_delay_list {
struct mosquitto *context;
struct will_delay_list *prev;
struct will_delay_list *next;
};

struct mosquitto_msg_data{
#ifdef WITH_BROKER
struct mosquitto_client_msg *inflight;
Expand Down Expand Up @@ -224,6 +230,7 @@ struct mosquitto {
struct mosquitto__packet *out_packet;
struct mosquitto_message_all *will;
struct mosquitto__alias *aliases;
struct will_delay_list *will_delay_entry;
uint32_t maximum_packet_size;
int alias_count;
uint32_t will_delay_interval;
Expand Down
11 changes: 3 additions & 8 deletions src/context.c
Expand Up @@ -25,6 +25,7 @@ and the Eclipse Distribution License is available at
#include "packet_mosq.h"
#include "property_mosq.h"
#include "time_mosq.h"
#include "will_mosq.h"

#include "uthash.h"

Expand Down Expand Up @@ -218,22 +219,16 @@ void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt)
&ctxt->will->properties);
}
}
if(ctxt->will){
mosquitto_property_free_all(&ctxt->will->properties);
mosquitto__free(ctxt->will->msg.topic);
mosquitto__free(ctxt->will->msg.payload);
mosquitto__free(ctxt->will);
ctxt->will = NULL;
}
will__clear(ctxt);
}


void context__disconnect(struct mosquitto_db *db, struct mosquitto *context)
{
net__socket_close(db, context);

context__send_will(db, context);
if(context->session_expiry_interval == 0){
context__send_will(db, context);

#ifdef WITH_BRIDGE
if(!context->bridge)
Expand Down
7 changes: 2 additions & 5 deletions src/handle_auth.c
Expand Up @@ -25,6 +25,7 @@ and the Eclipse Distribution License is available at
#include "packet_mosq.h"
#include "property_mosq.h"
#include "send_mosq.h"
#include "will_mosq.h"


int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
Expand Down Expand Up @@ -119,11 +120,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
free(auth_data_out);
if(context->state == mosq_cs_authenticating && context->will){
/* Free will without sending if this is our first authentication attempt */
mosquitto_property_free_all(&context->will->properties);
mosquitto__free(context->will->msg.payload);
mosquitto__free(context->will->msg.topic);
mosquitto__free(context->will);
context->will = NULL;
will__clear(context);
}
if(rc == MOSQ_ERR_AUTH){
send__connack(db, context, 0, MQTT_RC_NOT_AUTHORIZED, NULL);
Expand Down
11 changes: 4 additions & 7 deletions src/handle_connect.c
Expand Up @@ -30,6 +30,7 @@ and the Eclipse Distribution License is available at
#include "time_mosq.h"
#include "tls_mosq.h"
#include "util_mosq.h"
#include "will_mosq.h"

#ifdef WITH_WEBSOCKETS
# include <libwebsockets.h>
Expand Down Expand Up @@ -160,6 +161,8 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
}

session_expiry__remove(found_context);
will_delay__remove(found_context);
will__clear(found_context);

found_context->clean_start = true;
found_context->session_expiry_interval = 0;
Expand Down Expand Up @@ -836,13 +839,7 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
return rc;
}else{
free(auth_data_out);
if(context->will){
mosquitto_property_free_all(&context->will->properties);
mosquitto__free(context->will->msg.payload);
mosquitto__free(context->will->msg.topic);
mosquitto__free(context->will);
context->will = NULL;
}
will__clear(context);
if(rc == MOSQ_ERR_AUTH){
send__connack(db, context, 0, MQTT_RC_NOT_AUTHORIZED, NULL);
mosquitto__free(context->id);
Expand Down
2 changes: 2 additions & 0 deletions src/handle_disconnect.c
Expand Up @@ -21,6 +21,7 @@ and the Eclipse Distribution License is available at
#include "packet_mosq.h"
#include "property_mosq.h"
#include "send_mosq.h"
#include "will_mosq.h"


int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context)
Expand Down Expand Up @@ -66,6 +67,7 @@ int handle__disconnect(struct mosquitto_db *db, struct mosquitto *context)
if(reason_code == MQTT_RC_DISCONNECT_WITH_WILL_MSG){
context__set_state(context, mosq_cs_disconnect_with_will);
}else{
will__clear(context);
context__set_state(context, mosq_cs_disconnecting);
}
do_disconnect(db, context, MOSQ_ERR_SUCCESS);
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto_broker_internal.h
Expand Up @@ -740,6 +740,7 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso
int will_delay__add(struct mosquitto *context);
void will_delay__check(struct mosquitto_db *db, time_t now);
void will_delay__send_all(struct mosquitto_db *db);
void will_delay__remove(struct mosquitto *mosq);

#endif

17 changes: 11 additions & 6 deletions src/will_delay.c
Expand Up @@ -24,12 +24,6 @@ and the Eclipse Distribution License is available at
#include "memory_mosq.h"
#include "time_mosq.h"

struct will_delay_list {
struct mosquitto *context;
struct will_delay_list *prev;
struct will_delay_list *next;
};

static struct will_delay_list *delay_list = NULL;
static time_t last_check = 0;

Expand All @@ -48,6 +42,7 @@ int will_delay__add(struct mosquitto *context)
if(!item) return MOSQ_ERR_NOMEM;

item->context = context;
context->will_delay_entry = item;
item->context->will_delay_time = time(NULL) + item->context->will_delay_interval;

DL_INSERT_INORDER(delay_list, item, will_delay__cmp);
Expand All @@ -64,6 +59,7 @@ void will_delay__send_all(struct mosquitto_db *db)
DL_FOREACH_SAFE(delay_list, item, tmp){
DL_DELETE(delay_list, item);
item->context->will_delay_interval = 0;
item->context->will_delay_entry = NULL;
context__send_will(db, item->context);
mosquitto__free(item);
}
Expand Down Expand Up @@ -92,3 +88,12 @@ void will_delay__check(struct mosquitto_db *db, time_t now)

}


void will_delay__remove(struct mosquitto *mosq)
{
if(mosq->will_delay_entry != NULL){
DL_DELETE(delay_list, mosq->will_delay_entry);
mosq->will_delay_entry = NULL;
}
}

18 changes: 0 additions & 18 deletions test/broker/07-will-qos0-helper.py

This file was deleted.

82 changes: 49 additions & 33 deletions test/broker/07-will-qos0.py
Expand Up @@ -4,38 +4,54 @@

from mosq_test_helper import *

rc = 1
mid = 53
keepalive = 60
connect_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)

subscribe_packet = mosq_test.gen_subscribe(mid, "will/qos0/test", 0)
suback_packet = mosq_test.gen_suback(mid, 0)

publish_packet = mosq_test.gen_publish("will/qos0/test", qos=0, payload="will-message")

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=30, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

will = subprocess.Popen(['./07-will-qos0-helper.py', str(port)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
will.wait()
(stdo, stde) = will.communicate()

if mosq_test.expect_packet(sock, "publish", publish_packet):
rc = 0

sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))

exit(rc)
def do_test(proto_ver, clean_session):
rc = 1
mid = 53
keepalive = 60
connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=proto_ver)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

if proto_ver == 5:
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 100)
else:
props = None

connect2_packet = mosq_test.gen_connect("test-helper", keepalive=keepalive, will_topic="will/qos0/test", will_payload=b"will-message", clean_session=clean_session, proto_ver=proto_ver, properties=props)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

subscribe_packet = mosq_test.gen_subscribe(mid, "will/qos0/test", 0, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)

publish_packet = mosq_test.gen_publish("will/qos0/test", qos=0, payload="will-message", proto_ver=proto_ver)

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=5, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

sock2 = mosq_test.do_client_connect(connect2_packet, connack2_packet, port=port, timeout=5)
sock2.close()

if mosq_test.expect_packet(sock, "publish", publish_packet):
rc = 0

sock.close()
except Exception as e:
print(e)
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)

do_test(4, True)
do_test(4, False)
do_test(5, True)
do_test(5, False)
exit(0)

75 changes: 75 additions & 0 deletions test/broker/07-will-reconnect-1273.py
@@ -0,0 +1,75 @@
#!/usr/bin/env python3

# Test whether a persistent client that disconnects with DISCONNECT has its
# will published when it reconnects. It shouldn't. Bug 1273:
# https://github.com/eclipse/mosquitto/issues/1273

from mosq_test_helper import *


def do_test(proto_ver):
rc = 1
keepalive = 60

connect1_packet = mosq_test.gen_connect("will-sub", keepalive=keepalive, proto_ver=proto_ver)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)

mid = 1
subscribe1_packet = mosq_test.gen_subscribe(mid, "will/test", 0, proto_ver=proto_ver)
suback1_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)

if proto_ver == 5:
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 100)
else:
props = None

connect2_packet = mosq_test.gen_connect("will-1273", keepalive=keepalive, will_topic="will/test", will_payload=b"will msg",clean_session=False, proto_ver=proto_ver, properties=props)
connack2a_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack2b_packet = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)

disconnect_packet = mosq_test.gen_disconnect(proto_ver=proto_ver)

publish_packet = mosq_test.gen_publish("will/test", qos=0, payload="alive", proto_ver=proto_ver)

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
# Connect and subscribe will-sub
sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=30, port=port, connack_error="connack1")
mosq_test.do_send_receive(sock1, subscribe1_packet, suback1_packet, "suback")

# Connect will-1273
sock2 = mosq_test.do_client_connect(connect2_packet, connack2a_packet, timeout=30, port=port)
# Publish our "alive" message
sock2.send(publish_packet)
# Clean disconnect
sock2.send(disconnect_packet)

# will-1273 should get the "alive"
mosq_test.expect_packet(sock1, "publish1", publish_packet)

sock2.close()

# Reconnect
sock2 = mosq_test.do_client_connect(connect2_packet, connack2b_packet, timeout=30, port=port, connack_error="connack2")
# will-1273 to publish "alive" again, and will-sub to receive it.
sock2.send(publish_packet)
mosq_test.expect_packet(sock1, "publish2", publish_packet)
# Do a ping to make sure there are no other packets received.
mosq_test.do_ping(sock1)
rc = 0

sock1.close()
sock2.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)

do_test(4)
do_test(5)
exit(0)
1 change: 1 addition & 0 deletions test/broker/Makefile
Expand Up @@ -144,6 +144,7 @@ endif
./07-will-null.py
./07-will-properties.py
./07-will-qos0.py
./07-will-reconnect-1273.py

08 :
ifeq ($(WITH_TLS),yes)
Expand Down

0 comments on commit d9395da

Please sign in to comment.