Skip to content

Commit

Permalink
Message Expiry Interval support.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Nov 1, 2018
1 parent f9e0fa2 commit 80f526a
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
16 changes: 14 additions & 2 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,13 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
local_properties = *properties;
*properties = NULL;
}
if(db__message_store(db, source_id, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, local_properties, 0)) return 1;
if(db__message_store(db, source_id, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, 0, local_properties, 0)) return 1;

return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
}

/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, struct mqtt5__property *properties, dbid_t store_id)
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, struct mqtt5__property *properties, dbid_t store_id)
{
struct mosquitto_msg_store *temp = NULL;
int rc = MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -638,6 +638,11 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
}else{
temp->payload.ptr = NULL;
}
if(message_expiry_interval > 0){
temp->message_expiry_time = time(NULL) + message_expiry_interval;
}else{
temp->message_expiry_time = 0;
}

temp->dest_ids = NULL;
temp->dest_id_count = 0;
Expand Down Expand Up @@ -867,6 +872,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
const void *payload;
int msg_count = 0;
struct mqtt5__property *properties;
time_t now;

if(!context || context->sock == INVALID_SOCKET
|| (context->state == mosq_cs_connected && !context->id)){
Expand All @@ -877,9 +883,15 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}

now = time(NULL);
tail = context->inflight_msgs;
while(tail){
msg_count++;
if(tail->store->message_expiry_time && now > tail->store->message_expiry_time){
/* Message is expired, must not send. */
db__message_remove(db, context, &tail, last);
continue;
}
mid = tail->mid;
retries = tail->dup;
retain = tail->retain;
Expand Down
12 changes: 10 additions & 2 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
struct mqtt5__property *properties = NULL;
struct mqtt5__property *p, *p_prev;
struct mqtt5__property *msg_properties = NULL, *msg_properties_last;
uint32_t message_expiry_interval = 0;

#ifdef WITH_BRIDGE
char *topic_temp;
Expand Down Expand Up @@ -182,6 +183,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
break;

case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
message_expiry_interval = p->value.i32;
p_prev = p;
p = p->next;
break;
Expand All @@ -206,6 +208,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
topic_mount = mosquitto__malloc(len+1);
if(!topic_mount){
mosquitto__free(topic);
mosquitto_property_free_all(&msg_properties);
return MOSQ_ERR_NOMEM;
}
snprintf(topic_mount, len, "%s%s", context->listener->mount_point, topic);
Expand All @@ -222,11 +225,13 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
if(UHPA_ALLOC(payload, payloadlen+1) == 0){
mosquitto__free(topic);
mosquitto_property_free_all(&msg_properties);
return MOSQ_ERR_NOMEM;
}
if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(payload, payloadlen), payloadlen)){
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
mosquitto_property_free_all(&msg_properties);
return 1;
}
}
Expand All @@ -239,6 +244,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}else if(rc != MOSQ_ERR_SUCCESS){
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
mosquitto_property_free_all(&msg_properties);
return rc;
}

Expand All @@ -248,14 +254,16 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
if(!stored){
dup = 0;
if(db__message_store(db, context->id, mid, topic, qos, payloadlen, &payload, retain, &stored, msg_properties, 0)){
if(db__message_store(db, context->id, mid, topic, qos, payloadlen, &payload, retain, &stored, message_expiry_interval, msg_properties, 0)){
mosquitto_property_free_all(&msg_properties);
return 1;
}
msg_properties = NULL; /* Now belongs to db__message_store() */
}else{
mosquitto__free(topic);
topic = stored->topic;
dup = 1;
mosquitto_property_free_all(&msg_properties);
}

switch(qos){
Expand Down Expand Up @@ -294,7 +302,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
case 2:
db__message_store_find(context, mid, &stored);
if(!stored){
if(db__message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, NULL, 0)){
if(db__message_store(db, context->id, mid, NULL, qos, 0, NULL, false, &stored, 0, NULL, 0)){
return 1;
}
res = db__message_insert(db, context, mid, mosq_md_in, qos, false, stored);
Expand Down
3 changes: 2 additions & 1 deletion src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ struct mosquitto_msg_store{
char* topic;
struct mqtt5__property *properties;
mosquitto__payload_uhpa payload;
time_t message_expiry_time;
uint32_t payloadlen;
uint16_t source_mid;
uint16_t mid;
Expand Down Expand Up @@ -555,7 +556,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
void db__message_dequeue_first(struct mosquitto *context);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mqtt5__property **properties);
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, struct mqtt5__property *properties, dbid_t store_id);
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, struct mqtt5__property *properties, dbid_t store_id);
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);
void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store);
Expand Down
2 changes: 1 addition & 1 deletion src/persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
read_e(db_fptr, UHPA_ACCESS(payload, payloadlen), payloadlen);
}

rc = db__message_store(db, source_id, source_mid, topic, qos, payloadlen, &payload, retain, &stored, NULL, store_id);
rc = db__message_store(db, source_id, source_mid, topic, qos, payloadlen, &payload, retain, &stored, 0, NULL, store_id);
mosquitto__free(source_id);

if(rc == MOSQ_ERR_SUCCESS){
Expand Down

0 comments on commit 80f526a

Please sign in to comment.