Skip to content

Commit

Permalink
Persist message store publish properties.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Mar 21, 2019
1 parent 2f15a7b commit 48253bc
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 17 deletions.
3 changes: 2 additions & 1 deletion src/persist.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct P_msg_store{
mosquitto__payload_uhpa payload;
struct mosquitto source;
char *topic;
mosquitto_property *properties;
};


Expand Down Expand Up @@ -140,7 +141,7 @@ int persist__chunk_header_read_v5(FILE *db_fptr, int *chunk, int *length);
int persist__chunk_cfg_read_v5(FILE *db_fptr, struct PF_cfg *chunk);
int persist__chunk_client_read_v5(FILE *db_fptr, struct P_client *chunk);
int persist__chunk_client_msg_read_v5(FILE *db_fptr, struct P_client_msg *chunk);
int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk);
int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk, uint32_t length);
int persist__chunk_retain_read_v5(FILE *db_fptr, struct P_retain *chunk);
int persist__chunk_sub_read_v5(FILE *db_fptr, struct P_sub *chunk);

Expand Down
8 changes: 4 additions & 4 deletions src/persist_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ static int persist__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_f
}


static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fptr, uint32_t length)
{
struct P_msg_store chunk;
struct mosquitto_msg_store *stored = NULL;
Expand All @@ -232,7 +232,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
memset(&chunk, 0, sizeof(struct P_msg_store));

if(db_version == 5){
rc = persist__chunk_msg_store_read_v5(db_fptr, &chunk);
rc = persist__chunk_msg_store_read_v5(db_fptr, &chunk, length);
}else{
rc = persist__chunk_msg_store_read_v234(db_fptr, &chunk, db_version);
}
Expand Down Expand Up @@ -262,7 +262,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp

rc = db__message_store(db, &chunk.source, chunk.F.source_mid,
chunk.topic, chunk.F.qos, chunk.F.payloadlen,
&chunk.payload, chunk.F.retain, &stored, 0, NULL, chunk.F.store_id);
&chunk.payload, chunk.F.retain, &stored, 0, chunk.properties, chunk.F.store_id);

mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
Expand Down Expand Up @@ -421,7 +421,7 @@ int persist__restore(struct mosquitto_db *db)
break;

case DB_CHUNK_MSG_STORE:
if(persist__msg_store_chunk_restore(db, fptr)) return 1;
if(persist__msg_store_chunk_restore(db, fptr, length)) return 1;
break;

case DB_CHUNK_CLIENT_MSG:
Expand Down
31 changes: 30 additions & 1 deletion src/persist_read_v5.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ and the Eclipse Distribution License is available at

#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "mqtt_protocol.h"
#include "persist.h"
#include "property_mosq.h"
#include "time_mosq.h"
#include "util_mosq.h"

Expand Down Expand Up @@ -96,9 +98,11 @@ int persist__chunk_client_msg_read_v5(FILE *db_fptr, struct P_client_msg *chunk)
}


int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk)
int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk, uint32_t length)
{
int rc = 0;
mosquitto_property *properties = NULL;
struct mosquitto__packet prop_packet;

read_e(db_fptr, &chunk->F, sizeof(struct PF_msg_store));
chunk->F.payloadlen = ntohl(chunk->F.payloadlen);
Expand All @@ -108,6 +112,8 @@ int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk)
chunk->F.topic_len = ntohs(chunk->F.topic_len);
chunk->F.source_port = ntohs(chunk->F.source_port);

length -= (sizeof(struct PF_msg_store) + chunk->F.payloadlen + chunk->F.source_id_len + chunk->F.source_username_len + chunk->F.topic_len);

