Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#4419] Add slack sink connector. #4562

Merged
merged 3 commits into from
Nov 20, 2023

Conversation

yanrongzhen
Copy link
Contributor

Fixes #4419 .

Modifications

  • Add slack sink connector.
  • Update documentation.
image

pandaapo
pandaapo previously approved these changes Nov 17, 2023
Copy link

codecov bot commented Nov 17, 2023

Codecov Report

Attention: 29 lines in your changes are missing coverage. Please review.

Comparison is base (ad4ecf3) 17.01% compared to head (1faae3d) 17.04%.

Files Patch % Lines
...ector/slack/sink/connector/SlackSinkConnector.java 57.57% 13 Missing and 1 partial ⚠️
...esh/connector/slack/server/SlackConnectServer.java 0.00% 6 Missing ⚠️
...nnector/slack/sink/config/SinkConnectorConfig.java 0.00% 4 Missing ⚠️
...nnector/slack/config/SlackConnectServerConfig.java 0.00% 3 Missing ⚠️
...h/connector/slack/sink/config/SlackSinkConfig.java 33.33% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4562      +/-   ##
============================================
+ Coverage     17.01%   17.04%   +0.03%     
- Complexity     1660     1667       +7     
============================================
  Files           761      766       +5     
  Lines         28826    28875      +49     
  Branches       2494     2498       +4     
============================================
+ Hits           4905     4923      +18     
- Misses        23463    23492      +29     
- Partials        458      460       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@xwm1992
Copy link
Contributor

xwm1992 commented Nov 19, 2023

@yanrongzhen please solve the conflict.

# Conflicts:
#	settings.gradle
@pandaapo pandaapo merged commit afb3a66 into apache:master Nov 20, 2023
13 checks passed
@pandaapo pandaapo mentioned this pull request Nov 20, 2023
3 tasks
@brampurnot
Copy link

@yanrongzhen , I'm trying to setup the Slack connector locally. Do you have an example payload that you used to send a message? I'm currently seeing this when the connector has started:
Screenshot 2023-12-04 at 22 24 09

I'm trying to send a message but not seeing any logs nor receiving any message. I configured the correct apiKey and also made sure the correct scopes are assigned to it. Any help would be appreciated!
Thnx,
Bram

@pandaapo
Copy link
Member

pandaapo commented Dec 5, 2023

@brampurnot Sink connector retrieves data from EventMesh and sends it to a external component, which is referred to as Slack here. So you can try using Spring Source Connector to send some data to EventMesh first. Here is an example of Spring Source Connector sending data, but you need to pay attention to two points. Firstly, you need to configure sinkEnable in server-config.yml to false to close Spring Sink Connector, which can prevent data from being consumed by it. Secondly, pubSubConfiguration. in sink-config.yml of Slack Sink Connector should be the same as the pubSubConfiguration. in source-config.yml file of Spring Source Connector.

If there are still any issues, we look forward to @yanrongzhen 's reply. Or there may be a more convenient way, and we also look forward to @yanrongzhen 's reply.

@pandaapo pandaapo added this to the 1.10 milestone Dec 5, 2023
@brampurnot
Copy link

Thanks @pandaapo for your reply. I assume we can also use the HTTP channel to publish a message view a CloudEvent to event mesh which will be then be picked up by slack, right? Or is this not possible?

@pandaapo
Copy link
Member

pandaapo commented Dec 5, 2023

I assume we can also use the HTTP channel to publish a message view a CloudEvent to event mesh which will be then be picked up by slack, right? Or is this not possible?

@brampurnot This should also be possible, but there may be some requirements for CloudEvent object. This requires a response from the author @yanrongzhen or other members of the community.

@yanrongzhen
Copy link
Contributor Author

This is possible, just need to start the EventMesh runtime and slack sink connector, the messages sent by the HTTP channel will be picked up by slack.
@brampurnot

@brampurnot
Copy link

Ok I'll try to debug it because it's not picking it up. Are the log messages in this screenshot look ok to you? Or do you think something is not right here?
image

I would expect some kind of success message to be printed when it successfully launched and connected but it doesn't do that.

@brampurnot
Copy link

Ok I was able to make it work. Seems like the JAR that gets generated is having issues. I do have another issue but I'll create a new incident for it since I'm having the same issue in the mongoDB connector.

