Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #3880] Optimize ClientSessionGroupMapping #3881

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
[ISSUE #3880]Optimize ClientSessionGroupMapping
  • Loading branch information
mxsm committed May 7, 2023
commit 0364e2dacf1a3e440f295e9310e9205e4b8c21c5
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -73,7 +72,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
log.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip,
port, destEventMeshIp, destEventMeshPort);
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
StringBuilder redirectResult = new StringBuilder();
try {
if (!sessionMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -75,7 +74,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
log.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path,
destEventMeshIp, destEventMeshPort);
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
StringBuilder redirectResult = new StringBuilder();
try {
if (!sessionMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void handle(final HttpExchange httpExchange) throws IOException {
}

final ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
final ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
final ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
final StringBuilder redirectResult = new StringBuilder();
try {
if (!sessionMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public RejectAllClientHandler(final EventMeshTCPServer eventMeshTCPServer,
public void handle(final HttpExchange httpExchange) throws IOException {
try (OutputStream out = httpExchange.getResponseBody()) {
final ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
final ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
final ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
try {
if (log.isInfoEnabled()) {
log.info("rejectAllClient in admin====================");
}
if (!sessionMap.isEmpty()) {
for (final Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
for (final Map.Entry<String, Session> entry : sessionMap.entrySet()) {
final InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(
eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
if (addr != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,17 @@ public void handle(HttpExchange httpExchange) throws IOException {
out.write(result.getBytes(Constants.DEFAULT_CHARSET));
return;
}
String address = ip + ":" + port;
log.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port);
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
try {
if (!sessionMap.isEmpty()) {
for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) {
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer,
entry.getValue(), clientSessionGroupMapping);
for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
if (StringUtils.equals(entry.getKey(), address)) {
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(),
clientSessionGroupMapping);
if (addr != null) {
successRemoteAddrs.add(addr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void handle(HttpExchange httpExchange) throws IOException {

log.info("rejectClientBySubSystem in admin,subsys:{}====================", subSystem);
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
try {
if (!sessionMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -68,7 +67,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
log.info("showClientBySubsys,subsys:{}", subSystem);
}
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
if (sessionMap != null && !sessionMap.isEmpty()) {
for (Session session : sessionMap.values()) {
if (session.getClient().getSubsystem().equals(subSystem)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -61,7 +60,7 @@ public void handle(HttpExchange httpExchange) throws IOException {

HashMap<String, AtomicInteger> statMap = new HashMap<String, AtomicInteger>();

Map<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
Map<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
if (!sessionMap.isEmpty()) {
for (Session session : sessionMap.values()) {
String key = session.getClient().getSubsystem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -84,14 +85,12 @@ void delete(HttpExchange httpExchange) throws IOException {
try {
String request = HttpExchangeUtils.streamToString(httpExchange.getRequestBody());
DeleteTCPClientRequest deleteTCPClientRequest = JsonUtils.parseObject(request, DeleteTCPClientRequest.class);
String host = deleteTCPClientRequest.getHost();
int port = deleteTCPClientRequest.getPort();

ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
ConcurrentHashMap<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
if (!sessionMap.isEmpty()) {
for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
if (entry.getKey().getHostString().equals(host) && entry.getKey().getPort() == port) {
for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
if (StringUtils.equals(entry.getKey(), deleteTCPClientRequest.toString())) {
EventMeshTcp2Client.serverGoodby2Client(
eventMeshTCPServer,
entry.getValue(),
Expand All @@ -100,7 +99,6 @@ void delete(HttpExchange httpExchange) throws IOException {
}
}
}

httpExchange.getResponseHeaders().add("Access-Control-Allow-Origin", "*");
httpExchange.sendResponseHeaders(200, 0);
} catch (Exception e) {
Expand Down Expand Up @@ -136,7 +134,7 @@ void list(HttpExchange httpExchange) throws IOException {
try {
// Get the list of TCP clients
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
Map<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
Map<String, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
List<GetClientResponse> getClientResponseList = new ArrayList<>();
for (Session session : sessionMap.values()) {
UserAgent userAgent = session.getClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;

@Data
@Getter
@Setter
public class DeleteTCPClientRequest {

private String host;
Expand All @@ -40,4 +42,9 @@ public DeleteTCPClientRequest(
this.host = host;
this.port = port;
}

@Override
public String toString() {
return host + ":" + port;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.commons.collections4.MapUtils;

import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -56,13 +55,11 @@ public class ClientSessionGroupMapping {

private static final Logger SESSION_LOGGER = LoggerFactory.getLogger("sessionLogger");

private final ConcurrentHashMap<InetSocketAddress, Session> sessionTable = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/*ip:port*/, Session> sessionTable = new ConcurrentHashMap<>(64);

Comment on lines 57 to 59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask that what's the main benefit of changing InetSocketAddress to String?

private final ConcurrentHashMap<String /** subsystem eg . 5109 or 5109-1A0 */, ClientGroupWrapper> clientGroupMap =
new ConcurrentHashMap<String, ClientGroupWrapper>();
private final ConcurrentHashMap<String /** subsystem eg . 5109 or 5109-1A0 */, ClientGroupWrapper> clientGroupMap = new ConcurrentHashMap<>(64);

private final ConcurrentHashMap<String /** subsystem eg . 5109 or 5109-1A0 */, Object> lockMap =
new ConcurrentHashMap<String, Object>();
private final ConcurrentHashMap<String /** subsystem eg . 5109 or 5109-1A0 */, Object> lockMap = new ConcurrentHashMap<>(64);

private EventMeshTCPServer eventMeshTCPServer;

Expand All @@ -83,27 +80,31 @@ public ClientGroupWrapper getClientGroupWrapper(String sysId) {
}

public Session getSession(ChannelHandlerContext ctx) {
return getSession((InetSocketAddress) ctx.channel().remoteAddress());
return getSession(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}

public Session getSession(InetSocketAddress address) {
public Session getSession(String address) {
return sessionTable.get(address);
}

public Session createSession(UserAgent user, ChannelHandlerContext ctx) throws Exception {
InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress();
user.setHost(addr.getHostString());
user.setPort(addr.getPort());
public Session getOrCreateSession(UserAgent user, ChannelHandlerContext ctx) throws Exception {
String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
String[] split = remoteAddress.split(":");
if (Objects.isNull(split) || split.length != 2) {
throw new Exception("Parse channel remote address ettor");
}
user.setHost(split[0]);
user.setPort(Integer.parseInt(split[1]));
Session session;
if (!sessionTable.containsKey(addr)) {
log.info("createSession client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (!sessionTable.containsKey(remoteAddress)) {
log.info("createSession client[{}]", remoteAddress);
session = new Session(user, ctx, eventMeshTCPServer.getEventMeshTCPConfiguration());
initClientGroupWrapper(user, session);
sessionTable.put(addr, session);
SESSION_LOGGER.info("session|open|succeed|user={}", user);
sessionTable.put(remoteAddress, session);
SESSION_LOGGER.info("session|open|succeed|user={},client={}", user, remoteAddress);
} else {
session = sessionTable.get(addr);
SESSION_LOGGER.error("session|open|failed|user={}|msg={}", user, "session has been created!");
session = sessionTable.get(remoteAddress);
SESSION_LOGGER.warn("session|open|failed|user={}|msg=session has been created!", user);
}
return session;
}
Expand All @@ -117,38 +118,36 @@ public void readySession(Session session) throws Exception {

public synchronized void closeSession(ChannelHandlerContext ctx) throws Exception {

InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress();
Session session = MapUtils.getObject(sessionTable, addr, null);
String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
Session session = MapUtils.getObject(sessionTable, remoteAddress, null);
if (session == null) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("begin to close channel to remote address[{}]", remoteAddress);
ctx.channel().close().addListener(
(ChannelFutureListener) future -> log.info("close the connection to remote address[{}] result: {}", remoteAddress,
future.isSuccess()));
SESSION_LOGGER.info("session|close|succeed|address={}|msg={}", addr, "no session was found");
SESSION_LOGGER.warn("session|close|succeed|address={}|msg=no session was found", remoteAddress);
return;
}

closeSession(session);

//remove session from sessionTable
sessionTable.remove(addr);
sessionTable.remove(remoteAddress);

SESSION_LOGGER.info("session|close|succeed|user={}", session.getClient());
}

private void closeSession(Session session) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(session.getContext().channel());
if (SessionState.CLOSED == session.getSessionState()) {
log.info("session has been closed, addr:{}", remoteAddress);
log.warn("session has been closed, addr:{}", remoteAddress);
return;
}

//session must be synchronized to avoid SessionState be confound, for example adding subscribe when session closing
synchronized (session) {

if (SessionState.CLOSED == session.getSessionState()) {
log.info("session has been closed in sync, addr:{}", remoteAddress);
log.warn("session has been closed in sync, addr:{}", remoteAddress);
return;
}

Expand Down Expand Up @@ -304,15 +303,12 @@ private void handleUnackMsgsInSession(Session session) {
continue;
}
Session reChooseSession = clientGroupWrapper.getDownstreamDispatchStrategy()
.select(clientGroupWrapper.getGroup(),
downStreamMsgContext.event.getSubject(),
clientGroupWrapper.groupConsumerSessions);
.select(clientGroupWrapper.getGroup(), downStreamMsgContext.event.getSubject(), clientGroupWrapper.groupConsumerSessions);
if (reChooseSession != null) {
downStreamMsgContext.setSession(reChooseSession);
reChooseSession.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
reChooseSession.downstreamMsg(downStreamMsgContext);
log.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(),
downStreamMsgContext.getSession().getClient());
log.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), downStreamMsgContext.getSession().getClient());
} else {
log.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(),
downStreamMsgContext.event.getSubject());
Expand Down Expand Up @@ -378,7 +374,6 @@ private void initSessionCleaner() {
private void initDownStreamMsgContextCleaner() {
eventMeshTCPServer.getScheduler().scheduleAtFixedRate(
() -> {

//scan non-broadcast msg
for (Session tmp : sessionTable.values()) {
for (Map.Entry<String, DownStreamMsgContext> entry : tmp.getPusher().getUnAckMsg().entrySet()) {
Expand Down Expand Up @@ -443,30 +438,14 @@ public void shutdown() throws Exception {
log.info("ClientSessionGroupMapping shutdown......");
}

public ConcurrentHashMap<InetSocketAddress, Session> getSessionMap() {
public ConcurrentHashMap<String, Session> getSessionMap() {
return sessionTable;
}

public ConcurrentHashMap<String, ClientGroupWrapper> getClientGroupMap() {
return clientGroupMap;
}

public Map<String, Map<String, Integer>> prepareEventMeshClientDistributionData() {
Map<String, Map<String, Integer>> result = null;

if (!clientGroupMap.isEmpty()) {
result = new HashMap<>();
for (Map.Entry<String, ClientGroupWrapper> entry : clientGroupMap.entrySet()) {
Map<String, Integer> map = new HashMap<>();
map.put(EventMeshConstants.PURPOSE_SUB, entry.getValue().getGroupConsumerSessions().size());
map.put(EventMeshConstants.PURPOSE_PUB, entry.getValue().getGroupProducerSessions().size());
result.put(entry.getKey(), map);
}
}

return result;
}

public Map<String, Map<String, Integer>> prepareProxyClientDistributionData() {
Map<String, Map<String, Integer>> result = null;

Expand Down