Skip to content

Commit

Permalink
[Feature] Issue #46 Semisync optimization and fix invalid packet error
Browse files Browse the repository at this point in the history
  Description
  -----------
  Originally Semisync use the SELECT() function to listener all slave sockets,
  But SELECT() has several restrictions, so replace it by poll();

  Ack_receiver maybe read invalid socket fd and report network packet errors.
  We should clear socket when read return error with ER_NET_READ_ERROR, if not,
  Ack_receiver will report a lot of network packet errors until dump thread is
  killed;
  • Loading branch information
AliSQL authored and AliSQL committed Jul 17, 2017
1 parent 4f246a8 commit c586f14
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 42 deletions.
44 changes: 14 additions & 30 deletions sql/semisync_master_ack_receiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "semisync_master.h"
#include "semisync_master_ack_receiver.h"
#include "semisync_master_socket_listener.h"

extern ReplSemiSyncMaster repl_semisync;

Expand Down Expand Up @@ -183,22 +184,6 @@ inline void Ack_receiver::wait_for_slave_connection()
mysql_cond_wait(&m_cond, &m_mutex);
}

my_socket Ack_receiver::get_slave_sockets(fd_set *fds)
{
my_socket max_fd= INVALID_SOCKET;
unsigned int i;

FD_ZERO(fds);
for (i= 0; i < m_slaves.size(); i++)
{
my_socket fd= m_slaves[i].sock_fd();
max_fd= (fd > max_fd ? fd : max_fd);
FD_SET(fd, fds);
}

return max_fd;
}

/* Auxilary function to initialize a NET object with given net buffer. */
static void init_net(NET *net, unsigned char *buff, unsigned int buff_len)
{
Expand All @@ -214,10 +199,14 @@ void Ack_receiver::run()
NET net;
unsigned char net_buff[REPLY_MESSAGE_MAX_LENGTH];

fd_set read_fds;
my_socket max_fd= INVALID_SOCKET;
uint i;

#ifdef HAVE_POLL
Poll_socket_listener listener(m_slaves);
#else
Select_socket_listener listener(m_slaves);
#endif //HAVE_POLL

sql_print_information("Starting ack receiver thread");

init_net(&net, net_buff, REPLY_MESSAGE_MAX_LENGTH);
Expand All @@ -228,7 +217,6 @@ void Ack_receiver::run()

while (1)
{
fd_set fds;
Slave_vector_it it;
int ret;

Expand All @@ -246,24 +234,20 @@ void Ack_receiver::run()
continue;
}

max_fd= get_slave_sockets(&read_fds);
if (!listener.init_slave_sockets())
goto end;
m_slaves_changed= false;
DBUG_PRINT("info", ("fd count %lu, max_fd %d", (ulong)m_slaves.size(),
max_fd));
}