@pandaapo
Copy link
Member

pandaapo commented Dec 5, 2023

@brampurnot

Seems like the JAR that gets generated is having issues.

Could you provide a detailed description of the issue with generating JAR?

I do have another issue but I'll create a new incident for it

Could you also provide a detailed description of this issue?

@brampurnot
Copy link

brampurnot commented Dec 5, 2023

I'm not 100% sure what the problem is. But it basically stops without showing any logs anymore when I run the JAR from the dist/plugin/connectors folder. I basically compiled it from scratch and ran it directly from within VS Code and then it works fine. I still need to figure out what the problem is since I want to run it in a container.

Now the connector is up-and-running and when I try to post a CloudEvent over the HTTP channel, I'm getting a NPE. This is what I see in the event mesh server:

023-12-05 10:56:10,417 INFO  [eventMesh-tcp-worker-3] message(AbstractTCPServer.java:358) - pkg|c2eventMesh|cmd=HEARTBEAT_REQUEST|pkg=org.apache.eventmesh.common.protocol.tcp.Package@200eb500|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52420, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,418 DEBUG [eventMesh-tcp-worker-3] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"HEARTBEAT_RESPONSE","code":0,"desc":"success","seq":"6452883240","properties":{},"command":"HEARTBEAT_RESPONSE"}}
2023-12-05 10:56:10,419 INFO  [eventMesh-tcp-worker-3] message(Utils.java:126) - pkg|eventMesh2c|cmd=HEARTBEAT_RESPONSE|pkg=org.apache.eventmesh.common.protocol.tcp.Package@2473a30e|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52420, version='2.0', idc='FT', purpose='sub', unack='0'}|wait=1ms|cost=3ms
2023-12-05 10:56:10,426 WARN  [eventMesh-tcpNettyNio-Boss-1] ServerBootstrap(AbstractBootstrap.java:464) - Unknown channel option 'SO_TIMEOUT' for channel '[id: 0xff22decc, L:/127.0.0.1:10000 - R:/127.0.0.1:52462]'
2023-12-05 10:56:10,429 INFO  [eventMesh-tcp-worker-6] AbstractTCPServer(AbstractTCPServer.java:407) - client|tcp|channelRegistered|remoteAddress=127.0.0.1:52462|msg=
2023-12-05 10:56:10,430 INFO  [eventMesh-tcp-worker-6] AbstractTCPServer(AbstractTCPServer.java:421) - client|tcp|channelActive|remoteAddress=127.0.0.1:52462|msg=
2023-12-05 10:56:10,431 DEBUG [eventMesh-tcp-worker-6] Codec(LogUtils.java:90) - Decode headerJson={"cmd":"HELLO_REQUEST","code":0,"seq":"8583524761","properties":{},"command":"HELLO_REQUEST"}
2023-12-05 10:56:10,432 DEBUG [eventMesh-tcp-worker-6] Codec(LogUtils.java:90) - Decode bodyJson={"env":"PRD","subsystem":"5034","path":"/","pid":44210,"host":"localhost","port":8362,"version":"2.0","username":"slackSinkUser","password":"slackPassWord","idc":"FT","group":"slackSink","purpose":"sub","unack":0}
2023-12-05 10:56:10,433 INFO  [eventMesh-tcp-worker-6] message(LogUtils.java:130) - pkg|c2eventMesh|cmd=HELLO_REQUEST|pkg=org.apache.eventmesh.common.protocol.tcp.Package@3308da4f
2023-12-05 10:56:10,433 INFO  [eventMesh-tcp-task-handle-5] ClientSessionGroupMapping(ClientSessionGroupMapping.java:112) - createSession client[127.0.0.1:52462]
2023-12-05 10:56:10,434 INFO  [eventMesh-tcp-task-handle-5] sessionLogger(ClientSessionGroupMapping.java:116) - session|open|succeed|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52462, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,435 DEBUG [eventMesh-tcp-worker-6] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"HELLO_RESPONSE","code":0,"desc":"success","seq":"8583524761","properties":{},"command":"HELLO_RESPONSE"}}
2023-12-05 10:56:10,436 INFO  [eventMesh-tcp-worker-6] message(Utils.java:126) - pkg|eventMesh2c|cmd=HELLO_RESPONSE|pkg=org.apache.eventmesh.common.protocol.tcp.Package@5e1bc345|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52462, version='2.0', idc='FT', purpose='sub', unack='0'}|wait=1ms|cost=4ms
2023-12-05 10:56:10,440 DEBUG [eventMesh-tcp-worker-6] Codec(LogUtils.java:90) - Decode headerJson={"cmd":"SUBSCRIBE_REQUEST","code":0,"seq":"4607853627","properties":{},"command":"SUBSCRIBE_REQUEST"}
2023-12-05 10:56:10,442 DEBUG [eventMesh-tcp-worker-6] Codec(LogUtils.java:90) - Decode bodyJson={"topicList":[{"topic":"TEST-TOPIC-SLACK","mode":"CLUSTERING","type":"ASYNC"}]}
2023-12-05 10:56:10,444 INFO  [eventMesh-tcp-worker-6] message(AbstractTCPServer.java:358) - pkg|c2eventMesh|cmd=SUBSCRIBE_REQUEST|pkg=org.apache.eventmesh.common.protocol.tcp.Package@2f10e82|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52462, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,446 DEBUG [StandaloneConsumerThread-9] Subscribe(Subscribe.java:55) - execute subscribe task, topic: TEST-TOPIC-SLACK, offset: null
2023-12-05 10:56:10,446 INFO  [eventMesh-tcp-task-handle-6] ClientGroupWrapper(LogUtils.java:136) - Cache session success, group:slackSink topic:TEST-TOPIC-SLACK client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52462, version='2.0', idc='FT', purpose='sub', unack='0'} sessionId:13ca23fc-7a5b-44bc-9cec-eb8d8b171c98
2023-12-05 10:56:10,446 INFO  [eventMesh-tcp-task-handle-6] subscribeLogger(Session.java:140) - subscribe|succeed|topic=TEST-TOPIC-SLACK|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52462, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,446 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52443, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,446 INFO  [eventMesh-tcp-task-handle-6] SubscribeProcessor(LogUtils.java:130) - SubscribeTask succeed|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52462, version='2.0', idc='FT', purpose='sub', unack='0'}|topics=[SubscriptionItem{topic=TEST-TOPIC-SLACK, mode=CLUSTERING, type=ASYNC}]
2023-12-05 10:56:10,446 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=44210, host='127.0.0.1', port=52421, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,446 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=43436, host='127.0.0.1', port=52339, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,446 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=43642, host='127.0.0.1', port=52392, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,446 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=43436, host='127.0.0.1', port=52318, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,447 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=43436, host='127.0.0.1', port=52304, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,447 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=43436, host='127.0.0.1', port=52266, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,447 WARN  [StandaloneConsumerThread-9] Session(Session.java:282) - session is not available because session has been closed,topic:TEST-TOPIC-SLACK,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=43642, host='127.0.0.1', port=52374, version='2.0', idc='FT', purpose='sub', unack='0'}
2023-12-05 10:56:10,447 WARN  [StandaloneConsumerThread-9] FreePriorityDispatchStrategy(LogUtils.java:152) - all sessions can't downstream msg
2023-12-05 10:56:10,447 DEBUG [eventMesh-tcp-worker-6] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"SUBSCRIBE_RESPONSE","code":0,"desc":"success","seq":"4607853627","properties":{},"command":"SUBSCRIBE_RESPONSE"}}
2023-12-05 10:56:10,447 WARN  [StandaloneConsumerThread-9] ClientGroupWrapper(ClientGroupWrapper.java:515) - handle msg exception when no session found
java.lang.NullPointerException: null
	at java.util.Objects.requireNonNull(Objects.java:203) ~[?:1.8.0_392]
	at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.lambda$initClientGroupPersistentConsumer$0(ClientGroupWrapper.java:483) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.storage.standalone.broker.task.Subscribe.subscribe(Subscribe.java:90) [eventmesh-storage-standalone-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.storage.standalone.broker.task.SubscribeTask.run(SubscribeTask.java:38) [eventmesh-storage-standalone-1.9.0-release.jar:1.9.0-release]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_392]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_392]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392]`

And this is the NPE I'm getting in the connector:
`2023-12-05 11:17:41,183 INFO  [main] Application(Application.java:99) - connector slackSink started
2023-12-05 11:17:42,151 DEBUG [nioEventLoopGroup-3-1] Codec(LogUtils.java:90) - Decode headerJson={"cmd":"ASYNC_MESSAGE_TO_CLIENT","code":1,"desc":"[org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.SessionPusher.push(SessionPusher.java:105), org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session.downstreamMsg(Session.java:168), org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.lambda$initClientGroupPersistentConsumer$0(ClientGroupWrapper.java:529), org.apache.eventmesh.storage.standalone.broker.task.Subscribe.subscribe(Subscribe.java:90), org.apache.eventmesh.storage.standalone.broker.task.SubscribeTask.run(SubscribeTask.java:38), java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149), java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624), java.lang.Thread.run(Thread.java:750)]","seq":"10","properties":{},"command":"ASYNC_MESSAGE_TO_CLIENT"}
2023-12-05 11:17:42,157 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=ASYNC_MESSAGE_TO_CLIENT|msg=org.apache.eventmesh.common.protocol.tcp.Package@2c16930b
2023-12-05 11:17:42,251 INFO  [nioEventLoopGroup-3-1] TcpClient(LogUtils.java:130) - exceptionCaught, close connection.|remote address=/127.0.0.1:10000
java.lang.NullPointerException: null
        at org.apache.eventmesh.client.tcp.impl.cloudevent.CloudEventTCPSubClient$CloudEventTCPSubHandler.getProtocolMessage(CloudEventTCPSubClient.java:158) ~[main/:?]
        at org.apache.eventmesh.client.tcp.impl.cloudevent.CloudEventTCPSubClient$CloudEventTCPSubHandler.getProtocolMessage(CloudEventTCPSubClient.java:1) ~[main/:?]
        at org.apache.eventmesh.client.tcp.impl.AbstractEventMeshTCPSubHandler.channelRead0(AbstractEventMeshTCPSubHandler.java:55) ~[main/:?]
        at org.apache.eventmesh.client.tcp.impl.AbstractEventMeshTCPSubHandler.channelRead0(AbstractEventMeshTCPSubHandler.java:1) ~[main/:?]
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) [netty-codec-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) [netty-codec-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) [netty-transport-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.79.Final.jar:4.1.79.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.79.Final.jar:4.1.79.Final]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392]

