Skip to content

Commit

Permalink
在对大数据传输的特殊处理后完成测试,发现server端会崩溃,通过反复对比正常的Log和
Browse files Browse the repository at this point in the history
逐步添加打印的方式找到存在空指针的位置,修复之。再次测试,发现不能传输文件。后来发现
在src/subs.c的_sub_process函数中遇到阻塞就返回导致传输文件未完成。进而分析出多个客户端
时mosquitto发送的方式。去掉此处的return即可完成传输。但试验中仍发现一个问题:平板接收时,
server端持续发送,而其发送剩余量却不改变。这个问题在手机上没发现,而在平板上出现,具体原因
不详。对当前传输的方式仍有待进一步优化,详看我的图书馆
  • Loading branch information
Allanceng committed Dec 30, 2014
1 parent 553ac7e commit 809274e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
13 changes: 6 additions & 7 deletions lib/net_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -874,13 +874,13 @@ int _mosquitto_packet_read(struct mosquitto *mosq)
if(!mosq->in_packet.have_remaining){
do{
read_length = _mosquitto_net_read(mosq, &byte, 1);
printf("read_length:%d\n", read_length);
//printf("read_length:%d\n", read_length);
if(read_length == 1){
mosq->in_packet.remaining_count++; //后面调用了_mosquitto_packet_cleanup将这个值清零
/* Max 4 bytes length for remaining length as defined by protocol.
* Anything more likely means a broken/malicious client.
*/
if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL;
if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL; //MQTT协议只规定了4个字节保存其长度
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_received++;
#endif
Expand All @@ -903,17 +903,16 @@ int _mosquitto_packet_read(struct mosquitto *mosq)
}
}
}while((byte & 128) != 0);

if(mosq->in_packet.remaining_length > 0){
mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
mosq->in_packet.to_process = mosq->in_packet.remaining_length;
}
mosq->in_packet.have_remaining = 1;
}
while(mosq->in_packet.to_process>0){
while(mosq->in_packet.to_process>0){ //前面读取的只是长度,这里根据长度读取后面真正的数据
read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(read_length > 0){
if(read_length > 0){
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
g_bytes_received += read_length;
#endif
Expand All @@ -930,6 +929,7 @@ int _mosquitto_packet_read(struct mosquitto *mosq)
* This is an arbitrary limit, but with some consideration.
* If a client can't send 1000 bytes in a second it
* probably shouldn't be using a 1 second keep alive. */
printf("阻塞:mosq->in_packet.to_process:%d\n", mosq->in_packet.to_process);
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = mosquitto_time();
pthread_mutex_unlock(&mosq->msgtime_mutex);
Expand All @@ -944,8 +944,7 @@ int _mosquitto_packet_read(struct mosquitto *mosq)
}
}
}
}

}
/* All data for this packet is read. */
mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
Expand Down
6 changes: 6 additions & 0 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db)
child->subs = NULL;
child->children = NULL;
child->retained = NULL;
child->large_file = NULL;
db->subs.children = child;
//system init
child = _mosquitto_malloc(sizeof(struct _mosquitto_subhier));
Expand All @@ -93,6 +94,7 @@ int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db)
child->subs = NULL;
child->children = NULL;
child->retained = NULL;
child->large_file = NULL;
db->subs.children->next = child;

db->unpwd = NULL;
Expand Down Expand Up @@ -122,6 +124,10 @@ static void subhier_clean(struct _mosquitto_subhier *subhier)
if(subhier->retained){
subhier->retained->ref_count--;
}

if(subhier->large_file) {
subhier->large_file->ref_count--;
}
subhier_clean(subhier->children); //while delete the point ,delete its children as well
if(subhier->topic) _mosquitto_free(subhier->topic);

Expand Down
35 changes: 25 additions & 10 deletions src/subs.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
hier->large_file->ref_count--;
}
if(stored->msg.payloadlen){
hier->large_file->ref_count++;
hier->large_file = stored;
hier->large_file->ref_count++; //这里是前面导致server端出问题的原因,这一句放在上一句前面导致出现空指针
//初始化发送包信息
packetlen = 2 + strlen(stored->msg.topic) + stored->msg.payloadlen;
packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
Expand All @@ -139,9 +139,9 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
if( stored->msg.payloadlen){
_mosquitto_write_bytes(packet, stored->msg.payload, stored->msg.payloadlen);
}
packet->pos = 0;
//packet->pos = 0;
packet->next = NULL;
packet->to_process = packet->packet_length;
//packet->to_process = packet->packet_length;

}else{
hier->large_file = NULL;
Expand Down Expand Up @@ -186,26 +186,35 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
client_retain = false;
}
if(BigFile){//直接发送,不保存

if(!leaf->context->BigFile){
leaf->context->BigFile = _mosquitto_malloc( sizeof(struct _mosquitto_BigFile_msg) );
leaf->context->BigFile->next = NULL;
leaf->context->BigFile->mid = stored->msg.mid;
}else {
_mosquitto_free(leaf->context->BigFile);
leaf->context->BigFile = _mosquitto_malloc( sizeof(struct _mosquitto_BigFile_msg) );
leaf->context->BigFile->next = NULL;
leaf->context->BigFile->mid = stored->msg.mid;
}
//这两句初始化本来是在上面做的,但是改到这里是因为当有多个节点发送需要将这两个值初始化,否则只能
packet->pos = 0;
packet->to_process = packet->packet_length;

//发送packet
// pthread_mutex_lock(&mosq->current_out_packet_mutex); 这里差一个锁机制,不知道可不可以用这个
while(packet->to_process > 0){//发送一个包,可能包很长,一次没有发完
// pthread_mutex_lock(&mosq->current_out_packet_mutex);// 这里差一个锁机制,不知道可不可以用这个
while(packet->to_process > 0 && leaf->context->sock != INVALID_SOCKET){//发送一个包,可能包很长,一次没有发完
write_length = _mosquitto_net_write(leaf->context, &(packet->payload[packet->pos]), packet->to_process);
if(write_length > 0){
packet->to_process -= write_length;
packet->pos += write_length;
}else{
if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
// pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ //该操作可能被阻塞
//pthread_mutex_unlock(&mosq->current_out_packet_mutex);
printf("OK ya\n");
}else{
// pthread_mutex_unlock(&mosq->current_out_packet_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
printf("error ya\n");
switch(errno){
case COMPAT_ECONNRESET:
return MOSQ_ERR_CONN_LOST;
Expand All @@ -215,7 +224,13 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie
}
}
}
return 0;

pthread_mutex_lock(&mosq->msgtime_mutex);
leaf->context->last_msg_out = mosquitto_time();
pthread_mutex_unlock(&mosq->msgtime_mutex);

printf("send succuss\n");
//return 0;
}else {
if(mqtt3_db_message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored) == 1)
rc = 1;
Expand Down

0 comments on commit 809274e

Please sign in to comment.