struct timeval tv= {1, 0};
fds= read_fds;
/* select requires max fd + 1 for the first argument */
ret= select(max_fd+1, &fds, NULL, NULL, &tv);
ret= listener.listen_on_sockets();
if (ret <= 0)
{
mysql_mutex_unlock(&m_mutex);

ret= DBUG_EVALUATE_IF("rpl_semisync_simulate_select_error", -1, ret);

if (ret == -1)
sql_print_information("Failed to select() on semi-sync dump sockets, "
if (ret == -1 && errno != EINTR)
sql_print_information("Failed to wait on semi-sync dump sockets, "
"error: errno=%d", socket_errno);
/* Sleep 1us, so other threads can catch the m_mutex easily. */
my_sleep(1);
Expand All @@ -274,7 +258,7 @@ void Ack_receiver::run()
i= 0;
while (i < m_slaves.size())
{
if (FD_ISSET(m_slaves[i].sock_fd(), &fds))
if (listener.is_socket_active(i))
{
ulong len;

Expand All @@ -286,7 +270,7 @@ void Ack_receiver::run()
repl_semisync_master.reportReplyPacket(m_slaves[i].server_id(),
net.read_pos, len);
else if (net.last_errno == ER_NET_READ_ERROR)
FD_CLR(m_slaves[i].sock_fd(), &read_fds);
listener.clear_socket_info(i);
}
i++;
}
Expand Down
25 changes: 13 additions & 12 deletions sql/semisync_master_ack_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@
#include "my_pthread.h"
#include "sql_class.h"
#include "semisync.h"

struct Slave
{
THD *thd;
Vio vio;

my_socket sock_fd() const { return vio.mysql_socket.fd; }
uint server_id() const { return thd->server_id; }
};

typedef std::vector<Slave> Slave_vector;
typedef Slave_vector::iterator Slave_vector_it;

/**
Ack_receiver is responsible to control ack receive thread and maintain
slave information used by ack receive thread.
Expand Down Expand Up @@ -93,17 +106,6 @@ class Ack_receiver : public ReplSemiSyncBase
/* If slave list is updated(add or remove). */
bool m_slaves_changed;

struct Slave
{
THD *thd;
Vio vio;

my_socket sock_fd() { return vio.mysql_socket.fd; }
uint server_id() { return thd->server_id; }
};

typedef std::vector<Slave> Slave_vector;
typedef Slave_vector::iterator Slave_vector_it;
Slave_vector m_slaves;

pthread_t m_pid;
Expand All @@ -114,7 +116,6 @@ class Ack_receiver : public ReplSemiSyncBase

void set_stage_info(const PSI_stage_info &stage);
void wait_for_slave_connection();
my_socket get_slave_sockets(fd_set *fds);
};

extern Ack_receiver ack_receiver;
Expand Down
124 changes: 124 additions & 0 deletions sql/semisync_master_socket_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/* Copyright (c) 2016 Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef SEMISYNC_MASTER_SOCKET_LISTENER
#define SEMISYNC_MASTER_SOCKET_LISTENER
#include "semisync_master_ack_receiver.h"

#ifdef HAVE_POLL
#include <sys/poll.h>
#include <vector>

class Poll_socket_listener
{
public:
Poll_socket_listener(const Slave_vector &slaves)
:m_slaves(slaves)
{
}

bool listen_on_sockets()
{
return poll(m_fds.data(), m_fds.size(), 1000 /*1 Second timeout*/);
}

bool is_socket_active(int index)
{
return m_fds[index].revents & POLLIN;
}

void clear_socket_info(int index)
{
m_fds[index].fd= -1;
m_fds[index].events= 0;
}

bool init_slave_sockets()
{
m_fds.clear();
for (uint i= 0; i < m_slaves.size(); i++)
{
pollfd poll_fd;
poll_fd.fd= m_slaves[i].sock_fd();
poll_fd.events= POLLIN;
m_fds.push_back(poll_fd);
}
return true;
}

private:
const Slave_vector &m_slaves;
std::vector<pollfd> m_fds;
};

#else //NO POLL

class Select_socket_listener
{
public:
Select_socket_listener(const Slave_vector &slaves)
:m_slaves(slaves), m_max_fd(INVALID_SOCKET)
{
}

bool listen_on_sockets()
{
/* Reinitialze the fds with active fds before calling select */
m_fds= m_init_fds;
struct timeval tv= {1,0};
/* select requires max fd + 1 for the first argument */
return select(m_max_fd+1, &m_fds, NULL, NULL, &tv);
}

bool is_socket_active(int index)
{
return FD_ISSET(m_slaves[index].sock_fd(), &m_fds);
}

void clear_socket_info(int index)
{
FD_CLR(m_slaves[index].sock_fd(), &m_init_fds);
}

bool init_slave_sockets()
{
FD_ZERO(&m_init_fds);
for (uint i= 0; i < m_slaves.size(); i++)
{
my_socket socket_id= m_slaves[i].sock_fd();
m_max_fd= (socket_id > m_max_fd ? socket_id : m_max_fd);
#ifndef WINDOWS
if (socket_id > FD_SETSIZE)
{
sql_print_error("Semisync slave socket fd is %u. "
"select() cannot handle if the socket fd is "
"bigger than %u (FD_SETSIZE).", socket_id, FD_SETSIZE);
return false;
}
#endif //WINDOWS
FD_SET(socket_id, &m_init_fds);
}
return true;
}

private:
const Slave_vector &m_slaves;
my_socket m_max_fd;
fd_set m_init_fds;
fd_set m_fds;
};

#endif //HAVE_POLL
#endif //SEMISYNC_MASTER_SOCKET_LISTENER

0 comments on commit c586f14

Please sign in to comment.