diff --git a/lib/mosquitto.c b/lib/mosquitto.c index d813ddb..8bbd401 100755 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -182,7 +182,8 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se _mosquitto_packet_cleanup(&mosq->in_packet); mosq->out_packet = NULL; mosq->current_out_packet = NULL; - mosq->last_msg_in = mosquitto_time(); + mosq->BigFile = NULL; + mosq->last_msg_in = mosquitto_time(); mosq->last_msg_out = mosquitto_time(); mosq->ping_t = 0; mosq->last_mid = 0; diff --git a/lib/mosquitto_broker.h b/lib/mosquitto_broker.h index 7e7d853..bc87a06 100755 --- a/lib/mosquitto_broker.h +++ b/lib/mosquitto_broker.h @@ -362,6 +362,7 @@ int mqtt3_db_message_write(struct mosquitto *context); int mqtt3_db_messages_delete(struct mosquitto *context); int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); +int mqtt3_db_messages_subhier(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); /* Check all messages waiting on a client reply and resend if timeout has been exceeded. */ diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 65953fb..e8d72eb 100755 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -163,6 +163,7 @@ struct mosquitto { struct _mosquitto_packet in_packet;//received pakcet struct _mosquitto_packet *current_out_packet; struct _mosquitto_packet *out_packet; + struct _mosquitto_BigFile_msg *BigFile; //存储已发送的bigfile struct mosquitto_message *will; #ifdef WITH_TLS SSL *ssl; diff --git a/src/conf.c b/src/conf.c index 4591cae..0f5b435 100755 --- a/src/conf.c +++ b/src/conf.c @@ -214,6 +214,7 @@ void mqtt3_config_init(struct mqtt3_config *config) config->auth_plugin = NULL; config->verbose = false; config->message_size_limit = 0; + config->large_file_size_limit = 1024; //后面可以仿照message_size_limit来做,在配置文件中定义 } void mqtt3_config_cleanup(struct mqtt3_config *config) diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 7e7d853..c8c6207 100755 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -100,6 +100,7 @@ struct mqtt3_config { char *log_file; FILE *log_fptr; int message_size_limit; + int large_file_size_limit; char *password_file; bool persistence; char *persistence_location; @@ -131,12 +132,18 @@ struct _mosquitto_subleaf { int qos; }; +struct _mosquitto_BigFile_msg { + struct _mosquitto_BigFile_msg *next; + int mid; +}; + struct _mosquitto_subhier { struct _mosquitto_subhier *children; struct _mosquitto_subhier *next; struct _mosquitto_subleaf *subs; char *topic; - struct mosquitto_msg_store *retained; + struct mosquitto_msg_store *retained; //存储retain标记的消息 + struct mosquitto_msg_store *large_file; //存储大文件 }; struct mosquitto_msg_store{ @@ -362,6 +369,7 @@ int mqtt3_db_message_write(struct mosquitto *context); int mqtt3_db_messages_delete(struct mosquitto *context); int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); +int mqtt3_db_messages_subhier(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); /* Check all messages waiting on a client reply and resend if timeout has been exceeded. */ diff --git a/src/read_handle.c b/src/read_handle.c index ca46df8..04ee04c 100755 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -98,6 +98,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) struct mosquitto_msg_store *stored = NULL; int len; char *topic_mount; + bool BigFile = false; #ifdef WITH_BRIDGE char *topic_temp; int i; @@ -206,7 +207,11 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen); goto process_bad_message; } - printf("recieved payload:%d\n", payloadlen); + if(db->config->large_file_size_limit && payloadlen > db->config->large_file_size_limit){ + //存储到相应位置,做特殊处理 + BigFile = true; + } + printf("mqtt_handle_publish recieved payload:%d\n", payloadlen); payload = _mosquitto_calloc(payloadlen+1, sizeof(uint8_t)); if(!payload){ _mosquitto_free(topic); @@ -246,11 +251,21 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) } switch(qos){ case 0: - if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1; + if(BigFile){//先用普通服务试一下 + retain = false; //Big File不允许被retain标记 + if(mqtt3_db_messages_subhier(db, context->id, topic, qos, retain, stored)) + rc = 1; + }else{ + + if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) + rc = 1; + } break; case 1: - if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1; - if(_mosquitto_send_puback(context, mid)) rc = 1; + if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) + rc = 1; + if(_mosquitto_send_puback(context, mid)) + rc = 1; break; case 2: if(!dup){ diff --git a/src/subs.c b/src/subs.c index 8022232..e0de1a5 100755 --- a/src/subs.c +++ b/src/subs.c @@ -73,7 +73,7 @@ struct _sub_token { char *topic; }; -static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) +static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain, bool BigFile) { int rc = 0; int rc2; @@ -81,9 +81,11 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie uint16_t mid; struct _mosquitto_subleaf *leaf; bool client_retain; + // if BigFile is true, it make sense, else useless + int packetlen; + struct _mosquitto_packet *packet = NULL; - leaf = hier->subs; - + leaf = hier->subs; if(retain && set_retain){ #ifdef WITH_PERSISTENCE if(strncmp(topic, "$SYS", 4)){ //证明这不是系统消息 @@ -108,7 +110,42 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie // printf("stored->msg.payloadlen is null\n"); } } - while(source_id && leaf){ + + if(BigFile){//先插入树结构中 + if(hier->large_file){ + hier->large_file->ref_count--; + } + if(stored->msg.payloadlen){ + hier->large_file->ref_count++; + hier->large_file = stored; + //初始化发送包信息 + packetlen = 2 + strlen(stored->msg.topic) + stored->msg.payloadlen; + packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet)); + if(!packet) + return MOSQ_ERR_NOMEM; + packet->mid = 0; + packet->command = PUBLISH | ((false&0x1)<<3) | (0<<1) | 0;//false 对应 dup ,0 对应 qos , 0对应 retain + packet->remaining_length = packetlen; + rc = _mosquitto_packet_alloc(packet); + if(rc){ + _mosquitto_free(packet); + return rc; + } + _mosquitto_write_string(packet, stored->msg.topic, strlen(stored->msg.topic)); + /* Payload */ + if(payloadlen){ + _mosquitto_write_bytes(packet, stored->msg.payload, stored->msg.payloadlen); + } + packet->pos = 0; + packet->next = NULL; + packet->to_process = packet->packet_length; + + }else{ + hier->large_file = NULL; + } + } + + while(source_id && leaf){ //若没有订阅者,则这里不会执行 if(leaf->context->is_bridge && !strcmp(leaf->context->id, source_id)){ leaf = leaf->next; continue; @@ -131,7 +168,7 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie } } if(msg_qos){ - mid = _mosquitto_mid_generate(leaf->context); + mid = _mosquitto_mid_generate(leaf->context);//这个mid很有可能是发送包的ID }else{ mid = 0; } @@ -140,12 +177,46 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie * even if the message is fresh. If we don't do this, retained * messages won't be propagated. */ client_retain = retain; - }else{ + }else{//默认执行这个 /* Client is not a bridge and this isn't a stale message so * retain should be false. */ client_retain = false; } - if(mqtt3_db_message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) rc = 1; + if(BigFile){//直接发送,不保存 + if(!leaf->context->BigFile){ + leaf->context->BigFile = _mosquitto_malloc( sizeof(struct _mosquitto_BigFile_msg) ); + leaf->context->BigFile->next = NULL; + leaf->context->BigFile->mid = stored->msg.mid; + }else { + leaf->context->BigFile->mid = stored->msg.mid; + } + //发送packet + // pthread_mutex_lock(&mosq->current_out_packet_mutex); 这里差一个锁机制,不知道可不可以用这个 + while(packet->to_process > 0){//发送一个包,可能包很长,一次没有发完 + write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process); + if(write_length > 0){ + packet->to_process -= write_length; + packet->pos += write_length; + }else{ + if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ + // pthread_mutex_unlock(&mosq->current_out_packet_mutex); + return MOSQ_ERR_SUCCESS; + }else{ + // pthread_mutex_unlock(&mosq->current_out_packet_mutex); + switch(errno){ + case COMPAT_ECONNRESET: + return MOSQ_ERR_CONN_LOST; + default: + return MOSQ_ERR_ERRNO; + } + } + } + } + return 0; + }else { + if(mqtt3_db_message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1) + rc = 1; + } }else{ return 1; /* Application error */ } @@ -343,7 +414,7 @@ static int _sub_remove(struct mosquitto_db *db, struct mosquitto *context, struc return MOSQ_ERR_SUCCESS; } -static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) +static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subhier, struct _sub_token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain, bool BigFile) { /* FIXME - need to take into account source_id if the client is a bridge */ struct _mosquitto_subhier *branch; @@ -360,16 +431,16 @@ static void _sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *subh /* Don't set a retained message where + is in the hierarchy. */ sr = false; } - _sub_search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr); + _sub_search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr, BigFile); if(!tokens->next){ - _subs_process(db, branch, source_id, topic, qos, retain, stored, sr); + _subs_process(db, branch, source_id, topic, qos, retain, stored, sr, BigFile); } }else if(!strcmp(branch->topic, "#") && !branch->children){ /* The topic matches due to a # wildcard - process the * subscriptions but *don't* return. Although this branch has ended * there may still be other subscriptions to deal with. */ - _subs_process(db, branch, source_id, topic, qos, retain, stored, false); + _subs_process(db, branch, source_id, topic, qos, retain, stored, false, BigFile); } branch = branch->next; } @@ -478,9 +549,9 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons /* We have a message that needs to be retained, so ensure that the subscription * tree for its topic exists. */ - _sub_add(db, NULL, 0, subhier, tokens); + _sub_add(db, NULL, 0, subhier, tokens); //存在则不处理,不存在则添加该主题:w } - _sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true); + _sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true, false); } subhier = subhier->next; } @@ -494,6 +565,40 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons return rc; } +int mqtt3_db_messages_subhier(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored) +{ + int rc = 0; + struct _mosquitto_subhier *subhier; + struct _sub_token *tokens = NULL, *tail; + + assert(db); + assert(topic); + + if(_sub_topic_tokenise(topic, &tokens)) return 1; + + subhier = db->subs.children; + while(subhier){ + if(!strcmp(subhier->topic, tokens->topic)){ + if(retain){ + /* We have a message that needs to be retained, so ensure that the subscription + * tree for its topic exists. + */ + _sub_add(db, NULL, 0, subhier, tokens); //存在则不处理,不存在则添加该主题:w + } + _sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true, true); + } + subhier = subhier->next; + } + while(tokens){ + tail = tokens->next; + _mosquitto_free(tokens->topic); + _mosquitto_free(tokens); + tokens = tail; + } + + return rc; +} + static int _subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root) { int rc = 0;