Skip to content

Commit

Permalink
Session expiry interval support - not working for file persistence.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Mar 6, 2019
1 parent 12431d7 commit ac91144
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 33 deletions.
8 changes: 8 additions & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ struct mosquitto__alias{
uint16_t alias;
};

struct session_expiry_list {
struct mosquitto *context;
struct session_expiry_list *prev;
struct session_expiry_list *next;
};

struct mosquitto__packet{
uint8_t *payload;
struct mosquitto__packet *next;
Expand Down Expand Up @@ -233,6 +239,7 @@ struct mosquitto {
#endif
bool clean_start;
uint32_t session_expiry_interval;
time_t session_expiry_time;
#ifdef WITH_BROKER
bool removed_from_by_id; /* True if removed from by_id hash */
bool is_dropping;
Expand Down Expand Up @@ -314,6 +321,7 @@ struct mosquitto {
UT_hash_handle hh_id;
UT_hash_handle hh_sock;
struct mosquitto *for_free_next;
struct session_expiry_list *expiry_list_item;
#endif
#ifdef WITH_EPOLL
uint32_t events;
Expand Down
3 changes: 2 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ set (MOSQ_SRCS
../lib/property_mosq.c ../lib/property_mosq.h
read_handle.c
../lib/read_handle.h
subs.c
security.c security_default.c
../lib/send_mosq.c ../lib/send_mosq.h
send_connack.c
Expand All @@ -48,6 +47,8 @@ set (MOSQ_SRCS
../lib/send_subscribe.c
send_unsuback.c
../lib/send_unsubscribe.c
session_expiry.c
subs.c
sys_tree.c sys_tree.h
../lib/time_mosq.c
../lib/tls_mosq.c
Expand Down
4 changes: 4 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ OBJS= mosquitto.o \
send_unsuback.o \
send_unsubscribe.o \
service.o \
session_expiry.o \
signals.o \
subs.o \
sys_tree.o \
Expand Down Expand Up @@ -195,6 +196,9 @@ send_unsubscribe.o : ../lib/send_unsubscribe.c ../lib/send_mosq.h
service.o : service.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@

session_expiry.o : session_expiry.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@

signals.o : signals.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@

Expand Down
29 changes: 25 additions & 4 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,33 @@ void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt)
}


void context__disconnect(struct mosquitto_db *db, struct mosquitto *ctxt)
void context__disconnect(struct mosquitto_db *db, struct mosquitto *context)
{
context__send_will(db, ctxt);
if(context->session_expiry_interval == 0){
context__send_will(db, context);

ctxt->disconnect_t = time(NULL);
net__socket_close(db, ctxt);
context->disconnect_t = time(NULL);
net__socket_close(db, context);

#ifdef WITH_BRIDGE
if(!context->bridge)
#endif
{

if(context->will_delay_interval == 0){
/* This will be done later, after the will is published */
context__add_to_disused(db, context);
if(context->id){
context__remove_from_by_id(db, context);
mosquitto__free(context->id);
context->id = NULL;
}
}
}
}else{
session_expiry__add(context);
}
context->state = mosq_cs_disconnected;
}

void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context)
Expand Down
20 changes: 14 additions & 6 deletions src/handle_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
}

clean_start = (connect_flags & 0x02) >> 1;
/* session_expiry_interval will be overriden if the properties are read later */
if(clean_start == false && protocol_version != PROTOCOL_VERSION_v5){
/* v3* has clean_start == false mean the session never expires */
context->session_expiry_interval = UINT32_MAX;
}else{
context->session_expiry_interval = 0;
}
will = connect_flags & 0x04;
will_qos = (connect_flags & 0x18) >> 3;
if(will_qos == 3){
Expand Down Expand Up @@ -630,15 +637,13 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
}
}

