Skip to content

Commit

Permalink
Set remaining message expiry interval when republishing.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Jan 22, 2019
1 parent 0a9885a commit 6a59e92
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 20 deletions.
4 changes: 2 additions & 2 deletions lib/actions.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
}

if(qos == 0){
return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties, NULL);
return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false, outgoing_properties, NULL, 0);
}else{
message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all));
if(!message) return MOSQ_ERR_NOMEM;
Expand Down Expand Up @@ -134,7 +134,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
message->state = mosq_ms_wait_for_pubrec;
}
pthread_mutex_unlock(&mosq->out_message_mutex);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL, 0);
}else{
message->state = mosq_ms_invalid;
pthread_mutex_unlock(&mosq->out_message_mutex);
Expand Down
4 changes: 2 additions & 2 deletions lib/messages_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
}else if(cur->msg.qos == 2){
cur->state = mosq_ms_wait_for_pubrec;
}
rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL);
rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL, 0);
if(rc){
pthread_mutex_unlock(&mosq->out_message_mutex);
return rc;
Expand Down Expand Up @@ -330,7 +330,7 @@ void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_messag
case mosq_ms_publish_qos2:
messages->timestamp = now;
messages->dup = true;
send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL, NULL);
send__publish(mosq, messages->msg.mid, messages->msg.topic, messages->msg.payloadlen, messages->msg.payload, messages->msg.qos, messages->msg.retain, messages->dup, NULL, NULL, 0);
break;
case mosq_ms_wait_for_pubrel:
messages->timestamp = now;
Expand Down
4 changes: 2 additions & 2 deletions lib/send_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ and the Eclipse Distribution License is available at

int send__simple_command(struct mosquitto *mosq, uint8_t command);
int send__command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup, uint8_t reason_code, const mosquitto_property *properties);
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props);
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval);

int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session, const mosquitto_property *properties);
int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitto_property *properties);
int send__pingreq(struct mosquitto *mosq);
int send__pingresp(struct mosquitto *mosq);
int send__puback(struct mosquitto *mosq, uint16_t mid);
int send__pubcomp(struct mosquitto *mosq, uint16_t mid);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval);
int send__pubrec(struct mosquitto *mosq, uint16_t mid);
int send__pubrel(struct mosquitto *mosq, uint16_t mid);
int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, int topic_qos, const mosquitto_property *properties);
Expand Down
22 changes: 18 additions & 4 deletions lib/send_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ and the Eclipse Distribution License is available at
#include "send_mosq.h"


int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props)
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval)
{
#ifdef WITH_BROKER
size_t len;
Expand Down Expand Up @@ -110,7 +110,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
}
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
G_PUB_BYTES_SENT_INC(payloadlen);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props, expiry_interval);
mosquitto__free(mapped_topic);
return rc;
}
Expand All @@ -124,16 +124,17 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
#endif

return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props);
return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props, expiry_interval);
}


int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props)
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval)
{
struct mosquitto__packet *packet = NULL;
int packetlen;
int proplen = 0, varbytes;
int rc;
mosquitto_property expiry_prop;

assert(mosq);

Expand All @@ -147,11 +148,21 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
proplen = 0;
proplen += property__get_length_all(cmsg_props);
proplen += property__get_length_all(store_props);
if(expiry_interval > 0){
expiry_prop.next = NULL;
expiry_prop.value.i32 = expiry_interval;
expiry_prop.identifier = MQTT_PROP_MESSAGE_EXPIRY_INTERVAL;
expiry_prop.client_generated = false;

proplen += property__get_length_all(&expiry_prop);
}

varbytes = packet__varint_bytes(proplen);
if(varbytes > 4){
/* FIXME - Properties too big, don't publish any - should remove some first really */
cmsg_props = NULL;
store_props = NULL;
expiry_interval = 0;
}else{
packetlen += proplen + varbytes;
}
Expand Down Expand Up @@ -181,6 +192,9 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
packet__write_varint(packet, proplen);
property__write_all(packet, cmsg_props, false);
property__write_all(packet, store_props, false);
if(expiry_interval > 0){
property__write_all(packet, &expiry_prop, false);
}
}

/* Payload */
Expand Down
19 changes: 12 additions & 7 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
int msg_count = 0;
mosquitto_property *cmsg_props = NULL, *store_props = NULL;
time_t now;
uint32_t expiry_interval = 0;

