Skip to content

Commit

Permalink
Fix possible assert crash associated with bridge reconnecting.
Browse files Browse the repository at this point in the history
This only occurs when compiled without epoll support.

Closes #1700. Thanks to Matthias Urlichs.
  • Loading branch information
ralight committed May 25, 2020
1 parent ce463c1 commit cebb614
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 109 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Expand Up @@ -10,6 +10,8 @@ Broker:
- Fix support for openssl 3.0.
- Fix check when loading persistence file of a different version than the
native version. Closes #1684.
- Fix possible assert crash associated with bridge reconnecting when compiled
without epoll support. Closes #1700.

Client library:
- Don't treat an unexpected PUBACK, PUBREL, or PUBCOMP as a fatal error.
Expand Down
217 changes: 109 additions & 108 deletions src/loop.c
Expand Up @@ -222,114 +222,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
#endif

time_count = 0;
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
context->pollfd_index = -1;

if(context->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
if(context->bridge){
mosquitto__check_keepalive(db, context);
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& context->bridge->primary_retry
&& now > context->bridge->primary_retry){

if(context->bridge->primary_retry_sock == INVALID_SOCKET){
rc = net__try_connect(context->bridge->addresses[0].address,
context->bridge->addresses[0].port,
&context->bridge->primary_retry_sock, NULL, false);

if(rc == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = 0;
}
}else{
len = sizeof(int);
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}
}
}
#endif

/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){

if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
if(!(context->events & EPOLLOUT)) {
ev.data.fd = context->sock;
ev.events = EPOLLIN | EPOLLOUT;
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
}
}
context->events = EPOLLIN | EPOLLOUT;
}
context->ws_want_write = false;
}
else{
if(context->events & EPOLLOUT) {
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
}
}
context->events = EPOLLIN;
}
}
#else
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
pollfds[pollfd_index].events |= POLLOUT;
context->ws_want_write = false;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
}
}else{
/* Client has exceeded keepalive*1.5 */
do_disconnect(db, context, MOSQ_ERR_KEEPALIVE);
}
}
}

#ifdef WITH_BRIDGE
time_count = 0;
for(i=0; i<db->bridge_count; i++){
Expand Down Expand Up @@ -466,6 +358,115 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
#endif

time_count = 0;
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
context->pollfd_index = -1;

if(context->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
if(context->bridge){
mosquitto__check_keepalive(db, context);
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& context->bridge->primary_retry
&& now > context->bridge->primary_retry){

if(context->bridge->primary_retry_sock == INVALID_SOCKET){
rc = net__try_connect(context->bridge->addresses[0].address,
context->bridge->addresses[0].port,
&context->bridge->primary_retry_sock, NULL, false);

if(rc == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = 0;
}
}else{
len = sizeof(int);
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}
}
}
#endif

/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){

if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
if(!(context->events & EPOLLOUT)) {
ev.data.fd = context->sock;
ev.events = EPOLLIN | EPOLLOUT;
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
}
}
context->events = EPOLLIN | EPOLLOUT;
}
context->ws_want_write = false;
}
else{
if(context->events & EPOLLOUT) {
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
}
}
context->events = EPOLLIN;
}
}
#else
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
pollfds[pollfd_index].events |= POLLOUT;
context->ws_want_write = false;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
}
}else{
/* Client has exceeded keepalive*1.5 */
do_disconnect(db, context, MOSQ_ERR_KEEPALIVE);
}
}
}


#ifndef WIN32
sigprocmask(SIG_SETMASK, &sigblock, &origsig);
#ifdef WITH_EPOLL
Expand Down
5 changes: 4 additions & 1 deletion test/broker/06-bridge-b2br-late-connection-retain.py
Expand Up @@ -40,6 +40,9 @@ def write_config2(filename, persistence_file, port1, port2):
publish_packet = mosq_test.gen_publish("bridge/test", qos=1, mid=mid, payload="message", retain=True)
puback_packet = mosq_test.gen_puback(mid)

mid = 2
publish_packet2 = mosq_test.gen_publish("bridge/test", qos=1, mid=mid, payload="message", retain=True)

ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock.settimeout(40)
Expand Down Expand Up @@ -67,7 +70,7 @@ def write_config2(filename, persistence_file, port1, port2):
if mosq_test.expect_packet(bridge, "connect", connect_packet):
bridge.send(connack_packet)

if mosq_test.expect_packet(bridge, "publish", publish_packet):
if mosq_test.expect_packet(bridge, "publish", publish_packet2):
bridge.send(puback_packet)
# Guard against multiple retained messages of the same type by
# sending a pingreq to give us something to expect back. If we get
Expand Down

0 comments on commit cebb614

Please sign in to comment.