Skip to content

Commit

Permalink
基本完成对大文件传输的主要逻辑:只在topic下储存一份,然后直接发送,各节点保存发送的mid号即可,还需添加上线时的检查,并且现有改动未做测试
Browse files Browse the repository at this point in the history
  • Loading branch information
Allanceng committed Dec 19, 2014
1 parent 391ef43 commit 2e59af9
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 19 deletions.
3 changes: 2 additions & 1 deletion lib/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se
_mosquitto_packet_cleanup(&mosq->in_packet);
mosq->out_packet = NULL;
mosq->current_out_packet = NULL;
mosq->last_msg_in = mosquitto_time();
mosq->BigFile = NULL;
mosq->last_msg_in = mosquitto_time();
mosq->last_msg_out = mosquitto_time();
mosq->ping_t = 0;
mosq->last_mid = 0;
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ int mqtt3_db_message_write(struct mosquitto *context);
int mqtt3_db_messages_delete(struct mosquitto *context);
int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
int mqtt3_db_messages_subhier(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
/* Check all messages waiting on a client reply and resend if timeout has been exceeded. */
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ struct mosquitto {
struct _mosquitto_packet in_packet;//received pakcet
struct _mosquitto_packet *current_out_packet;
struct _mosquitto_packet *out_packet;
struct _mosquitto_BigFile_msg *BigFile; //存储已发送的bigfile
struct mosquitto_message *will;
#ifdef WITH_TLS
SSL *ssl;
Expand Down
1 change: 1 addition & 0 deletions src/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ void mqtt3_config_init(struct mqtt3_config *config)
config->auth_plugin = NULL;
config->verbose = false;
config->message_size_limit = 0;
config->large_file_size_limit = 1024; //后面可以仿照message_size_limit来做,在配置文件中定义
}

void mqtt3_config_cleanup(struct mqtt3_config *config)
Expand Down
10 changes: 9 additions & 1 deletion src/mosquitto_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct mqtt3_config {
char *log_file;
FILE *log_fptr;
int message_size_limit;
int large_file_size_limit;
char *password_file;
bool persistence;
char *persistence_location;
Expand Down Expand Up @@ -131,12 +132,18 @@ struct _mosquitto_subleaf {
int qos;
};

struct _mosquitto_BigFile_msg {
struct _mosquitto_BigFile_msg *next;
int mid;
};

struct _mosquitto_subhier {
struct _mosquitto_subhier *children;
struct _mosquitto_subhier *next;
struct _mosquitto_subleaf *subs;
char *topic;
struct mosquitto_msg_store *retained;
struct mosquitto_msg_store *retained; //存储retain标记的消息
struct mosquitto_msg_store *large_file; //存储大文件
};

struct mosquitto_msg_store{
Expand Down Expand Up @@ -362,6 +369,7 @@ int mqtt3_db_message_write(struct mosquitto *context);
int mqtt3_db_messages_delete(struct mosquitto *context);
int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
int mqtt3_db_messages_subhier(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
/* Check all messages waiting on a client reply and resend if timeout has been exceeded. */
Expand Down
23 changes: 19 additions & 4 deletions src/read_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context)
struct mosquitto_msg_store *stored = NULL;
int len;
char *topic_mount;
bool BigFile = false;
#ifdef WITH_BRIDGE
char *topic_temp;
int i;
Expand Down Expand Up @@ -206,7 +207,11 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context)
_mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
goto process_bad_message;
}
printf("recieved payload:%d\n", payloadlen);
if(db->config->large_file_size_limit && payloadlen > db->config->large_file_size_limit){
//存储到相应位置,做特殊处理
BigFile = true;
}
printf("mqtt_handle_publish recieved payload:%d\n", payloadlen);
payload = _mosquitto_calloc(payloadlen+1, sizeof(uint8_t));
if(!payload){
_mosquitto_free(topic);
Expand Down Expand Up @@ -246,11 +251,21 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context)
}
switch(qos){
case 0:
if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1;
if(BigFile){//先用普通服务试一下
retain = false; //Big File不允许被retain标记
if(mqtt3_db_messages_subhier(db, context->id, topic, qos, retain, stored))
rc = 1;
}else{

if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored))
rc = 1;
}
break;
case 1:
if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1;
if(_mosquitto_send_puback(context, mid)) rc = 1;
if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored))
rc = 1;
if(_mosquitto_send_puback(context, mid))
rc = 1;
break;
case 2:
if(!dup){
Expand Down
131 changes: 118 additions & 13 deletions src/subs.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ struct _sub_token {
char *topic;
};

static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain, bool BigFile)
{
int rc = 0;
int rc2;
int client_qos, msg_qos;
uint16_t mid;
struct _mosquitto_subleaf *leaf;
bool client_retain;
// if BigFile is true, it make sense, else useless
int packetlen;
struct _mosquitto_packet *packet = NULL;

leaf = hier->subs;

leaf = hier->subs;
if(retain && set_retain){
#ifdef WITH_PERSISTENCE
if(strncmp(topic, "$SYS", 4)){ //证明这不是系统消息
Expand All @@ -108,7 +110,42 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
// printf("stored->msg.payloadlen is null\n");
}
}
while(source_id && leaf){

if(BigFile){//先插入树结构中
if(hier->large_file){
hier->large_file->ref_count--;
}
if(stored->msg.payloadlen){
hier->large_file->ref_count++;
hier->large_file = stored;
//初始化发送包信息
packetlen = 2 + strlen(stored->msg.topic) + stored->msg.payloadlen;
packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
if(!packet)
return MOSQ_ERR_NOMEM;
packet->mid = 0;
packet->command = PUBLISH | ((false&0x1)<<3) | (0<<1) | 0;//false 对应 dup ,0 对应 qos , 0对应 retain
packet->remaining_length = packetlen;
rc = _mosquitto_packet_alloc(packet);
if(rc){
_mosquitto_free(packet);
return rc;
}
_mosquitto_write_string(packet, stored->msg.topic, strlen(stored->msg.topic));
/* Payload */
if(payloadlen){
_mosquitto_write_bytes(packet, stored->msg.payload, stored->msg.payloadlen);
}
packet->pos = 0;
packet->next = NULL;
packet->to_process = packet->packet_length;

}else{
hier->large_file = NULL;
}
}

while(source_id && leaf){ //若没有订阅者,则这里不会执行
if(leaf->context->is_bridge && !strcmp(leaf->context->id, source_id)){
leaf = leaf->next;
continue;
Expand All @@ -131,7 +168,7 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
}
}
if(msg_qos){
mid = _mosquitto_mid_generate(leaf->context);
mid = _mosquitto_mid_generate(leaf->context);//这个mid很有可能是发送包的ID
}else{
mid = 0;
}
Expand All @@ -140,12 +177,46 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
* even if the message is fresh. If we don't do this, retained
* messages won't be propagated. */
client_retain = retain;
}else{
}else{//默认执行这个
/* Client is not a bridge and this isn't a stale message so
* retain should be false. */
client_retain = false;
}
if(mqtt3_db_message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1;
if(BigFile){//直接发送,不保存
if(!leaf->context->BigFile){
leaf->context->BigFile = _mosquitto_malloc( sizeof(struct _mosquitto_BigFile_msg) );
leaf->context->BigFile->next = NULL;
leaf->context->BigFile->mid = stored->msg.mid;
}else {
leaf->context->BigFile->mid = stored->msg.mid;
}
//发送packet
// pthread_mutex_lock(&mosq->current_out_packet_mutex); 这里差一个锁机制,不知道可不可以用这个
while(packet->to_process > 0){//发送一个包,可能包很长,一次没有发完
write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
if(write_length > 0){
packet->to_process -= write_length;
packet->pos += write_length;
}else{
if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
// pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
}else{
// pthread_mutex_unlock(&mosq->current_out_packet_mutex);
switch(errno){
case COMPAT_ECONNRESET:
return MOSQ_ERR_CONN_LOST;
default:
return MOSQ_ERR_ERRNO;
}
}
}
}
return 0;
}else {
if(mqtt3_db_message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1)
rc = 1;
}
}else{
return 1; /* Application error */
}
Expand Down Expand Up @@ -343,7 +414,7 @@ static int _sub_remove(struct mosquitto_db *db, struct mosquitto *context, struc
return MOSQ_ERR_SUCCESS;
}

