Skip to content

Commit

Permalink
Fix missing locks on mosq->state.
Browse files Browse the repository at this point in the history
Closes #1374. Thanks to Jeff Trull.
  • Loading branch information
ralight committed Sep 17, 2019
1 parent 76d0b32 commit 40c1a97
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 8 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Expand Up @@ -14,6 +14,7 @@ Broker:
Client library:
- Fix reconnect backoff for the situation where connections are dropped rather
than refused. Closes #737.
- Fix missing locks on `mosq->state`. Closes #1374.

Documentation:
- Improve details on global/per listener options in the mosquitto.conf man page.
Expand Down
8 changes: 7 additions & 1 deletion lib/handle_ping.c
Expand Up @@ -53,9 +53,15 @@ int handle__pingreq(struct mosquitto *mosq)

int handle__pingresp(struct mosquitto *mosq)
{
int state;

assert(mosq);

if(mosq->state != mosq_cs_connected){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);

if(state != mosq_cs_connected){
return MOSQ_ERR_PROTOCOL;
}

Expand Down
7 changes: 6 additions & 1 deletion lib/handle_pubackcomp.c
Expand Up @@ -47,10 +47,15 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
int rc;
mosquitto_property *properties = NULL;
int qos;
int state;

assert(mosq);

if(mosq->state != mosq_cs_connected){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);

if(state != mosq_cs_connected){
return MOSQ_ERR_PROTOCOL;
}

Expand Down
7 changes: 6 additions & 1 deletion lib/handle_publish.c
Expand Up @@ -40,10 +40,15 @@ int handle__publish(struct mosquitto *mosq)
uint16_t mid;
int slen;
mosquitto_property *properties = NULL;
int state;

assert(mosq);

if(mosq->state != mosq_cs_connected){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);

if(state != mosq_cs_connected){
return MOSQ_ERR_PROTOCOL;
}

Expand Down
7 changes: 6 additions & 1 deletion lib/handle_suback.c
Expand Up @@ -40,10 +40,15 @@ int handle__suback(struct mosquitto *mosq)
int i = 0;
int rc;
mosquitto_property *properties = NULL;
int state;

assert(mosq);

if(mosq->state != mosq_cs_connected){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);

if(state != mosq_cs_connected){
return MOSQ_ERR_PROTOCOL;
}

Expand Down
16 changes: 13 additions & 3 deletions lib/packet_mosq.c
Expand Up @@ -202,6 +202,7 @@ int packet__write(struct mosquitto *mosq)
{
ssize_t write_length;
struct mosquitto__packet *packet;
int state;

if(!mosq) return MOSQ_ERR_INVAL;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
Expand All @@ -217,10 +218,14 @@ int packet__write(struct mosquitto *mosq)
}
pthread_mutex_unlock(&mosq->out_packet_mutex);

pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);

#if defined(WITH_TLS) && !defined(WITH_BROKER)
if((mosq->state == mosq_cs_connect_pending) || mosq->want_connect){
if((state == mosq_cs_connect_pending) || mosq->want_connect){
#else
if(mosq->state == mosq_cs_connect_pending){
if(state == mosq_cs_connect_pending){
#endif
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -316,14 +321,19 @@ int packet__read(struct mosquitto *mosq)
uint8_t byte;
ssize_t read_length;
int rc = 0;
int state;

if(!mosq){
return MOSQ_ERR_INVAL;
}
if(mosq->sock == INVALID_SOCKET){
return MOSQ_ERR_NO_CONN;
}
if(mosq->state == mosq_cs_connect_pending){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);

if(state == mosq_cs_connect_pending){
return MOSQ_ERR_SUCCESS;
}

Expand Down
7 changes: 6 additions & 1 deletion lib/util_mosq.c
Expand Up @@ -68,6 +68,7 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
#ifndef WITH_BROKER
int rc;
#endif
int state;

assert(mosq);
#if defined(WITH_BROKER) && defined(WITH_BRIDGE)
Expand All @@ -88,7 +89,11 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
if(mosq->keepalive && mosq->sock != INVALID_SOCKET &&
(now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){

if(mosq->state == mosq_cs_connected && mosq->ping_t == 0){
pthread_mutex_lock(&mosq->state_mutex);
state = mosq->state;
pthread_mutex_unlock(&mosq->state_mutex);

if(state == mosq_cs_connected && mosq->ping_t == 0){
send__pingreq(mosq);
/* Reset last msg times to give the server time to send a pingresp */
pthread_mutex_lock(&mosq->msgtime_mutex);
Expand Down

0 comments on commit 40c1a97

Please sign in to comment.