diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc index 4960c7984cc..78b8b69b3ce 100644 --- a/sql/semisync_master_ack_receiver.cc +++ b/sql/semisync_master_ack_receiver.cc @@ -15,6 +15,7 @@ #include "semisync_master.h" #include "semisync_master_ack_receiver.h" +#include "semisync_master_socket_listener.h" extern ReplSemiSyncMaster repl_semisync; @@ -183,22 +184,6 @@ inline void Ack_receiver::wait_for_slave_connection() mysql_cond_wait(&m_cond, &m_mutex); } -my_socket Ack_receiver::get_slave_sockets(fd_set *fds) -{ - my_socket max_fd= INVALID_SOCKET; - unsigned int i; - - FD_ZERO(fds); - for (i= 0; i < m_slaves.size(); i++) - { - my_socket fd= m_slaves[i].sock_fd(); - max_fd= (fd > max_fd ? fd : max_fd); - FD_SET(fd, fds); - } - - return max_fd; -} - /* Auxilary function to initialize a NET object with given net buffer. */ static void init_net(NET *net, unsigned char *buff, unsigned int buff_len) { @@ -214,10 +199,14 @@ void Ack_receiver::run() NET net; unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH]; - fd_set read_fds; - my_socket max_fd= INVALID_SOCKET; uint i; +#ifdef HAVE_POLL + Poll_socket_listener listener(m_slaves); +#else + Select_socket_listener listener(m_slaves); +#endif //HAVE_POLL + sql_print_information("Starting ack receiver thread"); init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH); @@ -228,7 +217,6 @@ void Ack_receiver::run() while (1) { - fd_set fds; Slave_vector_it it; int ret; @@ -246,24 +234,20 @@ void Ack_receiver::run() continue; } - max_fd= get_slave_sockets(&read_fds); + if (!listener.init_slave_sockets()) + goto end; m_slaves_changed= false; - DBUG_PRINT("info", ("fd count %lu, max_fd %d", (ulong)m_slaves.size(), - max_fd)); } - struct timeval tv= {1, 0}; - fds= read_fds; - /* select requires max fd + 1 for the first argument */ - ret= select(max_fd+1, &fds, NULL, NULL, &tv); + ret= listener.listen_on_sockets(); if (ret <= 0) { mysql_mutex_unlock(&m_mutex); ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret); - if (ret == -1) - sql_print_information("Failed to select() on semi-sync dump sockets, " + if (ret == -1 && errno != EINTR) + sql_print_information("Failed to wait on semi-sync dump sockets, " "error: errno=%d", socket_errno); /* Sleep 1us, so other threads can catch the m_mutex easily. */ my_sleep(1); @@ -274,7 +258,7 @@ void Ack_receiver::run() i= 0; while (i < m_slaves.size()) { - if (FD_ISSET(m_slaves[i].sock_fd(), &fds)) + if (listener.is_socket_active(i)) { ulong len; @@ -286,7 +270,7 @@ void Ack_receiver::run() repl_semisync_master.reportReplyPacket(m_slaves[i].server_id(), net.read_pos, len); else if (net.last_errno == ER_NET_READ_ERROR) - FD_CLR(m_slaves[i].sock_fd(), &read_fds); + listener.clear_socket_info(i); } i++; } diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h index dfc60aec465..404e1d21553 100644 --- a/sql/semisync_master_ack_receiver.h +++ b/sql/semisync_master_ack_receiver.h @@ -21,6 +21,19 @@ #include "my_pthread.h" #include "sql_class.h" #include "semisync.h" + +struct Slave +{ + THD *thd; + Vio vio; + + my_socket sock_fd() const { return vio.mysql_socket.fd; } + uint server_id() const { return thd->server_id; } +}; + +typedef std::vector Slave_vector; +typedef Slave_vector::iterator Slave_vector_it; + /** Ack_receiver is responsible to control ack receive thread and maintain slave information used by ack receive thread. @@ -93,17 +106,6 @@ class Ack_receiver : public ReplSemiSyncBase /* If slave list is updated(add or remove). */ bool m_slaves_changed; - struct Slave - { - THD *thd; - Vio vio; - - my_socket sock_fd() { return vio.mysql_socket.fd; } - uint server_id() { return thd->server_id; } - }; - - typedef std::vector Slave_vector; - typedef Slave_vector::iterator Slave_vector_it; Slave_vector m_slaves; pthread_t m_pid; @@ -114,7 +116,6 @@ class Ack_receiver : public ReplSemiSyncBase void set_stage_info(const PSI_stage_info &stage); void wait_for_slave_connection(); - my_socket get_slave_sockets(fd_set *fds); }; extern Ack_receiver ack_receiver; diff --git a/sql/semisync_master_socket_listener.h b/sql/semisync_master_socket_listener.h new file mode 100644 index 00000000000..ceaf249999b --- /dev/null +++ b/sql/semisync_master_socket_listener.h @@ -0,0 +1,124 @@ +/* Copyright (c) 2016 Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef SEMISYNC_MASTER_SOCKET_LISTENER +#define SEMISYNC_MASTER_SOCKET_LISTENER +#include "semisync_master_ack_receiver.h" + +#ifdef HAVE_POLL +#include +#include + +class Poll_socket_listener +{ +public: + Poll_socket_listener(const Slave_vector &slaves) + :m_slaves(slaves) + { + } + + bool listen_on_sockets() + { + return poll(m_fds.data(), m_fds.size(), 1000 /*1 Second timeout*/); + } + + bool is_socket_active(int index) + { + return m_fds[index].revents & POLLIN; + } + + void clear_socket_info(int index) + { + m_fds[index].fd= -1; + m_fds[index].events= 0; + } + + bool init_slave_sockets() + { + m_fds.clear(); + for (uint i= 0; i < m_slaves.size(); i++) + { + pollfd poll_fd; + poll_fd.fd= m_slaves[i].sock_fd(); + poll_fd.events= POLLIN; + m_fds.push_back(poll_fd); + } + return true; + } + +private: + const Slave_vector &m_slaves; + std::vector m_fds; +}; + +#else //NO POLL + +class Select_socket_listener +{ +public: + Select_socket_listener(const Slave_vector &slaves) + :m_slaves(slaves), m_max_fd(INVALID_SOCKET) + { + } + + bool listen_on_sockets() + { + /* Reinitialze the fds with active fds before calling select */ + m_fds= m_init_fds; + struct timeval tv= {1,0}; + /* select requires max fd + 1 for the first argument */ + return select(m_max_fd+1, &m_fds, NULL, NULL, &tv); + } + + bool is_socket_active(int index) + { + return FD_ISSET(m_slaves[index].sock_fd(), &m_fds); + } + + void clear_socket_info(int index) + { + FD_CLR(m_slaves[index].sock_fd(), &m_init_fds); + } + + bool init_slave_sockets() + { + FD_ZERO(&m_init_fds); + for (uint i= 0; i < m_slaves.size(); i++) + { + my_socket socket_id= m_slaves[i].sock_fd(); + m_max_fd= (socket_id > m_max_fd ? socket_id : m_max_fd); +#ifndef WINDOWS + if (socket_id > FD_SETSIZE) + { + sql_print_error("Semisync slave socket fd is %u. " + "select() cannot handle if the socket fd is " + "bigger than %u (FD_SETSIZE).", socket_id, FD_SETSIZE); + return false; + } +#endif //WINDOWS + FD_SET(socket_id, &m_init_fds); + } + return true; + } + +private: + const Slave_vector &m_slaves; + my_socket m_max_fd; + fd_set m_init_fds; + fd_set m_fds; +}; + +#endif //HAVE_POLL +#endif //SEMISYNC_MASTER_SOCKET_LISTENER