From 0364e2dacf1a3e440f295e9310e9205e4b8c21c5 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 7 May 2023 21:23:55 +0800 Subject: [PATCH] [ISSUE #3880]Optimize ClientSessionGroupMapping --- .../RedirectClientByIpPortHandler.java | 3 +- .../handler/RedirectClientByPathHandler.java | 3 +- .../RedirectClientBySubSystemHandler.java | 3 +- .../admin/handler/RejectAllClientHandler.java | 4 +- .../handler/RejectClientByIpPortHandler.java | 11 +-- .../RejectClientBySubSystemHandler.java | 2 +- .../handler/ShowClientBySystemHandler.java | 3 +- .../admin/handler/ShowClientHandler.java | 3 +- .../admin/handler/TCPClientHandler.java | 14 ++-- .../admin/request/DeleteTCPClientRequest.java | 11 ++- .../group/ClientSessionGroupMapping.java | 77 +++++++------------ .../protocol/tcp/client/task/HelloTask.java | 23 +++--- .../metrics/tcp/EventMeshTcpMonitor.java | 4 +- .../RedirectClientByPathHandlerTest.java | 5 +- 14 files changed, 69 insertions(+), 97 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java index b886e8b62d..867d2896e9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -73,7 +72,7 @@ public void handle(HttpExchange httpExchange) throws IOException { log.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip, port, destEventMeshIp, destEventMeshPort); ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); StringBuilder redirectResult = new StringBuilder(); try { if (!sessionMap.isEmpty()) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java index a4cbaf1662..7efb18bb25 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -75,7 +74,7 @@ public void handle(HttpExchange httpExchange) throws IOException { log.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path, destEventMeshIp, destEventMeshPort); ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); StringBuilder redirectResult = new StringBuilder(); try { if (!sessionMap.isEmpty()) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java index 42ecf69d6c..ac3e25ecd3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -79,7 +78,7 @@ public void handle(final HttpExchange httpExchange) throws IOException { } final ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - final ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + final ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); final StringBuilder redirectResult = new StringBuilder(); try { if (!sessionMap.isEmpty()) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java index 449c1033a4..92894d4976 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java @@ -61,14 +61,14 @@ public RejectAllClientHandler(final EventMeshTCPServer eventMeshTCPServer, public void handle(final HttpExchange httpExchange) throws IOException { try (OutputStream out = httpExchange.getResponseBody()) { final ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - final ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + final ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); final List successRemoteAddrs = new ArrayList<>(); try { if (log.isInfoEnabled()) { log.info("rejectAllClient in admin===================="); } if (!sessionMap.isEmpty()) { - for (final Map.Entry entry : sessionMap.entrySet()) { + for (final Map.Entry entry : sessionMap.entrySet()) { final InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client( eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping); if (addr != null) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java index 77722daa81..8a89ef8dbe 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java @@ -68,16 +68,17 @@ public void handle(HttpExchange httpExchange) throws IOException { out.write(result.getBytes(Constants.DEFAULT_CHARSET)); return; } + String address = ip + ":" + port; log.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port); ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); final List successRemoteAddrs = new ArrayList(); try { if (!sessionMap.isEmpty()) { - for (Map.Entry entry : sessionMap.entrySet()) { - if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) { - InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, - entry.getValue(), clientSessionGroupMapping); + for (Map.Entry entry : sessionMap.entrySet()) { + if (StringUtils.equals(entry.getKey(), address)) { + InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), + clientSessionGroupMapping); if (addr != null) { successRemoteAddrs.add(addr); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java index d7729a8f0a..82c3dd34d4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java @@ -86,7 +86,7 @@ public void handle(HttpExchange httpExchange) throws IOException { log.info("rejectClientBySubSystem in admin,subsys:{}====================", subSystem); ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); final List successRemoteAddrs = new ArrayList<>(); try { if (!sessionMap.isEmpty()) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemHandler.java index 58168a0d4c..c1519c2f9f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemHandler.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -68,7 +67,7 @@ public void handle(HttpExchange httpExchange) throws IOException { log.info("showClientBySubsys,subsys:{}", subSystem); } ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); if (sessionMap != null && !sessionMap.isEmpty()) { for (Session session : sessionMap.values()) { if (session.getClient().getSubsystem().equals(subSystem)) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java index cbfa2a2bf2..a6cf07ccd7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -61,7 +60,7 @@ public void handle(HttpExchange httpExchange) throws IOException { HashMap statMap = new HashMap(); - Map sessionMap = clientSessionGroupMapping.getSessionMap(); + Map sessionMap = clientSessionGroupMapping.getSessionMap(); if (!sessionMap.isEmpty()) { for (Session session : sessionMap.values()) { String key = session.getClient().getSubsystem(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TCPClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TCPClientHandler.java index 28fcca9e02..b26b693930 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TCPClientHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/TCPClientHandler.java @@ -31,11 +31,12 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.commons.lang3.StringUtils; + import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; import java.io.StringWriter; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -84,14 +85,12 @@ void delete(HttpExchange httpExchange) throws IOException { try { String request = HttpExchangeUtils.streamToString(httpExchange.getRequestBody()); DeleteTCPClientRequest deleteTCPClientRequest = JsonUtils.parseObject(request, DeleteTCPClientRequest.class); - String host = deleteTCPClientRequest.getHost(); - int port = deleteTCPClientRequest.getPort(); ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); if (!sessionMap.isEmpty()) { - for (Map.Entry entry : sessionMap.entrySet()) { - if (entry.getKey().getHostString().equals(host) && entry.getKey().getPort() == port) { + for (Map.Entry entry : sessionMap.entrySet()) { + if (StringUtils.equals(entry.getKey(), deleteTCPClientRequest.toString())) { EventMeshTcp2Client.serverGoodby2Client( eventMeshTCPServer, entry.getValue(), @@ -100,7 +99,6 @@ void delete(HttpExchange httpExchange) throws IOException { } } } - httpExchange.getResponseHeaders().add("Access-Control-Allow-Origin", "*"); httpExchange.sendResponseHeaders(200, 0); } catch (Exception e) { @@ -136,7 +134,7 @@ void list(HttpExchange httpExchange) throws IOException { try { // Get the list of TCP clients ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - Map sessionMap = clientSessionGroupMapping.getSessionMap(); + Map sessionMap = clientSessionGroupMapping.getSessionMap(); List getClientResponseList = new ArrayList<>(); for (Session session : sessionMap.values()) { UserAgent userAgent = session.getClient(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/request/DeleteTCPClientRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/request/DeleteTCPClientRequest.java index c122f04e62..bd9f324976 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/request/DeleteTCPClientRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/request/DeleteTCPClientRequest.java @@ -22,9 +22,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.Data; +import lombok.Getter; +import lombok.Setter; -@Data +@Getter +@Setter public class DeleteTCPClientRequest { private String host; @@ -40,4 +42,9 @@ public DeleteTCPClientRequest( this.host = host; this.port = port; } + + @Override + public String toString() { + return host + ":" + port; + } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index a0c0880088..5c81d37221 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -36,7 +36,6 @@ import org.apache.commons.collections4.MapUtils; import java.lang.ref.WeakReference; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -56,13 +55,11 @@ public class ClientSessionGroupMapping { private static final Logger SESSION_LOGGER = LoggerFactory.getLogger("sessionLogger"); - private final ConcurrentHashMap sessionTable = new ConcurrentHashMap<>(); + private final ConcurrentHashMap sessionTable = new ConcurrentHashMap<>(64); - private final ConcurrentHashMap clientGroupMap = - new ConcurrentHashMap(); + private final ConcurrentHashMap clientGroupMap = new ConcurrentHashMap<>(64); - private final ConcurrentHashMap lockMap = - new ConcurrentHashMap(); + private final ConcurrentHashMap lockMap = new ConcurrentHashMap<>(64); private EventMeshTCPServer eventMeshTCPServer; @@ -83,27 +80,31 @@ public ClientGroupWrapper getClientGroupWrapper(String sysId) { } public Session getSession(ChannelHandlerContext ctx) { - return getSession((InetSocketAddress) ctx.channel().remoteAddress()); + return getSession(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); } - public Session getSession(InetSocketAddress address) { + public Session getSession(String address) { return sessionTable.get(address); } - public Session createSession(UserAgent user, ChannelHandlerContext ctx) throws Exception { - InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress(); - user.setHost(addr.getHostString()); - user.setPort(addr.getPort()); + public Session getOrCreateSession(UserAgent user, ChannelHandlerContext ctx) throws Exception { + String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + String[] split = remoteAddress.split(":"); + if (Objects.isNull(split) || split.length != 2) { + throw new Exception("Parse channel remote address ettor"); + } + user.setHost(split[0]); + user.setPort(Integer.parseInt(split[1])); Session session; - if (!sessionTable.containsKey(addr)) { - log.info("createSession client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + if (!sessionTable.containsKey(remoteAddress)) { + log.info("createSession client[{}]", remoteAddress); session = new Session(user, ctx, eventMeshTCPServer.getEventMeshTCPConfiguration()); initClientGroupWrapper(user, session); - sessionTable.put(addr, session); - SESSION_LOGGER.info("session|open|succeed|user={}", user); + sessionTable.put(remoteAddress, session); + SESSION_LOGGER.info("session|open|succeed|user={},client={}", user, remoteAddress); } else { - session = sessionTable.get(addr); - SESSION_LOGGER.error("session|open|failed|user={}|msg={}", user, "session has been created!"); + session = sessionTable.get(remoteAddress); + SESSION_LOGGER.warn("session|open|failed|user={}|msg=session has been created!", user); } return session; } @@ -117,22 +118,20 @@ public void readySession(Session session) throws Exception { public synchronized void closeSession(ChannelHandlerContext ctx) throws Exception { - InetSocketAddress addr = (InetSocketAddress) ctx.channel().remoteAddress(); - Session session = MapUtils.getObject(sessionTable, addr, null); + String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + Session session = MapUtils.getObject(sessionTable, remoteAddress, null); if (session == null) { - final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("begin to close channel to remote address[{}]", remoteAddress); ctx.channel().close().addListener( (ChannelFutureListener) future -> log.info("close the connection to remote address[{}] result: {}", remoteAddress, future.isSuccess())); - SESSION_LOGGER.info("session|close|succeed|address={}|msg={}", addr, "no session was found"); + SESSION_LOGGER.warn("session|close|succeed|address={}|msg=no session was found", remoteAddress); return; } - closeSession(session); //remove session from sessionTable - sessionTable.remove(addr); + sessionTable.remove(remoteAddress); SESSION_LOGGER.info("session|close|succeed|user={}", session.getClient()); } @@ -140,7 +139,7 @@ public synchronized void closeSession(ChannelHandlerContext ctx) throws Exceptio private void closeSession(Session session) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(session.getContext().channel()); if (SessionState.CLOSED == session.getSessionState()) { - log.info("session has been closed, addr:{}", remoteAddress); + log.warn("session has been closed, addr:{}", remoteAddress); return; } @@ -148,7 +147,7 @@ private void closeSession(Session session) throws Exception { synchronized (session) { if (SessionState.CLOSED == session.getSessionState()) { - log.info("session has been closed in sync, addr:{}", remoteAddress); + log.warn("session has been closed in sync, addr:{}", remoteAddress); return; } @@ -304,15 +303,12 @@ private void handleUnackMsgsInSession(Session session) { continue; } Session reChooseSession = clientGroupWrapper.getDownstreamDispatchStrategy() - .select(clientGroupWrapper.getGroup(), - downStreamMsgContext.event.getSubject(), - clientGroupWrapper.groupConsumerSessions); + .select(clientGroupWrapper.getGroup(), downStreamMsgContext.event.getSubject(), clientGroupWrapper.groupConsumerSessions); if (reChooseSession != null) { downStreamMsgContext.setSession(reChooseSession); reChooseSession.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext); reChooseSession.downstreamMsg(downStreamMsgContext); - log.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), - downStreamMsgContext.getSession().getClient()); + log.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), downStreamMsgContext.getSession().getClient()); } else { log.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(), downStreamMsgContext.event.getSubject()); @@ -378,7 +374,6 @@ private void initSessionCleaner() { private void initDownStreamMsgContextCleaner() { eventMeshTCPServer.getScheduler().scheduleAtFixedRate( () -> { - //scan non-broadcast msg for (Session tmp : sessionTable.values()) { for (Map.Entry entry : tmp.getPusher().getUnAckMsg().entrySet()) { @@ -443,7 +438,7 @@ public void shutdown() throws Exception { log.info("ClientSessionGroupMapping shutdown......"); } - public ConcurrentHashMap getSessionMap() { + public ConcurrentHashMap getSessionMap() { return sessionTable; } @@ -451,22 +446,6 @@ public ConcurrentHashMap getClientGroupMap() { return clientGroupMap; } - public Map> prepareEventMeshClientDistributionData() { - Map> result = null; - - if (!clientGroupMap.isEmpty()) { - result = new HashMap<>(); - for (Map.Entry entry : clientGroupMap.entrySet()) { - Map map = new HashMap<>(); - map.put(EventMeshConstants.PURPOSE_SUB, entry.getValue().getGroupConsumerSessions().size()); - map.put(EventMeshConstants.PURPOSE_PUB, entry.getValue().getGroupProducerSessions().size()); - result.put(entry.getKey(), map); - } - } - - return result; - } - public Map> prepareProxyClientDistributionData() { Map> result = null; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java index e024143cc2..9d2d5632bb 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java @@ -40,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -87,26 +86,22 @@ public void run() { } validateUserAgent(user); - session = eventMeshTCPServer.getClientSessionGroupMapping().createSession(user, ctx); - res.setHeader(new Header(HELLO_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), - pkg.getHeader().getSeq())); + session = eventMeshTCPServer.getClientSessionGroupMapping().getOrCreateSession(user, ctx); + res.setHeader(new Header(HELLO_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), pkg.getHeader().getSeq())); Utils.writeAndFlush(res, startTime, taskExecuteTime, session.getContext(), session); } catch (Throwable e) { MESSAGE_LOGGER.error("HelloTask failed|address={},errMsg={}", ctx.channel().remoteAddress(), e); res.setHeader(new Header(HELLO_RESPONSE, OPStatus.FAIL.getCode(), Arrays.toString(e.getStackTrace()), pkg .getHeader().getSeq())); ctx.writeAndFlush(res).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime); - } else { - Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime); - } - log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress()); - eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx); + (ChannelFutureListener) future -> { + if (!future.isSuccess()) { + Utils.logFailedMessageFlow(future, res, user, startTime, taskExecuteTime); + } else { + Utils.logSucceedMessageFlow(res, user, startTime, taskExecuteTime); } + log.warn("HelloTask failed,close session,addr:{}", ctx.channel().remoteAddress()); + eventMeshTCPServer.getClientSessionGroupMapping().closeSession(ctx); } ); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java index 8800337a07..1c68578ba5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java @@ -24,7 +24,6 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.metrics.MonitorMetricConstants; -import java.net.InetSocketAddress; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -102,8 +101,7 @@ public void start() throws Exception { tcpSummaryMetrics.setMq2eventMeshTPS((int) 1000.0d * msgNum / period); //count topics subscribed by client in this eventMesh - ConcurrentHashMap sessionMap = - eventMeshTCPServer.getClientSessionGroupMapping().getSessionMap(); + ConcurrentHashMap sessionMap = eventMeshTCPServer.getClientSessionGroupMapping().getSessionMap(); Iterator sessionIterator = sessionMap.values().iterator(); Set topicSet = new HashSet<>(); while (sessionIterator.hasNext()) { diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandlerTest.java index c2721423f9..1c03b5dc6d 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandlerTest.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandlerTest.java @@ -40,7 +40,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -78,12 +77,12 @@ public void testHandle() throws IOException { RedirectClientByPathHandler redirectClientByPathHandler = new RedirectClientByPathHandler(eventMeshTCPServer, httpHandlerManager); // mock session map - ConcurrentHashMap sessionMap = new ConcurrentHashMap<>(); + ConcurrentHashMap sessionMap = new ConcurrentHashMap<>(); Session session = mock(Session.class); UserAgent agent = mock(UserAgent.class); when(agent.getPath()).thenReturn("path"); when(session.getClient()).thenReturn(agent); - sessionMap.put(new InetSocketAddress(8080), session); + sessionMap.put("127.0.0.1:8080", session); when(mapping.getSessionMap()).thenReturn(sessionMap); // mock uri