if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt5){
if(clean_start == 0){
context->clean_start = clean_start;

if(context->clean_start == false && found_context->session_expiry_interval > 0){
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt5){
connect_ack |= 0x01;
}
}

context->clean_start = clean_start;

if(context->clean_start == false && found_context->clean_start == false){
if(found_context->inflight_msgs || found_context->queued_msgs){
context->inflight_msgs = found_context->inflight_msgs;
context->queued_msgs = found_context->queued_msgs;
Expand All @@ -665,7 +670,10 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
}
}

session_expiry__remove(found_context);

found_context->clean_start = true;
found_context->session_expiry_interval = 0;
found_context->state = mosq_cs_duplicate;
do_disconnect(db, found_context);
}
Expand Down
24 changes: 5 additions & 19 deletions src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
now_time = time(NULL);
if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
if(context->sock == INVALID_SOCKET && context->clean_start == 0){
if(context->sock == INVALID_SOCKET && context->session_expiry_interval > 0){
/* This is a persistent client, check to see if the
* last time it connected was longer than
* persistent_client_expiration seconds ago. If so,
Expand All @@ -486,7 +486,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
G_CLIENTS_EXPIRED_INC();
context->clean_start = true;
context->session_expiry_interval = 0;
context->state = mosq_cs_expiring;
do_disconnect(db, context);
}
Expand Down Expand Up @@ -555,7 +555,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
}
#endif
will_delay__check(db, time(NULL));
now = time(NULL);
session_expiry__check(db, now);
will_delay__check(db, now);
#ifdef WITH_PERSISTENCE
if(db->config->persistence && db->config->autosave_interval){
if(db->config->autosave_on_changes){
Expand Down Expand Up @@ -669,22 +671,6 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
}
#endif
context__disconnect(db, context);
#ifdef WITH_BRIDGE
if(context->clean_start && !context->bridge){
#else
if(context->clean_start){
#endif
if(context->will_delay_interval == 0){
/* This will be done later, after the will is published */
context__add_to_disused(db, context);
if(context->id){
context__remove_from_by_id(db, context);
mosquitto__free(context->id);
context->id = NULL;
}
}
}
context->state = mosq_cs_disconnected;
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,14 @@ int mosquitto_acl_check_default(struct mosquitto_db *db, struct mosquitto *conte
int mosquitto_unpwd_check_default(struct mosquitto_db *db, struct mosquitto *context, const char *username, const char *password);
int mosquitto_psk_key_get_default(struct mosquitto_db *db, struct mosquitto *context, const char *hint, const char *identity, char *key, int max_key_len);

/* ============================================================
* Session expiry
* ============================================================ */
int session_expiry__add(struct mosquitto *context);
void session_expiry__remove(struct mosquitto *context);
void session_expiry__check(struct mosquitto_db *db, time_t now);
void session_expiry__send_all(struct mosquitto_db *db);

/* ============================================================
* Window service and signal related functions
* ============================================================ */
Expand Down
103 changes: 103 additions & 0 deletions src/session_expiry.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright (c) 2019 Roger Light <[email protected]>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http:https://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http:https://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/

#include "config.h"

#include <math.h>
#include <stdio.h>
#include <utlist.h>

#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "time_mosq.h"

static struct session_expiry_list *expiry_list = NULL;
static time_t last_check = 0;


static int session_expiry__cmp(struct session_expiry_list *i1, struct session_expiry_list *i2)
{
return i1->context->session_expiry_interval - i2->context->session_expiry_interval;
}


int session_expiry__add(struct mosquitto *context)
{
struct session_expiry_list *item;

item = mosquitto__calloc(1, sizeof(struct session_expiry_list));
if(!item) return MOSQ_ERR_NOMEM;

item->context = context;
item->context->session_expiry_time = time(NULL) + item->context->session_expiry_interval;
context->expiry_list_item = item;

DL_INSERT_INORDER(expiry_list, item, session_expiry__cmp);

return MOSQ_ERR_SUCCESS;
}


void session_expiry__remove(struct mosquitto *context)
{
if(context->expiry_list_item){
DL_DELETE(expiry_list, context->expiry_list_item);
mosquitto__free(context->expiry_list_item);
context->expiry_list_item = NULL;
}
}


/* Call on broker shutdown only */
void session_expiry__remove_all(struct mosquitto_db *db)
{
struct session_expiry_list *item, *tmp;
struct mosquitto *context;

DL_FOREACH_SAFE(expiry_list, item, tmp){
context = item->context;
session_expiry__remove(context);
context->session_expiry_interval = 0;
context->will_delay_interval = 0;
context__send_will(db, context);
}

}

void session_expiry__check(struct mosquitto_db *db, time_t now)
{
struct session_expiry_list *item, *tmp;
struct mosquitto *context;

if(now <= last_check) return;

last_check = now;

DL_FOREACH_SAFE(expiry_list, item, tmp){
if(item->context->session_expiry_time < now){
context = item->context;
session_expiry__remove(context);

context->session_expiry_interval = 0;
context__send_will(db, context);
context__add_to_disused(db, context);
}else{
return;
}
}

}

3 changes: 2 additions & 1 deletion test/broker/02-subpub-qos1-message-expiry-will.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
rc = 1
mid = 53
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False, properties=props)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1)

Expand Down
3 changes: 2 additions & 1 deletion test/broker/02-subpub-qos1-message-expiry.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
rc = 1
mid = 53
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False, properties=props)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1)

Expand Down
Loading

0 comments on commit ac91144

Please sign in to comment.