if(!context || context->sock == INVALID_SOCKET
|| (context->state == mosq_cs_connected && !context->id)){
Expand All @@ -901,10 +902,14 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
tail = context->inflight_msgs;
while(tail){
msg_count++;
if(tail->store->message_expiry_time && now > tail->store->message_expiry_time){
/* Message is expired, must not send. */
db__message_remove(db, context, &tail, last);
continue;
if(tail->store->message_expiry_time){
if(now > tail->store->message_expiry_time){
/* Message is expired, must not send. */
db__message_remove(db, context, &tail, last);
continue;
}else{
expiry_interval = tail->store->message_expiry_time - now;
}
}
mid = tail->mid;
retries = tail->dup;
Expand All @@ -918,7 +923,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)

switch(tail->state){
case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(!rc){
db__message_remove(db, context, &tail, last);
}else{
Expand All @@ -927,7 +932,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_publish_qos1:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(!rc){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
Expand All @@ -940,7 +945,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_publish_qos2:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props);
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(!rc){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
Expand Down
2 changes: 1 addition & 1 deletion src/handle_auth.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ int handle__auth(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_PROTOCOL;
}

if(mosq->in_packet.remaining_length > 0){
if(context->in_packet.remaining_length > 0){
if(packet__read_byte(&context->in_packet, &reason_code)) return 1;

rc = property__read_all(CMD_AUTH, &context->in_packet, &properties);
Expand Down
4 changes: 2 additions & 2 deletions src/handle_connack.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
if(context->bridge->notification_topic){
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
context->bridge->notification_topic, 1, &notification_payload, 1, true, 0, NULL, NULL)){
context->bridge->notification_topic, 1, &notification_payload, 1, true, 0, NULL, NULL, 0)){

return 1;
}
Expand All @@ -74,7 +74,7 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
notification_payload = '1';
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
notification_topic, 1, &notification_payload, 1, true, 0, NULL, NULL)){
notification_topic, 1, &notification_payload, 1, true, 0, NULL, NULL, 0)){

mosquitto__free(notification_topic);
return 1;
Expand Down
72 changes: 72 additions & 0 deletions test/broker/02-subpub-qos1-message-expiry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python

# Test whether the broker reduces the message expiry interval when republishing.
# MQTT v5

# Client connects with clean session set false, subscribes with qos=1, then disconnects
# Helper publishes two messages, one with a short expiry and one with a long expiry
# We wait until the short expiry will have expired but the long one not.
# Client reconnects, expects delivery of the long expiry message with a reduced
# expiry interval property.

from mosq_test_helper import *

rc = 1
mid = 53
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1)

subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=5)



helper_connect = mosq_test.gen_connect("helper", proto_ver=5)
helper_connack = mosq_test.gen_connack(rc=0, proto_ver=5)

mid=1
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 1)
publish1s_packet = mosq_test.gen_publish("subpub/qos1", mid=mid, qos=1, payload="message1", proto_ver=5, properties=props)
puback1s_packet = mosq_test.gen_puback(mid)

mid=2
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 10)
publish2s_packet = mosq_test.gen_publish("subpub/qos1", mid=mid, qos=1, payload="message2", proto_ver=5, properties=props)
puback2s_packet = mosq_test.gen_puback(mid)


port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, connack1_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
sock.close()

helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1")
mosq_test.do_send_receive(helper, publish2s_packet, puback2s_packet, "puback 2")

time.sleep(2)

sock = mosq_test.do_client_connect(connect_packet, connack2_packet, timeout=20, port=port)
packet = sock.recv(len(publish2s_packet))
for i in range(9, 5, -1):
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, i)
publish2r_packet = mosq_test.gen_publish("subpub/qos1", mid=2, qos=1, payload="message2", proto_ver=5, properties=props)
if packet == publish2r_packet:
rc = 0
break

sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)

exit(rc)

1 change: 1 addition & 0 deletions test/broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ endif
./02-subpub-qos0-v5.py
./02-subpub-qos1-v5.py
./02-subpub-qos2-v5.py
./02-subpub-qos1-message-expiry.py
./02-subpub-qos1-nolocal.py
./02-subpub-qos0-retain-as-publish.py
./02-subpub-qos0-send-retain.py
Expand Down
1 change: 1 addition & 0 deletions test/broker/ptest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
(1, './02-subpub-qos0-v5.py'),
(1, './02-subpub-qos1-v5.py'),
(1, './02-subpub-qos2-v5.py'),
(1, './02-subpub-qos1-message-expiry.py'),
(1, './02-subpub-qos1-nolocal.py'),
(1, './02-subpub-qos0-retain-as-publish.py'),
(1, './02-subpub-qos0-send-retain.py'),
Expand Down

0 comments on commit 6a59e92

Please sign in to comment.