diff -Naur /tmp/mosquitto-1.6.4/lib/connect.c ./lib/connect.c --- /tmp/mosquitto-1.6.4/lib/connect.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/connect.c 2019-08-07 23:00:43.993719811 -0700 @@ -272,7 +272,7 @@ mosq->state = mosq_cs_disconnecting; pthread_mutex_unlock(&mosq->state_mutex); - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + // if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; return send__disconnect(mosq, reason_code, outgoing_properties); } diff -Naur /tmp/mosquitto-1.6.4/lib/connect.c.rej ./lib/connect.c.rej --- /tmp/mosquitto-1.6.4/lib/connect.c.rej 1969-12-31 16:00:00.000000000 -0800 +++ ./lib/connect.c.rej 2019-08-07 22:50:08.821782691 -0700 @@ -0,0 +1,11 @@ +--- vendor/mosquitto/lib/connect.c ++++ vendor/mosquitto/lib/connect.c +@@ -206,7 +206,7 @@ int mosquitto_disconnect(struct mosquitto *mosq) + mosq->state = mosq_cs_disconnecting; + pthread_mutex_unlock(&mosq->state_mutex); + +- if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; ++// if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + return send__disconnect(mosq); + } + diff -Naur /tmp/mosquitto-1.6.4/lib/handle_connack.c ./lib/handle_connack.c --- /tmp/mosquitto-1.6.4/lib/handle_connack.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/handle_connack.c 2019-08-07 22:59:58.849996842 -0700 @@ -34,17 +34,29 @@ pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_connect){ mosq->in_callback = true; + pthread_mutex_unlock(&mosq->callback_mutex); + mosq->on_connect(mosq, mosq->userdata, reason_code); + + pthread_mutex_lock(&mosq->callback_mutex); mosq->in_callback = false; } if(mosq->on_connect_with_flags){ mosq->in_callback = true; + pthread_mutex_unlock(&mosq->callback_mutex); + mosq->on_connect_with_flags(mosq, mosq->userdata, reason_code, connect_flags); + + pthread_mutex_lock(&mosq->callback_mutex); mosq->in_callback = false; } if(mosq->on_connect_v5){ mosq->in_callback = true; + pthread_mutex_unlock(&mosq->callback_mutex); + mosq->on_connect_v5(mosq, mosq->userdata, reason_code, connect_flags, properties); + + pthread_mutex_lock(&mosq->callback_mutex); mosq->in_callback = false; } pthread_mutex_unlock(&mosq->callback_mutex); diff -Naur /tmp/mosquitto-1.6.4/lib/handle_connack.c.rej ./lib/handle_connack.c.rej --- /tmp/mosquitto-1.6.4/lib/handle_connack.c.rej 1969-12-31 16:00:00.000000000 -0800 +++ ./lib/handle_connack.c.rej 2019-08-07 22:50:08.821782691 -0700 @@ -0,0 +1,36 @@ +--- vendor/mosquitto/lib/handle_connack.c ++++ vendor/mosquitto/lib/handle_connack.c +@@ -39,22 +39,33 @@ int handle__connack(struct mosquitto *mosq) + if(rc) return rc; + log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", mosq->id, result); + pthread_mutex_lock(&mosq->callback_mutex); ++ + if(mosq->on_connect){ + mosq->in_callback = true; ++ pthread_mutex_unlock(&mosq->callback_mutex); ++ + mosq->on_connect(mosq, mosq->userdata, result); ++ ++ pthread_mutex_lock(&mosq->callback_mutex); + mosq->in_callback = false; + } + if(mosq->on_connect_with_flags){ + mosq->in_callback = true; ++ pthread_mutex_unlock(&mosq->callback_mutex); ++ + mosq->on_connect_with_flags(mosq, mosq->userdata, result, connect_flags); ++ ++ pthread_mutex_lock(&mosq->callback_mutex); + mosq->in_callback = false; + } + pthread_mutex_unlock(&mosq->callback_mutex); + switch(result){ + case 0: ++ pthread_mutex_lock(&mosq->state_mutex); + if(mosq->state != mosq_cs_disconnecting){ + mosq->state = mosq_cs_connected; + } ++ pthread_mutex_unlock(&mosq->state_mutex); + message__retry_check(mosq); + return MOSQ_ERR_SUCCESS; + case 1: diff -Naur /tmp/mosquitto-1.6.4/lib/handle_ping.c ./lib/handle_ping.c --- /tmp/mosquitto-1.6.4/lib/handle_ping.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/handle_ping.c 2019-08-07 23:08:46.798702752 -0700 @@ -55,9 +55,12 @@ { assert(mosq); + pthread_mutex_lock(&mosq->state_mutex); if(mosq->state != mosq_cs_connected){ + pthread_mutex_unlock(&mosq->state_mutex); return MOSQ_ERR_PROTOCOL; } + pthread_mutex_unlock(&mosq->state_mutex); mosq->ping_t = 0; /* No longer waiting for a PINGRESP. */ #ifdef WITH_BROKER diff -Naur /tmp/mosquitto-1.6.4/lib/handle_pubackcomp.c ./lib/handle_pubackcomp.c --- /tmp/mosquitto-1.6.4/lib/handle_pubackcomp.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/handle_pubackcomp.c 2019-08-07 23:02:44.792973223 -0700 @@ -50,9 +50,12 @@ assert(mosq); + pthread_mutex_lock(&mosq->state_mutex); if(mosq->state != mosq_cs_connected){ + pthread_mutex_unlock(&mosq->state_mutex); return MOSQ_ERR_PROTOCOL; } + pthread_mutex_unlock(&mosq->state_mutex); pthread_mutex_lock(&mosq->msgs_out.mutex); util__increment_send_quota(mosq); diff -Naur /tmp/mosquitto-1.6.4/lib/handle_publish.c ./lib/handle_publish.c --- /tmp/mosquitto-1.6.4/lib/handle_publish.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/handle_publish.c 2019-08-07 23:13:32.764887541 -0700 @@ -43,9 +43,12 @@ assert(mosq); + pthread_mutex_lock(&mosq->state_mutex); if(mosq->state != mosq_cs_connected){ + pthread_mutex_unlock(&mosq->state_mutex); return MOSQ_ERR_PROTOCOL; } + pthread_mutex_unlock(&mosq->state_mutex); message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all)); if(!message) return MOSQ_ERR_NOMEM; @@ -119,7 +122,11 @@ pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_message){ mosq->in_callback = true; + pthread_mutex_unlock(&mosq->callback_mutex); + mosq->on_message(mosq, mosq->userdata, &message->msg); + + pthread_mutex_lock(&mosq->callback_mutex); mosq->in_callback = false; } if(mosq->on_message_v5){ @@ -137,7 +144,11 @@ pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_message){ mosq->in_callback = true; + pthread_mutex_unlock(&mosq->callback_mutex); + mosq->on_message(mosq, mosq->userdata, &message->msg); + + pthread_mutex_lock(&mosq->callback_mutex); mosq->in_callback = false; } if(mosq->on_message_v5){ diff -Naur /tmp/mosquitto-1.6.4/lib/handle_suback.c ./lib/handle_suback.c --- /tmp/mosquitto-1.6.4/lib/handle_suback.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/handle_suback.c 2019-08-07 23:04:21.152373047 -0700 @@ -43,9 +43,12 @@ assert(mosq); + pthread_mutex_lock(&mosq->state_mutex); if(mosq->state != mosq_cs_connected){ + pthread_mutex_unlock(&mosq->state_mutex); return MOSQ_ERR_PROTOCOL; } + pthread_mutex_unlock(&mosq->state_mutex); #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Received SUBACK from %s", mosq->id); diff -Naur /tmp/mosquitto-1.6.4/lib/packet_mosq.c ./lib/packet_mosq.c --- /tmp/mosquitto-1.6.4/lib/packet_mosq.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/packet_mosq.c 2019-08-07 22:53:58.196306059 -0700 @@ -174,9 +174,12 @@ #endif } + pthread_mutex_lock(&mosq->callback_mutex); if(mosq->in_callback == false && mosq->threaded == mosq_ts_none){ + pthread_mutex_unlock(&mosq->callback_mutex); return packet__write(mosq); }else{ + pthread_mutex_unlock(&mosq->callback_mutex); return MOSQ_ERR_SUCCESS; } #endif @@ -217,14 +220,17 @@ } pthread_mutex_unlock(&mosq->out_packet_mutex); + pthread_mutex_lock(&mosq->state_mutex); #if defined(WITH_TLS) && !defined(WITH_BROKER) if((mosq->state == mosq_cs_connect_pending) || mosq->want_connect){ #else if(mosq->state == mosq_cs_connect_pending){ #endif + pthread_mutex_unlock(&mosq->state_mutex); pthread_mutex_unlock(&mosq->current_out_packet_mutex); return MOSQ_ERR_SUCCESS; } + pthread_mutex_unlock(&mosq->state_mutex); while(mosq->current_out_packet){ packet = mosq->current_out_packet; @@ -319,9 +325,12 @@ if(mosq->sock == INVALID_SOCKET){ return MOSQ_ERR_NO_CONN; } + pthread_mutex_lock(&mosq->state_mutex); if(mosq->state == mosq_cs_connect_pending){ + pthread_mutex_unlock(&mosq->state_mutex); return MOSQ_ERR_SUCCESS; } + pthread_mutex_unlock(&mosq->state_mutex); /* This gets called if pselect() indicates that there is network data * available - ie. at least one byte. What we do depends on what data we diff -Naur /tmp/mosquitto-1.6.4/lib/packet_mosq.c.rej ./lib/packet_mosq.c.rej --- /tmp/mosquitto-1.6.4/lib/packet_mosq.c.rej 1969-12-31 16:00:00.000000000 -0800 +++ ./lib/packet_mosq.c.rej 2019-08-07 22:50:08.825782665 -0700 @@ -0,0 +1,28 @@ +--- vendor/mosquitto/lib/packet_mosq.c ++++ vendor/mosquitto/lib/packet_mosq.c +@@ -141,9 +141,12 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) + #endif + } + ++ pthread_mutex_lock(&mosq->callback_mutex); + if(mosq->in_callback == false && mosq->threaded == false){ ++ pthread_mutex_unlock(&mosq->callback_mutex); + return packet__write(mosq); + }else{ ++ pthread_mutex_unlock(&mosq->callback_mutex); + return MOSQ_ERR_SUCCESS; + } + #endif +@@ -396,9 +402,12 @@ int packet__read(struct mosquitto *mosq) + + if(!mosq) return MOSQ_ERR_INVAL; + if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; ++ pthread_mutex_lock(&mosq->state_mutex); + if(mosq->state == mosq_cs_connect_pending){ ++ pthread_mutex_unlock(&mosq->state_mutex); + return MOSQ_ERR_SUCCESS; + } ++ pthread_mutex_unlock(&mosq->state_mutex); + + /* This gets called if pselect() indicates that there is network data + * available - ie. at least one byte. What we do depends on what data we diff -Naur /tmp/mosquitto-1.6.4/lib/util_mosq.c ./lib/util_mosq.c --- /tmp/mosquitto-1.6.4/lib/util_mosq.c 2019-08-01 12:50:01.000000000 -0700 +++ ./lib/util_mosq.c 2019-08-07 22:50:08.825782665 -0700 @@ -88,7 +88,9 @@ if(mosq->keepalive && mosq->sock != INVALID_SOCKET && (now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){ + pthread_mutex_lock(&mosq->state_mutex); if(mosq->state == mosq_cs_connected && mosq->ping_t == 0){ + pthread_mutex_unlock(&mosq->state_mutex); send__pingreq(mosq); /* Reset last msg times to give the server time to send a pingresp */ pthread_mutex_lock(&mosq->msgtime_mutex); @@ -110,7 +112,9 @@ pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_disconnect){ mosq->in_callback = true; + pthread_mutex_unlock(&mosq->callback_mutex); mosq->on_disconnect(mosq, mosq->userdata, rc); + pthread_mutex_lock(&mosq->callback_mutex); mosq->in_callback = false; } if(mosq->on_disconnect_v5){