I assume it's related to the message I'm posting but still trying to figure out exactly what is wrong.

Bram

ps: sorry guys I know I'm asking/trying a lot but I'm trying to setup a simple POC :)

@brampurnot
Copy link

brampurnot commented Dec 5, 2023

I'm trying to use the following message body btw:

{
    "id": "1b080838-2976-493d-897f-07803944f4d5",
    "specversion": "1.0",
    "source": "https://workflow-service/workflow-start-event",
    "type": "slack-message",
    "datacontenttype": "application/cloudevents+json",
    "subject": "TEST-TOPIC-SLACK",
    "data": {
        "message": "Test message"
    }
}

@brampurnot
Copy link

Just an FYI - I managed to make it work with the Java SDK. However when posting a CloudEvent directly to the HTTP channel, I'm still getting a NPE.

@Pil0tXia
Copy link
Member

Pil0tXia commented Apr 2, 2024

@brampurnot

Sorry for not responding to your comment for a while. Are you still experiencing any issues? You can create a new issue for your problem, which will make it easier to track its progress.

@brampurnot
Copy link

Sorry for the delay. It's been a while since I looked into this but I was just about to pick it up again. I can confirm we still have the issue.

@brampurnot
Copy link

brampurnot commented May 13, 2024

Here are some log statements:

2024-05-13 12:41:00,395 WARN  [EventMesh-Rabbitmq-Consumer-1] Session(Session.java:287) - session is not available because session has not subscribe topic:eventmesh.default,client:UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=1, host='10.244.1.184', port=33502, version='2.0', idc='FT', purpose='sub', unack='0'}
2024-05-13 12:41:00,396 WARN  [EventMesh-Rabbitmq-Consumer-1] FreePriorityDispatchStrategy(LogUtils.java:152) - all sessions can't downstream msg
2024-05-13 12:41:00,396 WARN  [EventMesh-Rabbitmq-Consumer-1] ClientGroupWrapper(ClientGroupWrapper.java:515) - handle msg exception when no session found
java.lang.NullPointerException: null
        at java.util.Objects.requireNonNull(Objects.java:203) ~[?:1.8.0_342]
        at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.lambda$initClientGroupPersistentConsumer$0(ClientGroupWrapper.java:483) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
        at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:64) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
2024-05-13 12:41:00,396 INFO  [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:60) - [RabbitmqConsumerHandler] Rabbitmq consumer context commit.
2024-05-13 12:41:00,397 DEBUG [EventMesh-Rabbitmq-Consumer-1] TraceUtils(TraceUtils.java:84) - finishSpan with event:CloudEvent{id='cd3c1150-2861-4c84-b39b-1e77ad22e47b', source=source:127.0.0.1:38490, type='http_request', datacontenttype='application/json', subject='eventmesh.default', data=BytesCloudEventData{value=[123, 34, 104, 101, 97, 100, 101, 114, 115, 34, 58, 123, 34, 99, 111, 110, 116, 101, 110, 116, 45, 108, 101, 110, 103, 116, 104, 34, 58, 34, 50, 54, 53, 34, 44, 34, 65, 99, 99, 101, 112, 116, 34, 58, 34, 42, 47, 42, 34, 44, 34, 105, 112, 34, 58, 34, 49, 50, 55, 46, 48, 46, 48, 46, 49, 58, 51, 56, 52, 57, 48, 34, 44, 34, 85, 115, 101, 114, 45, 65, 103, 101, 110, 116, 34, 58, 34, 80, 111, 115, 116, 109, 97, 110, 82, 117, 110, 116, 105, 109, 101, 47, 55, 46, 51, 55, 46, 51, 34, 44, 34, 67, 111, 110, 110, 101, 99, 116, 105, 111, 110, 34, 58, 34, 107, 101, 101, 112, 45, 97, 108, 105, 118, 101, 34, 44, 34, 80, 111, 115, 116, 109, 97, 110, 45, 84, 111, 107, 101, 110, 34, 58, 34, 57, 49, 98, 51, 55, 50, 53, 57, 45, 97, 51, 102, 99, 45, 52, 102, 52, 55, 45, 97, 57, 102, 100, 45, 49, 48, 57, 101, 57, 100, 56, 53, 53, 49, 99, 49, 34, 44, 34, 72, 111, 115, 116, 34, 58, 34, 108, 111, 99, 97, 108, 104, 111, 115, 116, 58, 49, 48, 49, 48, 53, 34, 44, 34, 65, 99, 99, 101, 112, 116, 45, 69, 110, 99, 111, 100, 105, 110, 103, 34, 58, 34, 103, 122, 105, 112, 44, 32, 100, 101, 102, 108, 97, 116, 101, 44, 32, 98, 114, 34, 44, 34, 115, 111, 117, 114, 99, 101, 34, 58, 34, 49, 50, 55, 46, 48, 46, 48, 46, 49, 58, 51, 56, 52, 57, 48, 34, 44, 34, 67, 111, 110, 116, 101, 110, 116, 45, 84, 121, 112, 101, 34, 58, 34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110, 34, 125, 44, 34, 112, 97, 116, 104, 34, 58, 34, 47, 101, 118, 101, 110, 116, 109, 101, 115, 104, 47, 112, 117, 98, 108, 105, 115, 104, 47, 101, 118, 101, 110, 116, 109, 101, 115, 104, 46, 100, 101, 102, 97, 117, 108, 116, 34, 44, 34, 109, 101, 116, 104, 111, 100, 34, 58, 34, 80, 79, 83, 84, 34, 44, 34, 98, 111, 100, 121, 34, 58, 123, 34, 100, 97, 116, 97, 99, 111, 110, 116, 101, 110, 116, 116, 121, 112, 101, 34, 58, 34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 99, 108, 111, 117, 100, 101, 118, 101, 110, 116, 115, 43, 106, 115, 111, 110, 34, 44, 34, 100, 97, 116, 97, 34, 58, 123, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 108, 97, 108, 97, 108, 97, 108, 97, 34, 125, 44, 34, 115, 117, 98, 106, 101, 99, 116, 34, 58, 34, 84, 69, 83, 84, 45, 84, 79, 80, 73, 67, 45, 83, 76, 65, 67, 75, 34, 44, 34, 115, 112, 101, 99, 118, 101, 114, 115, 105, 111, 110, 34, 58, 34, 49, 46, 48, 34, 44, 34, 105, 100, 34, 58, 34, 102, 52, 102, 48, 98, 48, 55, 55, 45, 100, 54, 51, 57, 45, 52, 57, 56, 48, 45, 98, 49, 100, 100, 45, 97, 49, 53, 97, 102, 57, 101, 51, 55, 53, 102, 56, 34, 44, 34, 115, 111, 117, 114, 99, 101, 34, 58, 34, 47, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 99, 108, 111, 117, 100, 101, 118, 101, 110, 116, 115, 34, 125, 125]}, extensions={reqeventmesh2mqtimestamp=1715604060386, ip=127.0.0.1:38490, reqreceiveeventmeship=10.244.1.170, protocoldesc=http, idc=idc, pid=63, env=env, sys=1234, ttl=14400000, producergroup=em-http-producer, token=token, consumergroup=em-http-consumer, reqmq2eventmeshtimestamp=1715604060395, passwd=pass, bizseqno=57613610381776707095956240052154, protocoltype=http, msgtype=persistent, uniqueid=23163865692239541432721878505434, username=eventmesh, reqc2eventmeshtimestamp=1715604060386}}
2024-05-13 12:41:03,287 INFO  [eventMesh-tcp-scheduler-3] tcpMonitor(EventMeshTcpMonitor.java:118) - session|deliveredFailCount=0|deliveredMsgsCount=0|unAckMsgsCount=0|sendTopics=0|subscribeTopics=1|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=1, host='10.244.1.184', port=33502, version='2.0', idc='FT', purpose='sub', unack='0'}
2024-05-13 12:41:03,288 INFO  [eventMesh-tcp-scheduler-3] tcpMonitor(EventMeshTcpMonitor.java:118) - session|deliveredFailCount=0|deliveredMsgsCount=0|unAckMsgsCount=0|sendTopics=0|subscribeTopics=0|user=UserAgent{env='PRD', subsystem='5034', group='slackSink', path='/', pid=1, host='10.244.1.184', port=33492, version='2.0', idc='FT', purpose='sub', unack='0'}
2024-05-13 12:41:03,288 INFO  [eventMesh-tcp-scheduler-3] appMonitor(EventMeshTcpMonitor.java:154) - protocol: tcp, s: client2eventMeshTPS, t: 0
2024-05-13 12:41:03,288 INFO  [eventMesh-tcp-scheduler-3] appMonitor(EventMeshTcpMonitor.java:157) - protocol: tcp, s: eventMesh2mqTPS, t: 0
2024-05-13 12:41:03,288 INFO  [eventMesh-tcp-scheduler-3] appMonitor(EventMeshTcpMonitor.java:160) - protocol: tcp, s: mq2eventMeshTPS, t: 0
2024-05-13 12:41:03,288 INFO  [eventMesh-tcp-scheduler-3] appMonitor(EventMeshTcpMonitor.java:163) - protocol: tcp, s: eventMesh2clientTPS, t: 0
2024-05-13 12:41:03,289 INFO  [eventMesh-tcp-scheduler-3] appMonitor(EventMeshTcpMonitor.java:166) - protocol: tcp, s: allTPS, t: 0
2024-05-13 12:41:03,289 INFO  [eventMesh-tcp-scheduler-3] appMonitor(EventMeshTcpMonitor.java:169) - protocol: tcp, s: connection, t: 2
2024-05-13 12:41:03,289 INFO  [eventMesh-tcp-scheduler-3] appMonitor(EventMeshTcpMonitor.java:172) - protocol: tcp, s: subTopicNum, t: 1

@Pil0tXia
Copy link
Member

@brampurnot

I am unable to reproduce this issue on the latest version. Based on the two logs you posted, it seems that you are experiencing this issue on both Slack connector and RabbitMQ connector, is it the same on other connectors?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Add Slack sink connector
5 participants