if(chunk->F.source_id_len){
rc = persist__read_string_len(db_fptr, &chunk->source.id, chunk->F.source_id_len);
if(rc){
Expand Down Expand Up @@ -145,12 +151,35 @@ int persist__chunk_msg_store_read_v5(FILE *db_fptr, struct P_msg_store *chunk)
read_e(db_fptr, UHPA_ACCESS(chunk->payload, chunk->F.payloadlen), chunk->F.payloadlen);
}

if(length > 0){
memset(&prop_packet, 0, sizeof(struct mosquitto__packet));
prop_packet.remaining_length = length;
prop_packet.payload = mosquitto__malloc(length);
if(!prop_packet.payload){
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
mosquitto__free(chunk->topic);
return 1;
}
read_e(db_fptr, prop_packet.payload, length);
rc = property__read_all(CMD_PUBLISH, &prop_packet, &properties);
mosquitto__free(prop_packet.payload);
if(rc){
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
mosquitto__free(chunk->topic);
return rc;
}
}
chunk->properties = properties;

return MOSQ_ERR_SUCCESS;
error:
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
mosquitto__free(chunk->source.id);
mosquitto__free(chunk->source.username);
mosquitto__free(chunk->topic);
mosquitto__free(prop_packet.payload);
return 1;
}

Expand Down
1 change: 1 addition & 0 deletions src/persist_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ static int persist__message_store_save(struct mosquitto_db *db, FILE *db_fptr)
}
chunk.F.qos = stored->qos;
chunk.payload = stored->payload;
chunk.properties = stored->properties;

rc = persist__chunk_message_store_write_v5(db_fptr, &chunk);
if(rc){
Expand Down
29 changes: 28 additions & 1 deletion src/persist_write_v5.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ and the Eclipse Distribution License is available at
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "persist.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "time_mosq.h"
#include "util_mosq.h"

Expand Down Expand Up @@ -104,6 +106,15 @@ int persist__chunk_message_store_write_v5(FILE *db_fptr, struct P_msg_store *chu
int source_id_len = chunk->F.source_id_len;
int source_username_len = chunk->F.source_username_len;
int topic_len = chunk->F.topic_len;
uint32_t proplen = 0;
struct mosquitto__packet prop_packet;
int rc;

memset(&prop_packet, 0, sizeof(struct mosquitto__packet));
if(chunk->properties){
proplen = property__get_length_all(chunk->properties);
proplen += packet__varint_bytes(proplen);
}

chunk->F.payloadlen = htonl(chunk->F.payloadlen);
chunk->F.source_mid = htons(chunk->F.source_mid);
Expand All @@ -115,7 +126,7 @@ int persist__chunk_message_store_write_v5(FILE *db_fptr, struct P_msg_store *chu
header.chunk = htonl(DB_CHUNK_MSG_STORE);
header.length = htonl(sizeof(struct PF_msg_store) +
topic_len + payloadlen +
source_id_len + source_username_len);
source_id_len + source_username_len + proplen);

write_e(db_fptr, &header, sizeof(struct PF_header));
write_e(db_fptr, &chunk->F, sizeof(struct PF_msg_store));
Expand All @@ -129,10 +140,26 @@ int persist__chunk_message_store_write_v5(FILE *db_fptr, struct P_msg_store *chu
if(payloadlen){
write_e(db_fptr, UHPA_ACCESS(chunk->payload, payloadlen), (unsigned int)payloadlen);
}
if(chunk->properties){
if(proplen > 0){
prop_packet.remaining_length = proplen;
prop_packet.packet_length = proplen;
prop_packet.payload = mosquitto__malloc(proplen);
if(!prop_packet.payload){
return MOSQ_ERR_NOMEM;
}
rc = property__write_all(&prop_packet, chunk->properties, true);
if(rc) return rc;

write_e(db_fptr, prop_packet.payload, proplen);
mosquitto__free(prop_packet.payload);
}
}

return MOSQ_ERR_SUCCESS;
error:
log__printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));
mosquitto__free(prop_packet.payload);
return 1;
}

Expand Down
77 changes: 77 additions & 0 deletions test/broker/11-pub-props.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/usr/bin/env python

# Does a persisted PUBLISH keep its properties?

from mosq_test_helper import *

def write_config(filename, port):
with open(filename, 'w') as f:
f.write("port %d\n" % (port))
f.write("persistence true\n")
f.write("persistence_file mosquitto-%d.db\n" % (port))

