Skip to content

Commit

Permalink
Make behaviour of mosquitto_connect[_async]() consistent.
Browse files Browse the repository at this point in the history
`mosquitto_connect_async()` is now consistent with `mosquitto_connect()`
when connecting to a non-existent server.

Closes #1345. Thanks to Mohammad Reza.
  • Loading branch information
ralight committed Sep 17, 2019
1 parent c3f4ee9 commit 4d4c5dd
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 23 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.txt
Expand Up @@ -10,6 +10,9 @@ Broker:
Client library:
- Fix MQTT v5 subscription options being incorrectly set for MQTT v3
subscriptions. Closes #1353.
- Make behaviour of `mosquitto_connect_async()` consistent with
`mosquitto_connect()` when connecting to a non-existent server.
Closes #1345.

Clients:
- mosquitto_pub: fix error codes not being returned when mosquitto_pub exits.
Expand Down
33 changes: 10 additions & 23 deletions lib/connect.c
Expand Up @@ -163,7 +163,6 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
const mosquitto_property *outgoing_properties = NULL;
mosquitto_property local_property;
int rc;
struct mosquitto__packet *packet;
if(!mosq) return MOSQ_ERR_INVAL;
if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
Expand Down Expand Up @@ -201,27 +200,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos

packet__cleanup(&mosq->in_packet);

pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);

if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next;
}

while(mosq->current_out_packet){
packet = mosq->current_out_packet;
/* Free data and reset values */
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
}

packet__cleanup(packet);
mosquitto__free(packet);
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
packet__cleanup_all(mosq);

message__reconnect_reset(mosq);

Expand Down Expand Up @@ -250,7 +229,15 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
}else
#endif
{
return send__connect(mosq, mosq->keepalive, mosq->clean_start, outgoing_properties);
rc = send__connect(mosq, mosq->keepalive, mosq->clean_start, outgoing_properties);
if(rc){
packet__cleanup_all(mosq);
net__socket_close(mosq);
pthread_mutex_lock(&mosq->state_mutex);
mosq->state = mosq_cs_new;
pthread_mutex_unlock(&mosq->state_mutex);
}
return rc;
}
}

Expand Down
32 changes: 32 additions & 0 deletions lib/packet_mosq.c
Expand Up @@ -98,6 +98,38 @@ void packet__cleanup(struct mosquitto__packet *packet)
packet->pos = 0;
}


void packet__cleanup_all(struct mosquitto *mosq)
{
struct mosquitto__packet *packet;

pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);

/* Out packet cleanup */
if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next;
}
while(mosq->current_out_packet){
packet = mosq->current_out_packet;
/* Free data and reset values */
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
}

packet__cleanup(packet);
mosquitto__free(packet);
}

packet__cleanup(&mosq->in_packet);

pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
}


int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
{
#ifndef WITH_BROKER
Expand Down
1 change: 1 addition & 0 deletions lib/packet_mosq.h
Expand Up @@ -25,6 +25,7 @@ struct mosquitto_db;

int packet__alloc(struct mosquitto__packet *packet);
void packet__cleanup(struct mosquitto__packet *packet);
void packet__cleanup_all(struct mosquitto *mosq);
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet);

int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length);
Expand Down

0 comments on commit 4d4c5dd

Please sign in to comment.