static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain, bool BigFile)
{
/* FIXME - need to take into account source_id if the client is a bridge */
struct _mosquitto_subhier *branch;
Expand All @@ -360,16 +431,16 @@ static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subh
/* Don't set a retained message where + is in the hierarchy. */
sr = false;
}
_sub_search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr);
_sub_search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr, BigFile);
if(!tokens->next){
_subs_process(db, branch, source_id, topic, qos, retain, stored, sr);
_subs_process(db, branch, source_id, topic, qos, retain, stored, sr, BigFile);
}
}else if(!strcmp(branch->topic, "#") && !branch->children){
/* The topic matches due to a # wildcard - process the
* subscriptions but *don't* return. Although this branch has ended
* there may still be other subscriptions to deal with.
*/
_subs_process(db, branch, source_id, topic, qos, retain, stored, false);
_subs_process(db, branch, source_id, topic, qos, retain, stored, false, BigFile);
}
branch = branch->next;
}
Expand Down Expand Up @@ -478,9 +549,9 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons
/* We have a message that needs to be retained, so ensure that the subscription
* tree for its topic exists.
*/
_sub_add(db, NULL, 0, subhier, tokens);
_sub_add(db, NULL, 0, subhier, tokens); //存在则不处理,不存在则添加该主题:w
}
_sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true);
_sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true, false);
}
subhier = subhier->next;
}
Expand All @@ -494,6 +565,40 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons
return rc;
}

int mqtt3_db_messages_subhier(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored)
{
int rc = 0;
struct _mosquitto_subhier *subhier;
struct _sub_token *tokens = NULL, *tail;

assert(db);
assert(topic);

if(_sub_topic_tokenise(topic, &tokens)) return 1;

subhier = db->subs.children;
while(subhier){
if(!strcmp(subhier->topic, tokens->topic)){
if(retain){
/* We have a message that needs to be retained, so ensure that the subscription
* tree for its topic exists.
*/
_sub_add(db, NULL, 0, subhier, tokens); //存在则不处理,不存在则添加该主题:w
}
_sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true, true);
}
subhier = subhier->next;
}
while(tokens){
tail = tokens->next;
_mosquitto_free(tokens->topic);
_mosquitto_free(tokens);
tokens = tail;
}

return rc;
}

static int _subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root)
{
int rc = 0;
Expand Down

0 comments on commit 2e59af9

Please sign in to comment.