port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, port)

rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect(
"persistent-props-test", keepalive=keepalive, clean_session=True, proto_ver=5
)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)

mid = 1
props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 1)
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CONTENT_TYPE, "plain/text")
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_RESPONSE_TOPIC, "/dev/null")
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CORRELATION_DATA, "2357289375902345")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
publish_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message", proto_ver=5, properties=props, retain=True)
puback_packet = mosq_test.gen_puback(mid, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS, proto_ver=5)

publish2_packet = mosq_test.gen_publish("subpub/qos1", qos=0, payload="message", proto_ver=5, properties=props, retain=True)

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 0, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)

if os.path.exists('mosquitto-%d.db' % (port)):
os.unlink('mosquitto-%d.db' % (port))

broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)

(stdo1, stde1) = ("", "")
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

if mosq_test.expect_packet(sock, "publish2", publish2_packet):

broker.terminate()
broker.wait()
(stdo1, stde1) = broker.communicate()
sock.close()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)

sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

if mosq_test.expect_packet(sock, "publish2", publish2_packet):
rc = 0

sock.close()
finally:
os.remove(conf_file)
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde1 + stde)
if os.path.exists('mosquitto-%d.db' % (port)):
os.unlink('mosquitto-%d.db' % (port))


exit(rc)

1 change: 1 addition & 0 deletions test/broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ endif
./11-persistent-subscription.py
./11-persistent-subscription-v5.py
./11-persistent-subscription-no-local.py
./11-pub-props.py

12 :
./12-prop-assigned-client-identifier.py
Expand Down
1 change: 1 addition & 0 deletions test/broker/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
(1, './11-persistent-subscription.py'),
(1, './11-persistent-subscription-v5.py'),
(1, './11-persistent-subscription-no-local.py'),
(1, './11-pub-props.py'),

(1, './12-prop-assigned-client-identifier.py'),
(1, './12-prop-maximum-packet-size-broker.py'),
Expand Down
6 changes: 6 additions & 0 deletions test/unit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ PERSIST_READ_TEST_OBJS = \

PERSIST_READ_OBJS = \
memory_mosq.o \
packet_datatypes.o \
persist_read.o \
persist_read_v234.o \
persist_read_v5.o \
property_mosq.o \
utf8_mosq.o \
util_mosq.o

PERSIST_WRITE_TEST_OBJS = \
Expand All @@ -40,12 +43,15 @@ PERSIST_WRITE_TEST_OBJS = \
PERSIST_WRITE_OBJS = \
database.o \
memory_mosq.o \
packet_datatypes.o \
persist_read.o \
persist_read_v234.o \
persist_read_v5.o \
persist_write.o \
persist_write_v5.o \
property_mosq.o \
subs.o \
utf8_mosq.o \
util_mosq.o

all : test
Expand Down
Binary file modified test/unit/files/persist_read/v5-message-store.test-db
Binary file not shown.
Binary file modified test/unit/files/persist_read/v5-retain.test-db
Binary file not shown.
9 changes: 0 additions & 9 deletions test/unit/persist_write_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ int send__pingreq(struct mosquitto *mosq)
return MOSQ_ERR_SUCCESS;
}

int mosquitto_property_add_varint(mosquitto_property **proplist, int identifier, uint32_t value)
{
return MOSQ_ERR_SUCCESS;
}

void mosquitto_property_free_all(mosquitto_property **properties)
{
}

int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context, const char *topic, long payloadlen, void* payload, int qos, bool retain, int access)
{
return MOSQ_ERR_SUCCESS;
Expand Down
2 changes: 1 addition & 1 deletion test/unit/persist_write_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ static void TEST_v5_sub(void)
CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS);

CU_ASSERT_EQUAL(0, file_diff("files/persist_read/v5-sub.test-db", "v5-sub.db"));
//unlink("v5-sub.db");
unlink("v5-sub.db");
}


Expand Down

0 comments on commit 48253bc

Please sign in to comment.